blob: 959a2316a739d60f117afd3f19d677946ee545c6 [file] [log] [blame]
#!/usr/bin/env python
#
# Copyright (c) 2016, The OpenThread Authors.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# 3. Neither the name of the copyright holder nor the
# names of its contributors may be used to endorse or promote products
# derived from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
import hmac
import hashlib
import struct
from binascii import hexlify
from Crypto.Cipher import AES
class CryptoEngine:
    """ Encrypts and decrypts message payloads with AES in CCM mode.

    The key, the nonce and the authenticated data used for every operation
    are obtained from the crypto material creator given at construction.
    """

    def __init__(self, crypto_material_creator):
        """
        Args:
            crypto_material_creator: Object providing `mic_length` and
                `create_key_and_nonce_and_authenticated_data(message_info)`.
        """
        self._crypto_material_creator = crypto_material_creator

    @property
    def mic_length(self):
        """ MIC length as reported by the crypto material creator. """
        return self._crypto_material_creator.mic_length

    def _create_cipher(self, message_info):
        """ Build an AES-CCM cipher already fed with the authenticated data.

        Args:
            message_info (MessageInfo)

        Returns:
            AES-CCM cipher object ready to encrypt or decrypt a payload.
        """
        material = self._crypto_material_creator.create_key_and_nonce_and_authenticated_data(message_info)
        key, nonce, auth_data = material

        cipher = AES.new(key, AES.MODE_CCM, nonce, mac_len=self.mic_length)
        cipher.update(auth_data)
        return cipher

    def encrypt(self, data, message_info):
        """ Encrypt message.

        Args:
            data (bytearray)
            message_info (MessageInfo)

        Returns:
            tuple: Encrypted message and MIC, as produced by the cipher.
        """
        plaintext = bytes(data)
        return self._create_cipher(message_info).encrypt_and_digest(plaintext)

    def decrypt(self, enc_data, mic, message_info):
        """ Decrypt message and verify its MIC.

        Args:
            enc_data (bytearray)
            mic (bytearray)
            message_info (MessageInfo)

        Returns:
            bytearray: Decrypted message.
        """
        cipher = self._create_cipher(message_info)
        plaintext = cipher.decrypt_and_verify(bytes(enc_data), bytes(mic))
        return bytearray(plaintext)
class CryptoMaterialCreator(object):
    """ Base class for creators of key, nonce and authenticated data.

    Subclasses must implement `create_key_and_nonce_and_authenticated_data`
    and the `mic_length` property.
    """

    _salt = b'Thread'

    def __init__(self, master_key):
        """
        Args:
            master_key (bytearray)
        """
        self.master_key = master_key

    def _generate_keys(self, sequence_counter):
        """ Generate MLE and MAC keys.

        Read more: 7.1.4. Key Generation - Thread v1.1 Specification Final

        Args:
            sequence_counter (int)

        Returns:
            tuple: MLE and MAC as bytes
        """
        # HMAC-SHA256 over the big-endian 32-bit sequence counter followed by the salt.
        message = struct.pack(">L", sequence_counter) + self._salt
        digest = hmac.new(self.master_key, message, digestmod=hashlib.sha256).digest()

        # First 16 bytes of the digest form the MLE key, the remainder the MAC key.
        return digest[:16], digest[16:]

    def create_key_and_nonce_and_authenticated_data(self, message_info):
        """ Return key, nonce and authenticated data; must be provided by subclasses. """
        raise NotImplementedError

    @property
    def mic_length(self):
        """ MIC length; must be provided by subclasses. """
        raise NotImplementedError
