blob: 62bd9f926ed04f315bff559ebb116f0bba06dd60 [file] [log] [blame]
/*
*
* Copyright (c) 2016-2017 Nest Labs, Inc.
* All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* @file
*
* @brief
* Implementations for the abstract DMPublisher base class.
*
* This file contains implementations for the abstract DMPublisher
* base class, which should be used as the basis for
* application-specific publishers base on WDM. See the document,
* "Nest Weave-Data Management Protocol" document for a complete
* description.
*/
#ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS
#endif // __STDC_FORMAT_MACROS
#include <Weave/Profiles/data-management/Legacy/WdmManagedNamespace.h>
#include <Weave/Support/CodeUtils.h>
#include <Weave/Profiles/data-management/DataManagement.h>
using namespace ::nl;
using namespace ::nl::Inet;
using namespace ::nl::Weave;
using namespace ::nl::Weave::TLV;
using namespace ::nl::Weave::Profiles;
using namespace ::nl::Weave::Profiles::Common;
using namespace ::nl::Weave::Profiles::DataManagement;
using namespace ::nl::Weave::Profiles::StatusReporting;
/*
* This is the listener that's put in place when the publisher starts
* up and which handles unsolicited View, Subscribe,
* CancelSubscription and Update requests.
*/
static void PublisherListener(ExchangeContext *ec,
const IPPacketInfo *pktInfo,
const WeaveMessageInfo *msgInfo,
uint32_t profileId,
uint8_t msgType,
PacketBuffer *payload)
{
DMPublisher *scribners = static_cast<DMPublisher*>(ec->AppState);
scribners->OnMsgReceived(ec, profileId, msgType, payload);
}
#if WEAVE_CONFIG_WDM_ALLOW_PUBLISHER_SUBSCRIPTION
#if WEAVE_CONFIG_ENABLE_RELIABLE_MESSAGING
/*
* This low-level handler is called on receipt of the ACK to a
* subscribe response frame and is used to activate the subscription
* in question since. This protects the client from receiving
* notifications before it is ready.
*/
static void SubscriptionSuccess(ExchangeContext *ec, void *aSubscription)
{
DMPublisher::Subscription *s = static_cast<DMPublisher::Subscription*>(aSubscription);
s->Activate();
// Only close out the context once we get the ACK back for the subscription response.
s->mSubscriptionCtx->Close();
s->mSubscriptionCtx = NULL;
}
/*
* This low-level handler is called on failure to receive an ACK to a
* subscribe response frame. It cancels the subscription and logs an
* error
*/
static void SubscriptionFailure(ExchangeContext *ec, WEAVE_ERROR aError, void *aSubscription)
{
DMPublisher::Subscription *s = static_cast<DMPublisher::Subscription*>(aSubscription);
WeaveLogError(DataManagement,
"Subscription [0x%" PRIx64 ", 0x%" PRIx64 ", 0x%" PRIx64 "] failed - %s",
s->mAssignedId,
s->mRequestedId,
s->mClientId,
ErrorStr(aError));
s->mSubscriptionCtx->Abort();
s->Free();
}
#endif // WEAVE_CONFIG_ENABLE_RELIABLE_MESSAGING
#endif // WEAVE_CONFIG_WDM_ALLOW_PUBLISHER_SUBSCRIPTION
/**
* @brief
* The default constructor for DMPublisher objects.
*
* Clears all internal state.
*/
DMPublisher::DMPublisher(void) { Clear(); }
/**
* @brief
* The destructor for DMPublisher objects.
*
* Clears all internal state and removes the listener from the
* exchange manager if one is in place.
*/
DMPublisher::~DMPublisher(void) { Finalize(); }
/**
* @brief
* Initialize a DMPublisher object.
*
* This method has the side effect of installing a listener in the
* exchange manager for the full range of client requests possibly
* including those for subscription.
*
* @param [in] aExchangeMgr A pointer to the
* WeaveExchangeManager object to
* use for all exchanges in which
* the publisher wishes to
* participate.
*
* @param [in] aResponseTimeout A response timeout in
* milliseconds, i.e. the maximum
* time to wait for a response.
*
* @return #WEAVE_NO_ERROR On success. Otherwise, return a
* #WEAVE_ERROR reflecting a failure to properly set up the
* publisher.
*/
WEAVE_ERROR DMPublisher::Init(WeaveExchangeManager *aExchangeMgr, uint32_t aResponseTimeout)
{
WEAVE_ERROR err;
err = ProtocolEngine::Init(aExchangeMgr, aResponseTimeout);
SuccessOrExit(err);
err = aExchangeMgr->RegisterUnsolicitedMessageHandler(kWeaveProfile_WDM, kMsgType_ViewRequest, PublisherListener, this);
SuccessOrExit(err);
err = aExchangeMgr->RegisterUnsolicitedMessageHandler(kWeaveProfile_WDM, kMsgType_UpdateRequest, PublisherListener, this);
SuccessOrExit(err);
#if WEAVE_CONFIG_WDM_ALLOW_PUBLISHER_LEGACY_MESSAGE_TYPES
err = aExchangeMgr->RegisterUnsolicitedMessageHandler(kWeaveProfile_WDM,
kMsgType_UpdateRequest_Deprecated,
PublisherListener,
this);
SuccessOrExit(err);
#endif // WEAVE_CONFIG_WDM_ALLOW_PUBLISHER_LEGACY_MESSAGE_TYPES
#if WEAVE_CONFIG_WDM_ALLOW_PUBLISHER_SUBSCRIPTION
ClearSubscriptionTable();
#endif // WEAVE_CONFIG_WDM_ALLOW_PUBLISHER_SUBSCRIPTION
err = aExchangeMgr->RegisterUnsolicitedMessageHandler(kWeaveProfile_WDM, kMsgType_SubscribeRequest, PublisherListener, this);
SuccessOrExit(err);
err = aExchangeMgr->RegisterUnsolicitedMessageHandler(kWeaveProfile_WDM, kMsgType_CancelSubscriptionRequest, PublisherListener, this);
SuccessOrExit(err);
exit:
return err;
}
/**
* @brief
* Clear the internal state of DMPublisher object.
*
* Clears the notify transaction pool and the subscription table.
*/
void DMPublisher::Clear(void)
{
#if WEAVE_CONFIG_WDM_ALLOW_PUBLISHER_SUBSCRIPTION
int i;
ClearSubscriptionTable();
for (i = 0; i<kNotifyPoolSize; i++)
mNotifyPool[i].Free();
#endif // WEAVE_CONFIG_WDM_ALLOW_PUBLISHER_SUBSCRIPTION
}
/**
* @brief
* Shut down an operating DMPublisher.
*
* Clears all the operating state and shuts down the listener if one
* is running.
*/
void DMPublisher::Finalize(void)
{
if (mExchangeMgr)
{
mExchangeMgr->UnregisterUnsolicitedMessageHandler(kWeaveProfile_WDM, kMsgType_ViewRequest);
mExchangeMgr->UnregisterUnsolicitedMessageHandler(kWeaveProfile_WDM, kMsgType_UpdateRequest);
mExchangeMgr->UnregisterUnsolicitedMessageHandler(kWeaveProfile_WDM, kMsgType_SubscribeRequest);
mExchangeMgr->UnregisterUnsolicitedMessageHandler(kWeaveProfile_WDM, kMsgType_CancelSubscriptionRequest);
}
Clear();
ProtocolEngine::Finalize();
}
void DMPublisher::IncompleteIndication(Binding *aBinding, StatusReport &aReport)
{
#if WEAVE_CONFIG_WDM_ALLOW_PUBLISHER_SUBSCRIPTION
FailSubscription(kTopicIdNotSpecified, aBinding->mPeerNodeId, aReport);
#endif // WEAVE_CONFIG_WDM_ALLOW_PUBLISHER_SUBSCRIPTION
}
/**
* @brief
* Respond to a view request.
*
* Send the response to a view request after processing, using the
* exchange context that was given in the indication.
*
* @param [in] aResponseCtx A pointer to the exchange context
* under which the request was
* received.
*
* @param [in] aStatus A reference to a StatusReport
* object containing information
* about the status of the
* request. In the case where this is
* success, the requestor will be
* expecting a data list containing
* the data of interest.
*
* @param [in] aDataList A pointer to an optional
* ReferencedTLVData object
* containing a TLV-encoded data list
* containing the data of interest
* and the paths indicating the
* disposition of that data. Note
* that this parameter shall be NULL
* in the case where the status given
* in the previous parameter is not
* success.
*
* @retval #WEAVE_NO_ERROR On success. Otherwise, return a
* #WEAVE_ERROR reflecting a failure to send the response message.
* @retval #WEAVE_ERROR_INVALID_ARGUMENT If the given parameters are
* inconsistent
* @retval #WEAVE_ERROR_NO_MEMORY If an Inet buffer could not be
* allocated.
*/
WEAVE_ERROR DMPublisher::ViewResponse(ExchangeContext *aResponseCtx, StatusReport &aStatus, ReferencedTLVData *aDataList)
{
WEAVE_ERROR err = WEAVE_NO_ERROR;
PacketBuffer* buf = NULL;
VerifyOrExit((aDataList || !aStatus.success()), err = WEAVE_ERROR_INVALID_ARGUMENT);
if (aStatus.success())
{
buf = PacketBuffer::New();
VerifyOrExit(buf != NULL, err = WEAVE_ERROR_NO_MEMORY);
err = aDataList->pack(buf);
SuccessOrExit(err);
err = aResponseCtx->SendMessage(kWeaveProfile_WDM, kMsgType_ViewResponse, buf);
buf = NULL;
}
else
{
err = StatusResponse(aResponseCtx, aStatus);
}
exit:
if (buf)
{
PacketBuffer::Free(buf);
}
return err;
}
/**
* @brief
* Respond to an update request.
*
* Send the response to an update request after processing, using the
* exchange context that was given in the indication.
*
* @param [in] aResponseCtx A pointer to the exchange context
* under which the request was
* received.
*
* @param [in] aStatus A reference to a StatusReport
* object containing information
* about the status of the request.
*
* @return #WEAVE_NO_ERROR On success. Otherwise return a
* #WEAVE_ERROR reflecting a failure to send the response message.
*/
WEAVE_ERROR DMPublisher::UpdateResponse(ExchangeContext *aResponseCtx, StatusReport &aStatus)
{
return StatusResponse(aResponseCtx, aStatus);
}
/*
* note that an exchange context is passed to the "request received"
* method and is assumed to be passed through to the "send response"
* methods. the NHL IS responsible for managing this exchange context.
*/
void DMPublisher::OnMsgReceived(ExchangeContext *aExchangeCtx, uint32_t aProfileId, uint8_t aMsgType, PacketBuffer *aMsg)
{
WEAVE_ERROR err = WEAVE_NO_ERROR;
MessageIterator i(aMsg);
StatusReport status;
#if WEAVE_CONFIG_WDM_ALLOW_PUBLISHER_SUBSCRIPTION
TopicIdentifier topicId;
#endif // WEAVE_CONFIG_WDM_ALLOW_PUBLISHER_SUBSCRIPTION
ReferencedTLVData pathList;
ReferencedTLVData dataList;
/*
* in the case where NONE of the indication methods get called we
* still have to close the context AND we should still send a
* response of some sort.
*/
bool sendRsp = false;
if (aProfileId == kWeaveProfile_WDM)
{
switch (aMsgType)
{
case kMsgType_ViewRequest:
err = ReferencedTLVData::parse(i, pathList);
SuccessOrExit(err);
err = ViewIndication(aExchangeCtx, pathList);
break;
case kMsgType_UpdateRequest:
err = ReferencedTLVData::parse(i, dataList);
SuccessOrExit(err);
err = UpdateIndication(aExchangeCtx, dataList);
break;
#if WEAVE_CONFIG_WDM_ALLOW_PUBLISHER_LEGACY_MESSAGE_TYPES
case kMsgType_UpdateRequest_Deprecated:
err = ReferencedTLVData::parse(i, dataList);
SuccessOrExit(err);
err = UpdateIndication(aExchangeCtx, dataList);
break;
#endif // WEAVE_CONFIG_WDM_ALLOW_PUBLISHER_LEGACY_MESSAGE_TYPES
#if WEAVE_CONFIG_WDM_ALLOW_PUBLISHER_SUBSCRIPTION
case kMsgType_SubscribeRequest:
err = i.read64(&topicId);
SuccessOrExit(err);
if (topicId == kTopicIdNotSpecified)
{
err = ReferencedTLVData::parse(i, pathList);
SuccessOrExit(err);
err = SubscribeIndication(aExchangeCtx, pathList);
}
else
{
err = SubscribeIndication(aExchangeCtx, topicId);
}
break;
case kMsgType_CancelSubscriptionRequest:
err = i.read64(&topicId);
SuccessOrExit(err);
err = CancelSubscriptionIndication(aExchangeCtx, topicId);
if (err == WEAVE_NO_ERROR)
{
status.init(kWeaveProfile_Common, kStatus_Success, NULL);
}
else
{
status.init(err);
}
sendRsp = true;
break;
#endif // WEAVE_CONFIG_WDM_ALLOW_PUBLISHER_SUBSCRIPTION
default:
err = WEAVE_ERROR_INVALID_MESSAGE_TYPE;
status.init(kWeaveProfile_WDM, kStatus_UnsupportedMessage);
sendRsp = true;
break;
}
}
else
{
err = WEAVE_ERROR_INVALID_PROFILE_ID;
status.init(kWeaveProfile_WDM, kStatus_UnsupportedMessage);
sendRsp = true;
}
exit:
if (err != WEAVE_NO_ERROR)
WeaveLogError(DataManagement, "OnMsgReceived() - %s", ErrorStr(err));
if (sendRsp)
{
err = StatusResponse(aExchangeCtx, status);
aExchangeCtx->Close();
}
PacketBuffer::Free(aMsg);
}
#if WEAVE_CONFIG_WDM_ALLOW_PUBLISHER_SUBSCRIPTION
/**
* @brief
* Start a subscription.
*
* This method installs a subscription to a particular assigned
* ID/requested ID pair in the publisher's subscription table with
* the given client as the client requiring notification.
*
* @param [in] aTopicId A reference to a well-known topic
* ID.
*
* @param [in] aClientId A reference to the 64-bit node ID
* of the client.
*
* @retval #WEAVE_NO_ERROR On success.
* @retval #WEAVE_ERROR_NO_MEMORY If a subscription could not be added.
*/
WEAVE_ERROR DMPublisher::BeginSubscription(const TopicIdentifier &aTopicId, const uint64_t &aClientId)
{
WEAVE_ERROR err = WEAVE_NO_ERROR;
Subscription *s;
s = AddSubscription(aTopicId, aTopicId, aClientId);
VerifyOrExit(s, err = WEAVE_ERROR_NO_MEMORY);
s->Activate();
exit:
return err;
}
/**
* @brief
* Stop, and remove, a subscription.
*
* Remove a subscription from the publisher's subscription table
* thereby stopping any future notifications from being delivered to
* the remote client.
*
* @param [in] aTopicId A reference to a topic ID
* associated with the subscription.
* This may be the "working" ID or
* the requested ID.
*
* @param [in] aClientId A reference to the 64-bit node ID
* of the client.
*
* @return #WEAVE_NO_ERROR On success. Otherwise returns a
* #WEAVE_ERROR reflecting a failure to remove the subscription.
*/
WEAVE_ERROR DMPublisher::EndSubscription(const TopicIdentifier &aTopicId, const uint64_t &aClientId)
{
WEAVE_ERROR err = WEAVE_NO_ERROR;
RemoveSubscription(aTopicId, aClientId);
return err;
}
/**
* @brief
* Free all items in the subscription table.
*/
void DMPublisher::ClearSubscriptionTable(void)
{
for (int i = 0; i < kSubscriptionMgrTableSize; i++)
{
Subscription &s = mSubscriptionTable[i];
s.Free();
}
}
/**
* @brief
* Check that the subscription table is empty.
*
* In particular, this means, check that all entries are free.
*
* @return true if all entries are free, false otherwise.
*/
bool DMPublisher::SubscriptionTableEmpty(void) const
{
bool result = true;
for (int i = 0; i < kSubscriptionMgrTableSize; i++)
{
const Subscription &s = mSubscriptionTable[i];
if (s.mAssignedId != kTopicIdNotSpecified && s.mClientId != kNodeIdNotSpecified)
{
result = false;
break;
}
}
return result;
}
/**
* @brief
* Respond to a subscribe request.
*
* Send the response to a subscribe request after processing, using
* the exchange context that was given in the indication. Invoking
* this method has the side effect of actually installing the
* subscription.
*
* @param [in] aResponseCtx A pointer to the exchange context
* under which the request was
* received.
*
* @param [in] aStatus A reference to a StatusReport
* object containing information
* about the status of the request.
* In the case where this is success,
* the requestor will be expecting a
* data list containing the data of
* interest.
*
* @param [in] aTopicId A reference to the topic ID, if
* any, associated with the subscribe
* request. This parameter may have a
* value of kTopicIdNotSpecified in
* the case where the subscribe
* request did not contain a
* well-known topic ID.
*
* @param [in] aDataList A pointer to an optional
* ReferencedTLVData object
* containing a TLV-encoded data list
* containing the data of interest
* and the paths indicating the
* disposition of that data. Note
* that this parameter shall be NULL
* in the case where the status given
* above is not success.
*
* @retval #WEAVE_NO_ERROR On success. Otherwise, return a
* #WEAVE_ERROR reflecting a failure to send the response message.
* @retval #WEAVE_ERROR_NO_MEMORY If an Inet buffer could not be allocated.
* @retval #WEAVE_ERROR_INVALID_ARGUMENT If the given parameters are inconsistent.
*/
WEAVE_ERROR DMPublisher::SubscribeResponse(ExchangeContext *aResponseCtx,
StatusReport &aStatus,
const TopicIdentifier &aTopicId,
ReferencedTLVData *aDataList)
{
WEAVE_ERROR err = WEAVE_NO_ERROR;
PacketBuffer* buf = NULL;
TopicIdentifier topic = PublisherSpecificTopicId();
Binding* binding;
Subscription* subscription;
VerifyOrExit((aDataList || !aStatus.success()), err = WEAVE_ERROR_INVALID_ARGUMENT);
/*
* generate a binding and add the subscription here but don't
* activate it yet.
*/
binding = FromExchangeCtx(aResponseCtx);
VerifyOrExit(binding, err = WEAVE_ERROR_NO_MEMORY);
subscription = AddSubscription(topic, aTopicId, binding->mPeerNodeId);
VerifyOrExit(subscription, err = WEAVE_ERROR_NO_MEMORY);
if (aStatus.success())
{
buf = PacketBuffer::New();
VerifyOrExit(buf != NULL, err = WEAVE_ERROR_NO_MEMORY);
{
MessageIterator i(buf);
i.append();
err = i.write64(topic);
SuccessOrExit(err);
err = aDataList->pack(i);
SuccessOrExit(err);
}
/*
* what we do here depends on what transport we're
* using. if it's raw UDP or, sadly, TCP we have no choice
* but to just activate the subscription after we send the
* response. if it's WRMP, on the other hand, we really
* want to install the subscription here but only activate
* it when ACK is received on the response.
*/
#if WEAVE_CONFIG_ENABLE_RELIABLE_MESSAGING
if (binding->mTransport == kTransport_WRMP)
{
aResponseCtx->OnAckRcvd = SubscriptionSuccess;
aResponseCtx->OnSendError = SubscriptionFailure;
subscription->mSubscriptionCtx = aResponseCtx;
err = aResponseCtx->SendMessage(kWeaveProfile_WDM,
kMsgType_SubscribeResponse,
buf, ExchangeContext::kSendFlag_RequestAck,
subscription);
buf = NULL;
}
else
#endif
{
err = aResponseCtx->SendMessage(kWeaveProfile_WDM, kMsgType_SubscribeResponse, buf);
buf = NULL;
if (err == WEAVE_NO_ERROR)
{
subscription->Activate();
}
else
{
subscription->Free();
}
}
}
else
{
err = StatusResponse(aResponseCtx, aStatus);
}
exit:
if (buf)
{
PacketBuffer::Free(buf);
}
return err;
}
/**
* @brief
* Cancel a subscription.
*
* Cancel a subscription in response to the receipt of a cancel
* subscription request. This method doesn't generate further network
* traffic but simply remove the subscription record.
*
* @note
* This virtual method simply handles the subscription
* cancellation itself and must be called by derived sub-classes that
* implement their own method.
*
* @param [in] aResponseCtx A pointer to the exchange
* context under which the request
* was received.
*
* @param [in] aTopicId A reference to the topic ID,
* if any, associated with the
* subscribe request.
*
* @return #WEAVE_NO_ERROR On success. Otherwise return a
* #WEAVE_ERROR reflecting a failure to end the subscription.
*/
WEAVE_ERROR DMPublisher::CancelSubscriptionIndication(ExchangeContext *aResponseCtx, const TopicIdentifier &aTopicId)
{
return EndSubscription(aTopicId, aResponseCtx->PeerNodeId);
}
/**
* @brief
* Request a notification.
*
* Notify a specific remote client of changes to data of interest
* managed by this publisher.
*
* @param [in] aDestinationId A reference to the 64-bit node ID
* of the remote client.
*
* @param [in] aTopicId A reference to the topic ID
* associated with this subscription.
*
* @param [in] aDataList A reference to a ReferencedTLVData
* object containing a TLV_encoded
* data list with the changed data as
* well as paths describing its
* disposition.
*
* @param [in] aTxnId An identifier for the transaction
* set up to manage this notification.
*
* @param [in] aTimeout A maximum time in milliseconds to
* wait for the notify response.
*
* @retval #WEAVE_NO_ERROR On success. Otherwise, return
* a #WEAVE_ERROR describing a failure to initialize or start the
* transaction.
* @retval #WEAVE_ERROR_INVALID_ARGUMENT If the destination ID is not specified.
* @retval #WEAVE_ERROR_NO_MEMORY If a new notify transaction cannot be allocated.
*/
WEAVE_ERROR DMPublisher::NotifyRequest(const uint64_t &aDestinationId,
const TopicIdentifier &aTopicId,
ReferencedTLVData &aDataList,
uint16_t aTxnId,
uint32_t aTimeout)
{
WEAVE_ERROR err = WEAVE_ERROR_INCORRECT_STATE;
Binding *binding = GetBinding(aDestinationId);
Notify *notify = NULL;
if (binding)
{
notify = NewNotify();
if (notify)
{
err = notify->Init(this, aTopicId, aDataList, aTxnId, aTimeout);
SuccessOrExit(err);
err = StartTransaction(notify, binding);
}
else
{
err = WEAVE_ERROR_NO_MEMORY;
}
}
else
{
err = WEAVE_ERROR_INCORRECT_STATE;
}
exit:
return err;
}
/**
* @brief
* Request notifications based on topic.
*
* Notify interested clients of changes to data of interest managed by
* this publisher. This version includes a specific topic identifier
* as a parameter.
*
* @param [in] aTopicId A reference to the topic ID
* associated with this subscription.
*
* @param [in] aDataList A reference to a ReferencedTLVData
* object containing a TLV_encoded
* data list containing the changed
* data as well as paths describing
* its disposition.
*
* @param [in] aTxnId An identifier for the transaction
* set up to manage this notification.
*
* @param [in] aTimeout A maximum time in milliseconds to
* wait for the notify response.
*
* @retval #WEAVE_NO_ERROR On success. Otherwise, return
* a #WEAVE_ERROR describing a failure to initialize or start the
* transaction.
* @retval #WEAVE_ERROR_INVALID_ARGUMENT is the destination ID is not specified.
* @retval #WEAVE_ERROR_NO_MEMORY If a new notify transaction cannot be allocated.
*/
WEAVE_ERROR DMPublisher::NotifyRequest(const TopicIdentifier &aTopicId, ReferencedTLVData &aDataList, uint16_t aTxnId, uint32_t aTimeout)
{
WEAVE_ERROR err = WEAVE_NO_ERROR;
for (int i = 0; i < kSubscriptionMgrTableSize; i++)
{
Subscription &s = mSubscriptionTable[i];
if (s.IsActive())
{
if (s.mAssignedId == aTopicId || s.mRequestedId == aTopicId)
{
if (s.mClientId == kNodeIdNotSpecified)
{
/*
* we leave open the possibility that someone has put in
* an entry with an unspecified client and intends to use
* a default binding.
*/
Notify *notify = NewNotify();
if (notify)
{
err = notify->Init(this, aTopicId, aDataList, aTxnId, aTimeout);
if (err == WEAVE_NO_ERROR)
err = StartTransaction(notify);
}
else
{
err = WEAVE_ERROR_NO_MEMORY;
}
}
else
{
err = NotifyRequest(s.mClientId, s.mAssignedId, aDataList, aTxnId, aTimeout);
}
}
}
if (err != WEAVE_NO_ERROR)
break;
}
return err;
}
/**
* @brief
* Request notifications for changed data.
*
* Notify clients of changes to data of interest managed by this
* publisher.
*
* @warning Not implemented.
*
* @param [in] aDataList A reference to a ReferencedTLVData
* object containing a TLV_encoded
* data list with the changed data as
* well as paths describing its
* disposition.
*
* @param [in] aTxnId An identifier for the transaction
* set up to manage this notification.
*
* @param [in] aTimeout A maximum time in milliseconds to
* wait for the notify response.
*
* @retval #WEAVE_NO_ERROR On success. Otherwise, return a
* #WEAVE_ERROR describing a failure to initialize or start the
* transaction.
* @retval #WEAVE_ERROR_INVALID_ARGUMENT is the destination ID is not specified.
* @retval #WEAVE_ERROR_NO_MEMORY if a new notify transaction cannot be allocated.
*
*/
WEAVE_ERROR DMPublisher::NotifyRequest(ReferencedTLVData &aDataList, uint16_t aTxnId, uint32_t aTimeout)
{
// jira://WEAV-265 has been created to track this
return WEAVE_ERROR_NOT_IMPLEMENTED;
}
/**
* @brief
* Handle a notify confirm.
*
* @note
* In the case of an unknown topic, which indicates that the
* notification has been successfully delivered but did not match any
* topic of interest on the client, the subscription is automatically
* revoked. IN ANY other error case, the subscription remains in place
* and it is the responsibility of the NHL to revoke it if so desired.
*
* This virtual method does the subscription-based crank-turning
* on behalf of higher layers and must be invoked by higher-layer
* handlers in order to maintain the subscription table.
*
* @param [in] aResponderId A reference to the 64-bit node ID
* of the remote client that sent the
* confirmation.
*
* @param [in] aAssignedId A reference to the publisher-
* assigned "working" topic ID for
* this subscription.
*
* @param [in] aStatus A reference to a StatusReport
* object containing the reported
* status of the
* notification. Specifically, there
* are two possible values here:
*
* 1) kStatus_Success, or
*
* 2) kStatus_UnknownTopic.
*
* In the latter case, the
* subscription should be removed.
*
* @param [in] aTxnId An identifier for the transaction
* set up to manage the original
* notification.
*
* @return #WEAVE_NO_ERROR On success. Otherwise, a #WEAVE_ERROR reflecting the
* failure to end the subscription in the case where this is called for.
*/
WEAVE_ERROR DMPublisher::NotifyConfirm(const uint64_t &aResponderId,
const TopicIdentifier &aAssignedId,
StatusReport &aStatus,
uint16_t aTxnId)
{
WEAVE_ERROR err = WEAVE_NO_ERROR;
if (aStatus.mProfileId == kWeaveProfile_WDM && aStatus.mStatusCode == kStatus_UnknownTopic)
FailSubscription(aAssignedId, aResponderId, aStatus);
/*
* we could get an error status here, e.g. in the case of a
* timeout. in this case, return the given error.
*/
else if (aStatus.mProfileId == kWeaveProfile_Common && aStatus.mStatusCode == kStatus_InternalError)
err = aStatus.mError;
return err;
}
/*
* these are methods for the inner Subscription class and, as such,
* are not part of the DMPublisher public interface.
*/
DMPublisher::Subscription::Subscription(void)
{
Free();
}
DMPublisher::Subscription::~Subscription(void)
{
Free();
}
WEAVE_ERROR DMPublisher::Subscription::Init(const TopicIdentifier &aAssignedId,
const TopicIdentifier &aRequestedId,
const uint64_t &aClientId)
{
mFlags = kSubscriptionFlags_Allocated;
mAssignedId = aAssignedId;
mRequestedId = aRequestedId;
mClientId = aClientId;
return WEAVE_NO_ERROR;
}
void DMPublisher::Subscription::Free(void)
{
mFlags = kSubscriptionFlags_Free;
mAssignedId = kTopicIdNotSpecified;
mRequestedId = kTopicIdNotSpecified;
mClientId = kNodeIdNotSpecified;
mSubscriptionCtx = NULL;
}
DMPublisher::Subscription *DMPublisher::AddSubscription(const TopicIdentifier &aAssignedId,
const TopicIdentifier &aRequestedId,
const uint64_t &aClientId)
{
int i;
Subscription *result = NULL;
// look to see if it's already there.
for (i = 0; i < kSubscriptionMgrTableSize; i++)
{
Subscription &s = mSubscriptionTable[i];
if (s.mAssignedId == aAssignedId && s.mRequestedId == aRequestedId &&s.mClientId == aClientId)
{
result = &s;
result->mSubscriptionCtx = NULL;
break;
}
}
// and if not, add it.
if (!result)
{
for (i = 0; i < kSubscriptionMgrTableSize; i++)
{
Subscription &s = mSubscriptionTable[i];
if (s.IsFree())
{
s.Init(aAssignedId, aRequestedId, aClientId);
result = &s;
result->mSubscriptionCtx = NULL;
break;
}
}
}
return result;
}
void DMPublisher::RemoveSubscription(const TopicIdentifier &aTopicId, const uint64_t &aClientId)
{
for (int i = 0; i < kSubscriptionMgrTableSize; i++)
{
Subscription &s = mSubscriptionTable[i];
if (s.MatchSubscription(aTopicId, aClientId))
s.Free();
}
}
void DMPublisher::FailSubscription(const TopicIdentifier &aTopicId, const uint64_t &aClientId, StatusReport &aReport)
{
for (int i = 0; i < kSubscriptionMgrTableSize; i++)
{
Subscription &s = mSubscriptionTable[i];
if (s.MatchSubscription(aTopicId, aClientId))
{
UnsubscribeIndication(s.mClientId, s.mAssignedId, aReport);
s.Free();
}
}
}
/*
* transaction-related methods are not part of the DMPublisher public
* interface.
*/
WEAVE_ERROR DMPublisher::Notify::Init(DMPublisher *aPublisher,
const TopicIdentifier &aTopicId,
ReferencedTLVData &aDataList,
uint16_t aTxnId,
uint32_t aTimeout)
{
DMTransaction::Init(aPublisher, aTxnId, aTimeout);
mDataList = aDataList;
mTopicId = aTopicId;
return WEAVE_NO_ERROR;
}
void DMPublisher::Notify::Free(void)
{
DMTransaction::Free();
mDataList.free();
mTopicId = kTopicIdNotSpecified;
}
WEAVE_ERROR DMPublisher::Notify::SendRequest(PacketBuffer *aBuffer, uint16_t aSendFlags)
{
WEAVE_ERROR err;
StatusReport report;
VerifyOrExit(mExchangeCtx, err = WEAVE_ERROR_INCORRECT_STATE);
{
MessageIterator i(aBuffer);
i.append();
err = i.write64(mTopicId);
SuccessOrExit(err);
err = mDataList.pack(i);
SuccessOrExit(err);
}
err = mExchangeCtx->SendMessage(kWeaveProfile_WDM, kMsgType_NotifyRequest, aBuffer, aSendFlags);
aBuffer = NULL;
exit:
if (aBuffer != NULL)
{
PacketBuffer::Free(aBuffer);
}
/*
* free the data list since we're done with it. note
* that this ONLY does something substantive if the
* list has an PacketBuffer associated with it.
*/
mDataList.free();
return err;
}
WEAVE_ERROR DMPublisher::Notify::OnStatusReceived(const uint64_t &aResponderId, StatusReport &aStatus)
{
DMPublisher *publisher = static_cast<DMPublisher *>(mEngine);
uint16_t txnId = mTxnId;
TopicIdentifier topicId = mTopicId;
Finalize();
return publisher->NotifyConfirm(aResponderId, topicId, aStatus, txnId);
}
DMPublisher::Notify *DMPublisher::NewNotify(void)
{
int i;
Notify *result = NULL;
for (i = 0; i < kNotifyPoolSize; i++)
{
if (mNotifyPool[i].IsFree())
{
result = &mNotifyPool[i];
break;
}
}
return result;
}
/**
* @brief
* Request that an executing transaction be canceled.
*
* @note
* The only transactions of interest in a publisher at this
* point are subscription/notification related so this code only
* compiles if subscriptions are allowed.
*
* @sa #WEAVE_CONFIG_WDM_ALLOW_PUBLISHER_SUBSCRIPTION
*
* @param [in] aTxnId The number of the transaction to
* be canceled.
*
* @parame [in] aError The #WEAVE_ERROR to report when
* canceling the transaction.
*
* @return #WEAVE_NO_ERROR On success. Otherwise a #WEAVE_ERROR reflecting
* a failure to cancel the transaction.
*/
WEAVE_ERROR DMPublisher::CancelTransactionRequest(uint16_t aTxnId, WEAVE_ERROR aError)
{
WEAVE_ERROR err = WEAVE_NO_ERROR;
int i;
for (i = 0; i < kNotifyPoolSize; i++)
{
Notify &n = mNotifyPool[i];
VerifyOrExit(n.mTxnId != aTxnId, err = n.Finalize());
}
exit:
return err;
}
#endif // WEAVE_CONFIG_WDM_ALLOW_PUBLISHER_SUBSCRIPTION