| #!/usr/bin/env python2.7 |
| |
| import sys |
| import dbus |
| import dbus.service |
| import dbus.glib |
| import gobject |
| import socket |
| import string |
| import struct |
| import threading |
| from threading import Thread |
| import time |
| from dbus.mainloop.glib import DBusGMainLoop |
| |
# IDs specific to Linux nfc
AF_NFC = 39         # address family handed to socket.fromfd()
SOL_NFC = 280       # level used for the getsockopt() call
NFC_LLCP_MIUX = 1   # option name used for the getsockopt() call

# Sample test code - compliant with nfcpy phdc test agent
# Each constant is a hex string (spaces allowed between byte pairs) that is
# turned into bytes with bytearray.fromhex() where it is used.

# Association request sample (kept for reference).
thermometer_assoc_req = (
    "E200 0032 8000 0000"
    "0001 002A 5079 0026"
    "8000 0000 A000 8000"
    "0000 0000 0000 0080"
    "0000 0008 3132 3334"
    "3536 3738 0320 0001"
    "0100 0000 0000"
)

# Reply sent when an incoming APDU starts with E2 00.
thermometer_assoc_res = (
    "E300 002C 0003 5079"
    "0026 8000 0000 8000"
    "8000 0000 0000 0000"
    "8000 0000 0008 3837"
    "3635 3433 3231 0000"
    "0000 0000 0000 0000"
)

# Release request sample and the reply sent when an APDU starts with E4 00.
assoc_release_req = "E40000020000"
assoc_release_res = "E50000020000"
| |
| #======================================== |
| # Display helper |
def hexdump(chars, sep, width):
    """Print *chars* as rows of *width* characters.

    Each row shows the two-digit hex value of every character joined by
    *sep*, then *sep* once more, then the quotechars() rendering of the row.
    The last row is padded with NUL characters up to *width*, so the padding
    shows up as "00" cells and as dots in the text column.
    """
    remaining = chars
    while remaining:
        row, remaining = remaining[:width], remaining[width:]
        padded = row.ljust(width, '\000')
        hex_cells = sep.join("%02x" % ord(ch) for ch in padded)
        print("%s%s%s" % (hex_cells, sep, quotechars(padded)))
| |
| |
def quotechars(chars):
    """Return *chars* with every non-alphanumeric character shown as '.'."""
    return ''.join(ch if ch.isalnum() else '.' for ch in chars)
| |
| #======================================== |
class PhdcPeerManager:
    """Serve one PHDC agent connection received through NewConnection.

    The object takes the file descriptor out of the D-Bus argument, wraps it
    in an AF_NFC stream socket and answers length-prefixed APDUs:

    * an APDU starting with E2 00 gets ``thermometer_assoc_res``,
    * an APDU starting with E4 00 gets ``assoc_release_res``,
    * anything else is echoed back reversed.

    Every frame on the wire is a 2-byte big-endian length followed by that
    many APDU bytes.
    """

    # Chunk size used when getsockopt() reports a non-positive value, which
    # would otherwise make range() raise ValueError while slicing the reply.
    # NOTE(review): 128 is assumed to be the LLCP default MIU -- confirm.
    _FALLBACK_MIU = 128

    def __init__(self, agent_fd):
        """Take the file descriptor out of *agent_fd* (needs a take() method)."""
        print('Init PhdcPeerManager thread')
        self.r_fd = agent_fd.take()
        # Created by run(); kept as None so stop() is safe before run().
        self.sock = None
        print('Agent fd: %s' % self.r_fd)

    def run(self):
        """Exchange APDUs with the peer until it closes the connection.

        I/O errors are reported on stdout and end the exchange; the socket
        and the taken descriptor are always released through stop().
        """
        # errno is not imported at file level; import it here so the EPIPE
        # check below cannot fail with NameError.
        import errno

        print('Run PhdcPeerManager thread:  %s' % self.r_fd)
        self.sock = socket.fromfd(self.r_fd, AF_NFC, socket.SOCK_STREAM)
        try:
            miu = self.sock.getsockopt(SOL_NFC, NFC_LLCP_MIUX)
            print('MIU= %s' % miu)
            if miu <= 0:
                miu = self._FALLBACK_MIU

            while True:
                apdu = self._recv_apdu()
                if apdu is None:
                    # recv() returned no data: the peer closed the link.
                    print('no data')
                    break

                print("[ieee] <<< {0}".format(self._to_hex(apdu)))
                reply = self._build_reply(apdu)
                # Pause before answering, as the original sample did.
                time.sleep(0.2)
                print("[ieee] >>> {0}".format(self._to_hex(reply)))

                frame = struct.pack(">H", len(reply)) + reply
                for chunk in self._chunks(frame, miu):
                    # sendall() retries short writes instead of dropping them.
                    self.sock.sendall(chunk)

            print("remote peer {0} closed connection".format(self.r_fd))
            print("leaving ieee manager")

        except IOError as e:
            if e.errno == errno.EPIPE:
                print('Remote disconnect')
            else:
                print("I/O error({0}): {1}".format(e.errno, e.strerror))
        finally:
            print('Finally exit')
            self.stop()

    def stop(self):
        """Close the socket and the taken descriptor; safe to call twice."""
        # os is not imported at file level.
        import os

        print('Stop PhdcPeerManager: %s' % self.r_fd)
        sock, self.sock = self.sock, None
        if sock is not None:
            sock.close()

        # socket.fromfd() works on a duplicate, so the descriptor obtained
        # from take() has to be closed separately or it leaks.
        fd, self.r_fd = self.r_fd, None
        if fd is not None:
            try:
                os.close(fd)
            except OSError:
                # Best effort: the descriptor may already be closed.
                pass

    def _recv_exact(self, count):
        """Read exactly *count* bytes; return None if the peer closes first."""
        buf = bytearray()
        while len(buf) < count:
            chunk = self.sock.recv(count - len(buf))
            if not chunk:
                return None
            buf += chunk
        return buf

    def _recv_apdu(self):
        """Read one length-prefixed APDU; return None when the peer is gone."""
        header = self._recv_exact(2)
        if header is None:
            return None

        #analyze frame
        print('analyze')
        size = struct.unpack(">H", bytes(header))[0]
        payload = self._recv_exact(size)
        if payload is None:
            return None
        # Dump the whole payload (the original dumped only the chunks read
        # after its first recv call).
        hexdump(bytes(payload), ':', 16)
        return payload

    def _build_reply(self, apdu):
        """Pick the canned response for *apdu*, or echo it back reversed."""
        if apdu.startswith(b"\xE2\x00"):
            return bytearray.fromhex(thermometer_assoc_res)
        if apdu.startswith(b"\xE4\x00"):
            return bytearray.fromhex(assoc_release_res)
        return apdu[::-1]

    @staticmethod
    def _to_hex(data):
        """Return *data* as a lowercase hex string."""
        return ''.join('%02x' % octet for octet in bytearray(data))

    @staticmethod
    def _chunks(data, size):
        """Split *data* into byte strings of at most *size* bytes."""
        return [bytes(data[start:start + size])
                for start in range(0, len(data), size)]
