blob: 7fa3cc4f35bb84c55d59f2017a51005c99338d87 [file] [log] [blame]
#!/usr/bin/env python
#
# Copyright (c) 2016, The OpenThread Authors.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# 3. Neither the name of the copyright holder nor the
# names of its contributors may be used to endorse or promote products
# derived from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
import collections
import io
import struct
from binascii import hexlify
from enum import IntEnum
class CoapMessageType(IntEnum):
    """ CoAP message types, as carried in the 2-bit type field of the first header byte. """

    # Confirmable
    CON = 0
    # Non-confirmable
    NON = 1
    # Acknowledgement
    ACK = 2
    # Reset
    RST = 3
class CoapOptionsTypes(IntEnum):
    """ CoAP option numbers known to this module.

    The numeric value is the absolute option number obtained by summing
    the option deltas while walking the options of a message.
    """

    IF_MATCH = 1
    URI_HOST = 3
    ETAG = 4
    IF_NOT_MATCH = 5
    URI_PORT = 7
    LOCATION_PATH = 8
    URI_PATH = 11
    CONTENT_FORMAT = 12
    MAX_AGE = 14
    URI_QUERY = 15
    ACCEPT = 17
    LOCATION_QUERY = 20
    PROXY_URI = 35
    PROXY_SCHEME = 39
    SIZE1 = 60
class CoapOptionHeader(object):
    """ Class representing CoAP option header.

    The header consists of one initial byte holding two 4-bit fields
    (option delta and option length), optionally followed by extended
    delta and extended length bytes.
    """

    # Raw initial byte that separates the options from the payload.
    _PAYLOAD_MARKER = 0xff

    def __init__(self, delta, length, is_payload_marker=None):
        """
        Args:
            delta: Decoded option delta.
            length: Decoded option value length in bytes.
            is_payload_marker: True when the header was read from the payload
                marker byte. When omitted (None), the flag is derived from
                delta and length both being 0xf, which is how headers built
                directly from raw nibbles were always interpreted.
        """
        self._delta = delta
        self._length = length

        if is_payload_marker is None:
            is_payload_marker = (delta == 0xf and length == 0xf)

        self._is_payload_marker = is_payload_marker

    @property
    def delta(self):
        return self._delta

    @property
    def length(self):
        return self._length

    @property
    def is_payload_marker(self):
        return self._is_payload_marker

    @classmethod
    def _read_extended_value(cls, data, value):
        """ Resolve a 4-bit delta/length field into its real value.

        Args:
            data: Binary stream positioned right after the bytes consumed so far.
            value: The 4-bit field taken from the initial byte.

        Returns:
            The field itself for 0..12 (and for the reserved value 15),
            an 8-bit extension plus 13 for 13, or a 16-bit big-endian
            extension plus 269 for 14.
        """
        if value == 13:
            return ord(data.read(1)) + 13

        elif value == 14:
            # The extension is a 16-bit unsigned integer in network byte order;
            # both bytes contribute to the value.
            return struct.unpack(">H", data.read(2))[0] + 269

        else:
            return value

    @classmethod
    def from_bytes(cls, data):
        """ Read one option header from the binary stream ``data``.

        The extended delta bytes (if any) are read before the extended
        length bytes.
        """
        initial_byte = ord(data.read(1))

        delta = (initial_byte >> 4) & 0xf
        length = initial_byte & 0xf

        delta = cls._read_extended_value(data, delta)
        length = cls._read_extended_value(data, length)

        # Decide on the raw byte: a regular option may legitimately decode to
        # delta == 15 and length == 15 through the extended encoding.
        return cls(delta, length, initial_byte == cls._PAYLOAD_MARKER)
class CoapOption(object):
    """ A single CoAP option: its option number together with the raw value bytes. """

    def __init__(self, _type, value):
        self._type, self._value = _type, value

    @property
    def type(self):
        return self._type

    @property
    def value(self):
        return self._value

    def __repr__(self):
        # The value is shown hex-encoded.
        hex_value = hexlify(self.value)
        return "CoapOption(type={}, value={})".format(self.type, hex_value)
