| #!/usr/bin/python |
| |
| from os import O_NONBLOCK |
| from sys import stdin, stdout, exit, version_info, argv |
| from fcntl import fcntl, F_GETFL, F_SETFL |
| import glib |
| import dbus |
| import dbus.mainloop.glib |
| import gobject |
| import argparse |
| |
# D-Bus bus name, root interface and root object path used to reach
# wpa_supplicant.
WPA_NAME = 'fi.w1.wpa_supplicant1'
WPA_INTF = 'fi.w1.wpa_supplicant1'
WPA_PATH = '/fi/w1/wpa_supplicant1'

# Interface names derived from the root interface name.
WPA_IF_INTF = '%s.Interface' % WPA_INTF
WPA_P2P_INTF = '%s.P2PDevice' % WPA_IF_INTF
WPA_GROUP_INTF = '%s.Group' % WPA_INTF
WPA_PEER_INTF = '%s.Peer' % WPA_INTF

# Generic D-Bus properties interface (used for Get/Set/GetAll calls).
DBUS_PROPERTIES_INTF = 'org.freedesktop.DBus.Properties'

# Bit 0 of a peer's 'groupcapability' property; p2p_connect tests it to
# choose between joining an existing group and negotiating a new one.
P2P_GROUP_CAPAB_GROUP_OWNER = 0x01
| |
class ArgFields:
    """Key names of the optional fields in a command description dict.

    Each attribute holds its own name as a string, so that
    ``ArgFields.help`` can be used as a dictionary key instead of the bare
    literal ``'help'``.
    """

    # Spelled out explicitly rather than generated through exec(): the
    # result is the same pair of attributes without running built-up code.
    help = 'help'
    metavar = 'metavar'
| |
class InputLine:
    """Feed lines typed on stdin to a handler from the glib main loop.

    stdin is switched to non-blocking mode and watched for readability;
    every non-empty line that is read is passed, stripped, to ``handler``.
    """

    def __init__(self, handler):
        """Install the stdin watch and show the first prompt.

        handler -- callable invoked with one stripped command line (str).
        """
        self.line = ''
        self.handler = handler

        # Make stdin non-blocking so that read() in input_cb returns what
        # is currently available instead of waiting for end of file.
        fd = stdin.fileno()
        fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) | O_NONBLOCK)
        glib.io_add_watch(stdin, glib.IO_IN, self.input_cb)

        self.prompt()

    def prompt(self):
        """Clear the pending input and display the '> ' prompt."""
        self.line = ''
        # Written directly (not via a trailing-comma print) so that the
        # next printed message is not prefixed with an extra space.
        stdout.write('> ')
        stdout.flush()

    def _dispatch(self, text):
        """Call the handler once for every non-blank line found in text."""
        for raw in text.split('\n'):
            command = raw.strip()
            # Skip blank lines instead of stopping at the first one, so a
            # blank line between two commands does not drop the second.
            if command:
                self.handler(command)

    def input_cb(self, fd, event):
        """I/O watch callback: read stdin and run the commands it holds.

        Returns True to keep the watch installed; returns False, which
        removes the watch, when called for any condition other than IO_IN.
        """
        if event != glib.IO_IN:
            return False

        self.line += fd.read()
        self._dispatch(self.line)

        self.prompt()

        return True
| |
def error_print(ex):
    """Print ex on stdout, prefixed with 'Command Error: '."""
    message = 'Command Error: %s' % ex
    print(message)
| |
def checkarg(nb_args=0, min_args=False):
    """Decorator factory validating the length of a command's argument list.

    The decorated callable is expected to receive the argument list as its
    second positional parameter (the first being ``self``).  A missing or
    ``None`` argument list counts as zero arguments.

    nb_args  -- number of arguments the command takes.
    min_args -- when True, nb_args is a minimum instead of an exact count.

    The wrapper raises Exception when the count does not match; otherwise
    it returns whatever the decorated callable returns.
    """
    import functools

    def under(function):
        # wraps() keeps the command's own __name__/__doc__ on the wrapper.
        @functools.wraps(function)
        def wrapper(*args, **kwargs):
            cmd_args = args[1] if len(args) > 1 else None
            count = 0 if cmd_args is None else len(cmd_args)

            if min_args:
                if count < nb_args:
                    raise Exception('Command %s takes at least %s arguments' %
                                    (function.__name__, nb_args))
            elif count != nb_args:
                raise Exception('Command %s takes %s arguments' %
                                (function.__name__, nb_args))

            return function(*args, **kwargs)
        return wrapper
    return under
