| #!/usr/bin/env python |
| # |
| # Copyright (c) 2016, The OpenThread Authors. |
| # All rights reserved. |
| # |
| # Redistribution and use in source and binary forms, with or without |
| # modification, are permitted provided that the following conditions are met: |
| # 1. Redistributions of source code must retain the above copyright |
| # notice, this list of conditions and the following disclaimer. |
| # 2. Redistributions in binary form must reproduce the above copyright |
| # notice, this list of conditions and the following disclaimer in the |
| # documentation and/or other materials provided with the distribution. |
| # 3. Neither the name of the copyright holder nor the |
| # names of its contributors may be used to endorse or promote products |
| # derived from this software without specific prior written permission. |
| # |
| # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" |
| # AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE |
| # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE |
| # ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE |
| # LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR |
| # CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF |
| # SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS |
| # INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN |
| # CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) |
| # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE |
| # POSSIBILITY OF SUCH DAMAGE. |
| # |
| |
| import io |
| import math |
| import struct |
| |
| from binascii import hexlify |
| from enum import IntEnum |
| |
| import common |
| |
| |
| class TlvType(IntEnum): |
| HAS_ROUTE = 0 |
| PREFIX = 1 |
| BORDER_ROUTER = 2 |
| LOWPAN_ID = 3 |
| COMMISSIONING = 4 |
| SERVICE = 5 |
| SERVER = 6 |
| |
| |
| class MeshcopTlvType(IntEnum): |
| STEERING_DATA = 8 |
| BORDER_AGENT_LOCATOR = 9 |
| COMMISSIONER_SESSION_ID = 11 |
| COMMISSIONER_UDP_PORT = 15 |
| |
| |
| class NetworkData(object): |
| |
| def __init__(self, stable): |
| self._stable = stable |
| |
| @property |
| def stable(self): |
| return self._stable |
| |
| |
| class SubTlvsFactory(object): |
| |
| def __init__(self, sub_tlvs_factories): |
| self._sub_tlvs_factories = sub_tlvs_factories |
| |
| def _get_factory(self, _type): |
| try: |
| return self._sub_tlvs_factories[_type] |
| except KeyError: |
| raise RuntimeError("Could not find factory. Factory type = {}.".format(_type)) |
| |
| def parse(self, data, message_info): |
| sub_tlvs = [] |
| |
| while data.tell() < len(data.getvalue()): |
| _type = ord(data.read(1)) |
| |
| length = ord(data.read(1)) |
| value = data.read(length) |
| |
| factory = self._get_factory(_type) |
| |
| message_info.length = length |
| tlv = factory.parse(io.BytesIO(value), message_info) |
| |
| sub_tlvs.append(tlv) |
| |
| return sub_tlvs |
| |
| |
| class NetworkDataSubTlvsFactory(SubTlvsFactory): |
| |
| def parse(self, data, message_info): |
| sub_tlvs = [] |
| |
| while data.tell() < len(data.getvalue()): |
| data_byte = ord(data.read(1)) |
| |
| stable = data_byte & 0x01 |
| _type = (data_byte >> 1) & 0x7f |
| |
| length = ord(data.read(1)) |
| value = data.read(length) |
| |
| factory = self._get_factory(_type) |
| |
| message_info.stable = stable |
| tlv = factory.parse(io.BytesIO(value), message_info) |
| |
| sub_tlvs.append(tlv) |
| |
| return sub_tlvs |
| |
| |
| class Route(object): |
| |
| def __init__(self, border_router_16, prf): |
| self._border_router_16 = border_router_16 |
| self._prf = prf |
| |
| @property |
| def border_router_16(self): |
| return self._border_router_16 |
| |
| @property |
| def prf(self): |
| return self._prf |
| |
| def __eq__(self, other): |
| common.expect_the_same_class(self, other) |
| |
| return self.border_router_16 == other.border_router_16 and self.prf == other.prf |
| |
| def __repr__(self): |
| return "Route(border_router_16={}, prf={})".format(self.border_router_16, self.prf) |
| |
| |
| class RouteFactory(object): |
| |
| def parse(self, data, message_info): |
| border_router_16 = struct.unpack(">H", data.read(2))[0] |
| |
| data_byte = ord(data.read(1)) |
| prf = (data_byte >> 6) & 0x03 |
| |
| return Route(border_router_16, prf) |
| |
| |
| class RoutesFactory(object): |
| |
| def __init__(self, route_factory): |
| self._route_factory = route_factory |
| |
| def parse(self, data, message_info): |
| routes = [] |
| |
| while data.tell() < len(data.getvalue()): |
| route = self._route_factory.parse(data, message_info) |
| |
| routes.append(route) |
| |
| return routes |
| |
| |
| class HasRoute(NetworkData): |
| |
| def __init__(self, routes, stable): |
| super(HasRoute, self).__init__(stable) |
| self._routes = routes |
| |
| @property |
| def routes(self): |
| return self._routes |
| |
| def __eq__(self, other): |
| common.expect_the_same_class(self, other) |
| |
| return self.routes == other.routes |
| |
| def __repr__(self): |
| routes_str = ", ".join(["{}".format(route) for route in self.routes]) |
| return "HasRoute(stable={}, routes=[{}])".format(self.stable, routes_str) |
| |
| |
| class HasRouteFactory(object): |
| |
| def __init__(self, routes_factory): |
| self._routes_factory = routes_factory |
| |
| def parse(self, data, message_info): |
| routes = self._routes_factory.parse(data, message_info) |
| |
| return HasRoute(routes, message_info.stable) |
| |
| |
| class Prefix(NetworkData): |
| |
| def __init__(self, domain_id, prefix_length, prefix, sub_tlvs, stable): |
| super(Prefix, self).__init__(stable) |
| self._domain_id = domain_id |
| self._prefix_length = prefix_length |
| self._prefix = prefix |
| self._sub_tlvs = sub_tlvs |
| |
| @property |
| def domain_id(self): |
| return self._domain_id |
| |
| @property |
| def prefix_length(self): |
| return self._prefix_length |
| |
| @property |
| def prefix(self): |
| return self._prefix |
| |
| @property |
| def sub_tlvs(self): |
| return self._sub_tlvs |
| |
| def __eq__(self, other): |
| common.expect_the_same_class(self, other) |
| |
| return self.domain_id == other.domain_id and \ |
| self.prefix_length == other.prefix_length and \ |
| self.prefix == other.prefix and \ |
| self.sub_tlvs == other.sub_tlvs |
| |
| def __repr__(self): |
| sub_tlvs_str = ", ".join(["{}".format(tlv) for tlv in self.sub_tlvs]) |
| return "Prefix(stable={}, domain_id={}, prefix_length={}, prefix={}, sub_tlvs=[{}])".format( |
| self.stable, self.domain_id, self.prefix_length, hexlify(self.prefix), sub_tlvs_str) |
| |
| |
| class PrefixSubTlvsFactory(NetworkDataSubTlvsFactory): |
| |
| def __init__(self, sub_tlvs_factories): |
| super(PrefixSubTlvsFactory, self).__init__(sub_tlvs_factories) |
| |
| |
| class PrefixFactory(object): |
| |
| def __init__(self, sub_tlvs_factory): |
| self._sub_tlvs_factory = sub_tlvs_factory |
| |
| def _bits_to_bytes(self, bits): |
| return int(math.ceil(bits / 8)) |
| |
| def parse(self, data, message_info): |
| domain_id = ord(data.read(1)) |
| |
| prefix_length = ord(data.read(1)) |
| |
| prefix = bytearray(data.read(self._bits_to_bytes(prefix_length))) |
| |
| sub_tlvs = self._sub_tlvs_factory.parse(io.BytesIO(data.read()), message_info) |
| |
| return Prefix(domain_id, prefix_length, prefix, sub_tlvs, message_info.stable) |
| |
| |
| class BorderRouter(NetworkData): |
| |
| def __init__(self, border_router_16, prf, p, s, d, c, r, o, n, stable): |
| super(BorderRouter, self).__init__(stable) |
| self._border_router_16 = border_router_16 |
| self._prf = prf |
| self._p = p |
| self._s = s |
| self._d = d |
| self._c = c |
| self._r = r |
| self._o = o |
| self._n = n |
| |
| @property |
| def border_router_16(self): |
| return self._border_router_16 |
| |
| @property |
| def prf(self): |
| return self._prf |
| |
| @property |
| def p(self): |
| return self._p |
| |
| @property |
| def s(self): |
| return self._s |
| |
| @property |
| def d(self): |
| return self._d |
| |
| @property |
| def c(self): |
| return self._c |
| |
| @property |
| def r(self): |
| return self._r |
| |
| @property |
| def o(self): |
| return self._o |
| |
| @property |
| def n(self): |
| return self._n |
| |
| def __eq__(self, other): |
| common.expect_the_same_class(self, other) |
| |
| return self.border_router_16 == other.border_router_16 and \ |
| self.prf == other.prf and \ |
| self.p == other.p and \ |
| self.s == other.s and \ |
| self.d == other.d and \ |
| self.c == other.c and \ |
| self.r == other.r and \ |
| self.o == other.o and \ |
| self.n == other.n |
| |
| def __repr__(self): |
| return "BorderRouter(stable={}, border_router_16={}, prf={}, p={}, s={}, d={}, c={}, r={}, o={}, n={})".format( |
| self.stable, self.border_router_16, self.prf, self.p, self.s, self.d, self.c, self.r, self.o, self.n) |
| |
| |
| class BorderRouterFactory(object): |
| |
| def parse(self, data, message_info): |
| border_router_16 = struct.unpack(">H", data.read(2))[0] |
| |
| data_byte = ord(data.read(1)) |
| o = (data_byte & 0x01) |
| r = (data_byte >> 1) & 0x01 |
| c = (data_byte >> 2) & 0x01 |
| d = (data_byte >> 3) & 0x01 |
| s = (data_byte >> 4) & 0x01 |
| p = (data_byte >> 5) & 0x01 |
| prf = (data_byte >> 6) & 0x03 |
| |
| data_byte = ord(data.read(1)) |
| n = (data_byte >> 7) & 0x01 |
| |
| return BorderRouter(border_router_16, prf, p, s, d, c, r, o, n, message_info.stable) |
| |
| |
| class LowpanId(NetworkData): |
| |
| def __init__(self, c, cid, context_length, stable): |
| super(LowpanId, self).__init__(stable) |
| self._c = c |
| self._cid = cid |
| self._context_length = context_length |
| |
| @property |
| def c(self): |
| return self._c |
| |
| @property |
| def cid(self): |
| return self._cid |
| |
| @property |
| def context_length(self): |
| return self._context_length |
| |
| def __eq__(self, other): |
| common.expect_the_same_class(self, other) |
| |
| return self.c == other.c and \ |
| self.cid == other.cid and \ |
| self.context_length == other.context_length |
| |
| def __repr__(self): |
| return "LowpanId(stable={}, c={}, cid={}, context_length={})".format( |
| self.stable, self.c, self.cid, self.context_length) |
| |
| |
| class LowpanIdFactory(object): |
| |
| def parse(self, data, message_info): |
| data_byte = ord(data.read(1)) |
| |
| cid = (data_byte & 0x0f) |
| c = (data_byte >> 4) & 0x01 |
| |
| context_length = ord(data.read(1)) |
| |
| return LowpanId(c, cid, context_length, message_info.stable) |
| |
| |
| class CommissioningData(NetworkData): |
| |
| def __init__(self, sub_tlvs, stable): |
| super(CommissioningData, self).__init__(stable) |
| self._sub_tlvs = sub_tlvs |
| |
| @property |
| def sub_tlvs(self): |
| return self._sub_tlvs |
| |
| def __eq__(self, other): |
| common.expect_the_same_class(self, other) |
| |
| return self.sub_tlvs == other.sub_tlvs |
| |
| def __repr__(self): |
| sub_tlvs_str = ", ".format(["{}".format(tlv) for tlv in self._sub_tlvs]) |
| return "CommissioningData(stable={}, sub_tlvs=[{}])".format( |
| self._stable, sub_tlvs_str) |
| |
| |
| class CommissioningDataSubTlvsFactory(SubTlvsFactory): |
| |
| def __init__(self, sub_tlvs_factories): |
| super(CommissioningDataSubTlvsFactory, self).__init__(sub_tlvs_factories) |
| |
| |
| class CommissioningDataFactory(object): |
| |
| def __init__(self, sub_tlvs_factory): |
| self._sub_tlvs_factory = sub_tlvs_factory |
| |
| def parse(self, data, message_info): |
| sub_tlvs = self._sub_tlvs_factory.parse(io.BytesIO(data.read()), message_info) |
| |
| return CommissioningData(sub_tlvs, message_info.stable) |
| |
| |
| class SteeringData(object): |
| |
| def __init__(self, bloom_filter): |
| self._bloom_filter = bloom_filter |
| |
| @property |
| def bloom_filter(self): |
| return self._bloom_filter |
| |
| def __eq__(self, other): |
| common.expect_the_same_class(self, other) |
| |
| return self._bloom_filter == other._bloom_filter |
| |
| def __repr__(self): |
| return "SteeringData(bloom_filter={})".format(hexlify(self._bloom_filter)) |
| |
| |
| class SteeringDataFactory: |
| |
| def parse(self, data, message_info): |
| bloom_filter = data.read(message_info.length) |
| return SteeringData(bloom_filter) |
| |
| |
| class BorderAgentLocator(object): |
| |
| def __init__(self, address): |
| self._udp_port = address |
| |
| @property |
| def udp_port(self): |
| return self._udp_port |
| |
| def __eq__(self, other): |
| common.expect_the_same_class(self, other) |
| |
| return self._udp_port == other._udp_port |
| |
| def __repr__(self): |
| return "BorderAgentLocator(rloc16={})".format(hex(self._udp_port)) |
| |
| |
| class BorderAgentLocatorFactory: |
| |
| def parse(self, data, message_info): |
| border_agent_locator = struct.unpack(">H", data.read(2))[0] |
| return BorderAgentLocator(border_agent_locator) |
| |
| |
| class CommissionerSessionId(object): |
| |
| def __init__(self, commissioner_session_id): |
| self._udp_port = commissioner_session_id |
| |
| @property |
| def udp_port(self): |
| return self._udp_port |
| |
| def __eq__(self, other): |
| common.expect_the_same_class(self, other) |
| |
| return self._udp_port == other._udp_port |
| |
| def __repr__(self): |
| return "CommissionerSessionId(id={})".format(hex(self._udp_port)) |
| |
| |
| class CommissionerSessionIdFactory: |
| |
| def parse(self, data, message_info): |
| session_id = struct.unpack(">H", data.read(2))[0] |
| return CommissionerSessionId(session_id) |
| |
| |
| class CommissionerUdpPort(object): |
| |
| def __init__(self, udp_port): |
| self._udp_port = udp_port |
| |
| @property |
| def udp_port(self): |
| return self._udp_port |
| |
| def __eq__(self, other): |
| common.expect_the_same_class(self, other) |
| |
| return self._udp_port == other._udp_port |
| |
| def __repr__(self): |
| return "CommissionerUdpPort(udp_port={})".format(self._udp_port) |
| |
| |
| class CommissionerUdpPortFactory: |
| |
| def parse(self, data, message_info): |
| udp_port = struct.unpack(">H", data.read(2))[0] |
| return CommissionerUdpPort(udp_port) |
| |
| |
| class Service(NetworkData): |
| |
| def __init__(self, t, _id, enterprise_number, service_data_length, service_data, sub_tlvs, stable): |
| super(Service, self).__init__(stable) |
| self._t = t |
| self._id = _id |
| self._enterprise_number = enterprise_number |
| self._service_data_length = service_data_length |
| self._service_data = service_data |
| self._sub_tlvs = sub_tlvs |
| |
| @property |
| def t(self): |
| return self._t |
| |
| @property |
| def id(self): |
| return self._id |
| |
| @property |
| def enterprise_number(self): |
| return self._enterprise_number |
| |
| @property |
| def service_data_length(self): |
| return self._service_data_length |
| |
| @property |
| def service_data(self): |
| return self._service_data |
| |
| @property |
| def sub_tlvs(self): |
| return self._sub_tlvs |
| |
| def __eq__(self, other): |
| common.expect_the_same_class(self, other) |
| |
| return self.t == other.t and \ |
| self.id == other.id and \ |
| self.enterprise_number == other.enterprise_number and \ |
| self.service_data_length == other.service_data_length and \ |
| self.service_data == other.service_data and \ |
| self.sub_tlvs == other.sub_tlvs |
| |
| def __repr__(self): |
| sub_tlvs_str = ", ".format(["{}".format(tlv) for tlv in self.sub_tlvs]) |
| return "LowpanId(stable={}, t={}, id={}, enterprise_number={}, service_data_length={}, service_data={}, sub_tlvs=[{}])".format( |
| self.stable, self.t, self.id, self.enterprise_number, self.service_data_length, self.service_data, sub_tlvs_str) |
| |
| |
| class ServiceSubTlvsFactory(NetworkDataSubTlvsFactory): |
| |
| def __init__(self, sub_tlvs_factories): |
| super(ServiceSubTlvsFactory, self).__init__(sub_tlvs_factories) |
| |
| |
| class ServiceFactory(object): |
| |
| def __init__(self, sub_tlvs_factory): |
| self._sub_tlvs_factory = sub_tlvs_factory |
| |
| def parse(self, data, message_info): |
| data_byte = ord(data.read(1)) |
| t = (data_byte >> 7) & 0x01 |
| _id = (data_byte & 0x0f) |
| |
| enterprise_number = struct.unpack(">L", data.read(4))[0] |
| service_data_length = ord(data.read(1)) |
| service_data = data.read(service_data_length) |
| |
| sub_tlvs = self._sub_tlvs_factory.parse(io.BytesIO(data.read()), message_info) |
| |
| return Service(t, _id, enterprise_number, service_data_length, service_data, sub_tlvs, message_info.stable) |
| |
| |
| class Server(NetworkData): |
| |
| def __init__(self, server_16, server_data, stable): |
| super(Server, self).__init__(stable) |
| self._server_16 = server_16 |
| self._server_data = server_data |
| |
| @property |
| def server_16(self): |
| return self._server_16 |
| |
| @property |
| def server_data(self): |
| return self._server_data |
| |
| def __eq__(self, other): |
| common.expect_the_same_class(self, other) |
| |
| return self.server_16 == other.server_16 and \ |
| self.server_data == other.server_data |
| |
| def __repr__(self): |
| return "LowpanId(stable={}, server_16={}, server_data=b'{}')".format( |
| self.stable, self.server_16, hexlify(self.server_data)) |
| |
| |
| class ServerFactory(object): |
| |
| def parse(self, data, message_info): |
| server_16 = struct.unpack(">H", data.read(2))[0] |
| server_data = bytearray(data.read()) |
| |
| return Server(server_16, server_data, message_info.stable) |
| |
| |
| class NetworkDataTlvsFactory(NetworkDataSubTlvsFactory): |
| |
| def __init__(self, sub_tlvs_factories): |
| super(NetworkDataTlvsFactory, self).__init__(sub_tlvs_factories) |