blob: fb4047baccea413980b3c102fd80f43fe13f31ea [file] [log] [blame]
#!/usr/bin/env python
#
# Copyright (c) 2016, The OpenThread Authors.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# 3. Neither the name of the copyright holder nor the
# names of its contributors may be used to endorse or promote products
# derived from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
import abc
import io
import struct
import sys
from binascii import hexlify
from ipaddress import ip_address
try:
from itertools import izip_longest as zip_longest
except ImportError:
from itertools import zip_longest
# Values of the "next header" field used by this module
IPV6_NEXT_HEADER_HOP_BY_HOP = 0
IPV6_NEXT_HEADER_TCP = 6
IPV6_NEXT_HEADER_UDP = 17
IPV6_NEXT_HEADER_ICMP = 58

# Next header values that identify an upper layer protocol.
# Any other value is treated as an extension header by the packet factory.
UPPER_LAYER_PROTOCOLS = [IPV6_NEXT_HEADER_TCP, IPV6_NEXT_HEADER_UDP, IPV6_NEXT_HEADER_ICMP]

# ICMPv6 message types
ICMP_DESTINATION_UNREACHABLE = 1
ICMP_ECHO_REQUEST = 128
ICMP_ECHO_RESPONSE = 129

# Default hop limit for IPv6
HOP_LIMIT_DEFAULT = 64
def calculate_checksum(data):
    """ Compute the 16-bit one's complement checksum of the given bytes.

    How to calculate checksum (RFC 2460):
    https://tools.ietf.org/html/rfc2460#page-27

    Args:
        data (bytes): input data from which checksum will be calculated.
            Items must be integers (e.g. a bytearray). Odd-length input is
            padded with a trailing zero byte.

    Returns:
        int: calculated checksum. A result of zero is reported as 0xFFFF.

    """
    running_sum = 0

    # Pair bytes at even and odd offsets into big-endian halfwords.
    # Example: data[0] = 0x01, data[1] = 0xb2 => 0x01b2
    for high_byte, low_byte in zip_longest(data[::2], data[1::2], fillvalue=0x00):
        running_sum += (high_byte << 8) | low_byte

        # End-around carry: fold anything above 16 bits back into the low bits.
        running_sum = (running_sum >> 16) + (running_sum & 0xFFFF)

    complemented = running_sum ^ 0xFFFF

    return complemented if complemented != 0 else 0xFFFF
class PacketFactory(object):
    """ Common interface of all factories that build objects out of raw data. """

    def parse(self, data, message_info):
        """ Build an object from data. Must be overridden by concrete factories.

        Args:
            data (BytesIO): stream the object is read from
            message_info (MessageInfo): context shared between factories while parsing

        Raises:
            NotImplementedError: always, in this base implementation.

        """
        raise NotImplementedError
class BuildableFromBytes(object):
    """ Common interface of all classes that offer construction from bytes. """

    @classmethod
    def from_bytes(cls, data):
        """ Build an instance from data. Must be overridden by concrete classes.

        Args:
            data (bytes)

        Raises:
            NotImplementedError: always, in this base implementation.

        """
        raise NotImplementedError
class ConvertibleToBytes(object):
    """ Common interface of all classes that can be serialized to bytes. """

    def to_bytes(self):
        """ Serialize the object. Must be overridden by concrete classes.

        Returns:
            bytes

        Raises:
            NotImplementedError: always, in this base implementation.

        """
        raise NotImplementedError

    def __len__(self):
        """ Size of the serialized object (in bytes). Must be overridden by concrete classes.

        Returns:
            int

        Raises:
            NotImplementedError: always, in this base implementation.

        """
        raise NotImplementedError
class Header(object):
    """ Common interface of all header classes. """

    # Python 2 style metaclass declaration.
    __metaclass__ = abc.ABCMeta

    @abc.abstractproperty
    def type(self):
        """ Value identifying this header in the next header field of the IPv6 header
        or of the header that precedes it.

        Returns:
            int

        """
class ExtensionHeader(object):
    """ Base class of the IPv6 Extension Headers.

    It only stores the two fields every extension header starts with.
    """

    def __init__(self, next_header, hdr_ext_len=0):
        """
        Args:
            next_header: type of the header which follows this one
            hdr_ext_len: value of the header extension length field
        """
        self.next_header, self.hdr_ext_len = next_header, hdr_ext_len
class UpperLayerProtocol(Header, ConvertibleToBytes):
    """ Base class of the upper layer protocol payloads carried by IPv6 packets.

    The checksum is not stored here: it is read from and written to the wrapped protocol header.
    """

    def __init__(self, header):
        """
        Args:
            header: upper layer protocol header; it must expose a `checksum` attribute
        """
        self.header = header

    def _get_checksum(self):
        """ Return checksum from upper layer protocol header. """
        return self.header.checksum

    def _set_checksum(self, value):
        """ Set checksum value in upper layer protocol header. """
        self.header.checksum = value

    checksum = property(_get_checksum, _set_checksum)

    def is_valid_checksum(self):
        """ Tell whether a checksum has been set.

        Checksum calculation never produces zero, so a zero checksum
        means that no valid checksum is present.

        Returns:
            bool

        """
        return 0 != self.checksum
