# Skip to content
# Snippets Groups Projects
# Forked from pkg / apertis-tests
# 627 commits behind the upstream repository.
# ubt 19.52 KiB
#! /usr/bin/python3
# -*- coding: utf-8 -*-
#
# BlueZ - Bluetooth protocol stack for Linux
#
# Copyright © 2012, 2015 Collabora Ltd.
#
# SPDX-License-Identifier: GPL-2.0+
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA

from gi.repository import GObject

import os
import dbus
import dbus.service
import dbus.mainloop.glib
import sys
import time
import urllib.request
from optparse import OptionParser

# import from toplevel directory
sys.path.insert(0, os.path.join(os.path.dirname(__file__), os.pardir))
from apertis_tests_lib.bluez import AdapterDeviceTester
from apertis_tests_lib.bluez import AskAgent
from apertis_tests_lib.bluez import build_device_path
from apertis_tests_lib.bluez import DeviceProfileTester
from apertis_tests_lib.bluez import get_hci


# Minimal vCard 2.1 record; create_temp_vcard() writes it to /tmp/vcard.vcf.
VCARD_CONTENTS = """BEGIN:VCARD
VERSION:2.1
N:;Collabora;;;
FN:Collabora
TEL;CELL;PREF:145
END:VCARD
"""


def create_temp_vcard():
    """Write VCARD_CONTENTS to /tmp/vcard.vcf, replacing any existing file.

    The OPP server test sends this file; delete_temp_vcard() removes it.
    """
    # The context manager closes the file even when write() raises.
    with open('/tmp/vcard.vcf', 'w') as f:
        f.write(VCARD_CONTENTS)


def delete_temp_vcard():
    """Remove /tmp/vcard.vcf if possible.

    Removal is best-effort: a missing file or any other OS-level failure is
    ignored so that test cleanup never aborts the run.
    """
    try:
        os.remove('/tmp/vcard.vcf')
    except OSError:
        # Only filesystem errors are swallowed; KeyboardInterrupt and
        # SystemExit still propagate.
        pass

# Directory for files produced by the tests; assigned by create_base_path().
BASE_PATH = ""
# Queue of pending test callables; process_next() takes them from the front.
process_q = []


def process_next():
    """Schedule the next queued test, or quit the main loop if none is left.

    The callable at the front of process_q is dequeued and handed to
    GObject.idle_add(); with an empty queue mainloop.quit() is called instead.
    """
    if not process_q:
        mainloop.quit()
        return

    next_test = process_q.pop(0)
    GObject.idle_add(next_test)


def TEST_START(str):
    """Print the start marker line for the test named by *str*."""
    marker = "TEST STARTED: %s" % str
    print(marker)


def TEST_FAILED(str):
    """Print the finish marker line reporting failure of the test *str*."""
    marker = "TEST FINISHED: %s : FAILED" % str
    print(marker)


def TEST_PASSED(str):
    """Print the finish marker line reporting success of the test *str*."""
    marker = "TEST FINISHED: %s : PASSED" % str
    print(marker)


class PbapTransfer:
    """Record of one PBAP transfer: its D-Bus object path and file name."""

    def __init__(self):
        # Both fields start unset; PbapClient.register() fills them in.
        self.path, self.filename = None, None


