blob: e9b80cde6d4db616671b73d3167570adac5512e2 [file] [log] [blame]
#!/usr/bin/env python
#
# Copyright (c) 2016, The OpenThread Authors.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# 3. Neither the name of the copyright holder nor the
# names of its contributors may be used to endorse or promote products
# derived from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
import io
import ipaddress
import struct
import sys
import coap
import common
import ipv6
import lowpan
import mac802154
import mle
from enum import IntEnum
class MessageType(IntEnum):
    """Classification assigned to a Message.

    The first three members are set from the decoded upper-layer content
    (MLE / CoAP / ICMPv6); the last three are set from the MAC frame type.
    """

    # Upper-layer classifications.
    MLE = 0
    COAP = 1
    ICMP = 2

    # MAC frame-type classifications.
    ACK = 3
    BEACON = 4
    DATA = 5
class Message(object):
    """One message: its channel, MAC header, IPv6 packet and decoded payload.

    Assigning ``mac_header`` or ``ipv6_packet`` also updates ``type`` based on
    the MAC frame type or on the upper-layer protocol found in the packet.
    The ``assert*`` methods are test helpers that raise ``AssertionError``
    when the expectation does not hold.
    """

    def __init__(self):
        self._type = None
        self._channel = None
        self._mac_header = None
        self._ipv6_packet = None
        self._coap = None
        self._mle = None
        self._icmp = None

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _extract_udp_datagram(self, udp_datagram):
        """Classify a UDP datagram as MLE or CoAP from its payload type.

        Payloads of any other type leave the message unchanged.
        """
        payload = udp_datagram.payload

        if isinstance(payload, mle.MleMessage):
            self._type = MessageType.MLE
            self._mle = payload

        elif isinstance(payload, (coap.CoapMessage, coap.CoapMessageProxy)):
            self._type = MessageType.COAP
            self._coap = payload

    def _extract_upper_layer_protocol(self, upper_layer_protocol):
        """Classify the IPv6 upper-layer protocol (ICMPv6 or UDP)."""
        if isinstance(upper_layer_protocol, ipv6.ICMPv6):
            self._type = MessageType.ICMP
            self._icmp = upper_layer_protocol

        elif isinstance(upper_layer_protocol, ipv6.UDPDatagram):
            self._extract_udp_datagram(upper_layer_protocol)

    def _require_type(self, expected_type, label):
        """Raise ValueError unless this message is of ``expected_type``."""
        if self.type != expected_type:
            raise ValueError("Invalid message type. Expected {} message.".format(label))

    def _mle_tlvs(self):
        """Return the TLVs of the MLE command; ValueError if not an MLE message."""
        self._require_type(MessageType.MLE, "MLE")
        return self.mle.command.tlvs

    def _coap_tlvs(self):
        """Return the CoAP payload items; ValueError if not a CoAP message."""
        self._require_type(MessageType.COAP, "CoAP")
        return self.coap.payload

    @staticmethod
    def _find_tlv(tlvs, tlv_class_type):
        """Return the first item that is an instance of ``tlv_class_type``, else None."""
        for tlv in tlvs:
            if isinstance(tlv, tlv_class_type):
                return tlv
        return None

    @staticmethod
    def _contains_tlv(tlvs, tlv_class_type):
        """Return True when any item is an instance of ``tlv_class_type``."""
        return any(isinstance(tlv, tlv_class_type) for tlv in tlvs)

    # ------------------------------------------------------------------
    # Properties
    # ------------------------------------------------------------------

    @property
    def type(self):
        return self._type

    @type.setter
    def type(self, value):
        self._type = value

    @property
    def channel(self):
        return self._channel

    @channel.setter
    def channel(self, value):
        self._channel = value

    @property
    def mac_header(self):
        return self._mac_header

    @mac_header.setter
    def mac_header(self, value):
        """Store the MAC header and derive ``type`` from its frame type.

        Frame types other than BEACON, ACK and DATA leave ``type`` unchanged.
        """
        self._mac_header = value

        frame_type = self._mac_header.frame_type
        frame_types = mac802154.MacHeader.FrameType

        if frame_type == frame_types.BEACON:
            self._type = MessageType.BEACON
        elif frame_type == frame_types.ACK:
            self._type = MessageType.ACK
        elif frame_type == frame_types.DATA:
            self._type = MessageType.DATA

    @property
    def ipv6_packet(self):
        return self._ipv6_packet

    @ipv6_packet.setter
    def ipv6_packet(self, value):
        """Store the IPv6 packet and classify its upper-layer protocol."""
        self._ipv6_packet = value
        self._extract_upper_layer_protocol(value.upper_layer_protocol)

    @property
    def coap(self):
        return self._coap

    @property
    def mle(self):
        return self._mle

    @property
    def icmp(self):
        return self._icmp

    @icmp.setter
    def icmp(self, value):
        self._icmp = value

    # ------------------------------------------------------------------
    # MLE helpers
    # ------------------------------------------------------------------

    def get_mle_message_tlv(self, tlv_class_type):
        """Return the first MLE TLV of ``tlv_class_type``, or None if absent.

        Raises:
            ValueError: the message is not an MLE message.
        """
        return self._find_tlv(self._mle_tlvs(), tlv_class_type)

    def assertMleMessageIsType(self, command_type):
        """Assert that the MLE command type equals ``command_type``.

        Raises:
            ValueError: the message is not an MLE message.
        """
        self._require_type(MessageType.MLE, "MLE")
        actual = self.mle.command.type
        assert actual == command_type, \
            "Expected MLE command type {}, got {}".format(command_type, actual)

    def assertMleMessageContainsTlv(self, tlv_class_type):
        """Assert that the MLE command carries a TLV of ``tlv_class_type``.

        Raises:
            ValueError: the message is not an MLE message.
        """
        assert self._contains_tlv(self._mle_tlvs(), tlv_class_type), \
            "MleMessage doesn't contain TLV: {}".format(tlv_class_type)

    def assertMleMessageDoesNotContainTlv(self, tlv_class_type):
        """Assert that the MLE command carries no TLV of ``tlv_class_type``.

        Raises:
            ValueError: the message is not an MLE message.
        """
        assert not self._contains_tlv(self._mle_tlvs(), tlv_class_type), \
            "MleMessage unexpectedly contains TLV: {}".format(tlv_class_type)

    def assertMleMessageContainsOptionalTlv(self, tlv_class_type):
        """Report (never fail) when an optional MLE TLV is absent.

        Raises:
            ValueError: the message is not an MLE message.
        """
        # Only report when the TLV is really missing; previously the notice
        # was printed even when the TLV had been found.
        if not self._contains_tlv(self._mle_tlvs(), tlv_class_type):
            print("MleMessage doesn't contain optional TLV: {}".format(tlv_class_type))

    # ------------------------------------------------------------------
    # CoAP helpers
    # ------------------------------------------------------------------

    def get_coap_message_tlv(self, tlv_class_type):
        """Return the first CoAP payload TLV of ``tlv_class_type``, or None.

        Raises:
            ValueError: the message is not a CoAP message.
        """
        return self._find_tlv(self._coap_tlvs(), tlv_class_type)

    def assertCoapMessageContainsTlv(self, tlv_class_type):
        """Assert that the CoAP payload carries a TLV of ``tlv_class_type``.

        Raises:
            ValueError: the message is not a CoAP message.
        """
        assert self._contains_tlv(self._coap_tlvs(), tlv_class_type), \
            "CoapMessage doesn't contain TLV: {}".format(tlv_class_type)

    def assertCoapMessageContainsOptionalTlv(self, tlv_class_type):
        """Report (never fail) when an optional CoAP TLV is absent.

        Raises:
            ValueError: the message is not a CoAP message.
        """
        # Only report when the TLV is really missing; previously the notice
        # was printed even when the TLV had been found.
        if not self._contains_tlv(self._coap_tlvs(), tlv_class_type):
            print("CoapMessage doesn't contain optional TLV: {}".format(tlv_class_type))

    def assertCoapMessageRequestUriPath(self, uri_path):
        """Assert that the CoAP URI path equals ``uri_path``.

        Raises:
            ValueError: the message is not a CoAP message.
        """
        self._require_type(MessageType.COAP, "CoAP")
        actual = self.coap.uri_path
        assert uri_path == actual, \
            "Expected CoAP URI path {}, got {}".format(uri_path, actual)

    def assertCoapMessageCode(self, code):
        """Assert that the CoAP code equals ``code``.

        Raises:
            ValueError: the message is not a CoAP message.
        """
        self._require_type(MessageType.COAP, "CoAP")
        actual = self.coap.code
        assert code == actual, \
            "Expected CoAP code {}, got {}".format(code, actual)

    # ------------------------------------------------------------------
    # IPv6 helpers
    # ------------------------------------------------------------------

    def assertSentToNode(self, node):
        """Assert that the IPv6 destination is one of ``node.get_addrs()``."""
        dst_addr = self.ipv6_packet.ipv6_header.destination_address

        sent_to_node = any(dst_addr == ipaddress.ip_address(addr)
                           for addr in node.get_addrs())

        assert sent_to_node, \
            "Message destination {} is not an address of the node".format(dst_addr)

    def assertSentToDestinationAddress(self, ipv6_address):
        """Assert that the IPv6 destination equals ``ipv6_address``."""
        if sys.version_info[0] == 2:
            # ipaddress needs a unicode string on Python 2.
            ipv6_address = ipv6_address.decode("utf-8")

        expected = ipaddress.ip_address(ipv6_address)
        actual = self.ipv6_packet.ipv6_header.destination_address
        assert actual == expected, \
            "Expected destination address {}, got {}".format(expected, actual)

    def assertSentWithHopLimit(self, hop_limit):
        """Assert that the IPv6 hop limit equals ``hop_limit``."""
        actual = self.ipv6_packet.ipv6_header.hop_limit
        assert actual == hop_limit, \
            "Expected hop limit {}, got {}".format(hop_limit, actual)

    def __repr__(self):
        try:
            type_name = MessageType(self.type).name
        except ValueError:
            # Type not determined yet (None) or not a MessageType value.
            type_name = repr(self.type)

        return "Message(type={})".format(type_name)