class IPv6PseudoHeader(ConvertibleToBytes):
    """ Class representing IPv6 pseudo header which is required to calculate
    upper layer protocol (like e.g. UDP or ICMPv6) checksum.

    This class is used only during upper layer protocol checksum calculation. Do not use it outside of this module.

    """

    def __init__(self, source_address, destination_address, payload_length, next_header):
        """
        Args:
            source_address: source IPv6 address; anything accepted by `ipaddress.ip_address`, or a bytearray
            destination_address: destination IPv6 address; same accepted forms as source_address
            payload_length (int): length of the upper layer protocol data (in bytes)
            next_header (int): type of the upper layer protocol
        """
        self._source_address = self._convert_to_ipaddress(source_address)
        self._destination_address = self._convert_to_ipaddress(destination_address)
        self.payload_length = payload_length
        self.next_header = next_header

    def _convert_to_ipaddress(self, value):
        """ Convert value to an `ipaddress` address object. """
        if isinstance(value, bytearray):
            # ip_address() does not accept bytearray, only immutable bytes.
            value = bytes(value)

        elif isinstance(value, str) and sys.version_info[0] == 2:
            # On Python 2 a str is a byte string; ip_address() needs unicode text there.
            value = value.decode("utf-8")

        return ip_address(value)

    @property
    def source_address(self):
        return self._source_address

    @source_address.setter
    def source_address(self, value):
        self._source_address = self._convert_to_ipaddress(value)

    @property
    def destination_address(self):
        return self._destination_address

    @destination_address.setter
    def destination_address(self, value):
        # Store into the destination field (this setter used to overwrite the source address).
        self._destination_address = self._convert_to_ipaddress(value)

    def to_bytes(self):
        """ Serialize the pseudo header.

        Layout: source address, destination address, 32-bit payload length
        and the next header value as a 32-bit big-endian integer.

        Returns:
            bytearray

        """
        data = bytearray()
        data += self.source_address.packed
        data += self.destination_address.packed
        data += struct.pack(">I", self.payload_length)
        data += struct.pack(">I", self.next_header)

        return data
class IPv6Header(ConvertibleToBytes, BuildableFromBytes):
    """ Class representing IPv6 packet header (the fixed 40-byte part of an IPv6 packet). """

    _version = 6

    _header_length = 40

    def __init__(self, source_address, destination_address, traffic_class=0, flow_label=0, hop_limit=64,
                 payload_length=0, next_header=0):
        """
        Args:
            source_address: source IPv6 address; anything accepted by `ipaddress.ip_address`, or a bytearray
            destination_address: destination IPv6 address; same accepted forms as source_address
            traffic_class (int): 8-bit traffic class
            flow_label (int): 20-bit flow label
            hop_limit (int): hop limit
            payload_length (int): length of everything which follows this header (in bytes)
            next_header (int): type of the header which follows this one
        """
        self.version = self._version
        self._source_address = self._convert_to_ipaddress(source_address)
        self._destination_address = self._convert_to_ipaddress(destination_address)
        self.traffic_class = traffic_class
        self.flow_label = flow_label
        self.hop_limit = hop_limit
        self.payload_length = payload_length
        self.next_header = next_header

    def _convert_to_ipaddress(self, value):
        """ Convert value to an `ipaddress` address object. """
        if isinstance(value, bytearray):
            # ip_address() does not accept bytearray, only immutable bytes.
            value = bytes(value)

        elif isinstance(value, str) and sys.version_info[0] == 2:
            # On Python 2 a str is a byte string; ip_address() needs unicode text there.
            value = value.decode("utf-8")

        return ip_address(value)

    @property
    def source_address(self):
        return self._source_address

    @source_address.setter
    def source_address(self, value):
        self._source_address = self._convert_to_ipaddress(value)

    @property
    def destination_address(self):
        return self._destination_address

    @destination_address.setter
    def destination_address(self, value):
        # Mirrors the source_address setter so both addresses can be replaced after construction.
        self._destination_address = self._convert_to_ipaddress(value)

    def to_bytes(self):
        """ Serialize the header.

        Returns:
            bytearray: 40 bytes of the IPv6 header

        """
        # First 32 bits: version (4 bits) | traffic class (8 bits) | flow label (20 bits)
        data = bytearray([
            ((self.version & 0x0F) << 4) | ((self.traffic_class >> 4) & 0x0F),
            ((self.traffic_class & 0x0F) << 4) | ((self.flow_label >> 16) & 0x0F),
            ((self.flow_label >> 8) & 0xFF),
            ((self.flow_label & 0xFF))
        ])
        data += struct.pack(">H", self.payload_length)
        data += bytearray([self.next_header, self.hop_limit])
        data += self.source_address.packed
        data += self.destination_address.packed

        return data

    @classmethod
    def from_bytes(cls, data):
        """ Read an IPv6 header from the stream.

        The version bits are read but neither checked nor stored:
        the created header always reports version 6.

        Args:
            data (BytesIO): stream positioned at the first byte of the header

        Returns:
            IPv6Header

        """
        b = bytearray(data.read(4))

        traffic_class = ((b[0] & 0x0F) << 4) | ((b[1] >> 4) & 0x0F)
        flow_label = ((b[1] & 0x0F) << 16) | (b[2] << 8) | b[3]

        payload_length = struct.unpack(">H", data.read(2))[0]
        next_header = ord(data.read(1))
        hop_limit = ord(data.read(1))
        src_addr = bytearray(data.read(16))
        dst_addr = bytearray(data.read(16))

        return cls(src_addr,
                   dst_addr,
                   traffic_class,
                   flow_label,
                   hop_limit,
                   payload_length,
                   next_header)

    def __repr__(self):
        # Adjacent literals are used instead of a backslash continuation inside the string,
        # which leaked the indentation of the source line into the representation.
        return ("IPv6Header(source_address={}, destination_address={}, next_header={}, payload_length={}, "
                "hop_limit={}, traffic_class={}, flow_label={})").format(self.source_address.compressed,
                                                                         self.destination_address.compressed,
                                                                         self.next_header,
                                                                         self.payload_length,
                                                                         self.hop_limit,
                                                                         self.traffic_class,
                                                                         self.flow_label)

    def __len__(self):
        return self._header_length
