Forked from
pkg / apertis-tests
627 commits behind the upstream repository.
-
Philip Withnall authored
When raising an exception, it’s generally a good idea to actually include the exception value — otherwise Python will re-raise the ‘last exception which was active in the current scope’, which is not so useful, especially when you want to print that exception. Reviewed-by:
Sjoerd Simons <sjoerd.simons@collabora.co.uk> Signed-off-by:
Philip Withnall <philip.withnall@collabora.co.uk> Differential Revision: https://phabricator.apertis.org/D1296
Philip Withnall authored
When raising an exception, it’s generally a good idea to actually include the exception value — otherwise Python will re-raise the ‘last exception which was active in the current scope’, which is not so useful, especially when you want to print that exception. Reviewed-by:
Sjoerd Simons <sjoerd.simons@collabora.co.uk> Signed-off-by:
Philip Withnall <philip.withnall@collabora.co.uk> Differential Revision: https://phabricator.apertis.org/D1296
ubt 19.52 KiB
#! /usr/bin/python3
# -*- coding: utf-8 -*-
#
# BlueZ - Bluetooth protocol stack for Linux
#
# Copyright © 2012, 2015 Collabora Ltd.
#
# SPDX-License-Identifier: GPL-2.0+
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
from gi.repository import GObject
import os
import dbus
import dbus.service
import dbus.mainloop.glib
import sys
import time
import urllib.request
from optparse import OptionParser
# import from toplevel directory
sys.path.insert(0, os.path.join(os.path.dirname(__file__), os.pardir))
from apertis_tests_lib.bluez import AdapterDeviceTester
from apertis_tests_lib.bluez import AskAgent
from apertis_tests_lib.bluez import build_device_path
from apertis_tests_lib.bluez import DeviceProfileTester
from apertis_tests_lib.bluez import get_hci
# Single-contact vCard 2.1 payload written to disk by create_temp_vcard().
VCARD_CONTENTS = """BEGIN:VCARD
VERSION:2.1
N:;Collabora;;;
FN:Collabora
TEL;CELL;PREF:145
END:VCARD
"""
def create_temp_vcard():
    """Write VCARD_CONTENTS to /tmp/vcard.vcf, replacing any existing file.

    The file is the one sent by test_opp_server() and is removed again by
    delete_temp_vcard().
    """
    # The context manager closes the file even if the write fails.
    with open('/tmp/vcard.vcf', 'w') as f:
        f.write(VCARD_CONTENTS)
def delete_temp_vcard():
    """Remove /tmp/vcard.vcf if it exists.

    Removal is best-effort: a missing file or any other OS-level failure is
    ignored, but unrelated exceptions (e.g. KeyboardInterrupt) propagate.
    """
    try:
        os.remove('/tmp/vcard.vcf')
    except OSError:
        # Nothing to clean up, or the file cannot be removed; not fatal.
        pass
# Directory that receives files written by the tests; set by create_base_path().
BASE_PATH = ""
# Test callables still to run; process_next() consumes them from the front.
process_q = []
def process_next():
    """Schedule the next queued test, or quit the main loop if none is left.

    The head of ``process_q`` is removed and handed to ``GObject.idle_add``,
    so it is run by the main loop rather than directly by the caller.
    """
    if not process_q:
        mainloop.quit()
        return
    next_test = process_q.pop(0)
    GObject.idle_add(next_test)
def TEST_START(str):
    """Print the "TEST STARTED" marker line for the test named *str*."""
    # The parameter name shadows the builtin; it is kept for compatibility.
    banner = "TEST STARTED: %s" % str
    print(banner)
def TEST_FAILED(str):
    """Print the "TEST FINISHED ... FAILED" line for the test named *str*."""
    # The parameter name shadows the builtin; it is kept for compatibility.
    verdict = "TEST FINISHED: %s : FAILED" % str
    print(verdict)
def TEST_PASSED(str):
    """Print the "TEST FINISHED ... PASSED" line for the test named *str*."""
    # The parameter name shadows the builtin; it is kept for compatibility.
    verdict = "TEST FINISHED: %s : PASSED" % str
    print(verdict)
class PbapTransfer:
    """Record of one PBAP transfer: its D-Bus object path and file name."""

    def __init__(self):
        # Both fields stay None until PbapClient.register() fills them in.
        self.path = self.filename = None