class CoapOptionsFactory(object):
    """ Factory that produces CoAP options. """

    def parse(self, data, message_info):
        """ Read options from ``data`` until the payload marker or the end of the buffer.

        Args:
            data: io.BytesIO-like stream positioned at the first option.
            message_info: Not used by this method.

        Returns:
            List of CoapOption objects in the order they were read.
        """
        buffer_size = len(data.getvalue())

        parsed = []
        option_number = 0

        while data.tell() < buffer_size:
            header = CoapOptionHeader.from_bytes(data)

            if header.is_payload_marker:
                break

            # Deltas are relative to the previous option number.
            option_number += header.delta

            parsed.append(CoapOption(option_number, data.read(header.length)))

        return parsed
class CoapCode(object):
    """ CoAP code: a 3-bit class and a 5-bit detail packed into one integer. """

    def __init__(self, code):
        self._code = code

    @property
    def code(self):
        return self._code

    @property
    def _class(self):
        # Three bits above the five detail bits.
        return (self.code >> 5) & 0x7

    @property
    def detail(self):
        # Lowest five bits.
        return self.code & 0x1f

    @classmethod
    def from_class_and_detail(cls, _class, detail):
        """ Build a code from its class and detail parts (each masked to its field width). """
        class_bits = (_class & 0x7) << 5
        detail_bits = detail & 0x1f
        return cls(class_bits | detail_bits)

    @classmethod
    def from_dotted(cls, dotted_str):
        """ Build a code from its dotted form "<class>.<detail>", e.g. "2.05". """
        class_part, detail_part = dotted_str.split(".")
        return cls.from_class_and_detail(int(class_part), int(detail_part))

    def is_equal_dotted(self, dotted_code):
        """ Return True when ``dotted_code`` denotes the same code as this object. """
        return self.code == self.from_dotted(dotted_code).code

    @property
    def dotted(self):
        return "{:01d}.{:02d}".format(self._class, self.detail)

    def __eq__(self, other):
        """ Compare with an int (raw code), a str (dotted form) or another CoapCode.

        Raises:
            TypeError: ``other`` is of any other type.
        """
        if isinstance(other, int):
            return self.code == other

        if isinstance(other, str):
            return self.is_equal_dotted(other)

        if isinstance(other, self.__class__):
            return self.code == other.code

        raise TypeError("Could not compare {} and {}".format(type(self), type(other)))

    def __repr__(self):
        return self.dotted
class CoapMessage(object):
    """ Class representing CoAP message.

    Plain value object: every constructor argument is stored as given and
    exposed through a read-only property of the same name.
    """

    def __init__(self, version, _type, code, message_id, token, options, payload, uri_path=None):
        self._version = version
        self._type = _type
        self._code = code
        self._message_id = message_id
        self._token = token
        self._options = options
        self._payload = payload
        self._uri_path = uri_path

    @property
    def version(self):
        return self._version

    @property
    def type(self):
        return self._type

    @property
    def code(self):
        return self._code

    @property
    def message_id(self):
        return self._message_id

    @property
    def token(self):
        return self._token

    @property
    def tkl(self):
        """ Token length in bytes. """
        return len(self._token)

    @property
    def options(self):
        return self._options

    @property
    def payload(self):
        return self._payload

    @property
    def uri_path(self):
        return self._uri_path

    def __repr__(self):
        options_str = ", ".join([repr(opt) for opt in self.options])

        # Show the symbolic name when the type is an enum member; otherwise
        # fall back to the raw value so that repr() never raises.
        type_str = getattr(self.type, "name", self.type)

        return "CoapMessage(version={}, type={}, code={}, message_id={}, token={}, options=[{}], payload={}, uri-path='{}')".format(
            self.version, type_str, self.code, self.message_id, hexlify(self.token),
            options_str, self.payload, self.uri_path)
