blob: 4fd25df541d8b2eb6910922a11e7b9a1cd22f0b6 [file] [log] [blame]
#!/usr/bin/env python2.7
import sys
import dbus
import dbus.service
import dbus.glib
import gobject
import socket
import string
import struct
import threading
from threading import Thread
import time
from dbus.mainloop.glib import DBusGMainLoop
# IDs specific to Linux nfc
# NOTE(review): the numeric values presumably mirror the Linux kernel
# headers (linux/socket.h, linux/nfc.h) - confirm against the target kernel.
# Address family handed to socket.fromfd() for the agent descriptor.
AF_NFC = 39
# Socket option level used with getsockopt() on the NFC socket.
SOL_NFC = 280
# Socket option name queried with getsockopt(); the result is used as the
# maximum number of bytes written per send.
NFC_LLCP_MIUX = 1
# Sample test code - compliant with nfcpy phdc test agent
# All frames are hex dumps; spaces only separate groups of bytes so the
# strings can be fed to bytearray.fromhex().  Parentheses are used instead of
# backslash continuations so that a trailing continuation can never swallow
# the statement on the following line.
# NOTE(review): the names suggest an association request/response pair and a
# release request/response pair - confirm against the PHDC agent in use.
thermometer_assoc_req = (
    "E200 0032 8000 0000"
    "0001 002A 5079 0026"
    "8000 0000 A000 8000"
    "0000 0000 0000 0080"
    "0000 0008 3132 3334"
    "3536 3738 0320 0001"
    "0100 0000 0000"
)
# Sent back when an incoming APDU starts with E2 00.
thermometer_assoc_res = (
    "E300 002C 0003 5079"
    "0026 8000 0000 8000"
    "8000 0000 0000 0000"
    "8000 0000 0008 3837"
    "3635 3433 3231 0000"
    "0000 0000 0000 0000"
)
assoc_release_req = "E40000020000"
# Sent back when an incoming APDU starts with E4 00.
assoc_release_res = "E50000020000"
#========================================
# Display helper
def hexdump(chars, sep, width):
    """Print *chars* as rows of *width* characters.

    Each row shows the two-digit hex value of every character joined by
    *sep*, then *sep* once more, then the row as rendered by quotechars().
    A short final row is padded with NUL characters up to *width*.
    """
    remaining = chars
    while remaining:
        row, remaining = remaining[:width], remaining[width:]
        padded = row.ljust(width, '\000')
        hex_cells = ["%02x" % ord(ch) for ch in padded]
        print("%s%s%s" % (sep.join(hex_cells), sep, quotechars(padded)))
def quotechars(chars):
    """Return *chars* with every non-alphanumeric character shown as '.'."""
    shown = []
    for ch in chars:
        shown.append(ch if ch.isalnum() else '.')
    return ''.join(shown)
#========================================
class PhdcPeerManager:
    """Serve one PHDC agent connection handed over as a file descriptor.

    run() answers every length-prefixed APDU received on the connection
    until the peer goes away; stop() closes the connection.
    """

    def __init__(self, agent_fd):
        """Take the raw descriptor out of *agent_fd*.

        agent_fd -- object whose take() returns an integer file descriptor
                    (presumably a dbus UnixFd - confirm against the caller).
        """
        #Grab the agent ....
        print('Init PhdcPeerManager thread')
        self.r_fd = agent_fd.take()
        # Created by run(); None means "no open connection" for stop().
        self.sock = None
        print('Agent fd: %s' % str(self.r_fd))

    def run(self):
        """Exchange APDUs with the agent until it disconnects.

        Blocks the caller for the whole lifetime of the connection.
        IOError (which includes socket errors on Python 2.7) is reported
        on stdout instead of being raised, and the socket is always closed
        on the way out.
        """
        # Function-scope imports: the file header does not import these.
        import errno
        import os

        print('Run PhdcPeerManager thread:  %s' % str(self.r_fd))
        try:
            self.sock = socket.fromfd(self.r_fd, AF_NFC, socket.SOCK_STREAM)
        finally:
            # fromfd() works on a duplicate, so the descriptor taken in
            # __init__ has to be closed here or it leaks per connection.
            os.close(self.r_fd)
        try:
            miu = self.sock.getsockopt(SOL_NFC, NFC_LLCP_MIUX)
            print('MIU= %s' % miu)
            # One pass only: once the peer has closed there is nothing left
            # to query or read on this socket.
            while True:
                apdu = self._recv_apdu()
                if apdu is None:
                    print('no data')
                    break
                print("[ieee] <<< {0}".format(str(apdu).encode("hex")))
                reply = self._build_reply(apdu)
                # NOTE(review): fixed delay before answering is kept from
                # the original; its purpose is not visible in this file.
                time.sleep(0.2)
                print("[ieee] >>> {0}".format(str(reply).encode("hex")))
                self._send_apdu(reply, miu)
            print("remote peer {0} closed connection".format(self.r_fd))
            print("leaving ieee manager")
        except IOError as e:
            if e.errno == errno.EPIPE:
                print('Remote disconnect')
            else:
                print("I/O error({0}): {1}".format(e.errno, e.strerror))
        finally:
            print('Finally exit')
            self.stop()

    def _recv_apdu(self):
        """Read one APDU; return None once the peer has closed.

        A frame is a 2-byte big-endian length followed by that many bytes.
        If the peer closes in the middle of a frame the bytes received so
        far are returned.
        """
        data = self.sock.recv(16)
        # recv() reports an orderly shutdown with an empty string, not None.
        while data and len(data) < 2:
            more = self.sock.recv(16)
            if not more:
                return None
            data += more
        if not data:
            return None
        #analyze frame
        print('analyze')
        size = struct.unpack(">H", data[0:2])[0]
        apdu = data[2:]
        #should i read more data ?
        while len(apdu) < size:
            data = self.sock.recv(10)
            if not data:
                break
            hexdump(data, ':', 16)
            apdu += data
        return apdu

    def _build_reply(self, apdu):
        """Return the canned answer for *apdu*, or *apdu* reversed."""
        if apdu.startswith(b"\xE2\x00"):
            return bytearray.fromhex(thermometer_assoc_res)
        if apdu.startswith(b"\xE4\x00"):
            return bytearray.fromhex(assoc_release_res)
        return apdu[::-1]

    def _send_apdu(self, apdu, miu):
        """Send *apdu* behind its 2-byte length, *miu* bytes per write."""
        frame = struct.pack(">H", len(apdu)) + apdu
        # A non-positive step would make range() raise; send in one piece.
        step = miu if miu > 0 else len(frame)
        for i in range(0, len(frame), step):
            # sendall() so that a short write cannot silently drop bytes.
            self.sock.sendall(str(frame[i:i + step]))

    def stop(self):
        """Close the connection socket; safe to call more than once."""
        print('Stop PhdcPeerManager: %s' % str(self.r_fd))
        sock, self.sock = self.sock, None
        if sock is not None:
            sock.close()
