blob: a8060439e4b79e2d306cff1cef4b3c5f1a03533c [file] [log] [blame] [edit]
#!/usr/bin/python
from __future__ import absolute_import, print_function, unicode_literals
from optparse import OptionParser, make_option
import os
from socket import SOCK_SEQPACKET, socket
import sys
import dbus
import dbus.service
import dbus.mainloop.glib
import glib
try:
from gi.repository import GObject
except ImportError:
import gobject as GObject
# GLib main loop object; assigned in the __main__ section and stopped by
# HfpProfile.Release().
mainloop = None

# SCO audio needs the Bluetooth socket constants, which the socket module
# only exports when the interpreter was built with Bluetooth support.
audio_supported = True
try:
    from socket import AF_BLUETOOTH, BTPROTO_SCO
except ImportError:
    # Only a missing name is expected here; anything else (for example
    # KeyboardInterrupt) must not be silently swallowed.
    print("WARNING: python compiled without Bluetooth support"
                    " - audio will not be available")
    audio_supported = False

# Maximum number of bytes read from a connection per I/O callback.
BUF_SIZE = 1024

# Address the listening SCO socket is bound to.
BDADDR_ANY = '00:00:00:00:00:00'

# Feature bits of the hands-free (HF) side; the combination in HF_FEATURES
# is what gets sent in the AT+BRSF command.
HF_NREC = 0x0001
HF_3WAY = 0x0002
HF_CLI = 0x0004
HF_VOICE_RECOGNITION = 0x0008
HF_REMOTE_VOL = 0x0010
HF_ENHANCED_STATUS = 0x0020
HF_ENHANCED_CONTROL = 0x0040
HF_CODEC_NEGOTIATION = 0x0080

# Feature bits of the remote (AG) side; these are tested against the
# feature mask stored on each connection.
AG_3WAY = 0x0001
AG_NREC = 0x0002
AG_VOICE_RECOGNITION = 0x0004
AG_INBAND_RING = 0x0008
AG_VOICE_TAG = 0x0010
AG_REJECT_CALL = 0x0020
AG_ENHANCED_STATUS = 0x0040
AG_ENHANCED_CONTROL = 0x0080
AG_EXTENDED_RESULT = 0x0100
AG_CODEC_NEGOTIATION = 0x0200

# Every HF feature bit except HF_NREC.
HF_FEATURES = (HF_3WAY | HF_CLI | HF_VOICE_RECOGNITION |
               HF_REMOTE_VOL | HF_ENHANCED_STATUS |
               HF_ENHANCED_CONTROL | HF_CODEC_NEGOTIATION)