class IPv6Packet(ConvertibleToBytes):
    """ Class representing IPv6 packet.

    IPv6 packet consists of IPv6 header, optional extension header, and upper layer protocol.

                                            IPv6 packet

    +-------------+----------------------------------+----------------------------------------------+
    |             |                                  |                                              |
    | IPv6 header | extension headers (zero or more) | upper layer protocol (e.g. UDP, TCP, ICMPv6) |
    |             |                                  |                                              |
    +-------------+----------------------------------+----------------------------------------------+

    Extension headers:
        - HopByHop
        - Routing header (not implemented in this module)

    Upper layer protocols:
        - ICMPv6
        - UDP
        - TCP (not implemented in this module)

    Example:
        IPv6 packet construction without extension headers:

        ipv6_packet = IPv6Packet(IPv6Header("fd00:1234:4555::ff:fe00:1800", "ff03::1"),
                                 ICMPv6(ICMPv6Header(128, 0),
                                        ICMPv6EchoBody(0, 2, bytes([0x80, 0x00, 0xc7, 0xbf, 0x00, 0x00, 0x00, 0x01,
                                                                    0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41,
                                                                    0x41, 0x41]))))

        IPv6 packet construction with extension headers:

        ipv6_packet = IPv6Packet(IPv6Header("fd00:1234:4555::ff:fe00:1800", "ff03::1"),
                                 ICMPv6(ICMPv6Header(128, 0),
                                        ICMPv6EchoBody(0, 2, bytes([0x80, 0x00, 0xc7, 0xbf, 0x00, 0x00, 0x00, 0x01,
                                                                    0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41,
                                                                    0x41, 0x41]))),
                                 [HopByHop(options=[
                                     HopByHopOption(HopByHopOptionHeader(_type=0x6d),
                                                    MPLOption(S=1, M=0, V=0, sequence=2, seed_id=bytes([0x00, 0x18])))
                                 ])])

    """

    def __init__(self, ipv6_header, upper_layer_protocol, extension_headers=None):
        self.ipv6_header = ipv6_header
        self.upper_layer_protocol = upper_layer_protocol

        if extension_headers is None:
            extension_headers = []
        self.extension_headers = extension_headers

        self._update_next_header_values_in_headers()

        # A zero checksum means "not set yet": fill it in right away.
        if not upper_layer_protocol.is_valid_checksum():
            self.upper_layer_protocol.checksum = self.calculate_checksum()

    def _validate_checksum(self):
        """ Raise RuntimeError when the stored checksum differs from the calculated one. """
        expected = self.calculate_checksum()
        actual = self.upper_layer_protocol.checksum

        if actual != expected:
            raise RuntimeError("Could not create IPv6 packet. "
                               "Invalid checksum: {}!={}".format(actual, expected))

        self.upper_layer_protocol.checksum = expected

    def _update_payload_length_value_in_ipv6_header(self):
        """ Set payload length in IPv6 header to the size of all extension headers and the upper layer protocol. """
        extension_headers_length = sum(len(header) for header in self.extension_headers)

        self.ipv6_header.payload_length = len(self.upper_layer_protocol) + extension_headers_length

    def _update_next_header_values_in_headers(self):
        """ Chain the headers: each one gets the type of its successor as next header value. """
        previous = self.ipv6_header

        for current in self.extension_headers:
            previous.next_header = current.type
            previous = current

        # The last header in the chain points at the upper layer protocol.
        previous.next_header = self.upper_layer_protocol.type

    def calculate_checksum(self):
        """ Calculate upper layer protocol checksum over IPv6 pseudo header and upper layer protocol bytes.

        The checksum stored in the upper layer protocol is left unchanged.

        Returns:
            int: calculated checksum

        """
        upper_layer = self.upper_layer_protocol

        # The checksum field must be zero while the protocol is serialized for calculation.
        original_checksum = upper_layer.checksum
        upper_layer.checksum = 0
        payload = upper_layer.to_bytes()
        upper_layer.checksum = original_checksum

        pseudo_header = IPv6PseudoHeader(self.ipv6_header.source_address,
                                         self.ipv6_header.destination_address,
                                         len(payload),
                                         upper_layer.type)

        return calculate_checksum(pseudo_header.to_bytes() + payload)

    def to_bytes(self):
        """ Serialize the packet. Payload length, next header values and checksum are refreshed first.

        Returns:
            bytes of IPv6 header, extension headers and upper layer protocol, in that order

        """
        self._update_payload_length_value_in_ipv6_header()
        self._update_next_header_values_in_headers()
        self.upper_layer_protocol.checksum = self.calculate_checksum()

        packet_bytes = self.ipv6_header.to_bytes()

        for header in self.extension_headers:
            packet_bytes += header.to_bytes()

        packet_bytes += self.upper_layer_protocol.to_bytes()

        return packet_bytes

    def __repr__(self):
        template = "IPv6Packet(header={}, upper_layer_protocol={})"
        return template.format(self.ipv6_header, self.upper_layer_protocol)