| |
| #=================================================== |
class SimplePhdcManager(dbus.service.Object):
    """Phdc Manager Class.

    Exports the 'org.neard.PHDC.Manager' methods Release, NewConnection and
    Disconnection on D-Bus.
    """

    # Peer manager created by the latest NewConnection call.  Defaults to
    # None so a Disconnection arriving before any NewConnection (or a second
    # Disconnection) does not fail with AttributeError.
    server = None

    @dbus.service.method('org.neard.PHDC.Manager',
                         in_signature='',
                         out_signature='')
    def Release(self):
        """Quit the module-level main loop."""
        print('Release')
        mainloop.quit()

    @dbus.service.method('org.neard.PHDC.Manager',
                         in_signature='h',
                         out_signature='')
    def NewConnection(self, agent_fd):
        """Called on incoming agents.

        Builds a PhdcPeerManager for *agent_fd* and calls its run() directly
        from this handler; no thread is started here, so the call returns
        only once run() has finished.
        """
        print('Launch Phdc Manager thread for fd: %s' % (agent_fd,))
        self.server = PhdcPeerManager(agent_fd)
        print('Run Server')
        self.server.run()
        print('Leave Server')

    @dbus.service.method('org.neard.PHDC.Manager',
                         in_signature='hi', out_signature='')
    def Disconnection(self, agent_fd, i_err):
        """Called when the agent ends (from phdc_close).

        Stops the current peer manager, if any.  *agent_fd* and *i_err* are
        not used.
        """
        print('Stop Phdc Manager thread')
        server, self.server = self.server, None
        if server is not None:
            server.stop()
| |
# Main loop
# This sample installs two PHDC Managers:
#  * Simple: simulates a thermometer data exchange
#  * Validation: Validation Manager for NFC Forum PHDC
if __name__ == "__main__":
    DBusGMainLoop(set_as_default=True)

    print('PHDC Simple Manager Test')
    bus = dbus.SystemBus()
    neard_manager = dbus.Interface(bus.get_object("org.neard", "/org/neard"),
                                   "org.neard.PHDC")

    simple_path = '/Simple'
    valid_path = '/Validation'

    # (banner, object path, service name) for each manager, in the order in
    # which they are created and registered.
    manager_specs = (
        ('Creating & registering PHDC Simple Manager',
         simple_path, 'urn:nfc:sn:phdc'),
        ('Creating & Registering Validation Manager',
         valid_path, 'urn:nfc:xsn:nfc-forum.org:phdc-validation'),
    )

    # Keep a reference to every exported object for the life of the script.
    exported_managers = []
    for banner, object_path, service_name in manager_specs:
        print(banner)
        exported_managers.append(SimplePhdcManager(bus, object_path))
        registration = dbus.Dictionary(
            {'Role': 'Manager', 'Path': object_path,
             'ServiceName': service_name},
            signature='sv')
        neard_manager.RegisterAgent(registration)

    mainloop = gobject.MainLoop()

    try:
        mainloop.run()
    except KeyboardInterrupt:
        #Call for unregister...
        for object_path in (simple_path, valid_path):
            neard_manager.UnregisterAgent(object_path, 'Manager')