/*
 *
 *    Copyright (c) 2016-2017 Nest Labs, Inc.
 *    All rights reserved.
 *
 *    Licensed under the Apache License, Version 2.0 (the "License");
 *    you may not use this file except in compliance with the License.
 *    You may obtain a copy of the License at
 *
 *        http://www.apache.org/licenses/LICENSE-2.0
 *
 *    Unless required by applicable law or agreed to in writing, software
 *    distributed under the License is distributed on an "AS IS" BASIS,
 *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *    See the License for the specific language governing permissions and
 *    limitations under the License.
 */

/**
 *    @file
 *      This file implements the Weave Data Management mock subscription initiator.
 *
 */

#define WEAVE_CONFIG_ENABLE_FUNCT_ERROR_LOGGING 1

#ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS
#endif // __STDC_FORMAT_MACROS

#include <inttypes.h>
#include <new>

// Note that the choice of namespace alias must be made up front for each and every compile unit
// This is because many include paths could set the default alias to unintended target.
#include <Weave/Profiles/bulk-data-transfer/Development/BDXManagedNamespace.hpp>
#include <Weave/Profiles/data-management/Current/WdmManagedNamespace.h>

#include <Weave/Core/WeaveError.h>
#include <Weave/Core/WeaveSecurityMgr.h>
#include <Weave/Support/CodeUtils.h>
#include <Weave/Profiles/WeaveProfiles.h>
#include <Weave/Profiles/data-management/DataManagement.h>
#include <Weave/Profiles/security/ApplicationKeysTraitDataSink.h>
#include "MockWdmTestVerifier.h"
#include "MockWdmSubscriptionInitiator.h"
#include "MockSinkTraits.h"
#include "MockSourceTraits.h"
#include "TestGroupKeyStore.h"

#if WEAVE_CONFIG_ENABLE_RELIABLE_MESSAGING

using namespace nl::Weave::TLV;
using namespace nl::Weave::Profiles;
using namespace nl::Weave::Profiles::DataManagement;
using namespace Schema::Weave::Trait::Auth::ApplicationKeysTrait;

const nl::Weave::ExchangeContext::Timeout kResponseTimeoutMsec = 15000;
const nl::Weave::ExchangeContext::Timeout kWRMPActiveRetransTimeoutMsec = 3000;
const nl::Weave::ExchangeContext::Timeout kWRMPInitialRetransTimeoutMsec = 3000;
const uint16_t kWRMPMaxRetrans = 3;
const uint16_t kWRMPAckTimeoutMsec = 200;

// Any time setting lower than this would force the subscription client to send Subscribe Confirm continuously.
uint32_t gMinimumTimeBetweenLivenessCheckSec = ((WEAVE_CONFIG_WRMP_DEFAULT_MAX_RETRANS + 1) * kWRMPActiveRetransTimeoutMsec + 999) / 1000;

static int gNumDataChangeBeforeCancellation;
static int gFinalStatus;
static SubscriptionHandler *gSubscriptionHandler = NULL;
static int gTimeBetweenDataChangeMsec = 0;
static bool gIsMutualSubscription = true;
static bool gEnableDataFlip = true;
static bool gMutualSubscriptionEstablished = false;
static bool gOnewaySubscriptionEstablished = false;
static bool gEvaluateSuccessIteration = false;
static bool gCleanStatus = true;
static bool gTestCase_TestOversizeTrait2DumpFlip = true;
static nl::Weave::WRMPConfig gWRMPConfig = { kWRMPInitialRetransTimeoutMsec, kWRMPActiveRetransTimeoutMsec, kWRMPAckTimeoutMsec, kWRMPMaxRetrans };

static DEFINE_ALIGNED_VAR(sTestGroupKeyStore, sizeof(TestGroupKeyStore), void*);

struct VersionNode
{
    uint64_t versionInfo;
    VersionNode * next;
};


class WdmInitiatorState
{
public:
    int mDataflipCount;
    int mClientStateCount;
    int mPublisherStateCount;
    void init(void)
    {
        mDataflipCount = 1;
        mClientStateCount = 1;
        mPublisherStateCount = 1;
    }
};

static WdmInitiatorState gInitiatorState;


class MockWdmSubscriptionInitiatorImpl: public MockWdmSubscriptionInitiator
{
public:
    MockWdmSubscriptionInitiatorImpl();

    virtual WEAVE_ERROR Init (nl::Weave::WeaveExchangeManager *aExchangeMgr, const bool aMutualSubscription,
    const char * const aTestCaseId, const char * const aNumDataChangeBeforeCancellation,
    const char * const aFinalStatus, const char * const aTimeBetweenDataChangeMsec, const bool aEnableDataFlip,
    const char * const aTimeBetweenLivenessCheckSec, const bool aEnableDictionaryTest, const int aTestSecurityMode, const uint32_t aKeyId,
    const bool aEnableRetry);

    virtual WEAVE_ERROR StartTesting(const uint64_t aPublisherNodeId, const uint16_t aSubnetId);
    void PrintVersionsLog();
    void ClearDataSinkState(void);

private:
    nl::Weave::WeaveExchangeManager *mExchangeMgr;
    nl::Weave::Binding * mBinding;

    uint64_t mPublisherNodeId;
    uint16_t mPublisherSubnetId;

    static bool mClearDataSink;
    int mTestCaseId;
    int mTestSecurityMode;
    uint32_t mKeyId;

    TraitPath mTraitPaths[4];
    VersionedTraitPath mVersionedTraitPaths[4];
    uint32_t mNumPaths;

    bool mEnableRetry;
    bool mWillRetry;

    // publisher side
    SingleResourceSourceTraitCatalog mSourceCatalog;
    SingleResourceSourceTraitCatalog::CatalogItem mSourceCatalogStore[4];
    nl::Weave::Profiles::DataManagement_Current::TraitSchemaEngine::IDataSourceDelegate* mSinkAddressList[6];

    // source traits
    LocaleCapabilitiesTraitDataSource mLocaleCapabilitiesDataSource;
    TestATraitDataSource mTestATraitDataSource0;
    TestATraitDataSource mTestATraitDataSource1;
    TestBTraitDataSource mTestBTraitDataSource;
    TestBLargeTraitDataSource mTestBLargeTraitDataSource;

    static void ClearDataSinkIterator(void *aTraitInstance, TraitDataHandle aHandle, void *aContext);

    static void EngineEventCallback (void * const aAppState, SubscriptionEngine::EventID aEvent,
        const SubscriptionEngine::InEventParam & aInParam, SubscriptionEngine::OutEventParam & aOutParam);

    static void PublisherEventCallback (void * const aAppState,
        SubscriptionHandler::EventID aEvent, const SubscriptionHandler::InEventParam & aInParam,
        SubscriptionHandler::OutEventParam & aOutParam);

    // client side
    SingleResourceSinkTraitCatalog mSinkCatalog;
    SingleResourceSinkTraitCatalog::CatalogItem mSinkCatalogStore[6];

    // sink traits
    LocaleSettingsTraitDataSink mLocaleSettingsTraitDataSink;
    BoltLockSettingTraitDataSink mBoltLockSettingsTraitDataSink;
    TestATraitDataSink mTestATraitDataSink0;
    TestATraitDataSink mTestATraitDataSink1;
    TestBTraitDataSink mTestBTraitDataSink;
    TestApplicationKeysTraitDataSink mApplicationKeysTraitDataSink;