class PbapClient:
    """Client for the org.bluez.obex.PhonebookAccess1 interface of a session.

    Keeps track of the transfers started through pull()/pull_all() and, once
    every one of them has completed, calls the callback armed with
    flush_transfers().
    """

    def __init__(self, session_path):
        """Bind to the OBEX session at *session_path* on the session bus."""
        # Number of pull requests issued that have not completed yet.
        self.transfers = 0
        # Transfer object path -> PbapTransfer, filled in by register().
        self.props = dict()
        # Filter dictionary passed to Pull/PullAll; see set_filters().
        self.__filters = {}
        # Callback to run once all transfers are done; see flush_transfers().
        self.flush_func = None
        bus = dbus.SessionBus()
        obj = bus.get_object("org.bluez.obex", session_path)
        self.__pbap = dbus.Interface(obj, "org.bluez.obex.PhonebookAccess1")
        bus.add_signal_receiver(
            self.__transfer_properties_changed,
            dbus_interface="org.freedesktop.DBus.Properties",
            signal_name="PropertiesChanged",
            path_keyword="path")

    def register(self, path, properties, transfer):
        """Reply handler: remember *transfer* under its object *path*."""
        transfer.path = path
        transfer.filename = properties["Filename"]
        self.props[path] = transfer

    def error(self, err):
        """Error handler for pull requests: fail the test and move on."""
        TEST_FAILED("PBAP PSE: %s" % err)
        process_next()

    def set_filters(self, filters):
        """Set the filter dictionary used by later pull()/pull_all() calls."""
        # Must be the attribute that pull()/pull_all() read; storing it
        # under a different name would silently drop the filters.
        self.__filters = filters

    def __transfer_properties_changed(self, interface, changed_properties,
                                      invalidated_properties, path):
        """Handle PropertiesChanged for the transfers registered here.

        Signals for other interfaces, without a 'Status' change, or for
        paths that were never registered are ignored.
        """
        if interface != 'org.bluez.obex.Transfer1':
            return
        if 'Status' not in changed_properties:
            return
        req = self.props.get(path)
        if req is None:
            return
        new_status = changed_properties['Status']
        if new_status == 'complete':
            self.transfers -= 1
            del self.props[path]
            if (len(self.props) == 0) and (self.transfers == 0):
                if self.flush_func is not None:
                    # Clear before calling: the callback may arm a new one.
                    f = self.flush_func
                    self.flush_func = None
                    f()
        elif new_status == 'error':
            # The signal carries no error text, so report the failed path.
            TEST_FAILED("PBAP PSE: transfer %s failed" % path)
            process_next()

    def pull(self, vcard, target):
        """Start pulling the single entry *vcard* into the file *target*."""
        req = PbapTransfer()
        self.__pbap.Pull(
            vcard, target, self.__filters,
            reply_handler=lambda p, p2: self.register(p, p2, req),
            error_handler=self.error)
        self.transfers += 1

    def pull_all(self, target):
        """Start pulling the whole selected phone book into *target*."""
        req = PbapTransfer()
        self.__pbap.PullAll(
            target, self.__filters,
            reply_handler=lambda p, p2: self.register(p, p2, req),
            error_handler=self.error)
        self.transfers += 1

    def flush_transfers(self, func):
        """Arrange for *func* to be called once all transfers completed.

        NOTE(review): when nothing is pending *func* is neither stored nor
        called; callers in this file always start a transfer first.
        """
        if (len(self.props) == 0) and (self.transfers == 0):
            return
        self.flush_func = func

    def interface(self):
        """Return the PhonebookAccess1 D-Bus interface proxy."""
        return self.__pbap
# OppTransfer records appended by ObexdAgent.AuthorizePush().
transfers = []
class ObexdAgent(dbus.service.Object):
    """D-Bus object exporting the org.bluez.obex.Agent1 methods."""

    @dbus.service.method("org.bluez.obex.Agent1", in_signature="o",
                         out_signature="s")
    def AuthorizePush(self, dpath):
        """Record the transfer at *dpath* and reply with its 'Name' property."""
        transfer_obj = dbus.SessionBus().get_object('org.bluez.obex', dpath)
        name = transfer_obj.Get(
            'org.bluez.obex.Transfer1', 'Name',
            dbus_interface='org.freedesktop.DBus.Properties')
        # The module-level list is only mutated, so no ``global`` is needed.
        transfers.append(OppTransfer(dpath))
        return name

    @dbus.service.method("org.bluez.obex.Agent1", in_signature="",
                         out_signature="")
    def Cancel(self):
        """Do nothing; present so the Agent1 method exists."""

    @dbus.service.method("org.bluez.obex.Agent1", in_signature="",
                         out_signature="")
    def Release(self):
        """Do nothing; present so the Agent1 method exists."""
