From c8b70e905e48d686d76784d74c5d92a1d7cc7447 Mon Sep 17 00:00:00 2001
From: Xavier Claessens <xavier.claessens@collabora.com>
Date: Fri, 21 Aug 2015 16:48:05 +0000
Subject: [PATCH] tracker: Rewrite tests in python

This is mostly a 1-1 translation. Tests are not smarter than
what they were before, to the exception that they don't include
sleep() of arbitrary duration everywhere.
---
 tracker/automated/test-tracker.py       | 199 ++++++++++++++++++++++++
 tracker/common.py                       |  98 ++++++++++++
 tracker/manual/test-removable-device.py |  59 +++++++
 3 files changed, 356 insertions(+)
 create mode 100755 tracker/automated/test-tracker.py
 create mode 100644 tracker/common.py
 create mode 100755 tracker/manual/test-removable-device.py

diff --git a/tracker/automated/test-tracker.py b/tracker/automated/test-tracker.py
new file mode 100755
index 0000000..68d2c18
--- /dev/null
+++ b/tracker/automated/test-tracker.py
@@ -0,0 +1,199 @@
+#! /usr/bin/python
+# -*- coding: utf-8 -*-
+
+# Copyright © 2015 Collabora Ltd.
+#
+# SPDX-License-Identifier: MPL-2.0
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0. If a copy of the MPL was not distributed with this
+# file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+import unittest
+import subprocess
+import os
+import md5
+import sys
+
+from gi.repository import GLib
+from gi.repository import Gio
+from gi.repository import Grl
+
+# import from parent directory
+sys.path.insert(0, os.path.join(os.path.dirname(__file__), os.pardir))
+from common import TrackerIndexer
+from common import MEDIADIR
+
+# define this here to make lint happy with too long lines
+long_jpeg_name = '320px-European_Common_Frog_Rana_temporaria.jpg'
+
+
+class TrackerTest(unittest.TestCase):
+    def setUp(self):
+        self.indexer = TrackerIndexer()
+        self.loop = GLib.MainLoop.new(None, False)
+        self.homedir = os.path.expanduser("~")
+
+        # Copy media files appropriate directories
+        self.copytree('audio', 'Music')
+        self.copytree('documents', 'Documents')
+        self.copytree('images', 'Pictures')
+        self.copytree('playlists', 'Music')
+        self.copytree('videos', 'Videos')
+
+    def tearDown(self):
+        # Keep everything in place for further manual checks
+        pass
+
+    def copytree(self, src, dst):
+        abs_src = os.path.join(MEDIADIR, src, '*')
+        abs_dst = os.path.join(self.homedir, dst)
+        subprocess.check_call('cp -r ' + abs_src + ' ' + abs_dst, shell=True)
+
+    def tracker_config_tests(self):
+        settings = Gio.Settings.new("org.freedesktop.Tracker.Miner.Files")
+        self.assertEqual(settings.get_boolean('index-removable-devices'), True)
+        self.assertEqual(settings.get_int('removable-days-threshold'), 60)
+        self.assertEqual(settings.get_int('initial-sleep'), 0)
+        self.assertEqual(settings.get_string('sched-idle'), 'never')
+        self.assertEqual(settings.get_boolean('enable-monitors'), True)
+        # self.assertEqual(settings.get_boolean('enable-writeback'), False)
+
+    def tracker_journal_tests(self):
+        path = ('%s/.local/share/tracker/data/tracker-store.journal' %
+                self.homedir)
+        self.assertFalse(os.path.isfile(path))
+
+    def assert_indexed(self, filename, extra=None):
+        uri = 'file://%s/%s' % (self.homedir, filename)
+        self.assertTrue(self.indexer.is_indexed(uri, extra))
+
+    def assert_not_indexed(self, filename):
+        uri = 'file://%s/%s' % (self.homedir, filename)
+        self.assertTrue(self.indexer.is_not_indexed(uri))
+
+    def tracker_inital_tests(self):
+        playable_query = '?urn bosch:playable true . '
+        album_query = ('?urn nmm:musicAlbum ?album . '
+                       '?album nmm:albumTitle "GNOME Audio" . ')
+        performer_query = ('?urn nmm:performer ?performer . '
+                           '?performer nmm:artistName "Conrad Parker" . ')
+        title_query = '?urn nie:title "GNOME Generic Sound" . '
+        audio_query = album_query + performer_query + title_query
+        self.assert_indexed('Music/generic.mp3', audio_query)
+        self.assert_indexed('Music/generic.flac', audio_query + playable_query)
+        self.assert_indexed('Music/generic-no-artwork.mp3', audio_query)
+        self.assert_indexed('Music/generic.oga', audio_query)
+        self.assert_indexed('Music/generic.wav', playable_query)
+        self.assert_indexed('Documents/lorem_presentation.odp')
+        self.assert_indexed('Documents/lorem_spreadsheet.ods')
+        self.assert_indexed('Documents/lorem_text.txt')
+        self.assert_indexed('Documents/more_lorem_ipsum.odt')
+        self.assert_indexed('Pictures/' + long_jpeg_name)
+        self.assert_indexed('Pictures/collabora-logo-big.png')
+        self.assert_indexed('Music/Generic_Sounds.pls',
+                            '?urn nfo:entryCounter 3')
+        self.assert_indexed('Music/Ghosts.pls', '?urn nfo:entryCounter 38')
+        self.assert_indexed('Music/Ghosts.m3u', '?urn nfo:entryCounter 38')
+        self.assert_indexed('Videos/big_buck_bunny_smaller.ogv',
+                            playable_query)
+
+    def tracker_update_tests(self):
+        # Create a new file and assert it gets indexed
+        with open(self.homedir + '/Documents/something.txt', 'w') as f:
+            f.write('something')
+        self.indexer.wait(True)
+        self.assert_indexed('Documents/something.txt',
+                            '?urn nie:plainTextContent "something"')
+
+        # Modify the file should re-index it
+        with open(self.homedir + '/Documents/something.txt', 'w') as f:
+            f.write('something else')
+        self.indexer.wait(True)
+        self.assert_indexed('Documents/something.txt',
+                            '?urn nie:plainTextContent "something else"')
+
+        # Delete file and assert it's not indexed anymore
+        os.remove(self.homedir + '/Documents/something.txt')
+        self.indexer.wait(False)
+        self.assert_not_indexed('Documents/something.txt')
+
+    def assert_has_thumbnail(self, filename):
+        # Note that this is the path for local storage only, not for removable
+        # devices.
+        uri = 'file://%s/%s' % (self.homedir, filename)
+        uri_hash = md5.new(uri).hexdigest()
+        path = '%s/.cache/thumbnails/normal/%s.png' % (self.homedir, uri_hash)
+        self.assertTrue(os.path.isfile(path))
+
+    def thumbnail_tests(self):
+        self.assert_has_thumbnail('Documents/lorem_presentation.odp')
+        self.assert_has_thumbnail('Documents/lorem_spreadsheet.ods')
+        self.assert_has_thumbnail('Documents/more_lorem_ipsum.odt')
+        self.assert_has_thumbnail('Pictures/' + long_jpeg_name)
+        self.assert_has_thumbnail('Pictures/collabora-logo-big.png')
+        self.assert_has_thumbnail('Videos/big_buck_bunny_smaller.ogv')
+
+    def grl_browse_source(self, source, media):
+        ops = source.supported_operations()
+        self.assertTrue((ops & Grl.SupportedOps.BROWSE) != 0)
+
+        caps = source.get_caps(Grl.SupportedOps.BROWSE)
+        options = Grl.OperationOptions.new(caps)
+        options.set_count(10)
+        options.set_flags(Grl.ResolutionFlags.IDLE_RELAY)
+
+        keys = [Grl.METADATA_KEY_TITLE,
+                Grl.METADATA_KEY_ALBUM,
+                Grl.METADATA_KEY_ARTIST,
+                Grl.METADATA_KEY_THUMBNAIL,
+                Grl.METADATA_KEY_URL,
+                Grl.METADATA_KEY_CHILDCOUNT]
+
+        medias = source.browse_sync(media, keys, options)
+
+        for media in medias:
+            if isinstance(media, Grl.MediaBox):
+                self.grl_browse_source(source, media)
+                continue
+
+            url = media.get_url()
+            prefix = 'file://' + self.homedir + '/'
+            self.assertTrue(url.startswith(prefix))
+            filename = url[len(prefix):]
+
+            if filename in ['Music/generic.oga',
+                            'Music/generic.mp3',
+                            'Music/generic.flac',
+                            'Music/generic-no-artwork.mp3']:
+                self.assertEqual(media.get_title(), 'GNOME Generic Sound')
+                self.assertEqual(media.get_artist(), 'Conrad Parker')
+                self.assertEqual(media.get_album(), 'GNOME Audio')
+            elif filename in ['Music/generic.wav',
+                              'Pictures/' + long_jpeg_name,
+                              'Pictures/collabora-logo-big.png',
+                              'Videos/big_buck_bunny_smaller.ogv']:
+                title = filename.split('/')[1]
+                self.assertEqual(media.get_title(), title)
+
+    def grl_source_added_cb(self, registry, source):
+        self.grl_browse_source(source, None)
+        self.loop.quit()
+
+    def tracker_grilo_tests(self):
+        registry = Grl.Registry.get_default()
+        registry.load_plugin_by_id('grl-tracker')
+        registry.connect('source-added', self.grl_source_added_cb)
+        self.loop.run()
+
+    # This is the only test case to make only one setup of tracker
+    def test_all(self):
+        self.tracker_config_tests()
+        self.tracker_journal_tests()
+        self.indexer.start()
+        self.tracker_inital_tests()
+        self.tracker_update_tests()
+        self.tracker_grilo_tests()
+
+if __name__ == "__main__":
+    Grl.init([])
+    unittest.main()
diff --git a/tracker/common.py b/tracker/common.py
new file mode 100644
index 0000000..723b304
--- /dev/null
+++ b/tracker/common.py
@@ -0,0 +1,98 @@
+# -*- coding: utf-8 -*-
+
+# Copyright © 2015 Collabora Ltd.
+#
+# SPDX-License-Identifier: MPL-2.0
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0. If a copy of the MPL was not distributed with this
+# file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+import subprocess
+import os
+
+from gi.repository import GLib
+from gi.repository import TrackerControl
+from gi.repository import Tracker
+
+MEDIADIR = '/usr/lib/apertis-tests/resources/media'
+
+
+class TrackerIndexer():
+    def __init__(self):
+        # Stop tracker and reset its DB
+        subprocess.check_call(
+            'systemctl --user stop tracker-store tracker-miner-fs',
+            shell=True)
+        subprocess.check_call('tracker-control -r',
+                              stdout=open(os.devnull, 'wb'),
+                              shell=True)
+
+        self.loop = GLib.MainLoop.new(None, False)
+
+    def miner_progress_cb(self, manager, miner, status, progress,
+                          remaining_time):
+        # Ignore signal if status didn't change
+        if self.statuses[miner] == status:
+            return
+
+        print("TrackerIndexer: Miner '%s' status changed to '%s'" %
+              (miner, status))
+        self.statuses[miner] = status
+
+        # We are only interested about miners becoming Idle
+        if status != 'Idle':
+            return
+
+        if miner == 'org.freedesktop.Tracker1.Miner.Files':
+            self.wait_files = False
+
+        if miner == 'org.freedesktop.Tracker1.Miner.Extract':
+            # If Files miner is not done yet, extractor will have more work to
+            # do even if it's currently Idle waiting for the Files miner to be
+            # done.
+            if not self.wait_files:
+                self.wait_extractor = False
+
+        if not self.wait_files and not self.wait_extractor:
+            print("TrackerIndexer: quit main loop")
+            self.loop.quit()
+
+    def start(self):
+        self.manager = TrackerControl.MinerManager.new_full(True)
+        self.conn = Tracker.SparqlConnection.get(None)
+        self.statuses = {}
+
+        for miner in self.manager.get_available():
+            status = self.manager.get_status(miner)[1]
+            print("TrackerIndexer: Miner '%s' initial status is '%s'" %
+                  (miner, status))
+            self.statuses[miner] = status
+        self.manager.connect('miner-progress', self.miner_progress_cb)
+
+        # Wait for the initial indexing
+        self.wait(True)
+
+    # wait_extractor is used to tell if we expect the extractor to have work
+    # to do and thus we should wait for it. For example when deleting files
+    # we don't expect extractor to do anything.
+    def wait(self, wait_extractor):
+        self.wait_extractor = wait_extractor
+        self.wait_files = True
+        self.loop.run()
+
+    def is_indexed(self, uri, extra=None):
+        query = ('select ?urn where { '
+                 '  ?urn nie:url "%s" ; '
+                 '  tracker:available true . ') % (uri)
+        if extra is not None:
+            query += extra
+        query += ' }'
+
+        # Must have one and only one result
+        cursor = self.conn.query(query, None)
+        return cursor.next(None) and not cursor.next(None)
+
+    def is_not_indexed(self, uri):
+        query = ('select ?urn where { ?urn nie:url "%s" }') % (uri)
+        cursor = self.conn.query(query, None)
+        return not cursor.next(None)
diff --git a/tracker/manual/test-removable-device.py b/tracker/manual/test-removable-device.py
new file mode 100755
index 0000000..08ae101
--- /dev/null
+++ b/tracker/manual/test-removable-device.py
@@ -0,0 +1,59 @@
+#! /usr/bin/python
+# -*- coding: utf-8 -*-
+
+# Copyright © 2015 Collabora Ltd.
+#
+# SPDX-License-Identifier: MPL-2.0
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0. If a copy of the MPL was not distributed with this
+# file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+import unittest
+import subprocess
+import sys
+import os
+
+from gi.repository import GLib
+from gi.repository import Gio
+
+# import from parent directory
+sys.path.insert(0, os.path.join(os.path.dirname(__file__), os.pardir))
+from common import TrackerIndexer
+from common import MEDIADIR
+
+
+class TestRemovableDevice(unittest.TestCase):
+    def setUp(self):
+        self.loop = GLib.MainLoop.new(None, False)
+        self.monitor = Gio.VolumeMonitor.get()
+        self.monitor.connect('mount-added', self.mount_added_cb)
+
+    def tearDown(self):
+        # Keep everything in place for further manual checks
+        pass
+
+    def mount_added_cb(self, monitor, mount):
+        self.path = mount.get_root().get_path()
+        self.loop.quit()
+
+    def assert_indexed(self, filename, extra=None):
+        uri = 'file://%s/%s' % (self.path, filename)
+        self.assertTrue(self.indexer.is_indexed(uri, extra))
+
+    def test_removable_device(self):
+        print('Please insert storage ...')
+        self.loop.run()
+
+        # Copy some files to the removable device
+        print('Copying files to ' + self.path + '...')
+        abs_src = os.path.join(MEDIADIR, 'audio', '*')
+        subprocess.check_call('cp -r ' + abs_src + ' ' + self.path, shell=True)
+
+        # Start indexer
+        self.indexer = TrackerIndexer()
+        self.indexer.start()
+
+        self.assert_indexed('generic.flac', '?urn bosch:playable true')
+
+if __name__ == "__main__":
+    unittest.main()
-- 
GitLab