#===================================================
''' Phdc Manager Class
'''
class SimplePhdcManager(dbus.service.Object):
    """D-Bus object exporting the org.neard.PHDC.Manager methods."""

    @dbus.service.method('org.neard.PHDC.Manager',
                         in_signature='',
                         out_signature='')
    def Release(self):
        """Quit the module-level main loop."""
        print('Release')
        mainloop.quit()

    @dbus.service.method('org.neard.PHDC.Manager',
                         in_signature='h',
                         out_signature='')
    def NewConnection(self, agent_fd):
        """Called on incoming agents.

        Builds a PhdcPeerManager for *agent_fd* and runs it in the calling
        context; the call returns only when run() returns.
        """
        print('Launch Phdc Manager thread for fd: %s' % str(agent_fd))
        self.server = PhdcPeerManager(agent_fd)
        print('Run Server')
        self.server.run()
        print('Leave Server')

    @dbus.service.method('org.neard.PHDC.Manager',
                         in_signature='hi', out_signature='')
    def Disconnection(self, agent_fd, i_err):
        """Called when the agent ends (from phdc_close).

        Stops the manager created by NewConnection, if there is one.
        """
        print('Stop Phdc Manager thread')
        # No NewConnection may have been received yet on this object.
        server = getattr(self, 'server', None)
        if server is not None:
            server.stop()
''' Main loop
This sample installs two PHDC Managers:
* Simple: simulates a thermometer data exchange
* Validation: Validation Manager for NFC Forum PHDC
'''
if __name__ == "__main__":
    # Make the GLib main loop the default D-Bus loop before opening the bus.
    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
    print('PHDC Simple Manager Test')

    bus = dbus.SystemBus()
    obj = bus.get_object("org.neard", "/org/neard")
    neard_manager = dbus.Interface(obj, "org.neard.PHDC")

    simple_path = '/Simple'
    valid_path = '/Validation'

    # First manager: exported at simple_path.
    print('Creating & registering PHDC Simple Manager')
    simpleobject = SimplePhdcManager(bus, simple_path)
    d = dbus.Dictionary(
        {
            'Role': 'Manager',
            'Path': simple_path,
            'ServiceName': 'urn:nfc:sn:phdc',
        },
        signature='sv')
    neard_manager.RegisterAgent(d)

    # Second manager: same class, exported at valid_path.
    print('Creating & Registering Validation Manager')
    validationobj = SimplePhdcManager(bus, valid_path)
    d = dbus.Dictionary(
        {
            'Role': 'Manager',
            'Path': valid_path,
            'ServiceName': 'urn:nfc:xsn:nfc-forum.org:phdc-validation',
        },
        signature='sv')
    neard_manager.RegisterAgent(d)

    # Kept at module level: SimplePhdcManager.Release() quits this loop.
    mainloop = gobject.MainLoop()
    try:
        mainloop.run()
    except KeyboardInterrupt:
        #Call for unregister...
        neard_manager.UnregisterAgent(simple_path, 'Manager')
        neard_manager.UnregisterAgent(valid_path, 'Manager')