    enum
    {
        kTestATraitSink0Index = 0,
        kTestATraitSink1Index,
        kTestBTraitSinkIndex,
        kLocaleSettingsSinkIndex,
        kBoltLockSettingTraitSinkIndex,
        kApplicationKeysTraitSinkIndex,

        kLocaleCapabilitiesSourceIndex,
        kTestATraitSource0Index,
        kTestATraitSource1Index,
        kTestBTraitSourceIndex,
        kTestBLargeTraitSourceIndex,
        kMaxNumTraitHandles,
    };

    enum
    {
        kClientCancel = 0,
        kPublisherCancel,
        kClientAbort,
        kPublisherAbort,
        kIdle
    };

    TraitDataHandle mTraitHandleSet[kMaxNumTraitHandles];

    enum
    {
        // subscribe LocaleSettings, TestA(two instances) and TestB traits in initiator
        // publish TestA(two instances) and TestB traits in initiator
        kTestCase_TestTrait = 1,

        // subscribe Locale Setting, ApplicationKeys traits in initiator
        // publish Locale Capabilities traits in responder
        kTestCase_IntegrationTrait = 2,

        //Reject Incoming subscribe request
        kTestCase_RejectIncomingSubscribeRequest    = 3,

        // subscribe oversize TestB, TestA(two instances) traits and LocaleSettings in initiator
        // publish TestA(two instances) and oversize TestB traits in initiator
        kTestCase_TestOversizeTrait1 = 4,

        // subscribe oversize LocaleSettings, TestB, and TestA(two instances) traits in initiator
        // publish TestA(two instances) and oversize TestB traits in initiator
        kTestCase_TestOversizeTrait2 = 5,

        kTestCase_CompatibleVersionedRequest = 6,

        kTestCase_ForwardCompatibleVersionedRequest = 7,

        kTestCase_IncompatibleVersionedRequest = 8
    };

    enum
    {
        kMonitorCurrentStateCnt = 160,
        kMonitorCurrentStateInterval = 120 //msec
    };

    VersionNode mTraitVersionSet[kMaxNumTraitHandles];

    SubscriptionClient * mSubscriptionClient;

    void AddNewVersion (int aTraitDataSinkIndex);

    void DumpClientTraitChecksum(int TraitDataSinkIndex);
    void DumpClientTraits(void);

    void DumpPublisherTraitChecksum(int TraitDataSounceIndex);
    void DumpPublisherTraits(void);

    WEAVE_ERROR PrepareBinding();

    static void ClientEventCallback (void * const aAppState, SubscriptionClient::EventID aEvent,
        const SubscriptionClient::InEventParam & aInParam, SubscriptionClient::OutEventParam & aOutParam);

    static void BindingEventCallback (void * const apAppState, const nl::Weave::Binding::EventType aEvent,
        const nl::Weave::Binding::InEventParam & aInParam, nl::Weave::Binding::OutEventParam & aOutParam);

    static void HandleClientComplete(void *aAppState);

    static void HandleClientRelease(void *aAppState);

    static void HandlePublisherComplete();

    static void HandlePublisherRelease();

    static void HandleDataFlipTimeout (nl::Weave::System::Layer *aSystemLayer, void *aAppState, nl::Weave::System::Error aErr);

    static void MonitorPublisherCurrentState (nl::Weave::System::Layer* aSystemLayer, void *aAppState, INET_ERROR aErr);

    static void MonitorClientCurrentState (nl::Weave::System::Layer* aSystemLayer, void *aAppState, INET_ERROR aErr);
};

static MockWdmSubscriptionInitiatorImpl gWdmSubscriptionInitiator;

bool MockWdmSubscriptionInitiatorImpl::mClearDataSink = false;

MockWdmSubscriptionInitiatorImpl::MockWdmSubscriptionInitiatorImpl() :
    mSourceCatalog(ResourceIdentifier(ResourceIdentifier::SELF_NODE_ID), mSourceCatalogStore, sizeof(mSourceCatalogStore) / sizeof(mSourceCatalogStore[0])),
    mSinkCatalog(ResourceIdentifier(ResourceIdentifier::SELF_NODE_ID), mSinkCatalogStore, sizeof(mSinkCatalogStore) / sizeof(mSinkCatalogStore[0]))
{
}

MockWdmSubscriptionInitiator * MockWdmSubscriptionInitiator::GetInstance ()
{
    return &gWdmSubscriptionInitiator;
}