class UDPHeader(ConvertibleToBytes, BuildableFromBytes):
    """ Class representing UDP datagram header.

    A UDP datagram cannot be constructed without this header.

    """

    _header_length = 8

    def __init__(self, src_port, dst_port, payload_length=0, checksum=0):
        """
        Args:
            src_port (int): source port
            dst_port (int): destination port
            payload_length (int): value of the length field, stored exactly as given
            checksum (int): checksum of the datagram
        """
        self.src_port = src_port
        self.dst_port = dst_port

        # Written directly, so the header length is NOT added here (unlike in the property setter).
        self._payload_length = payload_length
        self.checksum = checksum

    @property
    def type(self):
        return 17

    def _get_payload_length(self):
        return self._payload_length

    def _set_payload_length(self, value):
        # The stored length covers the header too, so the header size is added to the given value.
        self._payload_length = self._header_length + value

    payload_length = property(_get_payload_length, _set_payload_length)

    def to_bytes(self):
        """ Serialize the header as four big-endian 16-bit fields.

        Returns:
            bytes

        """
        return struct.pack(">HHHH", self.src_port, self.dst_port, self.payload_length, self.checksum)

    @classmethod
    def from_bytes(cls, data):
        """ Read a UDP header from the stream.

        Args:
            data (BytesIO): stream positioned at the first byte of the header

        Returns:
            UDPHeader

        """
        # The generator is consumed lazily, so the fields are read one by one, in wire order.
        src_port, dst_port, payload_length, checksum = (
            struct.unpack(">H", data.read(2))[0] for _ in range(4)
        )

        return cls(src_port, dst_port, payload_length, checksum)

    def __len__(self):
        return self._header_length
class UDPDatagram(UpperLayerProtocol):
    """ Class representing UDP datagram.

    UDP is an upper layer protocol for IPv6 so it can be passed to IPv6 packet as upper_layer_protocol.

    A datagram is a UDP header followed by a payload. The payload must provide `to_bytes()` and `len()`.

    """

    def __init__(self, header, payload):
        super(UDPDatagram, self).__init__(header)
        self.payload = payload

    @property
    def type(self):
        return 17

    def to_bytes(self):
        """ Serialize the datagram. The length stored in the header is refreshed from the payload first.

        Returns:
            bytearray

        """
        self.header.payload_length = len(self.payload)

        header_bytes = bytearray(self.header.to_bytes())
        return header_bytes + self.payload.to_bytes()

    def __len__(self):
        header_length = len(self.header)
        return header_length + len(self.payload)
class ICMPv6Header(ConvertibleToBytes, BuildableFromBytes):
    """ Class representing ICMPv6 message header.

    An ICMPv6 message cannot be constructed without this header.

    """

    _header_length = 4

    def __init__(self, _type, code, checksum=0):
        """
        Args:
            _type (int): ICMPv6 message type
            code (int): ICMPv6 message code
            checksum (int): checksum of the message
        """
        self.type, self.code, self.checksum = _type, code, checksum

    def to_bytes(self):
        """ Serialize the header: type byte, code byte and big-endian 16-bit checksum.

        Returns:
            bytearray

        """
        type_and_code = bytearray([self.type, self.code])
        checksum_bytes = struct.pack(">H", self.checksum)

        return type_and_code + checksum_bytes

    @classmethod
    def from_bytes(cls, data):
        """ Read an ICMPv6 header from the stream.

        Args:
            data (BytesIO): stream positioned at the first byte of the header

        Returns:
            ICMPv6Header

        """
        _type = ord(data.read(1))
        code = ord(data.read(1))
        (checksum,) = struct.unpack(">H", data.read(2))

        return cls(_type, code, checksum)

    def __len__(self):
        return self._header_length
class ICMPv6(UpperLayerProtocol):
    """ Class representing ICMPv6 message.

    ICMPv6 is an upper layer protocol for IPv6 so it can be passed to IPv6 packet as upper_layer_protocol.

    A message is an ICMPv6 header followed by a body. The body must provide `to_bytes()` and `len()`.

    Example:
        icmpv6_msg = ICMPv6(ICMPv6Header(128, 0),
                            ICMPv6EchoBody(0, 2, bytes([0x80, 0x00, 0xc7, 0xbf, 0x00, 0x00, 0x00, 0x01,
                                                        0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41,
                                                        0x41, 0x41])))

    """

    def __init__(self, header, body):
        super(ICMPv6, self).__init__(header)
        self.body = body

    @property
    def type(self):
        return 58

    def to_bytes(self):
        """ Serialize the message: header bytes followed by body bytes.

        Returns:
            bytearray

        """
        header_bytes = self.header.to_bytes()
        body_bytes = self.body.to_bytes()

        return bytearray(header_bytes + body_bytes)

    def __len__(self):
        header_length = len(self.header)
        return header_length + len(self.body)