class MessagesSet(object):
    """A queue-like wrapper around a list of messages.

    The ``next_*`` methods consume the underlying list: every message
    inspected, including the one returned, is removed from the front.
    The ``contains_*`` / ``does_not_contain_*`` methods only inspect it.
    """

    def __init__(self, messages):
        self._messages = messages

    @property
    def messages(self):
        return self._messages

    def next_coap_message(self, code, uri_path=None, assert_enabled=True):
        """Pop messages until a CoAP message matching ``code`` is found.

        When ``uri_path`` is given, the message's URI path must match too.
        Returns the matching message, or None when the list runs out (only
        possible with ``assert_enabled=False``; otherwise the assertion fails).
        """
        found = None

        while self.messages:
            candidate = self.messages.pop(0)

            if candidate.type != MessageType.COAP:
                continue

            # URI path is only checked when the caller asked for one.
            wrong_path = uri_path is not None and candidate.coap.uri_path != uri_path
            if wrong_path or not candidate.coap.code.is_equal_dotted(code):
                continue

            found = candidate
            break

        if assert_enabled:
            assert found is not None, "Could not find CoapMessage with code: {}".format(code)

        return found

    def next_mle_message(self, command_type, assert_enabled=True):
        """Pop messages until an MLE message of ``command_type`` is found.

        Returns the matching message, or None when the list runs out (only
        possible with ``assert_enabled=False``; otherwise the assertion fails).
        """
        found = self.next_mle_message_of_one_of_command_types(command_type)

        if assert_enabled:
            assert found is not None, "Could not find MleMessage of the type: {}".format(command_type)

        return found

    def next_mle_message_of_one_of_command_types(self, *command_types):
        """Pop messages until an MLE message with one of ``command_types``.

        Returns the matching message, or None when the list runs out.
        """
        while self.messages:
            candidate = self.messages.pop(0)

            if candidate.type != MessageType.MLE:
                continue

            if any(candidate.mle.command.type == wanted for wanted in command_types):
                return candidate

        return None

    def contains_mle_message(self, command_type):
        """Return True when some remaining MLE message has ``command_type``."""
        for candidate in self.messages:
            if candidate.type == MessageType.MLE and candidate.mle.command.type == command_type:
                return True

        return False

    def does_not_contain_coap_message(self):
        """Return True when no remaining message is a CoAP message."""
        return all(candidate.type != MessageType.COAP for candidate in self.messages)