WEAVE_ERROR MockWdmSubscriptionInitiatorImpl::Init(nl::Weave::WeaveExchangeManager *aExchangeMgr,
const bool aMutualSubscription, const char * const aTestCaseId, const char * const aNumDataChangeBeforeCancellation,
const char * const aFinalStatus, const char * const aTimeBetweenDataChangeMsec, const bool aEnableDataFlip,
const char * const aTimeBetweenLivenessCheckSec, const bool aEnableDictionaryTest, const int aTestSecurityMode, const uint32_t aKeyId,
const bool aEnableRetry)
{
    WEAVE_ERROR err = WEAVE_NO_ERROR;

    gIsMutualSubscription = aMutualSubscription;

    WeaveLogDetail(DataManagement, "Test Case ID: %s", (aTestCaseId == NULL) ? "NULL" : aTestCaseId);

    if (NULL != aNumDataChangeBeforeCancellation)
    {
        gNumDataChangeBeforeCancellation = atoi(aNumDataChangeBeforeCancellation);
    }
    else
    {
        gNumDataChangeBeforeCancellation = -1;
    }

    if (NULL != aFinalStatus)
    {
        gFinalStatus = atoi(aFinalStatus);
    }
    else
    {
        gFinalStatus = 0;
    }

    if (NULL != aTimeBetweenDataChangeMsec)
    {
        gTimeBetweenDataChangeMsec = atoi(aTimeBetweenDataChangeMsec);
    }
    else
    {
        gTimeBetweenDataChangeMsec = 15000;
    }

    if (NULL != aTimeBetweenLivenessCheckSec)
    {
        gMinimumTimeBetweenLivenessCheckSec = atoi(aTimeBetweenLivenessCheckSec);
    }
    else
    {
        gMinimumTimeBetweenLivenessCheckSec = 30;
    }

    gEnableDataFlip = aEnableDataFlip;

    printf("aTestCaseId = %s\n", aTestCaseId);

    if (NULL != aTestCaseId)
    {
        mTestCaseId = atoi(aTestCaseId);
    }
    else
    {
        mTestCaseId = kTestCase_TestTrait;
    }

    mTestSecurityMode = aTestSecurityMode;

    mKeyId = aKeyId;

    mTestATraitDataSource0.mTraitTestSet = 0;

    mTestATraitDataSource1.mTraitTestSet = 0;

    if (aEnableDictionaryTest)
    {
        mTestATraitDataSource1.mTraitTestSet = 1;
    }

    mEnableRetry = aEnableRetry;

    mSinkCatalog.Add(0, &mTestATraitDataSink0, mTraitHandleSet[kTestATraitSink0Index]);
    mSinkCatalog.Add(1, &mTestATraitDataSink1, mTraitHandleSet[kTestATraitSink1Index]);
    mSinkCatalog.Add(0, &mTestBTraitDataSink, mTraitHandleSet[kTestBTraitSinkIndex]);
    mSinkCatalog.Add(0, &mLocaleSettingsTraitDataSink, mTraitHandleSet[kLocaleSettingsSinkIndex]);
    mSinkCatalog.Add(0, &mBoltLockSettingsTraitDataSink, mTraitHandleSet[kBoltLockSettingTraitSinkIndex]);
    mApplicationKeysTraitDataSink.SetGroupKeyStore(new (&sTestGroupKeyStore) TestGroupKeyStore());
    mSinkCatalog.Add(0, &mApplicationKeysTraitDataSink, mTraitHandleSet[kApplicationKeysTraitSinkIndex]);

    mSourceCatalog.Add(0, &mLocaleCapabilitiesDataSource, mTraitHandleSet[kLocaleCapabilitiesSourceIndex]);
    mSourceCatalog.Add(1, &mTestATraitDataSource0, mTraitHandleSet[kTestATraitSource0Index]);
    mSourceCatalog.Add(2, &mTestATraitDataSource1, mTraitHandleSet[kTestATraitSource1Index]);

    switch (mTestCaseId)
    {
    case kTestCase_TestOversizeTrait1:
    case kTestCase_TestOversizeTrait2:
         mSourceCatalog.Add(1, &mTestBLargeTraitDataSource, mTraitHandleSet[kTestBLargeTraitSourceIndex]);
         break;
    default:
        mSourceCatalog.Add(1, &mTestBTraitDataSource, mTraitHandleSet[kTestBTraitSourceIndex]);
        break;
    }

    switch (mTestCaseId)
    {
    case kTestCase_IntegrationTrait:
        WeaveLogDetail(DataManagement, "kTestCase_IntegrationTrait");
        break;

    case kTestCase_RejectIncomingSubscribeRequest:
        WeaveLogDetail(DataManagement, "kTestCase_RejectIncomingSubscribeRequest");
        break;

    case kTestCase_TestTrait:
        WeaveLogDetail(DataManagement, "kTestCase_TestTrait");
        break;

    case kTestCase_TestOversizeTrait1:
    case kTestCase_TestOversizeTrait2:
        WeaveLogDetail(DataManagement, "kTestCase_TestOversizeTrait %d", mTestCaseId);
        break;

    case kTestCase_CompatibleVersionedRequest:
        WeaveLogDetail(DataManagement, "kTestCase_CompatibleVersionedRequest");
        break;

    case kTestCase_ForwardCompatibleVersionedRequest:
        WeaveLogDetail(DataManagement, "kTestCase_ForwardCompatibleVersionedRequest");
        break;

    case kTestCase_IncompatibleVersionedRequest:
        WeaveLogDetail(DataManagement, "kTestCase_IncompatibleVersionedRequest");
        break;

    default:
        mTestCaseId = kTestCase_TestTrait;
        WeaveLogDetail(DataManagement, "kTestCase_TestTrait");
        break;
    }

    mExchangeMgr = aExchangeMgr;
    mBinding = NULL;

    mSubscriptionClient = NULL;

    // Note if you don't use publisher side, there is no need to initialize using this longer form
    err = SubscriptionEngine::GetInstance()->Init(mExchangeMgr, this, EngineEventCallback);
    SuccessOrExit(err);

    if (gIsMutualSubscription == true)
    {
        err = SubscriptionEngine::GetInstance()->EnablePublisher(NULL, &mSourceCatalog);
        SuccessOrExit(err);
    }

    mTraitVersionSet[kTestATraitSink0Index].versionInfo = mTestATraitDataSink0.GetVersion();
    mTraitVersionSet[kTestATraitSink0Index].next = NULL;
    mTraitVersionSet[kTestATraitSink1Index].versionInfo = mTestATraitDataSink1.GetVersion();
    mTraitVersionSet[kTestATraitSink1Index].next = NULL;
    mTraitVersionSet[kTestBTraitSinkIndex].versionInfo = mTestBTraitDataSink.GetVersion();
    mTraitVersionSet[kTestBTraitSinkIndex].next = NULL;
    mTraitVersionSet[kLocaleSettingsSinkIndex].versionInfo = mLocaleSettingsTraitDataSink.GetVersion();
    mTraitVersionSet[kLocaleSettingsSinkIndex].next = NULL;
    mTraitVersionSet[kBoltLockSettingTraitSinkIndex].versionInfo = mBoltLockSettingsTraitDataSink.GetVersion();
    mTraitVersionSet[kBoltLockSettingTraitSinkIndex].next = NULL;
    mTraitVersionSet[kApplicationKeysTraitSinkIndex].versionInfo = mApplicationKeysTraitDataSink.GetVersion();
    mTraitVersionSet[kApplicationKeysTraitSinkIndex].next = NULL;

    mSinkAddressList[kTestATraitSink0Index] = &mTestATraitDataSink0;
    mSinkAddressList[kTestATraitSink1Index] = &mTestATraitDataSink1;
    mSinkAddressList[kTestBTraitSinkIndex] = &mTestBTraitDataSink;
    mSinkAddressList[kLocaleSettingsSinkIndex] = &mLocaleSettingsTraitDataSink;
    mSinkAddressList[kBoltLockSettingTraitSinkIndex] = &mBoltLockSettingsTraitDataSink;
    mSinkAddressList[kApplicationKeysTraitSinkIndex] = &mApplicationKeysTraitDataSink;

    //onCompleteTest = NULL;

exit:
    return err;
}