| |
def print_dict(d):
    """Print every entry of d as 'Key <key> --> <value>', one per line.

    dbus.Byte values are shown in hexadecimal.  An entry that cannot be
    formatted is reported with an error line and the remaining entries are
    still printed.
    """
    for k in d:
        try:
            value = d[k]
            if isinstance(value, dbus.Byte):
                print('Key %s --> 0x%x' % (k, value))
            else:
                print('Key %s --> %s' % (k, value))
        except Exception:
            # Deliberately best-effort, but no longer a bare except: that
            # would also swallow KeyboardInterrupt and SystemExit.
            print("Error: Key %s content cannot be printed" % k)
| |
def print_tuple(t):
    """Print each item of t; dbus.Dictionary items go through print_dict."""
    for element in t:
        if type(element) is not dbus.Dictionary:
            print('Element: %s' % element)
            continue

        print_dict(element)
| |
class Wpa_s:
    """Command front-end for wpa_supplicant's D-Bus P2P interface.

    Command lines (read from stdin through InputLine, or given at start-up)
    are split on whitespace; the first word is looked up as a method of
    this object and called with the list of remaining words.  D-Bus signals
    received from wpa_supplicant are printed as they arrive, with their
    payload when debug mode is enabled.
    """

    def __init__(self, bus, iface_name, command):
        """Connect to wpa_supplicant and start listening for input.

        bus        -- D-Bus connection used for every call and signal match.
        iface_name -- network interface to create at start-up, or a false
                      value to create none.
        command    -- command line to run right away (may be empty).
        """
        # Set up state first: signal handlers and commands rely on it.
        self.bus = bus
        self.debug = False
        # Filled in later by set_command_list(); an empty dict keeps the
        # 'help' command usable before that happens.
        self.command_list = {}
        self.__reset()

        self.wpa = dbus.Interface(bus.get_object(WPA_NAME, WPA_PATH), WPA_INTF)
        bus.add_signal_receiver(self.__wpa_property_changed, path=WPA_PATH,
                                member_keyword='signal')
        bus.add_signal_receiver(self.__InterfaceAdded, path=WPA_PATH,
                                signal_name='InterfaceAdded')
        bus.add_signal_receiver(self.__InterfaceRemoved, path=WPA_PATH,
                                signal_name='InterfaceRemoved')

        self.line_in = InputLine(self.__command)

        if iface_name:
            try:
                self.create_if([iface_name])
            except Exception:
                print("Error creating interface: %s" % iface_name)

        if command.strip():
            self.__command(command)

    def help(self, args):
        """Print every known command, sorted, with its help text if any."""
        for key in sorted(self.command_list):
            text = self.command_list[key].get(ArgFields.help, '')
            print("%s\t%s" % (key.rjust(25), text.ljust(50)))

    def __command(self, cmd_line):
        """Run one command line; failures are printed, never raised."""
        # split() without argument ignores repeated/trailing blanks, which
        # would otherwise turn into empty arguments.
        cmd = cmd_line.split()
        if not cmd:
            return

        try:
            func = getattr(self, cmd[0])
        except AttributeError as e:
            print('Error: command unknown - %s' % e)
            return

        try:
            func(cmd[1:])
        except Exception as e:
            error_print(e)

    # ---- signal handlers -------------------------------------------------

    def __wpa_property_changed(self, *args, **kwargs):
        """Report any signal emitted on the wpa_supplicant root object."""
        print('WPA - Signal: %s' % kwargs.get('signal'))

    def __if_property_changed(self, *args, **kwargs):
        """Report a signal of the interface; payload shown in debug mode."""
        signal = kwargs.get('signal')
        print('IF - Signal: %s' % signal)

        # The payload of BSSAdded is never printed, even in debug mode.
        if signal == 'BSSAdded':
            return

        if self.debug:
            print_tuple(args)

    def __p2p_property_changed(self, *args, **kwargs):
        """Report a P2P device signal; payload shown in debug mode."""
        print('IF P2P - Signal: %s' % kwargs.get('signal'))
        if self.debug:
            print_tuple(args)

    def __peer_if_p2p_property_changed(self, *args, **kwargs):
        """Report a signal coming from a peer object."""
        stdout.write('Peer - ')
        # Forward the payload only; self is supplied by the bound method.
        self.__p2p_property_changed(*args, **kwargs)

    def __DeviceFound(self, object_path):
        """Start tracking a discovered peer and cache its properties."""
        peer = self.bus.get_object(WPA_INTF, object_path)
        peer_if = dbus.Interface(peer, DBUS_PROPERTIES_INTF)

        self.bus.add_signal_receiver(self.__peer_if_p2p_property_changed,
                                     dbus_interface=WPA_PEER_INTF,
                                     path=object_path,
                                     member_keyword='signal')

        # Stored only once the properties were fetched, so a failing
        # GetAll() cannot leave a None entry behind.
        self.peers[object_path] = peer_if.GetAll(WPA_PEER_INTF)

        if self.debug:
            print_dict(self.peers[object_path])

    def __DeviceLost(self, object_path):
        """Forget a peer that is no longer visible."""
        self.peers.pop(object_path, None)

    def __PeerJoined(self, object_path):
        """Report a peer joining the group."""
        print('Peer %s joined' % object_path)

    def __PeerDisconnected(self, object_path):
        """Report a peer leaving the group."""
        print('Peer %s disconnected' % object_path)

    def __group_if_property_changed(self, *args, **kwargs):
        """Report an interface signal coming from the group interface."""
        stdout.write('Group - ')
        self.__if_property_changed(*args, **kwargs)

    def __group_if_p2p_property_changed(self, *args, **kwargs):
        """Report a P2P signal coming from the group interface."""
        stdout.write('Group - ')
        self.__p2p_property_changed(*args, **kwargs)

    def __GroupFinished(self, properties):
        """Report the end of the group and drop the references to it."""
        # The group interface path recorded by __GroupStarted identifies
        # the group; this handler is only registered for that path.
        print('Group running on %s is being removed' % self.group_iface_path)
        self.group_obj = self.group_if = self.group_iface_path = None

        if self.debug:
            print_dict(properties)

    def __InvitationResult(self, response):
        """Print the status (and bssid, when present) of an invitation."""
        print('Invitation result status: %d ' % response['status'])

        if 'bssid' in response:
            print('bssid: %s' % response['bssid'])

        if self.debug:
            print_dict(response)

    def __GroupStarted(self, properties):
        """Record the new group and subscribe to the signals about it."""
        self.group_obj = properties['group_object']
        for handler, name in ((self.__PeerJoined, 'PeerJoined'),
                              (self.__PeerDisconnected, 'PeerDisconnected')):
            self.bus.add_signal_receiver(handler,
                                         dbus_interface=WPA_GROUP_INTF,
                                         path=self.group_obj,
                                         signal_name=name)

        self.group_iface_path = properties['interface_object']
        group_iface = self.bus.get_object(WPA_INTF, self.group_iface_path)
        self.group_if = dbus.Interface(group_iface, WPA_P2P_INTF)

        self.bus.add_signal_receiver(self.__group_if_property_changed,
                                     dbus_interface=WPA_IF_INTF,
                                     path=self.group_iface_path,
                                     member_keyword='signal')
        self.bus.add_signal_receiver(self.__group_if_p2p_property_changed,
                                     dbus_interface=WPA_P2P_INTF,
                                     path=self.group_iface_path,
                                     member_keyword='signal')
        self.bus.add_signal_receiver(self.__GroupFinished,
                                     dbus_interface=WPA_P2P_INTF,
                                     path=self.group_iface_path,
                                     signal_name='GroupFinished')
        self.bus.add_signal_receiver(self.__InvitationResult,
                                     dbus_interface=WPA_P2P_INTF,
                                     path=self.iface_path,
                                     signal_name='InvitationResult')

        if self.debug:
            group = dbus.Interface(self.bus.get_object(WPA_INTF,
                                                       self.group_obj),
                                   DBUS_PROPERTIES_INTF)
            print_dict(group.GetAll(WPA_GROUP_INTF))

    def __ServiceDiscoveryResponse(self, response):
        """Print the TLVs returned by a known peer."""
        peer = response['peer_object']
        if peer in self.peers:
            print('Peer %s has this TLVs:' % (self.peers[peer]['DeviceName']))
            print(response['tlvs'])

    def __InterfaceAdded(self, path, properties):
        """Report a new interface; dump its properties in debug mode."""
        print('Interface %s Added (%s)' % (properties['Ifname'], path))
        if self.debug:
            print_dict(properties)
            p2p = dbus.Interface(self.bus.get_object(WPA_INTF, path),
                                 DBUS_PROPERTIES_INTF)
            print_dict(p2p.GetAll(WPA_P2P_INTF))

    def __InterfaceRemoved(self, path):
        """Report the removal of an interface."""
        print('Interface Removed (%s)' % (path))

    # ---- internal helpers ------------------------------------------------

    def __listen_if_signals(self):
        """Subscribe to the signals of the current interface."""
        self.bus.add_signal_receiver(self.__if_property_changed,
                                     dbus_interface=WPA_IF_INTF,
                                     path=self.iface_path,
                                     member_keyword='signal')
        self.bus.add_signal_receiver(self.__p2p_property_changed,
                                     dbus_interface=WPA_P2P_INTF,
                                     path=self.iface_path,
                                     member_keyword='signal')

        named = ((self.__GroupStarted, 'GroupStarted'),
                 (self.__DeviceFound, 'DeviceFound'),
                 (self.__DeviceLost, 'DeviceLost'),
                 (self.__ServiceDiscoveryResponse, 'ServiceDiscoveryResponse'))
        for handler, name in named:
            self.bus.add_signal_receiver(handler,
                                         dbus_interface=WPA_P2P_INTF,
                                         path=self.iface_path,
                                         signal_name=name)

    def __reset(self):
        """Forget the current interface, group and known peers."""
        self.iface_path = self.iface_name = self.iface = None
        self.p2p = self.group_if = self.group_obj = None
        self.group_iface_path = None
        self.peers = {}

    def __set_if(self, iface_name):
        """Bind to the object at self.iface_path and listen to its signals."""
        iface_obj = self.bus.get_object(WPA_INTF, self.iface_path)
        self.iface = dbus.Interface(iface_obj, WPA_IF_INTF)
        self.p2p = dbus.Interface(self.bus.get_object(WPA_INTF,
                                                      self.iface_path),
                                  WPA_P2P_INTF)

        p2p_if = dbus.Interface(self.p2p, DBUS_PROPERTIES_INTF)
        p2p_if.Set(WPA_P2P_INTF, 'P2PDeviceConfig',
                   dbus.Dictionary({'DeviceName': 'ConnManP2P'},
                                   signature='sv'))
        print('Interface %s: %s' % (iface_name, self.iface_path))
        self.iface_name = iface_name
        self.__listen_if_signals()

    def __find_peer(self, peer_name, ret_object_path=False):
        """Look a peer up by its 'DeviceName'.

        Returns the peer's object path when ret_object_path is true, its
        cached property dictionary otherwise, or None when no peer matches
        (a message is printed unless no peer is known at all).
        """
        if not self.peers:
            return None

        for path, props in self.peers.items():
            if props and props['DeviceName'] == peer_name:
                return path if ret_object_path else props

        print('No peer found under the name: %s' % peer_name)
        return None

    # ---- commands --------------------------------------------------------

    @checkarg()
    def enable_debug(self, args):
        """Command: print signal payloads and extra details."""
        self.debug = True

    @checkarg()
    def disable_debug(self, args):
        """Command: stop printing signal payloads."""
        self.debug = False

    @checkarg(nb_args=1)
    def create_if(self, args):
        """Command: create interface args[0] and make it current."""
        self.__reset()
        self.iface_path = self.wpa.CreateInterface({'Ifname': args[0]})
        self.__set_if(args[0])

    @checkarg(nb_args=1)
    def get_if(self, args):
        """Command: make the existing interface args[0] current."""
        self.__reset()
        self.iface_path = self.wpa.GetInterface(args[0])
        self.__set_if(args[0])

    @checkarg()
    def del_if(self, args=None):
        """Command: remove the current interface, if any."""
        if not self.iface_path:
            return

        self.wpa.RemoveInterface(self.iface_path)
        print('Interface %s removed' % self.iface_name)
        self.__reset()

    @checkarg()
    def scan(self, args=None):
        """Command: start a passive scan on the current interface."""
        if not self.iface:
            return

        self.iface.Scan({'Type': 'passive'})
        print('Scan started')

    @checkarg()
    def quit(self, args=None):
        """Command: remove the current interface and exit with status 0."""
        self.del_if(args)
        exit(0)

    @checkarg(nb_args=1)
    def set_command_list(self, command_list):
        """Store the command description dict (sole item of the list)."""
        self.command_list = command_list[0]

    @checkarg()
    def p2p_find(self, args=None):
        """Command: call Find on the P2P device."""
        if not self.p2p:
            return

        self.p2p.Find({})

    @checkarg()
    def p2p_stop_find(self, args=None):
        """Command: call StopFind on the P2P device."""
        if not self.p2p:
            return

        self.p2p.StopFind()

    @checkarg()
    def p2p_peers(self, args=None):
        """Command: list the device names of the known peers."""
        if not self.iface:
            return

        for props in self.peers.values():
            if props:
                print('Peer Name=%s' % (props['DeviceName']))

    @checkarg(nb_args=1)
    def p2p_peer(self, args):
        """Command: refresh and print the properties of peer args[0]."""
        peer_path = self.__find_peer(args[0], True)
        if not peer_path:
            return

        peer = self.bus.get_object(WPA_INTF, peer_path)
        peer_if = dbus.Interface(peer, DBUS_PROPERTIES_INTF)
        self.peers[peer_path] = peer_if.GetAll(WPA_PEER_INTF)

        print_dict(self.peers[peer_path])

    @checkarg(nb_args=1)
    def p2p_connect(self, args):
        """Command: connect to peer args[0] using the 'pbc' WPS method.

        Joins the peer's group when its 'groupcapability' has the
        P2P_GROUP_CAPAB_GROUP_OWNER bit set; otherwise asks for a new
        connection with go_intent 7.
        """
        if not self.p2p:
            return

        peer_path = self.__find_peer(args[0], True)
        if not peer_path:
            return
        peer = self.peers[peer_path]

        owner_bit = P2P_GROUP_CAPAB_GROUP_OWNER
        is_group_owner = (peer['groupcapability'] & owner_bit) == owner_bit

        if is_group_owner:
            print('Joining an existing P2P group')
            pin = self.p2p.Connect({'peer': peer_path,
                                    'wps_method': 'pbc',
                                    'join': True,
                                    'go_intent': 0})
        else:
            print('Associating with another P2P device')
            pin = self.p2p.Connect({'peer': peer_path,
                                    'wps_method': 'pbc',
                                    'join': False,
                                    'go_intent': 7})

        # Only report a PIN when one was actually returned.
        if pin:
            print('WPS PIN in use: %s' % pin)

    @checkarg(nb_args=1)
    def p2p_disconnect(self, args):
        """Command: disconnect the running group (peer args[0] must exist)."""
        if not self.p2p:
            return

        peer = self.__find_peer(args[0])
        if not peer:
            return

        if not self.group_if:
            print('Peer %s is not connected' % (peer['DeviceName']))
            return

        self.group_if.Disconnect()

    @checkarg()
    def p2p_group_add(self, args):
        """Command: call GroupAdd with 'persistent' set to true."""
        if not self.p2p:
            return

        self.p2p.GroupAdd({'persistent': dbus.Boolean(1)})

    @checkarg()
    def p2p_group_remove(self, args):
        """Command: call Disconnect on the group interface."""
        if not self.group_if:
            return

        self.group_if.Disconnect()

    @checkarg()
    def p2p_group(self, args):
        """Command: print the properties of the current group."""
        if not self.group_obj:
            return

        group = dbus.Interface(self.bus.get_object(WPA_INTF, self.group_obj),
                               DBUS_PROPERTIES_INTF)
        print_dict(group.GetAll(WPA_GROUP_INTF))

    @checkarg()
    def p2p_flush(self, args):
        """Command: call Flush on the P2P device."""
        if not self.p2p:
            return

        self.p2p.Flush()

    @checkarg()
    def p2p_serv_disc_req(self, args=None):
        """Command: send a service discovery request and print its ref."""
        if not self.p2p:
            return

        # We request all kinds of services.
        sd_req = dbus.Array([dbus.Byte(b) for b in (2, 0, 0, 1)],
                            signature='y', variant_level=1)

        ref = self.p2p.ServiceDiscoveryRequest({'tlv': sd_req})
        print('Service discovery reference: %s' % ref)

    @checkarg(nb_args=1)
    def p2p_serv_disc_cancel_req(self, args):
        """Command: cancel the service discovery request int(args[0])."""
        if not self.p2p:
            return

        self.p2p.ServiceDiscoveryCancelRequest(int(args[0]))

    @checkarg(nb_args=3)
    def p2p_service_add(self, args):
        """Command: add a 'upnp' or 'bonjour' service.

        args -- [type, version, service] for upnp,
                [type, query, response] for bonjour.
        """
        if not self.p2p:
            return

        kind = args[0]
        service = {'service_type': kind}
        if kind == 'upnp':
            service['version'] = args[1]
            service['service'] = args[2]
        elif kind == 'bonjour':
            service['query'] = args[1]
            service['response'] = args[2]
        else:
            print('Unknown service: %s' % kind)
            return

        self.p2p.AddService(service)

    @checkarg(nb_args=2, min_args=True)
    def p2p_service_del(self, args):
        """Command: delete a 'upnp' or 'bonjour' service.

        args -- [type, version, service] for upnp, [type, query] for
                bonjour.  Raises Exception when the upnp service argument
                is missing.
        """
        if not self.p2p:
            return

        kind = args[0]
        service = {'service_type': kind}
        if kind == 'upnp':
            # The decorator only guarantees two arguments; upnp needs three.
            if len(args) < 3:
                raise Exception('Command p2p_service_del takes 3 arguments '
                                'for upnp services')
            service['version'] = args[1]
            service['service'] = args[2]
        elif kind == 'bonjour':
            service['query'] = args[1]
        else:
            print('Unknown service: %s' % kind)
            return

        self.p2p.DeleteService(service)

    @checkarg()
    def p2p_service_flush(self, args=None):
        """Command: call FlushService on the P2P device."""
        if not self.p2p:
            return

        self.p2p.FlushService()

    @checkarg(nb_args=1)
    def p2p_invite(self, args):
        """Command: invite peer args[0] through the group interface."""
        if not self.p2p or not self.group_if:
            return

        peer_path = self.__find_peer(args[0], True)
        if not peer_path:
            return

        self.group_if.Invite({'peer': peer_path})
