// blob: 343ca8d8b7ded22fde068fc86f813b3bb2634c7c [file] [log] [blame]
/*
*
* Copyright (c) 2016-2017 Nest Labs, Inc.
* All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* @file
* This file implements the subscription engine for the Weave
* Data Management (WDM) profile.
*
*/
#ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS
#endif // __STDC_FORMAT_MACROS
#include <Weave/Profiles/WeaveProfiles.h>
#include <Weave/Profiles/common/CommonProfile.h>
#include <Weave/Profiles/data-management/Current/WdmManagedNamespace.h>
#include <Weave/Profiles/data-management/DataManagement.h>
#include <Weave/Profiles/time/WeaveTime.h>
#include <Weave/Support/crypto/WeaveCrypto.h>
#include <Weave/Support/WeaveFaultInjection.h>
#include <SystemLayer/SystemStats.h>
#if WEAVE_CONFIG_ENABLE_RELIABLE_MESSAGING
namespace nl {
namespace Weave {
namespace Profiles {
namespace WeaveMakeManagedNamespaceIdentifier(DataManagement, kWeaveManagedNamespaceDesignation_Current) {
/**
 * Default constructor.
 *
 * The body is intentionally empty: no member is assigned here. The members
 * visible in this file (exchange manager, application state, event callback,
 * lock, client/handler pools) are assigned in Init().
 */
SubscriptionEngine::SubscriptionEngine()
{
}
/**
 * Replace the engine-level event callback and the opaque state stored with it.
 *
 * @param[in] aAppState       Opaque pointer stored in the engine; it is handed back
 *                            to the callback when the engine raises an event.
 * @param[in] aEventCallback  Function stored as the engine's event callback.
 */
void SubscriptionEngine::SetEventCallback(void * const aAppState, const EventCallback aEventCallback)
{
    // The two stores are independent of each other.
    mEventCallback = aEventCallback;
    mAppState      = aAppState;
}
/**
 * Fallback event handler: logs the event identifier and does nothing else.
 *
 * @param[in]  aEvent     Identifier of the event being reported; only logged.
 * @param[in]  aInParam   Not inspected.
 * @param[out] aOutParam  Not modified.
 */
void SubscriptionEngine::DefaultEventHandler(EventID aEvent, const InEventParam & aInParam, OutEventParam & aOutParam)
{
    WeaveLogDetail(DataManagement, "%s event: %d", __func__, aEvent);

    // Neither parameter block is read or written by the default handler.
    IgnoreUnusedVariable(aOutParam);
    IgnoreUnusedVariable(aInParam);
}
/**
 * Initialize the subscription engine and register it for unsolicited WDM messages.
 *
 * Stores the exchange manager, application state and event callback, clears the
 * publisher lock, registers UnsolicitedMessageHandler for the WDM profile and
 * resets the client, command, handler and trait-info pools that are compiled in.
 *
 * @param[in] apExchangeMgr   Exchange manager used to register the unsolicited
 *                            message handler; it is dereferenced unconditionally.
 * @param[in] aAppState       Opaque pointer stored with the event callback.
 * @param[in] aEventCallback  Engine-level event callback.
 *
 * @return WEAVE_NO_ERROR on success; otherwise the error returned by
 *         RegisterUnsolicitedMessageHandler or by the notification engine's Init().
 */
WEAVE_ERROR SubscriptionEngine::Init (nl::Weave::WeaveExchangeManager * const apExchangeMgr, void * const aAppState, const EventCallback aEventCallback)
{
    WEAVE_ERROR err = WEAVE_NO_ERROR;

    mExchangeMgr   = apExchangeMgr;
    mAppState      = aAppState;
    mEventCallback = aEventCallback;
    mLock          = NULL;

    err = mExchangeMgr->RegisterUnsolicitedMessageHandler(nl::Weave::Profiles::kWeaveProfile_WDM, UnsolicitedMessageHandler, this);
    SuccessOrExit(err);

#if WDM_ENABLE_SUBSCRIPTION_CLIENT
    for (size_t i = 0; i < kMaxNumSubscriptionClients; ++i)
    {
        mClients[i].InitAsFree();
    }
#endif // WDM_ENABLE_SUBSCRIPTION_CLIENT

#if WDM_ENABLE_SUBSCRIPTION_PUBLISHER
    // Command objects are only ever used on the publisher side (GetCommandObjId,
    // OnCustomCommandRequest), so they are reset here rather than under the
    // client feature flag. This is done before anything that can fail so the
    // pool is always in a known state.
    for (size_t i = 0; i < kMaxNumCommandObjs; ++i)
    {
        mCommandObjs[i].Init(NULL);
    }

    err = mNotificationEngine.Init();
    SuccessOrExit(err);

    for (size_t i = 0; i < kMaxNumSubscriptionHandlers; ++i)
    {
        mHandlers[i].InitAsFree();
    }

    // erase everything
    DisablePublisher();
#endif // WDM_ENABLE_SUBSCRIPTION_PUBLISHER

    mNumTraitInfosInPool = 0;

exit:
    WeaveLogFunctError(err);

    return err;
}
#if WEAVE_DETAIL_LOGGING
/**
 * Log how many subscription clients and handlers are currently not in the
 * free state. A role that is compiled out is reported as zero.
 */
void SubscriptionEngine::LogSubscriptionFreed(void) const
{
    uint32_t numClientsInUse  = 0;
    uint32_t numHandlersInUse = 0;

#if WDM_ENABLE_SUBSCRIPTION_CLIENT
    for (size_t idx = 0; idx < kMaxNumSubscriptionClients; ++idx)
    {
        numClientsInUse += (SubscriptionClient::kState_Free != mClients[idx].mCurrentState) ? 1 : 0;
    }
#endif // #if WDM_ENABLE_SUBSCRIPTION_CLIENT

#if WDM_ENABLE_SUBSCRIPTION_PUBLISHER
    for (size_t idx = 0; idx < kMaxNumSubscriptionHandlers; ++idx)
    {
        numHandlersInUse += (SubscriptionHandler::kState_Free != mHandlers[idx].mCurrentState) ? 1 : 0;
    }
#endif // WDM_ENABLE_SUBSCRIPTION_PUBLISHER

    WeaveLogDetail(DataManagement, "Allocated clients: %" PRIu32 ". Allocated handlers: %" PRIu32 ".",
                   numClientsInUse, numHandlersInUse);
}
#endif // #if WEAVE_DETAIL_LOGGING
#if WDM_ENABLE_SUBSCRIPTION_CANCEL
/**
 * Handler for incoming SubscribeCancelRequest messages.
 * This function is a @ref ExchangeContext::MessageReceiveFunct.
 *
 * The subscription ID is parsed from the payload and matched against the
 * established clients and the handlers holding valid subscription info. Every
 * match has its CancelRequestHandler invoked. If nothing matches, a status
 * report carrying kStatus_InvalidSubscriptionID is sent. If parsing fails, no
 * response is sent.
 *
 * A client only matches when the message is also authenticated as coming from
 * the peer of that client's binding, as is done for notification requests.
 * Without this, any node that knows a subscription ID could cancel it.
 *
 * The payload is always freed and the exchange context always closed here.
 *
 * @param[in] aEC         Exchange context the request arrived on; its AppState is the engine.
 * @param[in] aPktInfo    Packet information, forwarded to the matching object.
 * @param[in] aMsgInfo    Message information, used for authentication and forwarded.
 * @param[in] aProfileId  Unused.
 * @param[in] aMsgType    Unused.
 * @param[in] aPayload    Request payload; freed before returning.
 */
void SubscriptionEngine::OnCancelRequest (nl::Weave::ExchangeContext *aEC, const nl::Inet::IPPacketInfo *aPktInfo,
                                          const nl::Weave::WeaveMessageInfo *aMsgInfo, uint32_t aProfileId,
                                          uint8_t aMsgType, PacketBuffer *aPayload)
{
    WEAVE_ERROR err = WEAVE_NO_ERROR;
    SubscriptionEngine* const pEngine = reinterpret_cast<SubscriptionEngine *>(aEC->AppState);
    uint64_t SubscriptionId = 0;
    bool found = false;

    {
        nl::Weave::TLV::TLVReader reader;
        SubscribeCancelRequest::Parser request;

        reader.Init(aPayload);

        err = reader.Next();
        SuccessOrExit(err);

        err = request.Init(reader);
        SuccessOrExit(err);

#if WEAVE_CONFIG_DATA_MANAGEMENT_ENABLE_SCHEMA_CHECK
        err = request.CheckSchemaValidity();
        SuccessOrExit(err);
#endif // WEAVE_CONFIG_DATA_MANAGEMENT_ENABLE_SCHEMA_CHECK

        err = request.GetSubscriptionID(&SubscriptionId);
        SuccessOrExit(err);
    }

#if WDM_ENABLE_SUBSCRIPTION_CLIENT
    for (size_t i = 0; i < kMaxNumSubscriptionClients; ++i)
    {
        if ((SubscriptionClient::kState_SubscriptionEstablished_Idle == pEngine->mClients[i].mCurrentState) ||
            (SubscriptionClient::kState_SubscriptionEstablished_Confirming == pEngine->mClients[i].mCurrentState))
        {
            // Require the sender to be the authenticated peer of this subscription,
            // not merely someone who knows (or guessed) the subscription ID.
            if (pEngine->mClients[i].mBinding->IsAuthenticMessageFromPeer(aMsgInfo) &&
                pEngine->mClients[i].mSubscriptionId == SubscriptionId)
            {
                pEngine->mClients[i].CancelRequestHandler(aEC, aPktInfo, aMsgInfo, aPayload);
                found = true;
                break;
            }
        }
    }
#endif // #if WDM_ENABLE_SUBSCRIPTION_CLIENT

#if WDM_ENABLE_SUBSCRIPTION_PUBLISHER
    // Note: this loop runs even when a client already matched, so both halves
    // of a mutual subscription sharing the same ID get cancelled.
    for (size_t i = 0; i < kMaxNumSubscriptionHandlers; ++i)
    {
        if ((pEngine->mHandlers[i].mCurrentState >= SubscriptionHandler::kState_SubscriptionInfoValid_Begin) &&
            (pEngine->mHandlers[i].mCurrentState <= SubscriptionHandler::kState_SubscriptionInfoValid_End))
        {
            // Note that there is no need to compare more than subscription ID, because it must already be unique on publisher side
            if (pEngine->mHandlers[i].mSubscriptionId == SubscriptionId)
            {
                pEngine->mHandlers[i].CancelRequestHandler(aEC, aPktInfo, aMsgInfo, aPayload);
                found = true;
                break;
            }
        }
    }
#endif // WDM_ENABLE_SUBSCRIPTION_PUBLISHER

    if (!found)
    {
        err = SendStatusReport(aEC, nl::Weave::Profiles::kWeaveProfile_WDM, kStatus_InvalidSubscriptionID);
        SuccessOrExit(err);
    }

exit:
    WeaveLogFunctError(err);

    // aPayload guaranteed to be non-NULL
    PacketBuffer::Free(aPayload);

    // aEC guaranteed to be non-NULL
    aEC->Close();
}
#endif // WDM_ENABLE_SUBSCRIPTION_CANCEL
#if WDM_ENABLE_SUBSCRIPTION_CLIENT
/**
 * Convert a pointer into the engine's client pool into its array index.
 *
 * @param[in] apClient  Pointer to an element of mClients.
 *
 * @return The element offset of apClient from the start of mClients, truncated to 16 bits.
 */
uint16_t SubscriptionEngine::GetClientId (const SubscriptionClient * const apClient) const
{
    const ptrdiff_t offsetInPool = apClient - mClients;

    return static_cast<uint16_t>(offsetInPool);
}
/**
 * Allocate and initialize a subscription client from the engine's pool.
 *
 * The first client found in the free state is initialized with the supplied
 * arguments and returned through appClient.
 *
 * @param[out] appClient       Receives the allocated client. It is set to NULL on
 *                             every failure path, including an injected fault.
 * @param[in]  apBinding       Binding passed to the client's Init().
 * @param[in]  apAppState      Application state passed to the client's Init().
 * @param[in]  aEventCallback  Event callback passed to the client's Init().
 * @param[in]  apCatalog       Data sink catalog passed to the client's Init().
 * @param[in]  aInactivityTimeoutDuringSubscribingMsec
 *                             Timeout value passed to the client's Init().
 *
 * @return WEAVE_NO_ERROR on success, WEAVE_ERROR_NO_MEMORY when no client is
 *         free (or the allocation fault is injected), or the error returned
 *         by the client's Init().
 */
WEAVE_ERROR SubscriptionEngine::NewClient (SubscriptionClient ** const appClient,
                                           Binding * const apBinding,
                                           void * const apAppState,
                                           SubscriptionClient::EventCallback const aEventCallback,
                                           const TraitCatalogBase<TraitDataSink>* const apCatalog,
                                           const uint32_t aInactivityTimeoutDuringSubscribingMsec)
{
    WEAVE_ERROR err = WEAVE_ERROR_NO_MEMORY;

    // Clear the out-parameter before anything can bail out, so callers never
    // observe an indeterminate pointer when the fault below is injected.
    *appClient = NULL;

    WEAVE_FAULT_INJECT(FaultInjection::kFault_WDM_SubscriptionClientNew, ExitNow());

    for (size_t i = 0; i < kMaxNumSubscriptionClients; ++i)
    {
        if (SubscriptionClient::kState_Free == mClients[i].mCurrentState)
        {
            *appClient = &mClients[i];

            err = (*appClient)->Init(apBinding,
                                     apAppState,
                                     aEventCallback,
                                     apCatalog,
                                     aInactivityTimeoutDuringSubscribingMsec);
            if (WEAVE_NO_ERROR != err)
            {
                *appClient = NULL;
                ExitNow();
            }

            SYSTEM_STATS_INCREMENT(nl::Weave::System::Stats::kWDMNext_NumSubscriptionClients);
            break;
        }
    }

exit:
    return err;
}
/**
 * Reply to a request with a StatusReport message.
 *
 * An acknowledgment is requested for the report when the peer requested one
 * for the message received on this exchange.
 *
 * @param[in] aEC          Pointer to the ExchangeContext on which the request was received.
 *                         This function does not take ownership of this object. The
 *                         ExchangeContext must be closed or aborted by the calling
 *                         function according to the WEAVE_ERROR returned.
 * @param[in] aProfileId   The profile to be put in the StatusReport payload.
 * @param[in] aStatusCode  The status code to be put in the StatusReport payload; must refer
 *                         to the profile passed in aProfileId, but this function does not
 *                         enforce this condition.
 *
 * @return WEAVE_NO_ERROR in case of success.
 *         WEAVE_ERROR_NO_MEMORY if no pbufs are available.
 *         Any other WEAVE_ERROR code returned by ExchangeContext::SendMessage
 */
WEAVE_ERROR SubscriptionEngine::SendStatusReport (nl::Weave::ExchangeContext *aEC,
                                                  uint32_t aProfileId,
                                                  uint16_t aStatusCode)
{
    const WEAVE_ERROR result = nl::Weave::WeaveServerBase::SendStatusReport(
        aEC, aProfileId, aStatusCode, WEAVE_NO_ERROR,
        aEC->HasPeerRequestedAck() ? nl::Weave::ExchangeContext::kSendFlag_RequestAck : 0);

    WeaveLogFunctError(result);

    return result;
}
/**
 * Unsolicited message handler for all WDM messages.
 * This function is a @ref ExchangeContext::MessageReceiveFunct.
 *
 * Selects the handler for the message type from those compiled into this
 * build and forwards every argument to it unchanged. Message types without a
 * compiled-in handler are routed to OnUnknownMsgType.
 *
 * @param[in] aEC         Exchange context the message arrived on.
 * @param[in] aPktInfo    Packet information for the message.
 * @param[in] aMsgInfo    Weave message information.
 * @param[in] aProfileId  Profile ID of the message.
 * @param[in] aMsgType    Message type used to select the handler.
 * @param[in] aPayload    Message payload; ownership is passed to the selected handler.
 */
void SubscriptionEngine::UnsolicitedMessageHandler (nl::Weave::ExchangeContext *aEC, const nl::Inet::IPPacketInfo *aPktInfo,
                                                    const nl::Weave::WeaveMessageInfo *aMsgInfo, uint32_t aProfileId,
                                                    uint8_t aMsgType, PacketBuffer *aPayload)
{
    nl::Weave::ExchangeContext::MessageReceiveFunct func = OnUnknownMsgType;

    switch (aMsgType)
    {
#if WDM_ENABLE_SUBSCRIPTION_CLIENT
    case kMsgType_NotificationRequest:
        func = OnNotificationRequest;
        break;
#endif // WDM_ENABLE_SUBSCRIPTION_CLIENT

#if WDM_ENABLE_SUBSCRIPTION_PUBLISHER
    case kMsgType_SubscribeRequest:
        func = OnSubscribeRequest;
        break;

    case kMsgType_SubscribeConfirmRequest:
        func = OnSubscribeConfirmRequest;
        break;

#if WDM_PUBLISHER_ENABLE_CUSTOM_COMMANDS
    // OnCustomCommandRequest is only defined when custom commands are enabled,
    // so it may only be referenced under the same guard. Otherwise the request
    // falls through to OnUnknownMsgType.
    case kMsgType_CustomCommandRequest:
        func = OnCustomCommandRequest;
        break;
#endif // WDM_PUBLISHER_ENABLE_CUSTOM_COMMANDS
#endif // WDM_ENABLE_SUBSCRIPTION_PUBLISHER

#if WDM_ENABLE_SUBSCRIPTION_CANCEL
    case kMsgType_SubscribeCancelRequest:
        func = OnCancelRequest;
        break;
#endif // WDM_ENABLE_SUBSCRIPTION_CANCEL

    default:
        break;
    }

    func(aEC, aPktInfo, aMsgInfo, aProfileId, aMsgType, aPayload);
}
/**
 * Unsolicited message handler for unsupported WDM messages.
 * This function is a @ref ExchangeContext::MessageReceiveFunct.
 *
 * The payload is freed and a Common-profile "unsupported message" status
 * report is sent. The exchange is closed when the report was sent
 * successfully and aborted otherwise.
 *
 * @param[in] aEC         Exchange context the message arrived on; released here.
 * @param[in] aPktInfo    Unused.
 * @param[in] aMsgInfo    Unused.
 * @param[in] aProfileId  Unused.
 * @param[in] aMsgType    Message type; only logged.
 * @param[in] aPayload    Message payload; freed immediately.
 */
void SubscriptionEngine::OnUnknownMsgType (nl::Weave::ExchangeContext *aEC, const nl::Inet::IPPacketInfo *aPktInfo,
                                           const nl::Weave::WeaveMessageInfo *aMsgInfo, uint32_t aProfileId,
                                           uint8_t aMsgType, PacketBuffer *aPayload)
{
    // The content of an unsupported message is never examined.
    PacketBuffer::Free(aPayload);

    WeaveLogDetail(DataManagement, "Msg type %" PRIu8 " not supported", aMsgType);

    const WEAVE_ERROR sendResult =
        SendStatusReport(aEC, nl::Weave::Profiles::kWeaveProfile_Common, nl::Weave::Profiles::Common::kStatus_UnsupportedMessage);

    if (WEAVE_NO_ERROR == sendResult)
    {
        aEC->Close();
    }

    WeaveLogFunctError(sendResult);

    if (WEAVE_NO_ERROR != sendResult)
    {
        aEC->Abort();
    }
}
/**
 * Handler for incoming NotificationRequest messages (client side).
 * This function is a @ref ExchangeContext::MessageReceiveFunct.
 *
 * The subscription ID is extracted from the payload and the request is handed
 * to the first established client whose binding authenticates the sender and
 * whose subscription ID matches. That client takes over the exchange context
 * and the payload. If no client matches, an "invalid subscription ID" status
 * report is sent. In every case where ownership was not handed over, the
 * payload is freed and the exchange aborted.
 *
 * @param[in] aEC         Exchange context the request arrived on; its AppState is the engine.
 * @param[in] aPktInfo    Packet information, forwarded to the matching client.
 * @param[in] aMsgInfo    Message information, used for authentication and forwarded.
 * @param[in] aProfileId  Unused.
 * @param[in] aMsgType    Unused.
 * @param[in] aPayload    Request payload.
 */
void SubscriptionEngine::OnNotificationRequest (nl::Weave::ExchangeContext *aEC, const nl::Inet::IPPacketInfo *aPktInfo,
                                                const nl::Weave::WeaveMessageInfo *aMsgInfo, uint32_t aProfileId,
                                                uint8_t aMsgType, PacketBuffer *aPayload)
{
    WEAVE_ERROR err = WEAVE_NO_ERROR;
    SubscriptionEngine* const pEngine = reinterpret_cast<SubscriptionEngine *>(aEC->AppState);
    uint64_t SubscriptionId = 0;

    {
        nl::Weave::TLV::TLVReader tlvReader;
        NotificationRequest::Parser notification;

        tlvReader.Init(aPayload);

        err = tlvReader.Next();
        SuccessOrExit(err);

        err = notification.Init(tlvReader);
        SuccessOrExit(err);

        // Bailing out without any response is acceptable when the message
        // does not even carry a subscription ID.
        err = notification.GetSubscriptionID(&SubscriptionId);
        SuccessOrExit(err);

        WEAVE_FAULT_INJECT(FaultInjection::kFault_WDM_BadSubscriptionId, SubscriptionId += 1);
    }

    for (size_t idx = 0; idx < kMaxNumSubscriptionClients; ++idx)
    {
        SubscriptionClient & client = pEngine->mClients[idx];

        const bool isEstablished =
            (SubscriptionClient::kState_SubscriptionEstablished_Idle == client.mCurrentState) ||
            (SubscriptionClient::kState_SubscriptionEstablished_Confirming == client.mCurrentState);
        if (!isEstablished)
        {
            continue;
        }

        if (!(client.mBinding->IsAuthenticMessageFromPeer(aMsgInfo) && client.mSubscriptionId == SubscriptionId))
        {
            continue;
        }

        // The client takes over both the exchange context and the payload.
        client.NotificationRequestHandler(aEC, aPktInfo, aMsgInfo, aPayload);
        aPayload = NULL;
        aEC      = NULL;
        ExitNow();
    }

    WeaveLogDetail(DataManagement, "%s: couldn't find matching client. Subscription ID: 0x%" PRIX64, __func__, SubscriptionId);

    err = SendStatusReport(aEC, nl::Weave::Profiles::kWeaveProfile_WDM, kStatus_InvalidSubscriptionID);
    SuccessOrExit(err);

exit:
    WeaveLogFunctError(err);

    if (NULL != aPayload)
    {
        PacketBuffer::Free(aPayload);
        aPayload = NULL;
    }

    if (NULL != aEC)
    {
        aEC->Abort();
        aEC = NULL;
    }
}
/**
 * Look up a subscription client by peer node ID and subscription ID.
 *
 * Only clients whose state lies between kState_Subscribing_IdAssigned and
 * kState_SubscriptionEstablished_Confirming (inclusive) are considered.
 *
 * @param[in] aPeerNodeId      Node ID compared with the peer of the client's binding.
 * @param[in] aSubscriptionId  Subscription ID compared with the client's.
 *
 * @return Pointer to the first matching client, or NULL when there is none.
 */
SubscriptionClient * SubscriptionEngine::FindClient(const uint64_t aPeerNodeId, const uint64_t aSubscriptionId)
{
    for (size_t idx = 0; idx < kMaxNumSubscriptionClients; ++idx)
    {
        SubscriptionClient & candidate = mClients[idx];

        const bool hasSubscriptionId =
            (candidate.mCurrentState >= SubscriptionClient::kState_Subscribing_IdAssigned) &&
            (candidate.mCurrentState <= SubscriptionClient::kState_SubscriptionEstablished_Confirming);

        if (hasSubscriptionId &&
            (aPeerNodeId == candidate.mBinding->GetPeerNodeId()) &&
            (candidate.mSubscriptionId == aSubscriptionId))
        {
            return &candidate;
        }
    }

    return NULL;
}
bool SubscriptionEngine::UpdateClientLiveness(const uint64_t aPeerNodeId, const uint64_t aSubscriptionId, const bool aKill)
{
WEAVE_ERROR err = WEAVE_NO_ERROR;
bool found = false;
SubscriptionClient * pClient = FindClient(aPeerNodeId, aSubscriptionId);
if (NULL != pClient)
{
found = true;
if (aKill)
{
err = WEAVE_ERROR_TRANSACTION_CANCELED;
}
else
{
WeaveLogDetail(DataManagement, "Client[%d] [%5.5s] liveness confirmed",
GetClientId(pClient), pClient->GetStateStr());
// ignore incorrect state error, otherwise, let it flow through
err = pClient->RefreshTimer();
if (WEAVE_ERROR_INCORRECT_STATE == err)
{
err = WEAVE_NO_ERROR;
WeaveLogDetail(DataManagement, "Client[%d] [%5.5s] liveness confirmation failed, ignore",
GetClientId(pClient), pClient->GetStateStr());
}
}
if (WEAVE_NO_ERROR != err)
{
WeaveLogDetail(DataManagement, "Client[%d] [%5.5s] bound mutual subscription is going away",
GetClientId(pClient), pClient->GetStateStr());
pClient->HandleSubscriptionTerminated(pClient->IsRetryEnabled(), err, NULL);
}
}
return found;
}
#endif // WDM_ENABLE_SUBSCRIPTION_CLIENT
#if WDM_ENABLE_SUBSCRIPTION_PUBLISHER
/**
 * Convert a pointer into the engine's handler pool into its array index.
 *
 * @param[in] apHandler  Pointer to an element of mHandlers.
 *
 * @return The element offset of apHandler from the start of mHandlers, truncated to 16 bits.
 */
uint16_t SubscriptionEngine::GetHandlerId (const SubscriptionHandler * const apHandler) const
{
    const ptrdiff_t offsetInPool = apHandler - mHandlers;

    return static_cast<uint16_t>(offsetInPool);
}
/**
 * Convert a pointer into the engine's command-object pool into its array index.
 *
 * @param[in] apHandle  Pointer to an element of mCommandObjs.
 *
 * @return The element offset of apHandle from the start of mCommandObjs, truncated to 16 bits.
 */
uint16_t SubscriptionEngine::GetCommandObjId (const Command * const apHandle) const
{
    const ptrdiff_t offsetInPool = apHandle - mCommandObjs;

    return static_cast<uint16_t>(offsetInPool);
}
bool SubscriptionEngine::UpdateHandlerLiveness(const uint64_t aPeerNodeId, const uint64_t aSubscriptionId, const bool aKill)
{
WEAVE_ERROR err = WEAVE_NO_ERROR;
bool found = false;
SubscriptionHandler * pHandler = FindHandler(aPeerNodeId, aSubscriptionId);
if (NULL != pHandler)
{
found = true;
if (aKill)
{
err = WEAVE_ERROR_TRANSACTION_CANCELED;
}
else
{
WeaveLogDetail(DataManagement, "Handler[%d] [%5.5s] liveness confirmed",
GetHandlerId(pHandler), pHandler->GetStateStr());
// ignore incorrect state error, otherwise, let it flow through
err = pHandler->RefreshTimer();
if (WEAVE_ERROR_INCORRECT_STATE == err)
{
err = WEAVE_NO_ERROR;
WeaveLogDetail(DataManagement, "Handler[%d] [%5.5s] liveness confirmation failed, ignore",
GetHandlerId(pHandler), pHandler->GetStateStr());
}
}
if (WEAVE_NO_ERROR != err)
{
WeaveLogDetail(DataManagement, "Handler[%d] [%5.5s] bound mutual subscription is going away",
GetHandlerId(pHandler), pHandler->GetStateStr());
pHandler->HandleSubscriptionTerminated(err, NULL);
}
}
return found;
}
/**
 * Look up a subscription handler by peer node ID and subscription ID.
 *
 * Only handlers whose state lies between kState_SubscriptionInfoValid_Begin
 * and kState_SubscriptionInfoValid_End (inclusive) are considered.
 *
 * @param[in] aPeerNodeId      Node ID compared with the peer of the handler's binding.
 * @param[in] aSubscriptionId  Subscription ID compared with the handler's.
 *
 * @return Pointer to the first matching handler, or NULL when there is none.
 */
SubscriptionHandler * SubscriptionEngine::FindHandler(const uint64_t aPeerNodeId, const uint64_t aSubscriptionId)
{
    for (size_t idx = 0; idx < kMaxNumSubscriptionHandlers; ++idx)
    {
        SubscriptionHandler & candidate = mHandlers[idx];

        const bool hasValidInfo =
            (candidate.mCurrentState >= SubscriptionHandler::kState_SubscriptionInfoValid_Begin) &&
            (candidate.mCurrentState <= SubscriptionHandler::kState_SubscriptionInfoValid_End);

        if (hasValidInfo &&
            (aPeerNodeId == candidate.mBinding->GetPeerNodeId()) &&
            (aSubscriptionId == candidate.mSubscriptionId))
        {
            return &candidate;
        }
    }

    return NULL;
}
/**
 * Lower outLogPosition to the smallest mBytesOffloaded value among all
 * handlers that are not in the free state.
 *
 * @param[in,out] outLogPosition  On entry, the caller's current minimum; on
 *                                return, the smaller of that value and every
 *                                allocated handler's mBytesOffloaded. It is left
 *                                untouched when no handler has a smaller value.
 *
 * @return Always WEAVE_NO_ERROR.
 */
WEAVE_ERROR SubscriptionEngine::GetMinEventLogPosition(size_t &outLogPosition) const
{
    for (size_t idx = 0; idx < kMaxNumSubscriptionHandlers; ++idx)
    {
        const SubscriptionHandler & handler = mHandlers[idx];

        if ((handler.mCurrentState != SubscriptionHandler::kState_Free) &&
            (handler.mBytesOffloaded < outLogPosition))
        {
            outLogPosition = handler.mBytesOffloaded;
        }
    }

    return WEAVE_NO_ERROR;
}
/**
 * Return the trait-instance entries owned by a subscription handler to the
 * engine's shared trait-info pool.
 *
 * The handler's list pointer and count are cleared first. The pool is kept
 * contiguous: unless the released block is the last one in use, every entry
 * behind it is moved forward by the size of the released block, and the list
 * pointers of the other handlers located behind it are shifted to match.
 *
 * @param[in] aHandlerToBeReclaimed  Handler whose trait instances are released.
 */
void SubscriptionEngine::ReclaimTraitInfo(SubscriptionHandler * const aHandlerToBeReclaimed)
{
// Snapshot the handler's block before it is detached from the handler below.
SubscriptionHandler::TraitInstanceInfo* const traitInfoList = aHandlerToBeReclaimed->mTraitInstanceList;
const uint16_t numTraitInstances = aHandlerToBeReclaimed->mNumTraitInstances;
// Declared without an initializer because the ExitNow() below jumps past its assignment.
size_t numTraitInstancesToBeAffected;
aHandlerToBeReclaimed->mTraitInstanceList = NULL;
aHandlerToBeReclaimed->mNumTraitInstances = 0;
if (!numTraitInstances)
{
WeaveLogDetail(DataManagement, "No trait instances allocated for this subscription");
ExitNow();
}
// make sure everything is still sane (violations are only logged, not acted upon)
WeaveLogIfFalse(traitInfoList >= mTraitInfoPool);
WeaveLogIfFalse(numTraitInstances <= mNumTraitInfosInPool);
// mTraitInfoPool + mNumTraitInfosInPool points one past the last trait instance currently in use
// traitInfoList is a pointer to the first trait instance to be released
// the result of subtraction is the number of trait instances from traitInfoList to the end of the used part of the pool
numTraitInstancesToBeAffected = (mTraitInfoPool + mNumTraitInfosInPool) - traitInfoList;
// Shrink the traitInfosInPool by the number of trait instances in this subscription.
mNumTraitInfosInPool -= numTraitInstances;
SYSTEM_STATS_DECREMENT_BY_N(nl::Weave::System::Stats::kWDMNext_NumTraits, numTraitInstances);
if (numTraitInstances == numTraitInstancesToBeAffected)
{
// The released block was at the tail of the used region: nothing to compact.
WeaveLogDetail(DataManagement, "Releasing the last block of trait instances");
ExitNow();
}
WeaveLogDetail(DataManagement, "Moving %u trait instances forward", static_cast<unsigned int>(numTraitInstancesToBeAffected - numTraitInstances));
// Source and destination overlap, hence memmove rather than memcpy.
memmove(traitInfoList, traitInfoList + numTraitInstances,
sizeof(SubscriptionHandler::TraitInstanceInfo) *
(numTraitInstancesToBeAffected - numTraitInstances));
// Re-point every other handler whose block sat behind the released one.
for (size_t i = 0; i < kMaxNumSubscriptionHandlers; ++i)
{
SubscriptionHandler * const pHandler = mHandlers + i;
if ((aHandlerToBeReclaimed != pHandler) && (pHandler->mTraitInstanceList > traitInfoList))
{
pHandler->mTraitInstanceList -= numTraitInstances;
}
}
exit:
WeaveLogDetail(DataManagement, "Number of allocated trait instances: %u", mNumTraitInfosInPool);
}
/**
 * Enable the publisher role with the given lock and data source catalog.
 *
 * All existing subscriptions are aborted first (via DisablePublisher()) so
 * the publisher starts from a clean slate.
 *
 * @param[in] aLock              Lock object stored in the engine and used by Lock()/Unlock().
 * @param[in] aPublisherCatalog  Catalog stored as the publisher's catalog.
 *
 * @return Always WEAVE_NO_ERROR.
 */
WEAVE_ERROR SubscriptionEngine::EnablePublisher(IWeavePublisherLock *aLock, TraitCatalogBase<TraitDataSource>* const aPublisherCatalog)
{
    // Must run before any of the stores below: it resets the catalog and the enabled flag.
    DisablePublisher();

    mPublisherCatalog    = aPublisherCatalog;
    mLock                = aLock;
    mNextHandlerToNotify = 0;
    mIsPublisherEnabled  = true;

    return WEAVE_NO_ERROR;
}
/**
 * Acquire the publisher lock, if one has been configured.
 *
 * @return WEAVE_NO_ERROR when no lock object is set; otherwise the result of
 *         the lock object's Lock().
 */
WEAVE_ERROR SubscriptionEngine::Lock()
{
    if (!mLock)
    {
        return WEAVE_NO_ERROR;
    }

    return mLock->Lock();
}
/**
 * Release the publisher lock, if one has been configured.
 *
 * @return WEAVE_NO_ERROR when no lock object is set; otherwise the result of
 *         the lock object's Unlock().
 */
WEAVE_ERROR SubscriptionEngine::Unlock()
{
    if (!mLock)
    {
        return WEAVE_NO_ERROR;
    }

    return mLock->Unlock();
}
void SubscriptionEngine::DisablePublisher ()
{
mIsPublisherEnabled = false;
mPublisherCatalog = NULL;
for (size_t i = 0; i < kMaxNumSubscriptionHandlers; ++i)
{
switch (mHandlers[i].mCurrentState)
{
case SubscriptionHandler::kState_Free:
case SubscriptionHandler::kState_Aborted:
break;
default:
mHandlers[i].AbortSubscription();
}
}
// Note that the command objects are not closed when publisher is disabled.
// This is because the processing flow of commands are not directly linked
// with subscriptions.
}
/**
 * Hand out the first free subscription handler from the engine's pool.
 *
 * The handler is returned as-is; it is not initialized here.
 *
 * @param[out] subHandler  Receives the free handler, or NULL when none is
 *                         available or the allocation fault is injected.
 *
 * @return WEAVE_NO_ERROR when a handler was found, WEAVE_ERROR_NO_MEMORY otherwise.
 */
WEAVE_ERROR SubscriptionEngine::NewSubscriptionHandler(SubscriptionHandler **subHandler)
{
    WEAVE_ERROR err = WEAVE_ERROR_NO_MEMORY;

    *subHandler = NULL;

    WEAVE_FAULT_INJECT(FaultInjection::kFault_WDM_SubscriptionHandlerNew, ExitNow());

    for (size_t slot = 0; slot < kMaxNumSubscriptionHandlers; ++slot)
    {
        SubscriptionHandler * const candidate = &mHandlers[slot];

        if (SubscriptionHandler::kState_Free != candidate->mCurrentState)
        {
            continue;
        }

        // A free handler is expected to have no outstanding references.
        WeaveLogIfFalse(0 == candidate->mRefCount);

        *subHandler = candidate;
        err         = WEAVE_NO_ERROR;
        SYSTEM_STATS_INCREMENT(nl::Weave::System::Stats::kWDMNext_NumSubscriptionHandlers);
        break;
    }

    ExitNow(); // silence warnings about unused labels.

exit:
    return err;
}
/**
 * Handler for incoming SubscribeRequest messages (publisher side).
 * This function is a @ref ExchangeContext::MessageReceiveFunct.
 *
 * A binding is configured from the incoming message and the application is
 * asked, through kEvent_OnIncomingSubscribeRequest, whether to accept the
 * request. When accepted, prior subscriptions from the same peer are
 * optionally terminated, a random subscription ID is generated and a handler
 * is allocated. That handler takes over the exchange context and the payload.
 *
 * On any other path (publisher disabled, no engine callback, rejection by
 * the application, or a resource failure) the payload is freed, a status
 * report with the current reason profile/code is sent and the exchange is
 * closed. The local reference on the binding is released in every case.
 *
 * @param[in] aEC         Exchange context the request arrived on; its AppState is the engine.
 * @param[in] aPktInfo    Packet information for the request.
 * @param[in] aMsgInfo    Message information for the request.
 * @param[in] aProfileId  Unused.
 * @param[in] aMsgType    Unused.
 * @param[in] aPayload    Request payload.
 */
void SubscriptionEngine::OnSubscribeRequest (nl::Weave::ExchangeContext *aEC, const nl::Inet::IPPacketInfo *aPktInfo,
                                             const nl::Weave::WeaveMessageInfo *aMsgInfo, uint32_t aProfileId,
                                             uint8_t aMsgType, PacketBuffer *aPayload)
{
    WEAVE_ERROR err = WEAVE_NO_ERROR;
    SubscriptionEngine* const pEngine = reinterpret_cast<SubscriptionEngine *>(aEC->AppState);
    SubscriptionHandler* handler = NULL;
    uint32_t reasonProfileId = nl::Weave::Profiles::kWeaveProfile_Common;
    uint16_t reasonStatusCode = nl::Weave::Profiles::Common::kStatus_InternalServerProblem;
    InEventParam inParam;
    OutEventParam outParam;
    Binding* binding = NULL;
    uint64_t subscriptionId = 0;

    // Note that there is no event callback nor App state assigned to this newly allocated binding
    // We will need to assign a callback handler when binding actually generates useful events
    binding = pEngine->mExchangeMgr->NewBinding();
    if (NULL == binding)
    {
        // log as error as it might be difficult to estimate how many bindings are needed on a system
        WeaveLogError(DataManagement, "%s: Out of Binding", __func__);
        ExitNow(err = WEAVE_ERROR_NO_MEMORY);
    }

    err = binding->BeginConfiguration()
              .ConfigureFromMessage(aMsgInfo, aPktInfo)
              .PrepareBinding();
    SuccessOrExit(err);

    // If the peer requested an ACK, we need to ensure that the exchange context will automatically
    // request an ACK when we send messages out on this exchange.
    //
    // In future exchanges that we initiate to this peer, the binding will automatically vend out exchange
    // contexts with this auto-ack bit set due to the binding configuration that happens in the line above.
    if (aMsgInfo->Flags & kWeaveMessageFlag_PeerRequestedAck)
    {
        aEC->SetAutoRequestAck(true);
    }

    if (pEngine->mIsPublisherEnabled && (NULL != pEngine->mEventCallback))
    {
        outParam.mIncomingSubscribeRequest.mAutoClosePriorSubscription = true;
        outParam.mIncomingSubscribeRequest.mRejectRequest = false;
        outParam.mIncomingSubscribeRequest.mpReasonProfileId = &reasonProfileId;
        outParam.mIncomingSubscribeRequest.mpReasonStatusCode = &reasonStatusCode;

        // These two outputs are copied into the new handler further down. Give
        // them a defined value so an application callback that leaves them
        // untouched does not result in indeterminate pointers being stored.
        outParam.mIncomingSubscribeRequest.mHandlerAppState = NULL;
        outParam.mIncomingSubscribeRequest.mHandlerEventCallback = NULL;

        inParam.mIncomingSubscribeRequest.mEC = aEC;
        inParam.mIncomingSubscribeRequest.mPktInfo = aPktInfo;
        inParam.mIncomingSubscribeRequest.mMsgInfo = aMsgInfo;
        inParam.mIncomingSubscribeRequest.mPayload = aPayload;
        inParam.mIncomingSubscribeRequest.mBinding = binding;

        // note the binding is exposed to app layer for configuration here, and again later after
        // the request is fully parsed
        pEngine->mEventCallback(pEngine->mAppState, kEvent_OnIncomingSubscribeRequest, inParam, outParam);

        // Make sure messages sent through this EC are sent with proper re-transmission/timeouts settings
        // This is mainly for rejections, as the EC would be configured again in SubscriptionHandler::AcceptSubscribeRequest
        err = binding->AdjustResponseTimeout(aEC);
        SuccessOrExit(err);
    }
    else
    {
        ExitNow(err = WEAVE_ERROR_NO_MESSAGE_HANDLER);
    }

    if (outParam.mIncomingSubscribeRequest.mRejectRequest)
    {
        // reject this request (without touching existing subscriptions)
        ExitNow(err = WEAVE_ERROR_TRANSACTION_CANCELED);
    }
    else
    {
        if (outParam.mIncomingSubscribeRequest.mAutoClosePriorSubscription)
        {
            // if not rejected, default behavior is to abort any prior communication with this node id
            for (size_t i = 0; i < kMaxNumSubscriptionHandlers; ++i)
            {
                if ((pEngine->mHandlers[i].mCurrentState >= SubscriptionHandler::kState_SubscriptionInfoValid_Begin) &&
                    (pEngine->mHandlers[i].mCurrentState <= SubscriptionHandler::kState_SubscriptionInfoValid_End))
                {
                    uint64_t nodeId = pEngine->mHandlers[i].GetPeerNodeId();

                    if (nodeId == aEC->PeerNodeId)
                    {
                        // err is WEAVE_NO_ERROR at this point.
                        pEngine->mHandlers[i].HandleSubscriptionTerminated(err, NULL);
                    }
                }
            }
        }

        err = nl::Weave::Platform::Security::GetSecureRandomData(reinterpret_cast<uint8_t *>(&subscriptionId), sizeof(subscriptionId));
        SuccessOrExit(err);

        err = pEngine->NewSubscriptionHandler(&handler);
        if (err != WEAVE_NO_ERROR)
        {
            // try to give slightly more detail on the issue for this potentially common problem
            reasonStatusCode = (err == WEAVE_ERROR_NO_MEMORY ?
                                nl::Weave::Profiles::Common::kStatus_OutOfMemory :
                                nl::Weave::Profiles::Common::kStatus_InternalServerProblem);

            ExitNow();
        }
        else
        {
            handler->mAppState = outParam.mIncomingSubscribeRequest.mHandlerAppState;
            handler->mEventCallback = outParam.mIncomingSubscribeRequest.mHandlerEventCallback;

            uint32_t maxSize = WDM_MAX_NOTIFICATION_SIZE;

            WEAVE_FAULT_INJECT_WITH_ARGS(FaultInjection::kFault_WDM_NotificationSize,
                                         // Code executed with the Manager's lock:
                                         if (numFaultArgs > 0)
                                         {
                                             maxSize = static_cast<uint32_t>(faultArgs[0]);
                                         }
                                         else
                                         {
                                             maxSize = WDM_MAX_NOTIFICATION_SIZE/2;
                                         }
                                         ,
                                         // Code executed without the Manager's lock:
                                         WeaveLogDetail(DataManagement, "Handler[%d] Payload size set to %d", pEngine->GetHandlerId(handler), maxSize)
                                         );

            handler->SetMaxNotificationSize(maxSize);

            // The handler takes over the exchange context and the payload.
            handler->InitWithIncomingRequest(binding, subscriptionId, aEC, aPktInfo, aMsgInfo, aPayload);
            aEC = NULL;
            aPayload = NULL;
        }
    }

exit:
    WeaveLogFunctError(err);

    if (NULL != aPayload)
    {
        PacketBuffer::Free(aPayload);
        aPayload = NULL;
    }

    if (NULL != aEC)
    {
        // The request was not handed to a handler: tell the peer why and release the exchange.
        err = SendStatusReport(aEC, reasonProfileId, reasonStatusCode);
        WeaveLogFunctError(err);

        aEC->Close();
        aEC = NULL;
    }

    if (NULL != binding)
    {
        // Drop the reference obtained from NewBinding().
        binding->Release();
    }
}
/**
 * Handler for incoming SubscribeConfirmRequest messages.
 * This function is a @ref ExchangeContext::MessageReceiveFunct.
 *
 * The subscription ID is parsed from the payload. When the publisher is
 * enabled, the liveness of the matching client and/or handler for the sending
 * peer is refreshed, and the status report says whether any was found. When
 * the publisher is disabled, a "busy" status is reported. If parsing fails,
 * no response is sent.
 *
 * The payload is always freed and the exchange context always aborted here.
 *
 * @param[in] aEC         Exchange context the request arrived on; its AppState is the engine.
 * @param[in] aPktInfo    Unused.
 * @param[in] aMsgInfo    Unused.
 * @param[in] aProfileId  Unused.
 * @param[in] aMsgType    Unused.
 * @param[in] aPayload    Request payload; freed before returning.
 */
void SubscriptionEngine::OnSubscribeConfirmRequest (nl::Weave::ExchangeContext *aEC, const nl::Inet::IPPacketInfo *aPktInfo,
                                                    const nl::Weave::WeaveMessageInfo *aMsgInfo, uint32_t aProfileId,
                                                    uint8_t aMsgType, PacketBuffer *aPayload)
{
    WEAVE_ERROR err = WEAVE_NO_ERROR;
    SubscriptionEngine* const pEngine = reinterpret_cast<SubscriptionEngine *>(aEC->AppState);
    uint32_t reasonProfileId = nl::Weave::Profiles::kWeaveProfile_Common;
    uint16_t reasonStatusCode = nl::Weave::Profiles::Common::kStatus_InternalServerProblem;
    uint64_t subscriptionId;

    {
        nl::Weave::TLV::TLVReader tlvReader;
        SubscribeConfirmRequest::Parser confirmRequest;

        tlvReader.Init(aPayload);

        err = tlvReader.Next();
        SuccessOrExit(err);

        err = confirmRequest.Init(tlvReader);
        SuccessOrExit(err);

        err = confirmRequest.GetSubscriptionID(&subscriptionId);
        SuccessOrExit(err);
    }

    if (!pEngine->mIsPublisherEnabled)
    {
        reasonStatusCode = nl::Weave::Profiles::Common::kStatus_Busy;
    }
    else
    {
        // Both lookups are always performed; a hit in either one counts.
        bool matched = false;

#if WDM_ENABLE_SUBSCRIPTION_CLIENT
        matched = pEngine->UpdateClientLiveness(aEC->PeerNodeId, subscriptionId) || matched;
#endif // WDM_ENABLE_SUBSCRIPTION_CLIENT

#if WDM_ENABLE_SUBSCRIPTION_PUBLISHER
        matched = pEngine->UpdateHandlerLiveness(aEC->PeerNodeId, subscriptionId) || matched;
#endif // WDM_ENABLE_SUBSCRIPTION_PUBLISHER

        if (!matched)
        {
            reasonProfileId  = nl::Weave::Profiles::kWeaveProfile_WDM;
            reasonStatusCode = kStatus_InvalidSubscriptionID;
        }
        else
        {
            reasonStatusCode = nl::Weave::Profiles::Common::kStatus_Success;
        }
    }

    err = SendStatusReport(aEC, reasonProfileId, reasonStatusCode);

exit:
    WeaveLogFunctError(err);

    // aPayload is guaranteed to be non-NULL
    PacketBuffer::Free(aPayload);

    // aEC is guaranteed to be non-NULL.
    aEC->Abort();
}
#if WDM_PUBLISHER_ENABLE_CUSTOM_COMMANDS
/**
 * Handler for incoming CustomCommandRequest messages (publisher side).
 * This function is a @ref ExchangeContext::MessageReceiveFunct.
 *
 * A free command object is bound to the exchange context, the request is
 * parsed and validated (path, schema version, optional expiry time, optional
 * required version) and the command is dispatched to the addressed data
 * source through OnCustomCommand(). The data source takes over the command
 * object and the payload.
 *
 * If anything fails after a command object was obtained, an error response
 * is sent through that command object. If no command object could be
 * obtained, or the peer did not request an ACK, the exchange is closed
 * without a response. The payload is freed unless it was handed over.
 *
 * @param[in] aEC         Exchange context the request arrived on; its AppState is the engine.
 * @param[in] aPktInfo    Unused.
 * @param[in] aMsgInfo    Message information, forwarded to the data source.
 * @param[in] aProfileId  Unused.
 * @param[in] aMsgType    Unused.
 * @param[in] aPayload    Request payload.
 */
void SubscriptionEngine::OnCustomCommandRequest (nl::Weave::ExchangeContext *aEC, const nl::Inet::IPPacketInfo *aPktInfo,
                                                 const nl::Weave::WeaveMessageInfo *aMsgInfo, uint32_t aProfileId,
                                                 uint8_t aMsgType, PacketBuffer *aPayload)
{
    WEAVE_ERROR err = WEAVE_NO_ERROR;
    SubscriptionEngine * const pEngine = reinterpret_cast<SubscriptionEngine *>(aEC->AppState);
    Command * command = NULL;
    uint32_t statusReportProfile = nl::Weave::Profiles::kWeaveProfile_WDM;
    uint16_t statusReportCode = nl::Weave::Profiles::DataManagement::kStatus_InvalidPath;

    // WRM, in general, only works with WRM. Reject anything not requesting ACK immediately
    // and do not propagate this request downstream.
    VerifyOrExit(aEC->HasPeerRequestedAck(), err = WEAVE_ERROR_INVALID_TRANSFER_MODE);

    for (size_t i = 0; i < kMaxNumCommandObjs; ++i)
    {
        if (pEngine->mCommandObjs[i].IsFree())
        {
            SYSTEM_STATS_INCREMENT(nl::Weave::System::Stats::kWDMNext_NumCommands);
            command = &(pEngine->mCommandObjs[i]);
            // The command object now owns the exchange context.
            command->Init(aEC);
            aEC = NULL;
            break;
        }
    }
    VerifyOrExit(NULL != command, err = WEAVE_ERROR_NO_MEMORY);

    if (!pEngine->mIsPublisherEnabled)
    {
        // Has to be a publisher to be processing a command request
        statusReportProfile = nl::Weave::Profiles::kWeaveProfile_Common;
        statusReportCode = nl::Weave::Profiles::Common::kStatus_UnsupportedMessage;
        ExitNow(err = WEAVE_ERROR_INVALID_MESSAGE_TYPE);
    }

    {
        nl::Weave::TLV::TLVReader reader;
        TraitDataSource *dataSource = NULL;
        bool isVersionValid = false;
        bool isExpiryTimeValid = false;
        // The optional fields are zero-initialized so that a defined value is
        // passed to OnCustomCommand() even when the request omits them; the
        // matching is*Valid flag tells the callee whether the value is meaningful.
        uint64_t commandType = 0;
        uint64_t mustBeVersion = 0;
        int64_t expiryTimeMicroSecond = 0;

        reader.Init(aPayload);

        err = reader.Next();
        SuccessOrExit(err);

        {
            CustomCommandRequest::Parser request;
            TraitDataHandle traitDataHandle;
            nl::Weave::TLV::TLVReader pathReader;
            SchemaVersionRange requestedSchemaVersion, computedVersionIntersection;

            err = request.Init(reader);
            SuccessOrExit(err);

#if WEAVE_CONFIG_DATA_MANAGEMENT_ENABLE_SCHEMA_CHECK
            err = request.CheckSchemaValidity();
            SuccessOrExit(err);
#endif // WEAVE_CONFIG_DATA_MANAGEMENT_ENABLE_SCHEMA_CHECK

            err = request.GetReaderOnPath(&pathReader);
            SuccessOrExit(err);

            err = pEngine->mPublisherCatalog->AddressToHandle(pathReader, traitDataHandle, requestedSchemaVersion);
            SuccessOrExit(err);

            // Resolve the handle through the same engine (and therefore the same
            // catalog) that produced it.
            err = pEngine->mPublisherCatalog->Locate(traitDataHandle, &dataSource);
            SuccessOrExit(err);

            if (!dataSource->GetSchemaEngine()->GetVersionIntersection(requestedSchemaVersion, computedVersionIntersection)) {
                WeaveLogDetail(DataManagement, "Mismatch in requested version on handle %u (requested: %u, %u)", traitDataHandle, requestedSchemaVersion.mMaxVersion, requestedSchemaVersion.mMinVersion);
                statusReportProfile = nl::Weave::Profiles::kWeaveProfile_WDM;
                statusReportCode = kStatus_IncompatibleDataSchemaVersion;
                ExitNow(err = WEAVE_ERROR_INCOMPATIBLE_SCHEMA_VERSION);
            }

            err = request.GetCommandType(&commandType);
            SuccessOrExit(err);

            // The expiry time is optional: end-of-TLV just means it is absent.
            err = request.GetExpiryTimeMicroSecond(&expiryTimeMicroSecond);
            if (WEAVE_NO_ERROR == err)
            {
                isExpiryTimeValid = true;
            }
            else if (WEAVE_END_OF_TLV == err)
            {
                err = WEAVE_NO_ERROR;
            }
            else
            {
                ExitNow();
            }

            // The required version is optional as well.
            err = request.GetMustBeVersion(&mustBeVersion);
            if (WEAVE_NO_ERROR == err)
            {
                isVersionValid = true;
            }
            else if (WEAVE_END_OF_TLV == err)
            {
                err = WEAVE_NO_ERROR;
            }
            else
            {
                ExitNow();
            }

            err = request.GetReaderOnArgument(&reader);
            SuccessOrExit(err);
        }

#if WDM_ENFORCE_EXPIRY_TIME
        if (isExpiryTimeValid)
        {
            nl::Weave::Profiles::Time::timesync_t now_usec = 0;

            err = nl::Weave::Platform::Time::GetSystemTime(&now_usec);
            if (WEAVE_ERROR_UNSUPPORTED_CLOCK == err)
            {
                statusReportCode = nl::Weave::Profiles::DataManagement::kStatus_ExpiryTimeNotSupported;
                ExitNow();
            }
            else if (WEAVE_ERROR_TIME_NOT_SYNCED_YET == err)
            {
                statusReportCode = nl::Weave::Profiles::DataManagement::kStatus_NotTimeSyncedYet;
                ExitNow();
            }
            else if (WEAVE_NO_ERROR != err)
            {
                // Any other clock failure means now_usec is not trustworthy:
                // do not compare against it, report an internal problem instead.
                statusReportProfile = nl::Weave::Profiles::kWeaveProfile_Common;
                statusReportCode = nl::Weave::Profiles::Common::kStatus_InternalServerProblem;
                ExitNow();
            }
            else if (now_usec >= expiryTimeMicroSecond)
            {
                statusReportCode = nl::Weave::Profiles::DataManagement::kStatus_RequestExpiredInTime;
                ExitNow();
            }
            WeaveLogDetail(DataManagement, "Command ExpiryTime 0x%" PRIX64 ", now: 0x%" PRIX64 " ",
                           static_cast<uint64_t>(expiryTimeMicroSecond), static_cast<uint64_t>(now_usec));
        }
#endif // WDM_ENFORCE_EXPIRY_TIME

        if (isVersionValid)
        {
            uint64_t currentVersion = dataSource->GetVersion();

            if (mustBeVersion != currentVersion)
            {
                WeaveLogDetail(DataManagement, "Version required 0x%" PRIX64 ", current: 0x%" PRIX64 " ", mustBeVersion, currentVersion);
                statusReportCode = nl::Weave::Profiles::DataManagement::kStatus_VersionMismatch;
                ExitNow();
            }
        }

        // Note we cannot just use pathReader at here because the TDM related functions
        // generally assume they can move the reader at their will.
        // Note that callee is supposed to cache whatever is useful in the TLV stream into its own memory
        // when this callback returns, we'd destroy the TLV object
        dataSource->OnCustomCommand(command, aMsgInfo, aPayload, commandType,
                                    isExpiryTimeValid, expiryTimeMicroSecond, isVersionValid, mustBeVersion,
                                    reader);

        // Ownership of the command object and the payload has been passed on.
        command = NULL;
        aPayload = NULL;
    }

exit:
    WeaveLogFunctError(err);

    if (NULL != aPayload)
    {
        PacketBuffer::Free(aPayload);
        aPayload = NULL;
    }

    // command is still non-NULL only when the request was not dispatched to
    // OnCustomCommand(); in that case the error response is sent through it.
    if (NULL != command)
    {
        err = command->SendError(statusReportProfile, statusReportCode, err);
        WeaveLogFunctError(err);
    }

    // aEC is still non-NULL only when no command object took it over.
    if (NULL != aEC)
    {
        aEC->Close();
        aEC = NULL;
    }
}
#endif // WDM_PUBLISHER_ENABLE_CUSTOM_COMMANDS
#endif // #if WDM_ENABLE_SUBSCRIPTION_PUBLISHER
}; // WeaveMakeManagedNamespaceIdentifier(DataManagement, kWeaveManagedNamespaceDesignation_Current)
}; // Profiles
}; // Weave
}; // nl
#endif // WEAVE_CONFIG_ENABLE_RELIABLE_MESSAGING