class HopByHop(ExtensionHeader):
    """ Class representing HopByHop extension header.

    HopByHop extension header consists of:
        - next_header type
        - extension header length, counted in 8-byte units beyond the first 8 bytes
        - options

    """

    _one_byte_padding = 0x00
    _many_bytes_padding = 0x01

    def __init__(self, next_header=None, options=None, hdr_ext_len=None):
        """
        Args:
            next_header: type of the header which follows this one
            options (list): HopByHop options carried by this header
            hdr_ext_len: header extension length; calculated from the options when not given
        """
        super(HopByHop, self).__init__(next_header, hdr_ext_len)

        self.options = [] if options is None else options

        if hdr_ext_len is None:
            hdr_ext_len = self._calculate_hdr_ext_len(self._calculate_payload_length())

        self.hdr_ext_len = hdr_ext_len

    @property
    def type(self):
        return 0

    def _calculate_payload_length(self):
        """ Return length of the unpadded header: 2 bytes (next header, hdr ext len) plus all options. """
        return 2 + sum(len(option) for option in self.options)

    def _calculate_hdr_ext_len(self, payload_length):
        """ Return the number of 8-byte units, beyond the first one, needed to hold payload_length bytes. """
        full_units, leftover = divmod(payload_length, 8)

        # An exact multiple of 8 needs no extra unit for leftover bytes.
        if leftover == 0 and full_units > 0:
            return full_units - 1

        return full_units

    def to_bytes(self):
        """ Serialize the header and pad it to a multiple of 8 bytes.

        Returns:
            bytearray

        """
        data = bytearray([self.next_header, self.hdr_ext_len])

        for option in self.options:
            data += option.to_bytes()

        # Padding
        #
        # More details:
        #   https://tools.ietf.org/html/rfc2460#section-4.2
        #
        leftover = len(data) & 0x7

        if leftover == 0:
            return data

        missing = 8 - leftover

        if missing == 1:
            # A single missing byte is filled with the one byte padding option.
            data += bytearray([self._one_byte_padding])
            return data

        # The many bytes padding option spends 2 bytes on its own type and length fields.
        zeros_count = missing - 2
        data += bytearray([self._many_bytes_padding, zeros_count])
        data += bytearray(zeros_count)

        return data

    def __len__(self):
        """ HopByHop extension header length

        More details:
            https://tools.ietf.org/html/rfc2460#section-4.3

        """
        return 8 * (self.hdr_ext_len + 1)
class HopByHopOptionHeader(ConvertibleToBytes, BuildableFromBytes):
    """ Class representing HopByHop option header: one type byte and one length byte. """

    _header_length = 2

    def __init__(self, _type, length=None):
        """
        Args:
            _type (int): option type
            length (int): length of the option value; None is stored as 0
        """
        self.type = _type

        if length is None:
            length = 0
        self.length = length

    def to_bytes(self):
        """ Serialize the option header.

        Returns:
            bytearray

        """
        fields = [self.type, self.length]
        return bytearray(fields)

    @classmethod
    def from_bytes(cls, data):
        """ Read an option header from the stream.

        Args:
            data (BytesIO): stream positioned at the type byte of the option

        Returns:
            HopByHopOptionHeader

        """
        _type = ord(data.read(1))
        length = ord(data.read(1))

        return cls(_type, length)

    def __len__(self):
        return self._header_length

    def __repr__(self):
        template = "HopByHopOptionHeader(type={}, length={})"
        return template.format(self.type, self.length)
class HopByHopOption(ConvertibleToBytes):
    """ Class representing HopByHop option.

    An option is a pair: HopByHopOptionHeader and value (e.g. for MPLOption).

    The length stored in the header is overwritten with the length of the value on construction.

    Example:
        HopByHop(next_header=0x3a,
                 options=[HopByHopOption(HopByHopOptionHeader(_type=0x6d),
                                         MPLOption(S=1, M=0, V=0, sequence=2, seed_id=bytes([0x00, 0x18])))])

    """

    def __init__(self, header, value):
        self.value = value
        self.header = header

        # Keep the header consistent with the value it describes.
        header.length = len(value)

    def to_bytes(self):
        """ Serialize the option: header bytes followed by value bytes. """
        header_bytes = self.header.to_bytes()
        return header_bytes + self.value.to_bytes()

    def __len__(self):
        header_length = len(self.header)
        return header_length + len(self.value)

    def __repr__(self):
        template = "HopByHopOption(header={}, value={})"
        return template.format(self.header, self.value)