| |
def build_args(parser):
    """Declare the options and one sub-command per supported command.

    parser -- argparse.ArgumentParser to populate.

    Returns a dict mapping each command name to its description dict,
    which is empty or holds the help text under the ArgFields.help key.
    """
    parser.add_argument('-d', default=False, action='store_true',
                        dest='debug', help='enable debug')
    parser.add_argument('-i', metavar='<interface>', dest='ifname',
                        help='interface name')

    # Commands that come without any help text.
    undocumented = (
        'quit', 'enable_debug', 'disable_debug', 'scan', 'p2p_find',
        'p2p_stop_find', 'p2p_flush', 'p2p_group_remove', 'p2p_group',
        'p2p_peers', 'p2p_serv_disc_req', 'p2p_service_flush',
    )
    # Commands with their help text.
    documented = (
        ('create_if', '<iface_name> - create interface'),
        ('get_if', '<iface_name> - get interface'),
        ('del_if', 'removes current interface'),
        ('p2p_group_add', 'adds an autonomous group'),
        ('p2p_peer', '<p2p device name> - get info for a peer'),
        ('p2p_connect', '<p2p device name>'),
        ('p2p_disconnect', '<p2p device name>'),
        ('p2p_serv_disc_cancel_req', '<identifier>'),
        ('p2p_service_add',
         '<service type> <version/query> <service/response>'),
        ('p2p_service_del', '<service type> <version/query> [<service>]'),
        ('p2p_invite', '<p2p device name>'),
    )

    command = {}
    for name in undocumented:
        command[name] = {}
    for name, text in documented:
        command[name] = {ArgFields.help: text}

    subparsers = parser.add_subparsers(help='commands', dest='command')
    # Sub-command with an empty name, registered before the real ones.
    subparsers.add_parser('')
    for key in sorted(command):
        spec = command[key]
        help_text = spec.get(ArgFields.help)
        metavar = spec.get(ArgFields.metavar)
        command_parser = subparsers.add_parser(key, help=help_text)
        command_parser.add_argument(key, nargs='*', metavar=metavar,
                                    help=help_text)

    return command
| |
def main():
    """Parse the command line, start the Wpa_s front-end and run the loop.

    Exits with status 1 when not running under Python 2.
    """
    if version_info.major != 2:
        print('You need to run this under Python 2.x')
        exit(1)

    parser = argparse.ArgumentParser(description='Connman P2P Test')

    command_list = build_args(parser)

    # Always pass a trailing empty token: it selects the '' sub-command
    # registered by build_args when no command is given on the command line.
    argv.append('')

    args = parser.parse_args(argv[1:])

    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)

    opts = []
    if args.command:
        opts = getattr(args, args.command)

    # Rebuild the command line with one blank between words; the empty
    # filler token appended above is dropped.  (Joining with '' would fuse
    # several arguments into a single word.)
    words = [args.command] + [opt for opt in opts if opt]
    command_line = ' '.join(words)

    bus = dbus.SystemBus()

    mainloop = gobject.MainLoop()

    wpa_s = Wpa_s(bus, args.ifname, command_line)

    if args.debug:
        wpa_s.enable_debug([])

    wpa_s.set_command_list([command_list])

    mainloop.run()


if __name__ == '__main__':
    main()