WEAVE_ERROR MockWdmSubscriptionInitiatorImpl::StartTesting(const uint64_t aPublisherNodeId, const uint16_t aSubnetId)
{
    gInitiatorState.init();
    WEAVE_ERROR err = WEAVE_NO_ERROR;

    mPublisherNodeId = aPublisherNodeId;
    mPublisherSubnetId = aSubnetId;

    mBinding = mExchangeMgr->NewBinding(BindingEventCallback, this);
    VerifyOrExit(NULL != mBinding, err = WEAVE_ERROR_NO_MEMORY);

    err =  SubscriptionEngine::GetInstance()->NewClient(&mSubscriptionClient,
                mBinding,
                this,
                ClientEventCallback,
                &mSinkCatalog,
                kResponseTimeoutMsec * 2); // max num of msec between subscribe request and subscribe response
    SuccessOrExit(err);

    // TODO: EVENT-DEMO
    // TODO: Fix this dummy observed event list
    /*
    SubscriptionClient::LastObservedEvent DummyObservedEvents[] =
    {
        {1, 2, 3},
        {4, 5, 6},
    };
    */

    switch (mTestCaseId)
    {
    case kTestCase_IntegrationTrait:
    case kTestCase_RejectIncomingSubscribeRequest:
        mTraitPaths[0].mTraitDataHandle = mTraitHandleSet[kLocaleSettingsSinkIndex];
        mTraitPaths[0].mPropertyPathHandle = kRootPropertyPathHandle;

        mTraitPaths[1].mTraitDataHandle = mTraitHandleSet[kApplicationKeysTraitSinkIndex];
        mTraitPaths[1].mPropertyPathHandle = kRootPropertyPathHandle;

        mNumPaths = 2;
        break;

    case kTestCase_TestTrait:
        mTraitPaths[0].mTraitDataHandle = mTraitHandleSet[kLocaleSettingsSinkIndex];
        mTraitPaths[0].mPropertyPathHandle = kRootPropertyPathHandle;

        mTraitPaths[1].mTraitDataHandle = mTraitHandleSet[kTestATraitSink0Index];
        mTraitPaths[1].mPropertyPathHandle = kRootPropertyPathHandle;

        mTraitPaths[2].mTraitDataHandle = mTraitHandleSet[kTestATraitSink1Index];
        mTraitPaths[2].mPropertyPathHandle = kRootPropertyPathHandle;

        mTraitPaths[3].mTraitDataHandle = mTraitHandleSet[kTestBTraitSinkIndex];
        mTraitPaths[3].mPropertyPathHandle = kRootPropertyPathHandle;

        mNumPaths = 4;
        break;

    case kTestCase_TestOversizeTrait1:
        mTraitPaths[0].mTraitDataHandle = mTraitHandleSet[kTestBTraitSinkIndex];
        mTraitPaths[0].mPropertyPathHandle = kRootPropertyPathHandle;

        mTraitPaths[1].mTraitDataHandle = mTraitHandleSet[kTestATraitSink0Index];
        mTraitPaths[1].mPropertyPathHandle = kRootPropertyPathHandle;

        mTraitPaths[2].mTraitDataHandle = mTraitHandleSet[kTestATraitSink1Index];
        mTraitPaths[2].mPropertyPathHandle = kRootPropertyPathHandle;

        mTraitPaths[3].mTraitDataHandle = mTraitHandleSet[kLocaleSettingsSinkIndex];
        mTraitPaths[3].mPropertyPathHandle = kRootPropertyPathHandle;

        mNumPaths = 4;
        break;

     case kTestCase_TestOversizeTrait2:
        mTraitPaths[0].mTraitDataHandle = mTraitHandleSet[kLocaleSettingsSinkIndex];
        mTraitPaths[0].mPropertyPathHandle = kRootPropertyPathHandle;

        mTraitPaths[1].mTraitDataHandle = mTraitHandleSet[kTestBTraitSinkIndex];
        mTraitPaths[1].mPropertyPathHandle = kRootPropertyPathHandle;

        mTraitPaths[2].mTraitDataHandle = mTraitHandleSet[kTestATraitSink0Index];
        mTraitPaths[2].mPropertyPathHandle = kRootPropertyPathHandle;

        mTraitPaths[3].mTraitDataHandle = mTraitHandleSet[kTestATraitSink1Index];
        mTraitPaths[3].mPropertyPathHandle = kRootPropertyPathHandle;

        mNumPaths = 4;
        break;

    case kTestCase_CompatibleVersionedRequest:
    case kTestCase_ForwardCompatibleVersionedRequest:
    case kTestCase_IncompatibleVersionedRequest:
        for (int i = 0; i < 4; i++) {
            if (mTestCaseId == kTestCase_CompatibleVersionedRequest) {
                mVersionedTraitPaths[i].mRequestedVersionRange.mMinVersion = 1;
                mVersionedTraitPaths[i].mRequestedVersionRange.mMaxVersion = 1;
            }
            else if (mTestCaseId == kTestCase_ForwardCompatibleVersionedRequest) {
                mVersionedTraitPaths[i].mRequestedVersionRange.mMinVersion = 1;
                mVersionedTraitPaths[i].mRequestedVersionRange.mMaxVersion = 4;
            }
            else if (mTestCaseId == kTestCase_IncompatibleVersionedRequest) {
                mVersionedTraitPaths[i].mRequestedVersionRange.mMinVersion = 2;
                mVersionedTraitPaths[i].mRequestedVersionRange.mMaxVersion = 4;
            }
        }

        mVersionedTraitPaths[0].mTraitDataHandle = mTraitHandleSet[kLocaleSettingsSinkIndex];
        mVersionedTraitPaths[0].mPropertyPathHandle = kRootPropertyPathHandle;

        mVersionedTraitPaths[1].mTraitDataHandle = mTraitHandleSet[kTestATraitSink0Index];
        mVersionedTraitPaths[1].mPropertyPathHandle = kRootPropertyPathHandle;

        mVersionedTraitPaths[2].mTraitDataHandle = mTraitHandleSet[kTestATraitSink1Index];
        mVersionedTraitPaths[2].mPropertyPathHandle = kRootPropertyPathHandle;

        mVersionedTraitPaths[3].mTraitDataHandle = mTraitHandleSet[kTestBTraitSinkIndex];
        mVersionedTraitPaths[3].mPropertyPathHandle = kRootPropertyPathHandle;

        mNumPaths = 4;
        break;

    default:
        mNumPaths = 0;
        break;
    }

    if (mEnableRetry)
    {
        mSubscriptionClient->EnableResubscribe(NULL);
    }

    // TODO: EVENT-DEMO
    mSubscriptionClient->InitiateSubscription();

exit:
    WeaveLogFunctError(err);
    if (err != WEAVE_NO_ERROR && mBinding != NULL)
    {
        mBinding->Release();
        mBinding = NULL;
    }
    return err;
}

WEAVE_ERROR MockWdmSubscriptionInitiatorImpl::PrepareBinding()
{
    WEAVE_ERROR err = WEAVE_NO_ERROR;

    Binding::Configuration bindingConfig = mBinding->BeginConfiguration()
        .Target_NodeId(mPublisherNodeId) // TODO: aPublisherNodeId
        .Transport_UDP_WRM()
        .Transport_DefaultWRMPConfig(gWRMPConfig)

        // (default) max num of msec between any outgoing message and next incoming message (could be a response to it)
        .Exchange_ResponseTimeoutMsec(kResponseTimeoutMsec);

    if (nl::Weave::kWeaveSubnetId_NotSpecified != mPublisherSubnetId)
    {
        bindingConfig.TargetAddress_WeaveFabric(mPublisherSubnetId);
    }

    switch (mTestSecurityMode)
    {
    case WeaveSecurityMode::kCASE:
        WeaveLogDetail(DataManagement, "security mode is kWdmSecurity_CASE");
        bindingConfig.Security_SharedCASESession();
        break;

    case WeaveSecurityMode::kGroupEnc:
        WeaveLogDetail(DataManagement, "security mode is kWdmSecurity_GroupKey");
        if (mKeyId == WeaveKeyId::kNone)
        {
            WeaveLogDetail(DataManagement, "Please specify a group encryption key id using the --group-enc-... options.\n");
            err = WEAVE_ERROR_INVALID_KEY_ID;
            SuccessOrExit(err);
        }
        bindingConfig.Security_Key(mKeyId);
        //.Security_Key(0x5536);
        //.Security_Key(0x4436);
        break;

    case WeaveSecurityMode::kNone:
        bindingConfig.Security_None();
        break;

    default:
        WeaveLogDetail(DataManagement, "security mode is not supported");
        err = WEAVE_ERROR_UNSUPPORTED_AUTH_MODE;
        SuccessOrExit(err);
    }

    err = bindingConfig.PrepareBinding();
    SuccessOrExit(err);

exit:
    WeaveLogFunctError(err);
    return err;
}

