blob: a666e671af2cc84210b5929e32990039f4ab94a6 [file] [log] [blame]
#!/usr/bin/env python
#
# Copyright (c) 2016, The OpenThread Authors.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# 3. Neither the name of the copyright holder nor the
# names of its contributors may be used to endorse or promote products
# derived from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
"""
This module provides a simple IEEE 802.15.4 MAC frame parser.
"""
import io
import struct
import config
from common import MacAddress, MacAddressType, MessageInfo
from net_crypto import AuxiliarySecurityHeader, CryptoEngine, MacCryptoMaterialCreator
class DeviceDescriptors:
    """Class-level registry of 802.15.4 device descriptors.

    Maps a short address value to the extended address registered for it.
    The mapping lives on the class itself, so it is shared by every caller.
    """

    # Normalised short address value -> extended address, as stored by add().
    device_descriptors = {}

    @staticmethod
    def _get_short_address_value(short_address):
        """Return the dictionary key to use for *short_address*.

        A MacAddress instance is reduced to its ``rloc`` attribute; any other
        value is used as the key unchanged.
        """
        return short_address.rloc if isinstance(short_address, MacAddress) else short_address

    @classmethod
    def add(cls, short_address, extended_address):
        """Register *extended_address* under *short_address*.

        An existing entry for the same short address is overwritten.
        """
        key = cls._get_short_address_value(short_address)
        cls.device_descriptors[key] = extended_address

    @classmethod
    def get_extended(cls, short_address):
        """Return the extended address registered for *short_address*.

        Raises:
            KeyError: if nothing was registered for that short address.
        """
        key = cls._get_short_address_value(short_address)
        return cls.device_descriptors[key]
class MacHeader:
    """Class representing 802.15.4 MAC header.

    A plain value holder: every constructor argument is stored unchanged in
    the attribute of the same name.
    """

    class FrameType:
        """Values of the frame type field of the frame control word."""
        BEACON = 0
        DATA = 1
        ACK = 2
        COMMAND = 3

    class AddressMode:
        """Values of the addressing mode fields of the frame control word."""
        NOT_PRESENT = 0
        SHORT = 2
        EXTENDED = 3

    def __init__(self, frame_type, frame_pending, ack_request, frame_version, seq,
                 dest_pan_id=None, dest_address=None, src_pan_id=None, src_address=None, command_type=None,
                 aux_sec_header=None, mic=None,
                 fcs=None):
        """Store the parsed header fields.

        Args:
            frame_type: frame type value (see FrameType).
            frame_pending: frame pending flag.
            ack_request: acknowledgement request flag.
            frame_version: frame version value.
            seq: sequence number.
            dest_pan_id: destination PAN id, or None.
            dest_address: destination address, or None.
            src_pan_id: source PAN id, or None.
            src_address: source address, or None.
            command_type: command identifier for command frames, or None.
            aux_sec_header: auxiliary security header, or None.
            mic: message integrity code, or None.
            fcs: frame check sequence, or None.
        """
        self.frame_type = frame_type
        self.frame_pending = frame_pending
        self.ack_request = ack_request
        self.frame_version = frame_version
        self.seq = seq

        self.dest_pan_id = dest_pan_id
        self.dest_address = dest_address
        self.src_pan_id = src_pan_id
        self.src_address = src_address

        # Previously accepted but silently discarded; keep it so that users
        # of a parsed command frame can tell which command it carries.
        self.command_type = command_type

        self.aux_sec_header = aux_sec_header
        self.mic = mic

        self.fcs = fcs
class MacPayload:
    """Class representing 802.15.4 MAC payload.

    The payload is kept in the ``data`` attribute as a ``bytearray`` built
    from the constructor argument.
    """

    def __init__(self, data):
        """Copy *data* into a new bytearray stored as ``self.data``."""
        payload_bytes = bytearray(data)
        self.data = payload_bytes