class MPLOption(ConvertibleToBytes):
    """ Class representing MPL option. """

    _header_length = 2

    # Length of the seed id (in bytes) for every value of the S field
    _seed_id_length = {0: 0, 1: 2, 2: 8, 3: 16}

    def __init__(self, S, M, V, sequence, seed_id):
        """
        Args:
            S (int): 2-bit field which selects the seed id length
            M (int): 1-bit flag
            V (int): 1-bit flag
            sequence (int): sequence number (one byte)
            seed_id (bytes): seed id
        """
        self.S, self.M, self.V = S, M, V
        self.sequence = sequence
        self.seed_id = seed_id

    def to_bytes(self):
        """ Serialize the option: flags byte, sequence byte and seed id.

        Returns:
            bytearray

        """
        # Flags byte layout (most significant bit first): S S M V 0 0 0 0
        flags = (self.S & 0x03) << 6
        flags |= (self.M & 0x01) << 5
        flags |= (self.V & 0x01) << 4

        return bytearray([flags, self.sequence]) + self.seed_id

    @classmethod
    def from_bytes(cls, data):
        """ Read an MPL option from the stream.

        Args:
            data (BytesIO): stream positioned at the flags byte of the option

        Returns:
            MPLOption

        """
        flags = ord(data.read(1))

        s = (flags >> 6) & 0x03
        m = (flags >> 5) & 0x01
        v = (flags >> 4) & 0x01

        sequence = ord(data.read(1))

        # The S field tells how many bytes of seed id follow.
        seed_id = data.read(cls._seed_id_length[s])

        return cls(s, m, v, sequence, seed_id)

    def __len__(self):
        seed_id_length = self._seed_id_length[self.S]
        return self._header_length + seed_id_length

    def __repr__(self):
        template = "MPLOption(S={}, M={}, V={}, sequence={}, seed_id={})"
        return template.format(self.S, self.M, self.V, self.sequence, hexlify(self.seed_id))
class IPv6PacketFactory(PacketFactory):
    """ Factory that produces IPv6 packets from data.

    The factory delegates parsing of extension headers and upper layer protocols
    to the specialized factories it is initialized with.

    The following example shows preferable setup of IPv6PacketFactory.

    Header types:
        0: HopByHop
        17: UDP
        58: ICMPv6

    Option types:
        109: MPL

    ICMPv6 body types:
        128: Echo request
        129: Echo response

    Example usage:

        ipv6_factory = IPv6PacketFactory(
            ehf={
                0: HopByHopFactory(HopByHopOptionsFactory(options_factories={
                    109: MPLOptionFactory()
                }))
            },
            ulpf={
                17: UDPDatagramFactory(UDPHeaderFactory(), UdpBasedOnSrcDstPortsPayloadFactory({
                    19788: MLEMessageFactory(),
                    19789: CoAPMessageFactory()
                })),
                58: ICMPv6Factory(body_factories={
                    128: ICMPv6EchoBodyFactory(),
                    129: ICMPv6EchoBodyFactory()
                })
            }
        )

    """

    def __init__(self, ehf=None, ulpf=None):
        """
        ehf - Extension Header Factory
        ulpf - Upper Layer Protocol Factory

        Args:
            ehf(dict[int: PacketFactory]): Dictionary mapping extension header types on specialized factories.
            ulpf(dict[int: PacketFactory]): Dictionary mapping upper layer protocol types on specialized factories.
        """
        self._ehf = {} if ehf is None else ehf
        self._ulpf = {} if ulpf is None else ulpf

    def _is_extension_header(self, header_type):
        """ Every header type which is not a known upper layer protocol counts as an extension header. """
        return header_type not in UPPER_LAYER_PROTOCOLS

    def _get_extension_header_factory_for(self, next_header):
        try:
            factory = self._ehf[next_header]
        except KeyError:
            raise RuntimeError("Could not get Extension Header factory for next_header={}.".format(next_header))

        return factory

    def _get_upper_layer_protocol_factory_for(self, next_header):
        try:
            factory = self._ulpf[next_header]
        except KeyError:
            raise RuntimeError("Could not get Upper Layer Protocol factory for next_header={}.".format(next_header))

        return factory

    def _parse_extension_headers(self, data, next_header, message_info):
        """ Parse the chain of extension headers.

        Returns:
            tuple: type of the header which follows the chain, list of the parsed extension headers

        """
        parsed_headers = []

        # Follow the next header chain until it points at an upper layer protocol.
        while self._is_extension_header(next_header):
            header = self._get_extension_header_factory_for(next_header).parse(data, message_info)
            next_header = header.next_header
            parsed_headers.append(header)

        return next_header, parsed_headers

    def _parse_upper_layer_protocol(self, data, next_header, message_info):
        return self._get_upper_layer_protocol_factory_for(next_header).parse(data, message_info)

    def parse(self, data, message_info):
        """ Parse a whole IPv6 packet.

        Source and destination addresses found in the IPv6 header are stored in message_info.

        Args:
            data (BytesIO): stream positioned at the first byte of the IPv6 header
            message_info (MessageInfo)

        Returns:
            IPv6Packet

        Raises:
            RuntimeError: when no factory is registered for an encountered header type.

        """
        header = IPv6Header.from_bytes(data)

        message_info.source_ipv6 = header.source_address
        message_info.destination_ipv6 = header.destination_address

        upper_layer_type, extension_headers = self._parse_extension_headers(data, header.next_header, message_info)
        upper_layer_protocol = self._parse_upper_layer_protocol(data, upper_layer_type, message_info)

        return IPv6Packet(header, upper_layer_protocol, extension_headers)