class MacCryptoMaterialCreator(CryptoMaterialCreator):
    """ Creates the MAC key, nonce and authenticated data for a message. """

    def __init__(self, master_key):
        """
        Args:
            master_key (bytearray)
        """
        super(MacCryptoMaterialCreator, self).__init__(master_key)

    def _create_nonce(self, eui64, frame_counter, security_level):
        """ Create CCM Nonce required by AES-128 CCM for encryption and decryption.

        Read more: 7.6.3.2 CCM Nonce - Std 802.15.4-2006

        Args:
            eui64 (bytes)
            frame_counter (int)
            security_level (int)

        Returns:
            bytes: created Nonce
        """
        # Big-endian 32-bit frame counter followed by the one-byte security level.
        counter_and_level = struct.pack(">LB", frame_counter, security_level)
        return bytes(eui64 + counter_and_level)

    def _create_authenticated_data(self, mhr, auxiliary_security_header, nonpayload_fields):
        """ Create Authenticated Data

        Read more: 7.6.3.3 CCM prerequisites - Std 802.15.4-2006

        Args:
            mhr (bytes)
            auxiliary_security_header (bytes)
            nonpayload_fields (bytes)

        Returns:
            bytes: Authenticated Data
        """
        header_part = mhr + auxiliary_security_header
        return bytes(header_part + nonpayload_fields)

    def create_key_and_nonce_and_authenticated_data(self, message_info):
        """ Derive the MAC key, nonce and authenticated data for a message.

        Args:
            message_info (MessageInfo)

        Returns:
            tuple: MAC key, nonce, authenticated data
        """
        aux_sec_hdr = message_info.aux_sec_hdr

        # Only the MAC half of the generated key pair is used here.
        mac_key = self._generate_keys(aux_sec_hdr.sequence_counter)[1]

        nonce = self._create_nonce(
            message_info.source_mac_address,
            aux_sec_hdr.frame_counter,
            aux_sec_hdr.security_level,
        )

        auth_data = self._create_authenticated_data(
            message_info.mhr_bytes,
            message_info.aux_sec_hdr_bytes,
            message_info.nonpayload_fields,
        )

        return mac_key, nonce, auth_data

    @property
    def mic_length(self):
        """ MIC length used by this creator (always 4). """
        return 4
class MleCryptoMaterialCreator(CryptoMaterialCreator):
    """ Creates the MLE key, nonce and authenticated data for a message. """

    def __init__(self, master_key):
        """
        Args:
            master_key (bytearray)
        """
        super(MleCryptoMaterialCreator, self).__init__(master_key)

    def _create_nonce(self, source_eui64, frame_counter, security_level):
        """ Create CCM Nonce required by AES-128 CCM for encryption and decryption.

        Read more: 7.6.3.2 CCM Nonce - Std 802.15.4-2006

        Args:
            source_eui64 (bytearray): only the first eight bytes are used
            frame_counter (int)
            security_level (int)

        Returns:
            bytes: created Nonce
        """
        # Big-endian 32-bit frame counter followed by the one-byte security level.
        counter_and_level = struct.pack(">LB", frame_counter, security_level)
        return bytes(source_eui64[:8] + counter_and_level)

    def _create_authenticated_data(self, source_address, destination_address, auxiliary_security_header):
        """ Create Authenticated Data

        Read more: 4.8 - Thread v1.0 Specification

        Args:
            source_address (ip_address)
            destination_address (ip_address)
            auxiliary_security_header (bytearray)

        Returns:
            bytes: Authenticated Data
        """
        addresses = source_address.packed + destination_address.packed
        return bytes(addresses + auxiliary_security_header)

    def create_key_and_nonce_and_authenticated_data(self, message_info):
        """ Derive the MLE key, nonce and authenticated data for a message.

        Args:
            message_info (MessageInfo)

        Returns:
            tuple: MLE key, nonce, authenticated data
        """
        aux_sec_hdr = message_info.aux_sec_hdr

        # Only the MLE half of the generated key pair is used here.
        mle_key = self._generate_keys(aux_sec_hdr.sequence_counter)[0]

        nonce = self._create_nonce(
            message_info.source_mac_address.mac_address,
            aux_sec_hdr.frame_counter,
            aux_sec_hdr.security_level,
        )

        auth_data = self._create_authenticated_data(
            message_info.source_ipv6,
            message_info.destination_ipv6,
            message_info.aux_sec_hdr_bytes,
        )

        return mle_key, nonce, auth_data

    @property
    def mic_length(self):
        """ MIC length used by this creator (always 4). """
        return 4