class PbapClient:
    """Helper around the PhonebookAccess1 interface of an obexd session.

    Attributes:
        transfers: number of pulls issued that have not completed yet.
        props: maps a transfer object path to its PbapTransfer record.
        flush_func: callable run once every outstanding transfer completed,
            or None when nothing is waiting.
    """

    def __init__(self, session_path):
        """Bind to the session at *session_path* and watch property changes."""
        self.transfers = 0
        self.props = dict()
        self.__filters = {}
        self.flush_func = None
        bus = dbus.SessionBus()
        obj = bus.get_object("org.bluez.obex", session_path)
        self.__pbap = dbus.Interface(obj, "org.bluez.obex.PhonebookAccess1")
        # No path/sender filter: the handler itself discards signals that do
        # not belong to a transfer registered in self.props.
        bus.add_signal_receiver(
            self.__transfer_properties_changed,
            dbus_interface="org.freedesktop.DBus.Properties",
            signal_name="PropertiesChanged",
            path_keyword="path")

    def register(self, path, properties, transfer):
        """Reply handler of a pull: remember *transfer* under *path*."""
        transfer.path = path
        transfer.filename = properties["Filename"]
        self.props[path] = transfer

    def error(self, err):
        """Error handler of a pull: fail the PBAP test and move on."""
        TEST_FAILED("PBAP PSE: %s" % err)
        process_next()

    def set_filters(self, filters):
        """Set the filter dict passed to subsequent pull()/pull_all() calls."""
        # Must be the same attribute that pull()/pull_all() read; storing it
        # under another name would silently leave the filters empty.
        self.__filters = filters

    def __transfer_properties_changed(self, interface, changed_properties,
                                      invalidated_properties, path):
        """Track the Status of registered transfers."""
        if interface != 'org.bluez.obex.Transfer1':
            return
        if 'Status' not in changed_properties:
            return

        req = self.props.get(path)
        if req is None:
            return

        new_status = changed_properties['Status']
        if new_status == 'complete':
            self.transfers -= 1
            del self.props[path]

            if not self.props and self.transfers == 0:
                if self.flush_func is not None:
                    # Clear before calling: the callback may install a new
                    # flush function for the next batch of transfers.
                    f = self.flush_func
                    self.flush_func = None
                    f()
        elif new_status == 'error':
            # The signal carries no error text, so report which transfer
            # failed instead.
            TEST_FAILED("PBAP PSE: transfer %s (%s) failed"
                        % (path, req.filename))
            process_next()

    def pull(self, vcard, target):
        """Start an asynchronous Pull of *vcard* into the file *target*."""
        req = PbapTransfer()
        self.__pbap.Pull(
            vcard, target, self.__filters,
            reply_handler=lambda p, p2: self.register(p, p2, req),
            error_handler=self.error)
        self.transfers += 1

    def pull_all(self, target):
        """Start an asynchronous PullAll into the file *target*."""
        req = PbapTransfer()
        self.__pbap.PullAll(
            target, self.__filters,
            reply_handler=lambda p, p2: self.register(p, p2, req),
            error_handler=self.error)
        self.transfers += 1

    def flush_transfers(self, func):
        """Arrange for *func* to run when all pending transfers completed.

        When nothing is pending the call returns without storing or calling
        *func*.
        """
        if not self.props and self.transfers == 0:
            return
        self.flush_func = func

    def interface(self):
        """Return the underlying PhonebookAccess1 D-Bus interface."""
        return self.__pbap


# OppTransfer records appended by ObexdAgent.AuthorizePush().
transfers = []


class ObexdAgent(dbus.service.Object):
    """org.bluez.obex.Agent1 implementation that accepts every push."""

    @dbus.service.method("org.bluez.obex.Agent1", in_signature="o",
                         out_signature="s")
    def AuthorizePush(self, dpath):
        """Record the transfer at *dpath* and return its Name property."""
        session_bus = dbus.SessionBus()
        transfer_obj = session_bus.get_object('org.bluez.obex', dpath)
        transfer_name = transfer_obj.Get(
            'org.bluez.obex.Transfer1', 'Name',
            dbus_interface='org.freedesktop.DBus.Properties')

        # The module-level list is only mutated, never rebound.
        transfers.append(OppTransfer(dpath))
        return transfer_name

    @dbus.service.method("org.bluez.obex.Agent1", in_signature="",
                         out_signature="")
    def Cancel(self):
        """Agent1.Cancel: nothing to do."""

    @dbus.service.method("org.bluez.obex.Agent1", in_signature="",
                         out_signature="")
    def Release(self):
        """Agent1.Release: nothing to do."""


class OppTransfer(object):
    """Progress record for one OPP transfer object exported by obexd.

    Attributes:
        dpath: D-Bus object path of the transfer.
        filename: name of the file being transferred, or None if unknown.
        transfered: amount transferred so far, -1 while unknown.
        size: total size, -1 while unknown.
    """

    def __init__(self, dpath):
        self.dpath = dpath
        self.filename = None
        self.transfered = -1
        self.size = -1

    def update(self, filename=None, transfered=-1, total=-1):
        """Store new progress figures; a falsy *filename* keeps the old one."""
        if filename:
            self.filename = filename
        self.transfered = transfered
        self.size = total

    def cancel(self):
        """Ask obexd to cancel this transfer."""
        # obexd is reached through the session bus throughout this file; the
        # module-level "bus" created by the script entry point is the system
        # bus, so take the session bus explicitly here.
        session_bus = dbus.SessionBus()
        transfer_iface = dbus.Interface(
            session_bus.get_object("org.bluez.obex", self.dpath),
            "org.bluez.obex.Transfer1")
        transfer_iface.Cancel()

    def __str__(self):
        """Return "<filename> (<path>) (<percent>%)"."""
        if self.size > 0:
            p = float(max(self.transfered, 0)) / float(self.size) * 100
        else:
            # Size unknown (-1) or zero: avoid dividing by it and report no
            # progress rather than a misleading figure.
            p = 0.0
        return "%s (%s) (%.2f%%)" % (self.filename, self.dpath, p)

    __repr__ = __str__