class HopByHopOptionsFactory(object):
    """ Factory that produces HopByHop options.

    Padding options (one byte padding and many bytes padding) are consumed and dropped.
    The value of every other option is parsed by the factory registered for its type.

    """

    _one_byte_padding = 0x00
    _many_bytes_padding = 0x01

    def __init__(self, options_factories=None):
        """
        Args:
            options_factories (dict): Dictionary mapping option types on factories of option values.
        """
        self._options_factories = options_factories if options_factories is not None else {}

    def _get_HopByHopOption_value_factory(self, _type):
        try:
            return self._options_factories[_type]

        except KeyError:
            raise RuntimeError("Could not find HopByHopOption value factory for type={}.".format(_type))

    def parse(self, data, message_info):
        """ Parse all options found in data.

        Args:
            data (BytesIO): stream holding the options area of a HopByHop extension header
            message_info (MessageInfo): passed through to the factories of option values

        Returns:
            list: parsed HopByHopOption objects, padding excluded

        Raises:
            RuntimeError: when no factory is registered for the type of an option.

        """
        options = []

        # The stream is not modified while parsing, so its size is taken once.
        data_length = len(data.getvalue())

        while data.tell() < data_length:
            option_type = ord(data.read(1))

            if option_type == self._one_byte_padding:
                # One byte padding is a lone type byte: it has no length and no value field
                # (https://tools.ietf.org/html/rfc2460#section-4.2), so nothing more is consumed.
                continue

            option_length = ord(data.read(1))

            if option_type == self._many_bytes_padding:
                # skip n bytes padding
                data.read(option_length)
                continue

            factory = self._get_HopByHopOption_value_factory(option_type)

            option_data = data.read(option_length)

            option_header = HopByHopOptionHeader(option_type, option_length)
            option = HopByHopOption(option_header, factory.parse(io.BytesIO(option_data), message_info))

            options.append(option)

        return options
class HopByHopFactory(PacketFactory):
    """ Factory that produces HopByHop extension headers from data. """

    def __init__(self, hop_by_hop_options_factory):
        """
        Args:
            hop_by_hop_options_factory: factory used to parse the options carried by the header
        """
        self._hop_by_hop_options_factory = hop_by_hop_options_factory

    def _calculate_extension_header_length(self, hdr_ext_len):
        """ Return length of the whole extension header (in bytes) for the given hdr_ext_len field value. """
        return 8 * (hdr_ext_len + 1)

    def parse(self, data, message_info):
        """ Parse a HopByHop extension header and add its length to message_info.payload_length.

        Args:
            data (BytesIO): stream positioned at the first byte of the extension header
            message_info (MessageInfo)

        Returns:
            HopByHop

        """
        next_header = ord(data.read(1))
        hdr_ext_len = ord(data.read(1))

        # Note! The two bytes read above (next_header and hdr_ext_len) belong to the header,
        #       so the options area is two bytes shorter than the header.
        options_length = self._calculate_extension_header_length(hdr_ext_len) - 2
        options_stream = io.BytesIO(data.read(options_length))

        options = self._hop_by_hop_options_factory.parse(options_stream, message_info)

        header = HopByHop(next_header, options, hdr_ext_len)

        message_info.payload_length += len(header)

        return header
class MPLOptionFactory(PacketFactory):
    """ Factory that produces MPL options for HopByHop extension header. """

    def parse(self, data, message_info):
        """ Build an MPLOption from data. The message_info argument is not used. """
        option = MPLOption.from_bytes(data)
        return option
class UDPHeaderFactory:
    """ Factory that produces UDP header. """

    def parse(self, data, message_info):
        """ Build a UDPHeader from data. The message_info argument is not used. """
        header = UDPHeader.from_bytes(data)
        return header
class UdpBasedOnSrcDstPortsPayloadFactory:
    """ Factory that produces UDP payload. """
    # TODO: Unittests

    def __init__(self, src_dst_port_based_payload_factories):
        """
        Args:
            src_dst_port_based_payload_factories (PacketFactory): Factories parse UDP payload based on source or destination port.
        """
        self._factories = src_dst_port_based_payload_factories

    def parse(self, data, message_info):
        """ Parse UDP payload with the factory registered for the destination or the source port.

        When factories are registered for both ports, the one of the source port is used.

        Raises:
            RuntimeError: when no factory is registered for either port.

        """
        selected = None

        # The source port is checked last, so its factory overrides the one of the destination port.
        for port in (message_info.dst_port, message_info.src_port):
            if port in self._factories:
                selected = self._factories[port]

        if selected is None:
            raise RuntimeError("Could not find factory to build UDP payload.")

        return selected.parse(data, message_info)
class UDPDatagramFactory(PacketFactory):
    """ Factory that produces UDP datagrams. """
    # TODO: Unittests

    def __init__(self, udp_header_factory, udp_payload_factory):
        """
        Args:
            udp_header_factory: factory used to parse the UDP header
            udp_payload_factory: factory used to parse everything which follows the UDP header
        """
        self._udp_header_factory = udp_header_factory
        self._udp_payload_factory = udp_payload_factory

    def parse(self, data, message_info):
        """ Parse a UDP datagram.

        Payload length and both ports are stored in message_info before the payload is parsed,
        so the payload factory can already rely on them.

        Returns:
            UDPDatagram

        """
        header = self._udp_header_factory.parse(data, message_info)

        # Update message payload length: UDP header (8B) + payload length
        unread_bytes = len(data.getvalue()) - data.tell()
        message_info.payload_length += len(header) + unread_bytes

        message_info.src_port = header.src_port
        message_info.dst_port = header.dst_port

        return UDPDatagram(header, self._udp_payload_factory.parse(data, message_info))