void MockWdmSubscriptionInitiatorImpl::BindingEventCallback (void * const apAppState, const nl::Weave::Binding::EventType aEvent,
    const nl::Weave::Binding::InEventParam & aInParam, nl::Weave::Binding::OutEventParam & aOutParam)
{
    WEAVE_ERROR err = WEAVE_NO_ERROR;

    WeaveLogDetail(DataManagement, "%s: Event(%d)", __func__, aEvent);

    MockWdmSubscriptionInitiatorImpl * const initiator = reinterpret_cast<MockWdmSubscriptionInitiatorImpl *>(apAppState);

    VerifyOrDie(aInParam.Source != NULL);
    VerifyOrDie(aEvent == nl::Weave::Binding::kEvent_DefaultCheck || initiator->mBinding == aInParam.Source);

    switch (aEvent)
    {
    case nl::Weave::Binding::kEvent_PrepareRequested:
        WeaveLogDetail(DataManagement, "kEvent_PrepareRequested");
        err = initiator->PrepareBinding();
        SuccessOrExit(err);
        break;

    case nl::Weave::Binding::kEvent_PrepareFailed:
        err = aInParam.PrepareFailed.Reason;
        WeaveLogDetail(DataManagement, "kEvent_PrepareFailed: reason %d", err);
        break;

    case nl::Weave::Binding::kEvent_BindingFailed:
        err = aInParam.BindingFailed.Reason;
        WeaveLogDetail(DataManagement, "kEvent_BindingFailed: reason %d", err);
        break;

    case nl::Weave::Binding::kEvent_BindingReady:
        WeaveLogDetail(DataManagement, "kEvent_BindingReady");
        break;
    case nl::Weave::Binding::kEvent_DefaultCheck:
        WeaveLogDetail(DataManagement, "kEvent_DefaultCheck");
        // fall through
    default:
        nl::Weave::Binding::DefaultEventHandler(apAppState, aEvent, aInParam, aOutParam);
    }

exit:
    if (err != WEAVE_NO_ERROR)
    {
        if (NULL != initiator->onError)
        {
            initiator->onError();
        }
        initiator->mBinding->Release();
        initiator->mBinding = NULL;
        if (initiator->mSubscriptionClient)
        {
            initiator->mSubscriptionClient->Free();
            initiator->mSubscriptionClient = NULL;
        }
    }
    WeaveLogFunctError(err);
}

void MockWdmSubscriptionInitiatorImpl::DumpPublisherTraitChecksum(int inTraitDataSourceIndex)
{
    WEAVE_ERROR err = WEAVE_NO_ERROR;
    TraitDataSource *dataSource;
    err = mSourceCatalog.Locate(mTraitHandleSet[inTraitDataSourceIndex], &dataSource);
    SuccessOrExit(err);

    ::DumpPublisherTraitChecksum(dataSource);
exit:
    WeaveLogFunctError(err);
}

void MockWdmSubscriptionInitiatorImpl::DumpClientTraitChecksum(int inTraitDataSinkIndex)
{
    WEAVE_ERROR err = WEAVE_NO_ERROR;
    TraitDataSink *dataSink;
    TraitSchemaEngine::IDataSourceDelegate *dataSource;

    dataSource = mSinkAddressList[inTraitDataSinkIndex];
    err = mSinkCatalog.Locate(mTraitHandleSet[inTraitDataSinkIndex], &dataSink);
    SuccessOrExit(err);

    ::DumpClientTraitChecksum(dataSink->GetSchemaEngine(), dataSource);
exit:
    WeaveLogFunctError(err);
}

void MockWdmSubscriptionInitiatorImpl::DumpClientTraits(void)
{
    switch (mTestCaseId)
    {
        case kTestCase_IntegrationTrait:
        case kTestCase_RejectIncomingSubscribeRequest:
            DumpClientTraitChecksum(kLocaleSettingsSinkIndex);
            DumpClientTraitChecksum(kApplicationKeysTraitSinkIndex);
            break;
        case kTestCase_TestTrait:
            DumpClientTraitChecksum(kTestATraitSink0Index);
            DumpClientTraitChecksum(kTestATraitSink1Index);
            DumpClientTraitChecksum(kTestBTraitSinkIndex);
            DumpClientTraitChecksum(kLocaleSettingsSinkIndex);
            break;
        case kTestCase_TestOversizeTrait1:
            DumpClientTraitChecksum(kTestATraitSink0Index);
            DumpClientTraitChecksum(kTestATraitSink1Index);
            DumpClientTraitChecksum(kLocaleSettingsSinkIndex);
            break;
        case kTestCase_TestOversizeTrait2:
            if (gTestCase_TestOversizeTrait2DumpFlip)
            {
                DumpClientTraitChecksum(kLocaleSettingsSinkIndex);
            }
            else
            {
                DumpClientTraitChecksum(kTestATraitSink0Index);
                DumpClientTraitChecksum(kTestATraitSink1Index);
            }
            break;
    }
}
void MockWdmSubscriptionInitiatorImpl::DumpPublisherTraits(void)
{
    switch (mTestCaseId)
    {
        case kTestCase_IntegrationTrait:
        case kTestCase_RejectIncomingSubscribeRequest:
            DumpPublisherTraitChecksum(kLocaleCapabilitiesSourceIndex);
            break;
        case kTestCase_TestTrait:
            DumpPublisherTraitChecksum(kTestATraitSource0Index);
            DumpPublisherTraitChecksum(kTestATraitSource1Index);
            DumpPublisherTraitChecksum(kTestBTraitSourceIndex);
            break;
        case kTestCase_TestOversizeTrait1:
        case kTestCase_TestOversizeTrait2:
            DumpPublisherTraitChecksum(kTestATraitSource0Index);
            DumpPublisherTraitChecksum(kTestATraitSource1Index);
            break;
    }
}

void MockWdmSubscriptionInitiatorImpl::EngineEventCallback (void * const aAppState,
    SubscriptionEngine::EventID aEvent,
    const SubscriptionEngine::InEventParam & aInParam, SubscriptionEngine::OutEventParam & aOutParam)
{
    MockWdmSubscriptionInitiatorImpl * const initiator = reinterpret_cast<MockWdmSubscriptionInitiatorImpl *>(aAppState);
    switch (aEvent)
    {
    case SubscriptionEngine::kEvent_OnIncomingSubscribeRequest:
        WeaveLogDetail(DataManagement, "Engine->kEvent_OnIncomingSubscribeRequest peer = 0x%" PRIX64, aInParam.mIncomingSubscribeRequest.mEC->PeerNodeId);
        aOutParam.mIncomingSubscribeRequest.mHandlerAppState = initiator;
        aOutParam.mIncomingSubscribeRequest.mHandlerEventCallback = MockWdmSubscriptionInitiatorImpl::PublisherEventCallback;
        aOutParam.mIncomingSubscribeRequest.mRejectRequest = false;

        aInParam.mIncomingSubscribeRequest.mBinding->SetDefaultResponseTimeout(kResponseTimeoutMsec);
        aInParam.mIncomingSubscribeRequest.mBinding->SetDefaultWRMPConfig(gWRMPConfig);

        break;
    default:
        SubscriptionEngine::DefaultEventHandler(aEvent, aInParam, aOutParam);
        break;
    }
}

void MockWdmSubscriptionInitiatorImpl::AddNewVersion(int aTraitDataSinkIndex)
{
    VersionNode * curr = &mTraitVersionSet[aTraitDataSinkIndex];
    while (curr->next != NULL)
    {
        curr = curr->next;
    }

    if (curr->versionInfo != mSinkCatalogStore[aTraitDataSinkIndex].mItem->GetVersion())
    {
        VersionNode * updatingVersion = (VersionNode *)malloc(sizeof(VersionNode));
        WeaveLogDetail(DataManagement, "Trait %u version is changed %" PRIu64 " ---> %" PRIu64, aTraitDataSinkIndex, curr->versionInfo, mSinkCatalogStore[aTraitDataSinkIndex].mItem->GetVersion());
        updatingVersion->versionInfo = mSinkCatalogStore[aTraitDataSinkIndex].mItem->GetVersion();
        updatingVersion->next = NULL;
        curr->next = updatingVersion;
    }
}

void MockWdmSubscriptionInitiatorImpl::PrintVersionsLog()
{
    for (int i = 0; i< kMaxNumTraitHandles; i++)
    {
        VersionNode * pre = &mTraitVersionSet[i];
        VersionNode * curr = mTraitVersionSet[i].next;

        printf("Initiator's trait %u versions log is : %" PRIu64, i, pre->versionInfo);
        while (curr != NULL)
        {
            pre = curr;
            curr = curr->next;
            printf(" ==> %" PRIu64, pre->versionInfo);
        }
        printf("\n");
    }
}