class MacFrame:
    """Class representing 802.15.4 MAC frame.

    After parse() returns, ``header`` holds a MacHeader and ``payload`` holds
    a MacPayload (or None for acknowledgement frames).
    """

    # Length in bytes of the key identifier field for each key identifier
    # mode. Mode 0 carries no key identifier field (IEEE 802.15.4).
    _KEY_ID_LENGTHS = {0: 0, 1: 1, 2: 5, 3: 9}

    # Length in bytes of the MIC for each security level that carries one.
    # Levels 0 and 4 are absent on purpose: they have no MIC.
    _MIC_LENGTHS = {1: 4, 5: 4, 2: 8, 6: 8, 3: 16, 7: 16}

    def parse(self, data):
        """Parse one MAC frame from *data* and populate header and payload.

        Args:
            data: seekable binary stream (read/seek/tell are used) positioned
                at the start of the frame. The last two bytes of the stream
                are taken to be the FCS.

        Secured frames are decrypted with a key derived from
        config.DEFAULT_MASTER_KEY. When the source address is a short one,
        the matching extended address is looked up in DeviceDescriptors.
        """
        mhr_start = data.tell()

        # Frame control (16 bits, little endian) followed by the sequence number.
        fc, seq = struct.unpack("<HB", data.read(3))

        frame_type = fc & 0x0007
        security_enabled = bool(fc & 0x0008)
        frame_pending = bool(fc & 0x0010)
        ack_request = bool(fc & 0x0020)
        panid_compression = bool(fc & 0x0040)
        dest_addr_mode = (fc & 0x0c00) >> 10
        frame_version = (fc & 0x3000) >> 12
        source_addr_mode = (fc & 0xc000) >> 14

        if frame_type == MacHeader.FrameType.ACK:
            # Acknowledgements are parsed as frame control, sequence number
            # and FCS only: no addressing fields and no payload.
            fcs = self._parse_fcs(data, data.tell())
            self.header = MacHeader(frame_type, frame_pending, ack_request, frame_version, seq, fcs=fcs)
            self.payload = None
            return

        # Presence of PAN Ids is not fully implemented yet but should be enough for Thread.
        dest_pan_id = struct.unpack("<H", data.read(2))[0]

        dest_address = self._parse_address(data, dest_addr_mode)

        if not panid_compression:
            src_pan_id = struct.unpack("<H", data.read(2))[0]
        else:
            # With PAN id compression the source PAN id is not transmitted.
            src_pan_id = dest_pan_id

        src_address = self._parse_address(data, source_addr_mode)

        mhr_end = data.tell()

        if security_enabled:
            aux_sec_header = self._parse_aux_sec_header(data)
            aux_sec_header_end = data.tell()
        else:
            aux_sec_header = None

        # Command frames carry a one-byte command identifier before the payload.
        if frame_type == MacHeader.FrameType.COMMAND:
            command_type = ord(data.read(1))
        else:
            command_type = None

        payload_pos = data.tell()

        # Locate the frame trailer: the FCS is the last two bytes and the
        # MIC, when present, sits immediately before it.
        data.seek(-2, io.SEEK_END)
        fcs_start = data.tell()

        if aux_sec_header and aux_sec_header.security_level:
            mic, payload_end = self._parse_mic(data, aux_sec_header.security_level)
        else:
            payload_end = data.tell()
            mic = None

        fcs = self._parse_fcs(data, fcs_start)

        # Create Header object
        self.header = MacHeader(frame_type, frame_pending, ack_request, frame_version, seq,
                                dest_pan_id, dest_address, src_pan_id, src_address, command_type,
                                aux_sec_header, mic,
                                fcs)

        # Create Payload object
        payload_len = payload_end - payload_pos
        data.seek(payload_pos)
        payload = data.read(payload_len)

        if security_enabled:
            # Re-read the raw header bytes; they are handed to the crypto
            # engine together with the payload and the MIC.
            mhr_len = mhr_end - mhr_start
            data.seek(mhr_start)
            mhr_bytes = data.read(mhr_len)

            # The auxiliary security header directly follows the MHR, so the
            # stream is already positioned on it.
            aux_sec_header_len = aux_sec_header_end - mhr_end
            aux_sec_hdr_bytes = data.read(aux_sec_header_len)

            non_payload_fields = bytearray([])
            if command_type is not None:
                non_payload_fields.append(command_type)

            message_info = MessageInfo()
            message_info.aux_sec_hdr = aux_sec_header
            message_info.aux_sec_hdr_bytes = aux_sec_hdr_bytes
            message_info.nonpayload_fields = non_payload_fields
            message_info.mhr_bytes = mhr_bytes

            if src_address.type == MacAddressType.SHORT:
                # Raises KeyError when the short address was never registered.
                message_info.source_mac_address = DeviceDescriptors.get_extended(src_address).mac_address
            else:
                message_info.source_mac_address = src_address.mac_address

            sec_obj = CryptoEngine(MacCryptoMaterialCreator(config.DEFAULT_MASTER_KEY))
            self.payload = MacPayload(sec_obj.decrypt(payload, mic, message_info))
        else:
            self.payload = MacPayload(payload)

    def _parse_address(self, data, mode):
        """Read an address of the given addressing *mode* from *data*.

        Returns a MacAddress built from 2 bytes (short) or 8 bytes (extended),
        or None for any other mode, in which case nothing is read.
        """
        if mode == MacHeader.AddressMode.SHORT:
            return MacAddress(data.read(2), MacAddressType.SHORT, big_endian=False)

        if mode == MacHeader.AddressMode.EXTENDED:
            return MacAddress(data.read(8), MacAddressType.LONG, big_endian=False)

        return None

    def _parse_aux_sec_header(self, data):
        """Read the auxiliary security header from *data*.

        Reads the security control byte and the 32-bit little-endian frame
        counter, then the key identifier field whose length depends on the
        key identifier mode.

        Returns:
            AuxiliarySecurityHeader built from the parsed fields.
        """
        security_control, frame_counter = struct.unpack("<BL", data.read(5))

        security_level = security_control & 0x07
        key_id_mode = (security_control & 0x18) >> 3

        # key_id_mode is a two-bit value, so the table covers every case.
        # Mode 0 reads zero bytes and yields an empty key id; before this,
        # key_id was left unbound for mode 0 and the return below raised
        # UnboundLocalError.
        # NOTE(review): confirm AuxiliarySecurityHeader accepts an empty key id.
        key_id = data.read(self._KEY_ID_LENGTHS[key_id_mode])

        return AuxiliarySecurityHeader(key_id_mode, security_level, frame_counter, key_id, big_endian=False)

    def _parse_mic(self, data, security_level):
        """Read the MIC that ends at the current position of *data*.

        Args:
            data: seekable binary stream positioned just past the MIC
                (parse() positions it at the start of the FCS).
            security_level: security level taken from the auxiliary
                security header.

        Returns:
            Tuple ``(mic, payload_end)``. ``mic`` holds the MIC bytes, or None
            when the security level carries no MIC. ``payload_end`` is the
            stream offset at which the payload ends.
        """
        mic_len = self._MIC_LENGTHS.get(security_level)

        if mic_len is None:
            # No MIC for this level (e.g. level 4). Return None instead of
            # failing on an unbound local, and leave the stream untouched.
            return None, data.tell()

        data.seek(-mic_len, io.SEEK_CUR)
        payload_end = data.tell()
        mic = data.read(mic_len)

        return mic, payload_end

    def _parse_fcs(self, data, fcs_start):
        """Return the two FCS bytes found at offset *fcs_start* as a bytearray."""
        data.seek(fcs_start)
        return bytearray(data.read(2))