blob: 2d82fb65ea1245ec585bf39afb39b5464f0cdd6b [file] [log] [blame]
#!/usr/bin/python
import sys
import gobject
import string
import dbus
import dbus.service
import dbus.mainloop.glib
import glib
import traceback
def extract_list(list):
val = "["
for i in list:
val += " " + str(i)
val += " ]"
return val
def extract_values(values):
val = "{"
for key in values.keys():
val += " " + key + "="
if key in ["PrefixLength"]:
val += "%s" % (int(values[key]))
else:
if key in ["Servers", "Excludes"]:
val += extract_list(values[key])
else:
val += str(values[key])
val += " }"
return val
class Notification(dbus.service.Object):
def __init__(self, bus, app, notify_path):
dbus.service.Object.__init__(self)
self.app = app
@dbus.service.method("net.connman.Notification",
in_signature='', out_signature='')
def Release(self):
print "Release %s" % (self._object_path)
session_name = self._object_path.split('/')[-1]
self.app.release(session_name)
@dbus.service.method("net.connman.Notification",
in_signature='a{sv}', out_signature='')
def Update(self, settings):
print "Update called at %s" % (self._object_path)
try:
for key in settings.keys():
if key in ["IPv4", "IPv6"]:
val = extract_values(settings[key])
elif key in ["AllowedBearers"]:
val = extract_list(settings[key])
else:
val = settings[key]
print " %s = %s" % (key, val)
except:
print "Exception:"
traceback.print_exc()
class SessionApplication(dbus.service.Object):
def __init__(self, bus, object_path, mainloop):
dbus.service.Object.__init__(self, bus, object_path)
self.manager = None
self.mainloop = mainloop
self.sessions = {}
try:
bus = dbus.SystemBus()
bus.watch_name_owner('net.connman', self.connman_name_owner_changed)
except dbus.DBusException:
traceback.print_exc()
def connman_name_owner_changed(self, proxy):
try:
if proxy:
print "connman appeared on D-Bus ", str(proxy)
bus = dbus.SystemBus()
self.manager = dbus.Interface(bus.get_object("net.connman", "/"),
"net.connman.Manager")
else:
print "connman disappeared on D-Bus"
self.manager = None
for s in self.sessions.keys():
self.sessions[s]['notify'].remove_from_connection()
self.sessions[s]['notify'] = None
self.sessions = {}
except dbus.DBusException:
traceback.print_exc()
def release(self, session_name):
s = self.find_session(session_name)
if not s:
return
if s['session']:
s['session'].Destroy()
s['session'] = None
if s['notify']:
s['notify'].remove_from_connection()
s['notify'] = None
del self.sessions[session_name]
def type_convert(self, key, value):
if key in [ "AllowedBearers" ]:
return value
elif key in [ "RoamingPolicy", "ConnectionType" ]:
if len(value) > 0:
return value[0]
elif key in [ "Priority", "AvoidHandover",
"StayConnected", "EmergencyCall" ]:
flag = str(value[0]).strip().lower()
val = flag not in ['false', 'f', 'n', '0']
return dbus.Boolean(val)
elif key in [ "PeriodicConnect", "IdleTimeout" ]:
val = value[0]
return dbus.UInt32(val)
return value
def find_session(self, session_name):
if not session_name in self.sessions.keys():
return None
return self.sessions[session_name]
@dbus.service.method("com.example.TestSession",
in_signature='', out_signature='')
def CreateSession(self, session_name):
print "Create session"
s = self.find_session(session_name)
if s and s['session'] :
print "Session %s already created-> drop reqest" % (session_name)
return
try:
bus = dbus.SystemBus()
if s == None:
s = {}
s['notify_path'] = self._object_path + "/" + session_name
s['notify'] = Notification(bus, self, s['notify_path'])
s['notify'].add_to_connection(bus, s['notify_path'])
if not 'settings' in s.keys():
s['settings'] = {};
s['session_path'] = self.manager.CreateSession(s['settings'], s['notify_path'])
print "notify path %s" % (s['notify_path'])
print "session path %s" % (s['session_path'])
s['session'] = dbus.Interface(bus.get_object("net.connman", s['session_path']),
"net.connman.Session")
self.sessions[session_name] = s
except dbus.DBusException, e:
if e.get_dbus_name() in ['net.connman.Error.Failed']:
print e.get_dbus_message()
return
traceback.print_exc()
@dbus.service.method("com.example.TestSession",
in_signature='', out_signature='')
def DestroySession(self, session_name):
print "Destroy session"
s = self.find_session(session_name)
if s == None or s['session'] == None:
print "The session is not running -> drop request"
return
try:
self.release(session_name)
except dbus.DBusException:
traceback.print_exc()
@dbus.service.method("com.example.TestSession",
in_signature='', out_signature='')
def Connect(self, session_name):
print "Connect session"
s = self.find_session(session_name)
if s == None or s['session'] == None:
print "The session is not running -> drop request"
return
try:
s['session'].Connect()
except dbus.DBusException, e:
if e.get_dbus_name() in ['net.connman.Error.Failed']:
print e.get_dbus_message()
return
traceback.print_exc()
@dbus.service.method("com.example.TestSession",
in_signature='', out_signature='')
def Disconnect(self, session_name):
print "Disconnect session"
s = self.find_session(session_name)
if s == None or s['session'] == None:
print "The session is not running -> drop request"
return
try:
s['session'].Disconnect()
except dbus.DBusException, e:
if e.get_dbus_name() in ['net.connman.Error.Failed']:
print e.get_dbus_message()
return
traceback.print_exc()
@dbus.service.method("com.example.TestSession",
in_signature='', out_signature='')
def Change(self, session_name, key, value):
print "Update session settings"
s = self.find_session(session_name)
if s == None or s['session'] == None:
print "The session is not running -> drop request"
return
try:
val = self.type_convert(key, value)
s['session'].Change(key, val)
except dbus.DBusException, e:
if e.get_dbus_name() in ['net.connman.Error.Failed']:
print e.get_dbus_message()
return
traceback.print_exc()
@dbus.service.method("com.example.TestSession",
in_signature='', out_signature='')
def Configure(self, session_name, key, value):
print "Configure session settings"
s = self.find_session(session_name)
if s == None:
s = {}
s['notify_path'] = None
s['notify'] = None
if not 'settings' in s.keys():
s['settings'] = {};
s['session_path'] = None
s['session'] = None
self.sessions[session_name] = s
if s and s['session']:
print "The session is running, use change -> drop request"
return
val = self.type_convert(key, value)
s['settings'][key] = val
def main():
if len(sys.argv) < 2:
print "Usage: %s <command>" % (sys.argv[0])
print ""
print " enable"
print " disable"
print " create <app_path> <session_name>"
print " destroy <app_path> <session_name>"
print " connect <app_path> <session_name>"
print " disconnect <app_path> <session_name>"
print " change <app_path> <session_name> <key> <value>"
print " configure <app_path> <session_name> <key> <value>"
print ""
print " run <app_path>"
sys.exit(1)
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
if sys.argv[1] == "enable":
bus = dbus.SystemBus()
manager = dbus.Interface(bus.get_object("net.connman", "/"),
"net.connman.Manager")
manager.SetProperty("SessionMode", True)
return
elif sys.argv[1] == "disable":
bus = dbus.SystemBus()
manager = dbus.Interface(bus.get_object("net.connman", "/"),
"net.connman.Manager")
manager.SetProperty("SessionMode", False)
return
if (len(sys.argv) < 3):
print "Need test application path"
sys.exit(1)
app_path = sys.argv[2]
bus = dbus.SessionBus()
app_name = "com.example.SessionApplication.%s" % (string.strip(app_path, "/"))
if sys.argv[1] == "run":
name = dbus.service.BusName(app_name, bus)
mainloop = gobject.MainLoop()
app = SessionApplication(bus, app_path, mainloop)
mainloop.run()
return
app = dbus.Interface(bus.get_object(app_name, app_path),
"com.example.TestSession")
if sys.argv[1] == "create":
app.CreateSession(sys.argv[3])
elif sys.argv[1] == "destroy":
app.DestroySession(sys.argv[3])
elif sys.argv[1] == "connect":
app.Connect(sys.argv[3])
elif sys.argv[1] == "disconnect":
app.Disconnect(sys.argv[3])
elif sys.argv[1] == "change":
if len(sys.argv) < 5:
print "Arguments missing"
sys.exit(1)
app.Change(sys.argv[3], sys.argv[4], sys.argv[5:])
elif sys.argv[1] == "configure":
if len(sys.argv) < 5:
print "Arguments missing"
sys.exit(1)
app.Configure(sys.argv[3], sys.argv[4], sys.argv[5:])
else:
print "Unknown command '%s'" % sys.argv[1]
sys.exit(1)
if __name__ == '__main__':
main()