void MockWdmSubscriptionInitiatorImpl::ClearDataSinkIterator(void *aTraitInstance, TraitDataHandle aHandle, void *aContext)
{
    MockTraitDataSink *traitInstance = static_cast<MockTraitDataSink *>(aTraitInstance);
    traitInstance->ResetDataSink();
}

void MockWdmSubscriptionInitiatorImpl::ClearDataSinkState(void)
{
    mSinkCatalog.Iterate(MockWdmSubscriptionInitiatorImpl::ClearDataSinkIterator, NULL);
    mClearDataSink = true;
}

void MockWdmSubscriptionInitiatorImpl::ClientEventCallback (void * const aAppState, SubscriptionClient::EventID aEvent,
    const SubscriptionClient::InEventParam & aInParam, SubscriptionClient::OutEventParam & aOutParam)
{
    MockWdmSubscriptionInitiatorImpl * const initiator = reinterpret_cast<MockWdmSubscriptionInitiatorImpl *>(aAppState);

    switch (aEvent)
    {
    case SubscriptionClient::kEvent_OnExchangeStart:
        WeaveLogDetail(DataManagement, "Client->kEvent_OnExchangeStart");
        break;
    case SubscriptionClient::kEvent_OnSubscribeRequestPrepareNeeded:
        WeaveLogDetail(DataManagement, "Client->kEvent_OnSubscribeRequestPrepareNeeded");
        if (initiator->mTestCaseId >= kTestCase_CompatibleVersionedRequest && initiator->mTestCaseId <= kTestCase_IncompatibleVersionedRequest) {
            aOutParam.mSubscribeRequestPrepareNeeded.mVersionedPathList = initiator->mVersionedTraitPaths;
        }
        else {
            aOutParam.mSubscribeRequestPrepareNeeded.mPathList = initiator->mTraitPaths;
        }

        aOutParam.mSubscribeRequestPrepareNeeded.mPathListSize = initiator->mNumPaths;
        aOutParam.mSubscribeRequestPrepareNeeded.mNeedAllEvents = true;
        aOutParam.mSubscribeRequestPrepareNeeded.mLastObservedEventList = NULL;
        aOutParam.mSubscribeRequestPrepareNeeded.mLastObservedEventListSize = 0;
        aOutParam.mSubscribeRequestPrepareNeeded.mTimeoutSecMin = gMinimumTimeBetweenLivenessCheckSec;
        aOutParam.mSubscribeRequestPrepareNeeded.mTimeoutSecMax = 3600;
        break;

    case SubscriptionClient::kEvent_OnSubscriptionEstablished:
        WeaveLogDetail(DataManagement, "Client->kEvent_OnSubscriptionEstablished");
        WeaveLogDetail(DataManagement, "Liveness Timeout: %u msec", aInParam.mSubscriptionEstablished.mClient->GetLivenessTimeoutMsec());
        if (gIsMutualSubscription == false)
        {
            gOnewaySubscriptionEstablished = true;

            if (gNumDataChangeBeforeCancellation != 0)
            {
                initiator->mExchangeMgr->MessageLayer->SystemLayer->StartTimer(gTimeBetweenDataChangeMsec, HandleDataFlipTimeout, initiator);
            }
            else
            {
                if (gFinalStatus != kIdle)
                {
                    switch (gFinalStatus)
                    {
                    case kPublisherCancel:
                    case kPublisherAbort:
                        initiator->mExchangeMgr->MessageLayer->SystemLayer->StartTimer(kMonitorCurrentStateInterval, MonitorPublisherCurrentState, initiator);
                        break;
                    case kClientCancel:
                    case kClientAbort:
                        initiator->mExchangeMgr->MessageLayer->SystemLayer->StartTimer(kMonitorCurrentStateInterval, MonitorClientCurrentState, initiator);
                        break;
                    default:
                        break;
                    }
                }
            }
        }
        break;
    case SubscriptionClient::kEvent_OnNotificationRequest:
        WeaveLogDetail(DataManagement, "Client->kEvent_OnNotificationRequest");
        break;
    case SubscriptionClient::kEvent_OnNotificationProcessed:
        WeaveLogDetail(DataManagement, "Client->kEvent_OnNotificationProcessed");

        switch (initiator->mTestCaseId)
        {
        case kTestCase_IntegrationTrait:
        case kTestCase_RejectIncomingSubscribeRequest:
            initiator->AddNewVersion(initiator->kLocaleSettingsSinkIndex);
            initiator->AddNewVersion(initiator->kApplicationKeysTraitSinkIndex);
            break;
        case kTestCase_TestTrait:
            initiator->AddNewVersion(initiator->kTestATraitSink0Index);
            initiator->AddNewVersion(initiator->kTestATraitSink1Index);
            initiator->AddNewVersion(initiator->kTestBTraitSinkIndex);
            initiator->AddNewVersion(initiator->kLocaleSettingsSinkIndex);
            break;
        case kTestCase_TestOversizeTrait1:
            initiator->AddNewVersion(initiator->kTestATraitSink0Index);
            initiator->AddNewVersion(initiator->kTestATraitSink1Index);
            initiator->AddNewVersion(initiator->kLocaleSettingsSinkIndex);
            break;
       case kTestCase_TestOversizeTrait2:
            if (gTestCase_TestOversizeTrait2DumpFlip)
            {
                initiator->AddNewVersion(initiator->kLocaleSettingsSinkIndex);
            }
            else
            {
                initiator->AddNewVersion(initiator->kTestATraitSink0Index);
                initiator->AddNewVersion(initiator->kTestATraitSink1Index);
            }
            break;
        }

        initiator->DumpClientTraits();

        if (initiator->mTestCaseId == kTestCase_TestOversizeTrait2)
        {
            gTestCase_TestOversizeTrait2DumpFlip = !gTestCase_TestOversizeTrait2DumpFlip;
        }

        break;
    case SubscriptionClient::kEvent_OnSubscriptionTerminated:
        WeaveLogDetail(DataManagement, "Client->kEvent_OnSubscriptionTerminated. Reason: %u, peer = 0x%" PRIX64 "\n",
                aInParam.mSubscriptionTerminated.mReason,
                aInParam.mSubscriptionTerminated.mClient->GetPeerNodeId());

        gInitiatorState.mDataflipCount = 1;
        initiator->mWillRetry = aInParam.mSubscriptionTerminated.mWillRetry;

        switch (gFinalStatus)
        {
        case kPublisherCancel:
        case kPublisherAbort:
            initiator->mExchangeMgr->MessageLayer->SystemLayer->CancelTimer(MonitorPublisherCurrentState, initiator);
            break;
        case kClientCancel:
        case kClientAbort:
            initiator->mExchangeMgr->MessageLayer->SystemLayer->CancelTimer(MonitorClientCurrentState, initiator);
            break;
        case kIdle:
        default:
            break;
        }

        if (initiator->mEnableRetry == false || initiator->mWillRetry == false)
        {
            if (gEvaluateSuccessIteration == true)
            {
                WeaveLogDetail(DataManagement, "Mutual: Good Iteration");
                gEvaluateSuccessIteration = false;
            }
            if (gNumDataChangeBeforeCancellation != 0)
            {
                initiator->mExchangeMgr->MessageLayer->SystemLayer->CancelTimer(HandleDataFlipTimeout, initiator);
            }
            HandleClientRelease(initiator);
            initiator->onCompleteTest();
        }
        break;
    default:
        SubscriptionClient::DefaultEventHandler(aEvent, aInParam, aOutParam);
        break;
    }
}

