blob: 0f077c1b97c25c71f7bbae17dd8fed98bbf066fa [file] [log] [blame]
#!/usr/bin/env python
#
# Copyright (c) 2016, The OpenThread Authors.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# 3. Neither the name of the copyright holder nor the
# names of its contributors may be used to endorse or promote products
# derived from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
import io
import math
import struct
from binascii import hexlify
from enum import IntEnum
import common
class TlvType(IntEnum):
HAS_ROUTE = 0
PREFIX = 1
BORDER_ROUTER = 2
LOWPAN_ID = 3
COMMISSIONING = 4
SERVICE = 5
SERVER = 6
class MeshcopTlvType(IntEnum):
STEERING_DATA = 8
BORDER_AGENT_LOCATOR = 9
COMMISSIONER_SESSION_ID = 11
COMMISSIONER_UDP_PORT = 15
class NetworkData(object):
def __init__(self, stable):
self._stable = stable
@property
def stable(self):
return self._stable
class SubTlvsFactory(object):
def __init__(self, sub_tlvs_factories):
self._sub_tlvs_factories = sub_tlvs_factories
def _get_factory(self, _type):
try:
return self._sub_tlvs_factories[_type]
except KeyError:
raise RuntimeError("Could not find factory. Factory type = {}.".format(_type))
def parse(self, data, message_info):
sub_tlvs = []
while data.tell() < len(data.getvalue()):
_type = ord(data.read(1))
length = ord(data.read(1))
value = data.read(length)
factory = self._get_factory(_type)
message_info.length = length
tlv = factory.parse(io.BytesIO(value), message_info)
sub_tlvs.append(tlv)
return sub_tlvs
class NetworkDataSubTlvsFactory(SubTlvsFactory):
def parse(self, data, message_info):
sub_tlvs = []
while data.tell() < len(data.getvalue()):
data_byte = ord(data.read(1))
stable = data_byte & 0x01
_type = (data_byte >> 1) & 0x7f
length = ord(data.read(1))
value = data.read(length)
factory = self._get_factory(_type)
message_info.stable = stable
tlv = factory.parse(io.BytesIO(value), message_info)
sub_tlvs.append(tlv)
return sub_tlvs
class Route(object):
def __init__(self, border_router_16, prf):
self._border_router_16 = border_router_16
self._prf = prf
@property
def border_router_16(self):
return self._border_router_16
@property
def prf(self):
return self._prf
def __eq__(self, other):
common.expect_the_same_class(self, other)
return self.border_router_16 == other.border_router_16 and self.prf == other.prf
def __repr__(self):
return "Route(border_router_16={}, prf={})".format(self.border_router_16, self.prf)
class RouteFactory(object):
def parse(self, data, message_info):
border_router_16 = struct.unpack(">H", data.read(2))[0]
data_byte = ord(data.read(1))
prf = (data_byte >> 6) & 0x03
return Route(border_router_16, prf)
class RoutesFactory(object):
def __init__(self, route_factory):
self._route_factory = route_factory
def parse(self, data, message_info):
routes = []
while data.tell() < len(data.getvalue()):
route = self._route_factory.parse(data, message_info)
routes.append(route)
return routes
class HasRoute(NetworkData):
def __init__(self, routes, stable):
super(HasRoute, self).__init__(stable)
self._routes = routes
@property
def routes(self):
return self._routes
def __eq__(self, other):
common.expect_the_same_class(self, other)
return self.routes == other.routes
def __repr__(self):
routes_str = ", ".join(["{}".format(route) for route in self.routes])
return "HasRoute(stable={}, routes=[{}])".format(self.stable, routes_str)
class HasRouteFactory(object):
def __init__(self, routes_factory):
self._routes_factory = routes_factory
def parse(self, data, message_info):
routes = self._routes_factory.parse(data, message_info)
return HasRoute(routes, message_info.stable)
class Prefix(NetworkData):
def __init__(self, domain_id, prefix_length, prefix, sub_tlvs, stable):
super(Prefix, self).__init__(stable)
self._domain_id = domain_id
self._prefix_length = prefix_length
self._prefix = prefix
self._sub_tlvs = sub_tlvs
@property
def domain_id(self):
return self._domain_id
@property
def prefix_length(self):
return self._prefix_length
@property
def prefix(self):
return self._prefix
@property
def sub_tlvs(self):
return self._sub_tlvs
def __eq__(self, other):
common.expect_the_same_class(self, other)
return self.domain_id == other.domain_id and \
self.prefix_length == other.prefix_length and \
self.prefix == other.prefix and \
self.sub_tlvs == other.sub_tlvs
def __repr__(self):
sub_tlvs_str = ", ".join(["{}".format(tlv) for tlv in self.sub_tlvs])
return "Prefix(stable={}, domain_id={}, prefix_length={}, prefix={}, sub_tlvs=[{}])".format(
self.stable, self.domain_id, self.prefix_length, hexlify(self.prefix), sub_tlvs_str)
class PrefixSubTlvsFactory(NetworkDataSubTlvsFactory):
def __init__(self, sub_tlvs_factories):
super(PrefixSubTlvsFactory, self).__init__(sub_tlvs_factories)
class PrefixFactory(object):
def __init__(self, sub_tlvs_factory):
self._sub_tlvs_factory = sub_tlvs_factory
def _bits_to_bytes(self, bits):
return int(math.ceil(bits / 8))
def parse(self, data, message_info):
domain_id = ord(data.read(1))
prefix_length = ord(data.read(1))
prefix = bytearray(data.read(self._bits_to_bytes(prefix_length)))
sub_tlvs = self._sub_tlvs_factory.parse(io.BytesIO(data.read()), message_info)
return Prefix(domain_id, prefix_length, prefix, sub_tlvs, message_info.stable)
class BorderRouter(NetworkData):
def __init__(self, border_router_16, prf, p, s, d, c, r, o, n, stable):
super(BorderRouter, self).__init__(stable)
self._border_router_16 = border_router_16
self._prf = prf
self._p = p
self._s = s
self._d = d
self._c = c
self._r = r
self._o = o
self._n = n
@property
def border_router_16(self):
return self._border_router_16
@property
def prf(self):
return self._prf
@property
def p(self):
return self._p
@property
def s(self):
return self._s
@property
def d(self):
return self._d
@property
def c(self):
return self._c
@property
def r(self):
return self._r
@property
def o(self):
return self._o
@property
def n(self):
return self._n
def __eq__(self, other):
common.expect_the_same_class(self, other)
return self.border_router_16 == other.border_router_16 and \
self.prf == other.prf and \
self.p == other.p and \
self.s == other.s and \
self.d == other.d and \
self.c == other.c and \
self.r == other.r and \
self.o == other.o and \
self.n == other.n
def __repr__(self):
return "BorderRouter(stable={}, border_router_16={}, prf={}, p={}, s={}, d={}, c={}, r={}, o={}, n={})".format(
self.stable, self.border_router_16, self.prf, self.p, self.s, self.d, self.c, self.r, self.o, self.n)
class BorderRouterFactory(object):
def parse(self, data, message_info):
border_router_16 = struct.unpack(">H", data.read(2))[0]
data_byte = ord(data.read(1))
o = (data_byte & 0x01)
r = (data_byte >> 1) & 0x01
c = (data_byte >> 2) & 0x01
d = (data_byte >> 3) & 0x01
s = (data_byte >> 4) & 0x01
p = (data_byte >> 5) & 0x01
prf = (data_byte >> 6) & 0x03
data_byte = ord(data.read(1))
n = (data_byte >> 7) & 0x01
return BorderRouter(border_router_16, prf, p, s, d, c, r, o, n, message_info.stable)
class LowpanId(NetworkData):
def __init__(self, c, cid, context_length, stable):
super(LowpanId, self).__init__(stable)
self._c = c
self._cid = cid
self._context_length = context_length
@property
def c(self):
return self._c
@property
def cid(self):
return self._cid
@property
def context_length(self):
return self._context_length
def __eq__(self, other):
common.expect_the_same_class(self, other)
return self.c == other.c and \
self.cid == other.cid and \
self.context_length == other.context_length
def __repr__(self):
return "LowpanId(stable={}, c={}, cid={}, context_length={})".format(
self.stable, self.c, self.cid, self.context_length)
class LowpanIdFactory(object):
def parse(self, data, message_info):
data_byte = ord(data.read(1))
cid = (data_byte & 0x0f)
c = (data_byte >> 4) & 0x01
context_length = ord(data.read(1))
return LowpanId(c, cid, context_length, message_info.stable)
class CommissioningData(NetworkData):
def __init__(self, sub_tlvs, stable):
super(CommissioningData, self).__init__(stable)
self._sub_tlvs = sub_tlvs
@property
def sub_tlvs(self):
return self._sub_tlvs
def __eq__(self, other):
common.expect_the_same_class(self, other)
return self.sub_tlvs == other.sub_tlvs
def __repr__(self):
sub_tlvs_str = ", ".format(["{}".format(tlv) for tlv in self._sub_tlvs])
return "CommissioningData(stable={}, sub_tlvs=[{}])".format(
self._stable, sub_tlvs_str)
class CommissioningDataSubTlvsFactory(SubTlvsFactory):
def __init__(self, sub_tlvs_factories):
super(CommissioningDataSubTlvsFactory, self).__init__(sub_tlvs_factories)
class CommissioningDataFactory(object):
def __init__(self, sub_tlvs_factory):
self._sub_tlvs_factory = sub_tlvs_factory
def parse(self, data, message_info):
sub_tlvs = self._sub_tlvs_factory.parse(io.BytesIO(data.read()), message_info)
return CommissioningData(sub_tlvs, message_info.stable)
class SteeringData(object):
def __init__(self, bloom_filter):
self._bloom_filter = bloom_filter
@property
def bloom_filter(self):
return self._bloom_filter
def __eq__(self, other):
common.expect_the_same_class(self, other)
return self._bloom_filter == other._bloom_filter
def __repr__(self):
return "SteeringData(bloom_filter={})".format(hexlify(self._bloom_filter))
class SteeringDataFactory:
def parse(self, data, message_info):
bloom_filter = data.read(message_info.length)
return SteeringData(bloom_filter)
class BorderAgentLocator(object):
def __init__(self, address):
self._udp_port = address
@property
def udp_port(self):
return self._udp_port
def __eq__(self, other):
common.expect_the_same_class(self, other)
return self._udp_port == other._udp_port
def __repr__(self):
return "BorderAgentLocator(rloc16={})".format(hex(self._udp_port))
class BorderAgentLocatorFactory:
def parse(self, data, message_info):
border_agent_locator = struct.unpack(">H", data.read(2))[0]
return BorderAgentLocator(border_agent_locator)
class CommissionerSessionId(object):
def __init__(self, commissioner_session_id):
self._udp_port = commissioner_session_id
@property
def udp_port(self):
return self._udp_port
def __eq__(self, other):
common.expect_the_same_class(self, other)
return self._udp_port == other._udp_port
def __repr__(self):
return "CommissionerSessionId(id={})".format(hex(self._udp_port))
class CommissionerSessionIdFactory:
def parse(self, data, message_info):
session_id = struct.unpack(">H", data.read(2))[0]
return CommissionerSessionId(session_id)
class CommissionerUdpPort(object):
def __init__(self, udp_port):
self._udp_port = udp_port
@property
def udp_port(self):
return self._udp_port
def __eq__(self, other):
common.expect_the_same_class(self, other)
return self._udp_port == other._udp_port
def __repr__(self):
return "CommissionerUdpPort(udp_port={})".format(self._udp_port)
class CommissionerUdpPortFactory:
def parse(self, data, message_info):
udp_port = struct.unpack(">H", data.read(2))[0]
return CommissionerUdpPort(udp_port)
class Service(NetworkData):
def __init__(self, t, _id, enterprise_number, service_data_length, service_data, sub_tlvs, stable):
super(Service, self).__init__(stable)
self._t = t
self._id = _id
self._enterprise_number = enterprise_number
self._service_data_length = service_data_length
self._service_data = service_data
self._sub_tlvs = sub_tlvs
@property
def t(self):
return self._t
@property
def id(self):
return self._id
@property
def enterprise_number(self):
return self._enterprise_number
@property
def service_data_length(self):
return self._service_data_length
@property
def service_data(self):
return self._service_data
@property
def sub_tlvs(self):
return self._sub_tlvs
def __eq__(self, other):
common.expect_the_same_class(self, other)
return self.t == other.t and \
self.id == other.id and \
self.enterprise_number == other.enterprise_number and \
self.service_data_length == other.service_data_length and \
self.service_data == other.service_data and \
self.sub_tlvs == other.sub_tlvs
def __repr__(self):
sub_tlvs_str = ", ".format(["{}".format(tlv) for tlv in self.sub_tlvs])
return "LowpanId(stable={}, t={}, id={}, enterprise_number={}, service_data_length={}, service_data={}, sub_tlvs=[{}])".format(
self.stable, self.t, self.id, self.enterprise_number, self.service_data_length, self.service_data, sub_tlvs_str)
class ServiceSubTlvsFactory(NetworkDataSubTlvsFactory):
def __init__(self, sub_tlvs_factories):
super(ServiceSubTlvsFactory, self).__init__(sub_tlvs_factories)
class ServiceFactory(object):
def __init__(self, sub_tlvs_factory):
self._sub_tlvs_factory = sub_tlvs_factory
def parse(self, data, message_info):
data_byte = ord(data.read(1))
t = (data_byte >> 7) & 0x01
_id = (data_byte & 0x0f)
enterprise_number = struct.unpack(">L", data.read(4))[0]
service_data_length = ord(data.read(1))
service_data = data.read(service_data_length)
sub_tlvs = self._sub_tlvs_factory.parse(io.BytesIO(data.read()), message_info)
return Service(t, _id, enterprise_number, service_data_length, service_data, sub_tlvs, message_info.stable)
class Server(NetworkData):
def __init__(self, server_16, server_data, stable):
super(Server, self).__init__(stable)
self._server_16 = server_16
self._server_data = server_data
@property
def server_16(self):
return self._server_16
@property
def server_data(self):
return self._server_data
def __eq__(self, other):
common.expect_the_same_class(self, other)
return self.server_16 == other.server_16 and \
self.server_data == other.server_data
def __repr__(self):
return "LowpanId(stable={}, server_16={}, server_data=b'{}')".format(
self.stable, self.server_16, hexlify(self.server_data))
class ServerFactory(object):
def parse(self, data, message_info):
server_16 = struct.unpack(">H", data.read(2))[0]
server_data = bytearray(data.read())
return Server(server_16, server_data, message_info.stable)
class NetworkDataTlvsFactory(NetworkDataSubTlvsFactory):
def __init__(self, sub_tlvs_factories):
super(NetworkDataTlvsFactory, self).__init__(sub_tlvs_factories)