blob: b55881b278b27fbec1706e556ff7e6b2c5839b37 [file] [log] [blame]
#
# Copyright (c) 2015-2017 Nest Labs, Inc.
# All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
# @file
# This file is uitilies for Weave BLE
#
import binascii
import logging
from ctypes import *
# Duplicates of BLE definitions in WeaveDeviceManager-ScriptBinding.cpp
BleEventType_Rx = 1
BleEventType_Tx = 2
BleEventType_Subscribe = 3
BleEventType_Disconnect = 4
BleSubscribeOperation_Subscribe = 1
BleSubscribeOperation_Unsubscribe = 2
# From BleError.h:
BLE_ERROR_REMOTE_DEVICE_DISCONNECTED = 12
# Internal representation of the CoreBluetooth peripheral State enum.
BlePeripheralState_Disconnected = 0
FAKE_CONN_OBJ_VALUE = 12121212
# these are defined in CoreBluetooth but are not accessible from python
CBCharacteristicWriteWithResponse = 0
CBCharacteristicWriteWithoutResponse = 1
def _VoidPtrToUUIDString(ptr, len):
try:
ptr = _VoidPtrToByteArray(ptr, len)
ptr = binascii.hexlify(ptr)
ptr = ptr[:8] + '-' + ptr[8:12] + '-' + ptr[12:16] + '-' + ptr[16:20] + '-' + ptr[20:]
ptr = str(ptr)
except:
print "ERROR: failed to convert void * to UUID"
ptr = None
return ptr
def _VoidPtrToByteArray(ptr, len):
if ptr:
v = bytearray(len)
memmove((c_byte * len).from_buffer(v), ptr, len)
return v
else:
return None
def _ByteArrayToVoidPtr(array):
if array != None:
return cast( (c_byte * len(array)) .from_buffer_copy(array), c_void_p)
else:
return c_void_p(0)
def ParseBleEventType(val):
if isinstance(val, (int, long)):
return val
if (val.lower() == "rx"):
return BleEventType_Rx
if (val.lower() == "tx"):
return BleEventType_Tx
raise Exception("Invalid Ble Event Type: " + str(val))
class BleTxEvent:
def __init__(self, svcId=None, charId=None, status=False):
self.EventType = BleEventType_Tx
self.ConnObj = None
self.SvcId = svcId
self.CharId = charId
self.Status = status
def Print(self, prefix=""):
print "%sBleEvent Type: %s" % (prefix, ("TX" if self.EventType == BleEventType_Tx else "ERROR"))
print "%sStatus: %s" % (prefix, str(self.Status))
if self.SvcId:
print "%sSvcId:" % (prefix)
print binascii.hexlify(self.SvcId)
if self.CharId:
print "%sCharId:" % (prefix)
print binascii.hexlify(self.CharId)
def SetField(self, name, val):
name = name.lower()
if (name == 'eventtype' or name == 'event-type' or name == 'type'):
self.EventType = ParseBleEventType(val)
elif (name == 'status'):
self.Status = val
elif (name == 'svcid'):
self.SvcId = val
elif (name == 'charid'):
self.CharId = val
else:
raise Exception("Invalid BleTxEvent field: " + str(name))
class BleDisconnectEvent:
def __init__(self, error=0):
self.EventType = BleEventType_Disconnect
self.ConnObj = None
self.Error = error
def Print(self, prefix=""):
print "%sBleEvent Type: %s" % (prefix, ("DC" if self.EventType == BleEventType_Disconnect else "ERROR"))
print "%sError: %s" % (prefix, str(self.Error))
def SetField(self, name, val):
name = name.lower()
if (name == 'eventtype' or name == 'event-type' or name == 'type'):
self.EventType = ParseBleEventType(val)
elif (name == 'error'):
self.Error = val
else:
raise Exception("Invalid BleDisconnectEvent field: " + str(name))
class BleRxEvent:
def __init__(self, svcId=None, charId=None, buffer=None):
self.EventType = BleEventType_Rx
self.ConnObj = None
self.SvcId = svcId
self.CharId = charId
self.Buffer = buffer
def Print(self, prefix=""):
print "%sBleEvent Type: %s" % (prefix, ("RX" if self.EventType == BleEventType_Rx else "ERROR"))
if self.Buffer:
print "%sBuffer:" % (prefix)
print binascii.hexlify(self.Buffer)
if self.SvcId:
print "%sSvcId:" % (prefix)
print binascii.hexlify(self.SvcId)
if self.CharId:
print "%sCharId:" % (prefix)
print binascii.hexlify(self.CharId)
def SetField(self, name, val):
name = name.lower()
if (name == 'eventtype' or name == 'event-type' or name == 'type'):
self.EventType = ParseBleEventType(val)
elif (name == 'buffer'):
self.Buffer = val
elif (name == 'svcid'):
self.SvcId = val
elif (name == 'charid'):
self.CharId = val
else:
raise Exception("Invalid BleRxEvent field: " + str(name))
class BleSubscribeEvent:
def __init__(self, svcId=None, charId=None, status=True, operation=BleSubscribeOperation_Subscribe):
self.EventType = BleEventType_Subscribe
self.ConnObj = None
self.SvcId = svcId
self.CharId = charId
self.Status = status
self.Operation = operation
def Print(self, prefix=""):
print "%sBleEvent Type: %s" % (prefix, ("SUBSCRIBE" if self.EventType == BleEventType_Subscribe else "ERROR"))
print "%sStatus: %s" % (prefix, str(self.Status))
print "%sOperation: %s" % (prefix, ("UNSUBSCRIBE" if self.Operation == BleSubscribeOperation_Unsubscribe else "SUBSCRIBE"))
if self.SvcId:
print "%sSvcId:" % (prefix)
print binascii.hexlify(self.SvcId)
if self.CharId:
print "%sCharId:" % (prefix)
print binascii.hexlify(self.CharId)
def SetField(self, name, val):
name = name.lower()
if (name == 'eventtype' or name == 'event-type' or name == 'type'):
self.EventType = ParseBleEventType(val)
elif (name == 'status'):
self.Status = val
elif (name == 'svcid'):
self.SvcId = val
elif (name == 'charid'):
self.CharId = val
elif (name == 'operation'):
self.Operation = val
else:
raise Exception("Invalid BleSubscribeEvent field: " + str(name))
class BleTxEventStruct(Structure):
_fields_ = [
('EventType', c_int32), # The type of event.
('ConnObj', c_void_p), # a Handle back to the connection object or None.
('SvcId', c_void_p), # the byte array of the service UUID.
('CharId', c_void_p), # the byte array of the characteristic UUID.
('Status', c_bool) # The status of the previous Tx request
]
def toBleTxEvent(self):
return BleTxEvent(
svcId = _VoidPtrToByteArray(self.SvcId, 16),
charId = _VoidPtrToByteArray(self.CharId, 16),
status = self.Status
)
@classmethod
def fromBleTxEvent(cls, bleTxEvent):
bleTxEventStruct = cls()
bleTxEventStruct.EventType = bleTxEvent.EventType
bleTxEventStruct.ConnObj = c_void_p(FAKE_CONN_OBJ_VALUE)
bleTxEventStruct.SvcId = _ByteArrayToVoidPtr(bleTxEvent.SvcId)
bleTxEventStruct.CharId = _ByteArrayToVoidPtr(bleTxEvent.CharId)
bleTxEventStruct.Status = bleTxEvent.Status
return bleTxEventStruct
class BleDisconnectEventStruct(Structure):
_fields_ = [
('EventType', c_int32), # The type of event.
('ConnObj', c_void_p), # a Handle back to the connection object or None.
('Error', c_int32) # The disconnect error code.
]
def toBleDisconnectEvent(self):
return BleDisconnectEvent(
error = self.Error
)
@classmethod
def fromBleDisconnectEvent(cls, bleDisconnectEvent):
bleDisconnectEventStruct = cls()
bleDisconnectEventStruct.EventType = bleDisconnectEvent.EventType
bleDisconnectEventStruct.ConnObj = c_void_p(FAKE_CONN_OBJ_VALUE)
bleDisconnectEventStruct.Error = bleDisconnectEvent.Error
return bleDisconnectEventStruct
class BleRxEventStruct(Structure):
_fields_ = [
('EventType', c_int32), # The type of event.
('ConnObj', c_void_p), # a Handle back to the connection object or None.
('SvcId', c_void_p), # the byte array of the service UUID.
('CharId', c_void_p), # the byte array of the characteristic UUID.
('Buffer', c_void_p), # the byte array of the Rx packet.
('Length', c_uint16), # the length of the byte array (buffer).
]
def toBleRxEvent(self):
return BleRxEvent(
svcId = _VoidPtrToByteArray(self.SvcId, 16),
charId = _VoidPtrToByteArray(self.CharId, 16),
buffer = _VoidPtrToByteArray(self.Buffer, self.Length)
)
@classmethod
def fromBleRxEvent(cls, bleRxEvent):
bleRxEventStruct = cls()
bleRxEventStruct.EventType = bleRxEvent.EventType
bleRxEventStruct.ConnObj = c_void_p(FAKE_CONN_OBJ_VALUE)
bleRxEventStruct.SvcId = _ByteArrayToVoidPtr(bleRxEvent.SvcId)
bleRxEventStruct.CharId = _ByteArrayToVoidPtr(bleRxEvent.CharId)
bleRxEventStruct.Buffer = _ByteArrayToVoidPtr(bleRxEvent.Buffer)
bleRxEventStruct.Length = len(bleRxEvent.Buffer) if (bleRxEvent.Buffer != None) else 0
return bleRxEventStruct
class BleSubscribeEventStruct(Structure):
_fields_ = [
('EventType', c_int32), # The type of event.
('ConnObj', c_void_p), # a Handle back to the connection object or None.
('SvcId', c_void_p), # the byte array of the service UUID.
('CharId', c_void_p), # the byte array of the characteristic UUID.
('Operation', c_int32), # The subscribe operation.
('Status', c_bool) # The status of the previous Tx request
]
def toBleSubscribeEvent(self):
return BleSubscribeEvent(
svcId = _VoidPtrToByteArray(self.SvcId, 16),
charId = _VoidPtrToByteArray(self.CharId, 16),
status = self.Status,
operation = self.Operation
)
@classmethod
def fromBleSubscribeEvent(cls, bleSubscribeEvent):
bleSubscribeEventStruct = cls()
bleSubscribeEventStruct.EventType = bleSubscribeEvent.EventType
bleSubscribeEventStruct.ConnObj = c_void_p(FAKE_CONN_OBJ_VALUE)
bleSubscribeEventStruct.SvcId = _ByteArrayToVoidPtr(bleSubscribeEvent.SvcId)
bleSubscribeEventStruct.CharId = _ByteArrayToVoidPtr(bleSubscribeEvent.CharId)
bleSubscribeEventStruct.Operation = bleSubscribeEvent.Operation
bleSubscribeEventStruct.Status = bleSubscribeEvent.Status
return bleSubscribeEventStruct