blob: 77c8624fd93b5ba5e204c19cff3e967aee13b90c [file] [log] [blame]
/*
*
* Copyright (c) 2015-2017 Nest Labs, Inc.
* All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <Weave/Profiles/data-management/Current/WdmManagedNamespace.h>
#include <Weave/Profiles/data-management/DataManagement.h>
namespace nl {
namespace Weave {
namespace Profiles {
namespace WeaveMakeManagedNamespaceIdentifier(DataManagement, kWeaveManagedNamespaceDesignation_Current) {
#if WEAVE_CONFIG_SERIALIZATION_ENABLE_DESERIALIZATION
static WEAVE_ERROR
ReadAndCheckPresence(nl::Weave::TLV::TLVReader &inReader,
uint64_t &inOutReceivedMask,
const uint64_t &inReceivedFieldFlag,
uint64_t &inOutValue);
EventProcessor::EventProcessor(uint64_t inLocalNodeId) :
mLocalNodeId(inLocalNodeId),
mLastEventId()
{
}
EventProcessor::~EventProcessor(void)
{
}
WEAVE_ERROR
EventProcessor::ProcessEvents(nl::Weave::TLV::TLVReader &inReader,
nl::Weave::Profiles::DataManagement::SubscriptionClient &inClient)
{
WEAVE_ERROR err = WEAVE_NO_ERROR;
err = ParseEventList(inReader, inClient);
SuccessOrExit(err);
exit:
return err;
}
WEAVE_ERROR
EventProcessor::ParseEventList(nl::Weave::TLV::TLVReader &inReader,
nl::Weave::Profiles::DataManagement::SubscriptionClient &inClient)
{
WEAVE_ERROR err = WEAVE_NO_ERROR;
StreamParsingContext parsingContext(inClient.GetBinding()->GetPeerNodeId());
while (WEAVE_NO_ERROR == (err = inReader.Next()))
{
VerifyOrExit(nl::Weave::TLV::AnonymousTag == inReader.GetTag(), err = WEAVE_ERROR_TLV_TAG_NOT_FOUND);
VerifyOrExit(nl::Weave::TLV::kTLVType_Structure == inReader.GetType(), err = WEAVE_ERROR_WRONG_TLV_TYPE);
{
nl::Weave::TLV::TLVType outerContainerType;
err = inReader.EnterContainer(outerContainerType);
SuccessOrExit(err);
err = ParseEvent(inReader, inClient, parsingContext);
SuccessOrExit(err);
err = inReader.ExitContainer(outerContainerType);
SuccessOrExit(err);
}
}
exit:
if (err == WEAVE_END_OF_TLV)
{
err = WEAVE_NO_ERROR;
}
return err;
}
WEAVE_ERROR
EventProcessor::ParseEvent(nl::Weave::TLV::TLVReader &inReader,
nl::Weave::Profiles::DataManagement::SubscriptionClient &inClient,
StreamParsingContext &inOutParsingContext)
{
WEAVE_ERROR err = WEAVE_NO_ERROR;
EventHeader eventHeader;
uint64_t receivedMask = 0;
while (WEAVE_NO_ERROR == (err = inReader.Next()))
{
uint32_t tag = 0;
VerifyOrExit(nl::Weave::TLV::IsContextTag(inReader.GetTag()), err = WEAVE_ERROR_INVALID_TLV_TAG);
tag = nl::Weave::TLV::TagNumFromTag(inReader.GetTag());
switch (tag)
{
case Event::kCsTag_Source:
{
err = ReadAndCheckPresence(inReader,
receivedMask,
ReceivedEventHeaderFieldPresenceMask_Source,
eventHeader.mSource);
SuccessOrExit(err);
break;
}
case Event::kCsTag_Importance:
{
uint64_t v = 0;
err = ReadAndCheckPresence(inReader,
receivedMask,
ReceivedEventHeaderFieldPresenceMask_Importance,
v);
SuccessOrExit(err);
eventHeader.mImportance = (ImportanceType)v;
break;
}
case Event::kCsTag_Id:
{
err = ReadAndCheckPresence(inReader,
receivedMask,
ReceivedEventHeaderFieldPresenceMask_Id,
eventHeader.mId);
SuccessOrExit(err);
break;
}
case Event::kCsTag_RelatedImportance:
{
uint64_t v = 0;
err = ReadAndCheckPresence(inReader,
receivedMask,
ReceivedEventHeaderFieldPresenceMask_RelatedImportance,
v);
SuccessOrExit(err);
eventHeader.mRelatedImportance = (ImportanceType)v;
break;
}
case Event::kCsTag_RelatedId:
{
err = ReadAndCheckPresence(inReader,
receivedMask,
ReceivedEventHeaderFieldPresenceMask_RelatedId,
eventHeader.mRelatedId);
SuccessOrExit(err);
break;
}
case Event::kCsTag_UTCTimestamp:
{
err = ReadAndCheckPresence(inReader,
receivedMask,
ReceivedEventHeaderFieldPresenceMask_UTCTimestamp,
eventHeader.mUTCTimestamp);
SuccessOrExit(err);
break;
}
case Event::kCsTag_SystemTimestamp:
{
err = ReadAndCheckPresence(inReader,
receivedMask,
ReceivedEventHeaderFieldPresenceMask_SystemTimestamp,
eventHeader.mSystemTimestamp);
SuccessOrExit(err);
break;
}
case Event::kCsTag_ResourceId:
{
// Mandatory field.
VerifyOrExit(nl::Weave::TLV::kTLVType_UnsignedInteger == inReader.GetType(), err = WEAVE_ERROR_WRONG_TLV_TYPE);
err = inReader.Get(eventHeader.mResourceId);
SuccessOrExit(err);
break;
}
case Event::kCsTag_TraitProfileId:
{
nl::Weave::TLV::TLVType outerContainerType;
// Mandatory field.
VerifyOrExit(nl::Weave::TLV::kTLVType_UnsignedInteger == inReader.GetType() || nl::Weave::TLV::kTLVType_Array == inReader.GetType(), err = WEAVE_ERROR_WRONG_TLV_TYPE);
if (inReader.GetType() == nl::Weave::TLV::kTLVType_Array) {
err = inReader.EnterContainer(outerContainerType);
SuccessOrExit(err);
err = inReader.Next();
SuccessOrExit(err);
VerifyOrExit(inReader.GetType() == nl::Weave::TLV::kTLVType_UnsignedInteger, err = WEAVE_ERROR_WRONG_TLV_TYPE);
err = inReader.Get(eventHeader.mTraitProfileId);
SuccessOrExit(err);
err = inReader.Next();
VerifyOrExit(err == WEAVE_NO_ERROR || err == WEAVE_END_OF_TLV, );
// MaxVersion is only encoded if it isn't 1.
if (err == WEAVE_NO_ERROR) {
VerifyOrExit(inReader.GetType() == nl::Weave::TLV::kTLVType_UnsignedInteger, err = WEAVE_ERROR_WRONG_TLV_TYPE);
err = inReader.Get(eventHeader.mDataSchemaVersionRange.mMaxVersion);
SuccessOrExit(err);
}
// Similarly, MinVersion is only encoded if it isn't 1.
err = inReader.Next();
VerifyOrExit(err == WEAVE_NO_ERROR || err == WEAVE_END_OF_TLV, );
if (err == WEAVE_NO_ERROR) {
VerifyOrExit(inReader.GetType() == nl::Weave::TLV::kTLVType_UnsignedInteger, err = WEAVE_ERROR_WRONG_TLV_TYPE);
err = inReader.Get(eventHeader.mDataSchemaVersionRange.mMinVersion);
SuccessOrExit(err);
}
err = inReader.Next();
VerifyOrExit(err == WEAVE_END_OF_TLV, err = WEAVE_ERROR_WDM_MALFORMED_DATA_ELEMENT);
err = inReader.ExitContainer(outerContainerType);
SuccessOrExit(err);
}
else {
VerifyOrExit(inReader.GetType() == nl::Weave::TLV::kTLVType_UnsignedInteger, err = WEAVE_ERROR_WRONG_TLV_TYPE);
err = inReader.Get(eventHeader.mTraitProfileId);
}
break;
}
case Event::kCsTag_TraitInstanceId:
{
err = ReadAndCheckPresence(inReader,
receivedMask,
ReceivedEventHeaderFieldPresenceMask_TraitInstanceId,
eventHeader.mTraitInstanceId);
SuccessOrExit(err);
break;
}
case Event::kCsTag_Type:
{
err = ReadAndCheckPresence(inReader,
receivedMask,
ReceivedEventHeaderFieldPresenceMask_Type,
eventHeader.mType);
SuccessOrExit(err);
break;
}
case Event::kCsTag_DeltaUTCTime:
{
uint64_t v = 0;
err = ReadAndCheckPresence(inReader,
receivedMask,
ReceivedEventHeaderFieldPresenceMask_DeltaUTCTime,
v);
SuccessOrExit(err);
eventHeader.mDeltaUTCTime = (int64_t)v;
break;
}
case Event::kCsTag_DeltaSystemTime:
{
uint64_t v = 0;
err = ReadAndCheckPresence(inReader,
receivedMask,
ReceivedEventHeaderFieldPresenceMask_DeltaSystemTime,
v);
SuccessOrExit(err);
eventHeader.mDeltaSystemTime = (int64_t)v;
break;
}
case Event::kCsTag_Data:
{
// check if this tag has appeared before
VerifyOrExit((receivedMask & ReceivedEventHeaderFieldPresenceMask_Data) == 0, err = WEAVE_ERROR_INVALID_TLV_TAG);
receivedMask |= ReceivedEventHeaderFieldPresenceMask_Data;
// Mandatory field. Make sure we're sending up a
// fully-qualified header to the app.
err = UpdateContextQualifyHeader(eventHeader, inOutParsingContext, receivedMask);
SuccessOrExit(err);
// Potentially this could be moved to outside the while(inReader.Next())?
// but so could the call to ProcessEvent (this would enable data-less events).
// For now leaving both together makes up for it in readability.
err = UpdateGapDetection(eventHeader);
SuccessOrExit(err);
err = ProcessEvent(inReader, inClient, eventHeader);
SuccessOrExit(err);
break;
}
default:
{
// Unknown. If a newly-added field is not optional,
// it needs to be handled in a case above.
WeaveLogError(EventLogging, "EventProcessor encountered unknown tag 0x%" PRIx32 " (%d)", tag, tag);
break;
}
}
}
// almost all fields in an Event are optional
if (err == WEAVE_END_OF_TLV)
{
err = WEAVE_NO_ERROR;
}
exit:
return err;
}
WEAVE_ERROR
EventProcessor::MapReceivedMaskToPublishedMask(const uint64_t &inReceivedMask,
uint64_t &inOutPublishedMask)
{
WEAVE_ERROR err = WEAVE_NO_ERROR;
const uint8_t numFields = 6;
const uint64_t headerFields[numFields] =
{
EventHeaderFieldPresenceMask_RelatedImportance,
EventHeaderFieldPresenceMask_RelatedId,
EventHeaderFieldPresenceMask_UTCTimestamp,
EventHeaderFieldPresenceMask_SystemTimestamp,
EventHeaderFieldPresenceMask_DeltaUTCTime,
EventHeaderFieldPresenceMask_DeltaSystemTime
};
const uint64_t receivedHeaderFields[numFields] =
{
ReceivedEventHeaderFieldPresenceMask_RelatedImportance,
ReceivedEventHeaderFieldPresenceMask_RelatedId,
ReceivedEventHeaderFieldPresenceMask_UTCTimestamp,
ReceivedEventHeaderFieldPresenceMask_SystemTimestamp,
ReceivedEventHeaderFieldPresenceMask_DeltaUTCTime,
ReceivedEventHeaderFieldPresenceMask_DeltaSystemTime
};
inOutPublishedMask = 0;
for (uint8_t i = 0; i < numFields; i++)
{
if ((inReceivedMask & receivedHeaderFields[i]) != 0)
{
inOutPublishedMask |= headerFields[i];
}
}
return err;
}
WEAVE_ERROR
EventProcessor::UpdateContextQualifyHeader(EventHeader &inOutEventHeader,
StreamParsingContext &inOutContext,
uint64_t inReceivedMask)
{
WEAVE_ERROR err = WEAVE_NO_ERROR;
err = MapReceivedMaskToPublishedMask(inReceivedMask, inOutEventHeader.mPresenceMask);
SuccessOrExit(err);
if ((inReceivedMask & ReceivedEventHeaderFieldPresenceMask_Source) == 0)
{
// No source present. Use our default.
inOutEventHeader.mSource = inOutContext.mPublisherSourceId;
}
if ((inReceivedMask & ReceivedEventHeaderFieldPresenceMask_Importance) != 0)
{
// Importance is present.
inOutContext.mCurrentEventImportance = inOutEventHeader.mImportance;
}
else
{
// No importance is present.
inOutEventHeader.mImportance = inOutContext.mCurrentEventImportance;
}
if ((inReceivedMask & ReceivedEventHeaderFieldPresenceMask_Id) != 0)
{
// Event ID is present.
inOutContext.mCurrentEventId = inOutEventHeader.mId;
}
else
{
// No event ID is present.
inOutEventHeader.mId = ++inOutContext.mCurrentEventId;
}
if ((inReceivedMask & ReceivedEventHeaderFieldPresenceMask_Type) != 0)
{
// Event type is present.
inOutContext.mCurrentEventType = inOutEventHeader.mType;
}
else
{
// No event type is present.
inOutEventHeader.mType = inOutContext.mCurrentEventType;
}
if ((inReceivedMask & ReceivedEventHeaderFieldPresenceMask_TraitInstanceId) == 0)
{
// No trait instance ID was present, default is 0.
inOutEventHeader.mTraitInstanceId = 0;
}
if ((inReceivedMask & ReceivedEventHeaderFieldPresenceMask_SystemTimestamp) != 0)
{
// System timestamp is present, save it off.
inOutContext.mCurrentSystemTimestamp = inOutEventHeader.mSystemTimestamp;
}
else if ((inReceivedMask & ReceivedEventHeaderFieldPresenceMask_DeltaSystemTime) != 0)
{
// No system timestamp present, but delta is, so system
// timestamp is our saved value plus any delta.
inOutEventHeader.mSystemTimestamp = inOutContext.mCurrentSystemTimestamp + inOutEventHeader.mDeltaSystemTime;
// Update mCurrentSystemTimestamp.
inOutContext.mCurrentSystemTimestamp = inOutEventHeader.mSystemTimestamp;
}
if ((inReceivedMask & ReceivedEventHeaderFieldPresenceMask_UTCTimestamp) != 0)
{
// UTC timestamp is present, save it off.
inOutContext.mCurrentUTCTimestamp = inOutEventHeader.mUTCTimestamp;
}
else if ((inReceivedMask & ReceivedEventHeaderFieldPresenceMask_DeltaUTCTime) != 0)
{
// No UTC timestamp present, but delta is, so system timestamp
// is our saved value plus any delta.
inOutEventHeader.mUTCTimestamp = inOutContext.mCurrentUTCTimestamp + inOutEventHeader.mDeltaUTCTime;
// Update mCurrentUTCTimestamp.
inOutContext.mCurrentUTCTimestamp = inOutEventHeader.mUTCTimestamp;
}
exit:
return err;
}
WEAVE_ERROR
EventProcessor::UpdateGapDetection(const EventHeader &inEventHeader)
{
// If any event was ever received for that importance
if (mLastEventId[inEventHeader.mImportance] != 0)
{
if (inEventHeader.mId != (mLastEventId[inEventHeader.mImportance] + 1))
{
WeaveLogDetail(DataManagement, "EventProcessor found gap for importance: %u (0x%" PRIx32 " -> 0x%" PRIx64 ") NodeId=0x%" PRIx64,
inEventHeader.mImportance,
mLastEventId[inEventHeader.mImportance],
inEventHeader.mId,
inEventHeader.mSource);
GapDetected(inEventHeader);
}
mLastEventId[inEventHeader.mImportance] = inEventHeader.mId;
}
else
{
WeaveLogDetail(DataManagement, "EventProcessor stream for importance: %u initialized with id: 0x%" PRIx64 , inEventHeader.mImportance, inEventHeader.mId);
mLastEventId[inEventHeader.mImportance] = inEventHeader.mId;
}
return WEAVE_NO_ERROR;
}
WEAVE_ERROR
ReadAndCheckPresence(nl::Weave::TLV::TLVReader &inReader,
uint64_t &inOutReceivedMask,
const uint64_t &inReceivedFieldFlag,
uint64_t &inOutValue)
{
WEAVE_ERROR err = WEAVE_NO_ERROR;
// Check if this tag has appeared before.
VerifyOrExit((inOutReceivedMask & inReceivedFieldFlag) == 0, err = WEAVE_ERROR_INVALID_TLV_TAG);
// The only two types we should see here.
VerifyOrExit((inReader.GetType() == nl::Weave::TLV::kTLVType_UnsignedInteger) ||
(inReader.GetType() == nl::Weave::TLV::kTLVType_SignedInteger),
err = WEAVE_ERROR_WRONG_TLV_TYPE);
err = inReader.Get(inOutValue);
SuccessOrExit(err);
inOutReceivedMask |= inReceivedFieldFlag;
exit:
return err;
}
#endif // WEAVE_CONFIG_SERIALIZATION_ENABLE_DESERIALIZATION
} // WeaveMakeManagedNamespaceIdentifier(DataManagement, kWeaveManagedNamespaceDesignation_Current)
} // Profiles
} // Weave
} // nl