# Codec id list sent with AT+BAC when both sides support codec negotiation.
AVAIL_CODECS = "1,2"
class HfpConnection:
    """One HFP connection driven over a file descriptor.

    Creating an instance installs a GLib input watch on ``fd`` and starts the
    Service Level Connection (SLC) command sequence by sending ``AT+BRSF``.
    Every final ``OK``/``ERROR`` response advances the sequence by one
    command (see ``slc_next_cmd``) until ``slc_completed`` is reached.
    """

    slc_complete = False  # True once the SLC command sequence has finished
    fd = None             # connection file descriptor; -1 once closed
    io_id = 0             # GLib source id of the input watch, 0 when none
    version = 0           # version value handed to the constructor
    features = 0          # remote feature mask, refreshed from +BRSF replies
    pending = None        # command still waiting for OK/ERROR, or None

    def disconnect(self):
        """Close the descriptor and remove its input watch if still open."""
        # fd is None until __init__ runs; None cannot be ordered against an
        # int on Python 3, so check for it explicitly.
        if self.fd is None or self.fd < 0:
            return
        os.close(self.fd)
        self.fd = -1
        if self.io_id:
            glib.source_remove(self.io_id)
            self.io_id = 0

    def slc_completed(self):
        """Record that the SLC command sequence has finished."""
        print("SLC establisment complete")
        self.slc_complete = True

    def slc_next_cmd(self, cmd):
        """Send the SLC command that follows the just-completed ``cmd``.

        ``cmd`` is the command whose final response arrived, or ``None`` to
        start the sequence. Unknown commands are only reported.
        """
        if not cmd:
            self.send_cmd("AT+BRSF=%u" % (HF_FEATURES))
        elif cmd.startswith("AT+BRSF"):
            # AT+BAC is only sent when both sides support codec negotiation.
            if (self.features & AG_CODEC_NEGOTIATION and
                    HF_FEATURES & HF_CODEC_NEGOTIATION):
                self.send_cmd("AT+BAC=%s" % (AVAIL_CODECS))
            else:
                self.send_cmd("AT+CIND=?")
        elif cmd.startswith("AT+BAC"):
            self.send_cmd("AT+CIND=?")
        elif cmd.startswith("AT+CIND=?"):
            self.send_cmd("AT+CIND?")
        elif cmd.startswith("AT+CIND?"):
            self.send_cmd("AT+CMER=3,0,0,1")
        elif cmd.startswith("AT+CMER="):
            # AT+CHLD=? is only queried when both sides support 3-way calls.
            if (HF_FEATURES & HF_3WAY and self.features & AG_3WAY):
                self.send_cmd("AT+CHLD=?")
            else:
                self.slc_completed()
        elif cmd.startswith("AT+CHLD=?"):
            self.slc_completed()
        else:
            print("Unknown SLC command completed: %s" % (cmd))

    def io_cb(self, fd, cond):
        """GLib input callback: read from ``fd`` and process each line.

        Returns True to keep the watch installed, or False after the peer
        closed the connection (which makes GLib drop the watch).
        """
        data = os.read(fd, BUF_SIZE)
        if not data:
            # A zero-length read means EOF. Staying subscribed would make
            # the main loop call us again immediately, forever.
            print("Connection closed by remote")
            self.io_id = 0  # the watch goes away because we return False
            if self.fd is not None and self.fd >= 0:
                os.close(self.fd)
                self.fd = -1
            return False

        # os.read() yields bytes; decode once so the comparisons below work
        # on text under both Python 2 and Python 3.
        text = data.decode("utf-8", "replace").strip()
        print("Received: %s" % (text))

        # One read may carry several responses (e.g. "+BRSF: n" followed by
        # "OK"); handle them in arrival order. Lines split across two reads
        # are not reassembled.
        for line in text.splitlines():
            line = line.strip()
            if line:
                self._handle_line(line)
        return True

    def _handle_line(self, line):
        """Process a single, already stripped response line."""
        if line in ("OK", "ERROR"):
            cmd = self.pending
            self.pending = None
            if not self.slc_complete:
                self.slc_next_cmd(cmd)
            return

        name, sep, value = line.partition(":")
        if name == "+BRSF" and sep:
            try:
                self.features = int(value)
            except ValueError:
                print("Ignoring malformed +BRSF response: %s" % (line))

    def send_cmd(self, cmd):
        """Write ``cmd`` plus CR LF to the connection and mark it pending.

        Nothing is sent while an earlier command is still waiting for its
        final response.
        """
        if self.pending:
            print("ERROR: Another command is pending")
            return
        print("Sending: %s" % (cmd))
        # os.write() needs bytes on Python 3.
        os.write(self.fd, (cmd + "\r\n").encode("utf-8"))
        self.pending = cmd

    def __init__(self, fd, version, features):
        """Store the connection parameters, watch ``fd`` and start the SLC."""
        self.fd = fd
        self.version = version
        self.features = features
        print("Version 0x%04x Features 0x%04x" % (version, features))
        self.io_id = glib.io_add_watch(fd, glib.IO_IN, self.io_cb)
        self.slc_next_cmd(None)