class AuxiliarySecurityHeader:
    """ Holds the fields of an Auxiliary Security Header. """

    def __init__(self, key_id_mode, security_level, frame_counter, key_id, big_endian=True):
        """
        Args:
            key_id_mode (int)
            security_level (int)
            frame_counter (int)
            key_id (bytearray)
            big_endian (bool): byte order used when the sequence counter is
                unpacked from the Key ID (modes 2 and 3).
        """
        self._key_id_mode = key_id_mode
        self._security_level = security_level
        self._frame_counter = frame_counter
        self._key_id = key_id
        self._big_endian = big_endian

    @property
    def sequence_counter(self):
        """ Compute or extract sequence counter based on currently set Key Index Mode.

        Raises:
            ValueError: when the Key Index Mode is not 1, 2 or 3.
        """
        mode = self.key_id_mode

        if mode == 1:
            # Try to guess valid Key Sequence Counter based on Key Index. This one should work for now.
            return self.key_index - 1

        if mode == 2:
            # In this mode sequence counter is stored on the first four bytes of Key ID.
            width, type_code = 4, "I"
        elif mode == 3:
            # In this mode the value is unpacked from the first eight bytes of Key ID.
            width, type_code = 8, "Q"
        else:
            raise ValueError("Unsupported Key Index Mode: {}".format(mode))

        byte_order = ">" if self._big_endian else "<"
        (counter,) = struct.unpack(byte_order + type_code, self.key_id[:width])
        return counter

    @property
    def key_index(self):
        """ Key Index: the last byte of Key ID as an unsigned integer. """
        (index,) = struct.unpack(">B", self.key_id[-1:])
        return index

    @property
    def key_id_mode(self):
        return self._key_id_mode

    @property
    def security_level(self):
        return self._security_level

    @property
    def frame_counter(self):
        return self._frame_counter

    @property
    def key_id(self):
        return self._key_id

    def __repr__(self):
        template = "AuxiliarySecurityHeader(key_id_mode={}, security_level={}, frame_counter={}, key_id={})"
        return template.format(self.key_id_mode, self.security_level, self.frame_counter, hexlify(self.key_id))
class AuxiliarySecurityHeaderFactory:
    """ Parses an Auxiliary Security Header from a readable byte stream. """

    _SECURITY_CONTROL_LENGTH = 1
    _FRAME_COUNTER_LENGTH = 4

    _KEY_ID_LENGTH_KEY_ID_0 = 0
    _KEY_ID_LENGTH_KEY_ID_1 = 1
    _KEY_ID_LENGTH_KEY_ID_2 = 5
    _KEY_ID_LENGTH_KEY_ID_3 = 9

    # Number of Key ID bytes to read for each Key ID Mode.
    _key_id_lengths = {
        0: _KEY_ID_LENGTH_KEY_ID_0,
        1: _KEY_ID_LENGTH_KEY_ID_1,
        2: _KEY_ID_LENGTH_KEY_ID_2,
        3: _KEY_ID_LENGTH_KEY_ID_3,
    }

    def _parse_security_control(self, security_control_byte):
        """ Split the Security Control byte into its two fields.

        Args:
            security_control_byte (int)

        Returns:
            tuple: security level (bits 0-2), key id mode (bits 3-4)
        """
        level = security_control_byte & 0b00000111
        mode = (security_control_byte & 0b00011000) >> 3
        return level, mode

    def _parse_frame_counter(self, frame_counter_bytes):
        """ Unpack the frame counter as a little-endian unsigned 32-bit integer. """
        (counter,) = struct.unpack("<I", frame_counter_bytes)
        return counter

    def _key_id_length(self, key_id_mode):
        """ Return the Key ID length in bytes for the given Key ID Mode. """
        return self._key_id_lengths[key_id_mode]

    def parse(self, data, message_info):
        """ Read an Auxiliary Security Header from `data`.

        The raw header bytes and the parsed header are also stored on
        `message_info` as `aux_sec_hdr_bytes` and `aux_sec_hdr`.

        Args:
            data: object with a `read(size)` method returning bytes
            message_info (MessageInfo)

        Returns:
            AuxiliarySecurityHeader: parsed header
        """
        control_raw = bytearray(data.read(self._SECURITY_CONTROL_LENGTH))
        counter_raw = bytearray(data.read(self._FRAME_COUNTER_LENGTH))

        security_level, key_id_mode = self._parse_security_control(control_raw[0])
        frame_counter = self._parse_frame_counter(counter_raw)

        # The Key ID Mode decides how many Key ID bytes follow the frame counter.
        key_id_raw = bytearray(data.read(self._key_id_length(key_id_mode)))

        header = AuxiliarySecurityHeader(key_id_mode, security_level, frame_counter, key_id_raw)

        message_info.aux_sec_hdr_bytes = control_raw + counter_raw + key_id_raw
        message_info.aux_sec_hdr = header

        return header