blob: c6eb6301a0619c8b4b85025c206883dce18a10aa [file] [log] [blame] [edit]
#!/usr/bin/env python
#
# Copyright (c) 2016, The OpenThread Authors.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# 3. Neither the name of the copyright holder nor the
# names of its contributors may be used to endorse or promote products
# derived from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
import io
import struct
from binascii import hexlify
from enum import IntEnum
import common
class TlvType(IntEnum):
TARGET_EID = 0
MAC_EXTENDED_ADDRESS = 1
RLOC16 = 2
ML_EID = 3
STATUS = 4
TIME_SINCE_LAST_TRANSACTION = 6
ROUTER_MASK = 7
ND_OPTION = 8
ND_DATA = 9
THREAD_NETWORK_DATA = 10
MLE_ROUTING = 11
class StatusValues(IntEnum):
SUCCESS = 0
NO_ADDRESS_AVAILABLE = 1
TOO_FEW_ROUTERS = 2
HAVE_CHILD_ID_REQUEST = 3
PARENT_PARTITION_CHANGE = 4
class TargetEid(object):
def __init__(self, eid):
self._eid = eid
@property
def eid(self):
return self._eid
def __eq__(self, other):
common.expect_the_same_class(self, other)
return self.eid == other.eid
def __repr__(self):
return "TargetEid(eid={})".format(hexlify(self.eid))
class TargetEidFactory(object):
def parse(self, data, message_info):
eid = bytearray(data.read(16))
return TargetEid(eid)
class MacExtendedAddress(object):
def __init__(self, mac_address):
self._mac_address = mac_address
@property
def mac_address(self):
return self._mac_address
def __eq__(self, other):
common.expect_the_same_class(self, other)
return self.mac_address == other.mac_address
def __repr__(self):
return "MacExtendedAddress(mac_address={})".format(hexlify(self.mac_address))
class MacExtendedAddressFactory(object):
def parse(self, data, message_info):
mac_address = bytearray(data.read(8))
return MacExtendedAddress(mac_address)
class Rloc16(object):
def __init__(self, rloc16):
self._rloc16 = rloc16
@property
def rloc16(self):
return self._rloc16
def __eq__(self, other):
common.expect_the_same_class(self, other)
return self.rloc16 == other.rloc16
def __repr__(self):
return "Rloc16(rloc16={})".format(hex(self.rloc16))
class Rloc16Factory(object):
def parse(self, data, message_info):
rloc16 = struct.unpack(">H", data.read(2))[0]
return Rloc16(rloc16)
class MlEid(object):
def __init__(self, ml_eid):
self._ml_eid = ml_eid
@property
def ml_eid(self):
return self._ml_eid
def __eq__(self, other):
common.expect_the_same_class(self, other)
return self.ml_eid == other.ml_eid
def __repr__(self):
return "MlEid(ml_eid={})".format(hexlify(self.ml_eid))
class MlEidFactory(object):
def parse(self, data, message_info):
ml_eid = bytearray(data.read(8))
return MlEid(ml_eid)
class Status(object):
def __init__(self, status):
self._status = status
@property
def status(self):
return self._status
def __eq__(self, other):
common.expect_the_same_class(self, other)
return self.status == other.status
def __repr__(self):
return "Status(status={})".format(self.status)
class StatusFactory(object):
def parse(self, data, message_info):
status = StatusValues(ord(data.read(1)))
return Status(status)
class TimeSinceLastTransaction(object):
def __init__(self, seconds):
self._seconds = seconds
@property
def seconds(self):
return self._seconds
def __eq__(self, other):
common.expect_the_same_class(self, other)
return self.seconds == other.seconds
def __repr__(self):
return "TimeSinceLastTransaction(seconds={})".format(self.seconds)
class TimeSinceLastTransactionFactory(object):
def parse(self, data, message_info):
seconds = struct.unpack(">L", data.read(4))[0]
return TimeSinceLastTransaction(seconds)
class RouterMask(object):
def __init__(self, id_sequence, router_id_mask):
self._id_sequence = id_sequence
self._router_id_mask = router_id_mask
@property
def id_sequence(self):
return self._id_sequence
@property
def router_id_mask(self):
return self._router_id_mask
def __eq__(self, other):
common.expect_the_same_class(self, other)
return self.id_sequence == other.id_sequence and self.router_id_mask == other.router_id_mask
def __repr__(self):
return "RouterMask(id_sequence={}, router_id_mask={})".format(self.id_sequence, hex(self.router_id_mask))
class RouterMaskFactory(object):
def parse(self, data, message_info):
id_sequence = ord(data.read(1))
router_id_mask = struct.unpack(">Q", data.read(8))[0]
return RouterMask(id_sequence, router_id_mask)
class NdOption(object):
def __init__(self, options):
self._options = options
@property
def options(self):
return self._options
def __eq__(self, other):
common.expect_the_same_class(self, other)
return self.options == other.options
def __repr__(self):
return "NdOption(options=[{}])".format(", ".join([str(opt) for opt in self.options]))
class NdOptionFactory(object):
def parse(self, data, message_info):
options = [opt for opt in bytearray(data.read())]
return NdOption(options)
class NdData(object):
# TODO: Not implemented yet
pass
class NdDataFactory(object):
# TODO: Not implemented yet
def parse(self, data, message_info):
raise NotImplementedError("TODO: Not implemented yet")
class ThreadNetworkData(object):
def __init__(self, tlvs):
self._tlvs = tlvs
@property
def tlvs(self):
return self._tlvs
def __eq__(self, other):
common.expect_the_same_class(self, other)
return self.tlvs == other.tlvs
def __repr__(self):
return "ThreadNetworkData(tlvs=[{}])".format(", ".join([str(tlv) for tlv in self.tlvs]))
class ThreadNetworkDataFactory(object):
def __init__(self, network_data_tlvs_factory):
self._network_data_tlvs_factory = network_data_tlvs_factory
def parse(self, data, message_info):
tlvs = self._network_data_tlvs_factory.parse(data, message_info)
return ThreadNetworkData(tlvs)
class NetworkLayerTlvsFactory(object):
def __init__(self, tlvs_factories):
self._tlvs_factories = tlvs_factories
def parse(self, data, message_info):
tlvs = []
while data.tell() < len(data.getvalue()):
_type = ord(data.read(1))
length = ord(data.read(1))
factory = self._tlvs_factories[_type]
tlv = factory.parse(io.BytesIO(data.read(length)), message_info)
tlvs.append(tlv)
return tlvs