diff --git a/apertis_tests_lib/bluez.py b/apertis_tests_lib/bluez.py new file mode 100644 index 0000000000000000000000000000000000000000..1afde347dfc748c2fe1de9cd48eb9b2687f7501b --- /dev/null +++ b/apertis_tests_lib/bluez.py @@ -0,0 +1,713 @@ +# -*- coding: utf-8 -*- +# +# BlueZ - Bluetooth protocol stack for Linux +# +# Copyright © 2012, 2015 Collabora Ltd. +# +# SPDX-License-Identifier: GPL-2.0+ +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + +from gi.repository import GObject + +from abc import ABCMeta +from abc import abstractmethod +import dbus +import dbus.service +import re +import subprocess +import time + + +bluetooth_profiles = { + "0000111f-0000-1000-8000-00805f9b34fb": "HFP AG", + "00001103-0000-1000-8000-00805f9b34fb": "DUN GW", + "00001105-0000-1000-8000-00805f9b34fb": "OPP Server", + "0000110a-0000-1000-8000-00805f9b34fb": "PBAP PSE", + "0000110c-0000-1000-8000-00805f9b34fb": "AVRCP Target", + "0000110e-0000-1000-8000-00805f9b34fb": "AVRCP Control", + "00001112-0000-1000-8000-00805f9b34fb": "A2DP Source", + "00001116-0000-1000-8000-00805f9b34fb": "HSP AG", + "0000112f-0000-1000-8000-00805f9b34fb": "SAP Server", + "0000112f-0000-1000-8000-00805f9b34fb": "PAN NAP", + "00001132-0000-1000-8000-00805f9b34fb": "MAP MSE", +} + + +def bluetooth_profile_uuid_to_name(uuid): + return bluetooth_profiles[uuid] + + +def bluetooth_profile_is_supported(uuid): + return uuid in bluetooth_profiles + + +def TEST_START(str): + print("TEST STARTED: %s" % str) + + +def TEST_FAILED(str): + print("TEST FINISHED: %s : FAILED" % str) + + +def TEST_PASSED(str): + print("TEST FINISHED: %s : PASSED" % str) + + +def get_hci(object_path): + return re.search('hci[0-9]', object_path).group(0) + + +def build_device_path(hci, address): + # Aiming for something like /org/bluez/hci0/dev_XX_XX_XX_XX_XX_XX + if hci == '' or '/' in hci: + raise Exception('Invalid hci ‘%s’' % hci) + if address == '' or '/' in address: + raise Exception('Invalid address ‘%s’' % address) + + hci = str(hci) + address = str(address) + + return '/org/bluez/' + hci + '/dev_' + address.upper().replace(':', '_') + + +def scan_for_device(bus, adapter, device_address, result_func): + """Scan on adapter until a device with the given device_address appears. + Return the device object in the callback.""" + manager = dbus.Interface(bus.get_object("org.bluez", "/"), + "org.freedesktop.DBus.ObjectManager") + device_obj = None + match = None + + print('Scanning for device ‘%s’ on adapter ‘%s’…' % + (device_address, adapter.object_path)) + + # Sanity check. + adapter_address = adapter.Get( + 'org.bluez.Adapter1', + 'Address', + dbus_interface='org.freedesktop.DBus.Properties') + assert(adapter_address != device_address) + + objs = manager.GetManagedObjects() + for path, interfaces in objs.items(): + print('Saw: ' + path) + + if 'org.bluez.Device1' in interfaces and \ + interfaces['org.bluez.Device1']['Address'] == device_address and \ + interfaces['org.bluez.Device1']['Adapter'] == adapter.object_path: + print('Found device ‘%s’' % path) + result_func(bus.get_object('org.bluez', path)) + return + + def interfaces_added(path, interfaces): + print('Interfaces added: ' + path) + + if 'org.bluez.Device1' not in interfaces or \ + interfaces['org.bluez.Device1']['Address'] != device_address or \ + interfaces['org.bluez.Device1']['Adapter'] != adapter.object_path: + return + + print('Found device ‘%s’' % path) + device_obj = bus.get_object('org.bluez', path) + + adapter.StopDiscovery(dbus_interface='org.bluez.Adapter1') + match.remove() + + result_func(device_obj) + + match = bus.add_signal_receiver( + interfaces_added, + dbus_interface='org.freedesktop.DBus.ObjectManager', + signal_name='InterfacesAdded') + + # Start discovery; ignore errors if we’re already discovering. + try: + adapter.StartDiscovery(dbus_interface='org.bluez.Adapter1') + except error: + if error.get_dbus_name() == 'org.bluez.Error.InProgress': + pass + raise error + + +# 0 is the initiator side, 1 is the responder side +# Note that if the adapters are already paired, this is a no-op and will return +# success. +def bluetooth_pair_adapters(bus, adapter0, adapter1, iocap, + agent_path, result_func): + device0 = None + device1 = None + + assert(adapter0.object_path != adapter1.object_path) + + print('Pairing adapters ‘%s’ and ‘%s’…' % + (adapter0.object_path, adapter1.object_path)) + + agent_manager_iface = dbus.Interface(bus.get_object('org.bluez', + '/org/bluez'), + 'org.bluez.AgentManager1') + agent_manager_iface.RegisterAgent(agent_path, iocap) + agent_manager_iface.RequestDefaultAgent(agent_path) + + props0 = adapter0.GetAll('org.bluez.Adapter1', + dbus_interface='org.freedesktop.DBus.Properties') + props1 = adapter1.GetAll('org.bluez.Adapter1', + dbus_interface='org.freedesktop.DBus.Properties') + + hci0 = get_hci(adapter0.object_path) + hci1 = get_hci(adapter1.object_path) + + def pair_finish(device0, device1): + if device0 is not None: + print('Paired adapters ‘%s’ and ‘%s’' % + (adapter0.object_path, adapter1.object_path)) + agent_manager_iface.UnregisterAgent(agent_path) + result_func(device0, device1) + + def pair_reply(): + assert(device0 is not None and device1 is not None) + pair_finish(device0, device1) + + def error_reply(error): + if error.get_dbus_name() == 'org.bluez.Error.AlreadyExists': + # No-op. + pair_finish(device0, device1) + return + + print("Creating device ‘%s’ failed: %s" % (device0.object_path, error)) + pair_finish(None, None) + + def scan_finish(device0, device1): + print('Finished scanning; starting to pair…') + device0.Pair(reply_handler=pair_reply, error_handler=error_reply, + timeout=60000, dbus_interface='org.bluez.Device1') + + def scan1_result(device): + nonlocal device0, device1 + + assert(device is not None) + assert(device.object_path == + build_device_path(get_hci(adapter0.object_path), + props1['Address'])) + device0 = device + if device1 is not None: + scan_finish(device0, device1) + + def scan2_result(device): + nonlocal device0, device1 + + assert(device is not None) + assert(device.object_path == + build_device_path(get_hci(adapter1.object_path), + props0['Address'])) + device1 = device + if device0 is not None: + scan_finish(device0, device1) + + scan_for_device(bus, adapter0, props1['Address'], result_func=scan1_result) + scan_for_device(bus, adapter1, props0['Address'], result_func=scan2_result) + + +def object_set_properties(obj, properties_interface, properties, result_func): + """Set one or more properties and wait for a change notification.""" + assert(len(properties) > 0) + + awaiting_notification = set() + + def properties_changed(interface_name, changed_properties, + invalidated_properties): + nonlocal awaiting_notification + + if interface_name != properties_interface: + return + + awaiting_notification = \ + awaiting_notification - changed_properties.keys() - \ + set(invalidated_properties) + + if not awaiting_notification: + match.remove() + result_func(obj) + + iface = dbus.Interface(obj, 'org.freedesktop.DBus.Properties') + match = iface.connect_to_signal('PropertiesChanged', properties_changed) + vals = iface.GetAll(properties_interface) + + # Set the properties if they weren’t already set. This is not entirely + # race-proof. + for p, v in properties.items(): + if vals[p] != v: + awaiting_notification.add(p) + iface.Set(properties_interface, p, v, signature='ssv') + + if len(awaiting_notification) == 0: + match.remove() + GObject.idle_add(lambda: result_func(obj)) + + +def object_set_properties_blocking(obj, properties_interface, properties, + main_loop): + object_set_properties(obj, properties_interface, properties, + lambda o: main_loop.quit()) + main_loop.run() + + +def object_wait_for_properties(obj, properties_interface, properties, + result_func): + """Wait for properties to be updated if needed""" + assert(len(properties) > 0) + + awaiting_notification = set() + + def properties_changed(interface_name, changed_properties, + invalidated_properties): + nonlocal awaiting_notification + + if interface_name != properties_interface: + return + + awaiting_notification = \ + awaiting_notification - changed_properties.keys() - \ + set(invalidated_properties) + + if len(awaiting_notification) == 0: + match.remove() + result_func(obj) + + iface = dbus.Interface(obj, 'org.freedesktop.DBus.Properties') + match = iface.connect_to_signal('PropertiesChanged', properties_changed) + vals = iface.GetAll(properties_interface) + + for p, v in properties.items(): + if vals[p] != v: + awaiting_notification.add(p) + + if len(awaiting_notification) == 0: + match.remove() + GObject.idle_add(lambda: result_func(obj)) + + +def object_wait_for_properties_blocking(obj, properties_interface, properties, + main_loop): + object_wait_for_properties(obj, properties_interface, properties, + lambda o: main_loop.quit()) + main_loop.run() + + +def adapters_ensure_powered(adapters, result_func): + assert(len(adapters) > 0) + remaining = set(adapters) + + def inner_result_func(obj): + nonlocal remaining + + print('Adapter ‘%s’ is powered' % obj.object_path) + remaining.remove(obj) + if len(remaining) == 0: + result_func() + + for adapter in adapters: + print('Making adapter ‘%s’ powered…' % adapter.object_path) + object_set_properties(adapter, 'org.bluez.Adapter1', { + 'Powered': True + }, inner_result_func) + + +def adapters_make_pairable(adapters, result_func): + assert(len(adapters) > 0) + remaining = set(adapters) + + def inner_result_func(obj): + nonlocal remaining + + print('Adapter ‘%s’ is pairable' % obj.object_path) + remaining.remove(obj) + if len(remaining) == 0: + result_func() + + for adapter in adapters: + print('Making adapter ‘%s’ pairable…' % adapter.object_path) + object_set_properties(adapter, 'org.bluez.Adapter1', { + 'Powered': True, + 'Pairable': True, + 'Discoverable': True, + }, inner_result_func) + + +def adapters_make_discoverable(adapters, result_func): + assert(len(adapters) > 0) + remaining = set(adapters) + + def inner_result_func(obj): + nonlocal remaining + + print('Adapter ‘%s’ is discoverable' % obj.object_path) + remaining.remove(obj) + if len(remaining) == 0: + result_func() + + for adapter in adapters: + print('Making adapter ‘%s’ discoverable…' % adapter.object_path) + object_set_properties(adapter, 'org.bluez.Adapter1', { + 'Powered': True, + 'Discoverable': True, + }, inner_result_func) + + +class Rejected(dbus.DBusException): + _dbus_error_name = 'org.bluez.Error.Rejected' + + +class ConstantAgent(dbus.service.Object): + """Agent which returns 0000 for all PINs and passkeys.""" + exit_on_release = False + + def __init__(self, bus, object_path, main_loop): + self.__main_loop = main_loop + super().__init__(bus, object_path) + + def set_exit_on_release(self, exit_on_release): + self.exit_on_release = exit_on_release + + @dbus.service.method("org.bluez.Agent1", + in_signature="", out_signature="") + def Release(self): + if self.exit_on_release: + self.__main_loop.quit() + + @dbus.service.method("org.bluez.Agent1", + in_signature="os", out_signature="") + def AuthorizeService(self, device, uuid): + print("Authorize (%s, %s)" % (device, uuid)) + + @dbus.service.method("org.bluez.Agent1", + in_signature="o", out_signature="s") + def RequestPinCode(self, device): + return '0000' + + @dbus.service.method("org.bluez.Agent1", + in_signature="os", out_signature="") + def DisplayPinCode(self, device, pincode): + print("DisplayPinCode (%s, %s)" % (device, pincode)) + + @dbus.service.method("org.bluez.Agent1", + in_signature="o", out_signature="u") + def RequestPasskey(self, device): + return dbus.UInt32(0) + + @dbus.service.method("org.bluez.Agent1", + in_signature="ouq", out_signature="") + def DisplayPasskey(self, device, passkey, entered): + print("DisplayPasskey (%s, %06d, %d)" % (device, passkey, entered)) + + @dbus.service.method("org.bluez.Agent1", + in_signature="o", out_signature="") + def RequestAuthorization(self, device): + print('RequestAuthorization (%s)' % device) + + @dbus.service.method("org.bluez.Agent1", + in_signature="ou", out_signature="") + def RequestConfirmation(self, device, passkey): + print('RequestConfirmation (%s, %06d)' % (device, passkey)) + + @dbus.service.method("org.bluez.Agent1", + in_signature="", out_signature="") + def Cancel(self): + print("Cancel") + + +class AskAgent(dbus.service.Object): + """Agent which asks about PINs and passkeys.""" + exit_on_release = True + + def __init__(self, bus, object_path, main_loop, + authorize_everything=False): + self.__main_loop = main_loop + self.__authorize_everything = authorize_everything + super().__init__(bus, object_path) + + def set_exit_on_release(self, exit_on_release): + self.exit_on_release = exit_on_release + + @dbus.service.method("org.bluez.Agent1", + in_signature="", out_signature="") + def Release(self): + print("Release") + if self.exit_on_release: + self.__main_loop.quit() + + @dbus.service.method("org.bluez.Agent1", + in_signature="os", out_signature="") + def AuthorizeService(self, device, uuid): + if self.__authorize_everything: + print("Authorizing %s, %s" % (device, uuid)) + else: + print("Authorize (%s, %s)" % (device, uuid)) + authorize = input("Authorize connection (yes/no): ") + if (authorize == "yes"): + return + raise Rejected("Connection rejected by user") + + @dbus.service.method("org.bluez.Agent1", + in_signature="o", out_signature="s") + def RequestPinCode(self, device): + print("RequestPinCode (%s)" % (device)) + return input("Enter PIN Code: ") + + @dbus.service.method("org.bluez.Agent1", + in_signature="os", out_signature="") + def DisplayPinCode(self, device, pincode): + print("DisplayPinCode (%s, %s)" % (device, pincode)) + + @dbus.service.method("org.bluez.Agent1", + in_signature="o", out_signature="u") + def RequestPasskey(self, device): + print("RequestPasskey (%s)" % (device)) + passkey = input("Enter passkey: ") + return dbus.UInt32(passkey) + + @dbus.service.method("org.bluez.Agent1", + in_signature="ouq", out_signature="") + def DisplayPasskey(self, device, passkey, entered): + print("DisplayPasskey (%s, %06d, %d)" % (device, passkey, entered)) + + @dbus.service.method("org.bluez.Agent1", + in_signature="o", out_signature="") + def RequestAuthorization(self, device): + if self.__authorize_everything: + print("Confiming authorization (%s)" % (device)) + else: + print("RequestAuthorization (%s)" % (device, passkey)) + confirm = input("Confirm authorization (yes/no): ") + if (confirm == "yes"): + return + raise Rejected("Authorization rejected by user") + + @dbus.service.method("org.bluez.Agent1", + in_signature="ou", out_signature="") + def RequestConfirmation(self, device, passkey): + if self.__authorize_everything: + print("Confiming passkey %06d (%s)" % (passkey, device)) + else: + print("RequestConfirmation (%s, %06d)" % (device, passkey)) + confirm = input("Confirm passkey (yes/no): ") + if (confirm == "yes"): + return + raise Rejected("Passkey doesn't match") + + @dbus.service.method("org.bluez.Agent1", + in_signature="", out_signature="") + def Cancel(self): + print("Cancel") + + +class AdapterDeviceTester(): + """Adapter tester class which tests pairing with a device (a phone) in both + directions — as the initiator and then as the responder. After pairing, + it runs the tests in the device_test_class on the paired device.""" + def __init__(self, bus, adapter, device, device_test_class): + self.__bus = bus + self.__adapter_obj = adapter + self.__device_obj = device + self.__device_test_class = device_test_class + + def __pair_reply(self): + self.__adapter_obj.RemoveDevice( + self.__device_obj.object_path, + dbus_interface='org.bluez.Adapter1') + self.__pairing_initiator_result_func(None) + + def __pair_error(self, error): + if error.get_dbus_name() == 'org.bluez.Error.AlreadyExists': + self.__pairing_initiator_result_func(None) + else: + self.__pairing_initiator_result_func( + "Pairing Initiator: Create device failed %s" % (error)) + + def _create_paired_device(self, device_obj): + device_obj.Pair(dbus_interface='org.bluez.Device1', + reply_handler=self.__pair_reply, + error_handler=self.__pair_error) + + def __show_menu(self): + self.__adapter_obj.StopDiscovery(dbus_interface='org.bluez.Adapter1') + + manager = dbus.Interface(self.__bus.get_object('org.bluez', '/'), + 'org.freedesktop.DBus.ObjectManager') + objects = manager.GetManagedObjects() + + print('Devices:') + for p, i in objects.items(): + if 'org.bluez.Device1' in i and \ + i['org.bluez.Device1']['Adapter'] == \ + self.__adapter_obj.object_path: + address = str(i['org.bluez.Device1']['Address']) + name = str(i['org.bluez.Device1'].get(['Name'], address)) + + print('%s: %s, %s' % (p, address, name)) + device_path = input("Select one device to pair with: ") + + if device_path == '': + ret = input("Retry discovery (y/n) ") + if ret == 'y' or ret == 'Y': + self.start_pairing_initiator( + self.__pairing_initiator_result_func) + return + self.__pairing_initiator_result_func( + "Pairing Initiator: No device selected") + return + + if device_path not in objects: + self.__pairing_initiator_result_func( + "Pairing Initiator: Invalid device selected") + return + + self.__device_obj = self.__bus.get_object('org.bluez', device_path) + self._create_paired_device(self.__device_obj) + + def get_device(self): + self.__device_test_class(self.__device_obj) + + def start_pairing_initiator(self, result_func): + self.__pairing_initiator_result_func = result_func + + if self.__device_obj: + self._create_paired_device(self.__device_obj) + return + + # Unpair from everything first so we start with a clean slate. + manager = dbus.Interface(self.__bus.get_object('org.bluez', '/'), + 'org.freedesktop.DBus.ObjectManager') + objects = manager.GetManagedObjects() + + for p, i in objects.items(): + if 'org.bluez.Device1' in i and \ + i['org.bluez.Device1']['Adapter'] == \ + self.__adapter_obj.object_path and \ + i['org.bluez.Device1']['Paired']: + self.__adapter_obj.RemoveDevice( + p, + dbus_interface='org.bluez.Adapter1') + + def properties_cb(obj): + self.__adapter_obj.StartDiscovery( + dbus_interface='org.bluez.Adapter1') + GObject.timeout_add(12000, self.__show_menu) + + object_set_properties(self.__adapter_obj, 'org.bluez.Adapter1', { + 'Powered': True, + 'Alias': 'Bluez', + 'Pairable': False, + }, properties_cb) + + def __properties_changed_responder(self, interface_name, + changed_properties, + invalidated_properties, + path): + if interface_name != 'org.bluez.Device1': + return + if 'Paired' not in changed_properties.keys() and \ + 'Paired' not in invalidated_properties: + return + + self.__responder_match.remove() + self.__responder_match = None + + device_obj = self.__bus.get_object('org.bluez', path) + self.__device_test_class(device_obj) + self.__pairing_responder_result_func(None) + + def start_pairing_responder(self, result_func): + self.__pairing_responder_result_func = result_func + + adapter_props = dbus.Interface(self.__adapter_obj, + 'org.freedesktop.DBus.Properties') + props = adapter_props.GetAll('org.bluez.Adapter1') + + print("In the phone start pairing with %s (%s)." % + (props['Alias'], props["Address"])) + + # Wait for any Bluetooth device to become paired. + self.__responder_match = self.__bus.add_signal_receiver( + self.__properties_changed_responder, + dbus_interface='org.freedesktop.DBus.Properties', + signal_name='PropertiesChanged', + path_keyword='path') + + def properties_cb(obj): + pass + + object_set_properties(self.__adapter_obj, 'org.bluez.Adapter1', { + 'Powered': True, + 'Alias': 'BlueZ', + 'Discoverable': True, + 'Pairable': True, + }, properties_cb) + + +class DeviceProfileTester(metaclass=ABCMeta): + """Device tester class which runs a test function for each profile + advertised by a device. The device can either be specified or discovered + and paired with by the class, interactively.""" + def __init__(self, device): + self._device_obj = dbus.Interface(device, "org.bluez.Device1") + + self.__match = self._device_obj.connect_to_signal( + 'PropertiesChanged', + self.__device_properties_changed) + + paired = self._device_obj.Get( + 'org.bluez.Device1', + 'Paired', + dbus_interface='org.freedesktop.DBus.Properties') + if paired: + self.__handle_is_paired() + + def __handle_is_paired(self): + self._device_obj.Set('org.bluez.Device1', "Trusted", True, + dbus_interface='org.freedesktop.DBus.Properties') + self.__match.remove() + self.__match = None + + uuids = self._device_obj.Get( + 'org.bluez.Device1', + 'UUIDs', + dbus_interface='org.freedesktop.DBus.Properties') + self.__parse_uuids(uuids) + + def __parse_uuids(self, uuids): + profiles = [] + + print("Profiles supported:") + for uuid in uuids: + try: + name = bluetooth_profile_uuid_to_name(uuid) + print(" • %s" % name) + profiles.append(name) + except KeyError: + pass + + self.device_test_profiles(profiles) + + def __device_properties_changed(self, iface_name, changed_properties, + invalidated_properties): + if iface_name != 'org.bluez.Device1': + return + + if 'Paired' in changed_properties and changed_properties['Paired']: + self.__handle_is_paired() + + @abstractmethod + def device_test_profiles(self, profiles): + pass diff --git a/bluez/bluez-hfp b/bluez/bluez-hfp index 780c39a5081edb8a3d911e8cd3e1935acdcea6e0..9c1c3cc9a1a6071c722002a1cc123c11371a68c0 100755 --- a/bluez/bluez-hfp +++ b/bluez/bluez-hfp @@ -1,4 +1,4 @@ -#! /usr/bin/python +#! /usr/bin/python3 # -*- coding: utf-8 -*- # # BlueZ - Bluetooth protocol stack for Linux @@ -20,21 +20,26 @@ # along with this program; if not, write to the Free Software # Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA -import gobject +from gi.repository import GObject + import os import dbus import dbus.service import dbus.mainloop.glib +import sys import time -import dialog -import urllib2 from optparse import OptionParser -profiles = { - "0000111f-0000-1000-8000-00805f9b34fb": "HFP AG", -} +# import from toplevel directory +sys.path.insert(0, os.path.join(os.path.dirname(__file__), os.pardir)) +from apertis_tests_lib.bluez import AdapterDeviceTester +from apertis_tests_lib.bluez import AskAgent +from apertis_tests_lib.bluez import build_device_path +from apertis_tests_lib.bluez import DeviceProfileTester +from apertis_tests_lib.bluez import get_hci +from apertis_tests_lib.bluez import TEST_START, TEST_FAILED, TEST_PASSED + -BASE_PATH = "" process_q = [] @@ -46,100 +51,15 @@ def process_next(): f = process_q[0] process_q.remove(f) - gobject.idle_add(f) - - -def TEST_START(str): - print ("TEST STARTED: %s" % str) - - -def TEST_FAILED(str): - print ("TEST FINISHED: %s : FAILED" % str) - - -def TEST_PASSED(str): - print ("TEST FINISHED: %s : PASSED" % str) - + GObject.idle_add(f) -def ask(prompt): - try: - return raw_input(prompt) - except: - return input(prompt) - - -class Rejected(dbus.DBusException): - _dbus_error_name = "org.bluez.Error.Rejected" - - -class BlueZAgent(dbus.service.Object): - exit_on_release = False - - def set_exit_on_release(self, exit_on_release): - self.exit_on_release = exit_on_release - - @dbus.service.method("org.bluez.Agent", - in_signature="", out_signature="") - def Release(self): - if self.exit_on_release: - mainloop.quit() - - @dbus.service.method("org.bluez.Agent", - in_signature="os", out_signature="") - def Authorize(self, device, uuid): - print(" | Authorizing %s, %s" % (device, uuid)) - return - @dbus.service.method("org.bluez.Agent", - in_signature="o", out_signature="s") - def RequestPinCode(self, device): - print("| RequestPinCode (%s)" % (device)) - return ask("| Enter PIN Code: ") - - @dbus.service.method("org.bluez.Agent", - in_signature="ou", out_signature="") - def DisplayPasskey(self, device, passkey): - print("| DisplayPasskey (%s, %06d)" % (device, passkey)) - - @dbus.service.method("org.bluez.Agent", - in_signature="os", out_signature="") - def DisplayPinCode(self, device, pincode): - print("| DisplayPinCode (%s, %s)" % (device, pincode)) - - @dbus.service.method("org.bluez.Agent", - in_signature="ou", out_signature="") - def RequestConfirmation(self, device, passkey): - print("| Confiming passkey %06d (%s)" % (passkey, device)) - return - - @dbus.service.method("org.bluez.Agent", - in_signature="s", out_signature="") - def ConfirmModeChange(self, mode): - print("| Confirming ModeChange (%s)" % (mode)) - return - - @dbus.service.method("org.bluez.Agent", - in_signature="", out_signature="") - def Cancel(self): - print("| Cancel") - - -class Device(): - def __init__(self, path): - self.path = path - self.profiles = [] - - self.device = dbus.Interface(bus.get_object("org.bluez", self.path), - "org.bluez.Device") - self.props = self.device.GetProperties() - self.device.SetProperty("Trusted", True) - - bus.add_signal_receiver(self.device_property_changed, - dbus_interface="org.bluez.Device", - signal_name="PropertyChanged") +class HfpAgDeviceProfileTester(DeviceProfileTester): + def device_test_profiles(self, profiles): + if 'HFP AG' in profiles: + process_q.append(self.test_hfp_ag) - if self.props["Paired"]: - self.get_properties() + process_next() def test_hfp_ag(self): TEST_START("HFP AG") @@ -162,22 +82,22 @@ class Device(): vcm = dbus.Interface(bus.get_object('org.ofono', path), 'org.ofono.VoiceCallManager') - number = ask("Type the phone number to call:") + number = input("Type the phone number to call:") path = vcm.Dial(number, "default") - ask("Press ENTER to hangup the call:") + input("Press ENTER to hangup the call:") vcm.HangupAll() - ret = ask("| Did you hear the call (y/n)\n") + ret = input("Did you hear the call (y/n)\n") if ret != 'y' and ret != 'Y': raise - ask("From a second phone call the phone connected to " - "oFono.\n Once the phone starts ring press ENTER" - " to anwser:") + input("From a second phone call the phone connected to " + "oFono.\n Once the phone starts ring press ENTER" + " to anwser:") calls = vcm.GetCalls() @@ -192,11 +112,11 @@ class Device(): call.Answer() - ask("Press ENTER to hangup the call:") + input("Press ENTER to hangup the call:") vcm.HangupAll() - ret = ask("| Did you hear the call (y/n)\n") + ret = input("Did you hear the call (y/n)\n") if ret != 'y' and ret != 'Y': raise @@ -209,130 +129,6 @@ class Device(): TEST_PASSED("HFP AG") process_next() - def device_test_profiles(self): - if "HFP AG" in self.profiles: - process_q.append(self.test_hfp_ag) - - process_next() - - def device_parse_uuids(self, uuids): - print ("| Profiles supported:") - for uuid in uuids: - if uuid in profiles.keys(): - print ("\t * %s" % profiles[uuid]) - self.profiles.append(profiles[uuid]) - self.device_test_profiles() - - def get_properties(self): - props = self.device.GetProperties() - self.device_parse_uuids(props["UUIDs"]) - - def device_property_changed(self, name, value): - if name == "Paired" and value: - self.device.SetProperty("Trusted", True) - TEST_PASSED("Pairable Responder") - gobject.timeout_add(7000, self.get_properties) - - -class Adapter(): - def __init__(self, adapter, device): - self.adapter = adapter - self.device = device - self.found = {} - - def create_device_reply(self, device): - TEST_PASSED("Pairing Initiatior") - self.adapter.RemoveDevice(device) - process_next() - - def create_device_error(self, error): - TEST_FAILED("Pairing Initiatior: Create device failed %s" % (error)) - process_next() - - def device_created(self, path): - - print ("| Device created %s" % path) - - Device(path) - - def create_paired_device(self, device): - try: - path = self.adapter.FindDevice(device) - self.adapter.RemoveDevice(path) - except: - pass - - path = self.adapter.CreatePairedDevice( - device, - "/test/agent2", "DisplayYesNo", - timeout=60000, - reply_handler=self.create_device_reply, - error_handler=self.create_device_error) - - def show_menu(self): - self.adapter.StopDiscovery() - - d = dialog.Dialog() - device = d.menu("Select one device to pair with:", - choices=self.found.items()) - d.clear() - - if device[1] == '': - ret = ask("| Retry discovery (y/n)") - if ret == 'y' or ret == 'Y': - self.start_paring_initiator() - return - TEST_FAILED("Pairing Initiatior: No device selected") - exit(0) - - self.create_paired_device(self.found[device[1]]) - - def device_found(self, address, values): - if values["Name"] in self.found: - return - - self.found[values["Name"]] = values["Address"] - - def get_device(self): - path = self.adapter.FindDevice(self.device) - - Device(path) - - def start_paring_initiator(self): - TEST_START("Pairing Initiatior") - - if self.device: - self.create_paired_device(self.device) - return - - self.adapter.StartDiscovery() - self.adapter.SetProperty("Powered", True) - self.adapter.SetProperty("Name", "BlueZ") - self.adapter.SetProperty("Pairable", False) - - bus.add_signal_receiver(self.device_found, - dbus_interface="org.bluez.Adapter", - signal_name="DeviceFound") - - gobject.timeout_add(12000, self.show_menu) - - def start_paring_responder(self): - TEST_START("Pairing Responder") - - props = self.adapter.GetProperties() - - print("| In the phone start pairing with %s (%s)." % - (props["Name"], props["Address"])) - - self.adapter.SetProperty("Powered", True) - self.adapter.SetProperty("Name", "BlueZ") - self.adapter.SetProperty("Discoverable", True) - self.adapter.SetProperty("Pairable", True) - - bus.add_signal_receiver(self.device_created, - dbus_interface="org.bluez.Adapter", - signal_name="DeviceCreated") - def parse_options(): parser.add_option("-i", "--interface", dest="interface", @@ -344,15 +140,6 @@ def parse_options(): return parser.parse_args() -def create_base_path(): - global BASE_PATH - - lt = time.localtime() - BASE_PATH = "/tmp/bluez-%02d%02d-%02d%02d%02d" % (lt.tm_mon, lt.tm_mday, - lt.tm_hour, lt.tm_min, - lt.tm_sec) - os.mkdir(BASE_PATH) - if __name__ == "__main__": dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) @@ -360,36 +147,75 @@ if __name__ == "__main__": (options, args) = parse_options() - mainloop = gobject.MainLoop() + mainloop = GObject.MainLoop() bus = dbus.SystemBus() - manager = dbus.Interface(bus.get_object("org.bluez", "/"), - "org.bluez.Manager") + object_manager_iface = dbus.Interface(bus.get_object('org.bluez', '/'), + 'org.freedesktop.DBus.ObjectManager') + agent_manager_iface = dbus.Interface(bus.get_object('org.bluez', + '/org/bluez'), + 'org.bluez.AgentManager1') + + objects = object_manager_iface.GetManagedObjects() + adapters = {p: i for p, i in objects.items() + if 'org.bluez.Adapter1' in i.keys()} if options.interface: - path = manager.FindAdapter(options.interface) + path = '/org/bluez/' + options.interface else: - path = manager.DefaultAdapter() + # Arbitrarily choose the first adapter as default + path = list(adapters.keys())[0] + + adapter_obj = bus.get_object("org.bluez", path) - adapter_iface = dbus.Interface(bus.get_object("org.bluez", path), - "org.bluez.Adapter") + print('Using adapter ‘%s’' % adapter_obj.object_path) if options.skip_pair and not options.device: - print ("Device not specified") - exit(0) + print("Device not specified") + exit(1) + + if options.device is not None: + path = build_device_path(get_hci(adapter_obj.object_path), + options.device) + device_obj = bus.get_object('org.bluez', path) + else: + device_obj = None + + agent = AskAgent(bus, "/test/agent", mainloop, authorize_everything=True) + tester = AdapterDeviceTester(bus, adapter_obj, device_obj, + HfpAgDeviceProfileTester) + agent_manager_iface.RegisterAgent(agent._object_path, 'DisplayYesNo') + + def test1(): + def test1_cb(error_message): + if error_message: + TEST_FAILED(error_message) + exit(1) + else: + TEST_PASSED('Pairing Initiator') + process_next() + + TEST_START('Pairing Initiator') + tester.start_pairing_initiator(test1_cb) + + def test2(): + def test2_cb(error_message): + if error_message: + TEST_FAILED(error_message) + exit(1) + else: + TEST_PASSED('Pairing Responder') + process_next() - agent = BlueZAgent(bus, "/test/agent") - agent = BlueZAgent(bus, "/test/agent2") - adapter = Adapter(adapter_iface, options.device) - adapter_iface.RegisterAgent("/test/agent", "DisplayYesNo") + TEST_START('Pairing Responder') + tester.start_pairing_responder(test2_cb) if options.skip_pair: - process_q.append(adapter.get_device) + process_q.append(tester.get_device) else: - process_q.append(adapter.start_paring_initiator) - process_q.append(adapter.start_paring_responder) + process_q.append(test1) + process_q.append(test2) - create_base_path() - gobject.timeout_add(2000, process_next) + GObject.idle_add(process_next) mainloop.run() - adapter_iface.UnregisterAgent("/test/agent") + agent_manager_iface.UnregisterAgent('/test/agent') diff --git a/bluez/bluez-test b/bluez/bluez-test index e1563750bb5f0e24da9fab25e78414c699dda1b8..9235d879d1c6368e5e339317688384394e774c53 100755 --- a/bluez/bluez-test +++ b/bluez/bluez-test @@ -1,4 +1,4 @@ -#! /usr/bin/python +#! /usr/bin/python3 # -*- coding: utf-8 -*- # # BlueZ - Bluetooth protocol stack for Linux @@ -22,212 +22,136 @@ from gi.repository import GObject -import re import os import unittest import dbus import dbus.service import dbus.mainloop.glib import subprocess +import sys import time -dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) -mainloop = GObject.MainLoop() +# import from toplevel directory +sys.path.insert(0, os.path.join(os.path.dirname(__file__), os.pardir)) +from apertis_tests_lib.bluez import adapters_ensure_powered +from apertis_tests_lib.bluez import adapters_make_discoverable +from apertis_tests_lib.bluez import adapters_make_pairable +from apertis_tests_lib.bluez import bluetooth_pair_adapters +from apertis_tests_lib.bluez import build_device_path +from apertis_tests_lib.bluez import ConstantAgent +from apertis_tests_lib.bluez import get_hci +from apertis_tests_lib.bluez import object_set_properties_blocking +from apertis_tests_lib.bluez import object_wait_for_properties_blocking -voice_gateway = """<record> - <attribute id="0x0000"> - <uint32 value="0x00010003" /> - </attribute> - <attribute id="0x0001"> - <sequence> - <uuid value="0x1121" /> - <uuid value="0x1203" /> - </sequence> - </attribute> - <attribute id="0x0004"> - <sequence> - <sequence> - <uuid value="0x0100" /> - </sequence> - <sequence> - <uuid value="0x0003" /> - <uint8 value="0x0b" /> - </sequence> - </sequence> - </attribute> - <attribute id="0x0005"> - <sequence> - <uuid value="0x1002" /> - </sequence> - </attribute> - <attribute id="0x0009"> - <sequence> - <sequence> - <uuid value="0x1108" /> - <uint16 value="0x0100" /> - </sequence> - </sequence> - </attribute> - <attribute id="0x0100"> - <text value="Voice Gateway" /> - </attribute> -</record>""" - - -class Rejected(dbus.DBusException): - _dbus_error_name = "org.bluez.Error.Rejected" - - -class Agent(dbus.service.Object): - exit_on_release = False - - def set_exit_on_release(self, exit_on_release): - self.exit_on_release = exit_on_release - - @dbus.service.method("org.bluez.Agent", - in_signature="", out_signature="") - def Release(self): - if self.exit_on_release: - mainloop.quit() - - @dbus.service.method("org.bluez.Agent", - in_signature="os", out_signature="") - def Authorize(self, device, uuid): - print("Authorize (%s, %s)" % (device, uuid)) - return - - @dbus.service.method("org.bluez.Agent", - in_signature="o", out_signature="s") - def RequestPinCode(self, device): - return "0000" - - @dbus.service.method("org.bluez.Agent", - in_signature="ou", out_signature="") - def DisplayPasskey(self, device, passkey): - print("DisplayPasskey (%s, %06d)" % (device, passkey)) - - @dbus.service.method("org.bluez.Agent", - in_signature="os", out_signature="") - def DisplayPinCode(self, device, pincode): - print("DisplayPinCode (%s, %s)" % (device, pincode)) - - @dbus.service.method("org.bluez.Agent", - in_signature="ou", out_signature="") - def RequestConfirmation(self, device, passkey): - return - - @dbus.service.method("org.bluez.Agent", - in_signature="s", out_signature="") - def ConfirmModeChange(self, mode): - print("ConfirmModeChange (%s)" % (mode)) - authorize = ask("Authorize mode change (yes/no): ") - if authorize == "yes": - return - raise Rejected("Mode change by user") - @dbus.service.method("org.bluez.Agent", - in_signature="", out_signature="") - def Cancel(self): - print("Cancel") +dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) +mainloop = GObject.MainLoop() -path0 = "/agent0" -path1 = "/agent1" -Agent(dbus.SystemBus(), path0) -Agent(dbus.SystemBus(), path1) +ConstantAgent(dbus.SystemBus(), '/agent', mainloop) -# 0 is the initiator side, 1 is the responder side -def pair(bus, adapter0, iocap0, adapter1, iocap1): - pair.device0 = None - pair.device1 = None - def create_device_reply(device): - pair.device0 = device - props = adapter0.GetProperties() - time.sleep(1) - pair.device1 = adapter1.FindDevice(props["Address"]) - mainloop.quit() +def test_pair(bus, adapter0, adapter1, iocap): + retval = False - def create_device_error(error): - print("Creating device failed: %s" % (error)) - mainloop.quit() + # Remove any previous pairings first. + hci0 = get_hci(adapter0.object_path) + hci1 = get_hci(adapter1.object_path) - props1 = adapter1.GetProperties() - adapter1.RegisterAgent(path1, iocap1) + try: + p1 = adapter1.GetAll('org.bluez.Adapter1') + path = build_device_path(hci0, p1['Address']) + adapter0.RemoveDevice(path, dbus_interface='org.bluez.Adapter1') + except: + pass - adapter0.CreatePairedDevice(props1["Address"], path0, iocap0, - timeout=60000, - reply_handler=create_device_reply, - error_handler=create_device_error) + try: + p0 = adapter0.GetAll('org.bluez.Adapter1') + path = build_device_path(hci1, p0['Address']) + adapter1.RemoveDevice(path, dbus_interface='org.bluez.Adapter1') + except: + pass + # Make sure both adapters are discoverable first. + adapters_make_pairable([adapter0, adapter1], lambda: mainloop.quit()) mainloop.run() - adapter1.UnregisterAgent(path1) - time.sleep(2) + def result(device0, device1): + nonlocal retval - return pair.device0, pair.device1 + retval = (device0 is not None) + mainloop.quit() + bluetooth_pair_adapters(bus, adapter0, adapter1, iocap, '/agent', + result_func=result) + mainloop.run() -def test_pair(bus, adapter0, iocap0, adapter1, iocap1): - device0, device1 = pair(bus, adapter0, iocap0, adapter1, iocap1) - if device0: - adapter0.RemoveDevice(device0) - adapter1.RemoveDevice(device1) - return True - else: - return False + return retval class TestManager(unittest.TestCase): def setUp(self): self.bus = dbus.SystemBus() try: - self.manager = \ + self.__manager = \ dbus.Interface(self.bus.get_object("org.bluez", "/"), - "org.bluez.Manager") + "org.freedesktop.DBus.ObjectManager") except: raise RuntimeError("Bluetooth daemon is not running!") def test_daemon(self): - self.assertTrue(self.manager) + self.assertTrue(self.__manager) def test_manager(self): - props = self.manager.GetProperties() - self.assertIn('Adapters', props) + objs = self.__manager.GetManagedObjects() + adapters = {p: i for p, i in objs.items() + if 'org.bluez.Adapter1' in i.keys()} + # might raise an exception - self.assertTrue(self.manager.DefaultAdapter()) + self.assertNotEqual(adapters, {}) # might raise an exception - self.assertTrue(self.manager.FindAdapter('hci0')) + self.assertIn('/org/bluez/hci0', adapters) class TestAdapter(unittest.TestCase): def setUp(self): self.bus = dbus.SystemBus() try: - self.manager = \ + self.__manager = \ dbus.Interface(self.bus.get_object("org.bluez", "/"), - "org.bluez.Manager") + "org.freedesktop.DBus.ObjectManager") except: raise RuntimeError("Bluetooth daemon is not running!") - props = self.manager.GetProperties() - self.assertGreaterEqual(len(props["Adapters"]), 2, + objs = self.__manager.GetManagedObjects() + adapters = {p: i for p, i in objs.items() + if 'org.bluez.Adapter1' in i.keys()} + + self.assertGreaterEqual(len(adapters), 2, 'length of %r should be >= 2' % - props["Adapters"]) + adapters) - adapter0 = self.bus.get_object("org.bluez", props["Adapters"][0]) - self.adapter0 = dbus.Interface(adapter0, "org.bluez.Adapter") - self.hci0 = re.search('hci[0-9]', props["Adapters"][0]).group(0) + adapter0 = self.bus.get_object("org.bluez", list(adapters.keys())[0]) + self.__adapter0 = dbus.Interface(adapter0, + 'org.freedesktop.DBus.Properties') + self.__hci0 = get_hci(list(adapters.keys())[0]) - adapter1 = self.bus.get_object("org.bluez", props["Adapters"][1]) - self.adapter1 = dbus.Interface(adapter1, "org.bluez.Adapter") - self.hci1 = re.search('hci[0-9]', props["Adapters"][0]).group(0) + adapter1 = self.bus.get_object("org.bluez", list(adapters.keys())[1]) + self.__adapter1 = dbus.Interface(adapter1, + 'org.freedesktop.DBus.Properties') + self.__hci1 = get_hci(list(adapters.keys())[1]) + + # Ensure the adapters are powered. + adapters_ensure_powered([self.__adapter0, self.__adapter1], + lambda: mainloop.quit()) + mainloop.run() def test_adapter(self): - props = self.adapter0.GetProperties() + props = self.__adapter0.GetAll('org.bluez.Adapter1') self.assertIn('Address', props) self.assertIn('Name', props) + self.assertIn('Alias', props) self.assertIn('Class', props) self.assertIn('Powered', props) self.assertIn('Discoverable', props) @@ -235,253 +159,318 @@ class TestAdapter(unittest.TestCase): self.assertIn('PairableTimeout', props) self.assertIn('DiscoverableTimeout', props) self.assertIn('Discovering', props) - self.assertIn('Devices', props) self.assertIn('UUIDs', props) - def test_adapter_name(self): - props = self.adapter0.GetProperties() - name = props["Name"] - self.adapter0.SetProperty("Name", "unittest-0") - time.sleep(0.5) - props = self.adapter0.GetProperties() - self.assertEqual('unittest-0', props["Name"]) - self.adapter0.SetProperty("Name", name) + def test_adapter_alias(self): + props = self.__adapter0.GetAll('org.bluez.Adapter1') + alias = props["Alias"] + object_set_properties_blocking(self.__adapter0, 'org.bluez.Adapter1', { + 'Alias': dbus.String('unittest-0', variant_level=1), + }, mainloop) + props = self.__adapter0.GetAll('org.bluez.Adapter1') + self.assertEqual('unittest-0', props["Alias"]) + object_set_properties_blocking(self.__adapter0, 'org.bluez.Adapter1', { + 'Alias': dbus.String(alias, variant_level=1), + }, mainloop) def test_adapter_class(self): - props = self.adapter0.GetProperties() + props = self.__adapter0.GetAll('org.bluez.Adapter1') self.assertEqual(props["Class"] & 0x00ffff, 0x00420) def assertDown(self, hci): status = subprocess.check_output(['hciconfig', hci]) - self.assertIn('DOWN', status) + self.assertIn(b'DOWN', status) def assertUp(self, hci): status = subprocess.check_output(['hciconfig', hci]) - self.assertIn('UP', status) + self.assertIn(b'UP', status) def assertDiscoverable(self, hci): status = subprocess.check_output(['hciconfig', hci]) - self.assertIn('ISCAN', status) + self.assertIn(b'ISCAN', status) def assertNotDiscoverable(self, hci): status = subprocess.check_output(['hciconfig', hci]) - self.assertNotIn('ISCAN', status) + self.assertNotIn(b'ISCAN', status) def test_adapter_powered(self): - self.adapter0.SetProperty("Powered", False) + object_set_properties_blocking(self.__adapter0, 'org.bluez.Adapter1', { + 'Powered': dbus.Boolean(False, variant_level=1), + }, mainloop) - props = self.adapter0.GetProperties() + props = self.__adapter0.GetAll('org.bluez.Adapter1') self.assertEqual(props["Powered"], False, '%r should contain {Powered: False}' % props) - self.assertDown(self.hci0) + self.assertDown(self.__hci0) - self.adapter0.SetProperty("Powered", True) - status = subprocess.check_output(['hciconfig', self.hci0]) - self.assertUp(self.hci0) + object_set_properties_blocking(self.__adapter0, 'org.bluez.Adapter1', { + 'Powered': dbus.Boolean(True, variant_level=1), + }, mainloop) + self.assertUp(self.__hci0) def test_adapter_discoverable(self): - self.adapter0.SetProperty("Discoverable", False) - props = self.adapter0.GetProperties() + object_set_properties_blocking(self.__adapter0, 'org.bluez.Adapter1', { + 'Discoverable': dbus.Boolean(False, variant_level=1), + }, mainloop) + + props = self.__adapter0.GetAll('org.bluez.Adapter1') self.assertEqual(props["Discoverable"], False, '%r should contain {Discoverable: False}' % props) - self.assertNotDiscoverable(self.hci0) - self.adapter0.SetProperty("Discoverable", True) - self.assertDiscoverable(self.hci0) - self.adapter0.SetProperty("Discoverable", False) + self.assertNotDiscoverable(self.__hci0) + + object_set_properties_blocking(self.__adapter0, 'org.bluez.Adapter1', { + 'Discoverable': dbus.Boolean(True, variant_level=1), + }, mainloop) - timeout = dbus.UInt32(10) - self.adapter0.SetProperty("DiscoverableTimeout", timeout) + self.assertDiscoverable(self.__hci0) - self.adapter0.SetProperty("Discoverable", True) - self.assertDiscoverable(self.hci0) + object_set_properties_blocking(self.__adapter0, 'org.bluez.Adapter1', { + 'Discoverable': dbus.Boolean(False, variant_level=1), + }, mainloop) + + object_set_properties_blocking(self.__adapter0, 'org.bluez.Adapter1', { + 'DiscoverableTimeout': dbus.UInt32(10, variant_level=1), + }, mainloop) + + object_set_properties_blocking(self.__adapter0, 'org.bluez.Adapter1', { + 'Discoverable': dbus.Boolean(True, variant_level=1), + }, mainloop) + + self.assertDiscoverable(self.__hci0) time.sleep(11) - props = self.adapter0.GetProperties() + + props = self.__adapter0.GetAll('org.bluez.Adapter1') self.assertEqual(props["Discoverable"], False, '%r should contain {Discoverable: False}' % props) - self.assertNotDiscoverable(self.hci0) - timeout = dbus.UInt32(0) - self.adapter0.SetProperty("DiscoverableTimeout", timeout) + self.assertNotDiscoverable(self.__hci0) + + object_set_properties_blocking(self.__adapter0, 'org.bluez.Adapter1', { + 'DiscoverableTimeout': dbus.UInt32(0, variant_level=1), + }, mainloop) def test_adapter_pairable(self): - self.adapter0.SetProperty("Pairable", False) - props = self.adapter0.GetProperties() + object_set_properties_blocking(self.__adapter0, 'org.bluez.Adapter1', { + 'Pairable': dbus.Boolean(False, variant_level=1), + }, mainloop) + + props = self.__adapter0.GetAll('org.bluez.Adapter1') self.assertEqual(props["Pairable"], False, '%r should contain {Pairable: False}' % props) - self.adapter0.SetProperty("Pairable", True) - props = self.adapter0.GetProperties() + object_set_properties_blocking(self.__adapter0, 'org.bluez.Adapter1', { + 'Pairable': dbus.Boolean(True, variant_level=1), + }, mainloop) + + props = self.__adapter0.GetAll('org.bluez.Adapter1') self.assertEqual(props["Pairable"], True, '%r should contain {Pairable: True}' % props) - timeout = dbus.UInt32(10) - self.adapter0.SetProperty("PairableTimeout", timeout) - self.adapter0.SetProperty("Pairable", True) + object_set_properties_blocking(self.__adapter0, 'org.bluez.Adapter1', { + 'PairableTimeout': dbus.UInt32(10, variant_level=1), + 'Pairable': dbus.Boolean(True, variant_level=1), + }, mainloop) + time.sleep(11) - props = self.adapter0.GetProperties() + + props = self.__adapter0.GetAll('org.bluez.Adapter1') self.assertEqual(props["Pairable"], False, '%r should contain {Pairable: False}' % props) - timeout = dbus.UInt32(0) - self.adapter0.SetProperty("PairableTimeout", timeout) - self.adapter0.SetProperty("Pairable", True) - - def device_found(self, device, array): - self.assertIn('Address', array) - self.assertIn('Class', array) - self.assertIn('Icon', array) - self.assertIn('RSSI', array) - self.assertIn('Name', array) - self.assertIn('LegacyPairing', array) - self.assertIn('Paired', array) - self.assertIn('Trusted', array) - self.assertIn('UUIDs', array) + + object_set_properties_blocking(self.__adapter0, 'org.bluez.Adapter1', { + 'PairableTimeout': dbus.UInt32(0, variant_level=1), + 'Pairable': dbus.Boolean(True, variant_level=1), + }, mainloop) + + def __interfaces_added(self, path, interfaces): + if 'org.bluez.Device1' not in interfaces: + return + if interfaces['org.bluez.Device1']['Adapter'] != \ + self.__adapter1.object_path: + return + + # Check for non-optional properties + props = interfaces['org.bluez.Device1'] + self.assertIn('Address', props) + self.assertIn('LegacyPairing', props) + self.assertIn('Paired', props) + self.assertIn('Trusted', props) + mainloop.quit() def test_adapter_discover(self): - self.adapter0.SetProperty("Discoverable", True) - time.sleep(1) - iface = 'org.bluez.Adapter' - match = self.adapter1.connect_to_signal("DeviceFound", - self.device_found, - dbus_interface=iface) - self.adapter1.StartDiscovery() + # ‘Discovering’ is a counter, so we need to know its initial value to + # compare against at the end of the test. + props = self.__adapter1.GetAll( + 'org.bluez.Adapter1', + dbus_interface='org.freedesktop.DBus.Properties') + discovering = props['Discovering'] + + # Make adapter0 discoverable so adapter1 definitely has something to + # discover. + adapters_make_discoverable([self.__adapter0], lambda: mainloop.quit()) mainloop.run() + + match = self.__manager.connect_to_signal( + 'InterfacesAdded', + self.__interfaces_added, + dbus_interface='org.bluez.Adapter1') + objs = self.__manager.GetManagedObjects() + for p, i in objs.items(): + self.__interfaces_added(p, i) + + self.__adapter1.StartDiscovery(dbus_interface='org.bluez.Adapter1') + GObject.timeout_add(500, lambda: mainloop.quit()) + + mainloop.run() + match.remove() - props1 = self.adapter1.GetProperties() - self.assertEqual(props1["Discovering"], True, - '%r should contain {Discovering: True}' % props1) - self.adapter1.StopDiscovery() - self.adapter0.SetProperty("Discoverable", False) - time.sleep(1) - props1 = self.adapter1.GetProperties() - self.assertEqual(props1["Discovering"], False, - '%r should contain {Discovering: False}' % props1) - - def test_adapter_paring(self): - try: - p1 = self.adapter1.GetProperties() - path = self.adapter0.FindDevice(p1["Address"]) - self.adapter0.RemoveDevice(path) - except: - pass + object_wait_for_properties_blocking( + self.__adapter1, + 'org.bluez.Adapter1', { + 'Discovering': True, + }, mainloop) - try: - p0 = self.adapter0.GetProperties() - path = self.adapter1.FindDevice(p0["Address"]) - self.adapter1.RemoveDevice(path) - except: - pass + self.__adapter1.StopDiscovery(dbus_interface='org.bluez.Adapter1') - self.assertTrue(test_pair(self.bus, self.adapter0, - "DisplayYesNo", self.adapter1, - "DisplayYesNo")) + object_set_properties_blocking(self.__adapter0, 'org.bluez.Adapter1', { + 'Discoverable': dbus.Boolean(False, variant_level=1), + }, mainloop) - self.assertTrue(test_pair(self.bus, self.adapter0, "DisplayYesNo", - self.adapter1, "NoInputNoOutput")) - self.assertTrue(test_pair(self.bus, self.adapter0, - "NoInputNoOutput", self.adapter1, + object_wait_for_properties_blocking( + self.__adapter1, + 'org.bluez.Adapter1', { + 'Discovering': discovering, + }, mainloop) + + def test_adapter_pairing(self): + self.assertTrue(test_pair(self.bus, self.__adapter0, self.__adapter1, "DisplayYesNo")) + self.assertTrue(test_pair(self.bus, self.__adapter0, self.__adapter1, + "NoInputNoOutput")) - os.system("hciconfig hci0 sspmode 0") - self.assertTrue(test_pair(self.bus, self.adapter0, - "DisplayYesNo", self.adapter1, + subprocess.check_call(['sudo', 'hciconfig', self.__hci0, + 'sspmode', '0']) + self.assertTrue(test_pair(self.bus, self.__adapter0, self.__adapter1, "DisplayYesNo")) - os.system("hciconfig hci0 sspmode 1") + subprocess.check_call(['sudo', 'hciconfig', self.__hci0, + 'sspmode', '1']) class TestDevice(unittest.TestCase): def setUp(self): self.bus = dbus.SystemBus() try: - self.manager = \ + self.__manager = \ dbus.Interface(self.bus.get_object("org.bluez", "/"), - "org.bluez.Manager") + "org.freedesktop.DBus.ObjectManager") except: raise RuntimeError("Bluetooth daemon is not running!") - props = self.manager.GetProperties() - self.assertGreaterEqual(len(props["Adapters"]), 2, + objs = self.__manager.GetManagedObjects() + adapters = {p: i for p, i in objs.items() + if 'org.bluez.Adapter1' in i.keys()} + + self.assertGreaterEqual(len(adapters), 2, 'length of %r should be >= 2' % - props["Adapters"]) + adapters) - adapter0 = self.bus.get_object("org.bluez", props["Adapters"][0]) - self.adapter0 = dbus.Interface(adapter0, "org.bluez.Adapter") + adapter0 = self.bus.get_object("org.bluez", list(adapters.keys())[0]) + self.__adapter0 = dbus.Interface(adapter0, "org.bluez.Adapter1") - adapter1 = self.bus.get_object("org.bluez", props["Adapters"][1]) - self.adapter1 = dbus.Interface(adapter1, "org.bluez.Adapter") + adapter1 = self.bus.get_object("org.bluez", list(adapters.keys())[1]) + self.__adapter1 = dbus.Interface(adapter1, "org.bluez.Adapter1") - self.device0_str, self.device1_str = pair(self.bus, - self.adapter0, - "DisplayYesNo", - self.adapter1, - "DisplayYesNo") + def result(device0, device1): + self.__device0 = device0 + self.__device1 = device1 + mainloop.quit() + + def adapter_pairable_cb(): + bluetooth_pair_adapters(self.bus, self.__adapter0, + self.__adapter1, + 'DisplayYesNo', '/agent', + result_func=result) - self.assertTrue(self.device0_str) - self.assertTrue(self.device1_str) + # Ensure both adapters are powered, pairable, discoverable. + adapters_make_pairable([adapter0, adapter1], adapter_pairable_cb) + + mainloop.run() - device0 = self.bus.get_object("org.bluez", self.device0_str) - self.device0 = dbus.Interface(device0, "org.bluez.Device") + self.assertIsNotNone(self.__device0) + self.assertIsNotNone(self.__device1) def tearDown(self): - self.adapter0.RemoveDevice(self.device0_str) - self.adapter1.RemoveDevice(self.device1_str) + try: + self.__adapter0.RemoveDevice(self.__device0.object_path) + except: + pass + try: + self.__adapter1.RemoveDevice(self.__device1.object_path) + except: + pass def test_device(self): - props = self.device0.GetProperties() + props = self.__device0.GetAll( + 'org.bluez.Device1', + dbus_interface='org.freedesktop.DBus.Properties') + # Check for non-optional properties. self.assertIn('Adapter', props) self.assertIn('Address', props) self.assertIn('Alias', props) self.assertIn('Blocked', props) self.assertIn('Connected', props) - self.assertIn('Name', props) self.assertIn('Paired', props) - self.assertIn('Services', props) self.assertIn('Trusted', props) - self.assertIn('UUIDs', props) - - def test_device_discover_services(self): - self.assertTrue(self.device0.DiscoverServices("")) - - def test_services(self): - props = self.manager.GetProperties() - - adapter1 = self.bus.get_object("org.bluez", props["Adapters"][1]) - service1 = dbus.Interface(adapter1, "org.bluez.Service") - - handle = service1.AddRecord(voice_gateway) - self.assertTrue(self.device0.DiscoverServices("0x1121")) - service1.RemoveRecord(handle) - self.assertFalse(self.device0.DiscoverServices("0x1121")) def test_device_trusted(self): - self.device0.SetProperty("Trusted", True) - props = self.device0.GetProperties() + object_set_properties_blocking(self.__device0, 'org.bluez.Device1', { + 'Trusted': dbus.Boolean(True, variant_level=1), + }, mainloop) + + props = self.__device0.GetAll( + 'org.bluez.Device1', + dbus_interface='org.freedesktop.DBus.Properties') self.assertEqual(props["Trusted"], True, '%r should contain {Trusted: True}' % props) - self.device0.SetProperty("Trusted", False) - props = self.device0.GetProperties() + object_set_properties_blocking(self.__device0, 'org.bluez.Device1', { + 'Trusted': dbus.Boolean(False, variant_level=1), + }, mainloop) + + props = self.__device0.GetAll( + 'org.bluez.Device1', + dbus_interface='org.freedesktop.DBus.Properties') self.assertEqual(props["Trusted"], False, '%r should contain {Trusted: False}' % props) def test_device_blocked(self): - self.device0.SetProperty("Blocked", True) - props = self.device0.GetProperties() + object_set_properties_blocking(self.__device0, 'org.bluez.Device1', { + 'Blocked': dbus.Boolean(True, variant_level=1), + }, mainloop) + + props = self.__device0.GetAll( + 'org.bluez.Device1', + dbus_interface='org.freedesktop.DBus.Properties') self.assertEqual(props["Blocked"], True, '%r should contain {Blocked: True}' % props) - self.device0.SetProperty("Blocked", False) - props = self.device0.GetProperties() + object_set_properties_blocking(self.__device0, 'org.bluez.Device1', { + 'Blocked': dbus.Boolean(False, variant_level=1), + }, mainloop) + + props = self.__device0.GetAll( + 'org.bluez.Device1', + dbus_interface='org.freedesktop.DBus.Properties') self.assertEqual(props["Blocked"], False, '%r should contain {Blocked: False}' % props) if __name__ == '__main__': - suite = unittest.TestLoader().loadTestsFromTestCase(TestManager) - unittest.TextTestRunner(verbosity=2).run(suite) - - suite = unittest.TestLoader().loadTestsFromTestCase(TestAdapter) - unittest.TextTestRunner(verbosity=2).run(suite) + suites = [] + suites.append(unittest.TestLoader().loadTestsFromTestCase(TestManager)) + suites.append(unittest.TestLoader().loadTestsFromTestCase(TestAdapter)) + suites.append(unittest.TestLoader().loadTestsFromTestCase(TestDevice)) - suite = unittest.TestLoader().loadTestsFromTestCase(TestDevice) - unittest.TextTestRunner(verbosity=2).run(suite) + combo = unittest.TestSuite(suites) + unittest.TextTestRunner(verbosity=2).run(combo) diff --git a/bluez/pair-two b/bluez/pair-two index f63c9e0e94eeb6361c49e6b202f8a6facfcdf092..27909bf30075645a39ad1bc892573c69f890dd8c 100755 --- a/bluez/pair-two +++ b/bluez/pair-two @@ -1,4 +1,4 @@ -#! /usr/bin/python +#! /usr/bin/python3 # -*- coding: utf-8 -*- # # BlueZ - Bluetooth protocol stack for Linux @@ -22,156 +22,100 @@ from gi.repository import GObject -import re +from optparse import OptionParser import os import unittest import dbus import dbus.service import dbus.mainloop.glib +import sys import time -dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) -mainloop = GObject.MainLoop() - - -class Rejected(dbus.DBusException): - _dbus_error_name = "org.bluez.Error.Rejected" - - -class Agent(dbus.service.Object): - exit_on_release = False - - def set_exit_on_release(self, exit_on_release): - self.exit_on_release = exit_on_release - - @dbus.service.method("org.bluez.Agent", - in_signature="", out_signature="") - def Release(self): - if self.exit_on_release: - mainloop.quit() - - @dbus.service.method("org.bluez.Agent", - in_signature="os", out_signature="") - def Authorize(self, device, uuid): - print("Authorize (%s, %s)" % (device, uuid)) - return +# import from toplevel directory +sys.path.insert(0, os.path.join(os.path.dirname(__file__), os.pardir)) +from apertis_tests_lib.bluez import adapters_make_pairable +from apertis_tests_lib.bluez import bluetooth_pair_adapters +from apertis_tests_lib.bluez import build_device_path +from apertis_tests_lib.bluez import ConstantAgent +from apertis_tests_lib.bluez import get_hci - @dbus.service.method("org.bluez.Agent", - in_signature="o", out_signature="s") - def RequestPinCode(self, device): - return "0000" - @dbus.service.method("org.bluez.Agent", - in_signature="ou", out_signature="") - def DisplayPasskey(self, device, passkey): - print("DisplayPasskey (%s, %06d)" % (device, passkey)) - - @dbus.service.method("org.bluez.Agent", - in_signature="os", out_signature="") - def DisplayPinCode(self, device, pincode): - print("DisplayPinCode (%s, %s)" % (device, pincode)) - - @dbus.service.method("org.bluez.Agent", - in_signature="ou", out_signature="") - def RequestConfirmation(self, device, passkey): - return - - @dbus.service.method("org.bluez.Agent", - in_signature="s", out_signature="") - def ConfirmModeChange(self, mode): - print("ConfirmModeChange (%s)" % (mode)) - authorize = ask("Authorize mode change (yes/no): ") - if (authorize == "yes"): - return - raise Rejected("Mode change by user") - - @dbus.service.method("org.bluez.Agent", - in_signature="", out_signature="") - def Cancel(self): - print("Cancel") +dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) +main_loop = GObject.MainLoop() -path0 = "/agent0" -path1 = "/agent1" -Agent(dbus.SystemBus(), path0) -Agent(dbus.SystemBus(), path1) +ConstantAgent(dbus.SystemBus(), '/agent', main_loop) -# 0 is the initiator side, 1 is the responder side -def pair(bus, adapter0, iocap0, adapter1, iocap1): - pair.device0 = None - pair.device1 = None - def create_device_reply(device): - pair.device0 = device - props = adapter0.GetProperties() - time.sleep(1) - pair.device1 = adapter1.FindDevice(props["Address"]) - mainloop.quit() +def parse_options(): + parser.add_option('-1', '--interface1', dest='interface1', + help='HCI device 1', metavar='HCI') + parser.add_option('-2', '--interface2', dest='interface2', + help='HCI device 2', metavar='HCI') + return parser.parse_args() - def create_device_error(error): - print("Creating device failed: %s" % (error)) - mainloop.quit() - props1 = adapter1.GetProperties() - adapter1.RegisterAgent(path1, iocap1) +if __name__ == '__main__': + bus = dbus.SystemBus() + try: + manager = dbus.Interface(bus.get_object('org.bluez', '/'), + 'org.freedesktop.DBus.ObjectManager') + except: + raise RuntimeError('Bluetooth daemon is not running!') - adapter0.CreatePairedDevice(props1["Address"], path0, iocap0, - timeout=60000, - reply_handler=create_device_reply, - error_handler=create_device_error) + objs = manager.GetManagedObjects() + adapters = {p: i for p, i in objs.items() + if 'org.bluez.Adapter1' in i.keys()} - mainloop.run() - adapter1.UnregisterAgent(path1) + if len(adapters) < 2: + print('2 Bluetooth adapters are needed!') + exit(1) - return pair.device0, pair.device1 + parser = OptionParser() + (options, args) = parse_options() -def test_pair(bus, adapter0, iocap0, adapter1, iocap1): - device0, device1 = pair(bus, adapter0, iocap0, adapter1, iocap1) - if device0: - return True + if options.interface1: + adapter0_path = '/org/bluez/' + options.interface1 else: - return False - - -if __name__ == "__main__": - bus = dbus.SystemBus() - try: - manager = dbus.Interface(bus.get_object("org.bluez", - "/"), "org.bluez.Manager") - except: - raise RuntimeError("Bluetooth daemon is not running!") + adapter0_path = list(adapters.keys())[0] + adapter0 = bus.get_object('org.bluez', adapter0_path) + hci0 = get_hci(adapter0_path) - props = manager.GetProperties() - if len(props["Adapters"]) < 2: - print("2 Bluetooth adapters are needed!") - - adapter0 = dbus.Interface(bus.get_object("org.bluez", - props["Adapters"][0]), - "org.bluez.Adapter") - hci0 = re.search('hci[0-9]', props["Adapters"][0]).group(0) - adapter1 = dbus.Interface(bus.get_object("org.bluez", - props["Adapters"][1]), - "org.bluez.Adapter") - hci1 = re.search('hci[0-9]', props["Adapters"][0]).group(0) + if options.interface2: + adapter1_path = '/org/bluez/' + options.interface2 + else: + adapter1_path = list(adapters.keys())[1] + adapter1 = bus.get_object('org.bluez', adapter1_path) + hci1 = get_hci(adapter1_path) + # Remove the adapters’ devices from each other to eliminate any existing + # pairing. try: - p1 = adapter1.GetProperties() - path = adapter0.FindDevice(p1["Address"]) - adapter0.RemoveDevice(path) + p1 = adapter1.GetAll('org.bluez.Adapter1', + dbus_interface='org.freedesktop.DBus.Properties') + path = build_device_path(hci0, p1['Address']) + adapter0.RemoveDevice(path, dbus_interface='org.bluez.Adapter1') except: pass try: - p0 = adapter0.GetProperties() - path = adapter1.FindDevice(p0["Address"]) - adapter1.RemoveDevice(path) + p0 = adapter0.GetAll('org.bluez.Adapter1', + dbus_interface='org.freedesktop.DBus.Properties') + path = build_device_path(hci1, p0['Address']) + adapter1.RemoveDevice(path, dbus_interface='org.bluez.Adapter1') except: pass - paired = test_pair(bus, adapter0, "DisplayYesNo", adapter1, - "DisplayYesNo") - if paired: - exit(0) - else: - exit(1) + def pair_reply(device0, device1): + main_loop.quit() + exit(0 if device0 is not None else 1) + + def pairable_reply(): + bluetooth_pair_adapters(bus, adapter0, adapter1, 'DisplayYesNo', + '/agent', result_func=pair_reply) + + # Make both adapters pairable and discoverable. + adapters_make_pairable([adapter0, adapter1], pairable_reply) + + main_loop.run() diff --git a/bluez/simple-agent b/bluez/simple-agent index 00c31e9c56517ef9193be371b876817215278fd4..edcb97282664ab3aa4321f5e123b1b9ae76e023c 100755 --- a/bluez/simple-agent +++ b/bluez/simple-agent @@ -1,4 +1,4 @@ -#! /usr/bin/python +#! /usr/bin/python3 # -*- coding: utf-8 -*- # # BlueZ - Bluetooth protocol stack for Linux @@ -20,8 +20,6 @@ # along with this program; if not, write to the Free Software # Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA -from __future__ import absolute_import, print_function, unicode_literals - from gi.repository import GObject import sys @@ -29,138 +27,66 @@ import dbus import dbus.service import dbus.mainloop.glib from optparse import OptionParser +import os +# import from toplevel directory +sys.path.insert(0, os.path.join(os.path.dirname(__file__), os.pardir)) +from apertis_tests_lib.bluez import AskAgent +from apertis_tests_lib.bluez import build_device_path -def ask(prompt): - try: - return raw_input(prompt) - except: - return input(prompt) - - -class Rejected(dbus.DBusException): - _dbus_error_name = "org.bluez.Error.Rejected" - - -class Agent(dbus.service.Object): - exit_on_release = True - - def set_exit_on_release(self, exit_on_release): - self.exit_on_release = exit_on_release - - @dbus.service.method("org.bluez.Agent", - in_signature="", out_signature="") - def Release(self): - print("Release") - if self.exit_on_release: - mainloop.quit() - - @dbus.service.method("org.bluez.Agent", - in_signature="os", out_signature="") - def Authorize(self, device, uuid): - print("Authorize (%s, %s)" % (device, uuid)) - authorize = ask("Authorize connection (yes/no): ") - if (authorize == "yes"): - return - raise Rejected("Connection rejected by user") - - @dbus.service.method("org.bluez.Agent", - in_signature="o", out_signature="s") - def RequestPinCode(self, device): - print("RequestPinCode (%s)" % (device)) - return ask("Enter PIN Code: ") - - @dbus.service.method("org.bluez.Agent", - in_signature="o", out_signature="u") - def RequestPasskey(self, device): - print("RequestPasskey (%s)" % (device)) - passkey = ask("Enter passkey: ") - return dbus.UInt32(passkey) - - @dbus.service.method("org.bluez.Agent", - in_signature="ou", out_signature="") - def DisplayPasskey(self, device, passkey): - print("DisplayPasskey (%s, %06d)" % (device, passkey)) - - @dbus.service.method("org.bluez.Agent", - in_signature="os", out_signature="") - def DisplayPinCode(self, device, pincode): - print("DisplayPinCode (%s, %s)" % (device, pincode)) - - @dbus.service.method("org.bluez.Agent", - in_signature="ou", out_signature="") - def RequestConfirmation(self, device, passkey): - print("RequestConfirmation (%s, %06d)" % (device, passkey)) - confirm = ask("Confirm passkey (yes/no): ") - if (confirm == "yes"): - return - raise Rejected("Passkey doesn't match") - - @dbus.service.method("org.bluez.Agent", - in_signature="s", out_signature="") - def ConfirmModeChange(self, mode): - print("ConfirmModeChange (%s)" % (mode)) - authorize = ask("Authorize mode change (yes/no): ") - if (authorize == "yes"): - return - raise Rejected("Mode change by user") - - @dbus.service.method("org.bluez.Agent", - in_signature="", out_signature="") - def Cancel(self): - print("Cancel") - - -def create_device_reply(device): - print("New device (%s)" % (device)) - mainloop.quit() - - -def create_device_error(error): - print("Creating device failed: %s" % (error)) - mainloop.quit() if __name__ == '__main__': dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) + mainloop = GObject.MainLoop() + + def pair_reply(): + print("New device (%s)" % (device_path)) + mainloop.quit() + + def pair_error(error): + print("Creating device failed: %s" % (error)) + mainloop.quit() bus = dbus.SystemBus() manager = dbus.Interface(bus.get_object("org.bluez", "/"), - "org.bluez.Manager") - - capability = "KeyboardDisplay" + "org.freedesktop.DBus.ObjectManager") + agent_manager_iface = dbus.Interface(bus.get_object('org.bluez', + '/org/bluez'), + 'org.bluez.AgentManager1') parser = OptionParser() - parser.add_option("-c", "--capability", action="store", - type="string", dest="capability") + parser.add_option("-c", "--capability", type="string", + default='KeyboardDisplay') (options, args) = parser.parse_args() - if options.capability: - capability = options.capability + + objs = manager.GetManagedObjects() + adapters = {p: i for p, i in objs.items() + if 'org.bluez.Adapter1' in i.keys()} if len(args) > 0: - path = manager.FindAdapter(args[0]) + path = '/org/bluez/' + args[0] else: - path = manager.DefaultAdapter() + path = list(adapters.keys())[0] adapter = dbus.Interface(bus.get_object("org.bluez", path), - "org.bluez.Adapter") - - path = "/test/agent" - agent = Agent(bus, path) + "org.bluez.Adapter1") - mainloop = GObject.MainLoop() + agent = AskAgent(bus, '/test/agent', mainloop) if len(args) > 1: + device_path = build_device_path(args[0], args[1]) + if len(args) > 2: - device = adapter.FindDevice(args[1]) - adapter.RemoveDevice(device) + adapter.RemoveDevice(device_path) agent.set_exit_on_release(False) - adapter.CreatePairedDevice(args[1], path, capability, - timeout=60000, - reply_handler=create_device_reply, - error_handler=create_device_error) + device = bus.get_object('org.bluez', device_path) + device.Pair(dbus_interface='org.bluez.Device1', + reply_handler=pair_reply, + error_handler=pair_error, + timeout=60000) else: - adapter.RegisterAgent(path, capability) + agent_manager_iface.RegisterAgent('/test/agent', options.capability) print("Agent registered") mainloop.run() diff --git a/bluez/test-avrcp.py b/bluez/test-avrcp.py index aea30044f8b961ece26f16759a919a92e0dded98..dc8bf25bd99c4aff36f0b0c762f9f3bcc9b0e99c 100755 --- a/bluez/test-avrcp.py +++ b/bluez/test-avrcp.py @@ -1,4 +1,4 @@ -#! /usr/bin/python +#! /usr/bin/python3 # -*- coding: utf-8 -*- # # BlueZ - Bluetooth protocol stack for Linux @@ -24,10 +24,19 @@ import time import sys import dbus from optparse import OptionParser, make_option +import os + +# import from toplevel directory +sys.path.insert(0, os.path.join(os.path.dirname(__file__), os.pardir)) +from apertis_tests_lib.bluez import AskAgent +from apertis_tests_lib.bluez import build_device_path +from apertis_tests_lib.bluez import get_hci + bus = dbus.SystemBus() -manager = dbus.Interface(bus.get_object("org.bluez", "/"), "org.bluez.Manager") +manager = dbus.Interface(bus.get_object("org.bluez", "/"), + "org.freedesktop.DBus.ObjectManager") option_list = [ make_option("-i", "--device", action="store", @@ -38,29 +47,35 @@ parser = OptionParser(option_list=option_list) (options, args) = parser.parse_args() +objs = manager.GetManagedObjects() +adapters = {p: i for p, i in objs.items() + if 'org.bluez.Adapter1' in i.keys()} + if options.dev_id: - adapter_path = manager.FindAdapter(options.dev_id) + adapter_path = '/org/bluez/' + options.dev_id else: - adapter_path = manager.DefaultAdapter() + adapter_path = list(adapters.keys())[0] adapter = dbus.Interface(bus.get_object("org.bluez", adapter_path), - "org.bluez.Adapter") + "org.bluez.Adapter1") if len(args) < 1: print("""Usage: %s [-i device] <bdaddr>""" % sys.argv[0]) sys.exit(1) -device = adapter.FindDevice(args[0]) +device = build_device_path(get_hci(adapter_path), args[0]) audio = dbus.Interface(bus.get_object("org.bluez", device), "org.bluez.AudioSource") -props = audio.GetProperties() -if props["State"] != "connected": - audio.Connect() +# Connect all profiles +connected = audio.Get('org.bluez.Device1', 'Connected', + dbus_interface='org.freedesktop.DBus.Properties') +if not connected: + audio.Connect(dbus_interface='org.bluez.Device1') time.sleep(5) control = dbus.Interface(bus.get_object("org.bluez", device), - "org.bluez.Control") + "org.bluez.MediaControl1") control.VolumeDown() control.VolumeUp() diff --git a/bluez/ubt b/bluez/ubt index 0bf647cd4982f57d1bf3b67ca4ddf7d8dd0bdf62..23e36a931172e8be111ad3d5bb16081fd813351d 100755 --- a/bluez/ubt +++ b/bluez/ubt @@ -1,4 +1,4 @@ -#! /usr/bin/python +#! /usr/bin/python3 # -*- coding: utf-8 -*- # # BlueZ - Bluetooth protocol stack for Linux @@ -20,34 +20,25 @@ # along with this program; if not, write to the Free Software # Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA -import gobject +from gi.repository import GObject + import os import dbus import dbus.service import dbus.mainloop.glib +import sys import time -import dialog -import urllib2 +import urllib.request from optparse import OptionParser -profiles = { - "00001103-0000-1000-8000-00805f9b34fb": "DUN GW", - "00001105-0000-1000-8000-00805f9b34fb": "OPP Server", - "0000110a-0000-1000-8000-00805f9b34fb": "PBAP PSE", - "0000110c-0000-1000-8000-00805f9b34fb": "AVRCP Target", - "0000110e-0000-1000-8000-00805f9b34fb": "AVRCP Control", - "00001112-0000-1000-8000-00805f9b34fb": "A2DP Source", - "00001116-0000-1000-8000-00805f9b34fb": "HSP AG", - "0000112f-0000-1000-8000-00805f9b34fb": "SAP Server", - "0000112f-0000-1000-8000-00805f9b34fb": "PAN NAP", - "00001132-0000-1000-8000-00805f9b34fb": "MAP MSE", -} - -OBEX_DBUS = "org.bluez.obex.client" -OBEX_CLIENT = "org.bluez.obex.Client" -OBEX_SESSION = "org.bluez.obex.Session" -OBEX_MAP = "org.bluez.obex.MessageAccess" -OBEX_PBAP = "org.bluez.obex.PhonebookAccess" +# import from toplevel directory +sys.path.insert(0, os.path.join(os.path.dirname(__file__), os.pardir)) +from apertis_tests_lib.bluez import AdapterDeviceTester +from apertis_tests_lib.bluez import AskAgent +from apertis_tests_lib.bluez import build_device_path +from apertis_tests_lib.bluez import DeviceProfileTester +from apertis_tests_lib.bluez import get_hci + VCARD_CONTENTS = """BEGIN:VCARD VERSION:2.1 @@ -82,7 +73,7 @@ def process_next(): f = process_q[0] process_q.remove(f) - gobject.idle_add(f) + GObject.idle_add(f) def TEST_START(str): @@ -97,69 +88,6 @@ def TEST_PASSED(str): print ("TEST FINISHED: %s : PASSED" % str) -def ask(prompt): - try: - return raw_input(prompt) - except: - return input(prompt) - - -class Rejected(dbus.DBusException): - _dbus_error_name = "org.bluez.Error.Rejected" - - -class BlueZAgent(dbus.service.Object): - exit_on_release = False - - def set_exit_on_release(self, exit_on_release): - self.exit_on_release = exit_on_release - - @dbus.service.method("org.bluez.Agent", - in_signature="", out_signature="") - def Release(self): - if self.exit_on_release: - mainloop.quit() - - @dbus.service.method("org.bluez.Agent", - in_signature="os", out_signature="") - def Authorize(self, device, uuid): - print(" | Authorizing %s, %s" % (device, uuid)) - return - - @dbus.service.method("org.bluez.Agent", - in_signature="o", out_signature="s") - def RequestPinCode(self, device): - print("| RequestPinCode (%s)" % (device)) - return ask("| Enter PIN Code: ") - - @dbus.service.method("org.bluez.Agent", - in_signature="ou", out_signature="") - def DisplayPasskey(self, device, passkey): - print("| DisplayPasskey (%s, %06d)" % (device, passkey)) - - @dbus.service.method("org.bluez.Agent", - in_signature="os", out_signature="") - def DisplayPinCode(self, device, pincode): - print("| DisplayPinCode (%s, %s)" % (device, pincode)) - - @dbus.service.method("org.bluez.Agent", - in_signature="ou", out_signature="") - def RequestConfirmation(self, device, passkey): - print("| Confiming passkey %06d (%s)" % (passkey, device)) - return - - @dbus.service.method("org.bluez.Agent", - in_signature="s", out_signature="") - def ConfirmModeChange(self, mode): - print("| Confirming ModeChange (%s)" % (mode)) - return - - @dbus.service.method("org.bluez.Agent", - in_signature="", out_signature="") - def Cancel(self): - print("| Cancel") - - class PbapTransfer: def __init__(self): self.path = None @@ -170,23 +98,18 @@ class PbapClient: def __init__(self, session_path): self.transfers = 0 self.props = dict() + self.__filters = {} self.flush_func = None bus = dbus.SessionBus() - obj = bus.get_object("org.bluez.obex.client", session_path) - self.session = dbus.Interface(obj, "org.bluez.obex.Session") - self.pbap = dbus.Interface(obj, - "org.bluez.obex.PhonebookAccess") - bus.add_signal_receiver(self.transfer_complete, - dbus_interface="org.bluez.obex.Transfer", - signal_name="Complete", - path_keyword="path") - bus.add_signal_receiver(self.transfer_error, - dbus_interface="org.bluez.obex.Transfer", - signal_name="Error", - path_keyword="path") - - def register(self, reply, transfer): - (path, properties) = reply + obj = bus.get_object("org.bluez.obex", session_path) + self.__pbap = dbus.Interface(obj, "org.bluez.obex.PhonebookAccess1") + bus.add_signal_receiver( + self.__transfer_properties_changed, + dbus_interface="org.freedesktop.DBus.Properties", + signal_name="PropertiesChanged", + path_keyword="path") + + def register(self, path, properties, transfer): transfer.path = path transfer.filename = properties["Filename"] self.props[path] = transfer @@ -195,38 +118,48 @@ class PbapClient: TEST_FAILED("PBAP PSE: %s" % err) process_next() - def transfer_complete(self, path): - req = self.props.get(path) - if req is None: - return - self.transfers -= 1 - del self.props[path] + def set_filters(self, filters): + self.__filter = filters - if (len(self.props) == 0) and (self.transfers == 0): - if self.flush_func is not None: - f = self.flush_func - self.flush_func = None - f() + def __transfer_properties_changed(self, interface, changed_properties, + invalidated_properties, path): + if interface != 'org.bluez.obex.Transfer1': + return + if 'Status' not in changed_properties: + return - def transfer_error(self, code, message, path): req = self.props.get(path) if req is None: return - TEST_FAILED("PBAP PSE: %s" % message) - process_next() + + new_status = changed_properties['Status'] + if new_status == 'complete': + self.transfers -= 1 + del self.props[path] + + if (len(self.props) == 0) and (self.transfers == 0): + if self.flush_func is not None: + f = self.flush_func + self.flush_func = None + f() + elif new_status == 'error': + TEST_FAILED("PBAP PSE: %s" % message) + process_next() def pull(self, vcard, target): req = PbapTransfer() - self.pbap.Pull(vcard, target, - reply_handler=lambda r: self.register(r, req), - error_handler=self.error) + self.__pbap.Pull( + vcard, target, self.__filters, + reply_handler=lambda p, p2: self.register(p, p2, req), + error_handler=self.error) self.transfers += 1 def pull_all(self, target): req = PbapTransfer() - self.pbap.PullAll(target, - reply_handler=lambda r: self.register(r, req), - error_handler=self.error) + self.__pbap.PullAll( + target, self.__filters, + reply_handler=lambda p, p2: self.register(p, p2, req), + error_handler=self.error) self.transfers += 1 def flush_transfers(self, func): @@ -235,37 +168,43 @@ class PbapClient: self.flush_func = func def interface(self): - return self.pbap + return self.__pbap + transfers = [] class ObexdAgent(dbus.service.Object): - def __init__(self, conn=None, obj_path=None): - dbus.service.Object.__init__(self, conn, obj_path) - self.pending_auth = False - - @dbus.service.method("org.bluez.obex.Agent", in_signature="osssii", + @dbus.service.method("org.bluez.obex.Agent1", in_signature="o", out_signature="s") - def Authorize(self, dpath, device, filename, ftype, length, time): + def AuthorizePush(self, dpath): global transfers - self.pending_auth = False - transfers.append(OppTransfer(dpath, filename, 0, length)) - return "/tmp/vcard-fa7e" + bus = dbus.SessionBus() + obj = bus.get_object('org.bluez.obex', dpath) + name = obj.Get('org.bluez.obex.Transfer1', 'Name', + dbus_interface='org.freedesktop.DBus.Properties') + + transfers.append(OppTransfer(dpath)) + return name - @dbus.service.method("org.bluez.obex.Agent", in_signature="", + @dbus.service.method("org.bluez.obex.Agent1", in_signature="", out_signature="") def Cancel(self): - self.pending_auth = False + pass + + @dbus.service.method("org.bluez.obex.Agent1", in_signature="", + out_signature="") + def Release(self): + pass class OppTransfer(object): - def __init__(self, dpath, filename=None, transfered=-1, size=-1): + def __init__(self, dpath): self.dpath = dpath - self.filename = filename - self.transfered = transfered - self.size = size + self.filename = None + self.transfered = -1 + self.size = -1 def update(self, filename=None, transfered=-1, total=-1): if filename: @@ -276,7 +215,7 @@ class OppTransfer(object): def cancel(self): transfer_iface = dbus.Interface(bus.get_object( "org.bluez.obex", self.dpath), - "org.bluez.obex.Transfer") + "org.bluez.obex.Transfer1") transfer_iface.Cancel() def __str__(self): @@ -291,38 +230,38 @@ class OppClient: self.progress = 0 self.transfer_path = None bus = dbus.SessionBus() - obj = bus.get_object("org.bluez.obex.client", session_path) - self.session = dbus.Interface(obj, "org.bluez.obex.Session") - self.opp = dbus.Interface(obj, "org.bluez.obex.ObjectPush") - bus.add_signal_receiver(self.transfer_complete, - dbus_interface="org.bluez.obex.Transfer", - signal_name="Complete", - path_keyword="path") - bus.add_signal_receiver(self.transfer_error, - dbus_interface="org.bluez.obex.Transfer", - signal_name="Error", - path_keyword="path") - - def create_transfer_reply(self, reply): - (path, properties) = reply + obj = bus.get_object("org.bluez.obex", session_path) + self.__opp = dbus.Interface(obj, "org.bluez.obex.ObjectPush1") + bus.add_signal_receiver( + self.__transfer_properties_changed, + dbus_interface="org.freedesktop.DBus.Properties", + signal_name="PropertiesChanged", + path_keyword="path") + + def __create_transfer_reply(self, path, properties): self.transfer_path = path self.transfer_size = properties["Size"] - def error(self, err): + def __error(self, err): print(err) mainloop.quit() - def transfer_complete(self, path): - if path != self.transfer_path: + def __transfer_properties_changed(self, interface, changed_properties, + invalidated_properties, path): + if interface != 'org.bluez.obex.Transfer1': + return + if 'Status' not in changed_properties: return - TEST_PASSED("OPP Server") - delete_temp_vcard() - process_next() - def transfer_error(self, code, message, path): if path != self.transfer_path: return - TEST_FAILED("OPP Server") + + new_status = changed_properties['Status'] + if new_status == 'complete': + TEST_PASSED("OPP Server") + elif new_status == 'error': + TEST_FAILED("OPP Server") + delete_temp_vcard() process_next() @@ -330,33 +269,17 @@ class OppClient: return def pull_business_card(self, filename): - self.opp.PullBusinessCard(os.path.abspath(filename), - reply_handler=self.create_transfer_reply, - error_handler=self.error) + self.__opp.PullBusinessCard(os.path.abspath(filename), + reply_handler=self.__create_transfer_reply, + error_handler=self.__error) def send_file(self, filename): - self.opp.SendFile(os.path.abspath(filename), - reply_handler=self.create_transfer_reply, - error_handler=self.error) - - -class Device(): - def __init__(self, path): - self.path = path - self.profiles = [] + self.__opp.SendFile(os.path.abspath(filename), + reply_handler=self.__create_transfer_reply, + error_handler=self.__error) - self.device = dbus.Interface(bus.get_object("org.bluez", self.path), - "org.bluez.Device") - self.props = self.device.GetProperties() - self.device.SetProperty("Trusted", True) - - bus.add_signal_receiver(self.device_property_changed, - dbus_interface="org.bluez.Device", - signal_name="PropertyChanged") - - if self.props["Paired"]: - self.get_properties() +class PbapDeviceProfileTester(DeviceProfileTester): def test_map(self): def set_folder(session, new_dir): for node in new_dir.split("/"): @@ -380,17 +303,21 @@ class Device(): try: bus = dbus.SessionBus() - client = dbus.Interface(bus.get_object(OBEX_DBUS, "/"), - OBEX_CLIENT) + client = dbus.Interface(bus.get_object('org.bluez.obex', + '/org/bluez/obex'), + 'org.bluez.obex.Client1') - path = client.CreateSession(self.props["Address"], - {"Target": "map"}) + address = self._device_obj.Get( + 'org.bluez.Device1', + 'Address', + dbus_interface='org.freedesktop.DBus.Properties') + path = client.CreateSession(address, {"Target": "map"}) - session = dbus.Interface(bus.get_object(OBEX_DBUS, path), - OBEX_SESSION) + session = dbus.Interface(bus.get_object('org.bluez.obex', path), + 'org.bluez.obex.Session1') - map = dbus.Interface(bus.get_object(OBEX_DBUS, path), - OBEX_MAP) + map = dbus.Interface(bus.get_object('org.bluez.obex', path), + 'org.bluez.obex.MessageAccess1') set_folder(map, "telecom/msg") folder = map.GetFolderListing(dict()) @@ -414,19 +341,20 @@ class Device(): try: path = paths[0] + pbap_client.set_filters({ + 'Format': 'vcard30', + 'Fields': ["VERSION", "FN", "TEL"], + }) + pbap_client.interface().Select("int", path) ret = pbap_client.interface().GetSize() - ret = pbap_client.interface().List() + ret = pbap_client.interface().List({}) for item in ret: - pbap_client.interface().SetFormat("vcard30") - pbap_client.interface().SetFilter(["VERSION", "FN", "TEL"]) pbap_client.pull(item[0], BASE_PATH + "/pbap-vcard-" + path + item[1]) - pbap_client.interface().SetFormat("vcard30") - pbap_client.interface().SetFilter(["VERSION", "FN", "TEL"]) pbap_client.pull_all(BASE_PATH + "/pbap-vcard-ALL-" + path) pbap_client.flush_transfers( @@ -441,18 +369,22 @@ class Device(): try: bus = dbus.SessionBus() - client = dbus.Interface(bus.get_object(OBEX_DBUS, "/"), - OBEX_CLIENT) + client = dbus.Interface(bus.get_object('org.bluez.obex', + '/org/bluez/obex'), + 'org.bluez.obex.Client1') - session_path = client.CreateSession(self.props["Address"], - {"Target": "PBAP"}) + address = self._device_obj.Get( + 'org.bluez.Device1', + 'Address', + dbus_interface='org.freedesktop.DBus.Properties') + session_path = client.CreateSession(address, {"Target": "pbap"}) pbap_client = PbapClient(session_path) - self.pbap_test_paths(pbap_client, ["PB", "ICH", "OCH", - "MCH", "CCH"]) - except: - TEST_FAILED("PBAP PSE") + self.pbap_test_paths(pbap_client, + ["PB", "ICH", "OCH", "MCH", "CCH"]) + except Exception as error: + TEST_FAILED("PBAP PSE: " + str(error)) process_next() def test_opp_server(self): @@ -460,43 +392,59 @@ class Device(): try: bus = dbus.SessionBus() - client = dbus.Interface(bus.get_object(OBEX_DBUS, "/"), - OBEX_CLIENT) + client = dbus.Interface(bus.get_object('org.bluez.obex', + '/org/bluez/obex'), + 'org.bluez.obex.Client1') - path = client.CreateSession(self.props["Address"], - {"Target": "OPP"}) + address = self._device_obj.Get( + 'org.bluez.Device1', + 'Address', + dbus_interface='org.freedesktop.DBus.Properties') + path = client.CreateSession(address, {"Target": "opp"}) opp_client = OppClient(path) create_temp_vcard() opp_client.send_file("/tmp/vcard.vcf") - except: + except Exception as error: delete_temp_vcard() - TEST_FAILED("OPP Server") + TEST_FAILED("OPP Server: " + str(error)) process_next() def test_opp_client(self): TEST_START("OPP client") print("Start an OPP transfer in the phone...") - def transfer_completed(dpath, success): - if success: + def properties_changed(interface_name, changed_properties, + invalidated_properties, path): + if interface_name != 'org.bluez.obex.Transfer1': + return + if 'Status' not in changed_properties: + return + + if changed_properties['Status']: TEST_PASSED("OPP Client") else: TEST_FAILED("OPP Client") + manager.UnregisterAgent(agent._object_path) + agent.remove_from_connection() + match.remove() + process_next() bus = dbus.SessionBus() - manager = dbus.Interface(bus.get_object("org.bluez.obex", "/"), - "org.bluez.obex.Manager") - path_server = "/server/agent" - agent = ObexdAgent(bus, path_server) - manager.RegisterAgent(path_server) + manager = dbus.Interface(bus.get_object("org.bluez.obex", + "/org/bluez/obex"), + "org.bluez.obex.AgentManager1") + agent = ObexdAgent(bus, "/server/agent") + manager.RegisterAgent(agent._object_path) - bus.add_signal_receiver(transfer_completed, - dbus_interface="org.bluez.obex.Manager", - signal_name="TransferCompleted") + match = bus.add_signal_receiver( + properties_changed, + dbus_interface="org.freedesktop.DBus.Properties", + signal_name="PropertiesChanged", + path_keyword='path') def test_dun(self): TEST_START("DUN GW") @@ -508,9 +456,14 @@ class Device(): "net.connman.Manager") services = manager.GetServices() + device_name = self._device_obj.Get( + 'org.bluez.Device1', + 'Name', + dbus_interface='org.freedesktop.DBus.Properties') + path = None for s in services: - if s[1]["Name"] == self.props["Name"]: + if s[1]["Name"] == device_name: path = s[0] break if not path: @@ -519,7 +472,7 @@ class Device(): service = dbus.Interface(bus.get_object("net.connman", path), "net.connman.Service") service.Connect(timeout=60000) - url = urllib2.urlopen("http://connman.net") + url = urllib.request.urlopen("http://connman.net") f = open(BASE_PATH + "/DUN", 'w') f.write(url.read(1000000)) f.close() @@ -527,8 +480,8 @@ class Device(): TEST_PASSED("DUN GW") - except: - TEST_FAILED("DUN GW") + except Exception as error: + TEST_FAILED("DUN GW: " + str(error)) process_next() @@ -543,9 +496,14 @@ class Device(): services = manager.GetServices() + device_name = self._device_obj.Get( + 'org.bluez.Device1', + 'Name', + dbus_interface='org.freedesktop.DBus.Properties') + path = None for s in services: - if s[1]["Name"] == self.props["Name"]: + if s[1]["Name"] == device_name: path = s[0] break if not path: @@ -562,8 +520,8 @@ class Device(): service.Disconnect(timeout=60000) TEST_PASSED("PAN NAP") - except: - TEST_FAILED("PAN NAP") + except Exception as error: + TEST_FAILED("PAN NAP: " + str(error)) process_next() @@ -571,177 +529,52 @@ class Device(): TEST_START("A2DP Source") try: - bus = dbus.SystemBus() - audio = dbus.Interface(bus.get_object("org.bluez", self.path), - "org.bluez.AudioSource") - - props = audio.GetProperties() + connected = self._device_obj.Get( + 'org.bluez.Device1', + 'Connected', + dbus_interface='org.freedesktop.DBus.Properties') + if not connected: + self._device_obj.Connect(dbus_interface='org.bluez.Device1') - if props["State"] != "connected" and \ - props["State"] != "playing": - audio.Connect() + ret = input("Play music from the device! Did you hear" + " it (y/n): ") - ret = ask("| Play music from the device! Did you hear" - " it (y/n)\n") - - audio.Disconnect() + if not connected: + self._device_obj.Disconnect(dbus_interface='org.bluez.Device1') if ret == 'y' or ret == 'Y': TEST_PASSED("A2DP Source") else: raise - except: - TEST_FAILED("A2DP Source") + except Exception as error: + TEST_FAILED("A2DP Source: " + str(error)) process_next() - def device_test_profiles(self): - if "MAP MSE" in self.profiles: + def device_test_profiles(self, profiles): + if "MAP MSE" in profiles: process_q.append(self.test_map) - if "PBAP PSE" in self.profiles: + if "PBAP PSE" in profiles: process_q.append(self.test_pbap) process_q.append(self.test_opp_client) - if "OPP Server" in self.profiles: + if "OPP Server" in profiles: process_q.append(self.test_opp_server) - if "DUN GW" in self.profiles: + if "DUN GW" in profiles: process_q.append(self.test_dun) - if "PAN NAP" in self.profiles: + if "PAN NAP" in profiles: process_q.append(self.test_pan) - if "A2DP Source" in self.profiles: + if "A2DP Source" in profiles: process_q.append(self.test_a2dp_source) process_next() - def device_parse_uuids(self, uuids): - print ("| Profiles supported:") - for uuid in uuids: - if uuid in profiles.keys(): - print ("\t * %s" % profiles[uuid]) - self.profiles.append(profiles[uuid]) - self.device_test_profiles() - - def get_properties(self): - props = self.device.GetProperties() - self.device_parse_uuids(props["UUIDs"]) - - def device_property_changed(self, name, value): - if name == "Paired" and value: - self.device.SetProperty("Trusted", True) - TEST_PASSED("Pairable Responder") - gobject.timeout_add(7000, self.get_properties) - - -class Adapter(): - def __init__(self, adapter, device): - self.adapter = adapter - self.device = device - self.found = {} - self.already_running = False - - def create_device_reply(self, device): - TEST_PASSED("Pairing Initiatior") - self.adapter.RemoveDevice(device) - process_next() - - def create_device_error(self, error): - TEST_FAILED("Pairing Initiatior: Create device failed %s" - % (error)) - process_next() - - def device_created(self, path): - if self.already_running: - print("Ignore %s" % path) - return - self.already_running = True - print ("| Device created %s" % path) - - Device(path) - - def create_paired_device(self, device): - try: - path = self.adapter.FindDevice(device) - self.adapter.RemoveDevice(path) - except: - pass - - path = self.adapter.CreatePairedDevice( - device, - "/test/agent2", "DisplayYesNo", - timeout=60000, - reply_handler=self.create_device_reply, - error_handler=self.create_device_error) - - def show_menu(self): - self.adapter.StopDiscovery() - - d = dialog.Dialog() - device = d.menu("Select one device to pair with:", - choices=self.found.items()) - d.clear() - - if device[1] == '': - ret = ask("| Retry discovery (y/n)") - if ret == 'y' or ret == 'Y': - self.start_paring_initiator() - return - TEST_FAILED("Pairing Initiatior: No device selected") - exit(0) - - self.create_paired_device(self.found[device[1]]) - - def device_found(self, address, values): - if values["Name"] in self.found: - return - - self.found[values["Name"]] = values["Address"] - - def get_device(self): - path = self.adapter.FindDevice(self.device) - - Device(path) - - def start_paring_initiator(self): - TEST_START("Pairing Initiatior") - - if self.device: - self.create_paired_device(self.device) - return - - self.adapter.StartDiscovery() - self.adapter.SetProperty("Powered", True) - self.adapter.SetProperty("Name", "BlueZ") - self.adapter.SetProperty("Pairable", False) - - bus.add_signal_receiver(self.device_found, - dbus_interface="org.bluez.Adapter", - signal_name="DeviceFound") - - gobject.timeout_add(12000, self.show_menu) - - def start_paring_responder(self): - TEST_START("Pairing Responder") - - props = self.adapter.GetProperties() - - print("| In the phone start pairing with %s (%s)." % - (props["Name"], props["Address"])) - - self.adapter.SetProperty("Powered", True) - self.adapter.SetProperty("Name", "BlueZ") - self.adapter.SetProperty("Discoverable", True) - self.adapter.SetProperty("Pairable", True) - - bus.add_signal_receiver(self.device_created, - dbus_interface="org.bluez.Adapter", - signal_name="DeviceCreated") - def parse_options(): parser.add_option("-i", "--interface", dest="interface", @@ -770,36 +603,75 @@ if __name__ == "__main__": (options, args) = parse_options() - mainloop = gobject.MainLoop() + mainloop = GObject.MainLoop() bus = dbus.SystemBus() - manager = dbus.Interface(bus.get_object("org.bluez", "/"), - "org.bluez.Manager") + object_manager_iface = dbus.Interface(bus.get_object('org.bluez', '/'), + 'org.freedesktop.DBus.ObjectManager') + agent_manager_iface = dbus.Interface(bus.get_object('org.bluez', + '/org/bluez'), + 'org.bluez.AgentManager1') + + objects = object_manager_iface.GetManagedObjects() + adapters = {p: i for p, i in objects.items() + if 'org.bluez.Adapter1' in i.keys()} if options.interface: - path = manager.FindAdapter(options.interface) + path = '/org/bluez/' + options.interface else: - path = manager.DefaultAdapter() + # Arbitrarily choose the first adapter as default + path = list(adapters.keys())[0] - adapter_iface = dbus.Interface(bus.get_object("org.bluez", path), - "org.bluez.Adapter") + adapter_obj = bus.get_object("org.bluez", path) if options.skip_pair and not options.device: print ("Device not specified") - exit(0) + exit(1) + + if options.device: + path = build_device_path(get_hci(adapter_obj.object_path), + options.device) + device_obj = bus.get_object('org.bluez', path) + else: + device_obj = None + + agent = AskAgent(bus, "/test/agent", mainloop) + tester = AdapterDeviceTester(bus, adapter_obj, device_obj, + PbapDeviceProfileTester) + agent_manager_iface.RegisterAgent("/test/agent", "DisplayYesNo") + agent_manager_iface.RequestDefaultAgent('/test/agent') + + def test1(): + def test1_cb(error_message): + if error_message: + TEST_FAILED(error_message) + exit(1) + else: + TEST_PASSED('Pairing Initiator') + process_next() + + TEST_START('Pairing Initiator') + tester.start_pairing_initiator(test1_cb) + + def test2(): + def test2_cb(error_message): + if error_message: + TEST_FAILED(error_message) + exit(1) + else: + TEST_PASSED('Pairing Responder') + process_next() - agent = BlueZAgent(bus, "/test/agent") - agent = BlueZAgent(bus, "/test/agent2") - adapter = Adapter(adapter_iface, options.device) - adapter_iface.RegisterAgent("/test/agent", "DisplayYesNo") + TEST_START('Pairing Responder') + tester.start_pairing_responder(test2_cb) if options.skip_pair: - process_q.append(adapter.get_device) + process_q.append(tester.get_device) else: - process_q.append(adapter.start_paring_initiator) - process_q.append(adapter.start_paring_responder) + process_q.append(test1) + process_q.append(test2) create_base_path() - gobject.timeout_add(2000, process_next) + GObject.timeout_add(2000, process_next) mainloop.run() - adapter_iface.UnregisterAgent("/test/agent") + agent_manager_iface.UnregisterAgent("/test/agent")