class OppTransfer(object):
    """Progress record for one OBEX Object Push transfer."""

    def __init__(self, dpath):
        """Create a record for the transfer object at D-Bus path *dpath*."""
        self.dpath = dpath
        self.filename = None
        # -1 means "not known yet" for both counters.
        self.transfered = -1
        self.size = -1

    def update(self, filename=None, transfered=-1, total=-1):
        """Store new progress values; a falsy *filename* keeps the old one."""
        if filename:
            self.filename = filename
        self.transfered = transfered
        self.size = total

    def cancel(self):
        """Ask obexd to cancel this transfer."""
        # obexd is reached through the session bus everywhere else in this
        # file; do not rely on a module-level ``bus`` being defined.
        transfer_iface = dbus.Interface(
            dbus.SessionBus().get_object("org.bluez.obex", self.dpath),
            "org.bluez.obex.Transfer1")
        transfer_iface.Cancel()

    def __str__(self):
        """Return "<filename> (<path>) (<percent>%)".

        The percentage is 0 while the size is unknown (-1) or zero, which
        avoids a division by zero and a bogus 100% for the -1/-1 state.
        """
        if self.size > 0:
            p = float(max(self.transfered, 0)) / float(self.size) * 100
        else:
            p = 0.0
        return "%s (%s) (%.2f%%)" % (self.filename, self.dpath, p)

    __repr__ = __str__
class OppClient:
    """Client for the org.bluez.obex.ObjectPush1 interface of a session.

    Reports the "OPP Server" test result once the transfer it started
    reaches a final status.
    """

    def __init__(self, session_path):
        """Bind to the OBEX session at *session_path* on the session bus."""
        self.progress = 0
        # Object path of the transfer started by this client, once known.
        self.transfer_path = None
        # Size reported in the transfer reply; None until (or unless) known.
        self.transfer_size = None
        bus = dbus.SessionBus()
        obj = bus.get_object("org.bluez.obex", session_path)
        self.__opp = dbus.Interface(obj, "org.bluez.obex.ObjectPush1")
        bus.add_signal_receiver(
            self.__transfer_properties_changed,
            dbus_interface="org.freedesktop.DBus.Properties",
            signal_name="PropertiesChanged",
            path_keyword="path")

    def __create_transfer_reply(self, path, properties):
        """Reply handler: remember the new transfer's path and size."""
        self.transfer_path = path
        # The reply may not carry a size; do not fail the handler for that.
        self.transfer_size = properties.get("Size")

    def __error(self, err):
        """Error handler: print the error and stop the main loop."""
        print(err)
        mainloop.quit()

    def __transfer_properties_changed(self, interface, changed_properties,
                                      invalidated_properties, path):
        """Finish the "OPP Server" test when our transfer completes or fails.

        Signals for other interfaces, other transfers, or without a 'Status'
        change are ignored, as are non-final status values.
        """
        if interface != 'org.bluez.obex.Transfer1':
            return
        if 'Status' not in changed_properties:
            return
        if path != self.transfer_path:
            return
        new_status = changed_properties['Status']
        if new_status == 'complete':
            TEST_PASSED("OPP Server")
        elif new_status == 'error':
            TEST_FAILED("OPP Server")
        else:
            # Transfer still in progress: keep the vCard and keep waiting,
            # otherwise the next test would start without a verdict.
            return
        delete_temp_vcard()
        process_next()

    def transfer_progress(self, prop, value, path):
        """Do nothing; progress updates are not used."""
        return

    def pull_business_card(self, filename):
        """Start PullBusinessCard into the absolute path of *filename*."""
        self.__opp.PullBusinessCard(os.path.abspath(filename),
                                    reply_handler=self.__create_transfer_reply,
                                    error_handler=self.__error)

    def send_file(self, filename):
        """Start SendFile for the absolute path of *filename*."""
        self.__opp.SendFile(os.path.abspath(filename),
                            reply_handler=self.__create_transfer_reply,
                            error_handler=self.__error)