class MessageFactory:
    """Builds Message objects from a binary stream.

    The stream is read as: one channel byte, then a MAC frame. For DATA
    frames the MAC payload is handed to the 6LoWPAN parser supplied at
    construction time.
    """

    def __init__(self, lowpan_parser):
        self._lowpan_parser = lowpan_parser

    def _add_device_descriptors(self, message):
        """Register device descriptors from the TLVs of an MLE message.

        SourceAddress TLVs are paired with the MAC source address and
        Address16 TLVs with the MAC destination address.
        """
        for entry in message.mle.command.tlvs:

            if isinstance(entry, mle.SourceAddress):
                mac802154.DeviceDescriptors.add(entry.address, message.mac_header.src_address)

            if isinstance(entry, mle.Address16):
                mac802154.DeviceDescriptors.add(entry.address, message.mac_header.dest_address)

    def _parse_mac_frame(self, data):
        """Parse and return a MacFrame read from ``data``."""
        frame = mac802154.MacFrame()
        frame.parse(data)
        return frame

    def create(self, data):
        """Read one message from ``data`` and return it as a Message.

        Non-DATA frames, and DATA frames for which the 6LoWPAN parser
        returns None, are returned with only channel and MAC header set.
        """
        result = Message()

        # NOTE(review): struct.unpack returns a tuple, so channel is stored
        # as a 1-tuple rather than an int - confirm what callers expect.
        channel_octet = data.read(1)
        result.channel = struct.unpack(">B", channel_octet)

        # Parse MAC header
        frame = self._parse_mac_frame(data)
        result.mac_header = frame.header

        if result.mac_header.frame_type != mac802154.MacHeader.FrameType.DATA:
            return result

        addressing = common.MessageInfo()
        addressing.source_mac_address = result.mac_header.src_address
        addressing.destination_mac_address = result.mac_header.dest_address

        # Create stream with 6LoWPAN datagram
        datagram_stream = io.BytesIO(frame.payload.data)

        packet = self._lowpan_parser.parse(datagram_stream, addressing)
        if packet is None:
            return result

        # Assigning the packet also classifies the message type.
        result.ipv6_packet = packet

        if result.type == MessageType.MLE:
            self._add_device_descriptors(result)

        return result