class ICMPv6Factory(PacketFactory):
    """ Factory that produces ICMPv6 messages from data. """

    def __init__(self, body_factories=None):
        """
        Args:
            body_factories (dict): Dictionary mapping ICMPv6 types on factories of message bodies.
                The optional "default" key holds the factory used for all other types.
        """
        self._body_factories = {} if body_factories is None else body_factories

    def _get_icmpv6_body_factory(self, _type):
        """ Return the body factory registered for _type, or the "default" one when there is none.

        Raises:
            RuntimeError: when neither a specialized nor a default factory is registered.

        """
        try:
            return self._body_factories[_type]

        except KeyError:
            pass

        if "default" not in self._body_factories:
            raise RuntimeError("Could not find specialized factory to parse ICMP body. "
                               "Unsupported ICMP type: {}".format(_type))

        fallback = self._body_factories["default"]

        print("Could not find specialized factory to parse ICMP body. "
              "Take the default one: {}".format(type(fallback)))

        return fallback

    def parse(self, data, message_info):
        """ Parse an ICMPv6 message and add its length to message_info.payload_length.

        Returns:
            ICMPv6

        """
        header = ICMPv6Header.from_bytes(data)

        body_factory = self._get_icmpv6_body_factory(header.type)

        # Message length: ICMPv6 header + all bytes left in the stream
        unread_bytes = len(data.getvalue()) - data.tell()
        message_info.payload_length += len(header) + unread_bytes

        body = body_factory.parse(data, message_info)

        return ICMPv6(header, body)
class ICMPv6EchoBodyFactory(PacketFactory):
    """ Factory that produces ICMPv6 echo message body. """

    def parse(self, data, message_info):
        """ Build an ICMPv6EchoBody from data. The message_info argument is not used. """
        body = ICMPv6EchoBody.from_bytes(data)
        return body
class BytesPayload(ConvertibleToBytes, BuildableFromBytes):
    """ Class representing bytes payload: a thin wrapper around raw data. """

    def __init__(self, data):
        self.data = data

    @classmethod
    def from_bytes(cls, data):
        """ Wrap the given data as it is. """
        return cls(data)

    def to_bytes(self):
        """ Return a bytearray copy of the wrapped data. """
        copy = bytearray(self.data)
        return copy

    def __len__(self):
        return len(self.data)
class BytesPayloadFactory(PacketFactory):
    """ Factory that produces bytes payload. """

    def parse(self, data, message_info):
        """ Wrap all bytes left in the stream into a BytesPayload. The message_info argument is not used. """
        remaining = data.read()
        return BytesPayload(remaining)
class ICMPv6EchoBody(ConvertibleToBytes, BuildableFromBytes):
    """ Class representing body of ICMPv6 echo messages. """

    _header_length = 4

    def __init__(self, identifier, sequence_number, data):
        """
        Args:
            identifier (int): 16-bit identifier
            sequence_number (int): 16-bit sequence number
            data (bytes): data carried by the echo message
        """
        self.identifier, self.sequence_number, self.data = identifier, sequence_number, data

    def to_bytes(self):
        """ Serialize the body: big-endian identifier and sequence number followed by the data. """
        fixed_part = struct.pack(">HH", self.identifier, self.sequence_number)
        return fixed_part + self.data

    @classmethod
    def from_bytes(cls, data):
        """ Read an echo body from the stream. All bytes after the two fixed fields become the data.

        Args:
            data (BytesIO): stream positioned at the first byte of the body

        Returns:
            ICMPv6EchoBody

        """
        (identifier,) = struct.unpack(">H", data.read(2))
        (sequence_number,) = struct.unpack(">H", data.read(2))

        return cls(identifier, sequence_number, data.read())

    def __len__(self):
        return len(self.data) + self._header_length
class ICMPv6DestinationUnreachableFactory(PacketFactory):
    """ Factory that produces ICMPv6 Destination Unreachable message body. """

    def parse(self, data, message_info):
        """ Build an ICMPv6DestinationUnreachable from data. The message_info argument is not used. """
        body = ICMPv6DestinationUnreachable.from_bytes(data)
        return body
class ICMPv6DestinationUnreachable(ConvertibleToBytes, BuildableFromBytes):
    """ Class representing body of ICMPv6 Destination Unreachable messages. """

    _header_length = 4
    _unused = 0

    def __init__(self, data):
        self.data = data

    def to_bytes(self):
        """ Serialize the body: the 32-bit unused field (zero) followed by the data.

        Returns:
            bytearray

        """
        unused_field = bytearray(struct.pack(">I", self._unused))
        return unused_field + self.data

    @classmethod
    def from_bytes(cls, data):
        """ Read a Destination Unreachable body from the stream.

        Args:
            data (BytesIO): stream positioned at the first byte of the body

        Returns:
            ICMPv6DestinationUnreachable

        Raises:
            RuntimeError: when the unused field is not zero.

        """
        (unused,) = struct.unpack(">I", data.read(4))

        if unused == 0:
            return cls(bytearray(data.read()))

        raise RuntimeError(
            "Invalid value of unused field in the ICMPv6 Destination Unreachable data. Expected value: 0.")

    def __len__(self):
        return len(self.data) + self._header_length