| #!/usr/bin/python |
| |
| import sys |
| import gobject |
| import string |
| |
| import dbus |
| import dbus.service |
| import dbus.mainloop.glib |
| |
| import glib |
| |
| import traceback |
| |
| def extract_list(list): |
| val = "[" |
| for i in list: |
| val += " " + str(i) |
| val += " ]" |
| return val |
| |
| def extract_values(values): |
| val = "{" |
| for key in values.keys(): |
| val += " " + key + "=" |
| if key in ["PrefixLength"]: |
| val += "%s" % (int(values[key])) |
| else: |
| if key in ["Servers", "Excludes"]: |
| val += extract_list(values[key]) |
| else: |
| val += str(values[key]) |
| val += " }" |
| return val |
| |
| class Notification(dbus.service.Object): |
| def __init__(self, bus, app, notify_path): |
| dbus.service.Object.__init__(self) |
| self.app = app |
| |
| @dbus.service.method("net.connman.Notification", |
| in_signature='', out_signature='') |
| def Release(self): |
| print "Release %s" % (self._object_path) |
| session_name = self._object_path.split('/')[-1] |
| self.app.release(session_name) |
| |
| @dbus.service.method("net.connman.Notification", |
| in_signature='a{sv}', out_signature='') |
| def Update(self, settings): |
| print "Update called at %s" % (self._object_path) |
| |
| try: |
| for key in settings.keys(): |
| if key in ["IPv4", "IPv6"]: |
| val = extract_values(settings[key]) |
| elif key in ["AllowedBearers"]: |
| val = extract_list(settings[key]) |
| else: |
| val = settings[key] |
| print " %s = %s" % (key, val) |
| except: |
| print "Exception:" |
| traceback.print_exc() |
| |
| class SessionApplication(dbus.service.Object): |
| def __init__(self, bus, object_path, mainloop): |
| dbus.service.Object.__init__(self, bus, object_path) |
| |
| self.manager = None |
| self.mainloop = mainloop |
| self.sessions = {} |
| |
| try: |
| bus = dbus.SystemBus() |
| bus.watch_name_owner('net.connman', self.connman_name_owner_changed) |
| except dbus.DBusException: |
| traceback.print_exc() |
| |
| def connman_name_owner_changed(self, proxy): |
| try: |
| if proxy: |
| print "connman appeared on D-Bus ", str(proxy) |
| |
| bus = dbus.SystemBus() |
| self.manager = dbus.Interface(bus.get_object("net.connman", "/"), |
| "net.connman.Manager") |
| else: |
| print "connman disappeared on D-Bus" |
| self.manager = None |
| for s in self.sessions.keys(): |
| self.sessions[s]['notify'].remove_from_connection() |
| self.sessions[s]['notify'] = None |
| |
| self.sessions = {} |
| |
| except dbus.DBusException: |
| traceback.print_exc() |
| |
| def release(self, session_name): |
| s = self.find_session(session_name) |
| if not s: |
| return |
| if s['session']: |
| s['session'].Destroy() |
| s['session'] = None |
| if s['notify']: |
| s['notify'].remove_from_connection() |
| s['notify'] = None |
| del self.sessions[session_name] |
| |
| def type_convert(self, key, value): |
| if key in [ "AllowedBearers" ]: |
| return value |
| elif key in [ "RoamingPolicy", "ConnectionType" ]: |
| if len(value) > 0: |
| return value[0] |
| elif key in [ "Priority", "AvoidHandover", |
| "StayConnected", "EmergencyCall" ]: |
| flag = str(value[0]).strip().lower() |
| val = flag not in ['false', 'f', 'n', '0'] |
| return dbus.Boolean(val) |
| elif key in [ "PeriodicConnect", "IdleTimeout" ]: |
| val = value[0] |
| return dbus.UInt32(val) |
| |
| return value |
| |
| def find_session(self, session_name): |
| if not session_name in self.sessions.keys(): |
| return None |
| return self.sessions[session_name] |
| |
| @dbus.service.method("com.example.TestSession", |
| in_signature='', out_signature='') |
| def CreateSession(self, session_name): |
| print "Create session" |
| |
| s = self.find_session(session_name) |
| if s and s['session'] : |
| print "Session %s already created-> drop reqest" % (session_name) |
| return |
| |
| try: |
| bus = dbus.SystemBus() |
| |
| if s == None: |
| s = {} |
| s['notify_path'] = self._object_path + "/" + session_name |
| s['notify'] = Notification(bus, self, s['notify_path']) |
| s['notify'].add_to_connection(bus, s['notify_path']) |
| if not 'settings' in s.keys(): |
| s['settings'] = {}; |
| s['session_path'] = self.manager.CreateSession(s['settings'], s['notify_path']) |
| print "notify path %s" % (s['notify_path']) |
| print "session path %s" % (s['session_path']) |
| s['session'] = dbus.Interface(bus.get_object("net.connman", s['session_path']), |
| "net.connman.Session") |
| self.sessions[session_name] = s |
| |
| except dbus.DBusException, e: |
| if e.get_dbus_name() in ['net.connman.Error.Failed']: |
| print e.get_dbus_message() |
| return |
| traceback.print_exc() |
| |
| @dbus.service.method("com.example.TestSession", |
| in_signature='', out_signature='') |
| def DestroySession(self, session_name): |
| print "Destroy session" |
| |
| s = self.find_session(session_name) |
| if s == None or s['session'] == None: |
| print "The session is not running -> drop request" |
| return |
| |
| try: |
| self.release(session_name) |
| except dbus.DBusException: |
| traceback.print_exc() |
| |
| @dbus.service.method("com.example.TestSession", |
| in_signature='', out_signature='') |
| def Connect(self, session_name): |
| print "Connect session" |
| |
| s = self.find_session(session_name) |
| if s == None or s['session'] == None: |
| print "The session is not running -> drop request" |
| return |
| |
| try: |
| s['session'].Connect() |
| except dbus.DBusException, e: |
| if e.get_dbus_name() in ['net.connman.Error.Failed']: |
| print e.get_dbus_message() |
| return |
| traceback.print_exc() |
| |
| @dbus.service.method("com.example.TestSession", |
| in_signature='', out_signature='') |
| def Disconnect(self, session_name): |
| print "Disconnect session" |
| |
| s = self.find_session(session_name) |
| if s == None or s['session'] == None: |
| print "The session is not running -> drop request" |
| return |
| |
| try: |
| s['session'].Disconnect() |
| except dbus.DBusException, e: |
| if e.get_dbus_name() in ['net.connman.Error.Failed']: |
| print e.get_dbus_message() |
| return |
| traceback.print_exc() |
| |
| @dbus.service.method("com.example.TestSession", |
| in_signature='', out_signature='') |
| def Change(self, session_name, key, value): |
| print "Update session settings" |
| |
| s = self.find_session(session_name) |
| if s == None or s['session'] == None: |
| print "The session is not running -> drop request" |
| return |
| |
| try: |
| val = self.type_convert(key, value) |
| s['session'].Change(key, val) |
| except dbus.DBusException, e: |
| if e.get_dbus_name() in ['net.connman.Error.Failed']: |
| print e.get_dbus_message() |
| return |
| traceback.print_exc() |
| |
| @dbus.service.method("com.example.TestSession", |
| in_signature='', out_signature='') |
| def Configure(self, session_name, key, value): |
| print "Configure session settings" |
| s = self.find_session(session_name) |
| if s == None: |
| s = {} |
| s['notify_path'] = None |
| s['notify'] = None |
| if not 'settings' in s.keys(): |
| s['settings'] = {}; |
| s['session_path'] = None |
| s['session'] = None |
| self.sessions[session_name] = s |
| if s and s['session']: |
| print "The session is running, use change -> drop request" |
| return |
| val = self.type_convert(key, value) |
| s['settings'][key] = val |
| |
| def main(): |
| if len(sys.argv) < 2: |
| print "Usage: %s <command>" % (sys.argv[0]) |
| print "" |
| print " enable" |
| print " disable" |
| print " create <app_path> <session_name>" |
| print " destroy <app_path> <session_name>" |
| print " connect <app_path> <session_name>" |
| print " disconnect <app_path> <session_name>" |
| print " change <app_path> <session_name> <key> <value>" |
| print " configure <app_path> <session_name> <key> <value>" |
| print "" |
| print " run <app_path>" |
| sys.exit(1) |
| |
| dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) |
| |
| if sys.argv[1] == "enable": |
| bus = dbus.SystemBus() |
| manager = dbus.Interface(bus.get_object("net.connman", "/"), |
| "net.connman.Manager") |
| manager.SetProperty("SessionMode", True) |
| return |
| |
| elif sys.argv[1] == "disable": |
| bus = dbus.SystemBus() |
| manager = dbus.Interface(bus.get_object("net.connman", "/"), |
| "net.connman.Manager") |
| manager.SetProperty("SessionMode", False) |
| return |
| |
| if (len(sys.argv) < 3): |
| print "Need test application path" |
| sys.exit(1) |
| |
| app_path = sys.argv[2] |
| bus = dbus.SessionBus() |
| |
| app_name = "com.example.SessionApplication.%s" % (string.strip(app_path, "/")) |
| |
| if sys.argv[1] == "run": |
| name = dbus.service.BusName(app_name, bus) |
| mainloop = gobject.MainLoop() |
| |
| app = SessionApplication(bus, app_path, mainloop) |
| |
| mainloop.run() |
| return |
| |
| app = dbus.Interface(bus.get_object(app_name, app_path), |
| "com.example.TestSession") |
| |
| if sys.argv[1] == "create": |
| app.CreateSession(sys.argv[3]) |
| |
| elif sys.argv[1] == "destroy": |
| app.DestroySession(sys.argv[3]) |
| |
| elif sys.argv[1] == "connect": |
| app.Connect(sys.argv[3]) |
| |
| elif sys.argv[1] == "disconnect": |
| app.Disconnect(sys.argv[3]) |
| |
| elif sys.argv[1] == "change": |
| if len(sys.argv) < 5: |
| print "Arguments missing" |
| sys.exit(1) |
| |
| app.Change(sys.argv[3], sys.argv[4], sys.argv[5:]) |
| |
| elif sys.argv[1] == "configure": |
| if len(sys.argv) < 5: |
| print "Arguments missing" |
| sys.exit(1) |
| |
| app.Configure(sys.argv[3], sys.argv[4], sys.argv[5:]) |
| |
| else: |
| print "Unknown command '%s'" % sys.argv[1] |
| sys.exit(1) |
| |
| if __name__ == '__main__': |
| main() |