class CoapMessageProxy(object):
    """ Proxy class of CoAP message.

    The point of this class is to postpone payload parsing until the payload is
    actually requested. With the existing architecture a confirmation message
    may be processed before its request message, and at that moment the URI path
    needed to select the proper payload parser is not yet known.
    """

    def __init__(self, coap_message, message_info, mid_to_uri_path_binder, uri_path_based_payload_factories):
        self._coap_message = coap_message
        self._message_info = message_info
        self._mid_to_uri_path_binder = mid_to_uri_path_binder
        self._uri_path_based_payload_factories = uri_path_based_payload_factories

    @property
    def version(self):
        return self._coap_message.version

    @property
    def type(self):
        return self._coap_message.type

    @property
    def code(self):
        return self._coap_message.code

    @property
    def message_id(self):
        return self._coap_message.message_id

    @property
    def token(self):
        return self._coap_message.token

    @property
    def tkl(self):
        return self._coap_message.tkl

    @property
    def options(self):
        return self._coap_message.options

    @property
    def payload(self):
        """ Parsed payload, or the raw payload when a RuntimeError occurs on the way.

        The parser is selected by the URI path bound to this message's
        (message id, token) pair.
        """
        raw_payload = self._coap_message.payload

        try:
            bound_path = self._mid_to_uri_path_binder.get_uri_path_for(self.message_id, self.token)
            payload_factory = self._uri_path_based_payload_factories[bound_path]
            parsed = payload_factory.parse(io.BytesIO(raw_payload), self._message_info)
        except RuntimeError:
            return raw_payload

        return parsed

    @property
    def uri_path(self):
        return self._coap_message.uri_path

    def __repr__(self):
        rendered_options = ", ".join(repr(option) for option in self.options)

        template = "CoapMessageProxy(version={}, type={}, code={}, message_id={}, token={}, options=[{}], payload={}, uri-path='{}')"

        return template.format(
            self.version, self.type, self.code, self.message_id, hexlify(self.token),
            rendered_options, self.payload, self.uri_path)
class CoapMessageIdToUriPathBinder(object):
    """ Class binds message id and token with URI path.

    Bindings are kept in a two-level mapping: message id -> hex-encoded
    token -> URI path.
    """

    def __init__(self):
        # Plain dicts on purpose: a failed lookup must not create entries.
        self._uri_path_binds = {}

    def add_uri_path_for(self, message_id, token, uri_path):
        """ Remember ``uri_path`` for the (message_id, token) pair, replacing any previous binding. """
        self._uri_path_binds.setdefault(message_id, {})[hexlify(token)] = uri_path

    def get_uri_path_for(self, message_id, token):
        """ Return the URI path bound to the (message_id, token) pair.

        Raises:
            RuntimeError: No URI path has been bound to this pair.
        """
        try:
            return self._uri_path_binds[message_id][hexlify(token)]

        except KeyError:
            raise RuntimeError("Could not find URI PATH for message_id: {} and token: {}".format(
                message_id, hexlify(token)))
class CoapMessageFactory(object):
    """ Factory that produces CoAP messages. """

    def __init__(self, options_factory, uri_path_based_payload_factories, message_id_to_uri_path_binder):
        self._options_factory = options_factory
        self._uri_path_based_payload_factories = uri_path_based_payload_factories
        self._mid_to_uri_path_binder = message_id_to_uri_path_binder

    def _uri_path_from(self, options):
        """ Join all URI_PATH option values into "/seg1/seg2/...", or return None when there are none. """
        segments = [
            option.value.decode("utf-8")
            for option in options
            if option.type == CoapOptionsTypes.URI_PATH
        ]

        if segments:
            return "/" + "/".join(segments)

        return None

    def _parse_initial_byte(self, data, message_info):
        """ Split the first header byte into (version, message type, token length). """
        first = ord(data.read(1))

        token_length = first & 0xf
        _type = CoapMessageType((first >> 4) & 0x3)
        version = (first >> 6) & 0x3

        return version, _type, token_length

    def parse(self, data, message_info):
        """ Parse one CoAP message from the binary stream ``data``.

        When the message carries URI_PATH options, the resulting path is
        registered in the binder under the message id and token.

        Returns:
            CoapMessageProxy wrapping the parsed message; the payload is kept
            raw and is parsed only when requested from the proxy.
        """
        version, _type, token_length = self._parse_initial_byte(data, message_info)

        code = CoapCode(ord(data.read(1)))

        # Message id: 16-bit big-endian unsigned integer.
        (message_id,) = struct.unpack(">H", data.read(2))

        token = data.read(token_length)

        options = self._options_factory.parse(data, message_info)

        uri_path = self._uri_path_from(options)
        if uri_path is not None:
            self._mid_to_uri_path_binder.add_uri_path_for(message_id, token, uri_path)

        # Everything left in the stream is the payload.
        raw_payload = data.read()

        coap_message = CoapMessage(version, _type, code, message_id, token, options, raw_payload, uri_path)

        return CoapMessageProxy(
            coap_message,
            message_info,
            self._mid_to_uri_path_binder,
            self._uri_path_based_payload_factories)