#! /usr/bin/python3 # -*- coding: utf-8 -*- # # BlueZ - Bluetooth protocol stack for Linux # # Copyright © 2012, 2015 Collabora Ltd. # # SPDX-License-Identifier: GPL-2.0+ # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 2 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA from gi.repository import GObject import os import dbus import dbus.service import dbus.mainloop.glib import sys import time import urllib.request from optparse import OptionParser # import from toplevel directory sys.path.insert(0, os.path.join(os.path.dirname(__file__), os.pardir)) from apertis_tests_lib.bluez import AdapterDeviceTester from apertis_tests_lib.bluez import AskAgent from apertis_tests_lib.bluez import build_device_path from apertis_tests_lib.bluez import DeviceProfileTester from apertis_tests_lib.bluez import get_hci VCARD_CONTENTS = """BEGIN:VCARD VERSION:2.1 N:;Collabora;;; FN:Collabora TEL;CELL;PREF:145 END:VCARD """ def create_temp_vcard(): f = open('/tmp/vcard.vcf', 'w') f.write(VCARD_CONTENTS) f.close() def delete_temp_vcard(): try: os.remove('/tmp/vcard.vcf') except: return BASE_PATH = "" process_q = [] def process_next(): if len(process_q) == 0: mainloop.quit() return f = process_q[0] process_q.remove(f) GObject.idle_add(f) def TEST_START(str): print ("TEST STARTED: %s" % str) def TEST_FAILED(str): print ("TEST FINISHED: %s : FAILED" % str) def TEST_PASSED(str): print ("TEST FINISHED: %s : PASSED" % str) class PbapTransfer: def __init__(self): self.path = None self.filename = None class PbapClient: def __init__(self, session_path): self.transfers = 0 self.props = dict() self.__filters = {} self.flush_func = None bus = dbus.SessionBus() obj = bus.get_object("org.bluez.obex", session_path) self.__pbap = dbus.Interface(obj, "org.bluez.obex.PhonebookAccess1") bus.add_signal_receiver( self.__transfer_properties_changed, dbus_interface="org.freedesktop.DBus.Properties", signal_name="PropertiesChanged", path_keyword="path") def register(self, path, properties, transfer): transfer.path = path transfer.filename = properties["Filename"] self.props[path] = transfer def error(self, err): TEST_FAILED("PBAP PSE: %s" % err) process_next() def set_filters(self, filters): self.__filter = filters def __transfer_properties_changed(self, interface, changed_properties, invalidated_properties, path): if interface != 'org.bluez.obex.Transfer1': return if 'Status' not in changed_properties: return req = self.props.get(path) if req is None: return new_status = changed_properties['Status'] if new_status == 'complete': self.transfers -= 1 del self.props[path] if (len(self.props) == 0) and (self.transfers == 0): if self.flush_func is not None: f = self.flush_func self.flush_func = None f() elif new_status == 'error': TEST_FAILED("PBAP PSE: %s" % message) process_next() def pull(self, vcard, target): req = PbapTransfer() self.__pbap.Pull( vcard, target, self.__filters, reply_handler=lambda p, p2: self.register(p, p2, req), error_handler=self.error) self.transfers += 1 def pull_all(self, target): req = PbapTransfer() self.__pbap.PullAll( target, self.__filters, reply_handler=lambda p, p2: self.register(p, p2, req), error_handler=self.error) self.transfers += 1 def flush_transfers(self, func): if (len(self.props) == 0) and (self.transfers == 0): return self.flush_func = func def interface(self): return self.__pbap transfers = [] class ObexdAgent(dbus.service.Object): @dbus.service.method("org.bluez.obex.Agent1", in_signature="o", out_signature="s") def AuthorizePush(self, dpath): global transfers bus = dbus.SessionBus() obj = bus.get_object('org.bluez.obex', dpath) name = obj.Get('org.bluez.obex.Transfer1', 'Name', dbus_interface='org.freedesktop.DBus.Properties') transfers.append(OppTransfer(dpath)) return name @dbus.service.method("org.bluez.obex.Agent1", in_signature="", out_signature="") def Cancel(self): pass @dbus.service.method("org.bluez.obex.Agent1", in_signature="", out_signature="") def Release(self): pass class OppTransfer(object): def __init__(self, dpath): self.dpath = dpath self.filename = None self.transfered = -1 self.size = -1 def update(self, filename=None, transfered=-1, total=-1): if filename: self.filename = filename self.transfered = transfered self.size = total def cancel(self): transfer_iface = dbus.Interface(bus.get_object( "org.bluez.obex", self.dpath), "org.bluez.obex.Transfer1") transfer_iface.Cancel() def __str__(self): p = float(self.transfered) / float(self.size) * 100 return "%s (%s) (%.2f%%)" % (self.filename, self.dpath, p) __repr__ = __str__ class OppClient: def __init__(self, session_path): self.progress = 0 self.transfer_path = None bus = dbus.SessionBus() obj = bus.get_object("org.bluez.obex", session_path) self.__opp = dbus.Interface(obj, "org.bluez.obex.ObjectPush1") bus.add_signal_receiver( self.__transfer_properties_changed, dbus_interface="org.freedesktop.DBus.Properties", signal_name="PropertiesChanged", path_keyword="path") def __create_transfer_reply(self, path, properties): self.transfer_path = path self.transfer_size = properties["Size"] def __error(self, err): print(err) mainloop.quit() def __transfer_properties_changed(self, interface, changed_properties, invalidated_properties, path): if interface != 'org.bluez.obex.Transfer1': return if 'Status' not in changed_properties: return if path != self.transfer_path: return new_status = changed_properties['Status'] if new_status == 'complete': TEST_PASSED("OPP Server") elif new_status == 'error': TEST_FAILED("OPP Server") delete_temp_vcard() process_next() def transfer_progress(self, prop, value, path): return def pull_business_card(self, filename): self.__opp.PullBusinessCard(os.path.abspath(filename), reply_handler=self.__create_transfer_reply, error_handler=self.__error) def send_file(self, filename): self.__opp.SendFile(os.path.abspath(filename), reply_handler=self.__create_transfer_reply, error_handler=self.__error) class PbapDeviceProfileTester(DeviceProfileTester): def test_map(self): def set_folder(session, new_dir): for node in new_dir.split("/"): session.SetFolder(node) def message_check(messages): if messages == {}: return for m in messages.values(): assert('Subject' in m) assert('Timestamp' in m) assert('SenderAddress' in m) assert('RecipientAddress' in m) assert('Type' in m) assert('Size' in m) assert('Status' in m) TEST_START("MAP MSE") try: bus = dbus.SessionBus() client = dbus.Interface(bus.get_object('org.bluez.obex', '/org/bluez/obex'), 'org.bluez.obex.Client1') address = self._device_obj.Get( 'org.bluez.Device1', 'Address', dbus_interface='org.freedesktop.DBus.Properties') path = client.CreateSession(address, {"Target": "map"}) session = dbus.Interface(bus.get_object('org.bluez.obex', path), 'org.bluez.obex.Session1') map = dbus.Interface(bus.get_object('org.bluez.obex', path), 'org.bluez.obex.MessageAccess1') set_folder(map, "telecom/msg") folder = map.GetFolderListing(dict()) for f in folder: msg = map.GetMessageListing(f["Name"], dict()) message_check(msg) TEST_PASSED("MAP MSE") except: TEST_FAILED("MAP MSE") process_next() def pbap_test_paths(self, pbap_client, paths): if len(paths) == 0: TEST_PASSED("PBAP PSE") process_next() return try: path = paths[0] pbap_client.set_filters({ 'Format': 'vcard30', 'Fields': ["VERSION", "FN", "TEL"], }) pbap_client.interface().Select("int", path) ret = pbap_client.interface().GetSize() ret = pbap_client.interface().List({}) for item in ret: pbap_client.pull(item[0], BASE_PATH + "/pbap-vcard-" + path + item[1]) pbap_client.pull_all(BASE_PATH + "/pbap-vcard-ALL-" + path) pbap_client.flush_transfers( lambda: self.pbap_test_paths(pbap_client, paths[1:])) except: TEST_FAILED("PBAP PSE path") process_next() def test_pbap(self): TEST_START("PBAP PSE") try: bus = dbus.SessionBus() client = dbus.Interface(bus.get_object('org.bluez.obex', '/org/bluez/obex'), 'org.bluez.obex.Client1') address = self._device_obj.Get( 'org.bluez.Device1', 'Address', dbus_interface='org.freedesktop.DBus.Properties') session_path = client.CreateSession(address, {"Target": "pbap"}) pbap_client = PbapClient(session_path) self.pbap_test_paths(pbap_client, ["PB", "ICH", "OCH", "MCH", "CCH"]) except Exception as error: TEST_FAILED("PBAP PSE: " + str(error)) process_next() def test_opp_server(self): TEST_START("OPP Server") try: bus = dbus.SessionBus() client = dbus.Interface(bus.get_object('org.bluez.obex', '/org/bluez/obex'), 'org.bluez.obex.Client1') address = self._device_obj.Get( 'org.bluez.Device1', 'Address', dbus_interface='org.freedesktop.DBus.Properties') path = client.CreateSession(address, {"Target": "opp"}) opp_client = OppClient(path) create_temp_vcard() opp_client.send_file("/tmp/vcard.vcf") except Exception as error: delete_temp_vcard() TEST_FAILED("OPP Server: " + str(error)) process_next() def test_opp_client(self): TEST_START("OPP client") print("Start an OPP transfer in the phone...") def properties_changed(interface_name, changed_properties, invalidated_properties, path): if interface_name != 'org.bluez.obex.Transfer1': return if 'Status' not in changed_properties: return if changed_properties['Status']: TEST_PASSED("OPP Client") else: TEST_FAILED("OPP Client") manager.UnregisterAgent(agent._object_path) agent.remove_from_connection() match.remove() process_next() bus = dbus.SessionBus() manager = dbus.Interface(bus.get_object("org.bluez.obex", "/org/bluez/obex"), "org.bluez.obex.AgentManager1") agent = ObexdAgent(bus, "/server/agent") manager.RegisterAgent(agent._object_path) match = bus.add_signal_receiver( properties_changed, dbus_interface="org.freedesktop.DBus.Properties", signal_name="PropertiesChanged", path_keyword='path') def __test_dun_pan(self, path): bus = dbus.SystemBus() manager = dbus.Interface(bus.get_object("net.connman", "/"), "net.connman.Manager") services = manager.GetServices() device_name = self._device_obj.Get( 'org.bluez.Device1', 'Name', dbus_interface='org.freedesktop.DBus.Properties') path = None for s in services: if s[1]["Name"] == device_name: path = s[0] break if not path: raise LookupError('Device ‘%s’ not found in services' % device_name) service = dbus.Interface(bus.get_object("net.connman", path), "net.connman.Service") service.Connect(timeout=60000) url = urllib.request.urlopen("http://connman.net") f = open(path, 'w') f.write(url.read(1000000)) f.close() service.Disconnect(timeout=60000) def test_dun(self): TEST_START("DUN GW") try: self.__test_dun_pan(BASE_PATH + "/DUN") TEST_PASSED("DUN GW") except Exception as error: TEST_FAILED("DUN GW: " + str(error)) process_next() def test_pan(self): TEST_START("PAN NAP") try: self.__test_dun_pan(BASE_PATH + "/PAN") TEST_PASSED("PAN NAP") except Exception as error: TEST_FAILED("PAN NAP: " + str(error)) process_next() def test_a2dp_source(self): TEST_START("A2DP Source") try: connected = self._device_obj.Get( 'org.bluez.Device1', 'Connected', dbus_interface='org.freedesktop.DBus.Properties') if not connected: self._device_obj.Connect(dbus_interface='org.bluez.Device1') ret = input("Play music from the device! Did you hear" " it (y/n): ") if not connected: self._device_obj.Disconnect(dbus_interface='org.bluez.Device1') if ret == 'y' or ret == 'Y': TEST_PASSED("A2DP Source") else: raise RuntimeError('Invalid response: ' + ret) except Exception as error: TEST_FAILED("A2DP Source: " + str(error)) process_next() def device_test_profiles(self, profiles): if "MAP MSE" in profiles: process_q.append(self.test_map) if "PBAP PSE" in profiles: process_q.append(self.test_pbap) process_q.append(self.test_opp_client) if "OPP Server" in profiles: process_q.append(self.test_opp_server) if "DUN GW" in profiles: process_q.append(self.test_dun) if "PAN NAP" in profiles: process_q.append(self.test_pan) if "A2DP Source" in profiles: process_q.append(self.test_a2dp_source) process_next() def parse_options(): parser.add_option("-i", "--interface", dest="interface", help="HCI device", metavar="HCI") parser.add_option("-s", "--skip", action="store_true", dest="skip_pair", help="Skip Pairing") parser.add_option("-d", "--device", dest="device", help="Device to connect", metavar="DEVICE") return parser.parse_args() def create_base_path(): global BASE_PATH lt = time.localtime() BASE_PATH = "/tmp/bluez-%02d%02d-%02d%02d%02d" % (lt.tm_mon, lt.tm_mday, lt.tm_hour, lt.tm_min, lt.tm_sec) os.mkdir(BASE_PATH) if __name__ == "__main__": dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) parser = OptionParser() (options, args) = parse_options() mainloop = GObject.MainLoop() bus = dbus.SystemBus() object_manager_iface = dbus.Interface(bus.get_object('org.bluez', '/'), 'org.freedesktop.DBus.ObjectManager') agent_manager_iface = dbus.Interface(bus.get_object('org.bluez', '/org/bluez'), 'org.bluez.AgentManager1') objects = object_manager_iface.GetManagedObjects() adapters = {p: i for p, i in objects.items() if 'org.bluez.Adapter1' in i.keys()} if options.interface: path = '/org/bluez/' + options.interface else: # Arbitrarily choose the first adapter as default path = list(adapters.keys())[0] adapter_obj = bus.get_object("org.bluez", path) if options.skip_pair and not options.device: print ("Device not specified") exit(1) if options.device: path = build_device_path(get_hci(adapter_obj.object_path), options.device) device_obj = bus.get_object('org.bluez', path) else: device_obj = None agent = AskAgent(bus, "/test/agent", mainloop) tester = AdapterDeviceTester(bus, adapter_obj, device_obj, PbapDeviceProfileTester) agent_manager_iface.RegisterAgent("/test/agent", "DisplayYesNo") agent_manager_iface.RequestDefaultAgent('/test/agent') def test1(): def test1_cb(error_message): if error_message: TEST_FAILED(error_message) exit(1) else: TEST_PASSED('Pairing Initiator') process_next() TEST_START('Pairing Initiator') tester.start_pairing_initiator(test1_cb) def test2(): def test2_cb(error_message): if error_message: TEST_FAILED(error_message) exit(1) else: TEST_PASSED('Pairing Responder') process_next() TEST_START('Pairing Responder') tester.start_pairing_responder(test2_cb) if options.skip_pair: process_q.append(tester.get_device) else: process_q.append(test1) process_q.append(test2) create_base_path() GObject.timeout_add(2000, process_next) mainloop.run() agent_manager_iface.UnregisterAgent("/test/agent")