class HfpProfile(dbus.service.Object):
    """D-Bus object exposing the ``org.bluez.Profile1`` methods.

    Each ``NewConnection`` call creates an ``HfpConnection`` that is stored
    under the path it was announced with. When a listening SCO socket is
    supplied, incoming SCO connections are accepted and reported.
    """

    sco_socket = None  # listening SCO socket, or None when none was given
    io_id = 0          # GLib source id of the SCO listen watch

    def sco_cb(self, sock, cond):
        """GLib callback: accept and report an incoming SCO connection."""
        (sco, peer) = sock.accept()
        print("New SCO connection from %s" % (peer))
        # A GLib watch is removed unless its callback returns True; keep
        # listening so later SCO connections are reported as well.
        return True

    def init_sco(self, sock):
        """Remember the listening socket and watch it for connections."""
        self.sco_socket = sock
        self.io_id = glib.io_add_watch(sock, glib.IO_IN, self.sco_cb)

    def __init__(self, bus, path, sco):
        """Export the object on ``bus`` at ``path``; ``sco`` may be None."""
        dbus.service.Object.__init__(self, bus, path)
        # Per-instance map of path -> HfpConnection. A class-level dict
        # would be shared by every HfpProfile instance.
        self.conns = {}
        if sco:
            self.init_sco(sco)

    @dbus.service.method("org.bluez.Profile1",
                         in_signature="", out_signature="")
    def Release(self):
        """Stop the main loop."""
        print("Release")
        mainloop.quit()

    @dbus.service.method("org.bluez.Profile1",
                         in_signature="", out_signature="")
    def Cancel(self):
        """Report the cancellation; nothing else is done."""
        print("Cancel")

    @dbus.service.method("org.bluez.Profile1",
                         in_signature="o", out_signature="")
    def RequestDisconnection(self, path):
        """Disconnect and forget the connection stored under ``path``."""
        conn = self.conns.pop(path, None)
        if conn is None:
            # No connection was registered under this path; there is
            # nothing to tear down.
            print("RequestDisconnection: no connection for %s" % (path))
            return
        conn.disconnect()

    @dbus.service.method("org.bluez.Profile1",
                         in_signature="oha{sv}", out_signature="")
    def NewConnection(self, path, fd, properties):
        """Create an ``HfpConnection`` for ``fd`` and store it under ``path``.

        "Version" and "Features" are taken from ``properties`` when present;
        otherwise 0x0105 and 0 are used.
        """
        fd = fd.take()
        print("NewConnection(%s, %d)" % (path, fd))
        version = properties.get("Version", 0x0105)
        features = properties.get("Features", 0)
        conn = HfpConnection(fd, version, features)
        self.conns[path] = conn
if __name__ == '__main__':
    # Route D-Bus traffic through the GLib main loop before connecting.
    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)

    bus = dbus.SystemBus()
    bluez_obj = bus.get_object("org.bluez", "/org/bluez")
    manager = dbus.Interface(bluez_obj, "org.bluez.ProfileManager1")

    # Command line options as
    # (short flag, long flag, value type, destination, default).
    option_specs = (
        ("-p", "--path", "string", "path", "/bluez/test/hfp"),
        ("-n", "--name", "string", "name", None),
        ("-C", "--channel", "int", "channel", None),
    )
    option_list = [
        make_option(short_flag, long_flag, action="store",
                    type=value_type, dest=dest, default=default)
        for (short_flag, long_flag, value_type, dest, default) in option_specs
    ]
    parser = OptionParser(option_list=option_list)
    options, args = parser.parse_args()

    mainloop = GObject.MainLoop()

    # Options passed along with the profile registration.
    opts = {}
    opts["Version"] = dbus.UInt16(0x0106)
    opts["Features"] = dbus.UInt16(HF_FEATURES)
    if options.name:
        opts["Name"] = options.name
    if options.channel is not None:
        opts["Channel"] = dbus.UInt16(options.channel)

    # A listening SCO socket is only created when the Bluetooth socket
    # constants could be imported.
    sco = None
    if audio_supported:
        sco = socket(AF_BLUETOOTH, SOCK_SEQPACKET, BTPROTO_SCO)
        sco.bind(BDADDR_ANY)
        sco.listen(1)

    profile = HfpProfile(bus, options.path, sco)

    manager.RegisterProfile(options.path, "hfp-hf", opts)
    print("Profile registered - waiting for connections")

    mainloop.run()