class OppClient:
    """Helper around the ObjectPush1 interface of an obexd session.

    Attributes:
        progress: initialised to 0; not updated by this class.
        transfer_path: object path of the transfer in flight, or None.
        transfer_size: "Size" property of that transfer, or None.
    """

    def __init__(self, session_path):
        """Bind to the session at *session_path* and watch property changes."""
        self.progress = 0
        self.transfer_path = None
        self.transfer_size = None
        bus = dbus.SessionBus()
        obj = bus.get_object("org.bluez.obex", session_path)
        self.__opp = dbus.Interface(obj, "org.bluez.obex.ObjectPush1")
        bus.add_signal_receiver(
            self.__transfer_properties_changed,
            dbus_interface="org.freedesktop.DBus.Properties",
            signal_name="PropertiesChanged",
            path_keyword="path")

    def __create_transfer_reply(self, path, properties):
        """Reply handler: remember the transfer that was just created."""
        self.transfer_path = path
        self.transfer_size = properties["Size"]

    def __error(self, err):
        """Error handler: print the error and stop the main loop."""
        print(err)
        mainloop.quit()

    def __transfer_properties_changed(self, interface, changed_properties,
                                      invalidated_properties, path):
        """Finish the OPP Server test when our transfer completes or fails."""
        if interface != 'org.bluez.obex.Transfer1':
            return
        if 'Status' not in changed_properties:
            return

        if path != self.transfer_path:
            return

        new_status = changed_properties['Status']
        if new_status == 'complete':
            TEST_PASSED("OPP Server")
        elif new_status == 'error':
            TEST_FAILED("OPP Server")
        else:
            # Any other status is not a final result: keep waiting instead
            # of deleting the vCard and advancing the queue without a
            # verdict.
            return

        delete_temp_vcard()
        process_next()

    def transfer_progress(self, prop, value, path):
        """Placeholder progress callback; does nothing."""
        return

    def pull_business_card(self, filename):
        """Asynchronously pull the remote business card into *filename*."""
        self.__opp.PullBusinessCard(os.path.abspath(filename),
                                    reply_handler=self.__create_transfer_reply,
                                    error_handler=self.__error)

    def send_file(self, filename):
        """Asynchronously send *filename* to the remote device."""
        self.__opp.SendFile(os.path.abspath(filename),
                            reply_handler=self.__create_transfer_reply,
                            error_handler=self.__error)


