blob: 24135dde609b9d8b1f170d30981922f56d0ed62e [file] [log] [blame]
#!/usr/bin/env python
#
# Copyright (c) 2016, The OpenThread Authors.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# 3. Neither the name of the copyright holder nor the
# names of its contributors may be used to endorse or promote products
# derived from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
import io
import ipaddress
import struct
from binascii import hexlify
import common
import network_data
from enum import IntEnum
class CommandType(IntEnum):
LINK_REQUEST = 0
LINK_ACCEPT = 1
LINK_ACCEPT_AND_REQUEST = 2
LINK_REJECT = 3
ADVERTISEMENT = 4
UPDATE = 5
UPDATE_REQUEST = 6
DATA_REQUEST = 7
DATA_RESPONSE = 8
PARENT_REQUEST = 9
PARENT_RESPONSE = 10
CHILD_ID_REQUEST = 11
CHILD_ID_RESPONSE = 12
CHILD_UPDATE_REQUEST = 13
CHILD_UPDATE_RESPONSE = 14
ANNOUNCE = 15
DISCOVERY_REQUEST = 16
DISCOVERY_RESPONSE = 17
class TlvType(IntEnum):
SOURCE_ADDRESS = 0
MODE = 1
TIMEOUT = 2
CHALLENGE = 3
RESPONSE = 4
LINK_LAYER_FRAME_COUNTER = 5
MLE_FRAME_COUNTER = 8
ROUTE64 = 9
ADDRESS16 = 10
LEADER_DATA = 11
NETWORK_DATA = 12
TLV_REQUEST = 13
SCAN_MASK = 14
CONNECTIVITY = 15
LINK_MARGIN = 16
STATUS = 17
VERSION = 18
ADDRESS_REGISTRATION = 19
CHANNEL = 20
PANID = 21
ACTIVE_TIMESTAMP = 22
PENDING_TIMESTAMP = 23
ACTIVE_OPERATIONAL_DATASET = 24
PENDING_OPERATIONAL_DATASET = 25
THREAD_DISCOVERY = 26
class SourceAddress(object):
def __init__(self, address):
self._address = address
@property
def address(self):
return self._address
def __eq__(self, other):
common.expect_the_same_class(self, other)
return self.address == other.address
def __repr__(self):
return "SourceAddress(address={})".format(hex(self._address))
class SourceAddressFactory:
def parse(self, data, message_info):
address = struct.unpack(">H", data.read(2))[0]
return SourceAddress(address)
class Mode(object):
def __init__(self, receiver, secure, device_type, network_data):
self._receiver = receiver
self._secure = secure
self._device_type = device_type
self._network_data = network_data
@property
def receiver(self):
return self._receiver
@property
def secure(self):
return self._secure
@property
def device_type(self):
return self._device_type
@property
def network_data(self):
return self._network_data
def __eq__(self, other):
common.expect_the_same_class(self, other)
return self.receiver == other.receiver and \
self.secure == other.secure and \
self.device_type == other.device_type and \
self.network_data == other.network_data
def __repr__(self):
return "Mode(receiver={}, secure={}, device_type={}, network_data={})".format(self.receiver,
self.secure,
self.device_type,
self.network_data)
class ModeFactory:
def parse(self, data, message_info):
mode = ord(data.read(1))
receiver = (mode >> 3) & 0x01
secure = (mode >> 2) & 0x01
device_type = (mode >> 1) & 0x01
network_data = (mode >> 0) & 0x01
return Mode(receiver, secure, device_type, network_data)
class Timeout(object):
def __init__(self, timeout):
self._timeout = timeout
@property
def timeout(self):
return self._timeout
def __eq__(self, other):
common.expect_the_same_class(self, other)
return self.timeout == other.timeout
def __repr__(self):
return "Timeout(timeout={})".format(self.timeout)
class TimeoutFactory:
def parse(self, data, message_info):
timeout = struct.unpack(">I", data.read(4))[0]
return Timeout(timeout)
class Challenge(object):
def __init__(self, challenge):
self._challenge = challenge
@property
def challenge(self):
return self._challenge
def __eq__(self, other):
common.expect_the_same_class(self, other)
return self.challenge == other.challenge
def __repr__(self):
return "Challenge(challenge={})".format(hexlify(self.challenge))
class ChallengeFactory:
def parse(self, data, message_info):
challenge = data.read()
return Challenge(challenge)
class Response(object):
def __init__(self, response):
self._response = response
@property
def response(self):
return self._response
def __eq__(self, other):
common.expect_the_same_class(self, other)
return self.response == other.response
def __repr__(self):
return "Response(response={})".format(hexlify(self.response))
class ResponseFactory:
def parse(self, data, message_info):
response = data.read()
return Response(response)
class LinkLayerFrameCounter(object):
def __init__(self, frame_counter):
self._frame_counter = frame_counter
@property
def frame_counter(self):
return self._frame_counter
def __eq__(self, other):
common.expect_the_same_class(self, other)
return self.frame_counter == other.frame_counter
def __repr__(self):
return "LinkLayerFrameCounter(frame_counter={})".format(self.frame_counter)
class LinkLayerFrameCounterFactory:
def parse(self, data, message_info):
frame_counter = struct.unpack(">I", data.read(4))[0]
return LinkLayerFrameCounter(frame_counter)
class MleFrameCounter(object):
def __init__(self, frame_counter):
self._frame_counter = frame_counter
@property
def frame_counter(self):
return self._frame_counter
def __eq__(self, other):
common.expect_the_same_class(self, other)
return self.frame_counter == other.frame_counter
def __repr__(self):
return "MleFrameCounter(frame_counter={})".format(self.frame_counter)
class MleFrameCounterFactory:
def parse(self, data, message_info):
frame_counter = struct.unpack(">I", data.read(4))[0]
return MleFrameCounter(frame_counter)
class LinkQualityAndRouteData(object):
def __init__(self, output, _input, route):
self._output = output
self._input = _input
self._route = route
@property
def output(self):
return self._output
@property
def input(self):
return self._input
@property
def route(self):
return self._route
def __eq__(self, other):
common.expect_the_same_class(self, other)
return self.output == other.output and self.input == other.input and self.route == other.route
def __repr__(self):
return "LinkQualityAndRouteData(ouput={}, input={}, route={})".format(self.output, self.input, self.route)
class LinkQualityAndRouteDataFactory:
def parse(self, data, message_info):
lqrd = ord(data.read(1))
output = (lqrd >> 6) & 0x3
_input = (lqrd >> 4) & 0x3
route = lqrd & 0x0f
return LinkQualityAndRouteData(output, _input, route)
class Route64(object):
def __init__(self, id_sequence, router_id_mask, link_quality_and_route_data):
self._id_sequence = id_sequence
self._router_id_mask = router_id_mask
self._link_quality_and_route_data = link_quality_and_route_data
@property
def id_sequence(self):
return self._id_sequence
@property
def router_id_mask(self):
return self._router_id_mask
@property
def link_quality_and_route_data(self):
return self._link_quality_and_route_data
def __eq__(self, other):
common.expect_the_same_class(self, other)
return self.id_sequence == other.id_sequence and \
self.router_id_mask == other.router_id_mask and \
self.link_quality_and_route_data == other.link_quality_and_route_data
def __repr__(self):
lqrd_str = ", ".join(["{}".format(lqrd) for lqrd in self.link_quality_and_route_data])
return "Route64(id_sequence={}, router_id_mask={}, link_quality_and_route_data=[{}])".format(
self.id_sequence, hex(self.router_id_mask), lqrd_str)
class Route64Factory:
def __init__(self, link_quality_and_route_data_factory):
self._lqrd_factory = link_quality_and_route_data_factory
def parse(self, data, message_info):
id_sequence = ord(data.read(1))
router_id_mask = struct.unpack(">Q", data.read(8))[0]
link_quality_and_route_data = []
while data.tell() < len(data.getvalue()):
link_quality_and_route_data.append(self._lqrd_factory.parse(data, message_info))
return Route64(id_sequence, router_id_mask, link_quality_and_route_data)
class Address16(object):
def __init__(self, address):
self._address = address
@property
def address(self):
return self._address
def __eq__(self, other):
common.expect_the_same_class(self, other)
return self.address == other.address
def __repr__(self):
return "Address16(address={})".format(hex(self.address))
class Address16Factory:
def parse(self, data, message_info):
address = struct.unpack(">H", data.read(2))[0]
return Address16(address)
class LeaderData(object):
def __init__(self, partition_id, weighting, data_version, stable_data_version, leader_router_id):
self._partition_id = partition_id
self._weighting = weighting
self._data_version = data_version
self._stable_data_version = stable_data_version
self._leader_router_id = leader_router_id
@property
def partition_id(self):
return self._partition_id
@property
def weighting(self):
return self._weighting
@property
def data_version(self):
return self._data_version
@property
def stable_data_version(self):
return self._stable_data_version
@property
def leader_router_id(self):
return self._leader_router_id
def __eq__(self, other):
common.expect_the_same_class(self, other)
return self.partition_id == other.partition_id and \
self.weighting == other.weighting and \
self.data_version == other.data_version and \
self.stable_data_version == other.stable_data_version and \
self.leader_router_id == other.leader_router_id
def __repr__(self):
return "LeaderData(partition_id={}, weighting={}, data_version={}, stable_data_version={}, leader_router_id={})".format(
self.partition_id, self.weighting, self.data_version, self.stable_data_version, self.leader_router_id)
class LeaderDataFactory:
def parse(self, data, message_info):
partition_id = struct.unpack(">I", data.read(4))[0]
weighting = ord(data.read(1))
data_version = ord(data.read(1))
stable_data_version = ord(data.read(1))
leader_router_id = ord(data.read(1))
return LeaderData(partition_id, weighting, data_version, stable_data_version, leader_router_id)
class NetworkData(object):
def __init__(self, tlvs):
self._tlvs = tlvs
@property
def tlvs(self):
return self._tlvs
def __eq__(self, other):
common.expect_the_same_class(self, other)
return self.tlvs == other.tlvs
def __repr__(self):
tlvs_str = ", ".join(["{}".format(tlv) for tlv in self.tlvs])
return "NetworkData(tlvs=[{}])".format(tlvs_str)
class NetworkDataFactory:
def __init__(self, network_data_tlvs_factory):
self._tlvs_factory = network_data_tlvs_factory
def parse(self, data, message_info):
tlvs = self._tlvs_factory.parse(data, message_info)
return NetworkData(tlvs)
class TlvRequest(object):
def __init__(self, tlvs):
self._tlvs = tlvs
@property
def tlvs(self):
return self._tlvs
def __eq__(self, other):
common.expect_the_same_class(self, other)
return self.tlvs == other.tlvs
def __repr__(self):
tlvs_str = ", ".join(["{}".format(tlv) for tlv in self.tlvs])
return "TlvRequest(tlvs=[{}])".format(tlvs_str)
class TlvRequestFactory:
def parse(self, data, message_info):
tlvs = [b for b in bytearray(data.read())]
return TlvRequest(tlvs)
class ScanMask(object):
def __init__(self, router, end_device):
self._router = router
self._end_device = end_device
@property
def router(self):
return self._router
@property
def end_device(self):
return self._end_device
def __eq__(self, other):
common.expect_the_same_class(self, other)
return self.router == other.router and self.end_device == other.end_device
def __repr__(self):
return "ScanMask(router={}, end_device={})".format(self.router, self.end_device)
class ScanMaskFactory:
def parse(self, data, message_info):
scan_mask = ord(data.read(1))
router = (scan_mask >> 7) & 0x01
end_device = (scan_mask >> 6) & 0x01
return ScanMask(router, end_device)
class Connectivity(object):
def __init__(self,
pp,
link_quality_3,
link_quality_2,
link_quality_1,
leader_cost,
id_sequence,
active_routers,
sed_buffer_size=None,
sed_datagram_count=None):
self._pp = pp
self._link_quality_3 = link_quality_3
self._link_quality_2 = link_quality_2
self._link_quality_1 = link_quality_1
self._leader_cost = leader_cost
self._id_sequence = id_sequence
self._active_routers = active_routers
self._sed_buffer_size = sed_buffer_size
self._sed_datagram_count = sed_datagram_count
@property
def pp(self):
return self._pp
@property
def link_quality_3(self):
return self._link_quality_3
@property
def link_quality_2(self):
return self._link_quality_2
@property
def link_quality_1(self):
return self._link_quality_1
@property
def leader_cost(self):
return self._leader_cost
@property
def id_sequence(self):
return self._id_sequence
@property
def active_routers(self):
return self._active_routers
@property
def sed_buffer_size(self):
return self._sed_buffer_size
@property
def sed_datagram_count(self):
return self._sed_datagram_count
def __eq__(self, other):
common.expect_the_same_class(self, other)
return self.pp == other.pp and \
self.link_quality_3 == other.link_quality_3 and \
self.link_quality_2 == other.link_quality_2 and \
self.link_quality_1 == other.link_quality_1 and \
self.leader_cost == other.leader_cost and \
self.id_sequence == other.id_sequence and \
self.active_routers == other.active_routers and \
self.sed_buffer_size == other.sed_buffer_size and \
self.sed_datagram_count == other.sed_datagram_count
def __repr__(self):
return r"Connectivity(pp={}, \
link_quality_3={}, \
link_quality_2={}, \
link_quality_1={}, \
leader_cost={}, \
id_sequence={}, \
active_routers={}, \
sed_buffer_size={}, \
sed_datagram_count={})".format(
self.pp,
self.link_quality_3,
self.link_quality_2,
self.link_quality_1,
self.leader_cost,
self.id_sequence,
self.active_routers,
self.sed_buffer_size,
self.sed_datagram_count
)
class ConnectivityFactory:
def parse(self, data, message_info):
pp = ord(data.read(1)) & 0x03
link_quality_3 = ord(data.read(1))
link_quality_2 = ord(data.read(1))
link_quality_1 = ord(data.read(1))
leader_cost = ord(data.read(1))
id_sequence = ord(data.read(1))
active_routers = ord(data.read(1))
sed_data = io.BytesIO(data.read(3))
if len(sed_data.getvalue()) > 0:
sed_buffer_size = struct.unpack(">H", sed_data.read(2))[0]
sed_datagram_count = ord(sed_data.read(1))
else:
sed_buffer_size = None
sed_datagram_count = None
return Connectivity(pp,
link_quality_3,
link_quality_2,
link_quality_1,
leader_cost,
id_sequence,
active_routers,
sed_buffer_size,
sed_datagram_count)
class LinkMargin(object):
def __init__(self, link_margin):
self._link_margin = link_margin
@property
def link_margin(self):
return self._link_margin
def __eq__(self, other):
common.expect_the_same_class(self, other)
return self.link_margin == other.link_margin
def __repr__(self):
return "LinkMargin(link_margin={})".format(self.link_margin)
class LinkMarginFactory:
def parse(self, data, message_info):
link_margin = ord(data.read(1))
return LinkMargin(link_margin)
class Status(object):
def __init__(self, status):
self._status = status
@property
def status(self):
return self._status
def __eq__(self, other):
common.expect_the_same_class(self, other)
return self.status == other.status
def __repr__(self):
return "Status(status={})".format(self.status)
class StatusFactory:
def parse(self, data, message_info):
status = ord(data.read(1))
return Status(status)
class Version(object):
def __init__(self, version):
self._version = version
@property
def version(self):
return self._version
def __eq__(self, other):
common.expect_the_same_class(self, other)
return self.version == other.version
def __repr__(self):
return "Version(version={})".format(self.version)
class VersionFactory:
def parse(self, data, message_info):
version = struct.unpack(">H", data.read(2))[0]
return Version(version)
class AddressFull(object):
def __init__(self, ipv6_address):
self._ipv6_address = ipv6_address
@property
def ipv6_address(self):
return self._ipv6_address
def __eq__(self, other):
common.expect_the_same_class(self, other)
return self.ipv6_address == other.ipv6_address
def __repr__(self):
return "AddressFull(ipv6_address={}')".format(hexlify(self.ipv6_address))
class AddressFullFactory:
def parse(self, data, message_info):
data.read(1) # first byte is ignored
ipv6_address = data.read(16)
return AddressFull(ipv6_address)
class AddressCompressed(object):
def __init__(self, cid, iid):
self._cid = cid
self._iid = iid
@property
def cid(self):
return self._cid
@property
def iid(self):
return self._iid
def __eq__(self, other):
common.expect_the_same_class(self, other)
return self.cid == other.cid and self.iid == other.iid
def __repr__(self):
return "AddressCompressed(cid={}, iid={}')".format(self.cid, hexlify(self.iid))
class AddressCompressedFactory:
def parse(self, data, message_info):
cid = (ord(data.read(1)) & 0x0F)
iid = bytearray(data.read(8))
return AddressCompressed(cid, iid)
class AddressRegistration(object):
def __init__(self, addresses):
self._addresses = addresses
@property
def addresses(self):
return self._addresses
def __eq__(self, other):
common.expect_the_same_class(self, other)
return self.addresses == other.addresses
def __repr__(self):
addresses_str = ", ".join(["{}".format(address) for address in self.addresses])
return "AddressRegistration(addresses=[{}])".format(addresses_str)
class AddressRegistrationFactory:
def __init__(self, addr_compressed_factory, addr_full_factory):
self._addr_compressed_factory = addr_compressed_factory
self._addr_full_factory = addr_full_factory
def parse(self, data, message_info):
addresses = []
while data.tell() < len(data.getvalue()):
compressed = (ord(data.read(1)) >> 7) & 0x01
data.seek(-1, io.SEEK_CUR)
if compressed:
addresses.append(self._addr_compressed_factory.parse(data, message_info))
else:
addresses.append(self._addr_full_factory.parse(data, message_info))
return AddressRegistration(addresses)
class Channel(object):
def __init__(self, channel_page, channel):
self._channel_page = channel_page
self._channel = channel
@property
def channel_page(self):
return self._channel_page
@property
def channel(self):
return self._channel
def __eq__(self, other):
common.expect_the_same_class(self, other)
return self.channel_page == other.channel_page and \
self.channel == other.channel
def __repr__(self):
return "Channel(channel_page={}, channel={})".format(self.channel_page, channel)
class ChannelFactory:
def parse(self, data, message_info):
channel_page = ord(data.read(1))
channel = struct.unpack(">H", data.read(2))[0]
return Channel(channel_page, channel)
class PanId:
def __init__(self, pan_id):
self._pan_id = pan_id
@property
def pan_id(self):
return self._pan_id
def __eq__(self, other):
common.expect_the_same_class(self, other)
return self.pan_id == other.pan_id
def __repr__(self):
return "PanId(pan_id={})".format(self.pan_id)
class PanIdFactory:
def parse(self, data, message_info):
pan_id = struct.unpack(">H", data.read(2))[0]
return PanId(pan_id)
class ActiveTimestamp(object):
def __init__(self, timestamp_seconds, timestamp_ticks, u):
self._timestamp_seconds = timestamp_seconds
self._timestamp_ticks = timestamp_ticks
self._u = u
@property
def timestamp_seconds(self):
return self._timestamp_seconds
@property
def timestamp_ticks(self):
return self._timestamp_ticks
@property
def u(self):
return self._u
def __eq__(self, other):
common.expect_the_same_class(self, other)
return self.timestamp_seconds == other.timestamp_seconds and \
self.timestamp_ticks == other.timestamp_ticks and \
self.u == other.u
def __repr__(self):
return "ActiveTimestamp(timestamp_seconds={}, timestamp_ticks={}, u={})".format(
self.timestamp_seconds, self.timestamp_ticks, self.u)
class ActiveTimestampFactory:
def parse(self, data, message_info):
seconds = bytearray([0x00, 0x00]) + bytearray(data.read(6))
ticks = struct.unpack(">H", data.read(2))[0]
timestamp_seconds = struct.unpack(">Q", bytes(seconds))[0]
timestamp_ticks = (ticks >> 1)
u = ticks & 0x01
return ActiveTimestamp(timestamp_seconds, timestamp_ticks, u)
class PendingTimestamp(object):
def __init__(self, timestamp_seconds, timestamp_ticks, u):
self._timestamp_seconds = timestamp_seconds
self._timestamp_ticks = timestamp_ticks
self._u = u
@property
def timestamp_seconds(self):
return self._timestamp_seconds
@property
def timestamp_ticks(self):
return self._timestamp_ticks
@property
def u(self):
return self._u
def __eq__(self, other):
common.expect_the_same_class(self, other)
return self.timestamp_seconds == other.timestamp_seconds and \
self.timestamp_ticks == other.timestamp_ticks and \
self.u == other.u
def __repr__(self):
return "PendingTimestamp(timestamp_seconds={}, timestamp_ticks={}, u={})".format(
self.timestamp_seconds, self.timestamp_ticks, self.u)
class PendingTimestampFactory:
def parse(self, data, message_info):
seconds = bytearray([0x00, 0x00]) + bytearray(data.read(6))
ticks = struct.unpack(">H", data.read(2))[0]
timestamp_seconds = struct.unpack(">Q", bytes(seconds))[0]
timestamp_ticks = (ticks >> 1)
u = ticks & 0x01
return PendingTimestamp(timestamp_seconds, timestamp_ticks, u)
class ActiveOperationalDataset:
# TODO: Not implemented yet
def __init__(self):
print("ActiveOperationalDataset is not implemented yet.")
class ActiveOperationalDatasetFactory:
def parse(self, data, message_info):
return ActiveOperationalDataset()
class PendingOperationalDataset:
# TODO: Not implemented yet
def __init__(self):
print("PendingOperationalDataset is not implemented yet.")
class PendingOperationalDatasetFactory:
def parse(self, data, message_info):
return PendingOperationalDataset()
class ThreadDiscovery:
# TODO: Not implemented yet
def __init__(self):
print("ThreadDiscovery is not implemented yet.")
class ThreadDiscoveryFactory:
def parse(self, data, message_info):
return ThreadDiscovery()
class MleCommand(object):
def __init__(self, _type, tlvs):
self._type = _type
self._tlvs = tlvs
@property
def type(self):
return self._type
@property
def tlvs(self):
return self._tlvs
def __repr__(self):
tlvs_str = ", ".join(["{}".format(tlv) for tlv in self.tlvs])
return "MleCommand(type={}, tlvs=[{}])".format(self.type.name, tlvs_str)
class MleCommandFactory:
_MARKER_EXTENDED_LENGTH = 0xff
def __init__(self, tlvs_factories):
self._tlvs_factories = tlvs_factories
def _get_length(self, data):
length = ord(data.read(1))
if length == self._MARKER_EXTENDED_LENGTH:
length = struct.unpack(">H", data.read(2))[0]
return length
def _get_tlv_factory(self, _type):
try:
return self._tlvs_factories[_type]
except KeyError:
raise KeyError("Could not find TLV factory. Unsupported TLV type: {}".format(_type))
def _parse_tlv(self, data, message_info):
_type = TlvType(ord(data.read(1)))
length = self._get_length(data)
value = data.read(length)
factory = self._get_tlv_factory(_type)
return factory.parse(io.BytesIO(value), message_info)
def parse(self, data, message_info):
cmd_type = CommandType(ord(data.read(1)))
tlvs = []
while data.tell() < len(data.getvalue()):
tlv = self._parse_tlv(data, message_info)
tlvs.append(tlv)
return MleCommand(cmd_type, tlvs)
class MleMessage(object):
def __init__(self, command):
self._command = command
@property
def command(self):
return self._command
def __repr__(self):
return "MleMessage(command={})".format(self.command)
class MleMessageSecured(MleMessage):
def __init__(self, aux_sec_hdr, command, mic):
super(MleMessageSecured, self).__init__(command)
self._aux_sec_hdr = aux_sec_hdr
self._mic = mic
@property
def aux_sec_hdr(self):
return self._aux_sec_hdr
@property
def mic(self):
return self._mic
def __repr__(self):
return "MleMessageSecured(aux_sec_hdr={}, command={}, mic=\"{}\")".format(
self.aux_sec_hdr, self.command, hexlify(self.mic))
class MleMessageFactory:
def __init__(self, aux_sec_hdr_factory, mle_command_factory, crypto_engine):
self._aux_sec_hdr_factory = aux_sec_hdr_factory
self._mle_command_factory = mle_command_factory
self._crypto_engine = crypto_engine
def _create_mle_secured_message(self, data, message_info):
aux_sec_hdr = self._aux_sec_hdr_factory.parse(data, message_info)
enc_data_length = len(data.getvalue())
enc_data = bytearray(data.read(enc_data_length - data.tell() - self._crypto_engine.mic_length))
mic = bytearray(data.read())
dec_data = self._crypto_engine.decrypt(enc_data, mic, message_info)
command = self._mle_command_factory.parse(io.BytesIO(dec_data), message_info)
return MleMessageSecured(aux_sec_hdr, command, mic)
def _create_mle_message(self, data, message_info):
command = self._mle_command_factory.parse(data, message_info)
return MleMessage(command)
def parse(self, data, message_info):
security_indicator = ord(data.read(1))
if security_indicator == 0:
return self._create_mle_secured_message(data, message_info)
elif security_indicator == 255:
return self._create_mle_message(data, message_info)
else:
raise RuntimeError(
"Could not create MLE message. Unknown security indicator value: {}".format(security_indicator))