class PbapDeviceProfileTester(DeviceProfileTester):
    """Profile tests (MAP, PBAP, OPP, DUN, PAN, A2DP) run against a device.

    Each ``test_*`` method prints a TEST STARTED / TEST FINISHED pair and
    calls process_next() when it is done so the next queued test can run.
    Tests that wait for D-Bus signals call process_next() from their signal
    handlers instead.
    """

    def test_map(self):
        """MAP MSE: list the folders under telecom/msg and check each
        folder's message listing for the expected fields."""

        def set_folder(session, new_dir):
            # SetFolder is called once per path component.
            for node in new_dir.split("/"):
                session.SetFolder(node)

        def message_check(messages):
            required = ('Subject', 'Timestamp', 'SenderAddress',
                        'RecipientAddress', 'Type', 'Size', 'Status')
            for m in messages.values():
                for key in required:
                    if key not in m:
                        raise ValueError(
                            "message is missing field '%s'" % key)

        TEST_START("MAP MSE")
        try:
            bus = dbus.SessionBus()
            client = dbus.Interface(bus.get_object('org.bluez.obex',
                                                   '/org/bluez/obex'),
                                    'org.bluez.obex.Client1')
            address = self._device_obj.Get(
                'org.bluez.Device1',
                'Address',
                dbus_interface='org.freedesktop.DBus.Properties')
            path = client.CreateSession(address, {"Target": "map"})
            map_iface = dbus.Interface(bus.get_object('org.bluez.obex', path),
                                       'org.bluez.obex.MessageAccess1')
            set_folder(map_iface, "telecom/msg")
            for f in map_iface.GetFolderListing(dict()):
                message_check(map_iface.GetMessageListing(f["Name"], dict()))
            TEST_PASSED("MAP MSE")
        except Exception as error:
            TEST_FAILED("MAP MSE: " + str(error))
        process_next()

    def pbap_test_paths(self, pbap_client, paths):
        """PBAP PSE: pull every entry of ``paths[0]``, then recurse.

        The remaining paths are handled by a callback that *pbap_client*
        runs once all transfers for the current path have completed; the
        test passes when *paths* is exhausted.
        """
        if not paths:
            TEST_PASSED("PBAP PSE")
            process_next()
            return
        try:
            path = paths[0]
            pbap_client.set_filters({
                'Format': 'vcard30',
                'Fields': ["VERSION", "FN", "TEL"],
            })
            pbap = pbap_client.interface()
            pbap.Select("int", path)
            # The result is not used; the call only exercises GetSize.
            pbap.GetSize()
            for item in pbap.List({}):
                pbap_client.pull(item[0], BASE_PATH +
                                 "/pbap-vcard-" + path + item[1])
            pbap_client.pull_all(BASE_PATH + "/pbap-vcard-ALL-" + path)
            pbap_client.flush_transfers(
                lambda: self.pbap_test_paths(pbap_client, paths[1:]))
        except Exception as error:
            TEST_FAILED("PBAP PSE path: " + str(error))
            process_next()

    def test_pbap(self):
        """PBAP PSE: open a PBAP session and walk the standard phone books."""
        TEST_START("PBAP PSE")
        try:
            bus = dbus.SessionBus()
            client = dbus.Interface(bus.get_object('org.bluez.obex',
                                                   '/org/bluez/obex'),
                                    'org.bluez.obex.Client1')
            address = self._device_obj.Get(
                'org.bluez.Device1',
                'Address',
                dbus_interface='org.freedesktop.DBus.Properties')
            session_path = client.CreateSession(address, {"Target": "pbap"})
            pbap_client = PbapClient(session_path)
            # On success the queue is advanced by pbap_test_paths().
            self.pbap_test_paths(pbap_client,
                                 ["PB", "ICH", "OCH", "MCH", "CCH"])
        except Exception as error:
            TEST_FAILED("PBAP PSE: " + str(error))
            process_next()

    def test_opp_server(self):
        """OPP Server: push a temporary vCard to the device."""
        TEST_START("OPP Server")
        try:
            bus = dbus.SessionBus()
            client = dbus.Interface(bus.get_object('org.bluez.obex',
                                                   '/org/bluez/obex'),
                                    'org.bluez.obex.Client1')
            address = self._device_obj.Get(
                'org.bluez.Device1',
                'Address',
                dbus_interface='org.freedesktop.DBus.Properties')
            path = client.CreateSession(address, {"Target": "opp"})
            opp_client = OppClient(path)
            create_temp_vcard()
            # The verdict is reported by OppClient when the transfer ends.
            opp_client.send_file("/tmp/vcard.vcf")
        except Exception as error:
            delete_temp_vcard()
            TEST_FAILED("OPP Server: " + str(error))
            process_next()

    def test_opp_client(self):
        """OPP client: wait for the phone to push an object to us."""
        TEST_START("OPP client")
        print("Start an OPP transfer in the phone...")

        def properties_changed(interface_name, changed_properties,
                               invalidated_properties, path):
            if interface_name != 'org.bluez.obex.Transfer1':
                return
            if 'Status' not in changed_properties:
                return
            status = changed_properties['Status']
            # Compare the value: any non-empty status string is truthy, so
            # a truthiness test would report PASSED even for 'error'.
            if status == 'complete':
                TEST_PASSED("OPP Client")
            elif status == 'error':
                TEST_FAILED("OPP Client")
            else:
                # Transfer still in progress; wait for a final status.
                return
            manager.UnregisterAgent(agent._object_path)
            agent.remove_from_connection()
            match.remove()
            process_next()

        bus = dbus.SessionBus()
        manager = dbus.Interface(bus.get_object("org.bluez.obex",
                                                "/org/bluez/obex"),
                                 "org.bluez.obex.AgentManager1")
        agent = ObexdAgent(bus, "/server/agent")
        manager.RegisterAgent(agent._object_path)
        match = bus.add_signal_receiver(
            properties_changed,
            dbus_interface="org.freedesktop.DBus.Properties",
            signal_name="PropertiesChanged",
            path_keyword='path')

    def __test_dun_pan(self, path):
        """Connect through the device's ConnMan service and save a download.

        Up to 1000000 bytes fetched from http://connman.net are written to
        the file *path*; the service is disconnected afterwards even if the
        download fails.

        Raises:
            LookupError: no ConnMan service is named like the device.
        """
        bus = dbus.SystemBus()
        manager = dbus.Interface(bus.get_object("net.connman", "/"),
                                 "net.connman.Manager")
        services = manager.GetServices()
        device_name = self._device_obj.Get(
            'org.bluez.Device1',
            'Name',
            dbus_interface='org.freedesktop.DBus.Properties')
        # Kept separate from *path*, which names the output file.
        service_path = None
        for s in services:
            # Tolerate services whose properties carry no "Name".
            if s[1].get("Name") == device_name:
                service_path = s[0]
                break
        if not service_path:
            raise LookupError('Device ‘%s’ not found in services' % device_name)
        service = dbus.Interface(bus.get_object("net.connman", service_path),
                                 "net.connman.Service")
        service.Connect(timeout=60000)
        try:
            with urllib.request.urlopen("http://connman.net") as response:
                data = response.read(1000000)
            # The response body is bytes, so the file is opened in binary.
            with open(path, 'wb') as f:
                f.write(data)
        finally:
            service.Disconnect(timeout=60000)

    def test_dun(self):
        """DUN GW: run the network download check, saving to BASE_PATH/DUN."""
        TEST_START("DUN GW")
        try:
            self.__test_dun_pan(BASE_PATH + "/DUN")
            TEST_PASSED("DUN GW")
        except Exception as error:
            TEST_FAILED("DUN GW: " + str(error))
        process_next()

    def test_pan(self):
        """PAN NAP: run the network download check, saving to BASE_PATH/PAN."""
        TEST_START("PAN NAP")
        try:
            self.__test_dun_pan(BASE_PATH + "/PAN")
            TEST_PASSED("PAN NAP")
        except Exception as error:
            TEST_FAILED("PAN NAP: " + str(error))
        process_next()

    def test_a2dp_source(self):
        """A2DP Source: ask the operator whether music from the device is
        audible; only an answer of 'y' or 'Y' passes."""
        TEST_START("A2DP Source")
        try:
            connected = self._device_obj.Get(
                'org.bluez.Device1',
                'Connected',
                dbus_interface='org.freedesktop.DBus.Properties')
            if not connected:
                self._device_obj.Connect(dbus_interface='org.bluez.Device1')
            try:
                ret = input("Play music from the device! Did you hear"
                            " it (y/n): ")
            finally:
                # Only undo a connection that this test established.
                if not connected:
                    self._device_obj.Disconnect(
                        dbus_interface='org.bluez.Device1')
            if ret in ('y', 'Y'):
                TEST_PASSED("A2DP Source")
            else:
                raise RuntimeError('Invalid response: ' + ret)
        except Exception as error:
            TEST_FAILED("A2DP Source: " + str(error))
        process_next()

    def device_test_profiles(self, profiles):
        """Queue the tests matching *profiles* and start the first one."""
        if "MAP MSE" in profiles:
            process_q.append(self.test_map)
        if "PBAP PSE" in profiles:
            process_q.append(self.test_pbap)
        # NOTE(review): queued unconditionally; no profile name guards it.
        process_q.append(self.test_opp_client)
        if "OPP Server" in profiles:
            process_q.append(self.test_opp_server)
        if "DUN GW" in profiles:
            process_q.append(self.test_dun)
        if "PAN NAP" in profiles:
            process_q.append(self.test_pan)
        if "A2DP Source" in profiles:
            process_q.append(self.test_a2dp_source)
        process_next()
