| /* |
| * |
| * Copyright (c) 2016-2017 Nest Labs, Inc. |
| * All rights reserved. |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| /** |
| * @file |
| * This file implements subscription engine for Weave |
| * Data Management (WDM) profile. |
| * |
| */ |
| |
| #ifndef __STDC_FORMAT_MACROS |
| #define __STDC_FORMAT_MACROS |
| #endif // __STDC_FORMAT_MACROS |
| |
| #include <Weave/Profiles/WeaveProfiles.h> |
| #include <Weave/Profiles/common/CommonProfile.h> |
| #include <Weave/Profiles/data-management/Current/WdmManagedNamespace.h> |
| #include <Weave/Profiles/data-management/DataManagement.h> |
| #include <Weave/Profiles/time/WeaveTime.h> |
| #include <Weave/Support/crypto/WeaveCrypto.h> |
| #include <Weave/Support/WeaveFaultInjection.h> |
| #include <SystemLayer/SystemStats.h> |
| |
| #if WEAVE_CONFIG_ENABLE_RELIABLE_MESSAGING |
| |
| namespace nl { |
| namespace Weave { |
| namespace Profiles { |
| namespace WeaveMakeManagedNamespaceIdentifier(DataManagement, kWeaveManagedNamespaceDesignation_Current) { |
| |
| SubscriptionEngine::SubscriptionEngine() |
| { |
| } |
| |
| void SubscriptionEngine::SetEventCallback(void * const aAppState, const EventCallback aEventCallback) |
| { |
| mAppState = aAppState; |
| mEventCallback = aEventCallback; |
| } |
| |
| void SubscriptionEngine::DefaultEventHandler(EventID aEvent, const InEventParam & aInParam, OutEventParam & aOutParam) |
| { |
| IgnoreUnusedVariable(aInParam); |
| IgnoreUnusedVariable(aOutParam); |
| |
| WeaveLogDetail(DataManagement, "%s event: %d", __func__, aEvent); |
| } |
| |
| WEAVE_ERROR SubscriptionEngine::Init (nl::Weave::WeaveExchangeManager * const apExchangeMgr, void * const aAppState, const EventCallback aEventCallback) |
| { |
| WEAVE_ERROR err = WEAVE_NO_ERROR; |
| |
| mExchangeMgr = apExchangeMgr; |
| mAppState = aAppState; |
| mEventCallback = aEventCallback; |
| mLock = NULL; |
| |
| err = mExchangeMgr->RegisterUnsolicitedMessageHandler(nl::Weave::Profiles::kWeaveProfile_WDM, UnsolicitedMessageHandler, this); |
| SuccessOrExit(err); |
| |
| #if WDM_ENABLE_SUBSCRIPTION_CLIENT |
| for (size_t i = 0; i < kMaxNumCommandObjs; ++i) |
| { |
| mCommandObjs[i].Init(NULL); |
| } |
| |
| for (size_t i = 0; i < kMaxNumSubscriptionClients; ++i) |
| { |
| mClients[i].InitAsFree (); |
| } |
| |
| #endif // WDM_ENABLE_SUBSCRIPTION_CLIENT |
| |
| #if WDM_ENABLE_SUBSCRIPTION_PUBLISHER |
| err = mNotificationEngine.Init(); |
| SuccessOrExit(err); |
| |
| for (size_t i = 0; i < kMaxNumSubscriptionHandlers; ++i) |
| { |
| mHandlers[i].InitAsFree(); |
| } |
| |
| // erase everything |
| DisablePublisher (); |
| |
| #endif // WDM_ENABLE_SUBSCRIPTION_PUBLISHER |
| |
| mNumTraitInfosInPool = 0; |
| |
| exit: |
| WeaveLogFunctError(err); |
| |
| return err; |
| } |
| |
| #if WEAVE_DETAIL_LOGGING |
| void SubscriptionEngine::LogSubscriptionFreed(void) const |
| { |
| // Report number of clients and handlers that are still allocated |
| uint32_t countAllocatedClients = 0; |
| uint32_t countAllocatedHandlers = 0; |
| |
| #if WDM_ENABLE_SUBSCRIPTION_CLIENT |
| for (int i = 0; i < kMaxNumSubscriptionClients; ++i) |
| { |
| if (SubscriptionClient::kState_Free != mClients[i].mCurrentState) |
| { |
| ++countAllocatedClients; |
| } |
| } |
| #endif // #if WDM_ENABLE_SUBSCRIPTION_CLIENT |
| |
| #if WDM_ENABLE_SUBSCRIPTION_PUBLISHER |
| for (int i = 0; i < kMaxNumSubscriptionHandlers; ++i) |
| { |
| if (SubscriptionHandler::kState_Free != mHandlers[i].mCurrentState) |
| { |
| ++countAllocatedHandlers; |
| } |
| } |
| #endif // WDM_ENABLE_SUBSCRIPTION_PUBLISHER |
| |
| WeaveLogDetail(DataManagement, "Allocated clients: %" PRIu32 ". Allocated handlers: %" PRIu32 ".", |
| countAllocatedClients, countAllocatedHandlers); |
| } |
| #endif // #if WEAVE_DETAIL_LOGGING |
| |
| #if WDM_ENABLE_SUBSCRIPTION_CANCEL |
| void SubscriptionEngine::OnCancelRequest (nl::Weave::ExchangeContext *aEC, const nl::Inet::IPPacketInfo *aPktInfo, |
| const nl::Weave::WeaveMessageInfo *aMsgInfo, uint32_t aProfileId, |
| uint8_t aMsgType, PacketBuffer *aPayload) |
| { |
| WEAVE_ERROR err = WEAVE_NO_ERROR; |
| SubscriptionEngine* const pEngine = reinterpret_cast<SubscriptionEngine *>(aEC->AppState); |
| uint64_t SubscriptionId = 0; |
| bool found = false; |
| |
| { |
| nl::Weave::TLV::TLVReader reader; |
| SubscribeCancelRequest::Parser request; |
| |
| reader.Init(aPayload); |
| |
| err = reader.Next(); |
| SuccessOrExit(err); |
| |
| err = request.Init(reader); |
| SuccessOrExit(err); |
| |
| #if WEAVE_CONFIG_DATA_MANAGEMENT_ENABLE_SCHEMA_CHECK |
| err = request.CheckSchemaValidity(); |
| SuccessOrExit(err); |
| #endif // WEAVE_CONFIG_DATA_MANAGEMENT_ENABLE_SCHEMA_CHECK |
| |
| err = request.GetSubscriptionID(&SubscriptionId); |
| SuccessOrExit(err); |
| } |
| |
| #if WDM_ENABLE_SUBSCRIPTION_CLIENT |
| for (size_t i = 0; i < kMaxNumSubscriptionClients; ++i) |
| { |
| if ((SubscriptionClient::kState_SubscriptionEstablished_Idle == pEngine->mClients[i].mCurrentState) || |
| (SubscriptionClient::kState_SubscriptionEstablished_Confirming == pEngine->mClients[i].mCurrentState)) |
| { |
| if (pEngine->mClients[i].mSubscriptionId == SubscriptionId) |
| { |
| pEngine->mClients[i].CancelRequestHandler(aEC, aPktInfo, aMsgInfo, aPayload); |
| found = true; |
| break; |
| } |
| } |
| } |
| #endif // #if WDM_ENABLE_SUBSCRIPTION_CLIENT |
| |
| #if WDM_ENABLE_SUBSCRIPTION_PUBLISHER |
| for (size_t i = 0; i < kMaxNumSubscriptionHandlers; ++i) |
| { |
| if ((pEngine->mHandlers[i].mCurrentState >= SubscriptionHandler::kState_SubscriptionInfoValid_Begin) && |
| (pEngine->mHandlers[i].mCurrentState <= SubscriptionHandler::kState_SubscriptionInfoValid_End)) |
| { |
| // Note that there is no need to compare more than subscription ID, because it must already be unique on publisher side |
| if (pEngine->mHandlers[i].mSubscriptionId == SubscriptionId) |
| { |
| pEngine->mHandlers[i].CancelRequestHandler(aEC, aPktInfo, aMsgInfo, aPayload); |
| found = true; |
| break; |
| } |
| } |
| } |
| #endif // WDM_ENABLE_SUBSCRIPTION_PUBLISHER |
| |
| if (!found) |
| { |
| err = SendStatusReport(aEC, nl::Weave::Profiles::kWeaveProfile_WDM, kStatus_InvalidSubscriptionID); |
| SuccessOrExit(err); |
| } |
| |
| exit: |
| WeaveLogFunctError(err); |
| |
| // aPayload guaranteed to be non-NULL |
| PacketBuffer::Free(aPayload); |
| |
| // aEC guaranteed to be non-NULL |
| aEC->Close(); |
| } |
| #endif // WDM_ENABLE_SUBSCRIPTION_CANCEL |
| |
| #if WDM_ENABLE_SUBSCRIPTION_CLIENT |
| |
| uint16_t SubscriptionEngine::GetClientId (const SubscriptionClient * const apClient) const |
| { |
| return static_cast<uint16_t>(apClient - mClients); |
| } |
| |
| WEAVE_ERROR SubscriptionEngine::NewClient (SubscriptionClient ** const appClient, |
| Binding * const apBinding, |
| void * const apAppState, |
| SubscriptionClient::EventCallback const aEventCallback, |
| const TraitCatalogBase<TraitDataSink>* const apCatalog, |
| const uint32_t aInactivityTimeoutDuringSubscribingMsec) |
| { |
| WEAVE_ERROR err = WEAVE_ERROR_NO_MEMORY; |
| |
| WEAVE_FAULT_INJECT(FaultInjection::kFault_WDM_SubscriptionClientNew, ExitNow()); |
| |
| *appClient = NULL; |
| for (size_t i = 0; i < kMaxNumSubscriptionClients; ++i) |
| { |
| if (SubscriptionClient::kState_Free == mClients[i].mCurrentState) |
| { |
| *appClient = &mClients[i]; |
| err = (*appClient)->Init(apBinding, |
| apAppState, |
| aEventCallback, |
| apCatalog, |
| aInactivityTimeoutDuringSubscribingMsec); |
| |
| if (WEAVE_NO_ERROR != err) |
| { |
| *appClient = NULL; |
| ExitNow(); |
| } |
| SYSTEM_STATS_INCREMENT(nl::Weave::System::Stats::kWDMNext_NumSubscriptionClients); |
| break; |
| } |
| } |
| |
| exit: |
| |
| return err; |
| } |
| |
| /** |
| * Reply to a request with a StatuReport message. |
| * |
| * @param[in] aEC Pointer to the ExchangeContext on which the request was received. |
| * This function does not take ownership of this object. The ExchangeContext |
| * must be closed or aborted by the calling function according to the |
| * WEAVE_ERROR returned. |
| * @param[in] aProfileId The profile to be put in the StatusReport payload. |
| * @param[in] aStatusCode The status code to be put in the StatusReport payload; must refer to the |
| * profile passed in aProfileId, but this function does not enforce this |
| * condition. |
| * |
| * @return WEAVE_NO_ERROR in case of success. |
| * WEAVE_NO_MEMORY if no pbufs are available. |
| * Any other WEAVE_ERROR code returned by ExchangeContext::SendMessage |
| */ |
| WEAVE_ERROR SubscriptionEngine::SendStatusReport (nl::Weave::ExchangeContext *aEC, |
| uint32_t aProfileId, |
| uint16_t aStatusCode) |
| { |
| WEAVE_ERROR err = WEAVE_NO_ERROR; |
| |
| err = nl::Weave::WeaveServerBase::SendStatusReport(aEC, aProfileId, aStatusCode, WEAVE_NO_ERROR, |
| aEC->HasPeerRequestedAck() ? nl::Weave::ExchangeContext::kSendFlag_RequestAck : 0); |
| WeaveLogFunctError(err); |
| |
| return err; |
| } |
| |
| /** |
| * Unsolicited message handler for all WDM messages. |
| * This function is a @ref ExchangeContext::MessageReceiveFunct. |
| */ |
| void SubscriptionEngine::UnsolicitedMessageHandler (nl::Weave::ExchangeContext *aEC, const nl::Inet::IPPacketInfo *aPktInfo, |
| const nl::Weave::WeaveMessageInfo *aMsgInfo, uint32_t aProfileId, |
| uint8_t aMsgType, PacketBuffer *aPayload) |
| { |
| nl::Weave::ExchangeContext::MessageReceiveFunct func = OnUnknownMsgType; |
| |
| switch (aMsgType) |
| { |
| #if WDM_ENABLE_SUBSCRIPTION_CLIENT |
| case kMsgType_NotificationRequest: |
| func = OnNotificationRequest; |
| break; |
| #endif // WDM_ENABLE_SUBSCRIPTION_CLIENT |
| |
| #if WDM_ENABLE_SUBSCRIPTION_PUBLISHER |
| |
| case kMsgType_SubscribeRequest: |
| func = OnSubscribeRequest; |
| break; |
| |
| case kMsgType_SubscribeConfirmRequest: |
| func = OnSubscribeConfirmRequest; |
| break; |
| |
| case kMsgType_CustomCommandRequest: |
| func = OnCustomCommandRequest; |
| break; |
| |
| #endif // WDM_ENABLE_SUBSCRIPTION_PUBLISHER |
| |
| #if WDM_ENABLE_SUBSCRIPTION_CANCEL |
| case kMsgType_SubscribeCancelRequest: |
| func = OnCancelRequest; |
| break; |
| #endif // WDM_ENABLE_SUBSCRIPTION_CANCEL |
| |
| default: |
| break; |
| } |
| |
| func(aEC, aPktInfo, aMsgInfo, aProfileId, aMsgType, aPayload); |
| } |
| |
| /** |
| * Unsolicited message handler for unsupported WDM messages. |
| * This function is a @ref ExchangeContext::MessageReceiveFunct. |
| */ |
| void SubscriptionEngine::OnUnknownMsgType (nl::Weave::ExchangeContext *aEC, const nl::Inet::IPPacketInfo *aPktInfo, |
| const nl::Weave::WeaveMessageInfo *aMsgInfo, uint32_t aProfileId, |
| uint8_t aMsgType, PacketBuffer *aPayload) |
| { |
| WEAVE_ERROR err = WEAVE_NO_ERROR; |
| |
| PacketBuffer::Free(aPayload); |
| aPayload = NULL; |
| |
| WeaveLogDetail(DataManagement, "Msg type %" PRIu8 " not supported", aMsgType); |
| |
| err = SendStatusReport(aEC, nl::Weave::Profiles::kWeaveProfile_Common, nl::Weave::Profiles::Common::kStatus_UnsupportedMessage); |
| SuccessOrExit(err); |
| |
| aEC->Close(); |
| aEC = NULL; |
| |
| exit: |
| WeaveLogFunctError(err); |
| |
| if (NULL != aEC) |
| { |
| aEC->Abort(); |
| aEC = NULL; |
| } |
| } |
| |
| void SubscriptionEngine::OnNotificationRequest (nl::Weave::ExchangeContext *aEC, const nl::Inet::IPPacketInfo *aPktInfo, |
| const nl::Weave::WeaveMessageInfo *aMsgInfo, uint32_t aProfileId, |
| uint8_t aMsgType, PacketBuffer *aPayload) |
| { |
| WEAVE_ERROR err = WEAVE_NO_ERROR; |
| SubscriptionEngine* const pEngine = reinterpret_cast<SubscriptionEngine *>(aEC->AppState); |
| uint64_t SubscriptionId = 0; |
| |
| { |
| nl::Weave::TLV::TLVReader reader; |
| NotificationRequest::Parser notify; |
| |
| reader.Init(aPayload); |
| |
| err = reader.Next(); |
| SuccessOrExit(err); |
| |
| err = notify.Init(reader); |
| SuccessOrExit(err); |
| |
| // Note that it is okay to bail out, without any response, if the message doesn't even have a subscription ID in it |
| err = notify.GetSubscriptionID(&SubscriptionId); |
| SuccessOrExit(err); |
| |
| WEAVE_FAULT_INJECT(FaultInjection::kFault_WDM_BadSubscriptionId, SubscriptionId += 1); |
| } |
| |
| for (size_t i = 0; i < kMaxNumSubscriptionClients; ++i) |
| { |
| if ((SubscriptionClient::kState_SubscriptionEstablished_Idle == pEngine->mClients[i].mCurrentState) || |
| (SubscriptionClient::kState_SubscriptionEstablished_Confirming == pEngine->mClients[i].mCurrentState)) |
| { |
| if (pEngine->mClients[i].mBinding->IsAuthenticMessageFromPeer(aMsgInfo) && |
| pEngine->mClients[i].mSubscriptionId == SubscriptionId) |
| { |
| pEngine->mClients[i].NotificationRequestHandler(aEC, aPktInfo, aMsgInfo, aPayload); |
| aPayload = NULL; |
| aEC = NULL; |
| ExitNow(); |
| } |
| } |
| } |
| |
| WeaveLogDetail(DataManagement, "%s: couldn't find matching client. Subscription ID: 0x%" PRIX64, __func__, SubscriptionId); |
| |
| err = SendStatusReport(aEC, nl::Weave::Profiles::kWeaveProfile_WDM, kStatus_InvalidSubscriptionID); |
| SuccessOrExit(err); |
| |
| exit: |
| WeaveLogFunctError(err); |
| |
| if (NULL != aPayload) |
| { |
| PacketBuffer::Free(aPayload); |
| aPayload = NULL; |
| } |
| |
| if (NULL != aEC) |
| { |
| aEC->Abort(); |
| aEC = NULL; |
| } |
| } |
| |
| SubscriptionClient * SubscriptionEngine::FindClient(const uint64_t aPeerNodeId, const uint64_t aSubscriptionId) |
| { |
| SubscriptionClient * result = NULL; |
| |
| for (size_t i = 0; i < kMaxNumSubscriptionClients; ++i) |
| { |
| if ((mClients[i].mCurrentState >= SubscriptionClient::kState_Subscribing_IdAssigned) && |
| (mClients[i].mCurrentState <= SubscriptionClient::kState_SubscriptionEstablished_Confirming)) |
| { |
| if ((aPeerNodeId == mClients[i].mBinding->GetPeerNodeId()) && (mClients[i].mSubscriptionId == aSubscriptionId)) |
| { |
| result = &mClients[i]; |
| break; |
| } |
| } |
| } |
| |
| return result; |
| } |
| |
| bool SubscriptionEngine::UpdateClientLiveness(const uint64_t aPeerNodeId, const uint64_t aSubscriptionId, const bool aKill) |
| { |
| WEAVE_ERROR err = WEAVE_NO_ERROR; |
| bool found = false; |
| SubscriptionClient * pClient = FindClient(aPeerNodeId, aSubscriptionId); |
| |
| if (NULL != pClient) |
| { |
| found = true; |
| |
| if (aKill) |
| { |
| err = WEAVE_ERROR_TRANSACTION_CANCELED; |
| } |
| else |
| { |
| WeaveLogDetail(DataManagement, "Client[%d] [%5.5s] liveness confirmed", |
| GetClientId(pClient), pClient->GetStateStr()); |
| |
| // ignore incorrect state error, otherwise, let it flow through |
| err = pClient->RefreshTimer(); |
| if (WEAVE_ERROR_INCORRECT_STATE == err) |
| { |
| err = WEAVE_NO_ERROR; |
| |
| WeaveLogDetail(DataManagement, "Client[%d] [%5.5s] liveness confirmation failed, ignore", |
| GetClientId(pClient), pClient->GetStateStr()); |
| } |
| } |
| |
| if (WEAVE_NO_ERROR != err) |
| { |
| WeaveLogDetail(DataManagement, "Client[%d] [%5.5s] bound mutual subscription is going away", |
| GetClientId(pClient), pClient->GetStateStr()); |
| |
| pClient->HandleSubscriptionTerminated(pClient->IsRetryEnabled(), err, NULL); |
| } |
| } |
| |
| return found; |
| } |
| |
| |
| #endif // WDM_ENABLE_SUBSCRIPTION_CLIENT |
| |
| #if WDM_ENABLE_SUBSCRIPTION_PUBLISHER |
| |
| uint16_t SubscriptionEngine::GetHandlerId (const SubscriptionHandler * const apHandler) const |
| { |
| return static_cast<uint16_t>(apHandler - mHandlers); |
| } |
| |
| uint16_t SubscriptionEngine::GetCommandObjId (const Command * const apHandle) const |
| { |
| return static_cast<uint16_t>(apHandle - mCommandObjs); |
| } |
| |
| bool SubscriptionEngine::UpdateHandlerLiveness(const uint64_t aPeerNodeId, const uint64_t aSubscriptionId, const bool aKill) |
| { |
| WEAVE_ERROR err = WEAVE_NO_ERROR; |
| bool found = false; |
| SubscriptionHandler * pHandler = FindHandler(aPeerNodeId, aSubscriptionId); |
| if (NULL != pHandler) |
| { |
| found = true; |
| |
| if (aKill) |
| { |
| err = WEAVE_ERROR_TRANSACTION_CANCELED; |
| } |
| else |
| { |
| WeaveLogDetail(DataManagement, "Handler[%d] [%5.5s] liveness confirmed", |
| GetHandlerId(pHandler), pHandler->GetStateStr()); |
| |
| // ignore incorrect state error, otherwise, let it flow through |
| err = pHandler->RefreshTimer(); |
| if (WEAVE_ERROR_INCORRECT_STATE == err) |
| { |
| err = WEAVE_NO_ERROR; |
| |
| WeaveLogDetail(DataManagement, "Handler[%d] [%5.5s] liveness confirmation failed, ignore", |
| GetHandlerId(pHandler), pHandler->GetStateStr()); |
| } |
| } |
| |
| if (WEAVE_NO_ERROR != err) |
| { |
| WeaveLogDetail(DataManagement, "Handler[%d] [%5.5s] bound mutual subscription is going away", |
| GetHandlerId(pHandler), pHandler->GetStateStr()); |
| |
| pHandler->HandleSubscriptionTerminated(err, NULL); |
| } |
| } |
| |
| return found; |
| } |
| |
| SubscriptionHandler * SubscriptionEngine::FindHandler(const uint64_t aPeerNodeId, const uint64_t aSubscriptionId) |
| { |
| SubscriptionHandler * result = NULL; |
| |
| for (size_t i = 0; i < kMaxNumSubscriptionHandlers; ++i) |
| { |
| if ((mHandlers[i].mCurrentState >= SubscriptionHandler::kState_SubscriptionInfoValid_Begin) && |
| (mHandlers[i].mCurrentState <= SubscriptionHandler::kState_SubscriptionInfoValid_End)) |
| { |
| if ((aPeerNodeId == mHandlers[i].mBinding->GetPeerNodeId()) && (aSubscriptionId == mHandlers[i].mSubscriptionId)) |
| { |
| result = &mHandlers[i]; |
| break; |
| } |
| } |
| } |
| |
| return result; |
| } |
| |
| WEAVE_ERROR SubscriptionEngine::GetMinEventLogPosition(size_t &outLogPosition) const |
| { |
| WEAVE_ERROR err = WEAVE_NO_ERROR; |
| |
| for (size_t subIdx = 0; subIdx < kMaxNumSubscriptionHandlers; ++subIdx) |
| { |
| const SubscriptionHandler *subHandler = &(mHandlers[subIdx]); |
| if (subHandler->mCurrentState == SubscriptionHandler::kState_Free) |
| { |
| continue; |
| } |
| |
| if (subHandler->mBytesOffloaded < outLogPosition) |
| { |
| outLogPosition = subHandler->mBytesOffloaded; |
| } |
| } |
| |
| return err; |
| } |
| |
| void SubscriptionEngine::ReclaimTraitInfo(SubscriptionHandler * const aHandlerToBeReclaimed) |
| { |
| SubscriptionHandler::TraitInstanceInfo* const traitInfoList = aHandlerToBeReclaimed->mTraitInstanceList; |
| const uint16_t numTraitInstances = aHandlerToBeReclaimed->mNumTraitInstances; |
| size_t numTraitInstancesToBeAffected; |
| |
| aHandlerToBeReclaimed->mTraitInstanceList = NULL; |
| aHandlerToBeReclaimed->mNumTraitInstances = 0; |
| |
| if (!numTraitInstances) |
| { |
| WeaveLogDetail(DataManagement, "No trait instances allocated for this subscription"); |
| ExitNow(); |
| } |
| |
| // make sure everything is still sane |
| WeaveLogIfFalse(traitInfoList >= mTraitInfoPool); |
| WeaveLogIfFalse(numTraitInstances <= mNumTraitInfosInPool); |
| |
| // mPathGroupPool + kMaxNumPathGroups is a pointer which points to the last+1byte of this array |
| // traitInfoList is a pointer to the first trait instance to be released |
| // the result of subtraction is the number of trait instances from traitInfoList to the end of this array |
| numTraitInstancesToBeAffected = (mTraitInfoPool + mNumTraitInfosInPool) - traitInfoList; |
| |
| // Shrink the traitInfosInPool by the number of trait instances in this subscription. |
| mNumTraitInfosInPool -= numTraitInstances; |
| SYSTEM_STATS_DECREMENT_BY_N(nl::Weave::System::Stats::kWDMNext_NumTraits, numTraitInstances); |
| |
| if (numTraitInstances == numTraitInstancesToBeAffected) |
| { |
| WeaveLogDetail(DataManagement, "Releasing the last block of trait instances"); |
| ExitNow(); |
| } |
| |
| WeaveLogDetail(DataManagement, "Moving %u trait instances forward", static_cast<unsigned int>(numTraitInstancesToBeAffected - numTraitInstances)); |
| |
| memmove(traitInfoList, traitInfoList + numTraitInstances, |
| sizeof(SubscriptionHandler::TraitInstanceInfo) * |
| (numTraitInstancesToBeAffected - numTraitInstances)); |
| |
| for (size_t i = 0; i < kMaxNumSubscriptionHandlers; ++i) |
| { |
| SubscriptionHandler * const pHandler = mHandlers + i; |
| |
| if ((aHandlerToBeReclaimed != pHandler) && (pHandler->mTraitInstanceList > traitInfoList)) |
| { |
| pHandler->mTraitInstanceList -= numTraitInstances; |
| } |
| } |
| |
| exit: |
| WeaveLogDetail(DataManagement, "Number of allocated trait instances: %u", mNumTraitInfosInPool); |
| } |
| |
| WEAVE_ERROR SubscriptionEngine::EnablePublisher(IWeavePublisherLock *aLock, TraitCatalogBase<TraitDataSource>* const aPublisherCatalog) |
| { |
| // force abandon all subscription first, so we can have a clean slate |
| DisablePublisher(); |
| |
| mLock = aLock; |
| |
| // replace catalog |
| mPublisherCatalog = aPublisherCatalog; |
| |
| mIsPublisherEnabled = true; |
| |
| mNextHandlerToNotify = 0; |
| |
| return WEAVE_NO_ERROR; |
| } |
| |
| WEAVE_ERROR SubscriptionEngine::Lock() |
| { |
| if (mLock) { |
| return mLock->Lock(); |
| } |
| |
| return WEAVE_NO_ERROR; |
| } |
| |
| WEAVE_ERROR SubscriptionEngine::Unlock() |
| { |
| if (mLock) { |
| return mLock->Unlock(); |
| } |
| |
| return WEAVE_NO_ERROR; |
| } |
| |
| void SubscriptionEngine::DisablePublisher () |
| { |
| mIsPublisherEnabled = false; |
| mPublisherCatalog = NULL; |
| |
| for (size_t i = 0; i < kMaxNumSubscriptionHandlers; ++i) |
| { |
| switch (mHandlers[i].mCurrentState) |
| { |
| case SubscriptionHandler::kState_Free: |
| case SubscriptionHandler::kState_Aborted: |
| break; |
| default: |
| mHandlers[i].AbortSubscription(); |
| } |
| } |
| |
| // Note that the command objects are not closed when publisher is disabled. |
| // This is because the processing flow of commands are not directly linked |
| // with subscriptions. |
| } |
| |
| WEAVE_ERROR SubscriptionEngine::NewSubscriptionHandler(SubscriptionHandler **subHandler) |
| { |
| WEAVE_ERROR err = WEAVE_ERROR_NO_MEMORY; |
| |
| *subHandler = NULL; |
| |
| WEAVE_FAULT_INJECT(FaultInjection::kFault_WDM_SubscriptionHandlerNew, ExitNow()); |
| |
| for (size_t i = 0; i < kMaxNumSubscriptionHandlers; ++i) |
| { |
| if (SubscriptionHandler::kState_Free == mHandlers[i].mCurrentState) |
| { |
| WeaveLogIfFalse(0 == mHandlers[i].mRefCount); |
| *subHandler = &mHandlers[i]; |
| err = WEAVE_NO_ERROR; |
| |
| SYSTEM_STATS_INCREMENT(nl::Weave::System::Stats::kWDMNext_NumSubscriptionHandlers); |
| |
| break; |
| } |
| } |
| |
| ExitNow(); // silence warnings about unused labels. |
| exit: |
| return err; |
| } |
| |
| void SubscriptionEngine::OnSubscribeRequest (nl::Weave::ExchangeContext *aEC, const nl::Inet::IPPacketInfo *aPktInfo, |
| const nl::Weave::WeaveMessageInfo *aMsgInfo, uint32_t aProfileId, |
| uint8_t aMsgType, PacketBuffer *aPayload) |
| { |
| WEAVE_ERROR err = WEAVE_NO_ERROR; |
| SubscriptionEngine* const pEngine = reinterpret_cast<SubscriptionEngine *>(aEC->AppState); |
| SubscriptionHandler* handler = NULL; |
| uint32_t reasonProfileId = nl::Weave::Profiles::kWeaveProfile_Common; |
| uint16_t reasonStatusCode = nl::Weave::Profiles::Common::kStatus_InternalServerProblem; |
| InEventParam inParam; |
| OutEventParam outParam; |
| Binding* binding; |
| uint64_t subscriptionId = 0; |
| |
| // Note that there is no event callback nor App state assigned to this newly allocated binding |
| // We will need to assign a callback handler when binding actually generates useful events |
| binding = pEngine->mExchangeMgr->NewBinding(); |
| if (NULL == binding) |
| { |
| // log as error as it might be difficult to estimate how many bindings are needed on a system |
| WeaveLogError(DataManagement, "%s: Out of Binding", __func__); |
| ExitNow(err = WEAVE_ERROR_NO_MEMORY); |
| } |
| |
| err = binding->BeginConfiguration() |
| .ConfigureFromMessage(aMsgInfo, aPktInfo) |
| .PrepareBinding(); |
| SuccessOrExit(err); |
| |
| // If the peer requested an ACK, we need to ensure that the exchange context will automatically |
| // request an ACK when we send messages out on this exchange. |
| // |
| // In future exchanges that we initiate to this peer, the binding will automatically vend out exchange |
| // contexts with this auto-ack bit set due to the binding configuration that happens in the line above. |
| if (aMsgInfo->Flags & kWeaveMessageFlag_PeerRequestedAck) |
| { |
| aEC->SetAutoRequestAck(true); |
| } |
| |
| if (pEngine->mIsPublisherEnabled && (NULL != pEngine->mEventCallback)) |
| { |
| outParam.mIncomingSubscribeRequest.mAutoClosePriorSubscription = true; |
| outParam.mIncomingSubscribeRequest.mRejectRequest = false; |
| outParam.mIncomingSubscribeRequest.mpReasonProfileId = &reasonProfileId; |
| outParam.mIncomingSubscribeRequest.mpReasonStatusCode = &reasonStatusCode; |
| |
| inParam.mIncomingSubscribeRequest.mEC = aEC; |
| inParam.mIncomingSubscribeRequest.mPktInfo = aPktInfo; |
| inParam.mIncomingSubscribeRequest.mMsgInfo = aMsgInfo; |
| inParam.mIncomingSubscribeRequest.mPayload = aPayload; |
| inParam.mIncomingSubscribeRequest.mBinding = binding; |
| |
| // note the binding is exposed to app layer for configuration here, and again later after |
| // the request is fully parsed |
| pEngine->mEventCallback(pEngine->mAppState, kEvent_OnIncomingSubscribeRequest, inParam, outParam); |
| |
| // Make sure messages sent through this EC are sent with proper re-transmission/timeouts settings |
| // This is mainly for rejections, as the EC would be configured again in SubscriptionHandler::AcceptSubscribeRequest |
| |
| err = binding->AdjustResponseTimeout(aEC); |
| SuccessOrExit(err); |
| } |
| else |
| { |
| ExitNow(err = WEAVE_ERROR_NO_MESSAGE_HANDLER); |
| } |
| |
| if (outParam.mIncomingSubscribeRequest.mRejectRequest) |
| { |
| // reject this request (without touching existing subscriptions) |
| ExitNow(err = WEAVE_ERROR_TRANSACTION_CANCELED); |
| } |
| else |
| { |
| if (outParam.mIncomingSubscribeRequest.mAutoClosePriorSubscription) |
| { |
| // if not rejected, default behavior is to abort any prior communication with this node id |
| for (size_t i = 0; i < kMaxNumSubscriptionHandlers; ++i) |
| { |
| if ((pEngine->mHandlers[i].mCurrentState >= SubscriptionHandler::kState_SubscriptionInfoValid_Begin) && |
| (pEngine->mHandlers[i].mCurrentState <= SubscriptionHandler::kState_SubscriptionInfoValid_End)) |
| { |
| uint64_t nodeId = pEngine->mHandlers[i].GetPeerNodeId(); |
| |
| if (nodeId == aEC->PeerNodeId) |
| { |
| pEngine->mHandlers[i].HandleSubscriptionTerminated(err, NULL); |
| } |
| } |
| } |
| } |
| |
| err = nl::Weave::Platform::Security::GetSecureRandomData((uint8_t *)&subscriptionId, sizeof(subscriptionId)); |
| SuccessOrExit(err); |
| |
| err = pEngine->NewSubscriptionHandler(&handler); |
| if (err != WEAVE_NO_ERROR) |
| { |
| // try to give slightly more detail on the issue for this potentially common problem |
| reasonStatusCode = (err == WEAVE_ERROR_NO_MEMORY ? |
| nl::Weave::Profiles::Common::kStatus_OutOfMemory : |
| nl::Weave::Profiles::Common::kStatus_InternalServerProblem); |
| |
| ExitNow(); |
| } |
| else |
| { |
| handler->mAppState = outParam.mIncomingSubscribeRequest.mHandlerAppState; |
| handler->mEventCallback = outParam.mIncomingSubscribeRequest.mHandlerEventCallback; |
| uint32_t maxSize = WDM_MAX_NOTIFICATION_SIZE; |
| |
| WEAVE_FAULT_INJECT_WITH_ARGS(FaultInjection::kFault_WDM_NotificationSize, |
| // Code executed with the Manager's lock: |
| if (numFaultArgs > 0) |
| { |
| maxSize = static_cast<uint32_t>(faultArgs[0]); |
| } |
| else |
| { |
| maxSize = WDM_MAX_NOTIFICATION_SIZE/2; |
| } |
| , |
| // Code executed withouth the Manager's lock: |
| WeaveLogDetail(DataManagement, "Handler[%d] Payload size set to %d", pEngine->GetHandlerId(handler), maxSize) |
| ); |
| |
| handler->SetMaxNotificationSize(maxSize); |
| |
| handler->InitWithIncomingRequest(binding, subscriptionId, aEC, aPktInfo, aMsgInfo, aPayload); |
| aEC = NULL; |
| aPayload = NULL; |
| } |
| } |
| |
| exit: |
| WeaveLogFunctError(err); |
| |
| if (NULL != aPayload) |
| { |
| PacketBuffer::Free(aPayload); |
| aPayload = NULL; |
| } |
| |
| if (NULL != aEC) |
| { |
| err = SendStatusReport(aEC, reasonProfileId, reasonStatusCode); |
| WeaveLogFunctError(err); |
| |
| aEC->Close(); |
| aEC = NULL; |
| } |
| |
| if (NULL != binding) |
| { |
| binding->Release(); |
| } |
| } |
| |
| void SubscriptionEngine::OnSubscribeConfirmRequest (nl::Weave::ExchangeContext *aEC, const nl::Inet::IPPacketInfo *aPktInfo, |
| const nl::Weave::WeaveMessageInfo *aMsgInfo, uint32_t aProfileId, |
| uint8_t aMsgType, PacketBuffer *aPayload) |
| { |
| WEAVE_ERROR err = WEAVE_NO_ERROR; |
| SubscriptionEngine* const pEngine = reinterpret_cast<SubscriptionEngine *>(aEC->AppState); |
| uint32_t reasonProfileId = nl::Weave::Profiles::kWeaveProfile_Common; |
| uint16_t reasonStatusCode = nl::Weave::Profiles::Common::kStatus_InternalServerProblem; |
| uint64_t subscriptionId; |
| |
| { |
| nl::Weave::TLV::TLVReader reader; |
| SubscribeConfirmRequest::Parser request; |
| |
| reader.Init(aPayload); |
| |
| err = reader.Next(); |
| SuccessOrExit(err); |
| |
| err = request.Init(reader); |
| SuccessOrExit(err); |
| |
| err = request.GetSubscriptionID(&subscriptionId); |
| SuccessOrExit(err); |
| } |
| |
| if (pEngine->mIsPublisherEnabled) |
| { |
| // find a matching subscription |
| bool found = false; |
| |
| #if WDM_ENABLE_SUBSCRIPTION_CLIENT |
| if (pEngine->UpdateClientLiveness(aEC->PeerNodeId, subscriptionId)) |
| { |
| found = true; |
| } |
| #endif // WDM_ENABLE_SUBSCRIPTION_CLIENT |
| |
| #if WDM_ENABLE_SUBSCRIPTION_PUBLISHER |
| if (pEngine->UpdateHandlerLiveness(aEC->PeerNodeId, subscriptionId)) |
| { |
| found = true; |
| } |
| #endif // WDM_ENABLE_SUBSCRIPTION_PUBLISHER |
| |
| if (found) |
| { |
| reasonStatusCode = nl::Weave::Profiles::Common::kStatus_Success; |
| } |
| else |
| { |
| reasonProfileId = nl::Weave::Profiles::kWeaveProfile_WDM; |
| reasonStatusCode = kStatus_InvalidSubscriptionID; |
| } |
| } |
| else |
| { |
| reasonStatusCode = nl::Weave::Profiles::Common::kStatus_Busy; |
| } |
| |
| { |
| err = SendStatusReport(aEC, reasonProfileId, reasonStatusCode); |
| SuccessOrExit(err); |
| } |
| |
| exit: |
| WeaveLogFunctError(err); |
| |
| // aPayload is guaranteed to be non-NULL |
| PacketBuffer::Free(aPayload); |
| |
| // aEC is guaranteed to be non-NULL. |
| aEC->Abort(); |
| } |
| |
| #if WDM_PUBLISHER_ENABLE_CUSTOM_COMMANDS |
| void SubscriptionEngine::OnCustomCommandRequest (nl::Weave::ExchangeContext *aEC, const nl::Inet::IPPacketInfo *aPktInfo, |
| const nl::Weave::WeaveMessageInfo *aMsgInfo, uint32_t aProfileId, |
| uint8_t aMsgType, PacketBuffer *aPayload) |
| { |
| WEAVE_ERROR err = WEAVE_NO_ERROR; |
| SubscriptionEngine * const pEngine = reinterpret_cast<SubscriptionEngine *>(aEC->AppState); |
| Command * command = NULL; |
| uint32_t statusReportProfile = nl::Weave::Profiles::kWeaveProfile_WDM; |
| uint16_t statusReportCode = nl::Weave::Profiles::DataManagement::kStatus_InvalidPath; |
| |
| // WRM, in general, only works with WRM. Reject anything not requesting ACK immediately |
| // and do not propagate this request downstream. |
| VerifyOrExit(aEC->HasPeerRequestedAck(), err = WEAVE_ERROR_INVALID_TRANSFER_MODE); |
| |
| for (size_t i = 0; i < kMaxNumCommandObjs; ++i) |
| { |
| if (pEngine->mCommandObjs[i].IsFree()) |
| { |
| SYSTEM_STATS_INCREMENT(nl::Weave::System::Stats::kWDMNext_NumCommands); |
| command = &(pEngine->mCommandObjs[i]); |
| command->Init(aEC); |
| aEC = NULL; |
| break; |
| } |
| } |
| VerifyOrExit(NULL != command, err = WEAVE_ERROR_NO_MEMORY); |
| |
| if (!pEngine->mIsPublisherEnabled) |
| { |
| // Has to be a publisher to be processing a command request |
| statusReportProfile = nl::Weave::Profiles::kWeaveProfile_Common; |
| statusReportCode = nl::Weave::Profiles::Common::kStatus_UnsupportedMessage; |
| ExitNow(err = WEAVE_ERROR_INVALID_MESSAGE_TYPE); |
| } |
| |
| { |
| nl::Weave::TLV::TLVReader reader; |
| TraitDataSource *dataSource = NULL; |
| bool isVersionValid = false; |
| bool isExpiryTimeValid = false; |
| uint64_t commandType; |
| uint64_t mustBeVersion; |
| int64_t expiryTimeMicroSecond; |
| |
| reader.Init(aPayload); |
| |
| err = reader.Next(); |
| SuccessOrExit(err); |
| |
| { |
| CustomCommandRequest::Parser request; |
| TraitDataHandle traitDataHandle; |
| nl::Weave::TLV::TLVReader pathReader; |
| SchemaVersionRange requestedSchemaVersion, computedVersionIntersection; |
| |
| err = request.Init(reader); |
| SuccessOrExit(err); |
| |
| #if WEAVE_CONFIG_DATA_MANAGEMENT_ENABLE_SCHEMA_CHECK |
| err = request.CheckSchemaValidity(); |
| SuccessOrExit(err); |
| #endif // WEAVE_CONFIG_DATA_MANAGEMENT_ENABLE_SCHEMA_CHECK |
| |
| err = request.GetReaderOnPath(&pathReader); |
| SuccessOrExit(err); |
| |
| err = pEngine->mPublisherCatalog->AddressToHandle(pathReader, traitDataHandle, requestedSchemaVersion); |
| SuccessOrExit(err); |
| |
| err = SubscriptionEngine::GetInstance()->mPublisherCatalog->Locate(traitDataHandle, &dataSource); |
| SuccessOrExit(err); |
| |
| if (!dataSource->GetSchemaEngine()->GetVersionIntersection(requestedSchemaVersion, computedVersionIntersection)) { |
| WeaveLogDetail(DataManagement, "Mismatch in requested version on handle %u (requested: %u, %u)", traitDataHandle, requestedSchemaVersion.mMaxVersion, requestedSchemaVersion.mMinVersion); |
| |
| statusReportProfile = nl::Weave::Profiles::kWeaveProfile_WDM; |
| statusReportCode = kStatus_IncompatibleDataSchemaVersion; |
| ExitNow(err = WEAVE_ERROR_INCOMPATIBLE_SCHEMA_VERSION); |
| } |
| |
| err = request.GetCommandType(&commandType); |
| SuccessOrExit(err); |
| |
| err = request.GetExpiryTimeMicroSecond(&expiryTimeMicroSecond); |
| if (WEAVE_NO_ERROR == err) |
| { |
| isExpiryTimeValid = true; |
| } |
| else if (WEAVE_END_OF_TLV == err) |
| { |
| err = WEAVE_NO_ERROR; |
| } |
| else |
| { |
| ExitNow(); |
| } |
| |
| err = request.GetMustBeVersion(&mustBeVersion); |
| if (WEAVE_NO_ERROR == err) |
| { |
| isVersionValid = true; |
| } |
| else if (WEAVE_END_OF_TLV == err) |
| { |
| err = WEAVE_NO_ERROR; |
| } |
| else |
| { |
| ExitNow(); |
| } |
| |
| err = request.GetReaderOnArgument(&reader); |
| SuccessOrExit(err); |
| } |
| |
| #if WDM_ENFORCE_EXPIRY_TIME |
| if (isExpiryTimeValid) |
| { |
| nl::Weave::Profiles::Time::timesync_t now_usec; |
| err = nl::Weave::Platform::Time::GetSystemTime(&now_usec); |
| if (WEAVE_ERROR_UNSUPPORTED_CLOCK == err) |
| { |
| statusReportCode = nl::Weave::Profiles::DataManagement::kStatus_ExpiryTimeNotSupported; |
| ExitNow(); |
| } |
| else if (WEAVE_ERROR_TIME_NOT_SYNCED_YET == err) |
| { |
| statusReportCode = nl::Weave::Profiles::DataManagement::kStatus_NotTimeSyncedYet; |
| ExitNow(); |
| } |
| else if (now_usec >= expiryTimeMicroSecond) |
| { |
| statusReportCode = nl::Weave::Profiles::DataManagement::kStatus_RequestExpiredInTime; |
| ExitNow(); |
| } |
| WeaveLogDetail(DataManagement, "Command ExpiryTime 0x%" PRIX64 ", now: 0x% " PRIX64 " ", expiryTimeMicroSecond, now_usec); |
| } |
| #endif // WDM_ENFORCE_EXPIRY_TIME |
| |
| if (isVersionValid) |
| { |
| uint64_t currentVersion = dataSource->GetVersion(); |
| |
| if (mustBeVersion != currentVersion) |
| { |
| WeaveLogDetail(DataManagement, "Version required 0x%" PRIX64 ", current: 0x% " PRIX64 " ", mustBeVersion, currentVersion); |
| statusReportCode = nl::Weave::Profiles::DataManagement::kStatus_VersionMismatch; |
| ExitNow(); |
| } |
| } |
| |
| // Note we cannot just use pathReader at here because the TDM related functions |
| // generally assume they can move the reader at their will. |
| // Note that callee is supposed to cache whatever is useful in the TLV stream into its own memory |
| // when this callback returns, we'd destroy the TLV object |
| dataSource->OnCustomCommand(command, aMsgInfo, aPayload, commandType, |
| isExpiryTimeValid, expiryTimeMicroSecond, isVersionValid, mustBeVersion, |
| reader); |
| command = NULL; |
| aPayload = NULL; |
| } |
| |
| exit: |
| WeaveLogFunctError(err); |
| |
| if (NULL != aPayload) |
| { |
| PacketBuffer::Free(aPayload); |
| aPayload = NULL; |
| } |
| |
| // Note that when dispatched == true, ownership of aEC is already passed on to OnCustomCommand, and hence set to NULL |
| if (NULL != command) |
| { |
| err = command->SendError(statusReportProfile, statusReportCode, err); |
| WeaveLogFunctError(err); |
| } |
| |
| if (NULL != aEC) |
| { |
| aEC->Close(); |
| aEC = NULL; |
| } |
| } |
| #endif // WDM_PUBLISHER_ENABLE_CUSTOM_COMMANDS |
| |
| #endif // #if WDM_ENABLE_SUBSCRIPTION_PUBLISHER |
| |
| }; // WeaveMakeManagedNamespaceIdentifier(DataManagement, kWeaveManagedNamespaceDesignation_Current) |
| }; // Profiles |
| }; // Weave |
| }; // nl |
| |
| #endif // WEAVE_CONFIG_ENABLE_RELIABLE_MESSAGING |