class PbapDeviceProfileTester(DeviceProfileTester):
    """Profile tests (MAP, PBAP, OPP, DUN, PAN, A2DP) for one remote device.

    Every test prints a start marker and a PASSED/FAILED finish marker, then
    calls process_next() so the next queued test can run. The remote device
    is reached through self._device_obj (presumably provided by
    DeviceProfileTester; it is not assigned in this class).
    """

    def test_map(self):
        """MAP MSE: walk the folders below telecom/msg and check messages."""
        required_fields = ('Subject', 'Timestamp', 'SenderAddress',
                           'RecipientAddress', 'Type', 'Size', 'Status')

        def set_folder(session, new_dir):
            # SetFolder is called once per path component.
            for node in new_dir.split("/"):
                session.SetFolder(node)

        def message_check(messages):
            # Explicit check instead of assert, which "python -O" strips.
            for m in messages.values():
                missing = [k for k in required_fields if k not in m]
                if missing:
                    raise ValueError("message lacks field(s): "
                                     + ", ".join(missing))

        TEST_START("MAP MSE")

        try:
            bus = dbus.SessionBus()

            client = dbus.Interface(bus.get_object('org.bluez.obex',
                                                   '/org/bluez/obex'),
                                    'org.bluez.obex.Client1')

            address = self._device_obj.Get(
                'org.bluez.Device1',
                'Address',
                dbus_interface='org.freedesktop.DBus.Properties')
            path = client.CreateSession(address, {"Target": "map"})

            map_iface = dbus.Interface(bus.get_object('org.bluez.obex', path),
                                       'org.bluez.obex.MessageAccess1')

            set_folder(map_iface, "telecom/msg")
            # NOTE(review): the BlueZ 5 obex API documents ListFolders /
            # ListMessages on MessageAccess1 -- confirm these method names
            # against the obexd version under test.
            folder = map_iface.GetFolderListing(dict())

            for f in folder:
                msg = map_iface.GetMessageListing(f["Name"], dict())
                message_check(msg)

            TEST_PASSED("MAP MSE")
        except Exception as error:
            # Report the reason, as the other tests do.
            TEST_FAILED("MAP MSE: " + str(error))

        process_next()

    def pbap_test_paths(self, pbap_client, paths):
        """Pull every entry of paths[0], then recurse on the remaining paths.

        The recursion continues from the flush callback, i.e. once all pulls
        started for the current phonebook have completed. With *paths* empty
        the PBAP test is reported as passed.
        """
        if not paths:
            TEST_PASSED("PBAP PSE")
            process_next()
            return

        try:
            path = paths[0]

            pbap_client.set_filters({
                'Format': 'vcard30',
                'Fields': ["VERSION", "FN", "TEL"],
            })

            pbap_client.interface().Select("int", path)

            # Called only to exercise the method; the value is not needed.
            pbap_client.interface().GetSize()

            ret = pbap_client.interface().List({})
            for item in ret:
                pbap_client.pull(item[0], BASE_PATH +
                                 "/pbap-vcard-" + path + item[1])

            pbap_client.pull_all(BASE_PATH + "/pbap-vcard-ALL-" + path)

            pbap_client.flush_transfers(
                lambda: self.pbap_test_paths(pbap_client, paths[1:]))
        except Exception as error:
            TEST_FAILED("PBAP PSE path: " + str(error))
            process_next()

    def test_pbap(self):
        """PBAP PSE: open a pbap session and pull all five phonebooks."""
        TEST_START("PBAP PSE")

        try:
            bus = dbus.SessionBus()

            client = dbus.Interface(bus.get_object('org.bluez.obex',
                                                   '/org/bluez/obex'),
                                    'org.bluez.obex.Client1')

            address = self._device_obj.Get(
                'org.bluez.Device1',
                'Address',
                dbus_interface='org.freedesktop.DBus.Properties')
            session_path = client.CreateSession(address, {"Target": "pbap"})

            pbap_client = PbapClient(session_path)

            self.pbap_test_paths(pbap_client,
                                 ["PB", "ICH", "OCH", "MCH", "CCH"])
        except Exception as error:
            TEST_FAILED("PBAP PSE: " + str(error))
            process_next()

    def test_opp_server(self):
        """OPP Server: send a temporary vCard to the device.

        The verdict is printed by OppClient when the transfer finishes.
        """
        TEST_START("OPP Server")
        try:
            bus = dbus.SessionBus()

            client = dbus.Interface(bus.get_object('org.bluez.obex',
                                                   '/org/bluez/obex'),
                                    'org.bluez.obex.Client1')

            address = self._device_obj.Get(
                'org.bluez.Device1',
                'Address',
                dbus_interface='org.freedesktop.DBus.Properties')
            path = client.CreateSession(address, {"Target": "opp"})
            opp_client = OppClient(path)

            create_temp_vcard()
            opp_client.send_file("/tmp/vcard.vcf")
        except Exception as error:
            delete_temp_vcard()
            TEST_FAILED("OPP Server: " + str(error))
            process_next()

    def test_opp_client(self):
        """OPP Client: wait for a transfer pushed from the phone to finish."""
        # Same spelling as the finish markers below.
        TEST_START("OPP Client")
        print("Start an OPP transfer in the phone...")

        def properties_changed(interface_name, changed_properties,
                               invalidated_properties, path):
            if interface_name != 'org.bluez.obex.Transfer1':
                return

            # Only 'complete' and 'error' end the test. Testing the
            # truthiness of the status string would pass on any value.
            status = changed_properties.get('Status')
            if status == 'complete':
                TEST_PASSED("OPP Client")
            elif status == 'error':
                TEST_FAILED("OPP Client")
            else:
                return

            manager.UnregisterAgent(agent._object_path)
            agent.remove_from_connection()
            match.remove()

            process_next()

        try:
            bus = dbus.SessionBus()

            manager = dbus.Interface(bus.get_object("org.bluez.obex",
                                                    "/org/bluez/obex"),
                                     "org.bluez.obex.AgentManager1")
            agent = ObexdAgent(bus, "/server/agent")
            manager.RegisterAgent(agent._object_path)

            match = bus.add_signal_receiver(
                properties_changed,
                dbus_interface="org.freedesktop.DBus.Properties",
                signal_name="PropertiesChanged",
                path_keyword='path')
        except Exception as error:
            # Without this the queue would stall if setup fails.
            TEST_FAILED("OPP Client: " + str(error))
            process_next()

    def __test_dun_pan(self, path):
        """Connect via the device's ConnMan service and download a page.

        Args:
            path: file the downloaded data (at most 1000000 bytes) is
                written to.

        Raises:
            LookupError: no ConnMan service is named like the device.
        """
        bus = dbus.SystemBus()

        manager = dbus.Interface(bus.get_object("net.connman", "/"),
                                 "net.connman.Manager")
        services = manager.GetServices()

        device_name = self._device_obj.Get(
            'org.bluez.Device1',
            'Name',
            dbus_interface='org.freedesktop.DBus.Properties')

        # Kept separate from *path*, which names the output file.
        service_path = None
        for s in services:
            # .get(): a service without a "Name" entry must not abort the
            # search.
            if s[1].get("Name") == device_name:
                service_path = s[0]
                break
        if not service_path:
            raise LookupError('Device ‘%s’ not found in services' % device_name)

        service = dbus.Interface(bus.get_object("net.connman", service_path),
                                 "net.connman.Service")
        service.Connect(timeout=60000)
        try:
            with urllib.request.urlopen("http://connman.net") as url:
                data = url.read(1000000)
            # read() returns bytes, hence binary mode.
            with open(path, 'wb') as f:
                f.write(data)
        finally:
            # Drop the connection even when the download failed.
            service.Disconnect(timeout=60000)

    def test_dun(self):
        """DUN GW: run the shared network check, saving to BASE_PATH/DUN."""
        TEST_START("DUN GW")

        try:
            self.__test_dun_pan(BASE_PATH + "/DUN")
            TEST_PASSED("DUN GW")
        except Exception as error:
            TEST_FAILED("DUN GW: " + str(error))

        process_next()

    def test_pan(self):
        """PAN NAP: run the shared network check, saving to BASE_PATH/PAN."""
        TEST_START("PAN NAP")

        try:
            self.__test_dun_pan(BASE_PATH + "/PAN")
            TEST_PASSED("PAN NAP")
        except Exception as error:
            TEST_FAILED("PAN NAP: " + str(error))

        process_next()

    def test_a2dp_source(self):
        """A2DP Source: ask the operator whether music could be heard."""
        TEST_START("A2DP Source")

        try:
            connected = self._device_obj.Get(
                'org.bluez.Device1',
                'Connected',
                dbus_interface='org.freedesktop.DBus.Properties')
            if not connected:
                self._device_obj.Connect(dbus_interface='org.bluez.Device1')

            ret = input("Play music from the device! Did you hear"
                        " it (y/n): ")

            # Only undo a connection this test established itself.
            if not connected:
                self._device_obj.Disconnect(dbus_interface='org.bluez.Device1')

            if ret in ('y', 'Y'):
                TEST_PASSED("A2DP Source")
            else:
                raise RuntimeError('Invalid response: ' + ret)

        except Exception as error:
            TEST_FAILED("A2DP Source: " + str(error))

        process_next()

    def device_test_profiles(self, profiles):
        """Queue the tests matching *profiles* and start the first one.

        The OPP client test is queued regardless of *profiles*.
        """
        if "MAP MSE" in profiles:
            process_q.append(self.test_map)

        if "PBAP PSE" in profiles:
            process_q.append(self.test_pbap)

        process_q.append(self.test_opp_client)

        if "OPP Server" in profiles:
            process_q.append(self.test_opp_server)

        if "DUN GW" in profiles:
            process_q.append(self.test_dun)

        if "PAN NAP" in profiles:
            process_q.append(self.test_pan)

        if "A2DP Source" in profiles:
            process_q.append(self.test_a2dp_source)

        process_next()