def parse_options():
    """Register the command-line options on the global ``parser`` and parse.

    Returns:
        The ``(options, args)`` pair produced by ``parser.parse_args()``.
    """
    option_specs = (
        (("-i", "--interface"),
         dict(dest="interface", help="HCI device", metavar="HCI")),
        (("-s", "--skip"),
         dict(action="store_true", dest="skip_pair", help="Skip Pairing")),
        (("-d", "--device"),
         dict(dest="device", help="Device to connect", metavar="DEVICE")),
    )
    for flags, settings in option_specs:
        parser.add_option(*flags, **settings)
    return parser.parse_args()
def create_base_path():
    """Create a time-stamped directory under /tmp and store it in BASE_PATH.

    The name is built from the local month, day, hour, minute and second.
    ``os.mkdir`` raises if the directory already exists.
    """
    global BASE_PATH
    now = time.localtime()
    stamp = (now.tm_mon, now.tm_mday, now.tm_hour, now.tm_min, now.tm_sec)
    BASE_PATH = "/tmp/bluez-%02d%02d-%02d%02d%02d" % stamp
    os.mkdir(BASE_PATH)
if __name__ == "__main__":
    # Script entry point: pick the adapter and optional device, register
    # the pairing agent, queue the pairing tests and run the main loop.
    # ``parser``, ``mainloop`` and ``bus`` stay module-level because the
    # functions above look them up as globals.
    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
    parser = OptionParser()
    (options, args) = parse_options()
    mainloop = GObject.MainLoop()
    bus = dbus.SystemBus()
    object_manager_iface = dbus.Interface(bus.get_object('org.bluez', '/'),
                                          'org.freedesktop.DBus.ObjectManager')
    agent_manager_iface = dbus.Interface(bus.get_object('org.bluez',
                                                        '/org/bluez'),
                                         'org.bluez.AgentManager1')
    objects = object_manager_iface.GetManagedObjects()
    adapters = {p: i for p, i in objects.items()
                if 'org.bluez.Adapter1' in i}
    if options.interface:
        path = '/org/bluez/' + options.interface
    elif adapters:
        # Arbitrarily choose the first adapter as default
        path = next(iter(adapters))
    else:
        # Report the problem instead of failing with an IndexError.
        print("No Bluetooth adapter found")
        sys.exit(1)
    adapter_obj = bus.get_object("org.bluez", path)
    if options.skip_pair and not options.device:
        print("Device not specified")
        sys.exit(1)
    if options.device:
        path = build_device_path(get_hci(adapter_obj.object_path),
                                 options.device)
        device_obj = bus.get_object('org.bluez', path)
    else:
        device_obj = None
    agent = AskAgent(bus, "/test/agent", mainloop)
    tester = AdapterDeviceTester(bus, adapter_obj, device_obj,
                                 PbapDeviceProfileTester)
    agent_manager_iface.RegisterAgent("/test/agent", "DisplayYesNo")
    agent_manager_iface.RequestDefaultAgent('/test/agent')

    def test1():
        """Run the pairing test in which this side initiates pairing."""
        def test1_cb(error_message):
            if error_message:
                TEST_FAILED(error_message)
                sys.exit(1)
            else:
                TEST_PASSED('Pairing Initiator')
                process_next()
        TEST_START('Pairing Initiator')
        tester.start_pairing_initiator(test1_cb)

    def test2():
        """Run the pairing test in which the remote device initiates."""
        def test2_cb(error_message):
            if error_message:
                TEST_FAILED(error_message)
                sys.exit(1)
            else:
                TEST_PASSED('Pairing Responder')
                process_next()
        TEST_START('Pairing Responder')
        tester.start_pairing_responder(test2_cb)

    if options.skip_pair:
        process_q.append(tester.get_device)
    else:
        process_q.append(test1)
        process_q.append(test2)
    create_base_path()
    GObject.timeout_add(2000, process_next)
    try:
        mainloop.run()
    finally:
        # Unregister the agent even if the main loop exits with an error.
        agent_manager_iface.UnregisterAgent("/test/agent")