blob: d5c87798b5252188691a44066c72a2135c1ce5c1 [file] [log] [blame]
* Copyright (c) 2016-2017 Nest Labs, Inc.
* All rights reserved.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
* @file
* This file defines subscription client for Weave
* Data Management (WDM) profile.
#include <Weave/Core/WeaveCore.h>
#include <Weave/Profiles/data-management/TraitCatalog.h>
#include <Weave/Profiles/data-management/EventLogging.h>
#include <Weave/Profiles/status-report/StatusReportProfile.h>
namespace nl {
namespace Weave {
namespace Profiles {
namespace WeaveMakeManagedNamespaceIdentifier(DataManagement, kWeaveManagedNamespaceDesignation_Current) {
class SubscriptionClient
kNoTimeOut = 0,
// Note the WDM spec says 0x7FFFFFFF, but Weave implementation can only hold timeout of much shorter
// 32-bit in milliseconds, which is about 1200 hours
kMaxTimeoutSec = 3600000,
enum EventID
// Marks the end of this subscription. The parameters sent to
// the EventCallback will indicate whether a resubscribe will
// be automatically attempted.
// If no retry will be attempted, the state of the client will
// be Aborted. No more downcalls into the application will
// be made.
// Otherwise, the state will not be aborted or freed. Downcalls
// will continue.
// The application is allowed to abort or free in this state.
// The parameters sent will also include an error code indicating
// the reason for ending the subscription. There are no guarantees
// for the state of mBinding or mEC.
// The subscription could have been terminated for a number of reasons
// (WRM ACK missing, EC allocation failure, response timeout,...)
// Some possible error codes generated by the client:
// WEAVE_ERROR_INVALID_MESSAGE_TYPE if some unrecognized message is received
// WEAVE_ERROR_TIMEOUT if an ack is not received or if a liveness check fails
// WEAVE_ERROR_INVALID_STATE if messages are received in an unexpected state
// WEAVE_ERROR_STATUS_REPORT_RECEIVED if a status report is received
// WEAVE_ERROR_INVALID_ARGUMENT if subscribe request fields are invalid
kEvent_OnSubscriptionTerminated = 1,
// Last chance to adjust EC, mEC is valid and can be tuned for timeout settings
// Don't touch anything else, and don't close the EC
kEvent_OnExchangeStart = 2,
// Sent as the client is creating the subscribe request
kEvent_OnSubscribeRequestPrepareNeeded = 3,
// cancel, abort, and free are allowed
kEvent_OnSubscriptionEstablished = 4,
// cancel, abort, and free are allowed
kEvent_OnNotificationRequest = 5,
// cancel, abort, and free are allowed
kEvent_OnNotificationProcessed = 6,
// An event stream has been received
kEvent_OnEventStreamReceived = 7,
struct LastObservedEvent
uint64_t mSourceId;
uint8_t mImportance;
uint64_t mEventId;
// union of structures for each event some of them might be empty
union InEventParam
void Clear(void) { memset(this, 0, sizeof(*this)); }
bool mIsStatusCodeValid;
uint32_t mStatusProfileId;
uint16_t mStatusCode;
ReferencedTLVData *mAdditionalInfoPtr;
bool mWillRetry;
SubscriptionClient * mClient;
} mSubscriptionTerminated;
// Do not close the EC
nl::Weave::ExchangeContext * mEC;
SubscriptionClient * mClient;
} mExchangeStart;
SubscriptionClient * mClient;
} mSubscribeRequestPrepareNeeded;
uint64_t mSubscriptionId;
SubscriptionClient * mClient;
} mSubscriptionEstablished;
// Do not close the EC
nl::Weave::ExchangeContext * mEC;
// Do not modify the message content
PacketBuffer * mMessage;
SubscriptionClient * mClient;
} mNotificationRequest;
SubscriptionClient * mClient;
} mNotificationProcessed;
nl::Weave::TLV::TLVReader *mReader;
SubscriptionClient * mClient;
} mEventStreamReceived;
union OutEventParam
void Clear(void) { memset(this, 0, sizeof(*this)); }
TraitPath *mPathList; //< Pointer to a list of trait paths
VersionedTraitPath *mVersionedPathList; //< Pointer to a list of versioned trait paths. If both this and mPathList are non-NULL, the versioned path list is selected
size_t mPathListSize; //< Number of trait paths in mPathList
LastObservedEvent *mLastObservedEventList; //< A list of the last known events received by the subscriber
size_t mLastObservedEventListSize; //< Number of observed events in mLastObservedEventList
uint32_t mTimeoutSecMin; //< Field specifying lower bound of liveness timeout
uint32_t mTimeoutSecMax; //< Field specifying upper bound of liveness timeout
uint64_t mSubscriptionId; //< The subscription ID to use for a mutual subscription
bool mNeedAllEvents; //< Indicates whether the subscriber is interested in events
} mSubscribeRequestPrepareNeeded;
struct ResubscribeParam
WEAVE_ERROR mReason; //< Error received on most recent failure
uint32_t mNumRetries; //< Number of retries, reset on a successful subscription
* @brief Callback to pass subscription events to application.
* @param aAppState[in] App state pointer set during initialization of
* the SubscriptionClient.
* @param aEvent[in] Indicates which event is happening
* @param aInParam[in] Struct with additional details about the event
* @param aOutParam[out] Information passed back by the application
typedef void (*EventCallback) (void * const aAppState, EventID aEvent, const InEventParam & aInParam, OutEventParam & aOutParam);
static void DefaultEventHandler(EventID aEvent, const InEventParam & aInParam, OutEventParam & aOutParam);
* @brief Callback to fetch the interval of time to wait before the next
* resubscribe. Applications are allowed to abort/free in this function
* if they've decided to give up on resubscribing.
* @param aAppState[in] App state pointer set during initialization of
* the SubscriptionClient.
* @param aInParam[in] Struct with additional details about the retry
* @param aOutIntervalMsec[out] Time in milliseconds to wait before next retry
typedef void (*ResubscribePolicyCallback) (void * const aAppState, ResubscribeParam & aInParam, uint32_t & aOutIntervalMsec);
static void DefaultResubscribePolicyCallback(void * const aAppState, ResubscribeParam & aInParam, uint32_t & aOutIntervalMsec);
void InitiateSubscription(void);
void InitiateCounterSubscription(const uint32_t aLivenessTimeoutSec);
Binding * GetBinding(void) const;
uint64_t GetPeerNodeId(void) const;
WEAVE_ERROR EndSubscription(void);
void AbortSubscription(void);
void Free (void);
WEAVE_ERROR GetSubscriptionId (uint64_t * const apSubscriptionId);
uint32_t GetLivenessTimeoutMsec (void) const { return mLivenessTimeoutMsec; };
void SetLivenessTimeoutMsec(uint32_t val) { mLivenessTimeoutMsec = val; }
void EnableResubscribe(ResubscribePolicyCallback aCallback);
void DisableResubscribe(void);
void ResetResubscribe(void);
bool IsEstablishedIdle () { return (mCurrentState == kState_SubscriptionEstablished_Idle); }
bool IsAborted () { return (mCurrentState == kState_Aborted); }
bool IsFree () { return (mCurrentState == kState_Free); }
bool IsRetryEnabled() { return (mResubscribePolicyCallback != NULL); }
friend class SubscriptionEngine;
friend class TestTdm;
friend class TestWdm;
enum ClientState
kState_Free = 0,
kState_Initialized = 1,
kState_Subscribing = 2,
kState_Subscribing_IdAssigned = 3,
kState_SubscriptionEstablished_Idle = 4,
kState_SubscriptionEstablished_Confirming = 5,
kState_Canceling = 6,
kState_Resubscribe_Holdoff = 7,
kState_NotifyDataSinkOnAbort_Begin = kState_Subscribing,
kState_NotifyDataSinkOnAbort_End = kState_Canceling,
kState_TimerTick_Begin = kState_Subscribing,
kState_TimerTick_End = kState_Resubscribe_Holdoff,
kState_Aborting = 8,
kState_Aborted = 9,
ClientState mCurrentState;
bool mIsInitiator;
bool mPrevIsPartialChange;
TraitDataHandle mPrevTraitDataHandle;
int8_t mRefCount;
Binding * mBinding;
nl::Weave::ExchangeContext * mEC;
void * mAppState;
EventCallback mEventCallback;
Binding::EventCallback mAppBindingCallback;
void * mAppBindingState;
ResubscribePolicyCallback mResubscribePolicyCallback;
const TraitCatalogBase<TraitDataSink>* mDataSinkCatalog;
ExchangeContext::Timeout mInactivityTimeoutDuringSubscribingMsec;
uint32_t mLivenessTimeoutMsec;
uint64_t mSubscriptionId;
// retry params
uint32_t mRetryCounter;
// Do nothing
SubscriptionClient (void);
void InitAsFree(void);
void Reset(void);
// AddRef to Binding
// store pointers to binding and delegate
// null out EC
WEAVE_ERROR Init (Binding * const apBinding,
void * const apAppState,
EventCallback const aEventCallback,
const TraitCatalogBase<TraitDataSink>* const apCatalog,
const uint32_t aInactivityTimeoutDuringSubscribingMsec);
void _InitiateSubscription(void);
WEAVE_ERROR SendSubscribeRequest(void);
WEAVE_ERROR ProcessDataList(nl::Weave::TLV::TLVReader &aReader);
void _AddRef (void);
void _Release (void);
void HandleSubscriptionTerminated(bool aWillRetry, WEAVE_ERROR aReason,
nl::Weave::Profiles::StatusReporting::StatusReport *aStatusReportPtr);
WEAVE_ERROR ReplaceExchangeContext (void);
void FlushExistingExchangeContext (const bool aAbortNow = false);
void NotificationRequestHandler (nl::Weave::ExchangeContext *aEC, const nl::Inet::IPPacketInfo *aPktInfo,
const nl::Weave::WeaveMessageInfo *aMsgInfo, PacketBuffer *aPayload);
void CancelRequestHandler (nl::Weave::ExchangeContext *aEC, const nl::Inet::IPPacketInfo *aPktInfo,
const nl::Weave::WeaveMessageInfo *aMsgInfo, PacketBuffer *aPayload);
void TimerEventHandler (void);
WEAVE_ERROR RefreshTimer (void);
void SetRetryTimer (WEAVE_ERROR aReason);
void MoveToState(const ClientState aTargetState);
const char * GetStateStr(void) const;
static void BindingEventCallback(void * const apAppState, const Binding::EventType aEvent,
const Binding::InEventParam & aInParam,
Binding::OutEventParam & aOutParam);
static void OnTimerCallback(System::Layer* aSystemLayer, void *aAppState, System::Error aErrorCode);
static void DataSinkOperation_NoMoreData (void * const apOpState, TraitDataSink * const apDataSink);
static void OnSendError (ExchangeContext *aEC, WEAVE_ERROR aErrorCode, void *aMsgSpecificContext);
static void OnResponseTimeout (nl::Weave::ExchangeContext *aEC);
static void OnMessageReceivedFromLocallyInitiatedExchange (nl::Weave::ExchangeContext *aEC, const nl::Inet::IPPacketInfo *aPktInfo,
const nl::Weave::WeaveMessageInfo *aMsgInfo, uint32_t aProfileId,
uint8_t aMsgType, PacketBuffer *aPayload);
}; // WeaveMakeManagedNamespaceIdentifier(DataManagement, kWeaveManagedNamespaceDesignation_Current)
}; // Profiles
}; // Weave
}; // nl