void MockWdmSubscriptionInitiatorImpl::PublisherEventCallback (void * const aAppState,
        SubscriptionHandler::EventID aEvent, const SubscriptionHandler::InEventParam & aInParam, SubscriptionHandler::OutEventParam & aOutParam)
{
    MockWdmSubscriptionInitiatorImpl * const initiator = reinterpret_cast<MockWdmSubscriptionInitiatorImpl *>(aAppState);
    WEAVE_ERROR err = WEAVE_NO_ERROR;

    switch (aEvent)
    {
    case SubscriptionHandler::kEvent_OnSubscribeRequestParsed:
        WeaveLogDetail(DataManagement, "Publisher->kEvent_OnSubscribeRequestParsed");

        // ideally this number should be set to something for cloud service, and something else for everyone else
        // we can potentially copy this from the client side, but it would take considerable amount of code to be generic enough
        // setting to some constant here seems to be easier

        aInParam.mSubscribeRequestParsed.mHandler->GetBinding()->SetDefaultResponseTimeout(kResponseTimeoutMsec);
        aInParam.mSubscribeRequestParsed.mHandler->GetBinding()->SetDefaultWRMPConfig(gWRMPConfig);

        if (NULL != initiator->mSubscriptionClient)
        {
            if (aInParam.mSubscribeRequestParsed.mIsSubscriptionIdValid)
            {
                uint64_t subscriptionId;
                err = initiator->mSubscriptionClient->GetSubscriptionId(&subscriptionId);
                SuccessOrExit(err);

                // subscription ID is largely peer-specific
                if ((aInParam.mSubscribeRequestParsed.mEC->PeerNodeId == initiator->mBinding->GetPeerNodeId()) &&
                    (aInParam.mSubscribeRequestParsed.mSubscriptionId == subscriptionId))
                {
                    WeaveLogDetail(DataManagement, "Request for mutual subscription found");
                }
            }
        }

        // AcceptSubscribeRequest and EndSubscription may be used either sync or async, to move the state machine forward
        aInParam.mSubscribeRequestParsed.mHandler->AcceptSubscribeRequest();

        break;

    case SubscriptionHandler::kEvent_OnExchangeStart:
        WeaveLogDetail(DataManagement, "Publisher->kEvent_OnExchangeStart");
        break;

    case SubscriptionHandler::kEvent_OnSubscriptionEstablished:
        if (true == mClearDataSink || true == gCleanStatus)
        {
            initiator->DumpPublisherTraits();
            gCleanStatus = false;
        }

        WeaveLogDetail(DataManagement, "Publisher->kEvent_OnSubscriptionEstablished");
        gMutualSubscriptionEstablished = true;
        gSubscriptionHandler = aInParam.mSubscriptionEstablished.mHandler;
        if (gNumDataChangeBeforeCancellation != 0)
        {
            initiator->mExchangeMgr->MessageLayer->SystemLayer->StartTimer(gTimeBetweenDataChangeMsec, HandleDataFlipTimeout, initiator);
        }
        else
        {
            if (gFinalStatus != kIdle)
            {
                switch (gFinalStatus)
                {
                case kPublisherCancel:
                case kPublisherAbort:
                    initiator->mExchangeMgr->MessageLayer->SystemLayer->StartTimer(kMonitorCurrentStateInterval, MonitorPublisherCurrentState, initiator);
                    break;
                case kClientCancel:
                case kClientAbort:
                    initiator->mExchangeMgr->MessageLayer->SystemLayer->StartTimer(kMonitorCurrentStateInterval, MonitorClientCurrentState, initiator);
                    break;
                default:
                    break;
                }
            }
        }
        break;

    case SubscriptionHandler::kEvent_OnSubscriptionTerminated:
        WeaveLogDetail(DataManagement, "Pub: kEvent_OnSubscriptionTerminated, Reason = %d, peer = 0x%" PRIX64 "\n",
                aInParam.mSubscriptionTerminated.mReason,
                aInParam.mSubscriptionTerminated.mHandler->GetPeerNodeId());
        switch (gFinalStatus)
        {
        case kPublisherCancel:
        case kPublisherAbort:
            initiator->mExchangeMgr->MessageLayer->SystemLayer->CancelTimer(MonitorPublisherCurrentState, initiator);
            break;
        case kClientCancel:
        case kClientAbort:
            initiator->mExchangeMgr->MessageLayer->SystemLayer->CancelTimer(MonitorClientCurrentState, initiator);
            break;
        case kIdle:
        default:
            break;
        }

        if (gNumDataChangeBeforeCancellation != 0)
        {
            initiator->mExchangeMgr->MessageLayer->SystemLayer->CancelTimer(HandleDataFlipTimeout, initiator);
        }

        if (initiator->mEnableRetry == false || initiator->mWillRetry == false)
        {
            HandlePublisherRelease();
            HandleClientRelease(initiator);
            if (gEvaluateSuccessIteration == true)
            {
                WeaveLogDetail(DataManagement, "Mutual: Good Iteration");
                gEvaluateSuccessIteration = false;
            }
            gMutualSubscriptionEstablished = false;
            initiator->onCompleteTest();
        }
        break;

    default:
        SubscriptionHandler::DefaultEventHandler(aEvent, aInParam, aOutParam);
        break;
    }

exit:
    WeaveLogFunctError(err);
}

void MockWdmSubscriptionInitiatorImpl::HandleClientComplete(void *aAppState)
{
    WEAVE_ERROR err;
    MockWdmSubscriptionInitiatorImpl * const initiator = reinterpret_cast<MockWdmSubscriptionInitiatorImpl *>(aAppState);

    if (gIsMutualSubscription == true)
    {
        gEvaluateSuccessIteration = true;
        initiator->mWillRetry = false;
    }

    if (NULL != initiator->mSubscriptionClient)
    {
        if (gFinalStatus == kClientCancel)
        {
            err = initiator->mSubscriptionClient->EndSubscription();
            if (err != WEAVE_NO_ERROR)
            {
                initiator->mSubscriptionClient->AbortSubscription();
            }
        }
        if (gFinalStatus == kClientAbort)
        {
            (void)initiator->mSubscriptionClient->AbortSubscription();
        }
    }
}

void MockWdmSubscriptionInitiatorImpl::HandleClientRelease(void *aAppState)
{
    MockWdmSubscriptionInitiatorImpl * const initiator = reinterpret_cast<MockWdmSubscriptionInitiatorImpl *>(aAppState);

    if (NULL != initiator->mSubscriptionClient)
    {
        initiator->mSubscriptionClient->Free();
        initiator->mSubscriptionClient = NULL;
    }

    if (NULL !=initiator->mBinding)
    {
        initiator->mBinding->Release();
        initiator->mBinding = NULL;
    }
}

void MockWdmSubscriptionInitiatorImpl::HandlePublisherComplete()
{

    if (gIsMutualSubscription == true)
    {
        gEvaluateSuccessIteration = true;
    }

    if (NULL != gSubscriptionHandler)
    {
        if (gFinalStatus == kPublisherCancel)
        {
            (void)gSubscriptionHandler->EndSubscription();

        }
        if (gFinalStatus == kPublisherAbort)
        {
            (void)gSubscriptionHandler->AbortSubscription();
        }
    }
}

void MockWdmSubscriptionInitiatorImpl::HandlePublisherRelease()
{
    gSubscriptionHandler = NULL;
}

