From 5bad910097bf39ccc1ffea4b991b3c27d743d2e2 Mon Sep 17 00:00:00 2001 From: Philip Withnall <philip.withnall@collabora.co.uk> Date: Wed, 4 Nov 2015 11:26:32 +0000 Subject: [PATCH] bluez: Reformat code to be PEP8-compliant Signed-off-by: Philip Withnall <philip.withnall@collabora.co.uk> Differential Revision: https://phabricator.apertis.org/D872 Reviewed-by: Simon McVittie <simon.mcvittie@collabora.co.uk> --- bluez/bluez-hfp | 554 ++++++++++---------- bluez/pair-two | 245 ++++----- bluez/simple-agent | 224 ++++---- bluez/test-avrcp.py | 22 +- bluez/ubt | 1214 ++++++++++++++++++++++--------------------- 5 files changed, 1147 insertions(+), 1112 deletions(-) diff --git a/bluez/bluez-hfp b/bluez/bluez-hfp index dd95bd5..f61878a 100755 --- a/bluez/bluez-hfp +++ b/bluez/bluez-hfp @@ -1,22 +1,22 @@ #!/usr/bin/python # -#BlueZ - Bluetooth protocol stack for Linux +# BlueZ - Bluetooth protocol stack for Linux # -#Copyright (C) 2012 Collabora Ltd. +# Copyright (C) 2012 Collabora Ltd. # -#This program is free software; you can redistribute it and/or modify -#it under the terms of the GNU General Public License as published by -#the Free Software Foundation; either version 2 of the License, or -#(at your option) any later version. +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. # -#This program is distributed in the hope that it will be useful, -#but WITHOUT ANY WARRANTY; without even the implied warranty of -#MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -#GNU General Public License for more details. +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. # -#You should have received a copy of the GNU General Public License -#along with this program; if not, write to the Free Software -#Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA import gobject import os @@ -29,353 +29,365 @@ import urllib2 from optparse import OptionParser profiles = { - "0000111f-0000-1000-8000-00805f9b34fb" : "HFP AG", + "0000111f-0000-1000-8000-00805f9b34fb": "HFP AG", } BASE_PATH = "" - process_q = [] + + def process_next(): - if len(process_q) == 0: - mainloop.quit() - return + if len(process_q) == 0: + mainloop.quit() + return - f = process_q[0] - process_q.remove(f) + f = process_q[0] + process_q.remove(f) + + gobject.idle_add(f) - gobject.idle_add(f) def TEST_START(str): - print ("TEST STARTED: %s" % str) + print ("TEST STARTED: %s" % str) + + def TEST_FAILED(str): - print ("TEST FINISHED: %s : FAILED" % str) + print ("TEST FINISHED: %s : FAILED" % str) + + def TEST_PASSED(str): - print ("TEST FINISHED: %s : PASSED" % str) + print ("TEST FINISHED: %s : PASSED" % str) + def ask(prompt): - try: - return raw_input(prompt) - except: - return input(prompt) + try: + return raw_input(prompt) + except: + return input(prompt) + class Rejected(dbus.DBusException): - _dbus_error_name = "org.bluez.Error.Rejected" + _dbus_error_name = "org.bluez.Error.Rejected" + class BlueZAgent(dbus.service.Object): - exit_on_release = False - - def set_exit_on_release(self, exit_on_release): - self.exit_on_release = exit_on_release - - @dbus.service.method("org.bluez.Agent", - in_signature="", out_signature="") - def Release(self): - if self.exit_on_release: - mainloop.quit() - - @dbus.service.method("org.bluez.Agent", - in_signature="os", out_signature="") - def Authorize(self, device, uuid): - print(" | Authorizing %s, %s" % (device, uuid)) - return - - @dbus.service.method("org.bluez.Agent", - in_signature="o", out_signature="s") - def RequestPinCode(self, device): - print("| RequestPinCode (%s)" % (device)) - return ask("| Enter PIN Code: ") - - @dbus.service.method("org.bluez.Agent", - in_signature="ou", out_signature="") - def DisplayPasskey(self, device, passkey): - print("| DisplayPasskey (%s, %06d)" % (device, passkey)) - - @dbus.service.method("org.bluez.Agent", - in_signature="os", out_signature="") - def DisplayPinCode(self, device, pincode): - print("| DisplayPinCode (%s, %s)" % (device, pincode)) - - @dbus.service.method("org.bluez.Agent", - in_signature="ou", out_signature="") - def RequestConfirmation(self, device, passkey): - print("| Confiming passkey %06d (%s)" % (passkey, device)) - return - - @dbus.service.method("org.bluez.Agent", - in_signature="s", out_signature="") - def ConfirmModeChange(self, mode): - print("| Confirming ModeChange (%s)" % (mode)) - return - - @dbus.service.method("org.bluez.Agent", - in_signature="", out_signature="") - def Cancel(self): - print("| Cancel") + exit_on_release = False + + def set_exit_on_release(self, exit_on_release): + self.exit_on_release = exit_on_release + + @dbus.service.method("org.bluez.Agent", + in_signature="", out_signature="") + def Release(self): + if self.exit_on_release: + mainloop.quit() + + @dbus.service.method("org.bluez.Agent", + in_signature="os", out_signature="") + def Authorize(self, device, uuid): + print(" | Authorizing %s, %s" % (device, uuid)) + return + + @dbus.service.method("org.bluez.Agent", + in_signature="o", out_signature="s") + def RequestPinCode(self, device): + print("| RequestPinCode (%s)" % (device)) + return ask("| Enter PIN Code: ") + + @dbus.service.method("org.bluez.Agent", + in_signature="ou", out_signature="") + def DisplayPasskey(self, device, passkey): + print("| DisplayPasskey (%s, %06d)" % (device, passkey)) + + @dbus.service.method("org.bluez.Agent", + in_signature="os", out_signature="") + def DisplayPinCode(self, device, pincode): + print("| DisplayPinCode (%s, %s)" % (device, pincode)) + + @dbus.service.method("org.bluez.Agent", + in_signature="ou", out_signature="") + def RequestConfirmation(self, device, passkey): + print("| Confiming passkey %06d (%s)" % (passkey, device)) + return + + @dbus.service.method("org.bluez.Agent", + in_signature="s", out_signature="") + def ConfirmModeChange(self, mode): + print("| Confirming ModeChange (%s)" % (mode)) + return + + @dbus.service.method("org.bluez.Agent", + in_signature="", out_signature="") + def Cancel(self): + print("| Cancel") + class Device(): - def __init__(self, path): - self.path = path - self.profiles = [] + def __init__(self, path): + self.path = path + self.profiles = [] + + self.device = dbus.Interface(bus.get_object("org.bluez", self.path), + "org.bluez.Device") + self.props = self.device.GetProperties() + self.device.SetProperty("Trusted", True) + + bus.add_signal_receiver(self.device_property_changed, + dbus_interface="org.bluez.Device", + signal_name="PropertyChanged") - self.device = dbus.Interface(bus.get_object("org.bluez", - self.path), "org.bluez.Device") - self.props = self.device.GetProperties() - self.device.SetProperty("Trusted", True) + if self.props["Paired"]: + self.get_properties() - bus.add_signal_receiver(self.device_property_changed, - dbus_interface="org.bluez.Device", - signal_name="PropertyChanged") + def test_hfp_ag(self): + TEST_START("HFP AG") - if self.props["Paired"] == True: - self.get_properties() + try: + bus = dbus.SystemBus() - def test_hfp_ag(self): - TEST_START("HFP AG") + manager = dbus.Interface(bus.get_object('org.ofono', '/'), + 'org.ofono.Manager') + modems = manager.GetModems() + path = modems[0][0] - try: - bus = dbus.SystemBus() + modem = dbus.Interface(bus.get_object('org.ofono', path), + 'org.ofono.Modem') + props = modem.GetProperties() + if not props["Powered"]: + modem.SetProperty("Powered", dbus.Boolean(1), + timeout=120) - manager = dbus.Interface(bus.get_object('org.ofono', '/'), - 'org.ofono.Manager') - modems = manager.GetModems() - path = modems[0][0] + vcm = dbus.Interface(bus.get_object('org.ofono', path), + 'org.ofono.VoiceCallManager') - modem = dbus.Interface(bus.get_object('org.ofono', path), - 'org.ofono.Modem') - props = modem.GetProperties() - if props["Powered"] == False: - modem.SetProperty("Powered", dbus.Boolean(1), - timeout = 120) + number = ask("Type the phone number to call:") - vcm = dbus.Interface(bus.get_object('org.ofono', path), - 'org.ofono.VoiceCallManager') + path = vcm.Dial(number, "default") - number = ask("Type the phone number to call:") + ask("Press ENTER to hangup the call:") - path = vcm.Dial(number, "default") + vcm.HangupAll() - ask("Press ENTER to hangup the call:") + ret = ask("| Did you hear the call (y/n)\n") - vcm.HangupAll() + if ret != 'y' and ret != 'Y': + raise - ret = ask("| Did you hear the call (y/n)\n") + ask("From a second phone call the phone connected to " + "oFono.\n Once the phone starts ring press ENTER" + " to anwser:") - if ret != 'y' and ret != 'Y': - raise + calls = vcm.GetCalls() - ask("From a second phone call the phone connected to " \ - "oFono.\n Once the phone starts ring press ENTER" \ - " to anwser:") + for path, properties in calls: + state = properties["State"] - calls = vcm.GetCalls() + if state != "incoming": + continue - for path, properties in calls: - state = properties["State"] + call = dbus.Interface(bus.get_object('org.ofono', path), + 'org.ofono.VoiceCall') - if state != "incoming": - continue + call.Answer() - call = dbus.Interface(bus.get_object('org.ofono', - path), 'org.ofono.VoiceCall') + ask("Press ENTER to hangup the call:") - call.Answer() + vcm.HangupAll() - ask("Press ENTER to hangup the call:") + ret = ask("| Did you hear the call (y/n)\n") - vcm.HangupAll() + if ret != 'y' and ret != 'Y': + raise - ret = ask("| Did you hear the call (y/n)\n") + except: + TEST_FAILED("HFP AG") + process_next() + return - if ret != 'y' and ret != 'Y': - raise + TEST_PASSED("HFP AG") + process_next() - except: - TEST_FAILED("HFP AG") - process_next() - return + def device_test_profiles(self): + if "HFP AG" in self.profiles: + process_q.append(self.test_hfp_ag) - TEST_PASSED("HFP AG") - process_next() + process_next() - def device_test_profiles(self): - if "HFP AG" in self.profiles: - process_q.append(self.test_hfp_ag) + def device_parse_uuids(self, uuids): + print ("| Profiles supported:") + for uuid in uuids: + if uuid in profiles.keys(): + print ("\t * %s" % profiles[uuid]) + self.profiles.append(profiles[uuid]) + self.device_test_profiles() - process_next() + def get_properties(self): + props = self.device.GetProperties() + self.device_parse_uuids(props["UUIDs"]) - def device_parse_uuids(self, uuids): - print ("| Profiles supported:") - for uuid in uuids: - if uuid in profiles.keys(): - print ("\t * %s" % profiles[uuid]) - self.profiles.append(profiles[uuid]) - self.device_test_profiles() - - def get_properties(self): - props = self.device.GetProperties() - self.device_parse_uuids(props["UUIDs"]) + def device_property_changed(self, name, value): + if name == "Paired" and value: + self.device.SetProperty("Trusted", True) + TEST_PASSED("Pairable Responder") + gobject.timeout_add(7000, self.get_properties) - def device_property_changed(self, name, value): - if name == "Paired" and value == True: - self.device.SetProperty("Trusted", True) - TEST_PASSED("Pairable Responder") - gobject.timeout_add(7000, self.get_properties) class Adapter(): - def __init__(self, adapter, device): - self.adapter = adapter - self.device = device - self.found = {} + def __init__(self, adapter, device): + self.adapter = adapter + self.device = device + self.found = {} - def create_device_reply(self, device): - TEST_PASSED("Pairing Initiatior") - self.adapter.RemoveDevice(device) - process_next() + def create_device_reply(self, device): + TEST_PASSED("Pairing Initiatior") + self.adapter.RemoveDevice(device) + process_next() - def create_device_error(self, error): - TEST_FAILED("Pairing Initiatior: Create device failed %s" - % (error)) - process_next() + def create_device_error(self, error): + TEST_FAILED("Pairing Initiatior: Create device failed %s" % (error)) + process_next() - def device_created(self, path): + def device_created(self, path): - print ("| Device created %s" % path) + print ("| Device created %s" % path) - Device(path) + Device(path) - def create_paired_device(self, device): - try: - path = self.adapter.FindDevice(device) - self.adapter.RemoveDevice(path) - except: - pass + def create_paired_device(self, device): + try: + path = self.adapter.FindDevice(device) + self.adapter.RemoveDevice(path) + except: + pass - path = self.adapter.CreatePairedDevice(device, - "/test/agent2", "DisplayYesNo", - timeout=60000, - reply_handler=self.create_device_reply, - error_handler=self.create_device_error) + path = self.adapter.CreatePairedDevice( + device, + "/test/agent2", "DisplayYesNo", + timeout=60000, + reply_handler=self.create_device_reply, + error_handler=self.create_device_error) - def show_menu(self): - self.adapter.StopDiscovery() + def show_menu(self): + self.adapter.StopDiscovery() - d = dialog.Dialog() - device = d.menu("Select one device to pair with:", - choices=self.found.items()) - d.clear() + d = dialog.Dialog() + device = d.menu("Select one device to pair with:", + choices=self.found.items()) + d.clear() - if device[1] == '': - ret = ask("| Retry discovery (y/n)" ) - if ret == 'y' or ret == 'Y': - self.start_paring_initiator() - return - TEST_FAILED("Pairing Initiatior: No device selected") - exit(0) + if device[1] == '': + ret = ask("| Retry discovery (y/n)") + if ret == 'y' or ret == 'Y': + self.start_paring_initiator() + return + TEST_FAILED("Pairing Initiatior: No device selected") + exit(0) - self.create_paired_device(self.found[device[1]]) + self.create_paired_device(self.found[device[1]]) - def device_found(self, address, values): - if values["Name"] in self.found: - return + def device_found(self, address, values): + if values["Name"] in self.found: + return - self.found[values["Name"]] = values["Address"] + self.found[values["Name"]] = values["Address"] - def get_device(self): - path = self.adapter.FindDevice(self.device) + def get_device(self): + path = self.adapter.FindDevice(self.device) - Device(path) + Device(path) - def start_paring_initiator(self): - TEST_START("Pairing Initiatior") + def start_paring_initiator(self): + TEST_START("Pairing Initiatior") - if self.device: - self.create_paired_device(self.device) - return + if self.device: + self.create_paired_device(self.device) + return - self.adapter.StartDiscovery() - self.adapter.SetProperty("Powered", True) - self.adapter.SetProperty("Name", "BlueZ") - self.adapter.SetProperty("Pairable", False) + self.adapter.StartDiscovery() + self.adapter.SetProperty("Powered", True) + self.adapter.SetProperty("Name", "BlueZ") + self.adapter.SetProperty("Pairable", False) - bus.add_signal_receiver(self.device_found, - dbus_interface="org.bluez.Adapter", - signal_name="DeviceFound") + bus.add_signal_receiver(self.device_found, + dbus_interface="org.bluez.Adapter", + signal_name="DeviceFound") - gobject.timeout_add(12000, self.show_menu) + gobject.timeout_add(12000, self.show_menu) - def start_paring_responder(self): - TEST_START("Pairing Responder") + def start_paring_responder(self): + TEST_START("Pairing Responder") - props = self.adapter.GetProperties() + props = self.adapter.GetProperties() - print("| In the phone start pairing with %s (%s)." % - (props["Name"], props["Address"])) + print("| In the phone start pairing with %s (%s)." % + (props["Name"], props["Address"])) - self.adapter.SetProperty("Powered", True) - self.adapter.SetProperty("Name", "BlueZ") - self.adapter.SetProperty("Discoverable", True) - self.adapter.SetProperty("Pairable", True) + self.adapter.SetProperty("Powered", True) + self.adapter.SetProperty("Name", "BlueZ") + self.adapter.SetProperty("Discoverable", True) + self.adapter.SetProperty("Pairable", True) - bus.add_signal_receiver(self.device_created, - dbus_interface="org.bluez.Adapter", - signal_name="DeviceCreated") + bus.add_signal_receiver(self.device_created, + dbus_interface="org.bluez.Adapter", + signal_name="DeviceCreated") -def parse_options(): - parser.add_option("-i", "--interface", dest="interface", - help="HCI device", metavar="HCI") - parser.add_option("-s", "--skip", action="store_true", dest="skip_pair", - help="Skip Pairing") - parser.add_option("-d", "--device", dest="device", - help="Device to connect", metavar="DEVICE") - return parser.parse_args() -def create_base_path(): - global BASE_PATH +def parse_options(): + parser.add_option("-i", "--interface", dest="interface", + help="HCI device", metavar="HCI") + parser.add_option("-s", "--skip", action="store_true", dest="skip_pair", + help="Skip Pairing") + parser.add_option("-d", "--device", dest="device", + help="Device to connect", metavar="DEVICE") + return parser.parse_args() - lt = time.localtime() - BASE_PATH = "/tmp/bluez-%02d%02d-%02d%02d%02d" % (lt.tm_mon, lt.tm_mday, - lt.tm_hour, lt.tm_min, lt.tm_sec) - os.mkdir(BASE_PATH) -if __name__ == "__main__": - dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) +def create_base_path(): + global BASE_PATH - parser = OptionParser() + lt = time.localtime() + BASE_PATH = "/tmp/bluez-%02d%02d-%02d%02d%02d" % (lt.tm_mon, lt.tm_mday, + lt.tm_hour, lt.tm_min, + lt.tm_sec) + os.mkdir(BASE_PATH) - (options, args) = parse_options() +if __name__ == "__main__": + dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) - mainloop = gobject.MainLoop() + parser = OptionParser() - bus = dbus.SystemBus() - manager = dbus.Interface(bus.get_object("org.bluez", "/"), - "org.bluez.Manager") + (options, args) = parse_options() - if options.interface: - path = manager.FindAdapter(options.interface) - else: - path = manager.DefaultAdapter() + mainloop = gobject.MainLoop() - adapter_iface = dbus.Interface(bus.get_object("org.bluez", path), - "org.bluez.Adapter") + bus = dbus.SystemBus() + manager = dbus.Interface(bus.get_object("org.bluez", "/"), + "org.bluez.Manager") - if options.skip_pair and not options.device: - print ("Device not specified") - exit(0) + if options.interface: + path = manager.FindAdapter(options.interface) + else: + path = manager.DefaultAdapter() - agent = BlueZAgent(bus, "/test/agent") - agent = BlueZAgent(bus, "/test/agent2") - adapter = Adapter(adapter_iface, options.device) - adapter_iface.RegisterAgent("/test/agent", "DisplayYesNo") + adapter_iface = dbus.Interface(bus.get_object("org.bluez", path), + "org.bluez.Adapter") - if options.skip_pair: - process_q.append(adapter.get_device) - else: - process_q.append(adapter.start_paring_initiator) - process_q.append(adapter.start_paring_responder) + if options.skip_pair and not options.device: + print ("Device not specified") + exit(0) - create_base_path() - gobject.timeout_add(2000, process_next) - mainloop.run() - adapter_iface.UnregisterAgent("/test/agent") + agent = BlueZAgent(bus, "/test/agent") + agent = BlueZAgent(bus, "/test/agent2") + adapter = Adapter(adapter_iface, options.device) + adapter_iface.RegisterAgent("/test/agent", "DisplayYesNo") + if options.skip_pair: + process_q.append(adapter.get_device) + else: + process_q.append(adapter.start_paring_initiator) + process_q.append(adapter.start_paring_responder) + create_base_path() + gobject.timeout_add(2000, process_next) + mainloop.run() + adapter_iface.UnregisterAgent("/test/agent") diff --git a/bluez/pair-two b/bluez/pair-two index b0bc065..7a7bfa4 100755 --- a/bluez/pair-two +++ b/bluez/pair-two @@ -13,140 +13,145 @@ import time dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) mainloop = GObject.MainLoop() + class Rejected(dbus.DBusException): - _dbus_error_name = "org.bluez.Error.Rejected" + _dbus_error_name = "org.bluez.Error.Rejected" + class Agent(dbus.service.Object): - exit_on_release = False - - def set_exit_on_release(self, exit_on_release): - self.exit_on_release = exit_on_release - - @dbus.service.method("org.bluez.Agent", - in_signature="", out_signature="") - def Release(self): - if self.exit_on_release: - mainloop.quit() - - @dbus.service.method("org.bluez.Agent", - in_signature="os", out_signature="") - def Authorize(self, device, uuid): - print("Authorize (%s, %s)" % (device, uuid)) - return - - @dbus.service.method("org.bluez.Agent", - in_signature="o", out_signature="s") - def RequestPinCode(self, device): - return "0000" - - @dbus.service.method("org.bluez.Agent", - in_signature="ou", out_signature="") - def DisplayPasskey(self, device, passkey): - print("DisplayPasskey (%s, %06d)" % (device, passkey)) - - @dbus.service.method("org.bluez.Agent", - in_signature="os", out_signature="") - def DisplayPinCode(self, device, pincode): - print("DisplayPinCode (%s, %s)" % (device, pincode)) - - @dbus.service.method("org.bluez.Agent", - in_signature="ou", out_signature="") - def RequestConfirmation(self, device, passkey): - return; - - @dbus.service.method("org.bluez.Agent", - in_signature="s", out_signature="") - def ConfirmModeChange(self, mode): - print("ConfirmModeChange (%s)" % (mode)) - authorize = ask("Authorize mode change (yes/no): ") - if (authorize == "yes"): - return - raise Rejected("Mode change by user") - - @dbus.service.method("org.bluez.Agent", - in_signature="", out_signature="") - def Cancel(self): - print("Cancel") + exit_on_release = False + + def set_exit_on_release(self, exit_on_release): + self.exit_on_release = exit_on_release + + @dbus.service.method("org.bluez.Agent", + in_signature="", out_signature="") + def Release(self): + if self.exit_on_release: + mainloop.quit() + + @dbus.service.method("org.bluez.Agent", + in_signature="os", out_signature="") + def Authorize(self, device, uuid): + print("Authorize (%s, %s)" % (device, uuid)) + return + + @dbus.service.method("org.bluez.Agent", + in_signature="o", out_signature="s") + def RequestPinCode(self, device): + return "0000" + + @dbus.service.method("org.bluez.Agent", + in_signature="ou", out_signature="") + def DisplayPasskey(self, device, passkey): + print("DisplayPasskey (%s, %06d)" % (device, passkey)) + + @dbus.service.method("org.bluez.Agent", + in_signature="os", out_signature="") + def DisplayPinCode(self, device, pincode): + print("DisplayPinCode (%s, %s)" % (device, pincode)) + + @dbus.service.method("org.bluez.Agent", + in_signature="ou", out_signature="") + def RequestConfirmation(self, device, passkey): + return + + @dbus.service.method("org.bluez.Agent", + in_signature="s", out_signature="") + def ConfirmModeChange(self, mode): + print("ConfirmModeChange (%s)" % (mode)) + authorize = ask("Authorize mode change (yes/no): ") + if (authorize == "yes"): + return + raise Rejected("Mode change by user") + + @dbus.service.method("org.bluez.Agent", + in_signature="", out_signature="") + def Cancel(self): + print("Cancel") path0 = "/agent0" path1 = "/agent1" Agent(dbus.SystemBus(), path0) Agent(dbus.SystemBus(), path1) -# 0 is the initiator side, 1 is the responder side + +# 0 is the initiator side, 1 is the responder side def pair(bus, adapter0, iocap0, adapter1, iocap1): - pair.device0 = None - pair.device1 = None + pair.device0 = None + pair.device1 = None - def create_device_reply(device): - pair.device0 = device - props = adapter0.GetProperties() - time.sleep(1) - pair.device1 = adapter1.FindDevice(props["Address"]) - mainloop.quit() + def create_device_reply(device): + pair.device0 = device + props = adapter0.GetProperties() + time.sleep(1) + pair.device1 = adapter1.FindDevice(props["Address"]) + mainloop.quit() - def create_device_error(error): - print("Creating device failed: %s" % (error)) - mainloop.quit() - - props1 = adapter1.GetProperties() - adapter1.RegisterAgent(path1, iocap1) + def create_device_error(error): + print("Creating device failed: %s" % (error)) + mainloop.quit() - adapter0.CreatePairedDevice(props1["Address"], path0, iocap0, - timeout=60000, - reply_handler=create_device_reply, - error_handler=create_device_error) + props1 = adapter1.GetProperties() + adapter1.RegisterAgent(path1, iocap1) - mainloop.run() - adapter1.UnregisterAgent(path1) + adapter0.CreatePairedDevice(props1["Address"], path0, iocap0, + timeout=60000, + reply_handler=create_device_reply, + error_handler=create_device_error) - return pair.device0, pair.device1 + mainloop.run() + adapter1.UnregisterAgent(path1) + + return pair.device0, pair.device1 -def test_pair(bus, adapter0, iocap0, adapter1, iocap1): - device0, device1 = pair(bus, adapter0, iocap0, adapter1, iocap1) - if device0: - return True - else: - return False - - -if __name__ == "__main__": - bus = dbus.SystemBus() - try: - manager = dbus.Interface(bus.get_object("org.bluez", - "/"), "org.bluez.Manager") - except: - raise RuntimeError("Bluetooth daemon is not running!") - - props = manager.GetProperties() - if len(props["Adapters"]) < 2: - print "2 Bluetooth adapters are needed!" - - adapter0 = dbus.Interface(bus.get_object("org.bluez", - props["Adapters"][0]), "org.bluez.Adapter") - hci0 = re.search('hci[0-9]', props["Adapters"][0]).group(0) - adapter1 = dbus.Interface(bus.get_object("org.bluez", - props["Adapters"][1]), "org.bluez.Adapter") - hci1 = re.search('hci[0-9]', props["Adapters"][0]).group(0) - - try: - p1 = adapter1.GetProperties() - path = adapter0.FindDevice(p1["Address"]) - adapter0.RemoveDevice(path) - except: - pass - - try: - p0 = adapter0.GetProperties() - path = adapter1.FindDevice(p0["Address"]) - adapter1.RemoveDevice(path) - except: - pass - - paired = test_pair(bus, adapter0, "DisplayYesNo", adapter1, - "DisplayYesNo") - if paired: - exit(0) - else: - exit(1) +def test_pair(bus, adapter0, iocap0, adapter1, iocap1): + device0, device1 = pair(bus, adapter0, iocap0, adapter1, iocap1) + if device0: + return True + else: + return False + + +if __name__ == "__main__": + bus = dbus.SystemBus() + try: + manager = dbus.Interface(bus.get_object("org.bluez", + "/"), "org.bluez.Manager") + except: + raise RuntimeError("Bluetooth daemon is not running!") + + props = manager.GetProperties() + if len(props["Adapters"]) < 2: + print("2 Bluetooth adapters are needed!") + + adapter0 = dbus.Interface(bus.get_object("org.bluez", + props["Adapters"][0]), + "org.bluez.Adapter") + hci0 = re.search('hci[0-9]', props["Adapters"][0]).group(0) + adapter1 = dbus.Interface(bus.get_object("org.bluez", + props["Adapters"][1]), + "org.bluez.Adapter") + hci1 = re.search('hci[0-9]', props["Adapters"][0]).group(0) + + try: + p1 = adapter1.GetProperties() + path = adapter0.FindDevice(p1["Address"]) + adapter0.RemoveDevice(path) + except: + pass + + try: + p0 = adapter0.GetProperties() + path = adapter1.FindDevice(p0["Address"]) + adapter1.RemoveDevice(path) + except: + pass + + paired = test_pair(bus, adapter0, "DisplayYesNo", adapter1, + "DisplayYesNo") + if paired: + exit(0) + else: + exit(1) diff --git a/bluez/simple-agent b/bluez/simple-agent index a25eaf0..82c8d41 100755 --- a/bluez/simple-agent +++ b/bluez/simple-agent @@ -10,135 +10,137 @@ import dbus.service import dbus.mainloop.glib from optparse import OptionParser + def ask(prompt): - try: - return raw_input(prompt) - except: - return input(prompt) + try: + return raw_input(prompt) + except: + return input(prompt) + class Rejected(dbus.DBusException): - _dbus_error_name = "org.bluez.Error.Rejected" + _dbus_error_name = "org.bluez.Error.Rejected" + class Agent(dbus.service.Object): - exit_on_release = True - - def set_exit_on_release(self, exit_on_release): - self.exit_on_release = exit_on_release - - @dbus.service.method("org.bluez.Agent", - in_signature="", out_signature="") - def Release(self): - print("Release") - if self.exit_on_release: - mainloop.quit() - - @dbus.service.method("org.bluez.Agent", - in_signature="os", out_signature="") - def Authorize(self, device, uuid): - print("Authorize (%s, %s)" % (device, uuid)) - authorize = ask("Authorize connection (yes/no): ") - if (authorize == "yes"): - return - raise Rejected("Connection rejected by user") - - @dbus.service.method("org.bluez.Agent", - in_signature="o", out_signature="s") - def RequestPinCode(self, device): - print("RequestPinCode (%s)" % (device)) - return ask("Enter PIN Code: ") - - @dbus.service.method("org.bluez.Agent", - in_signature="o", out_signature="u") - def RequestPasskey(self, device): - print("RequestPasskey (%s)" % (device)) - passkey = ask("Enter passkey: ") - return dbus.UInt32(passkey) - - @dbus.service.method("org.bluez.Agent", - in_signature="ou", out_signature="") - def DisplayPasskey(self, device, passkey): - print("DisplayPasskey (%s, %06d)" % (device, passkey)) - - @dbus.service.method("org.bluez.Agent", - in_signature="os", out_signature="") - def DisplayPinCode(self, device, pincode): - print("DisplayPinCode (%s, %s)" % (device, pincode)) - - @dbus.service.method("org.bluez.Agent", - in_signature="ou", out_signature="") - def RequestConfirmation(self, device, passkey): - print("RequestConfirmation (%s, %06d)" % (device, passkey)) - confirm = ask("Confirm passkey (yes/no): ") - if (confirm == "yes"): - return - raise Rejected("Passkey doesn't match") - - @dbus.service.method("org.bluez.Agent", - in_signature="s", out_signature="") - def ConfirmModeChange(self, mode): - print("ConfirmModeChange (%s)" % (mode)) - authorize = ask("Authorize mode change (yes/no): ") - if (authorize == "yes"): - return - raise Rejected("Mode change by user") - - @dbus.service.method("org.bluez.Agent", - in_signature="", out_signature="") - def Cancel(self): - print("Cancel") + exit_on_release = True + + def set_exit_on_release(self, exit_on_release): + self.exit_on_release = exit_on_release + + @dbus.service.method("org.bluez.Agent", + in_signature="", out_signature="") + def Release(self): + print("Release") + if self.exit_on_release: + mainloop.quit() + + @dbus.service.method("org.bluez.Agent", + in_signature="os", out_signature="") + def Authorize(self, device, uuid): + print("Authorize (%s, %s)" % (device, uuid)) + authorize = ask("Authorize connection (yes/no): ") + if (authorize == "yes"): + return + raise Rejected("Connection rejected by user") + + @dbus.service.method("org.bluez.Agent", + in_signature="o", out_signature="s") + def RequestPinCode(self, device): + print("RequestPinCode (%s)" % (device)) + return ask("Enter PIN Code: ") + + @dbus.service.method("org.bluez.Agent", + in_signature="o", out_signature="u") + def RequestPasskey(self, device): + print("RequestPasskey (%s)" % (device)) + passkey = ask("Enter passkey: ") + return dbus.UInt32(passkey) + + @dbus.service.method("org.bluez.Agent", + in_signature="ou", out_signature="") + def DisplayPasskey(self, device, passkey): + print("DisplayPasskey (%s, %06d)" % (device, passkey)) + + @dbus.service.method("org.bluez.Agent", + in_signature="os", out_signature="") + def DisplayPinCode(self, device, pincode): + print("DisplayPinCode (%s, %s)" % (device, pincode)) + + @dbus.service.method("org.bluez.Agent", + in_signature="ou", out_signature="") + def RequestConfirmation(self, device, passkey): + print("RequestConfirmation (%s, %06d)" % (device, passkey)) + confirm = ask("Confirm passkey (yes/no): ") + if (confirm == "yes"): + return + raise Rejected("Passkey doesn't match") + + @dbus.service.method("org.bluez.Agent", + in_signature="s", out_signature="") + def ConfirmModeChange(self, mode): + print("ConfirmModeChange (%s)" % (mode)) + authorize = ask("Authorize mode change (yes/no): ") + if (authorize == "yes"): + return + raise Rejected("Mode change by user") + + @dbus.service.method("org.bluez.Agent", + in_signature="", out_signature="") + def Cancel(self): + print("Cancel") + def create_device_reply(device): - print("New device (%s)" % (device)) - mainloop.quit() + print("New device (%s)" % (device)) + mainloop.quit() + def create_device_error(error): - print("Creating device failed: %s" % (error)) - mainloop.quit() + print("Creating device failed: %s" % (error)) + mainloop.quit() if __name__ == '__main__': - dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) - - bus = dbus.SystemBus() - manager = dbus.Interface(bus.get_object("org.bluez", "/"), - "org.bluez.Manager") + dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) - capability = "KeyboardDisplay" + bus = dbus.SystemBus() + manager = dbus.Interface(bus.get_object("org.bluez", "/"), + "org.bluez.Manager") - parser = OptionParser() - parser.add_option("-c", "--capability", action="store", - type="string", dest="capability") - (options, args) = parser.parse_args() - if options.capability: - capability = options.capability + capability = "KeyboardDisplay" - if len(args) > 0: - path = manager.FindAdapter(args[0]) - else: - path = manager.DefaultAdapter() + parser = OptionParser() + parser.add_option("-c", "--capability", action="store", + type="string", dest="capability") + (options, args) = parser.parse_args() + if options.capability: + capability = options.capability - adapter = dbus.Interface(bus.get_object("org.bluez", path), - "org.bluez.Adapter") + if len(args) > 0: + path = manager.FindAdapter(args[0]) + else: + path = manager.DefaultAdapter() - path = "/test/agent" - agent = Agent(bus, path) + adapter = dbus.Interface(bus.get_object("org.bluez", path), + "org.bluez.Adapter") - mainloop = GObject.MainLoop() + path = "/test/agent" + agent = Agent(bus, path) - if len(args) > 1: - if len(args) > 2: - device = adapter.FindDevice(args[1]) - adapter.RemoveDevice(device) + mainloop = GObject.MainLoop() - agent.set_exit_on_release(False) - adapter.CreatePairedDevice(args[1], path, capability, - timeout=60000, - reply_handler=create_device_reply, - error_handler=create_device_error) - else: - adapter.RegisterAgent(path, capability) - print("Agent registered") + if len(args) > 1: + if len(args) > 2: + device = adapter.FindDevice(args[1]) + adapter.RemoveDevice(device) - mainloop.run() + agent.set_exit_on_release(False) + adapter.CreatePairedDevice(args[1], path, capability, + timeout=60000, + reply_handler=create_device_reply, + error_handler=create_device_error) + else: + adapter.RegisterAgent(path, capability) + print("Agent registered") - #adapter.UnregisterAgent(path) - #print("Agent unregistered") + mainloop.run() diff --git a/bluez/test-avrcp.py b/bluez/test-avrcp.py index 65b9cc1..277b75f 100755 --- a/bluez/test-avrcp.py +++ b/bluez/test-avrcp.py @@ -10,37 +10,37 @@ bus = dbus.SystemBus() manager = dbus.Interface(bus.get_object("org.bluez", "/"), "org.bluez.Manager") option_list = [ - make_option("-i", "--device", action="store", - type="string", dest="dev_id"), - ] + make_option("-i", "--device", action="store", + type="string", dest="dev_id"), +] parser = OptionParser(option_list=option_list) (options, args) = parser.parse_args() if options.dev_id: - adapter_path = manager.FindAdapter(options.dev_id) + adapter_path = manager.FindAdapter(options.dev_id) else: - adapter_path = manager.DefaultAdapter() + adapter_path = manager.DefaultAdapter() adapter = dbus.Interface(bus.get_object("org.bluez", adapter_path), - "org.bluez.Adapter") + "org.bluez.Adapter") if len(args) < 1: - print("""Usage: %s [-i device] <bdaddr>""" % sys.argv[0]) - sys.exit(1) + print("""Usage: %s [-i device] <bdaddr>""" % sys.argv[0]) + sys.exit(1) device = adapter.FindDevice(args[0]) audio = dbus.Interface(bus.get_object("org.bluez", device), - "org.bluez.AudioSource") + "org.bluez.AudioSource") props = audio.GetProperties() if props["State"] != "connected": - audio.Connect() + audio.Connect() time.sleep(5) control = dbus.Interface(bus.get_object("org.bluez", device), - "org.bluez.Control") + "org.bluez.Control") control.VolumeDown() control.VolumeUp() diff --git a/bluez/ubt b/bluez/ubt index 9198eb9..1c6f2c5 100755 --- a/bluez/ubt +++ b/bluez/ubt @@ -1,22 +1,22 @@ #!/usr/bin/python # -#BlueZ - Bluetooth protocol stack for Linux +# BlueZ - Bluetooth protocol stack for Linux # -#Copyright (C) 2012 Collabora Ltd. +# Copyright (C) 2012 Collabora Ltd. # -#This program is free software; you can redistribute it and/or modify -#it under the terms of the GNU General Public License as published by -#the Free Software Foundation; either version 2 of the License, or -#(at your option) any later version. +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. # -#This program is distributed in the hope that it will be useful, -#but WITHOUT ANY WARRANTY; without even the implied warranty of -#MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -#GNU General Public License for more details. +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. # -#You should have received a copy of the GNU General Public License -#along with this program; if not, write to the Free Software -#Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA import gobject import os @@ -29,16 +29,16 @@ import urllib2 from optparse import OptionParser profiles = { - "00001103-0000-1000-8000-00805f9b34fb" : "DUN GW", - "00001105-0000-1000-8000-00805f9b34fb" : "OPP Server", - "0000110a-0000-1000-8000-00805f9b34fb" : "PBAP PSE", - "0000110c-0000-1000-8000-00805f9b34fb" : "AVRCP Target", - "0000110e-0000-1000-8000-00805f9b34fb" : "AVRCP Control", - "00001112-0000-1000-8000-00805f9b34fb" : "A2DP Source", - "00001116-0000-1000-8000-00805f9b34fb" : "HSP AG", - "0000112f-0000-1000-8000-00805f9b34fb" : "SAP Server", - "0000112f-0000-1000-8000-00805f9b34fb" : "PAN NAP", - "00001132-0000-1000-8000-00805f9b34fb" : "MAP MSE", + "00001103-0000-1000-8000-00805f9b34fb": "DUN GW", + "00001105-0000-1000-8000-00805f9b34fb": "OPP Server", + "0000110a-0000-1000-8000-00805f9b34fb": "PBAP PSE", + "0000110c-0000-1000-8000-00805f9b34fb": "AVRCP Target", + "0000110e-0000-1000-8000-00805f9b34fb": "AVRCP Control", + "00001112-0000-1000-8000-00805f9b34fb": "A2DP Source", + "00001116-0000-1000-8000-00805f9b34fb": "HSP AG", + "0000112f-0000-1000-8000-00805f9b34fb": "SAP Server", + "0000112f-0000-1000-8000-00805f9b34fb": "PAN NAP", + "00001132-0000-1000-8000-00805f9b34fb": "MAP MSE", } OBEX_DBUS = "org.bluez.obex.client" @@ -54,734 +54,750 @@ FN:Collabora TEL;CELL;PREF:145 END:VCARD """ + + def create_temp_vcard(): - f = open('/tmp/vcard.vcf', 'w') - f.write(VCARD_CONTENTS) - f.close() + f = open('/tmp/vcard.vcf', 'w') + f.write(VCARD_CONTENTS) + f.close() + def delete_temp_vcard(): - try: - os.remove('/tmp/vcard.vcf') - except: - return + try: + os.remove('/tmp/vcard.vcf') + except: + return BASE_PATH = "" - process_q = [] + + def process_next(): - if len(process_q) == 0: - mainloop.quit() - return + if len(process_q) == 0: + mainloop.quit() + return + + f = process_q[0] + process_q.remove(f) - f = process_q[0] - process_q.remove(f) + gobject.idle_add(f) - gobject.idle_add(f) def TEST_START(str): - print ("TEST STARTED: %s" % str) + print ("TEST STARTED: %s" % str) + + def TEST_FAILED(str): - print ("TEST FINISHED: %s : FAILED" % str) + print ("TEST FINISHED: %s : FAILED" % str) + + def TEST_PASSED(str): - print ("TEST FINISHED: %s : PASSED" % str) + print ("TEST FINISHED: %s : PASSED" % str) + def ask(prompt): - try: - return raw_input(prompt) - except: - return input(prompt) + try: + return raw_input(prompt) + except: + return input(prompt) + class Rejected(dbus.DBusException): - _dbus_error_name = "org.bluez.Error.Rejected" + _dbus_error_name = "org.bluez.Error.Rejected" + class BlueZAgent(dbus.service.Object): - exit_on_release = False - - def set_exit_on_release(self, exit_on_release): - self.exit_on_release = exit_on_release - - @dbus.service.method("org.bluez.Agent", - in_signature="", out_signature="") - def Release(self): - if self.exit_on_release: - mainloop.quit() - - @dbus.service.method("org.bluez.Agent", - in_signature="os", out_signature="") - def Authorize(self, device, uuid): - print(" | Authorizing %s, %s" % (device, uuid)) - return - - @dbus.service.method("org.bluez.Agent", - in_signature="o", out_signature="s") - def RequestPinCode(self, device): - print("| RequestPinCode (%s)" % (device)) - return ask("| Enter PIN Code: ") - - @dbus.service.method("org.bluez.Agent", - in_signature="ou", out_signature="") - def DisplayPasskey(self, device, passkey): - print("| DisplayPasskey (%s, %06d)" % (device, passkey)) - - @dbus.service.method("org.bluez.Agent", - in_signature="os", out_signature="") - def DisplayPinCode(self, device, pincode): - print("| DisplayPinCode (%s, %s)" % (device, pincode)) - - @dbus.service.method("org.bluez.Agent", - in_signature="ou", out_signature="") - def RequestConfirmation(self, device, passkey): - print("| Confiming passkey %06d (%s)" % (passkey, device)) - return - - @dbus.service.method("org.bluez.Agent", - in_signature="s", out_signature="") - def ConfirmModeChange(self, mode): - print("| Confirming ModeChange (%s)" % (mode)) - return - - @dbus.service.method("org.bluez.Agent", - in_signature="", out_signature="") - def Cancel(self): - print("| Cancel") + exit_on_release = False + + def set_exit_on_release(self, exit_on_release): + self.exit_on_release = exit_on_release + + @dbus.service.method("org.bluez.Agent", + in_signature="", out_signature="") + def Release(self): + if self.exit_on_release: + mainloop.quit() + + @dbus.service.method("org.bluez.Agent", + in_signature="os", out_signature="") + def Authorize(self, device, uuid): + print(" | Authorizing %s, %s" % (device, uuid)) + return + + @dbus.service.method("org.bluez.Agent", + in_signature="o", out_signature="s") + def RequestPinCode(self, device): + print("| RequestPinCode (%s)" % (device)) + return ask("| Enter PIN Code: ") + + @dbus.service.method("org.bluez.Agent", + in_signature="ou", out_signature="") + def DisplayPasskey(self, device, passkey): + print("| DisplayPasskey (%s, %06d)" % (device, passkey)) + + @dbus.service.method("org.bluez.Agent", + in_signature="os", out_signature="") + def DisplayPinCode(self, device, pincode): + print("| DisplayPinCode (%s, %s)" % (device, pincode)) + + @dbus.service.method("org.bluez.Agent", + in_signature="ou", out_signature="") + def RequestConfirmation(self, device, passkey): + print("| Confiming passkey %06d (%s)" % (passkey, device)) + return + + @dbus.service.method("org.bluez.Agent", + in_signature="s", out_signature="") + def ConfirmModeChange(self, mode): + print("| Confirming ModeChange (%s)" % (mode)) + return + + @dbus.service.method("org.bluez.Agent", + in_signature="", out_signature="") + def Cancel(self): + print("| Cancel") + class PbapTransfer: - def __init__(self): - self.path = None - self.filename = None + def __init__(self): + self.path = None + self.filename = None + class PbapClient: - def __init__(self, session_path): - self.transfers = 0 - self.props = dict() - self.flush_func = None - bus = dbus.SessionBus() - obj = bus.get_object("org.bluez.obex.client", session_path) - self.session = dbus.Interface(obj, "org.bluez.obex.Session") - self.pbap = dbus.Interface(obj, - "org.bluez.obex.PhonebookAccess") - bus.add_signal_receiver(self.transfer_complete, - dbus_interface="org.bluez.obex.Transfer", - signal_name="Complete", - path_keyword="path") - bus.add_signal_receiver(self.transfer_error, - dbus_interface="org.bluez.obex.Transfer", - signal_name="Error", - path_keyword="path") - - def register(self, reply, transfer): - (path, properties) = reply - transfer.path = path - transfer.filename = properties["Filename"] - self.props[path] = transfer - - def error(self, err): - TEST_FAILED("PBAP PSE: %s" % err) - process_next() - - def transfer_complete(self, path): - req = self.props.get(path) - if req == None: - return - self.transfers -= 1 - del self.props[path] - - if (len(self.props) == 0) and (self.transfers == 0): - if self.flush_func != None: - f = self.flush_func - self.flush_func = None - f() - - def transfer_error(self, code, message, path): - req = self.props.get(path) - if req == None: - return - TEST_FAILED("PBAP PSE: %s" % message) - process_next() - - def pull(self, vcard, target): - req = PbapTransfer() - self.pbap.Pull(vcard, target, - reply_handler=lambda r: self.register(r, req), - error_handler=self.error) - self.transfers += 1 - - def pull_all(self, target): - req = PbapTransfer() - self.pbap.PullAll(target, - reply_handler=lambda r: self.register(r, req), - error_handler=self.error) - self.transfers += 1 - - def flush_transfers(self, func): - if (len(self.props) == 0) and (self.transfers == 0): - return - self.flush_func = func - - def interface(self): - return self.pbap + def __init__(self, session_path): + self.transfers = 0 + self.props = dict() + self.flush_func = None + bus = dbus.SessionBus() + obj = bus.get_object("org.bluez.obex.client", session_path) + self.session = dbus.Interface(obj, "org.bluez.obex.Session") + self.pbap = dbus.Interface(obj, + "org.bluez.obex.PhonebookAccess") + bus.add_signal_receiver(self.transfer_complete, + dbus_interface="org.bluez.obex.Transfer", + signal_name="Complete", + path_keyword="path") + bus.add_signal_receiver(self.transfer_error, + dbus_interface="org.bluez.obex.Transfer", + signal_name="Error", + path_keyword="path") + + def register(self, reply, transfer): + (path, properties) = reply + transfer.path = path + transfer.filename = properties["Filename"] + self.props[path] = transfer + + def error(self, err): + TEST_FAILED("PBAP PSE: %s" % err) + process_next() + + def transfer_complete(self, path): + req = self.props.get(path) + if req is None: + return + self.transfers -= 1 + del self.props[path] + + if (len(self.props) == 0) and (self.transfers == 0): + if self.flush_func is not None: + f = self.flush_func + self.flush_func = None + f() + + def transfer_error(self, code, message, path): + req = self.props.get(path) + if req is None: + return + TEST_FAILED("PBAP PSE: %s" % message) + process_next() + + def pull(self, vcard, target): + req = PbapTransfer() + self.pbap.Pull(vcard, target, + reply_handler=lambda r: self.register(r, req), + error_handler=self.error) + self.transfers += 1 + + def pull_all(self, target): + req = PbapTransfer() + self.pbap.PullAll(target, + reply_handler=lambda r: self.register(r, req), + error_handler=self.error) + self.transfers += 1 + + def flush_transfers(self, func): + if (len(self.props) == 0) and (self.transfers == 0): + return + self.flush_func = func + + def interface(self): + return self.pbap transfers = [] + class ObexdAgent(dbus.service.Object): - def __init__(self, conn=None, obj_path=None): - dbus.service.Object.__init__(self, conn, obj_path) - self.pending_auth = False + def __init__(self, conn=None, obj_path=None): + dbus.service.Object.__init__(self, conn, obj_path) + self.pending_auth = False - @dbus.service.method("org.bluez.obex.Agent", in_signature="osssii", - out_signature="s") - def Authorize(self, dpath, device, filename, ftype, length, time): - global transfers + @dbus.service.method("org.bluez.obex.Agent", in_signature="osssii", + out_signature="s") + def Authorize(self, dpath, device, filename, ftype, length, time): + global transfers - self.pending_auth = False - transfers.append(OppTransfer(dpath, filename, 0, length)) - return "/tmp/vcard-fa7e" + self.pending_auth = False + transfers.append(OppTransfer(dpath, filename, 0, length)) + return "/tmp/vcard-fa7e" - @dbus.service.method("org.bluez.obex.Agent", in_signature="", - out_signature="") - def Cancel(self): - self.pending_auth = False + @dbus.service.method("org.bluez.obex.Agent", in_signature="", + out_signature="") + def Cancel(self): + self.pending_auth = False class OppTransfer(object): - def __init__(self, dpath, filename=None, transfered=-1, size=-1): - self.dpath = dpath - self.filename = filename - self.transfered = transfered - self.size = size - - def update(self, filename=None, transfered=-1, total=-1): - if filename: - self.filename = filename - self.transfered = transfered - self.size = total - - def cancel(self): - transfer_iface = dbus.Interface(bus.get_object( - "org.bluez.obex", self.dpath), - "org.bluez.obex.Transfer") - transfer_iface.Cancel() - - def __str__(self): - p = float(self.transfered) / float(self.size) * 100 - return "%s (%s) (%.2f%%)" % (self.filename, self.dpath, p) - - __repr__ = __str__ + def __init__(self, dpath, filename=None, transfered=-1, size=-1): + self.dpath = dpath + self.filename = filename + self.transfered = transfered + self.size = size -class OppClient: - def __init__(self, session_path): - self.progress = 0 - self.transfer_path = None - bus = dbus.SessionBus() - obj = bus.get_object("org.bluez.obex.client", session_path) - self.session = dbus.Interface(obj, "org.bluez.obex.Session") - self.opp = dbus.Interface(obj, "org.bluez.obex.ObjectPush") - bus.add_signal_receiver(self.transfer_complete, - dbus_interface="org.bluez.obex.Transfer", - signal_name="Complete", - path_keyword="path") - bus.add_signal_receiver(self.transfer_error, - dbus_interface="org.bluez.obex.Transfer", - signal_name="Error", - path_keyword="path") - - def create_transfer_reply(self, reply): - (path, properties) = reply - self.transfer_path = path - self.transfer_size = properties["Size"] - - def error(self, err): - print err - mainloop.quit() - - def transfer_complete(self, path): - if path != self.transfer_path: - return - TEST_PASSED("OPP Server"); - delete_temp_vcard() - process_next() - - def transfer_error(self, code, message, path): - if path != self.transfer_path: - return - TEST_FAILED("OPP Server"); - delete_temp_vcard() - process_next() - - def transfer_progress(self, prop, value, path): - return - - def pull_business_card(self, filename): - self.opp.PullBusinessCard(os.path.abspath(filename), - reply_handler=self.create_transfer_reply, - error_handler=self.error) - - def send_file(self, filename): - self.opp.SendFile(os.path.abspath(filename), - reply_handler=self.create_transfer_reply, - error_handler=self.error) + def update(self, filename=None, transfered=-1, total=-1): + if filename: + self.filename = filename + self.transfered = transfered + self.size = total -class Device(): - def __init__(self, path): - self.path = path - self.profiles = [] + def cancel(self): + transfer_iface = dbus.Interface(bus.get_object( + "org.bluez.obex", self.dpath), + "org.bluez.obex.Transfer") + transfer_iface.Cancel() - self.device = dbus.Interface(bus.get_object("org.bluez", - self.path), "org.bluez.Device") - self.props = self.device.GetProperties() - self.device.SetProperty("Trusted", True) + def __str__(self): + p = float(self.transfered) / float(self.size) * 100 + return "%s (%s) (%.2f%%)" % (self.filename, self.dpath, p) - bus.add_signal_receiver(self.device_property_changed, - dbus_interface="org.bluez.Device", - signal_name="PropertyChanged") + __repr__ = __str__ - if self.props["Paired"] == True: - self.get_properties() +class OppClient: + def __init__(self, session_path): + self.progress = 0 + self.transfer_path = None + bus = dbus.SessionBus() + obj = bus.get_object("org.bluez.obex.client", session_path) + self.session = dbus.Interface(obj, "org.bluez.obex.Session") + self.opp = dbus.Interface(obj, "org.bluez.obex.ObjectPush") + bus.add_signal_receiver(self.transfer_complete, + dbus_interface="org.bluez.obex.Transfer", + signal_name="Complete", + path_keyword="path") + bus.add_signal_receiver(self.transfer_error, + dbus_interface="org.bluez.obex.Transfer", + signal_name="Error", + path_keyword="path") + + def create_transfer_reply(self, reply): + (path, properties) = reply + self.transfer_path = path + self.transfer_size = properties["Size"] + + def error(self, err): + print(err) + mainloop.quit() + + def transfer_complete(self, path): + if path != self.transfer_path: + return + TEST_PASSED("OPP Server") + delete_temp_vcard() + process_next() + + def transfer_error(self, code, message, path): + if path != self.transfer_path: + return + TEST_FAILED("OPP Server") + delete_temp_vcard() + process_next() + + def transfer_progress(self, prop, value, path): + return + + def pull_business_card(self, filename): + self.opp.PullBusinessCard(os.path.abspath(filename), + reply_handler=self.create_transfer_reply, + error_handler=self.error) + + def send_file(self, filename): + self.opp.SendFile(os.path.abspath(filename), + reply_handler=self.create_transfer_reply, + error_handler=self.error) - def test_map(self): - def set_folder(session, new_dir): - for node in new_dir.split("/"): - session.SetFolder(node) +class Device(): + def __init__(self, path): + self.path = path + self.profiles = [] - def message_check(messages): - if messages == {}: - return + self.device = dbus.Interface(bus.get_object("org.bluez", self.path), + "org.bluez.Device") + self.props = self.device.GetProperties() + self.device.SetProperty("Trusted", True) - for m in messages.values(): - assert('Subject' in m) - assert('Timestamp' in m) - assert('SenderAddress' in m) - assert('RecipientAddress' in m) - assert('Type' in m) - assert('Size' in m) - assert('Status' in m) + bus.add_signal_receiver(self.device_property_changed, + dbus_interface="org.bluez.Device", + signal_name="PropertyChanged") - TEST_START("MAP MSE") + if self.props["Paired"]: + self.get_properties() - try: - bus = dbus.SessionBus() + def test_map(self): + def set_folder(session, new_dir): + for node in new_dir.split("/"): + session.SetFolder(node) - client = dbus.Interface(bus.get_object(OBEX_DBUS, "/"), - OBEX_CLIENT) + def message_check(messages): + if messages == {}: + return - path = client.CreateSession(self.props["Address"], - {"Target": "map"}) + for m in messages.values(): + assert('Subject' in m) + assert('Timestamp' in m) + assert('SenderAddress' in m) + assert('RecipientAddress' in m) + assert('Type' in m) + assert('Size' in m) + assert('Status' in m) - session = dbus.Interface(bus.get_object(OBEX_DBUS, path), - OBEX_SESSION) + TEST_START("MAP MSE") - map = dbus.Interface(bus.get_object(OBEX_DBUS, path), - OBEX_MAP) + try: + bus = dbus.SessionBus() - set_folder(map, "telecom/msg") - folder =map.GetFolderListing(dict()) + client = dbus.Interface(bus.get_object(OBEX_DBUS, "/"), + OBEX_CLIENT) - for f in folder: - msg = map.GetMessageListing(f["Name"], dict()) - message_check(msg) + path = client.CreateSession(self.props["Address"], + {"Target": "map"}) - TEST_PASSED("MAP MSE") - except: - TEST_FAILED("MAP MSE") + session = dbus.Interface(bus.get_object(OBEX_DBUS, path), + OBEX_SESSION) - process_next() + map = dbus.Interface(bus.get_object(OBEX_DBUS, path), + OBEX_MAP) - def pbap_test_paths(self, pbap_client, paths): - if len(paths) == 0: - TEST_PASSED("PBAP PSE") - process_next() - return + set_folder(map, "telecom/msg") + folder = map.GetFolderListing(dict()) - try: - path = paths[0] + for f in folder: + msg = map.GetMessageListing(f["Name"], dict()) + message_check(msg) - pbap_client.interface().Select("int", path) + TEST_PASSED("MAP MSE") + except: + TEST_FAILED("MAP MSE") - ret = pbap_client.interface().GetSize() + process_next() - ret = pbap_client.interface().List() - for item in ret: - pbap_client.interface().SetFormat("vcard30") - pbap_client.interface().SetFilter(["VERSION", - "FN", "TEL"]); - pbap_client.pull(item[0], BASE_PATH + - "/pbap-vcard-" + path + item[1]) + def pbap_test_paths(self, pbap_client, paths): + if len(paths) == 0: + TEST_PASSED("PBAP PSE") + process_next() + return - pbap_client.interface().SetFormat("vcard30") - pbap_client.interface().SetFilter(["VERSION", "FN", "TEL"]); - pbap_client.pull_all(BASE_PATH + "/pbap-vcard-ALL-" + path) + try: + path = paths[0] - pbap_client.flush_transfers(lambda: - self.pbap_test_paths(pbap_client, - paths[1:])) - except: - TEST_FAILED("PBAP PSE path"); - process_next() + pbap_client.interface().Select("int", path) - def test_pbap(self): - TEST_START("PBAP PSE") + ret = pbap_client.interface().GetSize() - try: - bus = dbus.SessionBus() + ret = pbap_client.interface().List() + for item in ret: + pbap_client.interface().SetFormat("vcard30") + pbap_client.interface().SetFilter(["VERSION", "FN", "TEL"]) + pbap_client.pull(item[0], BASE_PATH + + "/pbap-vcard-" + path + item[1]) + + pbap_client.interface().SetFormat("vcard30") + pbap_client.interface().SetFilter(["VERSION", "FN", "TEL"]) + pbap_client.pull_all(BASE_PATH + "/pbap-vcard-ALL-" + path) + + pbap_client.flush_transfers( + lambda: self.pbap_test_paths(pbap_client, paths[1:])) + except: + TEST_FAILED("PBAP PSE path") + process_next() - client = dbus.Interface(bus.get_object(OBEX_DBUS, "/"), - OBEX_CLIENT) + def test_pbap(self): + TEST_START("PBAP PSE") - session_path = client.CreateSession(self.props["Address"], - {"Target": "PBAP"}) + try: + bus = dbus.SessionBus() - pbap_client = PbapClient(session_path) + client = dbus.Interface(bus.get_object(OBEX_DBUS, "/"), + OBEX_CLIENT) - self.pbap_test_paths(pbap_client, ["PB", "ICH", "OCH", - "MCH", "CCH"]) - except: - TEST_FAILED("PBAP PSE") - process_next() - + session_path = client.CreateSession(self.props["Address"], + {"Target": "PBAP"}) - def test_opp_server(self): - TEST_START("OPP Server") - try: - bus = dbus.SessionBus() + pbap_client = PbapClient(session_path) - client = dbus.Interface(bus.get_object(OBEX_DBUS, "/"), - OBEX_CLIENT) + self.pbap_test_paths(pbap_client, ["PB", "ICH", "OCH", + "MCH", "CCH"]) + except: + TEST_FAILED("PBAP PSE") + process_next() - path = client.CreateSession(self.props["Address"], - { "Target": "OPP" }) - opp_client = OppClient(path) + def test_opp_server(self): + TEST_START("OPP Server") + try: + bus = dbus.SessionBus() - create_temp_vcard() - opp_client.send_file("/tmp/vcard.vcf") - except: - delete_temp_vcard() - TEST_FAILED("OPP Server") - process_next() + client = dbus.Interface(bus.get_object(OBEX_DBUS, "/"), + OBEX_CLIENT) - def test_opp_client(self): - TEST_START("OPP client") - print("Start an OPP transfer in the phone...") + path = client.CreateSession(self.props["Address"], + {"Target": "OPP"}) + opp_client = OppClient(path) - def transfer_completed(dpath, success): - if success: - TEST_PASSED("OPP Client") - else: - TEST_FAILED("OPP Client") + create_temp_vcard() + opp_client.send_file("/tmp/vcard.vcf") + except: + delete_temp_vcard() + TEST_FAILED("OPP Server") + process_next() - process_next() + def test_opp_client(self): + TEST_START("OPP client") + print("Start an OPP transfer in the phone...") - bus = dbus.SessionBus() + def transfer_completed(dpath, success): + if success: + TEST_PASSED("OPP Client") + else: + TEST_FAILED("OPP Client") - manager = dbus.Interface(bus.get_object("org.bluez.obex", "/"), - "org.bluez.obex.Manager") - path_server = "/server/agent" - agent = ObexdAgent(bus, path_server) - manager.RegisterAgent(path_server) + process_next() - bus.add_signal_receiver(transfer_completed, - dbus_interface="org.bluez.obex.Manager", - signal_name="TransferCompleted") + bus = dbus.SessionBus() + manager = dbus.Interface(bus.get_object("org.bluez.obex", "/"), + "org.bluez.obex.Manager") + path_server = "/server/agent" + agent = ObexdAgent(bus, path_server) + manager.RegisterAgent(path_server) - def test_dun(self): - TEST_START("DUN GW") + bus.add_signal_receiver(transfer_completed, + dbus_interface="org.bluez.obex.Manager", + signal_name="TransferCompleted") - try: - bus = dbus.SystemBus() + def test_dun(self): + TEST_START("DUN GW") - manager = dbus.Interface(bus.get_object("net.connman", - "/"), "net.connman.Manager") - services = manager.GetServices() + try: + bus = dbus.SystemBus() - path = None - for s in services: - if s[1]["Name"] == self.props["Name"]: - path = s[0] - break - if not path: - raise + manager = dbus.Interface(bus.get_object("net.connman", "/"), + "net.connman.Manager") + services = manager.GetServices() - service = dbus.Interface(bus.get_object("net.connman", - path), "net.connman.Service") - service.Connect(timeout=60000) - url = urllib2.urlopen("http://connman.net") - f = open(BASE_PATH + "/DUN", 'w') - f.write(url.read(1000000)) - f.close() - service.Disconnect(timeout=60000) + path = None + for s in services: + if s[1]["Name"] == self.props["Name"]: + path = s[0] + break + if not path: + raise - TEST_PASSED("DUN GW") + service = dbus.Interface(bus.get_object("net.connman", path), + "net.connman.Service") + service.Connect(timeout=60000) + url = urllib2.urlopen("http://connman.net") + f = open(BASE_PATH + "/DUN", 'w') + f.write(url.read(1000000)) + f.close() + service.Disconnect(timeout=60000) - except: - TEST_FAILED("DUN GW") + TEST_PASSED("DUN GW") - process_next() + except: + TEST_FAILED("DUN GW") - def test_pan(self): - TEST_START("PAN NAP") + process_next() - try: - bus = dbus.SystemBus() + def test_pan(self): + TEST_START("PAN NAP") - manager = dbus.Interface(bus.get_object("net.connman", - "/"), "net.connman.Manager") + try: + bus = dbus.SystemBus() - services = manager.GetServices() + manager = dbus.Interface(bus.get_object("net.connman", "/"), + "net.connman.Manager") - path = None - for s in services: - if s[1]["Name"] == self.props["Name"]: - path = s[0] - break - if not path: - raise + services = manager.GetServices() - service = dbus.Interface(bus.get_object("net.connman", - path), "net.connman.Service") + path = None + for s in services: + if s[1]["Name"] == self.props["Name"]: + path = s[0] + break + if not path: + raise - service.Connect(timeout=60000) - url = urllib2.urlopen("http://connman.net") - f = open(BASE_PATH + "/PAN", 'w') - f.write(url.read(1000000)) - f.close() - service.Disconnect(timeout=60000) + service = dbus.Interface(bus.get_object("net.connman", path), + "net.connman.Service") - TEST_PASSED("PAN NAP") - except: - TEST_FAILED("PAN NAP") + service.Connect(timeout=60000) + url = urllib2.urlopen("http://connman.net") + f = open(BASE_PATH + "/PAN", 'w') + f.write(url.read(1000000)) + f.close() + service.Disconnect(timeout=60000) - process_next() + TEST_PASSED("PAN NAP") + except: + TEST_FAILED("PAN NAP") - def test_a2dp_source(self): - TEST_START("A2DP Source") + process_next() - try: - bus = dbus.SystemBus() - audio = dbus.Interface(bus.get_object("org.bluez", - self.path), "org.bluez.AudioSource") + def test_a2dp_source(self): + TEST_START("A2DP Source") - props = audio.GetProperties() + try: + bus = dbus.SystemBus() + audio = dbus.Interface(bus.get_object("org.bluez", self.path), + "org.bluez.AudioSource") - if props["State"] != "connected" and \ - props["State"] != "playing": - audio.Connect() + props = audio.GetProperties() - ret = ask("| Play music from the device! Did you hear" \ - " it (y/n)\n") + if props["State"] != "connected" and \ + props["State"] != "playing": + audio.Connect() - audio.Disconnect() + ret = ask("| Play music from the device! Did you hear" + " it (y/n)\n") - if ret == 'y' or ret == 'Y': - TEST_PASSED("A2DP Source") - else: - raise + audio.Disconnect() - except: - TEST_FAILED("A2DP Source") + if ret == 'y' or ret == 'Y': + TEST_PASSED("A2DP Source") + else: + raise - process_next() - - def device_test_profiles(self): - if "MAP MSE" in self.profiles: - process_q.append(self.test_map) + except: + TEST_FAILED("A2DP Source") - if "PBAP PSE" in self.profiles: - process_q.append(self.test_pbap) + process_next() - process_q.append(self.test_opp_client) + def device_test_profiles(self): + if "MAP MSE" in self.profiles: + process_q.append(self.test_map) - if "OPP Server" in self.profiles: - process_q.append(self.test_opp_server) + if "PBAP PSE" in self.profiles: + process_q.append(self.test_pbap) - if "DUN GW" in self.profiles: - process_q.append(self.test_dun) + process_q.append(self.test_opp_client) - if "PAN NAP" in self.profiles: - process_q.append(self.test_pan) + if "OPP Server" in self.profiles: + process_q.append(self.test_opp_server) - if "A2DP Source" in self.profiles: - process_q.append(self.test_a2dp_source) + if "DUN GW" in self.profiles: + process_q.append(self.test_dun) - process_next() + if "PAN NAP" in self.profiles: + process_q.append(self.test_pan) - def device_parse_uuids(self, uuids): - print ("| Profiles supported:") - for uuid in uuids: - if uuid in profiles.keys(): - print ("\t * %s" % profiles[uuid]) - self.profiles.append(profiles[uuid]) - self.device_test_profiles() - - def get_properties(self): - props = self.device.GetProperties() - self.device_parse_uuids(props["UUIDs"]) + if "A2DP Source" in self.profiles: + process_q.append(self.test_a2dp_source) + + process_next() + + def device_parse_uuids(self, uuids): + print ("| Profiles supported:") + for uuid in uuids: + if uuid in profiles.keys(): + print ("\t * %s" % profiles[uuid]) + self.profiles.append(profiles[uuid]) + self.device_test_profiles() + + def get_properties(self): + props = self.device.GetProperties() + self.device_parse_uuids(props["UUIDs"]) + + def device_property_changed(self, name, value): + if name == "Paired" and value: + self.device.SetProperty("Trusted", True) + TEST_PASSED("Pairable Responder") + gobject.timeout_add(7000, self.get_properties) - def device_property_changed(self, name, value): - if name == "Paired" and value == True: - self.device.SetProperty("Trusted", True) - TEST_PASSED("Pairable Responder") - gobject.timeout_add(7000, self.get_properties) class Adapter(): - def __init__(self, adapter, device): - self.adapter = adapter - self.device = device - self.found = {} - self.already_running = False + def __init__(self, adapter, device): + self.adapter = adapter + self.device = device + self.found = {} + self.already_running = False + + def create_device_reply(self, device): + TEST_PASSED("Pairing Initiatior") + self.adapter.RemoveDevice(device) + process_next() - def create_device_reply(self, device): - TEST_PASSED("Pairing Initiatior") - self.adapter.RemoveDevice(device) - process_next() + def create_device_error(self, error): + TEST_FAILED("Pairing Initiatior: Create device failed %s" + % (error)) + process_next() - def create_device_error(self, error): - TEST_FAILED("Pairing Initiatior: Create device failed %s" - % (error)) - process_next() + def device_created(self, path): + if self.already_running: + print("Ignore %s" % path) + return + self.already_running = True + print ("| Device created %s" % path) - def device_created(self, path): - if self.already_running == True: - print("Ignore %s" % path) - return - self.already_running = True - print ("| Device created %s" % path) + Device(path) - Device(path) + def create_paired_device(self, device): + try: + path = self.adapter.FindDevice(device) + self.adapter.RemoveDevice(path) + except: + pass - def create_paired_device(self, device): - try: - path = self.adapter.FindDevice(device) - self.adapter.RemoveDevice(path) - except: - pass + path = self.adapter.CreatePairedDevice( + device, + "/test/agent2", "DisplayYesNo", + timeout=60000, + reply_handler=self.create_device_reply, + error_handler=self.create_device_error) - path = self.adapter.CreatePairedDevice(device, - "/test/agent2", "DisplayYesNo", - timeout=60000, - reply_handler=self.create_device_reply, - error_handler=self.create_device_error) + def show_menu(self): + self.adapter.StopDiscovery() - def show_menu(self): - self.adapter.StopDiscovery() + d = dialog.Dialog() + device = d.menu("Select one device to pair with:", + choices=self.found.items()) + d.clear() - d = dialog.Dialog() - device = d.menu("Select one device to pair with:", - choices=self.found.items()) - d.clear() + if device[1] == '': + ret = ask("| Retry discovery (y/n)") + if ret == 'y' or ret == 'Y': + self.start_paring_initiator() + return + TEST_FAILED("Pairing Initiatior: No device selected") + exit(0) - if device[1] == '': - ret = ask("| Retry discovery (y/n)" ) - if ret == 'y' or ret == 'Y': - self.start_paring_initiator() - return - TEST_FAILED("Pairing Initiatior: No device selected") - exit(0) + self.create_paired_device(self.found[device[1]]) - self.create_paired_device(self.found[device[1]]) + def device_found(self, address, values): + if values["Name"] in self.found: + return - def device_found(self, address, values): - if values["Name"] in self.found: - return + self.found[values["Name"]] = values["Address"] - self.found[values["Name"]] = values["Address"] + def get_device(self): + path = self.adapter.FindDevice(self.device) - def get_device(self): - path = self.adapter.FindDevice(self.device) + Device(path) - Device(path) + def start_paring_initiator(self): + TEST_START("Pairing Initiatior") - def start_paring_initiator(self): - TEST_START("Pairing Initiatior") + if self.device: + self.create_paired_device(self.device) + return - if self.device: - self.create_paired_device(self.device) - return + self.adapter.StartDiscovery() + self.adapter.SetProperty("Powered", True) + self.adapter.SetProperty("Name", "BlueZ") + self.adapter.SetProperty("Pairable", False) - self.adapter.StartDiscovery() - self.adapter.SetProperty("Powered", True) - self.adapter.SetProperty("Name", "BlueZ") - self.adapter.SetProperty("Pairable", False) + bus.add_signal_receiver(self.device_found, + dbus_interface="org.bluez.Adapter", + signal_name="DeviceFound") - bus.add_signal_receiver(self.device_found, - dbus_interface="org.bluez.Adapter", - signal_name="DeviceFound") + gobject.timeout_add(12000, self.show_menu) - gobject.timeout_add(12000, self.show_menu) + def start_paring_responder(self): + TEST_START("Pairing Responder") - def start_paring_responder(self): - TEST_START("Pairing Responder") + props = self.adapter.GetProperties() - props = self.adapter.GetProperties() + print("| In the phone start pairing with %s (%s)." % + (props["Name"], props["Address"])) - print("| In the phone start pairing with %s (%s)." % - (props["Name"], props["Address"])) + self.adapter.SetProperty("Powered", True) + self.adapter.SetProperty("Name", "BlueZ") + self.adapter.SetProperty("Discoverable", True) + self.adapter.SetProperty("Pairable", True) - self.adapter.SetProperty("Powered", True) - self.adapter.SetProperty("Name", "BlueZ") - self.adapter.SetProperty("Discoverable", True) - self.adapter.SetProperty("Pairable", True) + bus.add_signal_receiver(self.device_created, + dbus_interface="org.bluez.Adapter", + signal_name="DeviceCreated") - bus.add_signal_receiver(self.device_created, - dbus_interface="org.bluez.Adapter", - signal_name="DeviceCreated") def parse_options(): - parser.add_option("-i", "--interface", dest="interface", - help="HCI device", metavar="HCI") - parser.add_option("-s", "--skip", action="store_true", dest="skip_pair", - help="Skip Pairing") - parser.add_option("-d", "--device", dest="device", - help="Device to connect", metavar="DEVICE") - return parser.parse_args() + parser.add_option("-i", "--interface", dest="interface", + help="HCI device", metavar="HCI") + parser.add_option("-s", "--skip", action="store_true", dest="skip_pair", + help="Skip Pairing") + parser.add_option("-d", "--device", dest="device", + help="Device to connect", metavar="DEVICE") + return parser.parse_args() + def create_base_path(): - global BASE_PATH + global BASE_PATH - lt = time.localtime() - BASE_PATH = "/tmp/bluez-%02d%02d-%02d%02d%02d" % (lt.tm_mon, lt.tm_mday, - lt.tm_hour, lt.tm_min, lt.tm_sec) - os.mkdir(BASE_PATH) + lt = time.localtime() + BASE_PATH = "/tmp/bluez-%02d%02d-%02d%02d%02d" % (lt.tm_mon, lt.tm_mday, + lt.tm_hour, lt.tm_min, + lt.tm_sec) + os.mkdir(BASE_PATH) -if __name__ == "__main__": - dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) - parser = OptionParser() +if __name__ == "__main__": + dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) - (options, args) = parse_options() + parser = OptionParser() - mainloop = gobject.MainLoop() + (options, args) = parse_options() - bus = dbus.SystemBus() - manager = dbus.Interface(bus.get_object("org.bluez", "/"), - "org.bluez.Manager") + mainloop = gobject.MainLoop() - if options.interface: - path = manager.FindAdapter(options.interface) - else: - path = manager.DefaultAdapter() + bus = dbus.SystemBus() + manager = dbus.Interface(bus.get_object("org.bluez", "/"), + "org.bluez.Manager") - adapter_iface = dbus.Interface(bus.get_object("org.bluez", path), - "org.bluez.Adapter") + if options.interface: + path = manager.FindAdapter(options.interface) + else: + path = manager.DefaultAdapter() - if options.skip_pair and not options.device: - print ("Device not specified") - exit(0) + adapter_iface = dbus.Interface(bus.get_object("org.bluez", path), + "org.bluez.Adapter") - agent = BlueZAgent(bus, "/test/agent") - agent = BlueZAgent(bus, "/test/agent2") - adapter = Adapter(adapter_iface, options.device) - adapter_iface.RegisterAgent("/test/agent", "DisplayYesNo") + if options.skip_pair and not options.device: + print ("Device not specified") + exit(0) - if options.skip_pair: - process_q.append(adapter.get_device) - else: - process_q.append(adapter.start_paring_initiator) - process_q.append(adapter.start_paring_responder) + agent = BlueZAgent(bus, "/test/agent") + agent = BlueZAgent(bus, "/test/agent2") + adapter = Adapter(adapter_iface, options.device) + adapter_iface.RegisterAgent("/test/agent", "DisplayYesNo") - create_base_path() - gobject.timeout_add(2000, process_next) - mainloop.run() - adapter_iface.UnregisterAgent("/test/agent") + if options.skip_pair: + process_q.append(adapter.get_device) + else: + process_q.append(adapter.start_paring_initiator) + process_q.append(adapter.start_paring_responder) + create_base_path() + gobject.timeout_add(2000, process_next) + mainloop.run() + adapter_iface.UnregisterAgent("/test/agent") -- GitLab