blob: 56fc3f1e45e2d41b236553b59579f4d8e260def3 [file] [log] [blame]
/*
*
* Copyright (c) 2013-2017 Nest Labs, Inc.
* All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* @file
* This file implements the ExchangeContext class.
*
*/
#ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS
#endif
#ifndef __STDC_LIMIT_MACROS
#define __STDC_LIMIT_MACROS
#endif
#include <stdint.h>
#include <stdlib.h>
#include <Weave/Core/WeaveCore.h>
#include <Weave/Core/WeaveEncoding.h>
#include <Weave/Profiles/WeaveProfiles.h>
#include <Weave/Profiles/common/CommonProfile.h>
#include <Weave/Support/CodeUtils.h>
#include <Weave/Support/FlagUtils.hpp>
#include <Weave/Support/RandUtils.h>
#include <Weave/Support/logging/WeaveLogging.h>
#include <SystemLayer/SystemTimer.h>
#include <Weave/Support/WeaveFaultInjection.h>
#include <SystemLayer/SystemStats.h>
//#include <nestlabs/log/nllog.hpp>
#undef nlLogError
#define nlLogError(MSG, ...)
namespace nl {
namespace Weave {
using namespace nl::Weave::Encoding;
enum {
kFlagInitiator = 0x0001, /// This context is the initiator of the exchange.
kFlagConnectionClosed = 0x0002, /// This context was associated with a WeaveConnection.
kFlagAutoRequestAck = 0x0004, /// When set, automatically request an acknowledgment whenever a message is sent via UDP.
kFlagDropAck = 0x0008, /// Internal and debug only: when set, the exchange layer does not send an acknowledgment.
kFlagResponseExpected = 0x0010, /// If a response is expected for a message that is being sent.
kFlagAckPending = 0x0020, /// When set, signifies that there is an acknowledgment pending to be sent back.
kFlagPeerRequestedAck = 0x0040, /// When set, signifies that at least one message received on this exchange requested an acknowledgment.
/// This flag is read by the application to decide if it needs to request an acknowledgment for the
/// response message it is about to send. This flag can also indicate whether peer is using WRMP.
kFlagMsgRcvdFromPeer = 0x0080, /// When set, signifies that at least one message has been received from peer on this exchange context.
kFlagAutoReleaseKey = 0x0100, /// Automatically release the message encryption key when the exchange context is freed.
};
/**
* Determine whether the context is the initiator of the exchange.
*
* @return Returns 'true' if it is the initiator, else 'false'.
*
*/
bool ExchangeContext::IsInitiator(void) const
{
return GetFlag(mFlags, kFlagInitiator);
}
/**
* Determine whether the ExchangeContext has an associated active WeaveConnection.
*
* @return Returns 'true' if connection is closed, else 'false'.
*/
bool ExchangeContext::IsConnectionClosed(void) const
{
return GetFlag(mFlags, kFlagConnectionClosed);
}
/**
* Determine whether a response is expected for messages sent over
* this exchange.
*
* @return Returns 'true' if response expected, else 'false'.
*/
bool ExchangeContext::IsResponseExpected(void) const
{
return GetFlag(mFlags, kFlagResponseExpected);
}
/**
* Set the kFlagInitiator flag bit. This flag is set by the node that
* initiates an exchange.
*
* @param[in] inInitiator A Boolean indicating whether (true) or not
* (false) the context is the initiator of
* the exchange.
*
*/
void ExchangeContext::SetInitiator(bool inInitiator)
{
SetFlag(mFlags, kFlagInitiator, inInitiator);
}
/**
* Set the kFlagConnectionClosed flag bit. This flag is set
* when a WeaveConnection associated with an ExchangeContext
* is closed.
*
* @param[in] inConnectionClosed A Boolean indicating whether
* (true) or not (false) the context
* was associated with a connection.
*
*/
void ExchangeContext::SetConnectionClosed(bool inConnectionClosed)
{
SetFlag(mFlags, kFlagConnectionClosed, inConnectionClosed);
}
#if WEAVE_CONFIG_ENABLE_RELIABLE_MESSAGING
/**
* Determine whether there is already an acknowledgment pending to be sent
* to the peer on this exchange.
*
*/
bool ExchangeContext::IsAckPending(void) const
{
return GetFlag(mFlags, kFlagAckPending);
}
/**
* Determine whether peer requested acknowledgment for at least one message
* on this exchange.
*
* @return Returns 'true' if acknowledgment requested, else 'false'.
*/
bool ExchangeContext::HasPeerRequestedAck(void) const
{
return GetFlag(mFlags, kFlagPeerRequestedAck);
}
/**
* Determine whether at least one message has been received
* on this exchange from peer.
*
* @return Returns 'true' if message received, else 'false'.
*/
bool ExchangeContext::HasRcvdMsgFromPeer(void) const
{
return GetFlag(mFlags, kFlagMsgRcvdFromPeer);
}
/**
* Set if a message has been received from the peer
* on this exchange.
*
* @param[in] inMsgRcvdFromPeer A Boolean indicating whether (true) or not
* (false) a message has been received
* from the peer on this exchange context.
*
*/
void ExchangeContext::SetMsgRcvdFromPeer(bool inMsgRcvdFromPeer)
{
SetFlag(mFlags, kFlagMsgRcvdFromPeer, inMsgRcvdFromPeer);
}
/**
* Set if an acknowledgment needs to be sent back to the peer on this exchange.
*
* @param[in] inAckPending A Boolean indicating whether (true) or not
* (false) an acknowledgment should be sent back
* in response to a received message.
*
*/
void ExchangeContext::SetAckPending(bool inAckPending)
{
SetFlag(mFlags, kFlagAckPending, inAckPending);
}
/**
* Set if an acknowledgment was requested in the last message received
* on this exchange.
*
* @param[in] inPeerRequestedAck A Boolean indicating whether (true) or not
* (false) an acknowledgment was requested
* in the last received message.
*
*/
void ExchangeContext::SetPeerRequestedAck(bool inPeerRequestedAck)
{
SetFlag(mFlags, kFlagPeerRequestedAck, inPeerRequestedAck);
}
/**
* Set whether the WeaveExchangeManager should not send acknowledgements
* for this context.
*
* For internal, debug use only.
*
* @param[in] inDropAck A Boolean indicating whether (true) or not
* (false) the acknowledgements should be not
* sent for the exchange.
*
*/
void ExchangeContext::SetDropAck(bool inDropAck)
{
SetFlag(mFlags, kFlagDropAck, inDropAck);
}
/**
* Determine whether the WeaveExchangeManager should not send an
* acknowledgement.
*
* For internal, debug use only.
*
*/
bool ExchangeContext::ShouldDropAck(void) const
{
return GetFlag(mFlags, kFlagDropAck);
}
static inline bool IsWRMPControlMessage(uint32_t profileId, uint8_t msgType)
{
return (profileId == nl::Weave::Profiles::kWeaveProfile_Common &&
(msgType == nl::Weave::Profiles::Common::kMsgType_WRMP_Throttle_Flow ||
msgType == nl::Weave::Profiles::Common::kMsgType_WRMP_Delayed_Delivery));
}
#endif // WEAVE_CONFIG_ENABLE_RELIABLE_MESSAGING
/**
* Set whether a response is expected on this exchange.
*
* @param[in] inResponseExpected A Boolean indicating whether (true) or not
* (false) a response is expected on this
* exchange.
*
*/
void ExchangeContext::SetResponseExpected(bool inResponseExpected)
{
SetFlag(mFlags, kFlagResponseExpected, inResponseExpected);
}
/**
* Returns whether an acknowledgment will be requested whenever a message is sent.
*/
bool ExchangeContext::AutoRequestAck() const
{
return GetFlag(mFlags, kFlagAutoRequestAck);
}
/**
* Set whether an acknowledgment should be requested whenever a message is sent.
*
* @param[in] autoReqAck A Boolean indicating whether or not an
* acknowledgment should be requested whenever a
* message is sent.
*/
void ExchangeContext::SetAutoRequestAck(bool autoReqAck)
{
SetFlag(mFlags, kFlagAutoRequestAck, autoReqAck);
}
/**
* Return whether the encryption key associated with the exchange should be
* released when the exchange is freed.
*/
bool ExchangeContext::GetAutoReleaseKey() const
{
return GetFlag(mFlags, kFlagAutoReleaseKey);
}
/**
* Set whether the encryption key associated with the exchange should be
* released when the exchange is freed.
*
* @param[in] autoReleaseKey True if the message encryption key should be
* automatically released.
*/
void ExchangeContext::SetAutoReleaseKey(bool autoReleaseKey)
{
SetFlag(mFlags, kFlagAutoReleaseKey, autoReleaseKey);
}
/**
* Send a Weave message on this exchange.
*
* @param[in] profileId The profile identifier of the Weave message to be sent.
*
* @param[in] msgType The message type of the corresponding profile.
*
* @param[in] msgBuf A pointer to the PacketBuffer object holding the Weave message.
*
* @param[in] sendFlags Flags set by the application for the Weave message being sent.
*
* @param[in] msgCtxt A pointer to an application-specific context object to be associated
* with the message being sent.
* @retval #WEAVE_ERROR_INVALID_ARGUMENT if an invalid argument was passed to this SendMessage API.
* @retval #WEAVE_ERROR_SEND_THROTTLED if this exchange context has been throttled when using the
* Weave reliable messaging protocol.
* @retval #WEAVE_ERROR_WRONG_MSG_VERSION_FOR_EXCHANGE if there is a mismatch in the specific send operation and the
* Weave message protocol version that is supported. For example,
* this error would be generated if Weave Reliable Messaging
* semantics are being attempted when the Weave message protocol
* version is V1.
* @retval #WEAVE_ERROR_NOT_CONNECTED if the context was associated with a connection that is now
* closed.
* @retval #WEAVE_ERROR_INCORRECT_STATE if the state of the exchange context is incorrect.
* @retval #WEAVE_NO_ERROR if the Weave layer successfully sent the message down to the
* network layer.
*/
WEAVE_ERROR ExchangeContext::SendMessage(uint32_t profileId, uint8_t msgType, PacketBuffer *msgBuf, uint16_t sendFlags,
void *msgCtxt)
{
// Setup the message info structure.
WeaveMessageInfo msgInfo;
msgInfo.Clear();
msgInfo.SourceNodeId = ExchangeMgr->FabricState->LocalNodeId;
msgInfo.DestNodeId = PeerNodeId;
msgInfo.EncryptionType = EncryptionType;
msgInfo.KeyId = KeyId;
return SendMessage(profileId, msgType, msgBuf, sendFlags, &msgInfo, msgCtxt);
}
/**
* Send a Weave message on this exchange.
*
* @param[in] profileId The profile identifier of the Weave message to be sent.
*
* @param[in] msgType The message type of the corresponding profile.
*
* @param[in] msgBuf A pointer to the PacketBuffer object holding the Weave message.
*
* @param[in] sendFlags Flags set by the application for the Weave message being sent.
*
* @param[in] msgInfo A pointer to the WeaveMessageInfo object.
*
* @param[in] msgCtxt A pointer to an application-specific context object to be
* associated with the message being sent.
*
* @retval #WEAVE_ERROR_INVALID_ARGUMENT if an invalid argument was passed to this SendMessage API.
* @retval #WEAVE_ERROR_SEND_THROTTLED if this exchange context has been throttled when using the
* Weave reliable messaging protocol.
* @retval #WEAVE_ERROR_WRONG_MSG_VERSION_FOR_EXCHANGE if there is a mismatch in the specific send operation and the
* Weave message protocol version that is supported. For example,
* this error would be generated if Weave Reliable Messaging
* semantics are being attempted when the Weave message protocol
* version is V1.
* @retval #WEAVE_ERROR_NOT_CONNECTED if the context was associated with a connection that is now
* closed.
* @retval #WEAVE_ERROR_INCORRECT_STATE if the state of the exchange context is incorrect.
* @retval #WEAVE_NO_ERROR if the Weave layer successfully sent the message down to the
* network layer.
*/
WEAVE_ERROR ExchangeContext::SendMessage(uint32_t profileId, uint8_t msgType, PacketBuffer *msgBuf, uint16_t sendFlags,
WeaveMessageInfo * msgInfo, void *msgCtxt)
{
WEAVE_ERROR err = WEAVE_NO_ERROR;
bool sendCalled = false;
#if WEAVE_CONFIG_ENABLE_RELIABLE_MESSAGING
WeaveExchangeManager::RetransTableEntry *entry = NULL;
#endif
#if WEAVE_RETAIN_LOGGING
uint16_t payloadLen = msgBuf->DataLength();
#endif
// Don't let method get called on a freed object.
VerifyOrDie(ExchangeMgr != NULL && mRefCount != 0);
// we hold the exchange context here in case the entity that
// originally generated it tries to close it as a result of
// an error arising below. at the end, we have to close it.
AddRef();
#if WEAVE_CONFIG_ENABLE_RELIABLE_MESSAGING
// If sending via UDP and the auto-request ACK feature is enabled, automatically
// request an acknowledgment, UNLESS the NoAutoRequestAck send flag has been specified.
if (Con == NULL && (mFlags & kFlagAutoRequestAck) != 0 && (sendFlags & kSendFlag_NoAutoRequestAck) == 0)
{
sendFlags |= kSendFlag_RequestAck;
}
// Do not allow WRM to be used over a TCP connection
if ((sendFlags & kSendFlag_RequestAck) && Con != NULL)
{
ExitNow(err = WEAVE_ERROR_INVALID_ARGUMENT);
}
// Abort early if Throttle is already set;
VerifyOrExit(mWRMPThrottleTimeout == 0, err = WEAVE_ERROR_SEND_THROTTLED);
#else // WEAVE_CONFIG_ENABLE_RELIABLE_MESSAGING
// If WEAVE_CONFIG_ENABLE_RELIABLE_MESSAGING == 0, then
// kSendFlag_RequestAck should not be set.
if (sendFlags & kSendFlag_RequestAck)
{
ExitNow(err = WEAVE_ERROR_INVALID_ARGUMENT);
}
#endif // WEAVE_CONFIG_ENABLE_RELIABLE_MESSAGING
// Set the Message Protocol Version
#if WEAVE_CONFIG_ENABLE_RELIABLE_MESSAGING
if (sendFlags & kSendFlag_RequestAck || IsWRMPControlMessage(profileId, msgType) ||
(profileId == nl::Weave::Profiles::kWeaveProfile_Common &&
msgType == nl::Weave::Profiles::Common::kMsgType_Null))
{
if (kWeaveMessageVersion_Unspecified == mMsgProtocolVersion)
{
mMsgProtocolVersion = msgInfo->MessageVersion = kWeaveMessageVersion_V2;
}
else
{
VerifyOrExit(mMsgProtocolVersion == kWeaveMessageVersion_V2, err = WEAVE_ERROR_WRONG_MSG_VERSION_FOR_EXCHANGE);
}
}
#endif
if (kWeaveMessageVersion_Unspecified == mMsgProtocolVersion)
{
mMsgProtocolVersion = msgInfo->MessageVersion = kWeaveMessageVersion_V1;
}
else
{
msgInfo->MessageVersion = mMsgProtocolVersion;
}
// Prevent sending if the context was associated with a connection that is now closed.
VerifyOrExit(!IsConnectionClosed(), err = WEAVE_ERROR_NOT_CONNECTED);
// TODO: implement support for retransmissions.
// flag validation
if (sendFlags & kSendFlag_RetransmissionTrickle)
{
//We do not allow WRM to be used when Trickle retransmission is requested
if (sendFlags & kSendFlag_RequestAck)
{
ExitNow(err = WEAVE_ERROR_INVALID_ARGUMENT);
}
// We do not support trickle retrasnmissions over
// connection-oriented exchanges
VerifyOrExit(Con == NULL, err=WEAVE_ERROR_INVALID_ARGUMENT);
if (0 == RetransInterval)
{ // we're not retransmitting, do not hold onto the buffer
sendFlags &= ~kSendFlag_RetainBuffer;
}
else
{
sendFlags |= kSendFlag_RetainBuffer;
msg = msgBuf;
}
}
// Add the exchange header to the message buffer.
WeaveExchangeHeader exchangeHeader;
memset(&exchangeHeader, 0, sizeof(exchangeHeader));
err = EncodeExchHeader(&exchangeHeader, profileId, msgType, msgBuf, sendFlags);
SuccessOrExit(err);
// If a response message is expected...
if ((sendFlags & kSendFlag_ExpectResponse) != 0)
{
// Only one 'response expected' message can be outstanding at a time.
VerifyOrExit(!IsResponseExpected(), err = WEAVE_ERROR_INCORRECT_STATE);
SetResponseExpected(true);
// Arm the response timer if a timeout has been specified.
if (ResponseTimeout > 0)
{
err = StartResponseTimer();
SuccessOrExit(err);
}
}
//Fill in appropriate message header flags
if (sendFlags & kSendFlag_DelaySend)
msgInfo->Flags |= kWeaveMessageFlag_DelaySend;
//FIXME: RS: possibly unnecessary, should addref instead
if (sendFlags & kSendFlag_RetainBuffer)
msgInfo->Flags |= kWeaveMessageFlag_RetainBuffer;
if (sendFlags & kSendFlag_AlreadyEncoded)
msgInfo->Flags |= kWeaveMessageFlag_MessageEncoded;
if (sendFlags & kSendFlag_ReuseMessageId)
msgInfo->Flags |= kWeaveMessageFlag_ReuseMessageId;
if (sendFlags & kSendFlag_ReuseSourceId)
msgInfo->Flags |= kWeaveMessageFlag_ReuseSourceId;
if (sendFlags & kSendFlag_MulticastFromLinkLocal)
msgInfo->Flags |= kWeaveMessageFlag_MulticastFromLinkLocal;
// Send the message via UDP or TCP based on the presence of a connection.
if (Con != NULL)
{
// Hook the message received callback on the connection so that the WeaveExchangeManager gets
// called when messages arrive.
Con->OnMessageReceived = WeaveExchangeManager::HandleMessageReceived;
err = Con->SendMessage(msgInfo, msgBuf);
msgBuf = NULL;
sendCalled = true;
}
else
{
#if WEAVE_CONFIG_ENABLE_RELIABLE_MESSAGING
if (sendFlags & kSendFlag_RequestAck)
{
err = ExchangeMgr->MessageLayer->SelectDestNodeIdAndAddress(msgInfo->DestNodeId, PeerAddr);
SuccessOrExit(err);
err = ExchangeMgr->MessageLayer->EncodeMessage(PeerAddr, PeerPort, PeerIntf,
msgInfo, msgBuf);
SuccessOrExit(err);
// Copy msg to a right-sized buffer if applicable
msgBuf = PacketBuffer::RightSize(msgBuf);
//Add to Table for subsequent sending
err = ExchangeMgr->AddToRetransTable(this, msgBuf, msgInfo->MessageId, msgCtxt, &entry);
SuccessOrExit(err);
msgBuf = NULL;
err = ExchangeMgr->SendFromRetransTable(entry);
sendCalled = true;
SuccessOrExit(err);
WEAVE_FAULT_INJECT(FaultInjection::kFault_WRMDoubleTx,
entry->nextRetransTime = 0;
ExchangeMgr->WRMPStartTimer()
);
}
else
#endif // WEAVE_CONFIG_ENABLE_RELIABLE_MESSAGING
{
err = ExchangeMgr->MessageLayer->SendMessage(PeerAddr, PeerPort, PeerIntf,
msgInfo, msgBuf);
msgBuf = NULL;
sendCalled = true;
SuccessOrExit(err);
}
if (sendFlags & kSendFlag_RetransmissionTrickle)
{
currentBcastMsgID = msgInfo->MessageId;
if (RetransInterval != 0)
{
if (StartTimerT() != WEAVE_NO_ERROR)
{
nlLogError("EC: cant start T\n");
}
}
}
}
exit:
if (sendCalled)
{
#if defined(DEBUG)
WeaveLogRetain(ExchangeManager, "Msg %s %08" PRIX32 ":%d %d %016" PRIX64 " %04" PRIX16 " %04" PRIX16 " %ld MsgId:%08" PRIX32,
"sent", profileId, msgType, (int)payloadLen, msgInfo->DestNodeId,
(Con ? Con->LogId() : 0), ExchangeId, (long)err, msgInfo->MessageId);
#else
WeaveLogRetain(ExchangeManager, "Msg %s %08" PRIX32 ":%d %d %016" PRIX64 " %04" PRIX16 " %04" PRIX16" %ld",
"sent", profileId, msgType, (int)payloadLen, msgInfo->DestNodeId,
(Con ? Con->LogId() : 0), ExchangeId, (long)err);
#endif
}
if (err != WEAVE_NO_ERROR && IsResponseExpected())
{
CancelResponseTimer();
SetResponseExpected(false);
}
if (msgBuf != NULL && (sendFlags & kSendFlag_RetainBuffer) == 0)
{
PacketBuffer::Free(msgBuf);
if (msg == msgBuf)
msg = NULL;
}
//Release the reference to the exchange context acquired above. Under normal circumstances
//this will merely decrement the reference count, without actually freeing the exchange context.
//However if one of the function calls in this method resulted in a callback to the application,
//the application may have released its reference, resulting in the exchange context actually
//being freed here.
Release();
return err;
}
/**
* Send a Common::Null message.
*
* @note When sent via UDP, the null message is sent *without* requesting an acknowledgment,
* even in the case where the auto-request acknowledgment feature has been enabled on the
* exchange.
*
* @retval #WEAVE_ERROR_NO_MEMORY If no available PacketBuffers.
* @retval #WEAVE_NO_ERROR If the method succeeded or the error wasn't critical.
* @retval other Another critical error returned by SendMessage().
*
*/
WEAVE_ERROR ExchangeContext::SendCommonNullMessage(void)
{
WEAVE_ERROR err = WEAVE_NO_ERROR;
PacketBuffer *msgBuf = NULL;
// Allocate a buffer for the null message
msgBuf = PacketBuffer::NewWithAvailableSize(0);
VerifyOrExit(msgBuf != NULL, err = WEAVE_ERROR_NO_MEMORY);
// Send the null message
err = SendMessage(nl::Weave::Profiles::kWeaveProfile_Common,
nl::Weave::Profiles::Common::kMsgType_Null, msgBuf,
kSendFlag_NoAutoRequestAck);
msgBuf = NULL;
exit:
if (WeaveMessageLayer::IsSendErrorNonCritical(err))
{
WeaveLogError(ExchangeManager, "Non-crit err %ld sending solitary ack",
long(err));
err = WEAVE_NO_ERROR;
}
if (err != WEAVE_NO_ERROR)
{
WeaveLogError(ExchangeManager, "Failed to send Solitary ack for MsgId:%08" PRIX32 " to Peer %016" PRIX64 ":%ld",
mPendingPeerAckId, PeerNodeId, (long)err);
}
return err;
}
/**
* Encode the exchange header into a message buffer.
*
* @param[in] exchangeHeader A pointer to the Weave Exchange header object.
*
* @param[in] profileId The profile identifier of the Weave message to be sent.
*
* @param[in] msgType The message type of the corresponding profile.
*
* @param[in] msgBuf A pointer to the PacketBuffer needed for the Weave message.
*
* @param[in] sendFlags Flags set by the application for the Weave message being sent.
*
*
* @retval #WEAVE_ERROR_BUFFER_TOO_SMALL If the message buffer does not have sufficient space
* for encoding the exchange header.
* @retval #WEAVE_NO_ERROR If encoding of the message was successful.
*/
WEAVE_ERROR ExchangeContext::EncodeExchHeader(WeaveExchangeHeader *exchangeHeader, uint32_t profileId, uint8_t msgType,
PacketBuffer *msgBuf, uint16_t sendFlags)
{
WEAVE_ERROR err = WEAVE_NO_ERROR;
// Fill the exchange header to the message buffer.
exchangeHeader->Version = kWeaveExchangeVersion_V1;
exchangeHeader->ExchangeId = ExchangeId;
exchangeHeader->ProfileId = profileId;
exchangeHeader->MessageType = msgType;
// sendFlags under special circumstances (such as a retransmission
// of the remote alarm) can override the initiator flag in the
// exchange header. The semantics here really is: use the
// ExchangeId in the namespace of the SourceNodeId
exchangeHeader->Flags = (IsInitiator() || (sendFlags & kSendFlag_FromInitiator)) ? kWeaveExchangeFlag_Initiator : 0;
// WRMP PreProcess Checks and Flag setting
if (mMsgProtocolVersion == kWeaveMessageVersion_V2)
{
#if WEAVE_CONFIG_ENABLE_RELIABLE_MESSAGING
//If there is a pending acknowledgment piggyback it on this message.
//If there is no pending acknowledgment piggyback the last Ack that was sent.
// - HasPeerRequestedAck() is used to verify that AckId field is valid
// to avoid piggybacking uninitialized AckId.
if (HasPeerRequestedAck())
{
// Expire any virtual ticks that have expired so all wakeup sources reflect the current time
ExchangeMgr->WRMPExpireTicks();
exchangeHeader->Flags |= kWeaveExchangeFlag_AckId;
exchangeHeader->AckMsgId = mPendingPeerAckId;
//Set AckPending flag to false after setting the Ack flag;
SetAckPending(false);
// Schedule next physical wakeup
ExchangeMgr->WRMPStartTimer();
#if defined(DEBUG)
WeaveLogProgress(ExchangeManager, "Piggybacking Ack for MsgId:%08" PRIX32 " with msg",
mPendingPeerAckId);
#endif
}
//Assert the flag if message requires an Ack back;
if ((sendFlags & kSendFlag_RequestAck) && !IsWRMPControlMessage(profileId, msgType))
{
exchangeHeader->Flags |= kWeaveExchangeFlag_NeedsAck;
}
#endif // WEAVE_CONFIG_ENABLE_RELIABLE_MESSAGING
}
err = WeaveExchangeManager::PrependHeader(exchangeHeader, msgBuf);
return err;
}
/**
* Cancel the Trickle retransmission mechanism.
*
*/
void ExchangeContext::CancelRetrans()
{
// NOTE: modify for other retransmission schemes
TeardownTrickleRetransmit();
}
/**
* Increment the reference counter for the exchange context by one.
*
*/
void ExchangeContext::AddRef()
{
mRefCount++;
#if defined(WEAVE_EXCHANGE_CONTEXT_DETAIL_LOGGING)
WeaveLogProgress(ExchangeManager, "ec id: %d [%04" PRIX16 "], refCount++: %d", EXCHANGE_CONTEXT_ID(this - ExchangeMgr->ContextPool), ExchangeId, mRefCount);
#endif
}
void ExchangeContext::DoClose(bool clearRetransTable)
{
// Clear app callbacks
OnMessageReceived = NULL;
OnResponseTimeout = NULL;
OnRetransmissionTimeout = NULL;
OnConnectionClosed = NULL;
OnKeyError = NULL;
#if WEAVE_CONFIG_ENABLE_RELIABLE_MESSAGING
// Expire any virtual ticks that have expired so all wakeup sources reflect the current time
ExchangeMgr->WRMPExpireTicks();
OnThrottleRcvd = NULL;
OnDDRcvd = NULL;
OnSendError = NULL;
OnAckRcvd = NULL;
// Flush any pending WRM acks
WRMPFlushAcks();
// Clear the WRM retransmission table
if (clearRetransTable)
{
ExchangeMgr->ClearRetransmitTable(this);
}
// Schedule next physical wakeup
ExchangeMgr->WRMPStartTimer();
#endif
// Cancel the trickle retransmission timer.
CancelRetrans();
// Cancel the response timer.
CancelResponseTimer();
}
/**
* Gracefully close an exchange context. This call decrements the
* reference count and releases the exchange when the reference
* count goes to zero.
*
*/
void ExchangeContext::Close()
{
VerifyOrDie(ExchangeMgr != NULL && mRefCount != 0);
DoClose(false);
Release();
}
/**
* Abort the Exchange context immediately and release all
* references to it.
*
*/
void ExchangeContext::Abort()
{
VerifyOrDie(ExchangeMgr != NULL && mRefCount != 0);
DoClose(true);
Release();
}
/**
* Release reference to this exchange context. If count is down
* to one then close the context, reset all application callbacks,
* and stop all timers.
*
*/
void ExchangeContext::Release(void)
{
VerifyOrDie(ExchangeMgr != NULL && mRefCount != 0);
if (mRefCount == 1)
{
//Ideally, in this scenario, the retransmit table should
//be clear of any outstanding messages for this context and
//the boolean parameter passed to DoClose() should not matter.
WeaveExchangeManager *em = ExchangeMgr;
#if defined(WEAVE_EXCHANGE_CONTEXT_DETAIL_LOGGING)
uint16_t tmpid = ExchangeId;
#endif
// If so configured, automatically release any reservation held on
// the message encryption key.
if (GetAutoReleaseKey())
{
em->MessageLayer->SecurityMgr->ReleaseKey(PeerNodeId, KeyId);
}
DoClose(false);
mRefCount = 0;
ExchangeMgr = NULL;
em->mContextsInUse--;
em->MessageLayer->SignalMessageLayerActivityChanged();
#if defined(WEAVE_EXCHANGE_CONTEXT_DETAIL_LOGGING)
WeaveLogProgress(ExchangeManager, "ec-- id: %d [%04" PRIX16 "], inUse: %d, addr: 0x%x", EXCHANGE_CONTEXT_ID(this - em->ContextPool), tmpid, em->mContextsInUse, this);
#endif
SYSTEM_STATS_DECREMENT(nl::Weave::System::Stats::kExchangeMgr_NumContexts);
}
else
{
mRefCount--;
#if defined(WEAVE_EXCHANGE_CONTEXT_DETAIL_LOGGING)
WeaveLogProgress(ExchangeManager, "ec id: %d [%04" PRIX16 "], refCount--: %d", EXCHANGE_CONTEXT_ID(this - ExchangeMgr->ContextPool), ExchangeId, mRefCount);
#endif
}
}
WEAVE_ERROR ExchangeContext::ResendMessage()
{
WeaveMessageInfo msgInfo;
WEAVE_ERROR res;
uint8_t * payload;
if (msg == NULL)
{
return WEAVE_ERROR_INCORRECT_STATE;
}
msgInfo.Clear();
msgInfo.MessageVersion = mMsgProtocolVersion;
msgInfo.SourceNodeId = ExchangeMgr->FabricState->LocalNodeId;
msgInfo.EncryptionType = EncryptionType;
msgInfo.KeyId = KeyId;
msgInfo.DestNodeId = PeerNodeId;
res = ExchangeMgr->MessageLayer->DecodeHeader(msg, &msgInfo, &payload);
if (res != WEAVE_NO_ERROR)
return WEAVE_ERROR_INCORRECT_STATE;
msgInfo.Flags |=
kWeaveMessageFlag_RetainBuffer |
kWeaveMessageFlag_MessageEncoded |
kWeaveMessageFlag_ReuseMessageId |
kWeaveMessageFlag_ReuseSourceId;
return ExchangeMgr->MessageLayer->ResendMessage(PeerAddr, PeerPort, PeerIntf, &msgInfo, msg);
}
/**
* Start the Trickle rebroadcast algorithm's periodic retransmission timer mechanism.
*
* @return #WEAVE_NO_ERROR if successful, else an INET_ERROR mapped into a WEAVE_ERROR.
*
*/
WEAVE_ERROR ExchangeContext::StartTimerT()
{
if (RetransInterval == 0)
{
return WEAVE_NO_ERROR;
}
// range from 1 to RetransInterval
backoff = 1 + (GetRandU32() % (RetransInterval-1));
msgsReceived = 0;
WeaveLogDetail(ExchangeManager, "Trickle new interval");
return ExchangeMgr->MessageLayer->SystemLayer->StartTimer(backoff, TimerTau, this);
}
void ExchangeContext::TimerT(System::Layer* aSystemLayer, void* aAppState, System::Error aError)
{
ExchangeContext* client = reinterpret_cast<ExchangeContext*>(aAppState);
if ( (aSystemLayer == NULL) || (aAppState == NULL) || (aError != WEAVE_SYSTEM_NO_ERROR))
{
return;
}
if (client->StartTimerT() != WEAVE_NO_ERROR)
{
nlLogError("EC: cant start T\n");
}
}
void ExchangeContext::TimerTau(System::Layer* aSystemLayer, void* aAppState, System::Error aError)
{
ExchangeContext* ec = reinterpret_cast<ExchangeContext*>(aAppState);
if ( (aSystemLayer == NULL) || (aAppState == NULL) || (aError != WEAVE_SYSTEM_NO_ERROR))
{
return;
}
if (ec->msgsReceived < ec->rebroadcastThreshold)
{
WeaveLogDetail(ExchangeManager, "Trickle re-send with duplicate message counter: %u", ec->msgsReceived);
ec->ResendMessage();
}
else
{
WeaveLogDetail(ExchangeManager, "Trickle skipping this interval");
}
if ((ec->RetransInterval == 0) || (ec->RetransInterval <= ec->backoff))
{
return;
}
if (aSystemLayer->StartTimer(ec->RetransInterval - ec->backoff, TimerT, ec) != WEAVE_NO_ERROR)
{
nlLogError("EC: cant start Tau\n");
}
}
bool ExchangeContext::MatchExchange(WeaveConnection *msgCon, const WeaveMessageInfo *msgInfo,
const WeaveExchangeHeader *exchangeHeader)
{
// A given message is part of a particular exchange if...
return
// The exchange identifier of the message matches the exchange identifier of the context.
(ExchangeId == exchangeHeader->ExchangeId)
// AND The message was received over the connection bound to the context, or the
// message was received over UDP (msgCon == NULL) and the context is not bound
// to a connection (Con == NULL).
&& (Con == msgCon)
// AND The message was received from the peer node associated with the exchange, or the peer node identifier is 'any'.
&& (((PeerNodeId == kAnyNodeId) && (msgInfo->DestNodeId == ExchangeMgr->FabricState->LocalNodeId))
|| (PeerNodeId == msgInfo->SourceNodeId))
// AND The message was sent by an initiator and the exchange context is a responder (IsInitiator==false)
// OR The message was sent by a responder and the exchange context is an initiator (IsInitiator==true) (for the broadcast case, the initiator is ill defined)
&& (((exchangeHeader->Flags & kWeaveExchangeFlag_Initiator) != 0) != IsInitiator());
}
/**
* Handle trickle message within the exchange context.
*
* @param[in] pktInfo A pointer to the IPPacketInfo object.
*
* @param[in] msgInfo A pointer to the Weave message info structure.
*
*/
void ExchangeContext::HandleTrickleMessage(const IPPacketInfo *pktInfo, const WeaveMessageInfo *msgInfo)
{
// check if we're at all interested in this message
const bool isMessageIdMatching = currentBcastMsgID == msgInfo->MessageId;
const bool isNodeIdMatching = (PeerNodeId == kAnyNodeId) || (PeerNodeId == msgInfo->SourceNodeId);
if (isMessageIdMatching &&
isNodeIdMatching)
{
msgsReceived++;
WeaveLogDetail(ExchangeManager, "Increasing trickle duplicate message counter: %u", msgsReceived);
}
else
{
WeaveLogDetail(ExchangeManager, "Not counted as duplicate message, for MsgId:%08" PRIX32 " NodeId:%d",
isMessageIdMatching, isNodeIdMatching);
}
}
/**
* Setup the trickle retransmission mechanism by setting the corresponding retransmission interval
* and rebroadcast threshold.
*
* @param[in] retransInterval The retransmit interval of the Trickle rebroadcast algorithm.
*
* @param[in] threshold The maximum number of times a message is rebroadcast.
*
* @param[in] timeout The maximum time to wait before canceling the Trickle retransmission timer.
*
* @return #WEAVE_NO_ERROR if Trickle setup was successful, else an INET_ERROR mapped into a WEAVE_ERROR.
*
*/
WEAVE_ERROR ExchangeContext::SetupTrickleRetransmit(uint32_t retransInterval, uint8_t threshold, uint32_t timeout)
{
WEAVE_ERROR err = WEAVE_NO_ERROR;
CancelRetrans();
RetransInterval = retransInterval;
rebroadcastThreshold = threshold;
if (timeout != 0)
{
err = ExchangeMgr->MessageLayer->SystemLayer->StartTimer(timeout, CancelRetransmissionTimer, this);
if (err != WEAVE_NO_ERROR)
{
nlLogError("EC: cant setup timeout\n");
return err;
}
}
WeaveLogDetail(ExchangeManager, "Trickle interval %u ms, threshold %u, timeout %u ms", retransInterval, threshold, timeout);
return WEAVE_NO_ERROR;
}
/**
* Tear down the Trickle retransmission mechanism by canceling the periodic timers
* within Trickle and freeing the message buffer holding the Weave
* message.
*
*/
void ExchangeContext::TeardownTrickleRetransmit()
{
System::Layer* lSystemLayer = ExchangeMgr->MessageLayer->SystemLayer;
if (lSystemLayer == NULL)
{
// this is an assertion error, which shall never happen
return;
}
lSystemLayer->CancelTimer(TimerT, this);
lSystemLayer->CancelTimer(TimerTau, this);
lSystemLayer->CancelTimer(CancelRetransmissionTimer, this);
if (msg != NULL)
{
PacketBuffer::Free(msg);
}
msg = NULL;
backoff = 0;
RetransInterval = 0;
}
void ExchangeContext::CancelRetransmissionTimer(System::Layer* aSystemLayer, void* aAppState, System::Error aError)
{
ExchangeContext* ec = reinterpret_cast<ExchangeContext*>(aAppState);
if (ec == NULL)
return;
ec->CancelRetrans();
if (ec->OnRetransmissionTimeout != NULL)
{
ec->OnRetransmissionTimeout(ec);
}
}
WEAVE_ERROR ExchangeContext::StartResponseTimer()
{
return ExchangeMgr->MessageLayer->SystemLayer->StartTimer(ResponseTimeout, HandleResponseTimeout, this);
}
void ExchangeContext::CancelResponseTimer()
{
ExchangeMgr->MessageLayer->SystemLayer->CancelTimer(HandleResponseTimeout, this);
}
void ExchangeContext::HandleResponseTimeout(System::Layer* aSystemLayer, void* aAppState, System::Error aError)
{
ExchangeContext* ec = reinterpret_cast<ExchangeContext*>(aAppState);
// NOTE: we don't set mResponseExpected to false here because the response could still arrive. If the user
// wants to never receive the response, they must close the exchange context.
// Call the user's timeout handler.
if (ec->OnResponseTimeout != NULL)
ec->OnResponseTimeout(ec);
}
#if WEAVE_CONFIG_ENABLE_RELIABLE_MESSAGING
bool ExchangeContext::WRMPCheckAndRemRetransTable(uint32_t ackMsgId, void **rCtxt)
{
bool res = false;
for (int i = 0; i < WEAVE_CONFIG_WRMP_RETRANS_TABLE_SIZE; i++)
{
if ((ExchangeMgr->RetransTable[i].exchContext == this) &&
(ExchangeMgr->RetransTable[i].msgId == ackMsgId))
{
//Return context value
*rCtxt = ExchangeMgr->RetransTable[i].msgCtxt;
//Clear the entry from the retransmision table.
ExchangeMgr->ClearRetransmitTable(ExchangeMgr->RetransTable[i]);
#if defined(DEBUG)
WeaveLogProgress(ExchangeManager, "Rxd Ack; Removing MsgId:%08" PRIX32 " from Retrans Table",
ackMsgId);
#endif
res = true;
break;
}
else
{
continue;
}
}
return res;
}
//Flush the pending Ack
WEAVE_ERROR ExchangeContext::WRMPFlushAcks(void)
{
WEAVE_ERROR err = WEAVE_NO_ERROR;
if (IsAckPending())
{
//Send the acknowledgment as a Common::Null message
err = SendCommonNullMessage();
if (err == WEAVE_NO_ERROR)
{
#if defined(DEBUG)
WeaveLogProgress(ExchangeManager, "Flushed pending ack for MsgId:%08" PRIX32 " to Peer %016" PRIX64,
mPendingPeerAckId, PeerNodeId);
#endif
}
}
return err;
}
/**
* Get the current retransmit timeout. It would be either the initial or
* the active retransmit timeout based on whether the ExchangeContext has
* an active message exchange going with its peer.
*
* @return the current retransmit time.
*/
uint32_t ExchangeContext::GetCurrentRetransmitTimeout(void)
{
return (HasRcvdMsgFromPeer() ? mWRMPConfig.mActiveRetransTimeout :
mWRMPConfig.mInitialRetransTimeout);
}
/**
* Send a Throttle Flow message to the peer node requesting it to throttle its sending of messages.
*
* @note
* This message is part of the Weave Reliable Messaging protocol.
*
* @param[in] pauseTimeMillis The time (in milliseconds) that the recipient is expected
* to throttle its sending.
* @retval #WEAVE_ERROR_INVALID_ARGUMENT If an invalid argument was passed to this SendMessage API.
* @retval #WEAVE_ERROR_SEND_THROTTLED If this exchange context has been throttled when using the
* Weave reliable messaging protocol.
* @retval #WEAVE_ERROR_WRONG_MSG_VERSION_FOR_EXCHANGE If there is a mismatch in the specific send operation and the
* Weave message protocol version that is supported. For example,
* this error would be generated if Weave Reliable Messaging
* semantics are being attempted when the Weave message protocol
* version is V1.
* @retval #WEAVE_ERROR_NOT_CONNECTED If the context was associated with a connection that is now
* closed.
* @retval #WEAVE_ERROR_INCORRECT_STATE If the state of the exchange context is incorrect.
* @retval #WEAVE_NO_ERROR If the Weave layer successfully sent the message down to the
* network layer.
*
*/
WEAVE_ERROR ExchangeContext::WRMPSendThrottleFlow(uint32_t pauseTimeMillis)
{
WEAVE_ERROR err = WEAVE_NO_ERROR;
PacketBuffer *msgBuf = NULL;
uint8_t *p = NULL;
uint8_t msgLen = sizeof(pauseTimeMillis);
msgBuf = PacketBuffer::NewWithAvailableSize(msgLen);
VerifyOrExit(msgBuf != NULL, err = WEAVE_ERROR_NO_MEMORY);
p = msgBuf->Start();
//Encode the fields in the buffer
LittleEndian::Write32(p, pauseTimeMillis);
msgBuf->SetDataLength(msgLen);
// Send a Throttle Flow message to the peer. Throttle Flow messages must never request
// acknowledgment, so suppress the auto-request ACK feature on the exchange in case it has been
// enabled by the application.
err = SendMessage(Profiles::kWeaveProfile_Common, Profiles::Common::kMsgType_WRMP_Throttle_Flow,
msgBuf, kSendFlag_NoAutoRequestAck);
exit:
msgBuf = NULL;
return err;
}
/**
* Send a Delayed Delivery message to notify a sender node that its previously sent message would experience an expected
* delay before being delivered to the recipient. One of the possible causes for messages to be delayed before being
* delivered is when the recipient end node is sleepy. This message is potentially generated by a suitable intermediate
* node in the send path who has enough knowledge of the recipient to infer about the delayed delivery. Upon receiving
* this message, the sender would re-adjust its retransmission timers for messages that seek acknowledgments back.
*
* @note
* This message is part of the Weave Reliable Messaging protocol.
*
* @param[in] pauseTimeMillis The time (in milliseconds) that the previously sent message is expected
* to be delayed before being delivered.
*
* @param[in] delayedNodeId The node identifier of the peer node to whom the mesage delivery would be delayed.
*
* @retval #WEAVE_ERROR_INVALID_ARGUMENT if an invalid argument was passed to this SendMessage API.
* @retval #WEAVE_ERROR_WRONG_MSG_VERSION_FOR_EXCHANGE if there is a mismatch in the specific send operation and the
* Weave message protocol version that is supported. For example,
* this error would be generated if Weave Reliable Messaging
* semantics are being attempted when the Weave message protocol
* version is V1.
* @retval #WEAVE_ERROR_NOT_CONNECTED if the context was associated with a connection that is now
* closed.
* @retval #WEAVE_ERROR_INCORRECT_STATE if the state of the exchange context is incorrect.
* @retval #WEAVE_NO_ERROR if the Weave layer successfully sent the message down to the
* network layer.
*
*/
WEAVE_ERROR ExchangeContext::WRMPSendDelayedDelivery(uint32_t pauseTimeMillis, uint64_t delayedNodeId)
{
WEAVE_ERROR err = WEAVE_NO_ERROR;
PacketBuffer *msgBuf = NULL;
uint8_t *p = NULL;
uint8_t msgLen = sizeof(pauseTimeMillis) + sizeof(delayedNodeId);
msgBuf = PacketBuffer::NewWithAvailableSize(msgLen);
VerifyOrExit(msgBuf != NULL, err = WEAVE_ERROR_NO_MEMORY);
p = msgBuf->Start();
//Set back the pointer by the length of the fields
//Encode the fields in the buffer
LittleEndian::Write32(p, pauseTimeMillis);
LittleEndian::Write64(p, delayedNodeId);
msgBuf->SetDataLength(msgLen);
// Send a Delayed Delivery message to the peer. Delayed Delivery messages must never request
// acknowledgment, so suppress the auto-request ACK feature on the exchange in case it has been
// enabled by the application.
err = SendMessage(Profiles::kWeaveProfile_Common, Profiles::Common::kMsgType_WRMP_Delayed_Delivery,
msgBuf, kSendFlag_NoAutoRequestAck);
exit:
msgBuf = NULL;
return err;
}
/**
* Process received Ack. Remove the corresponding message context from the RetransTable and execute the application
* callback
*
* @note
* This message is part of the Weave Reliable Messaging protocol.
*
* @param[in] exchHeader Weave exchange information for incoming Ack message.
*
* @param[in] msgInfo General Weave message information for the incoming Ack message.
*
* @retval #WEAVE_ERROR_INVALID_ACK_ID if the msgId of received Ack is not in the RetransTable.
* @retval #WEAVE_NO_ERROR if the context was removed.
*
*/
WEAVE_ERROR ExchangeContext::WRMPHandleRcvdAck(const WeaveExchangeHeader *exchHeader, const WeaveMessageInfo *msgInfo)
{
void *msgCtxt = NULL;
WEAVE_ERROR err = WEAVE_NO_ERROR;
//Msg is an Ack; Check Retrans Table and remove message context
if (!WRMPCheckAndRemRetransTable(exchHeader->AckMsgId, &msgCtxt))
{
#if defined(DEBUG)
WeaveLogError(ExchangeManager, "Weave MsgId:%08" PRIX32" not in RetransTable",
exchHeader->AckMsgId);
#endif
err = WEAVE_ERROR_INVALID_ACK_ID;
//Optionally call an application callback with this error.
}
else
{
//Call OnAckRcvd Here
if (OnAckRcvd)
{
OnAckRcvd(this, msgCtxt);
}
else
{
WeaveLogDetail(ExchangeManager,
"No App Handler for Ack");
}
#if defined(DEBUG)
WeaveLogProgress(ExchangeManager, "Removed Weave MsgId:%08" PRIX32 " from RetransTable",
exchHeader->AckMsgId);
#endif
}
return err;
}
WEAVE_ERROR ExchangeContext::WRMPHandleNeedsAck(const WeaveMessageInfo *msgInfo)
{
WEAVE_ERROR err = WEAVE_NO_ERROR;
// Expire any virtual ticks that have expired so all wakeup sources reflect the current time
ExchangeMgr->WRMPExpireTicks();
// If the message IS a duplicate.
if (msgInfo->Flags & kWeaveMessageFlag_DuplicateMessage)
{
#if defined(DEBUG)
WeaveLogProgress(ExchangeManager, "Forcing tx of solitary ack for duplicate MsgId:%08" PRIX32,
msgInfo->MessageId);
#endif
// Is there pending ack for a different message id.
bool wasAckPending = IsAckPending() && mPendingPeerAckId != msgInfo->MessageId;
// Temporary store currently pending ack id (even if there is none).
uint32_t tempAckId = mPendingPeerAckId;
// Set the pending ack id.
mPendingPeerAckId = msgInfo->MessageId;
// Send the Ack for the duplication message in a Common::Null message.
err = SendCommonNullMessage();
// If there was pending ack for a different message id.
if (wasAckPending)
{
// Restore previously pending ack id.
mPendingPeerAckId = tempAckId;
SetAckPending(true);
}
SuccessOrExit(err);
}
// Otherwise, the message IS NOT a duplicate.
else
{
if (IsAckPending())
{
#if defined(DEBUG)
WeaveLogProgress(ExchangeManager, "Pending ack queue full; forcing tx of solitary ack for MsgId:%08" PRIX32,
mPendingPeerAckId);
#endif
// Send the Ack for the currently pending Ack in a Common::Null message.
err = SendCommonNullMessage();
SuccessOrExit(err);
}
// Replace the Pending ack id.
mPendingPeerAckId = msgInfo->MessageId;
mWRMPNextAckTime = ExchangeMgr->GetTickCounterFromTimeDelta(mWRMPConfig.mAckPiggybackTimeout + System::Timer::GetCurrentEpoch(), ExchangeMgr->mWRMPTimeStampBase);
SetAckPending(true);
}
exit:
// Schedule next physical wakeup
ExchangeMgr->WRMPStartTimer();
return err;
}
WEAVE_ERROR ExchangeContext::HandleThrottleFlow(uint32_t PauseTimeMillis)
{
// Expire any virtual ticks that have expired so all wakeup sources reflect the current time
ExchangeMgr->WRMPExpireTicks();
// Flow Control Message Received; Adjust Throttle timeout accordingly.
// A PauseTimeMillis of zero indicates that peer is unthrottling this Exchange.
if (0 != PauseTimeMillis)
{
mWRMPThrottleTimeout = ExchangeMgr->GetTickCounterFromTimeDelta((System::Timer::GetCurrentEpoch() + PauseTimeMillis),
ExchangeMgr->mWRMPTimeStampBase);
}
else
{
mWRMPThrottleTimeout = 0;
}
// Go through the retrans table entries for that node and adjust the timer.
for (int i = 0; i < WEAVE_CONFIG_WRMP_RETRANS_TABLE_SIZE; i++)
{
// Check if ExchangeContext matches
if (ExchangeMgr->RetransTable[i].exchContext == this)
{
// Adjust the retrans timer value to account for throttling.
if (0 != PauseTimeMillis)
{
ExchangeMgr->RetransTable[i].nextRetransTime += PauseTimeMillis / ExchangeMgr->mWRMPTimerInterval;
}
// UnThrottle when PauseTimeMillis is set to 0
else
{
ExchangeMgr->RetransTable[i].nextRetransTime = 0;
}
break;
}
}
// Call OnThrottleRcvd application callback
if (OnThrottleRcvd)
{
OnThrottleRcvd(this, PauseTimeMillis);
}
else
{
WeaveLogDetail(ExchangeManager,
"No App Handler for Throttle Message");
}
// Schedule next physical wakeup
ExchangeMgr->WRMPStartTimer();
return WEAVE_NO_ERROR;
}
#endif // WEAVE_CONFIG_ENABLE_RELIABLE_MESSAGING
/**
* @overload
*/
WEAVE_ERROR ExchangeContext::HandleMessage(WeaveMessageInfo *msgInfo, const WeaveExchangeHeader *exchHeader, PacketBuffer *msgBuf)
{
return HandleMessage(msgInfo, exchHeader, msgBuf, NULL);
}
/**
* Handle a message in the context of an Exchange. This method processes ACKs and duplicate messages
* and then invokes the application handler.
*
* Note on OnMessageReceived and the umhandler argument:
* When the ExchangeManager creates a new EC for an inbound message,
* OnMessageReceived is set by default to a handler that drops the message, so
* any future message on the exchange is discarded unless the application
* installs an OnMessageReceived handler.
* The unsolicited message that triggers the creation of the EC is
* handled by an UMH, which is passed to this method via the umhandler param.
*
* @param[in] msgInfo General Weave message information for the incoming message.
* @param[in] exchHeader Weave exchange information for the incoming message.
* @param[in] msgBuf PacketBuffer containing the payload of the incoming message.
* @param[in] umhandler Pointer to a message receive callback function; if this function
* is not NULL it will be used in place of the OnMessageReceived function
* installed in the ExchangeContext.
*
*/
WEAVE_ERROR ExchangeContext::HandleMessage(WeaveMessageInfo *msgInfo, const WeaveExchangeHeader *exchHeader, PacketBuffer *msgBuf,
ExchangeContext::MessageReceiveFunct umhandler)
{
WEAVE_ERROR err = WEAVE_NO_ERROR;
#if WEAVE_CONFIG_ENABLE_RELIABLE_MESSAGING
const uint8_t *p = NULL;
uint32_t PauseTimeMillis = 0;
#endif
// We hold a reference to the ExchangeContext here to
// guard against Close() calls(decrementing the reference
// count) by the application before the Weave Exchange
// layer has completed its work on the ExchangeContext.
AddRef();
if (msgInfo->MessageVersion == kWeaveMessageVersion_V2)
{
#if WEAVE_CONFIG_ENABLE_RELIABLE_MESSAGING
if (exchHeader->Flags & kWeaveExchangeFlag_AckId)
{
err = WRMPHandleRcvdAck(exchHeader, msgInfo);
}
if (exchHeader->Flags & kWeaveExchangeFlag_NeedsAck)
{
//Set the flag in message header indicating an ack requested by peer;
msgInfo->Flags |= kWeaveMessageFlag_PeerRequestedAck;
//Set the flag in the exchange context indicating an ack requested;
SetPeerRequestedAck(true);
if (!ShouldDropAck())
{
err = WRMPHandleNeedsAck(msgInfo);
}
}
#endif
}
// If the message is a duplicate and duplicates are not allowed for this exchange.
if ((msgInfo->Flags & kWeaveMessageFlag_DuplicateMessage) && !AllowDuplicateMsgs)
{
ExitNow(err = WEAVE_NO_ERROR);
}
//Received Flow Throttle
if (exchHeader->ProfileId == nl::Weave::Profiles::kWeaveProfile_Common &&
exchHeader->MessageType == nl::Weave::Profiles::Common::kMsgType_WRMP_Throttle_Flow)
{
#if WEAVE_CONFIG_ENABLE_RELIABLE_MESSAGING
//Extract PauseTimeMillis from msgBuf
p = msgBuf->Start();
PauseTimeMillis = LittleEndian::Read32(p);
HandleThrottleFlow(PauseTimeMillis);
ExitNow(err = WEAVE_NO_ERROR);
#endif
}
//Return and not pass this to Application if Common::Null Msg Type
else if ((exchHeader->ProfileId == nl::Weave::Profiles::kWeaveProfile_Common) &&
(exchHeader->MessageType == nl::Weave::Profiles::Common::kMsgType_Null))
{
ExitNow(err = WEAVE_NO_ERROR);
}
else
{
// Since we got the response, cancel the response timer.
CancelResponseTimer();
// If the context was expecting a response to a previously sent message, this message
// is implicitly that response.
SetResponseExpected(false);
// Deliver the message to the app via its callback.
if (umhandler)
{
umhandler(this, msgInfo->InPacketInfo, const_cast<WeaveMessageInfo *>(msgInfo), exchHeader->ProfileId,
exchHeader->MessageType, msgBuf);
msgBuf = NULL;
}
else if (OnMessageReceived != NULL)
{
OnMessageReceived(this, msgInfo->InPacketInfo, const_cast<WeaveMessageInfo *>(msgInfo), exchHeader->ProfileId,
exchHeader->MessageType, msgBuf);
msgBuf = NULL;
}
else
{
WeaveLogError(ExchangeManager,
"No App Handler for Msg(MsgId:%08" PRIX32 ")",
msgInfo->MessageId);
}
}
exit:
// Release the reference to the ExchangeContext that was held at the beginning of this function.
// This call should also do the needful of closing the ExchangeContext if the application has
// already made a prior call to Close().
Release();
if (msgBuf != NULL)
{
PacketBuffer::Free(msgBuf);
}
return err;
}
} // namespace nl
} // namespace Weave