def parse_options():
    """Declare the command-line options on the global parser and parse them.

    Returns whatever parser.parse_args() returns.
    """
    add = parser.add_option
    add("-i", "--interface", dest="interface",
        help="HCI device", metavar="HCI")
    add("-s", "--skip", action="store_true", dest="skip_pair",
        help="Skip Pairing")
    add("-d", "--device", dest="device",
        help="Device to connect", metavar="DEVICE")

    parsed = parser.parse_args()
    return parsed


def create_base_path():
    """Create a timestamped directory under /tmp and store it in BASE_PATH.

    The name is built from the local month, day, hour, minute and second.
    os.mkdir() raises if the directory already exists.
    """
    global BASE_PATH

    now = time.localtime()
    stamp = (now.tm_mon, now.tm_mday, now.tm_hour, now.tm_min, now.tm_sec)
    BASE_PATH = "/tmp/bluez-%02d%02d-%02d%02d%02d" % stamp
    os.mkdir(BASE_PATH)


# Script entry point: set up D-Bus, pick adapter and device, queue the
# pairing tests and run the main loop until the queue is drained.
if __name__ == "__main__":
    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)

    # parse_options() and process_next() use "parser" and "mainloop" as
    # module-level names, so they must be bound here under these names.
    parser = OptionParser()

    (options, args) = parse_options()

    mainloop = GObject.MainLoop()

    bus = dbus.SystemBus()
    object_manager_iface = dbus.Interface(bus.get_object('org.bluez', '/'),
                                          'org.freedesktop.DBus.ObjectManager')
    agent_manager_iface = dbus.Interface(bus.get_object('org.bluez',
                                                        '/org/bluez'),
                                         'org.bluez.AgentManager1')

    objects = object_manager_iface.GetManagedObjects()
    adapters = {p: i for p, i in objects.items()
                if 'org.bluez.Adapter1' in i}

    if options.interface:
        path = '/org/bluez/' + options.interface
    elif adapters:
        # Arbitrarily choose the first adapter as default
        path = next(iter(adapters))
    else:
        # Give a clear message instead of an IndexError.
        print("No Bluetooth adapter found")
        sys.exit(1)

    adapter_obj = bus.get_object("org.bluez", path)

    if options.skip_pair and not options.device:
        print ("Device not specified")
        # sys.exit() rather than the site-provided exit() helper, which is
        # not guaranteed to exist.
        sys.exit(1)

    if options.device:
        path = build_device_path(get_hci(adapter_obj.object_path),
                                 options.device)
        device_obj = bus.get_object('org.bluez', path)
    else:
        device_obj = None

    agent = AskAgent(bus, "/test/agent", mainloop)
    tester = AdapterDeviceTester(bus, adapter_obj, device_obj,
                                 PbapDeviceProfileTester)
    agent_manager_iface.RegisterAgent("/test/agent", "DisplayYesNo")
    agent_manager_iface.RequestDefaultAgent('/test/agent')

    def test1():
        """Run the pairing test with this side as initiator."""
        def test1_cb(error_message):
            if error_message:
                TEST_FAILED(error_message)
                sys.exit(1)
            else:
                TEST_PASSED('Pairing Initiator')
            process_next()

        TEST_START('Pairing Initiator')
        tester.start_pairing_initiator(test1_cb)

    def test2():
        """Run the pairing test with this side as responder."""
        def test2_cb(error_message):
            if error_message:
                TEST_FAILED(error_message)
                sys.exit(1)
            else:
                TEST_PASSED('Pairing Responder')
            process_next()

        TEST_START('Pairing Responder')
        tester.start_pairing_responder(test2_cb)

    if options.skip_pair:
        process_q.append(tester.get_device)
    else:
        process_q.append(test1)
        process_q.append(test2)

    create_base_path()
    # Start the first queued test two seconds after the loop starts.
    GObject.timeout_add(2000, process_next)
    try:
        mainloop.run()
    finally:
        # Unregister the agent even if the loop ends via an exception.
        agent_manager_iface.UnregisterAgent("/test/agent")