blob: 2b463c8ce17e20ab125ede4770f84cc0c4f4c99d [file] [log] [blame]
/*
*
* Copyright (c) 2016-2017 Nest Labs, Inc.
* All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* @file
* This file implements subscription client for Weave
* Data Management (WDM) profile.
*
*/
#ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS
#endif // __STDC_FORMAT_MACROS
#ifndef __STDC_CONSTANT_MACROS
#define __STDC_CONSTANT_MACROS
#endif // __STDC_CONSTANT_MACROS
#ifndef __STDC_LIMIT_MACROS
#define __STDC_LIMIT_MACROS
#endif // __STDC_LIMIT_MACROS
#include <Weave/Profiles/data-management/Current/WdmManagedNamespace.h>
#include <Weave/Profiles/data-management/DataManagement.h>
#include <Weave/Support/WeaveFaultInjection.h>
#include <Weave/Support/RandUtils.h>
#include <Weave/Support/FibonacciUtils.h>
#include <SystemLayer/SystemStats.h>
#if WEAVE_CONFIG_ENABLE_RELIABLE_MESSAGING
namespace nl {
namespace Weave {
namespace Profiles {
namespace WeaveMakeManagedNamespaceIdentifier(DataManagement, kWeaveManagedNamespaceDesignation_Current) {
// Do nothing
SubscriptionClient::SubscriptionClient ()
{
}
void SubscriptionClient::InitAsFree ()
{
mCurrentState = kState_Free;
mRefCount = 0;
Reset();
}
void SubscriptionClient::Reset(void)
{
mBinding = NULL;
mEC = NULL;
mAppState = NULL;
mEventCallback = NULL;
mResubscribePolicyCallback = NULL;
mDataSinkCatalog = NULL;
mInactivityTimeoutDuringSubscribingMsec = kNoTimeOut;
mLivenessTimeoutMsec = kNoTimeOut;
mSubscriptionId = 0;
mIsInitiator = false;
mRetryCounter = 0;
#if WDM_ENABLE_PROTOCOL_CHECKS
mPrevTraitDataHandle = -1;
#endif
mPrevIsPartialChange = false;
}
// AddRef to Binding
// store pointers to binding and delegate
// null out EC
WEAVE_ERROR SubscriptionClient::Init (Binding * const apBinding,
void * const apAppState,
EventCallback const aEventCallback,
const TraitCatalogBase<TraitDataSink>* const apCatalog,
const uint32_t aInactivityTimeoutDuringSubscribingMsec)
{
WeaveLogIfFalse(0 == mRefCount);
// add reference to the binding
apBinding->AddRef();
// make a copy of the pointers
mBinding = apBinding;
mAppState = apAppState;
mEventCallback = aEventCallback;
mDataSinkCatalog = apCatalog;
mInactivityTimeoutDuringSubscribingMsec = aInactivityTimeoutDuringSubscribingMsec;
MoveToState(kState_Initialized);
_AddRef ();
return WEAVE_NO_ERROR;
}
#if WEAVE_DETAIL_LOGGING
const char * SubscriptionClient::GetStateStr() const
{
switch (mCurrentState)
{
case kState_Free:
return "FREE";
case kState_Initialized:
return "INIT";
case kState_Subscribing:
return "SReq1";
case kState_Subscribing_IdAssigned:
return "SReq2";
case kState_SubscriptionEstablished_Idle:
return "ALIVE";
case kState_SubscriptionEstablished_Confirming:
return "CONFM";
case kState_Canceling:
return "CANCL";
case kState_Resubscribe_Holdoff:
return "RETRY";
case kState_Aborting:
return "ABTNG";
case kState_Aborted:
return "ABORT";
}
return "N/A";
}
#else // WEAVE_DETAIL_LOGGING
const char * SubscriptionClient::GetStateStr() const
{
return "N/A";
}
#endif // WEAVE_DETAIL_LOGGING
void SubscriptionClient::MoveToState(const ClientState aTargetState)
{
mCurrentState = aTargetState;
WeaveLogDetail(DataManagement, "Client[%u] moving to [%5.5s] Ref(%d)",
SubscriptionEngine::GetInstance()->GetClientId(this), GetStateStr(), mRefCount);
#if WEAVE_DETAIL_LOGGING
if (kState_Free == mCurrentState)
{
SubscriptionEngine::GetInstance()->LogSubscriptionFreed();
}
#endif // #if WEAVE_DETAIL_LOGGING
}
/**
* @brief Enable automatic resubscribes. Attach a callback to specify
* the next retry time on failure.
*
* @param aCallback[in] Optional callback to fetch the amount of time to
* wait before retrying after a failure. If NULL use
* a default policy.
*/
void SubscriptionClient::EnableResubscribe(ResubscribePolicyCallback aCallback)
{
if (aCallback)
{
mResubscribePolicyCallback = aCallback;
}
else
{
mResubscribePolicyCallback = DefaultResubscribePolicyCallback;
}
}
/**
* @brief Disable the resubscribe mechanism. This will abort if a resubscribe
* was pending.
*/
void SubscriptionClient::DisableResubscribe(void)
{
mResubscribePolicyCallback = NULL;
if (mCurrentState == kState_Resubscribe_Holdoff)
{
// cancel timer
SubscriptionEngine::GetInstance()->GetExchangeManager()->MessageLayer->
SystemLayer->CancelTimer(OnTimerCallback, this);
// app doesn't need to know since it triggered this
AbortSubscription();
}
}
/**
* @brief Kick the resubscribe mechanism. This will initiate an immediate retry
*/
void SubscriptionClient::ResetResubscribe(void)
{
if (mCurrentState == kState_Resubscribe_Holdoff)
{
// cancel timer
SubscriptionEngine::GetInstance()->GetExchangeManager()->MessageLayer->
SystemLayer->CancelTimer(OnTimerCallback, this);
MoveToState(kState_Initialized);
}
mRetryCounter = 0;
if (mCurrentState == kState_Initialized || mCurrentState == kState_Resubscribe_Holdoff)
{
SetRetryTimer(WEAVE_NO_ERROR);
}
}
WEAVE_ERROR SubscriptionClient::GetSubscriptionId (uint64_t * const apSubscriptionId)
{
WEAVE_ERROR err = WEAVE_NO_ERROR;
*apSubscriptionId = 0;
switch (mCurrentState)
{
case kState_Subscribing_IdAssigned:
case kState_SubscriptionEstablished_Idle:
case kState_SubscriptionEstablished_Confirming:
case kState_Canceling:
*apSubscriptionId = mSubscriptionId;
ExitNow();
break;
default:
ExitNow(err = WEAVE_ERROR_INCORRECT_STATE);
}
exit:
WeaveLogFunctError(err);
return err;
}
void SubscriptionClient::DefaultEventHandler(EventID aEvent, const InEventParam & aInParam, OutEventParam & aOutParam)
{
IgnoreUnusedVariable(aInParam);
IgnoreUnusedVariable(aOutParam);
WeaveLogDetail(DataManagement, "%s event: %d", __func__, aEvent);
}
/**
* @brief The default policy implementation will pick a random timeslot
* with millisecond resolution over an ever increasing window,
* following a fibonacci sequence upto WDM_RESUBSCRIBE_MAX_FIBONACCI_STEP_INDEX.
* Average of the randomized wait time past the WDM_RESUBSCRIBE_MAX_FIBONACCI_STEP_INDEX
* will be around one hour.
* When the retry count resets to 0, the sequence starts from the beginning again.
*/
void SubscriptionClient::DefaultResubscribePolicyCallback(void * const aAppState,
ResubscribeParam & aInParam,
uint32_t & aOutIntervalMsec)
{
IgnoreUnusedVariable(aAppState);
uint32_t fibonacciNum = 0;
uint32_t maxWaitTimeInMsec = 0;
uint32_t waitTimeInMsec = 0;
uint32_t minWaitTimeInMsec = 0;
if (aInParam.mNumRetries <= WDM_RESUBSCRIBE_MAX_FIBONACCI_STEP_INDEX)
{
fibonacciNum = GetFibonacciForIndex(aInParam.mNumRetries);
maxWaitTimeInMsec = fibonacciNum * WDM_RESUBSCRIBE_WAIT_TIME_MULTIPLIER_MS;
}
else
{
maxWaitTimeInMsec = WDM_RESUBSCRIBE_MAX_RETRY_WAIT_INTERVAL_MS;
}
if (maxWaitTimeInMsec != 0)
{
minWaitTimeInMsec = (WDM_RESUBSCRIBE_MIN_WAIT_TIME_INTERVAL_PERCENT_PER_STEP * maxWaitTimeInMsec) / 100;
waitTimeInMsec = minWaitTimeInMsec + (GetRandU32() % (maxWaitTimeInMsec - minWaitTimeInMsec));
}
aOutIntervalMsec = waitTimeInMsec;
WeaveLogDetail(DataManagement, "Computing resubscribe policy: attempts %" PRIu32 ", max wait time %" PRIu32 " ms, selected wait time %" PRIu32 " ms",
aInParam.mNumRetries, maxWaitTimeInMsec, waitTimeInMsec);
return;
}
void SubscriptionClient::_InitiateSubscription(void)
{
WEAVE_ERROR err = WEAVE_NO_ERROR;
WeaveLogDetail(DataManagement, "Client[%u] [%5.5s] %s Ref(%d)",
SubscriptionEngine::GetInstance()->GetClientId(this), GetStateStr(), __func__, mRefCount);
// Make sure the client object is not freed during the callback to the application.
_AddRef();
VerifyOrExit(kState_Subscribing != mCurrentState && kState_Subscribing_IdAssigned != mCurrentState, /* no-op */);
VerifyOrExit(kState_Initialized == mCurrentState, err = WEAVE_ERROR_INCORRECT_STATE);
// Set the protocol callback on the binding object. NOTE: This should only happen once the
// app has explicitly started the subscription process by calling either InitiateSubscription() or
// InitiateCounterSubscription(). Otherwise the client object might receive callbacks from
// the binding before its ready.
mBinding->SetProtocolLayerCallback(BindingEventCallback, this);
#if WDM_ENABLE_PROTOCOL_CHECKS
mPrevTraitDataHandle = -1;
#endif
mPrevIsPartialChange = false;
// If the binding is ready...
if (mBinding->IsReady())
{
// Using the binding, form and send a SubscribeRequest to the publisher.
err = SendSubscribeRequest();
SuccessOrExit(err);
// Enter the Subscribing state.
if (mIsInitiator)
{
MoveToState(kState_Subscribing);
}
else
{
MoveToState(kState_Subscribing_IdAssigned);
}
err = RefreshTimer();
SuccessOrExit(err);
}
// Otherwise, if the binding needs to be prepared...
else if (mBinding->CanBePrepared())
{
// Ask the application prepare the binding by delivering a PrepareRequested API event to it via the
// binding's callback. At some point the binding will callback into the SubscriptionClient signaling
// that preparation has completed (successfully or otherwise). Note that this callback can happen
// synchronously within the RequestPrepare() method, implying that _InitiateSubscription() will recurse.
err = mBinding->RequestPrepare();
SuccessOrExit(err);
}
// Otherwise, verify that the binding is in one of the preparing states. Once preparation completes, the
// binding will call back, at which point, if preparation was successful, _InitiateSubscription() will be
// called again.
else
{
VerifyOrExit(mBinding->IsPreparing(), err = WEAVE_ERROR_INCORRECT_STATE);
}
exit:
WeaveLogFunctError(err);
if (WEAVE_NO_ERROR != err)
{
HandleSubscriptionTerminated(IsRetryEnabled(), err, NULL);
}
_Release();
}
WEAVE_ERROR SubscriptionClient::SendSubscribeRequest(void)
{
WEAVE_ERROR err = WEAVE_NO_ERROR;
PacketBuffer* msgBuf = NULL;
uint8_t msgType = kMsgType_SubscribeRequest;
InEventParam inSubscribeParam;
OutEventParam outSubscribeParam;
WeaveLogDetail(DataManagement, "Client[%u] [%5.5s] %s Ref(%d)",
SubscriptionEngine::GetInstance()->GetClientId(this), GetStateStr(), __func__, mRefCount);
inSubscribeParam.Clear();
outSubscribeParam.Clear();
outSubscribeParam.mSubscribeRequestPrepareNeeded.mVersionedPathList = NULL;
outSubscribeParam.mSubscribeRequestPrepareNeeded.mPathList = NULL;
inSubscribeParam.mSubscribeRequestPrepareNeeded.mClient = this;
mEventCallback(mAppState, kEvent_OnSubscribeRequestPrepareNeeded,
inSubscribeParam, outSubscribeParam);
if (!mIsInitiator)
{
mSubscriptionId = outSubscribeParam.mSubscribeRequestPrepareNeeded.mSubscriptionId;
}
VerifyOrExit(kState_Initialized == mCurrentState,
err = WEAVE_ERROR_INCORRECT_STATE);
VerifyOrExit((outSubscribeParam.mSubscribeRequestPrepareNeeded.mTimeoutSecMin <= kMaxTimeoutSec) ||
(kNoTimeOut == outSubscribeParam.mSubscribeRequestPrepareNeeded.mTimeoutSecMin),
err = WEAVE_ERROR_INVALID_ARGUMENT);
VerifyOrExit((outSubscribeParam.mSubscribeRequestPrepareNeeded.mTimeoutSecMax <= kMaxTimeoutSec) ||
(kNoTimeOut == outSubscribeParam.mSubscribeRequestPrepareNeeded.mTimeoutSecMax),
err = WEAVE_ERROR_INVALID_ARGUMENT);
msgBuf = PacketBuffer::New();
VerifyOrExit(NULL != msgBuf, err = WEAVE_ERROR_NO_MEMORY);
{
nl::Weave::TLV::TLVWriter writer;
SubscribeRequest::Builder request;
writer.Init(msgBuf);
err = request.Init(&writer);
SuccessOrExit(err);
if (kNoTimeOut != outSubscribeParam.mSubscribeRequestPrepareNeeded.mTimeoutSecMin)
{
request.SubscribeTimeoutMin(outSubscribeParam.mSubscribeRequestPrepareNeeded.mTimeoutSecMin);
}
if (kNoTimeOut != outSubscribeParam.mSubscribeRequestPrepareNeeded.mTimeoutSecMax)
{
request.SubscribeTimeoutMax(outSubscribeParam.mSubscribeRequestPrepareNeeded.mTimeoutSecMax);
}
if (!mIsInitiator)
{
request.SubscriptionID(mSubscriptionId);
}
// It's safe to bail out after a series of operation, for
// SubscriptionRequest::Builder would internally turn to NOP after error is logged
SuccessOrExit(err = request.GetError());
{
PathList::Builder & pathList = request.CreatePathListBuilder();
for (size_t i = 0; i < outSubscribeParam.mSubscribeRequestPrepareNeeded.mPathListSize; ++i)
{
TraitDataSink *pDataSink;
nl::Weave::TLV::TLVType dummyContainerType;
SchemaVersionRange versionIntersection;
VersionedTraitPath versionedTraitPath;
// Applications can set either the versioned or non versioned path lists for now. We pick either
// depending on which is non-NULL. If both are non-NULL, we then select the versioned list.
if (outSubscribeParam.mSubscribeRequestPrepareNeeded.mVersionedPathList) {
versionedTraitPath = outSubscribeParam.mSubscribeRequestPrepareNeeded.mVersionedPathList[i];
}
else {
versionedTraitPath.mTraitDataHandle = outSubscribeParam.mSubscribeRequestPrepareNeeded.mPathList[i].mTraitDataHandle;
versionedTraitPath.mPropertyPathHandle = outSubscribeParam.mSubscribeRequestPrepareNeeded.mPathList[i].mPropertyPathHandle;
}
err = mDataSinkCatalog->Locate(versionedTraitPath.mTraitDataHandle, &pDataSink);
SuccessOrExit(err);
// Start the TLV Path
err = writer.StartContainer(nl::Weave::TLV::AnonymousTag, nl::Weave::TLV::kTLVType_Path, dummyContainerType);
SuccessOrExit(err);
// Start, fill, and close the TLV Structure that contains ResourceID, ProfileID, and InstanceID
err = mDataSinkCatalog->HandleToAddress(versionedTraitPath.mTraitDataHandle, writer, versionedTraitPath.mRequestedVersionRange);
SuccessOrExit(err);
// Append zero or more TLV tags based on the Path Handle
err = pDataSink->GetSchemaEngine()->MapHandleToPath(versionedTraitPath.mPropertyPathHandle, writer);
SuccessOrExit(err);
// Close the TLV Path
err = writer.EndContainer(dummyContainerType);
SuccessOrExit(err);
}
pathList.EndOfPathList();
SuccessOrExit(err = pathList.GetError());
}
{
VersionList::Builder & versionList = request.CreateVersionListBuilder();
for (size_t i = 0; i < outSubscribeParam.mSubscribeRequestPrepareNeeded.mPathListSize; ++i)
{
TraitDataSink *pDataSink;
VersionedTraitPath versionedTraitPath;
if (outSubscribeParam.mSubscribeRequestPrepareNeeded.mVersionedPathList) {
versionedTraitPath = outSubscribeParam.mSubscribeRequestPrepareNeeded.mVersionedPathList[i];
}
else {
versionedTraitPath.mTraitDataHandle = outSubscribeParam.mSubscribeRequestPrepareNeeded.mPathList[i].mTraitDataHandle;
versionedTraitPath.mPropertyPathHandle = outSubscribeParam.mSubscribeRequestPrepareNeeded.mPathList[i].mPropertyPathHandle;
}
err = mDataSinkCatalog->Locate(versionedTraitPath.mTraitDataHandle, &pDataSink);
SuccessOrExit(err);
if (pDataSink->IsVersionValid()) {
versionList.AddVersion(pDataSink->GetVersion());
}
else
{
versionList.AddNull();
}
}
versionList.EndOfVersionList();
SuccessOrExit(err = versionList.GetError());
}
if (outSubscribeParam.mSubscribeRequestPrepareNeeded.mNeedAllEvents)
{
request.SubscribeToAllEvents(true);
if (outSubscribeParam.mSubscribeRequestPrepareNeeded.mLastObservedEventListSize > 0)
{
EventList::Builder & eventList = request.CreateLastObservedEventIdListBuilder();
for (size_t n = 0; n < outSubscribeParam.mSubscribeRequestPrepareNeeded.mLastObservedEventListSize; ++n)
{
Event::Builder & event = eventList.CreateEventBuilder();
event.SourceId(outSubscribeParam.mSubscribeRequestPrepareNeeded.mLastObservedEventList[n].mSourceId)
.Importance(outSubscribeParam.mSubscribeRequestPrepareNeeded.mLastObservedEventList[n].mImportance)
.EventId(outSubscribeParam.mSubscribeRequestPrepareNeeded.mLastObservedEventList[n].mEventId)
.EndOfEvent ();
SuccessOrExit(err = event.GetError());
}
eventList.EndOfEventList ();
SuccessOrExit(err = eventList.GetError());
}
}
request.EndOfRequest();
SuccessOrExit(err = request.GetError());
err = writer.Finalize();
SuccessOrExit(err);
}
err = ReplaceExchangeContext ();
SuccessOrExit(err);
// NOTE: State could be changed in sync error callback by message layer
WEAVE_FAULT_INJECT(FaultInjection::kFault_WDM_SendUnsupportedReqMsgType, msgType += 50);
err = mEC->SendMessage(nl::Weave::Profiles::kWeaveProfile_WDM, msgType, msgBuf,
nl::Weave::ExchangeContext::kSendFlag_ExpectResponse);
msgBuf = NULL;
SuccessOrExit(err);
exit:
WeaveLogFunctError(err);
if (NULL != msgBuf)
{
PacketBuffer::Free(msgBuf);
msgBuf = NULL;
}
return err;
}
void SubscriptionClient::InitiateSubscription(void)
{
mIsInitiator = true;
if (IsRetryEnabled())
{
SetRetryTimer(WEAVE_NO_ERROR);
}
else
{
_InitiateSubscription();
}
}
void SubscriptionClient::InitiateCounterSubscription(const uint32_t aLivenessTimeoutSec)
{
mIsInitiator = false;
// the liveness timeout spec is given and not part of the subscription setup
mLivenessTimeoutMsec = aLivenessTimeoutSec * 1000;
_InitiateSubscription();
}
void SubscriptionClient::_AddRef ()
{
WeaveLogIfFalse(mRefCount < INT8_MAX);
++mRefCount;
// 0: free
// 1: in some phase of subscription
// increase: in downcall to message layer, some callback might come from message layer (send error/connection broken)
// increase: in callback to app layer
}
void SubscriptionClient::_Release ()
{
WeaveLogIfFalse(mRefCount > 0);
--mRefCount;
if (0 == mRefCount)
{
AbortSubscription();
SYSTEM_STATS_DECREMENT(nl::Weave::System::Stats::kWDMNext_NumSubscriptionClients);
}
}
Binding * SubscriptionClient::GetBinding() const
{
return mBinding;
}
uint64_t SubscriptionClient::GetPeerNodeId(void) const
{
return (mBinding != NULL) ? mBinding->GetPeerNodeId() : kNodeIdNotSpecified;
}
WEAVE_ERROR SubscriptionClient::ReplaceExchangeContext ()
{
WEAVE_ERROR err = WEAVE_NO_ERROR;
InEventParam inParam;
OutEventParam outParam;
// Make sure we're not freed by accident.
_AddRef();
FlushExistingExchangeContext();
err = mBinding->NewExchangeContext(mEC);
SuccessOrExit(err);
mEC->AppState = this;
mEC->OnMessageReceived = OnMessageReceivedFromLocallyInitiatedExchange;
mEC->OnResponseTimeout = OnResponseTimeout;
mEC->OnSendError = OnSendError;
mEC->OnAckRcvd = NULL;
inParam.mExchangeStart.mEC = mEC;
inParam.mExchangeStart.mClient = this;
// NOTE: app layer is not supposed to change state/ref count in this callback
mEventCallback(mAppState, kEvent_OnExchangeStart, inParam, outParam);
exit:
WeaveLogFunctError(err);
_Release ();
return err;
}
void SubscriptionClient::FlushExistingExchangeContext (const bool aAbortNow)
{
if (NULL != mEC)
{
mEC->AppState = NULL;
mEC->OnMessageReceived = NULL;
mEC->OnResponseTimeout = NULL;
mEC->OnSendError = NULL;
mEC->OnAckRcvd = NULL;
if (aAbortNow)
{
mEC->Abort();
}
else
{
mEC->Close();
}
mEC = NULL;
}
}
#if WDM_ENABLE_SUBSCRIPTION_CANCEL
WEAVE_ERROR SubscriptionClient::EndSubscription ()
{
WEAVE_ERROR err = WEAVE_NO_ERROR;
PacketBuffer* msgBuf = NULL;
nl::Weave::TLV::TLVWriter writer;
SubscribeCancelRequest::Builder request;
WeaveLogDetail(DataManagement, "Client[%u] [%5.5s] %s Ref(%d)",
SubscriptionEngine::GetInstance()->GetClientId(this), GetStateStr(), __func__, mRefCount);
// Make sure we're not freed by accident.
_AddRef();
switch (mCurrentState)
{
case kState_Subscribing:
// fall through
case kState_Subscribing_IdAssigned:
WeaveLogDetail(DataManagement, "Client[%u] [%5.5s] %s: subscription not established yet, abort",
SubscriptionEngine::GetInstance()->GetClientId(this), GetStateStr(), __func__);
AbortSubscription();
ExitNow();
break;
case kState_SubscriptionEstablished_Confirming:
// forget we're in the middle of confirmation, as the outcome
// has become irrelevant
FlushExistingExchangeContext();
// fall through
case kState_SubscriptionEstablished_Idle:
msgBuf = PacketBuffer::NewWithAvailableSize(request.kBaseMessageSubscribeId_PayloadLen);
VerifyOrExit(NULL != msgBuf, err = WEAVE_ERROR_NO_MEMORY);
err = ReplaceExchangeContext ();
SuccessOrExit(err);
writer.Init(msgBuf);
request.Init(&writer);
err = request.SubscriptionID(mSubscriptionId)
.EndOfRequest()
.GetError();
SuccessOrExit(err);
err = writer.Finalize();
SuccessOrExit(err);
// NOTE: State could be changed if there is a sync error callback from message layer
err = mEC->SendMessage(nl::Weave::Profiles::kWeaveProfile_WDM, kMsgType_SubscribeCancelRequest,
msgBuf, nl::Weave::ExchangeContext::kSendFlag_ExpectResponse);
msgBuf = NULL;
SuccessOrExit(err);
MoveToState(kState_Canceling);
break;
// Cancel is not supported in any other state
default:
ExitNow(err = WEAVE_ERROR_INCORRECT_STATE);
}
exit:
WeaveLogFunctError(err);
if (NULL != msgBuf)
{
PacketBuffer::Free(msgBuf);
msgBuf = NULL;
}
_Release();
return err;
}
#else // WDM_ENABLE_SUBSCRIPTION_CANCEL
WEAVE_ERROR SubscriptionClient::EndSubscription()
{
AbortSubscription();
return WEAVE_NO_ERROR;
}
#endif // WDM_ENABLE_SUBSCRIPTION_CANCEL
void SubscriptionClient::AbortSubscription(void)
{
WEAVE_ERROR err = WEAVE_NO_ERROR;
const bool nullReference = (0 == mRefCount);
WeaveLogDetail(DataManagement, "Client[%u] [%5.5s] %s Ref(%d)",
SubscriptionEngine::GetInstance()->GetClientId(this), GetStateStr(), __func__, mRefCount);
if (!nullReference)
{
// Make sure we're not freed by accident.
// NOTE: In the last Abort call from _Release, mRefCount is already 0.
// In that case, we do not need this AddRef/Release pair, and we move to FREE state.
_AddRef();
}
if (kState_Free == mCurrentState)
{
// This must not happen
ExitNow(err = WEAVE_ERROR_INCORRECT_STATE);
}
else if (kState_Aborted == mCurrentState || kState_Aborting == mCurrentState)
{
// we're already aborted, so there is nothing else to flush
ExitNow();
}
else
{
// This is an intermediate state for external calls during the abort process
uint64_t peerNodeId = mBinding->GetPeerNodeId();
uint64_t subscriptionId = mSubscriptionId;
bool deliverSubTerminatedToCatalog =
((NULL != mDataSinkCatalog) &&
(mCurrentState >= kState_NotifyDataSinkOnAbort_Begin) && (mCurrentState <= kState_NotifyDataSinkOnAbort_End));
MoveToState(kState_Aborting);
if (deliverSubTerminatedToCatalog)
{
// iterate through the whole catalog and deliver kEventSubscriptionTerminated event
mDataSinkCatalog->DispatchEvent(TraitDataSink::kEventSubscriptionTerminated, NULL);
}
mBinding->SetProtocolLayerCallback(NULL, NULL);
mBinding->Release();
mBinding = NULL;
// Note that ref count is not touched at here, as _Abort doesn't change the ownership
FlushExistingExchangeContext(true);
(void)RefreshTimer();
Reset();
MoveToState(kState_Aborted);
#if WDM_ENABLE_SUBSCRIPTION_PUBLISHER
if (!mIsInitiator)
{
SubscriptionEngine::GetInstance()->UpdateHandlerLiveness(peerNodeId, subscriptionId, true);
}
#endif // WDM_ENABLE_SUBSCRIPTION_PUBLISHER
}
exit:
WeaveLogFunctError(err);
if (nullReference)
{
// No one is referencing us, move to FREE
MoveToState(kState_Free);
}
else
{
_Release();
}
}
void SubscriptionClient::HandleSubscriptionTerminated(bool aWillRetry, WEAVE_ERROR aReason,
StatusReporting::StatusReport *aStatusReportPtr)
{
void * const pAppState = mAppState;
EventCallback callbackFunc = mEventCallback;
WeaveLogDetail(DataManagement, "Client[%u] [%5.5s] %s Ref(%d)",
SubscriptionEngine::GetInstance()->GetClientId(this), GetStateStr(), __func__, mRefCount);
_AddRef();
if (!aWillRetry)
{
// Flush most internal states, except for mRefCount and mCurrentState
// move to kState_Aborted
AbortSubscription();
}
else
{
// We do not need to perform a full-fledged subscription
// abort. On the other hand, we can safely flush existing
// exchange context as any communication on that exchange
// context should be considered an error.
const bool abortExchangeContext = true;
FlushExistingExchangeContext(abortExchangeContext);
}
if (NULL != callbackFunc)
{
InEventParam inParam;
OutEventParam outParam;
inParam.Clear();
outParam.Clear();
inParam.mSubscriptionTerminated.mReason = aReason;
inParam.mSubscriptionTerminated.mClient = this;
inParam.mSubscriptionTerminated.mWillRetry = aWillRetry;
inParam.mSubscriptionTerminated.mIsStatusCodeValid = (aStatusReportPtr != NULL);
if (aStatusReportPtr != NULL)
{
inParam.mSubscriptionTerminated.mStatusProfileId = aStatusReportPtr->mProfileId;
inParam.mSubscriptionTerminated.mStatusCode = aStatusReportPtr->mStatusCode;
inParam.mSubscriptionTerminated.mAdditionalInfoPtr = &(aStatusReportPtr->mAdditionalInfo);
}
callbackFunc(pAppState, kEvent_OnSubscriptionTerminated, inParam, outParam);
}
else
{
WeaveLogDetail(DataManagement, "Client[%u] [%5.5s] %s Ref(%d) app layer callback skipped",
SubscriptionEngine::GetInstance()->GetClientId(this), GetStateStr(), __func__, mRefCount);
}
// only set this timer if the app cb hasn't changed our state.
if (aWillRetry && !IsAborted())
{
SetRetryTimer(aReason);
}
_Release();
}
void SubscriptionClient::SetRetryTimer(WEAVE_ERROR aReason)
{
WEAVE_ERROR err = WEAVE_NO_ERROR;
ClientState entryState = mCurrentState;
ResubscribePolicyCallback entryCb = mResubscribePolicyCallback;
// this check serves to see whether we already have a timer set
// and if resubscribes are enabled
if (entryCb && entryState < kState_Resubscribe_Holdoff)
{
uint32_t timeoutMsec = 0;
_AddRef();
MoveToState(kState_Resubscribe_Holdoff);
ResubscribeParam param;
param.mNumRetries = mRetryCounter;
param.mReason = aReason;
mResubscribePolicyCallback(mAppState, param, timeoutMsec);
VerifyOrExit(mCurrentState != kState_Aborted, /* exit without error */);
err = SubscriptionEngine::GetInstance()->GetExchangeManager()->MessageLayer->
SystemLayer->StartTimer(timeoutMsec, OnTimerCallback, this);
SuccessOrExit(err);
WeaveLogDetail(DataManagement, "Client[%u] [%5.5s] %s Ref(%d) timeout: %u",
SubscriptionEngine::GetInstance()->GetClientId(this), GetStateStr(), __func__, mRefCount, timeoutMsec);
}
exit:
// all errors are considered fatal in this function
if (err != WEAVE_NO_ERROR)
{
HandleSubscriptionTerminated(false, err, NULL);
}
if (entryCb && (entryState < kState_Resubscribe_Holdoff))
{
_Release();
}
}
void SubscriptionClient::Free ()
{
WeaveLogDetail(DataManagement, "Client[%u] [%5.5s] %s Ref(%d)",
SubscriptionEngine::GetInstance()->GetClientId(this), GetStateStr(), __func__, mRefCount);
WeaveLogIfFalse(kState_Free != mCurrentState);
WeaveLogIfFalse(mRefCount > 0);
// Abort the subscription if we're not already aborted
if (kState_Aborted != mCurrentState)
{
AbortSubscription();
}
// If mRefCount == 1, _Release would decrement it to 0, call Abort again and move us to FREE state
_Release();
}
void SubscriptionClient::BindingEventCallback(void * const aAppState,
const Binding::EventType aEvent,
const Binding::InEventParam & aInParam,
Binding::OutEventParam & aOutParam)
{
SubscriptionClient * const pClient = reinterpret_cast<SubscriptionClient*>(aAppState);
pClient->_AddRef();
switch (aEvent)
{
case Binding::kEvent_BindingReady:
// Binding is ready. We can send the subscription req now.
pClient->_InitiateSubscription();
break;
case Binding::kEvent_BindingFailed:
pClient->SetRetryTimer(aInParam.BindingFailed.Reason);
break;
case Binding::kEvent_PrepareFailed:
// need to prepare again.
pClient->SetRetryTimer(aInParam.PrepareFailed.Reason);
break;
default:
Binding::DefaultEventHandler(aAppState, aEvent, aInParam, aOutParam);
}
pClient->_Release();
}
void SubscriptionClient::OnTimerCallback(System::Layer* aSystemLayer, void *aAppState, System::Error)
{
SubscriptionClient * const pClient = reinterpret_cast<SubscriptionClient*>(aAppState);
pClient->TimerEventHandler();
}
WEAVE_ERROR SubscriptionClient::RefreshTimer (void)
{
WEAVE_ERROR err = WEAVE_NO_ERROR;
bool isTimerNeeded = false;
uint32_t timeoutMsec = 0;
WeaveLogDetail(DataManagement, "Client[%u] [%5.5s] %s Ref(%d)",
SubscriptionEngine::GetInstance()->GetClientId(this), GetStateStr(), __func__, mRefCount);
// Cancel timer first
SubscriptionEngine::GetInstance()->GetExchangeManager()->MessageLayer->
SystemLayer->CancelTimer(OnTimerCallback, this);
// Arm timer according to current state
switch (mCurrentState)
{
case kState_Subscribing:
case kState_Subscribing_IdAssigned:
if (kNoTimeOut != mInactivityTimeoutDuringSubscribingMsec)
{
// Note that loss of range is not expected, as ExchangeManager::Timeout is indeed uint32_t
timeoutMsec = mInactivityTimeoutDuringSubscribingMsec;
isTimerNeeded = true;
WeaveLogDetail(DataManagement, "Client[%u] [%5.5s] %s Ref(%d) Set inactivity time limit during subscribing to %" PRIu32 " msec",
SubscriptionEngine::GetInstance()->GetClientId(this), GetStateStr(), __func__, mRefCount, timeoutMsec);
}
break;
case kState_SubscriptionEstablished_Idle:
if (kNoTimeOut != mLivenessTimeoutMsec)
{
if (mIsInitiator)
{
// Calculate margin to reserve for WRM activity, so we send out SubscribeConfirm earlier
// Note that wrap around could happen, if the system is configured with excessive delays and number of retries
const nl::Weave::WRMPConfig& defaultWRMPConfig = mBinding->GetDefaultWRMPConfig();
const uint32_t marginMsec = (defaultWRMPConfig.mMaxRetrans + 1) * defaultWRMPConfig.mInitialRetransTimeout;
// If the margin is smaller than the desired liveness timeout, set a timer for the difference.
// Otherwise, set the timer to 0 (which will fire immediately)
if (marginMsec < mLivenessTimeoutMsec)
{
timeoutMsec = mLivenessTimeoutMsec - marginMsec;
}
else
{
// This is a system configuration problem
WeaveLogError(DataManagement, "Client[%u] Liveness period (%" PRIu32 " msec) <= margin reserved for WRM (%" PRIu32 " msec)",
SubscriptionEngine::GetInstance()->GetClientId(this), mLivenessTimeoutMsec, marginMsec);
ExitNow(err = WEAVE_ERROR_TIMEOUT);
}
}
else
{
timeoutMsec = mLivenessTimeoutMsec;
}
isTimerNeeded = true;
WeaveLogDetail(DataManagement, "Client[%u] [%5.5s] %s Ref(%d) Set timer for liveness confirmation to %" PRIu32 " msec",
SubscriptionEngine::GetInstance()->GetClientId(this), GetStateStr(), __func__, mRefCount, timeoutMsec);
}
break;
case kState_SubscriptionEstablished_Confirming:
// Do nothing
break;
case kState_Aborting:
// Do nothing
break;
default:
ExitNow(err = WEAVE_ERROR_INCORRECT_STATE);
}
if (isTimerNeeded)
{
err = SubscriptionEngine::GetInstance()->GetExchangeManager()->MessageLayer->
SystemLayer->StartTimer(timeoutMsec, OnTimerCallback, this);
VerifyOrExit(WEAVE_SYSTEM_NO_ERROR == err, /* no-op */);
}
exit:
WeaveLogFunctError(err);
return err;
}
void SubscriptionClient::TimerEventHandler (void)
{
WEAVE_ERROR err = WEAVE_NO_ERROR;
PacketBuffer* msgBuf = NULL;
bool skipTimerCheck = false;
if ((0 == mRefCount) ||
(mCurrentState < kState_TimerTick_Begin) ||
(mCurrentState > kState_TimerTick_End))
{
skipTimerCheck = true;
ExitNow();
}
// Make sure we're not freed by accident
_AddRef();
switch (mCurrentState)
{
case kState_Subscribing:
case kState_Subscribing_IdAssigned:
WeaveLogDetail(DataManagement, "Client[%u] [%5.5s] %s Ref(%d) Timeout for subscribing phase, abort",
SubscriptionEngine::GetInstance()->GetClientId(this), GetStateStr(), __func__, mRefCount);
ExitNow(err = WEAVE_ERROR_TIMEOUT);
break;
case kState_SubscriptionEstablished_Idle:
if (mIsInitiator)
{
WeaveLogDetail(DataManagement, "Client[%u] [%5.5s] %s Ref(%d) Confirming liveness",
SubscriptionEngine::GetInstance()->GetClientId(this), GetStateStr(), __func__, mRefCount);
// timeout for liveness check
nl::Weave::TLV::TLVWriter writer;
SubscribeConfirmRequest::Builder request;
msgBuf = PacketBuffer::NewWithAvailableSize(request.kBaseMessageSubscribeId_PayloadLen);
VerifyOrExit(NULL != msgBuf, err = WEAVE_ERROR_NO_MEMORY);
err = ReplaceExchangeContext ();
SuccessOrExit(err);
writer.Init(msgBuf);
request.Init(&writer);
err = request.SubscriptionID(mSubscriptionId)
.EndOfRequest()
.GetError();
SuccessOrExit(err);
err = writer.Finalize();
SuccessOrExit(err);
// NOTE: State could be changed if there is a send error callback from message layer
err = mEC->SendMessage(nl::Weave::Profiles::kWeaveProfile_WDM, kMsgType_SubscribeConfirmRequest,
msgBuf, nl::Weave::ExchangeContext::kSendFlag_ExpectResponse);
msgBuf = NULL;
SuccessOrExit(err);
if (kState_SubscriptionEstablished_Idle == mCurrentState)
{
MoveToState(kState_SubscriptionEstablished_Confirming);
}
else
{
// state has changed, probably because some callback from message layer
ExitNow(err = WEAVE_ERROR_INCORRECT_STATE);
}
}
else
{
// We are not the initiator, so we cannot send out the subscribe confirm
WeaveLogDetail(DataManagement, "Client[%u] [%5.5s] %s Ref(%d) Timeout",
SubscriptionEngine::GetInstance()->GetClientId(this), GetStateStr(), __func__, mRefCount);
// abort the subscription as we've timed out
ExitNow(err = WEAVE_ERROR_TIMEOUT);
}
break;
case kState_Resubscribe_Holdoff:
mRetryCounter++;
MoveToState(kState_Initialized);
_InitiateSubscription();
break;
default:
WeaveLogDetail(DataManagement, "Client[%u] [%5.5s] %s Ref(%d) Timer event fired at wrong state, ignore",
SubscriptionEngine::GetInstance()->GetClientId(this), GetStateStr(), __func__, mRefCount);
break;
}
exit:
WeaveLogFunctError(err);
if (NULL != msgBuf)
{
PacketBuffer::Free(msgBuf);
msgBuf = NULL;
}
if (err != WEAVE_NO_ERROR)
{
HandleSubscriptionTerminated(IsRetryEnabled(), err, NULL);
}
if (!skipTimerCheck)
{
_Release();
}
}
WEAVE_ERROR SubscriptionClient::ProcessDataList(nl::Weave::TLV::TLVReader &aReader)
{
WEAVE_ERROR err = WEAVE_NO_ERROR;
// TODO: We currently don't support changes that span multiple notifies, nor changes
// that get aborted and restarted within the same notify. See WEAV-1586 for more details.
bool isPartialChange = false;
uint8_t flags;
while (WEAVE_NO_ERROR == (err = aReader.Next()))
{
nl::Weave::TLV::TLVReader pathReader;
{
DataElement::Parser element;
err = element.Init(aReader);
SuccessOrExit(err);
err = element.GetReaderOnPath(&pathReader);
SuccessOrExit(err);
isPartialChange = false;
err = element.GetPartialChangeFlag(&isPartialChange);
VerifyOrExit(err == WEAVE_NO_ERROR || err == WEAVE_END_OF_TLV, );
}
TraitDataSink *DataSink;
TraitDataHandle handle;
PropertyPathHandle pathHandle;
SchemaVersionRange versionRange;
err = mDataSinkCatalog->AddressToHandle(pathReader, handle, versionRange);
SuccessOrExit(err);
err = mDataSinkCatalog->Locate(handle, &DataSink);
SuccessOrExit(err);
err = DataSink->GetSchemaEngine()->MapPathToHandle(pathReader, pathHandle);
#if TDM_DISABLE_STRICT_SCHEMA_COMPLIANCE
// if we're not in strict compliance mode, we can ignore data elements that refer to paths we can't map due to mismatching schema.
// The eventual call to StoreDataElement will correctly deal with the presence of a null property path handle that
// has been returned by the above call. It's necessary to call into StoreDataElement with this null handle to ensure
// the requisite OnEvent calls are made to the application despite the presence of an unknown tag. It's also necessary to ensure
// that we update the internal version tracked by the sink.
if (err == WEAVE_ERROR_TLV_TAG_NOT_FOUND) {
WeaveLogDetail(DataManagement, "Ignoring un-mappable path!");
err = WEAVE_NO_ERROR;
}
#endif
SuccessOrExit(err);
pathReader = aReader;
flags = 0;
#if WDM_ENABLE_PROTOCOL_CHECKS
// If we previously had a partial change, the current handle should match the previous one.
// If they don't, we have a partial change violation.
if (mPrevIsPartialChange && (mPrevTraitDataHandle != handle)) {
WeaveLogError(DataManagement, "Encountered partial change flag violation (%u, %x, %x)", mPrevIsPartialChange, mPrevTraitDataHandle, handle);
err = WEAVE_ERROR_INVALID_DATA_LIST;
goto exit;
}
#endif
if (!mPrevIsPartialChange) {
flags = TraitDataSink::kFirstElementInChange;
}
if (!isPartialChange) {
flags |= TraitDataSink::kLastElementInChange;
}
err = DataSink->StoreDataElement(pathHandle, pathReader, flags, NULL, NULL);
SuccessOrExit(err);
mPrevIsPartialChange = isPartialChange;
#if WDM_ENABLE_PROTOCOL_CHECKS
mPrevTraitDataHandle = handle;
#endif
}
// if we have exhausted this container
if (WEAVE_END_OF_TLV == err)
{
err = WEAVE_NO_ERROR;
}
exit:
return err;
}
void SubscriptionClient::NotificationRequestHandler (nl::Weave::ExchangeContext *aEC, const nl::Inet::IPPacketInfo *aPktInfo,
const nl::Weave::WeaveMessageInfo *aMsgInfo, PacketBuffer *aPayload)
{
WEAVE_ERROR err = WEAVE_NO_ERROR;
InEventParam inParam;
OutEventParam outParam;
NotificationRequest::Parser notify;
const ClientState StateWhenEntered = mCurrentState;
nl::Weave::TLV::TLVReader reader;
bool isDataListPresent = false;
#if WEAVE_CONFIG_SERIALIZATION_ENABLE_DESERIALIZATION
bool isEventListPresent = false;
#endif
uint8_t statusReportLen = 6;
PacketBuffer* msgBuf = PacketBuffer::NewWithAvailableSize(statusReportLen);
WeaveLogDetail(DataManagement, "Client[%u] [%5.5s] %s Ref(%d)",
SubscriptionEngine::GetInstance()->GetClientId(this), GetStateStr(), __func__, mRefCount);
// Make sure we're not freed by accident
_AddRef();
if (mEC != aEC)
{
// only re-configure if this is an incoming EC
mBinding->AdjustResponseTimeout(aEC);
}
VerifyOrExit(NULL != msgBuf, err = WEAVE_ERROR_NO_MEMORY);
// skip the first 6 bytes in status report, as they are reserved for the profile ID and status code
msgBuf->SetDataLength(statusReportLen);
switch (StateWhenEntered)
{
case kState_Subscribing:
case kState_Subscribing_IdAssigned:
// In these two states, all notifications must come in the same exchange context
VerifyOrExit(aEC == mEC, err = WEAVE_ERROR_INCORRECT_STATE);
// refresh inactivity monitor every time we receive a notification request
err = RefreshTimer();
SuccessOrExit(err);
break;
case kState_SubscriptionEstablished_Idle:
case kState_SubscriptionEstablished_Confirming:
// refresh inactivity monitor every time we receive a notification request
err = RefreshTimer();
SuccessOrExit(err);
#if WDM_ENABLE_SUBSCRIPTION_PUBLISHER
SubscriptionEngine::GetInstance()->UpdateHandlerLiveness(mBinding->GetPeerNodeId(), mSubscriptionId);
#endif // WDM_ENABLE_SUBSCRIPTION_PUBLISHER
break;
// we are going to ignore any notification requests in other states
default:
ExitNow();
}
inParam.mNotificationRequest.mEC = aEC;
inParam.mNotificationRequest.mMessage = aPayload;
inParam.mNotificationRequest.mClient = this;
// NOTE: state could be changed in the callback to app layer
mEventCallback(mAppState, kEvent_OnNotificationRequest, inParam, outParam);
mDataSinkCatalog->DispatchEvent(TraitDataSink::kEventNotifyRequestBegin, NULL);
// jump to Exit if the state has been changed in the callback to app layer
VerifyOrExit(StateWhenEntered == mCurrentState, /* no-op */);
reader.Init(aPayload);
reader.Next();
err = notify.Init(reader);
SuccessOrExit(err);
#if WEAVE_CONFIG_DATA_MANAGEMENT_ENABLE_SCHEMA_CHECK
// simple schema checking
err = notify.CheckSchemaValidity();
SuccessOrExit(err);
#endif // WEAVE_CONFIG_DATA_MANAGEMENT_ENABLE_SCHEMA_CHECK
// TODO: use the new GetReaderOnXYZ pattern to locate the data list, instead creating a data list parser object
{
DataList::Parser dataList;
err = notify.GetDataList(&dataList);
if (WEAVE_NO_ERROR == err)
{
isDataListPresent = true;
}
else if (WEAVE_END_OF_TLV == err)
{
isDataListPresent = false;
err = WEAVE_NO_ERROR;
}
SuccessOrExit(err);
// re-initialize the reader to point to individual date element (reuse to save stack depth).
dataList.GetReader(&reader);
}
if (isDataListPresent)
{
err = ProcessDataList(reader);
SuccessOrExit(err);
}
#if WEAVE_CONFIG_SERIALIZATION_ENABLE_DESERIALIZATION
{
EventList::Parser eventList;
err = notify.GetEventList(&eventList);
if (WEAVE_NO_ERROR == err)
{
isEventListPresent = true;
}
else if (WEAVE_END_OF_TLV == err)
{
isEventListPresent = false;
err = WEAVE_NO_ERROR;
}
SuccessOrExit(err);
// re-initialize the reader (reuse to save stack depth).
eventList.GetReader(&reader);
}
if (isEventListPresent)
{
inParam.mEventStreamReceived.mReader = &reader;
inParam.mEventStreamReceived.mClient = this;
// Invoke our callback.
mEventCallback(mAppState, kEvent_OnEventStreamReceived, inParam, outParam);
}
#endif // WEAVE_CONFIG_SERIALIZATION_ENABLE_DESERIALIZATION
// TODO: As I've commented in Weave PR#614, there is no support for event sink
inParam.mNotificationProcessed.mClient = this;
// NOTE: state could be changed in the callback to app layer
mEventCallback(mAppState, kEvent_OnNotificationProcessed, inParam, outParam);
mDataSinkCatalog->DispatchEvent(TraitDataSink::kEventNotifyRequestEnd, NULL);
// jump to Exit if the state has been changed in the callback to app layer
VerifyOrExit(StateWhenEntered == mCurrentState, /* no-op */);
{
uint8_t * p = msgBuf->Start();
nl::Weave::Encoding::LittleEndian::Write32(p, nl::Weave::Profiles::kWeaveProfile_Common);
nl::Weave::Encoding::LittleEndian::Write16(p, nl::Weave::Profiles::Common::kStatus_Success);
err = aEC->SendMessage(nl::Weave::Profiles::kWeaveProfile_Common,
nl::Weave::Profiles::Common::kMsgType_StatusReport,
msgBuf,
aEC->HasPeerRequestedAck() ? nl::Weave::ExchangeContext::kSendFlag_RequestAck : 0);
msgBuf = NULL;
SuccessOrExit(err);
}
exit:
WeaveLogFunctError(err);
if (NULL != msgBuf)
{
PacketBuffer::Free(msgBuf);
msgBuf = NULL;
}
if (NULL != aPayload) {
PacketBuffer::Free(aPayload);
aPayload = NULL;
}
// If this is not a locally initiated exchange, always close the exchange
if (aEC != mEC)
{
aEC->Close();
aEC = NULL;
}
if (WEAVE_NO_ERROR != err)
{
// If we're not aborted yet, make a callback to app layer
HandleSubscriptionTerminated(IsRetryEnabled(), err, NULL);
}
_Release();
}
#if WDM_ENABLE_SUBSCRIPTION_CANCEL
void SubscriptionClient::CancelRequestHandler (nl::Weave::ExchangeContext *aEC, const nl::Inet::IPPacketInfo *aPktInfo,
const nl::Weave::WeaveMessageInfo *aMsgInfo, PacketBuffer *aPayload)
{
WEAVE_ERROR err = WEAVE_NO_ERROR;
uint8_t statusReportLen = 6;
PacketBuffer *msgBuf = PacketBuffer::NewWithAvailableSize(statusReportLen);
uint8_t * p;
bool canceled = true;
uint32_t statusProfile = nl::Weave::Profiles::kWeaveProfile_Common;
uint16_t statusCode = nl::Weave::Profiles::Common::kStatus_Success;
WeaveLogDetail(DataManagement, "Client[%u] [%5.5s] %s Ref(%d)",
SubscriptionEngine::GetInstance()->GetClientId(this), GetStateStr(), __func__, mRefCount);
// Make sure we're not freed by accident
_AddRef();
mBinding->AdjustResponseTimeout(aEC);
VerifyOrExit(NULL != msgBuf, err = WEAVE_ERROR_NO_MEMORY);
// Verify the cancel request is truly from the publisher. If not, reject the request with
// "invalid subscription id" to avoid revealing the existence of the subscription.
if (!mBinding->IsAuthenticMessageFromPeer(aMsgInfo))
{
WeaveLogDetail(DataManagement, "Rejecting SubscribeCancelRequest from unauthorized source");
canceled = false;
statusProfile = nl::Weave::Profiles::kWeaveProfile_WDM;
statusCode = kStatus_InvalidSubscriptionID;
}
p = msgBuf->Start();
nl::Weave::Encoding::LittleEndian::Write32(p, statusProfile);
nl::Weave::Encoding::LittleEndian::Write16(p, statusCode);
msgBuf->SetDataLength(statusReportLen);
err = aEC->SendMessage(nl::Weave::Profiles::kWeaveProfile_Common,
nl::Weave::Profiles::Common::kMsgType_StatusReport,
msgBuf,
aEC->HasPeerRequestedAck() ? nl::Weave::ExchangeContext::kSendFlag_RequestAck : 0);
msgBuf = NULL;
SuccessOrExit(err);
exit:
WeaveLogFunctError(err);
if (NULL != msgBuf)
{
PacketBuffer::Free(msgBuf);
msgBuf = NULL;
}
// In either case, the subscription is already canceled, move to ABORTED
if ((WEAVE_NO_ERROR != err) || canceled)
{
HandleSubscriptionTerminated(false, err, NULL);
}
_Release();
}
#endif // WDM_ENABLE_SUBSCRIPTION_CANCEL
void SubscriptionClient::OnSendError (ExchangeContext *aEC, WEAVE_ERROR aErrorCode, void *aMsgSpecificContext)
{
WEAVE_ERROR err = WEAVE_NO_ERROR;
IgnoreUnusedVariable(aMsgSpecificContext);
SubscriptionClient * const pClient = reinterpret_cast<SubscriptionClient *>(aEC->AppState);
bool subscribeRequestFailed = false;
WeaveLogDetail(DataManagement, "Client[%u] [%5.5s] %s Ref(%d)",
SubscriptionEngine::GetInstance()->GetClientId(pClient), pClient->GetStateStr(), __func__, pClient->mRefCount);
// Make sure we're not freed by accident
pClient->_AddRef();
switch (pClient->mCurrentState)
{
case kState_Subscribing:
case kState_Subscribing_IdAssigned:
// Subscribe request failed, deliver mSubscriptionRequestFailedEventParam
subscribeRequestFailed = true;
ExitNow(err = aErrorCode);
break;
case kState_SubscriptionEstablished_Confirming:
// Subscribe Confirm request failed, so no point trying to send a cancel.
// Go ahead and terminate it.
ExitNow(err = aErrorCode);
break;
case kState_Resubscribe_Holdoff:
// OnResponseTimeout posts an error to OnSendError (this function).
// That can happen after we've already received a cb for OnSendError.
// So if we've already set a timeout, then we can ignore this error.
if (aErrorCode == WEAVE_ERROR_TIMEOUT)
{
err = WEAVE_NO_ERROR;
}
break;
case kState_Canceling:
ExitNow(err = aErrorCode);
break;
// In any of these states, we must not see this callback
default:
ExitNow(err = WEAVE_ERROR_INCORRECT_STATE);
}
exit:
WeaveLogFunctError(err);
if ((subscribeRequestFailed) || (WEAVE_NO_ERROR != err))
{
pClient->HandleSubscriptionTerminated(pClient->IsRetryEnabled(), err, NULL);
}
pClient->_Release();
}
void SubscriptionClient::OnResponseTimeout (nl::Weave::ExchangeContext *aEC)
{
SubscriptionClient * const pClient = reinterpret_cast<SubscriptionClient *>(aEC->AppState);
IgnoreUnusedVariable(pClient);
WeaveLogDetail(DataManagement, "Client[%u] [%5.5s] %s Ref(%d)",
SubscriptionEngine::GetInstance()->GetClientId(pClient), pClient->GetStateStr(), __func__, pClient->mRefCount);
OnSendError(aEC, WEAVE_ERROR_TIMEOUT, NULL);
}
void SubscriptionClient::OnMessageReceivedFromLocallyInitiatedExchange (nl::Weave::ExchangeContext *aEC, const nl::Inet::IPPacketInfo *aPktInfo,
const nl::Weave::WeaveMessageInfo *aMsgInfo, uint32_t aProfileId,
uint8_t aMsgType, PacketBuffer *aPayload)
{
// Notification Requests during initial setup
// Subscribe response
// Status Report for Subscribe request
// Status Report for Subscribe Cancel request
// Status Report for Subscribe Confirm request
WEAVE_ERROR err = WEAVE_NO_ERROR;
SubscriptionClient * pClient = reinterpret_cast<SubscriptionClient *>(aEC->AppState);
InEventParam inParam;
OutEventParam outParam;
bool retainExchangeContext = false;
bool isStatusReportValid = false;
nl::Weave::Profiles::StatusReporting::StatusReport status;
WeaveLogDetail(DataManagement, "Client[%u] [%5.5s] %s Ref(%d)",
SubscriptionEngine::GetInstance()->GetClientId(pClient), pClient->GetStateStr(), __func__, pClient->mRefCount);
// Make sure we're not freed by accident
pClient->_AddRef();
WeaveLogIfFalse(aEC == pClient->mEC);
if ((nl::Weave::Profiles::kWeaveProfile_Common == aProfileId) && (nl::Weave::Profiles::Common::kMsgType_StatusReport == aMsgType))
{
// Note that payload is not freed in this call to parse
err = nl::Weave::Profiles::StatusReporting::StatusReport::parse(aPayload, status);
SuccessOrExit(err);
isStatusReportValid = true;
WeaveLogDetail(DataManagement, "Received Status Report 0x%" PRIX32 " : 0x%" PRIX16, status.mProfileId, status.mStatusCode);
}
switch (pClient->mCurrentState)
{
case kState_Subscribing:
case kState_Subscribing_IdAssigned:
if (isStatusReportValid)
{
ExitNow(err = WEAVE_ERROR_STATUS_REPORT_RECEIVED);
}
else if ((nl::Weave::Profiles::kWeaveProfile_WDM == aProfileId) && (kMsgType_NotificationRequest == aMsgType))
{
// notification request, don't close the exchange context, for more notification requests might arrive
// through this same exchange context
retainExchangeContext = true;
pClient->NotificationRequestHandler(aEC, aPktInfo, aMsgInfo, aPayload);
aPayload = NULL;
}
else if ((nl::Weave::Profiles::kWeaveProfile_WDM == aProfileId) && (kMsgType_SubscribeResponse == aMsgType))
{
// capture subscription ID and liveness timeout
nl::Weave::TLV::TLVReader reader;
reader.Init(aPayload);
err = reader.Next();
SuccessOrExit(err);
SubscribeResponse::Parser response;
err = response.Init(reader);
SuccessOrExit(err);
#if WEAVE_CONFIG_DATA_MANAGEMENT_ENABLE_SCHEMA_CHECK
// simple schema checking
err = response.CheckSchemaValidity();
SuccessOrExit(err);
#endif // WEAVE_CONFIG_DATA_MANAGEMENT_ENABLE_SCHEMA_CHECK
{
uint64_t subscriptionId;
err = response.GetSubscriptionID(&subscriptionId);
SuccessOrExit(err);
if (kState_Subscribing == pClient->mCurrentState)
{
// capture subscription ID
pClient->mSubscriptionId = subscriptionId;
}
else
{
// verify they are the same
VerifyOrExit(pClient->mSubscriptionId == subscriptionId, err = WEAVE_ERROR_INVALID_TLV_ELEMENT);
}
}
if (kState_Subscribing == pClient->mCurrentState)
{
uint32_t livenessTimeoutSec;
err = response.GetSubscribeTimeout(&livenessTimeoutSec);
if (WEAVE_NO_ERROR == err)
{
VerifyOrExit(livenessTimeoutSec <= kMaxTimeoutSec, err = WEAVE_ERROR_INVALID_TLV_ELEMENT);
// capture liveness timeout
pClient->mLivenessTimeoutMsec = livenessTimeoutSec * 1000;
}
else if (WEAVE_END_OF_TLV == err)
{
err = WEAVE_NO_ERROR;
}
else
{
ExitNow();
}
}
// Subscribe response, move to alive-idle state (and close the exchange context)
pClient->MoveToState(kState_SubscriptionEstablished_Idle);
err = pClient->RefreshTimer();
SuccessOrExit(err);
#if WDM_ENABLE_SUBSCRIPTION_PUBLISHER
SubscriptionEngine::GetInstance()->UpdateHandlerLiveness(pClient->mBinding->GetPeerNodeId(), pClient->mSubscriptionId);
#endif // WDM_ENABLE_SUBSCRIPTION_PUBLISHER
pClient->mRetryCounter = 0;
inParam.mSubscriptionEstablished.mSubscriptionId = pClient->mSubscriptionId;
inParam.mSubscriptionEstablished.mClient = pClient;
// it's allowed to cancel or even abandon this subscription right inside this callback
pClient->mEventCallback(pClient->mAppState, kEvent_OnSubscriptionEstablished, inParam, outParam);
// since the state could have been changed, we must not assume anything
ExitNow();
}
else
{
// protocol error
ExitNow(err = WEAVE_ERROR_INVALID_MESSAGE_TYPE);
}
break;
case kState_SubscriptionEstablished_Confirming:
if (isStatusReportValid && status.success())
{
// Status Report (success) for Subscribe Confirm request
// confirmed, move back to idle state
pClient->FlushExistingExchangeContext();
pClient->MoveToState(kState_SubscriptionEstablished_Idle);
WeaveLogDetail(DataManagement, "Client[%u] [%5.5s] liveness confirmed",
SubscriptionEngine::GetInstance()->GetClientId(pClient), pClient->GetStateStr());
err = pClient->RefreshTimer();
SuccessOrExit(err);
#if WDM_ENABLE_SUBSCRIPTION_PUBLISHER
SubscriptionEngine::GetInstance()->UpdateHandlerLiveness(pClient->mBinding->GetPeerNodeId(), pClient->mSubscriptionId);
#endif // WDM_ENABLE_SUBSCRIPTION_PUBLISHER
}
else
{
// anything else is a failure, tear down the subscription
ExitNow(err = WEAVE_ERROR_INVALID_MESSAGE_TYPE);
}
break;
#if WDM_ENABLE_SUBSCRIPTION_CANCEL
case kState_Canceling:
// It doesn't really matter what we receive from the other end, as we're heading out
// call abort silently without callback to upper layer, for this subscription was canceled by the upper layer
pClient->AbortSubscription();
ExitNow();
break;
#endif // WDM_ENABLE_SUBSCRIPTION_CANCEL
// We must not receive this callback in any other states
default:
WeaveLogDetail(DataManagement, "Received message in some wrong state, ignore");
ExitNow();
}
exit:
WeaveLogFunctError(err);
if (NULL != aPayload)
{
PacketBuffer::Free(aPayload);
aPayload = NULL;
}
if (!retainExchangeContext)
{
pClient->FlushExistingExchangeContext();
}
if (err != WEAVE_NO_ERROR)
{
// if we're already aborted, this call becomes a no-op
pClient->HandleSubscriptionTerminated(pClient->IsRetryEnabled(), err, isStatusReportValid ? &status : NULL);
}
pClient->_Release();
}
}; // WeaveMakeManagedNamespaceIdentifier(DataManagement, kWeaveManagedNamespaceDesignation_Current)
}; // Profiles
}; // Weave
}; // nl
#endif // WEAVE_CONFIG_ENABLE_RELIABLE_MESSAGING