void MockWdmSubscriptionInitiatorImpl::HandleDataFlipTimeout(nl::Weave::System::Layer* aSystemLayer, void *aAppState,
    nl::Weave::System::Error aErr)
{
    MockWdmSubscriptionInitiatorImpl * const initiator = reinterpret_cast<MockWdmSubscriptionInitiatorImpl *>(aAppState);

    IgnoreUnusedVariable(aErr);

    if (gIsMutualSubscription == true && gMutualSubscriptionEstablished == false)
    {
        WeaveLogDetail(DataManagement, "mutual subscription cannot be established, and do nothing until response timeout happens");
        return;
    }

    if (gIsMutualSubscription == false && gOnewaySubscriptionEstablished == false)
    {
        WeaveLogDetail(DataManagement, "one way subscription cannot be established, and do nothing until response timeout happens");
        return;
    }

    if (gIsMutualSubscription == true && gEnableDataFlip == true) {
        WeaveLogDetail(DataManagement, "\n\n\n\n\nFlipping data...");

        switch (initiator->mTestCaseId)
        {
        case kTestCase_IntegrationTrait:
        case kTestCase_RejectIncomingSubscribeRequest:
            initiator->mLocaleCapabilitiesDataSource.Mutate();
            SubscriptionEngine::GetInstance()->GetNotificationEngine()->Run();
            break;
        case kTestCase_TestTrait:
            initiator->mTestATraitDataSource0.Mutate();
            initiator->mTestATraitDataSource1.Mutate();
            initiator->mTestBTraitDataSource.Mutate();
            SubscriptionEngine::GetInstance()->GetNotificationEngine()->Run();
            break;
        case kTestCase_TestOversizeTrait1:
        case kTestCase_TestOversizeTrait2:
            initiator->mTestATraitDataSource0.Mutate();
            initiator->mTestATraitDataSource1.Mutate();
            SubscriptionEngine::GetInstance()->GetNotificationEngine()->Run();
            break;
        }
        initiator->DumpPublisherTraits();
    }

    if (gNumDataChangeBeforeCancellation == -1)
    {
        WeaveLogDetail(DataManagement, "immortal, no cancel or abort, completed cycle %d", gInitiatorState.mDataflipCount);
        // alter data every gTimeBetweenDataChangeMsec milliseconds
        aSystemLayer->StartTimer(gTimeBetweenDataChangeMsec, HandleDataFlipTimeout, initiator);
        ++gInitiatorState.mDataflipCount;
    }
    else
    {
        WeaveLogDetail(DataManagement, "Completed cycle %d per %d", gInitiatorState.mDataflipCount, gNumDataChangeBeforeCancellation);
        if (gInitiatorState.mDataflipCount == gNumDataChangeBeforeCancellation)
        {
            gInitiatorState.mDataflipCount = 1;
            if (gIsMutualSubscription)
            {
                switch (gFinalStatus)
                {
                case kPublisherCancel:
                case kPublisherAbort:
                    aSystemLayer->StartTimer(kMonitorCurrentStateInterval, MonitorPublisherCurrentState, initiator);
                    break;
                case kClientCancel:
                case kClientAbort:
                    aSystemLayer->StartTimer(kMonitorCurrentStateInterval, MonitorClientCurrentState, initiator);
                    break;
                }
            }
            else
            {
                aSystemLayer->StartTimer(kMonitorCurrentStateInterval, MonitorClientCurrentState, initiator);
            }
        }
        else
        {
            // alter data every gTimeBetweenDataChangeMsec milliseconds
            ++gInitiatorState.mDataflipCount;
            aSystemLayer->StartTimer(gTimeBetweenDataChangeMsec, HandleDataFlipTimeout, initiator);
        }

    }

}

void MockWdmSubscriptionInitiatorImpl::MonitorPublisherCurrentState (nl::Weave::System::Layer* aSystemLayer, void *aAppState, INET_ERROR aErr)
{
    MockWdmSubscriptionInitiatorImpl * const initiator = reinterpret_cast<MockWdmSubscriptionInitiatorImpl *>(aAppState);
    if (NULL != gSubscriptionHandler)
    {
        if (initiator->mSubscriptionClient->IsEstablishedIdle() && gSubscriptionHandler->IsEstablishedIdle())
        {
            WeaveLogDetail(DataManagement, "state transitions to idle within %d msec", kMonitorCurrentStateInterval * kMonitorCurrentStateCnt);
            gInitiatorState.mPublisherStateCount = 1;
            HandlePublisherComplete();
        }
        else
        {
            if (gInitiatorState.mPublisherStateCount < kMonitorCurrentStateCnt)
            {
                gInitiatorState.mPublisherStateCount ++;
                aSystemLayer->StartTimer(kMonitorCurrentStateInterval, MonitorPublisherCurrentState, initiator);
            }
            else
            {
                gInitiatorState.mPublisherStateCount = 1;
                WeaveLogDetail(DataManagement, "state is not idle or aborted within %d msec", kMonitorCurrentStateInterval * kMonitorCurrentStateCnt);
                (void)initiator->mSubscriptionClient->AbortSubscription();
                HandlePublisherRelease();
                HandleClientRelease(initiator);
                initiator->onCompleteTest();
            }
        }
    }
    else
    {
        WeaveLogDetail(DataManagement, "gSubscriptionHandler is NULL, and current session is torn down");
        (void)initiator->mSubscriptionClient->AbortSubscription();
        HandlePublisherRelease();
        HandleClientRelease(initiator);
        initiator->onCompleteTest();
    }
}

void MockWdmSubscriptionInitiatorImpl::MonitorClientCurrentState (nl::Weave::System::Layer* aSystemLayer, void *aAppState, INET_ERROR aErr)
{
    MockWdmSubscriptionInitiatorImpl * const initiator = reinterpret_cast<MockWdmSubscriptionInitiatorImpl *>(aAppState);
    if (NULL != initiator->mSubscriptionClient)
    {
        if (initiator->mSubscriptionClient->IsEstablishedIdle() && (gIsMutualSubscription == false || gSubscriptionHandler->IsEstablishedIdle()))
        {
            WeaveLogDetail(DataManagement, "state transitions to idle within %d msec", kMonitorCurrentStateInterval * kMonitorCurrentStateCnt);
            gInitiatorState.mClientStateCount = 1;
            HandleClientComplete(initiator);

            if (gIsMutualSubscription == false)
            {
                HandleClientRelease(initiator);
                WeaveLogDetail(DataManagement, "One_way: Good Iteration");
                initiator->onCompleteTest();
            }
        }
        else
        {
            if (gInitiatorState.mClientStateCount < kMonitorCurrentStateCnt)
            {
                gInitiatorState.mClientStateCount++;
                aSystemLayer->StartTimer(kMonitorCurrentStateInterval, MonitorClientCurrentState, initiator);
            }
            else
            {
                gInitiatorState.mClientStateCount = 1;
                WeaveLogDetail(DataManagement, "state is not idle or aborted within %d msec", kMonitorCurrentStateInterval * kMonitorCurrentStateCnt);
                (void)initiator->mSubscriptionClient->AbortSubscription();
                HandlePublisherRelease();
                HandleClientRelease(initiator);
                initiator->onCompleteTest();

            }
        }
    }
    else
    {
        WeaveLogDetail(DataManagement, "mSubscriptionClient is NULL, and current session is torn down");
        HandlePublisherRelease();
        HandleClientRelease(initiator);
        initiator->onCompleteTest();
    }
}

#endif // WEAVE_CONFIG_ENABLE_RELIABLE_MESSAGING
