#!/usr/bin/env python3 # -*- coding: utf-8 -*- # PYTHON_ARGCOMPLETE_OK # # Copyright © 2016 Collabora Ltd. # # SPDX-License-Identifier: MPL-2.0 # This Source Code Form is subject to the terms of the Mozilla Public # License, v. 2.0. If a copy of the MPL was not distributed with this # file, You can obtain one at http://mozilla.org/MPL/2.0/. # import argparse import argcomplete import configparser import glob import json import logging import os import pathlib import paramiko import re import shutil import stat import struct import subprocess import sys import tarfile import tempfile import urllib import xdg.BaseDirectory from contextlib import contextmanager, closing from gi.repository import GLib, Gio from urllib.error import URLError from urllib.parse import urlparse from urllib.request import urlopen, urlretrieve HARD_FLOAT_FLAG = 0x00000400 def print_progress(count, blockSize, total): barLength = 50 status = count * blockSize / total percents = "{0:.0f}".format(100 * status) filledLength = int(round(barLength * status)) bar = '=' * filledLength + ' ' * (barLength - filledLength) sys.stdout.write('\r%s |%s| %s%s %s' % ("sysroot.tar.gz", bar, percents, '%', "")), if status >= 1.0: sys.stdout.write('\n') sys.stdout.flush() class SysrootManagerError(Exception): def __init__(self, message): self.message = message def __str__(self): return self.message class InvalidBundleError(Exception): def __init__(self, message=''): self.message = message def __str__(self): return self.message class InvalidDeviceError(Exception): def __init__(self, message): self.message = message def __str__(self): return self.message class InvalidSysrootArchiveError(Exception): def __init__(self, message): self.message = message def __str__(self): return self.message class InvalidProjectError(Exception): def __init__(self, message): self.message = message def __str__(self): return self.message class NotConfiguredError(Exception): def __init__(self): pass class NotInstalledError(Exception): def __init__(self): pass class NotSupportedError(Exception): def __init__(self): pass class CommandFailedError(Exception): def __init__(self, stdout, stderr, exit_status): self.stdout = stdout self.stderr = stderr self.exit_status = exit_status def __str__(self): return """Exit code: {} Stdout: {} Stderr: {}""".format(self.exit_status, self.stdout, self.stderr) def is_valid_url(url): result = urlparse(url) return result.scheme and result.netloc and result.path class Colors: HEADER = '\033[95m' OKBLUE = '\033[94m' OKGREEN = '\033[92m' WARNING = '\033[93m' FAIL = '\033[91m' ENDC = '\033[0m' force_disable = False @classmethod def disable(cls): cls.HEADER = '' cls.OKBLUE = '' cls.OKGREEN = '' cls.WARNING = '' cls.FAIL = '' cls.ENDC = '' @classmethod def enable(cls): if cls.force_disable: return cls.HEADER = '\033[95m' cls.OKBLUE = '\033[94m' cls.OKGREEN = '\033[92m' cls.WARNING = '\033[93m' cls.FAIL = '\033[91m' cls.ENDC = '\033[0m' class TargetTriplet: # Machine name, dpkg architecture, gnu triplet SUPPORTED = [ [ 'armv7l', "armhf", "arm-linux-gnueabihf" ], [ 'aarch64',"arm64", "aarch64-linux-gnu" ], [ 'x86_64', "amd64", "x86_64-linux-gnu" ], ] def __init__(self, string): for items in self.SUPPORTED: if string in items: self.machine, self.arch, self.triplet = items return raise NotSupportedError class Simulator: def __init__(self): try: with open('/etc/image_version') as f: self.version = SysrootVersion.from_string(f.read()) with open('/usr/lib/pkg-config.multiarch') as f: self.version.arch = TargetTriplet(f.read().strip()).arch except FileNotFoundError: # Missing file means we can't use the simulator raise NotInstalledError def _exec(self, *args): r = subprocess.run(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE) return r.stdout.read().decode().strip() def install(self, bundle): self._exec('ribchesterctl', 'remove', bundle.id) self._exec('ribchesterctl', 'install', bundle.path) def uninstall(self, bundle_id): self._exec('ribchester', 'remove', bundle_id) def run(self, app, *args): self._exec('canterbury-exec', app, *args) class Device: def __init__(self, host, port=22, user=None, password=None): self.host = host if port is None: port = 22 self.port = port self.user = user if not self.user: self.user = "user" self.password = password self.version = None self._ssh = None def from_uri(uri): try: r = urlparse("ssh://" + uri) except Exception: raise InvalidDeviceError("Invalid URI format") return Device(r.hostname, r.port, r.username, r.password) def __str__(self): string = "" if self.user: string += user if self.password: string += ':' + self.password string += '@' string += self.host if self.port != 22: string += ':' + str(self.port) return string def _connect(self): ssh = paramiko.SSHClient() ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) ssh.connect(self.host, port=self.port, allow_agent=False, look_for_keys=False, gss_auth=False, username=self.user, password=self.password) return ssh def _exec(self, *args): with closing(self._connect()) as ssh: stdin, stdout, stderr = ssh.exec_command(' '.join(args)) out = stdout.read().decode().strip() err = stderr.read().decode().strip() status = stdout.channel.recv_exit_status () if status != 0: raise CommandFailedError(out, err, status) return out def load_sysroot_version(self): try: v = self._exec('cat', '/etc/image_version') self.version = SysrootVersion.from_string(v) except Exception as e: print(e) raise InvalidDeviceError("No image_version file found") a = self._exec('uname', '-m') try: triplet = TargetTriplet (a) except NotSupportedError: raise InvalidDeviceError("Unsupported architecture {}".format(a)) self.version.arch = triplet.arch def install(self, bundle): self.uninstall(bundle.id) bundledir = '/tmp/bundles' self._exec('mkdir', '-p', bundledir) remote_path = os.path.join(bundledir, os.path.basename(bundle.path)) with closing(self._connect()) as ssh: with closing(ssh.open_sftp()) as sftp: sftp.put(bundle.path, remote_path) self._exec('sudo', 'ribchesterctl', 'install', remote_path) self._exec('rm', remote_path) def uninstall(self, bundle_id): self._exec('sudo', 'ribchesterctl', 'remove', bundle_id) def run(self, app, *args): self._exec('canterbury-exec', app, *args) class SysrootVersion: def __init__(self, distro, release, arch, date=None, build=None, author=None, url=None): self.distro = distro self.release = release self.arch = arch self.date = date self.build = build self.author = author self.url = url def set_url(self, url): if url and not is_valid_url(url): raise ValueError("'url'") self.url = url def from_id(string): p = re.compile("^\s*(\w*)-(\d\d\.\d\d)-(\w*)\s*$") m = p.match(string.lower()) if not m: raise ValueError return SysrootVersion(m.groups()[0], m.groups()[1], m.groups()[2]) def from_string(string): p = re.compile("^\s*(\w*)\s*(\d\d\.\d\d) (\d{8,8})\.(\d+)\s*(\w*)\s*$") m = p.match(string) if not m: raise ValueError("'version'") return SysrootVersion(m.groups()[0], m.groups()[1], None, \ m.groups()[2], int(m.groups()[3], 10), m.groups()[4]) def get_name(self): return "{0} - {1} ({2})".format(self.distro, self.release, self.arch) def get_tag(self): return "{0}-{1}-{2}_{3}.{4}".format(self.distro, self.release, self.arch, self.date, self.build) def __str__(self): return "{0} {1} - {2}.{3} ({4})".format(self.distro, self.release, self.date, self.build, self.arch) def is_compatible(self, other): return self.distro == other.distro and \ self.release == other.release and \ self.arch == other.arch def __eq__(self, other): if not other: return False return self.release == other.release and \ self.date == other.date and \ self.build == other.build def __lt__(self, other): if not other: return False if self.distro and other.distro and self.distro != other.distro: return self.distro < other.distro if self.release != other.release: return self.release < other.release if self.arch and other.arch and self.arch != other.arch: return self.arch < other.arch if self.date != other.date: return self.date < other.date if self.build != other.build: return self.build < other.build return False def __gt__(self, other): if not other: return True if self.distro and other.distro and self.distro != other.distro: return self.distro > other.distro if self.release != other.release: return self.release > other.release if self.arch and other.arch and self.arch != other.arch: return self.arch > other.arch if self.date != other.date: return self.date > other.date if self.build != other.build: return self.build > other.build return False class Sysroot: def __init__(self, path): self.path = path self.parse_path(path) def parse_path(self, path): try: with open(os.path.join(path, 'etc', 'image_version')) as f: self.version = SysrootVersion.from_string(f.read()) with open(os.path.join(path, 'usr', 'lib', 'pkg-config.multiarch')) as f: self.version.arch = TargetTriplet(f.read().strip()).arch except FileNotFoundError as e: if not os.path.exists(path) or not os.listdir(path): raise NotInstalledError raise e if not path.endswith("/{0}/{1}/{2}".format(self.version.distro, self.version.release, self.version.arch)): raise ValueError("'version'") class SDK: def __init__(self): try: with open('/etc/image_version') as f: self.version = SysrootVersion.from_string(f.read()) with open('/usr/lib/pkg-config.multiarch') as f: self.version.arch = TargetTriplet(f.read().strip()).arch except FileNotFoundError: # Missing file means we're not running in the Apertis SDK raise NotInstalledError class SysrootArchive: def __init__(self, filename): self.filename = filename self.verify() def verify(self): try: with tarfile.open(self.filename, errorlevel=0) as f: # Extract version file and determine sysroot version version_file = None for path in ['etc/image_version', 'binary/etc/image_version']: try: version_file = f.getmember(path) break except KeyError: continue if not version_file: raise InvalidSysrootArchiveError('Missing image_version file in archive') try: with f.extractfile(version_file) as reader: self.version = SysrootVersion.from_string(reader.read().decode('utf-8')) except Exception as e: raise InvalidSysrootArchiveError('Invalid image_version file in archive') # Extract /usr/lib/pkg-config.multiarch and determine sysroot architecture arch_file = None for path in ['usr/lib/pkg-config.multiarch', 'binary/usr/lib/pkg-config.multiarch']: try: arch_file = f.getmember(path) break except KeyError: continue if not arch_file: raise InvalidSysrootArchiveError('Missing pkg-config.multiarch file in archive') try: with f.extractfile(arch_file) as reader: self.version.arch = TargetTriplet(reader.read().decode().strip()).arch except NotSupportedError: raise InvalidSysrootArchiveError('Architecture is not supported') except: raise InvalidSysrootArchiveError('Invalid content in pkg-config.multiarch file') except tarfile.ReadError as e: raise InvalidSysrootArchiveError(e) def extract(self, path): with tarfile.open(self.filename, errorlevel=0) as f: f.extractall(path) class SysrootManager: def __init__(self, path, url, user, password, config=None): self.path = path self.url = url self.user = user self.password = password self.config = config if not self.path: try: parser = configparser.ConfigParser() parser.read(config) self.path = os.path.expanduser(parser['general']['path']) except: self.path = '/opt/sysroot/' self.password_mgr = None if self.user: self.password_mgr = urllib.request.HTTPPasswordMgrWithDefaultRealm() auth_handler = urllib.request.HTTPBasicAuthHandler(self.password_mgr) opener = urllib.request.build_opener(auth_handler) urllib.request.install_opener(opener) # Local operations def get_versions(self): sysroots = glob.glob(os.path.join(self.path, '*', '[0-9][0-9].[0-9][0-9]', '*')) versions = [] for path in sysroots: try: sysroot = Sysroot(path) versions.append(sysroot.version) except Exception as e: pass return versions def get_installed(self, version): try: p = os.path.join(self.path, version.distro, version.release, version.arch) return Sysroot(p) except NotInstalledError: return None except (FileNotFoundError, ValueError): raise SysrootManagerError("Invalid sysroot installation at '{0}'".format(p)) def install(self, archive): version = archive.version try: path = os.path.join(self.path, version.distro, version.release, version.arch) if not os.path.isdir(path): # XXX Do earlier in case permission is denied or allow restart os.makedirs(path) except Exception as e: raise SysrootManagerError("Couldn't create install directory: {0}".format(e)) try: archive.extract(path) except Exception as e: shutil.rmtree(path) # FIXME remove all empty directory if possible raise SysrootManagerError("Couldn't extract sysroot to install path: {0}".format(e)) bindir = os.path.join(path, 'binary') if os.path.isdir(bindir): for filename in os.listdir(bindir): os.rename(os.path.join(bindir, filename), os.path.join(path, filename)) os.rmdir(bindir) installed = self.get_installed(version) if not installed.version.is_compatible(version): raise SysrootManagerError("Mismatch between installed sysroot ({0}) and expected one".format(installed.version)) # Don't fail on this check as it's broken for current images if installed.version != version: logging.warning("Mismatch between installed version ({0}) and expected one ({1})".format(installed.version, version)) return installed def uninstall(self, sysroot): shutil.rmtree(sysroot.path) # Remote operations def _add_url(self, url): if self.password_mgr: self.password_mgr.add_password(None, url, self.user, self.password) def _get_url(self, version): url = self.url if not url: if not self.config: raise SysrootManagerError("No URL given to retrieve {0} sysroot" .format(version.distro)) try: parser = configparser.ConfigParser(interpolation=None) parser.read(self.config) try: url = parser[version.distro]['url'] except KeyError: url = parser['general']['url'] except (FileNotFoundError, KeyError): raise SysrootManagerError("No URL given to retrieve {0} sysroot" .format(version.distro)) url = url.replace('%(distro)', version.distro) \ .replace('%(release)', version.release) \ .replace('%(arch)', version.arch) if not is_valid_url(url): raise SysrootManagerError("Invalid URL format: {0}".format(url)) self._add_url(url) return url def _parse_version_file(self, content): try: mod_content = "[sysroot]\n" + content parser = configparser.ConfigParser() parser.read_string(mod_content) version = SysrootVersion.from_string(parser['sysroot']['version']) version.set_url(parser['sysroot']['url']) return version except configparser.ParsingError: raise SysrootManagerError("Invalid syntax for sysroot version file") except KeyError as e: raise SysrootManagerError("Missing {0} property in sysroot version file".format(e)) except ValueError as e: raise SysrootManagerError("Malformed {0} property in sysroot version file".format(e)) def get_download_dir(self): return os.path.join(self.path, 'downloads') def get_latest(self, version): try: resp = urlopen(self._get_url(version)) latest_version = self._parse_version_file(resp.read().decode('utf-8')) # Add distro and arch details if unknown if not latest_version.distro: latest_version.distro = version.distro if not latest_version.arch: latest_version.arch = version.arch return latest_version except URLError as e: raise SysrootManagerError("Couldn't retrieve sysroot version file: {0}".format(e.reason)) except UnicodeDecodeError: raise SysrootManagerError("Invalid sysroot version file") def download(self, version, dest=None, progress=False): if not dest: dest = self.get_download_dir() try: os.makedirs(dest, exist_ok=True) filename = tempfile.NamedTemporaryFile(dir=dest).name except Exception as e: raise SysrootManagerError("Couldn't create download directory: {0}".format(e)) try: hook = None if progress: hook = print_progress self._add_url(version.url) _, headers = urlretrieve(version.url, filename=filename, reporthook=hook) except URLError as e: try: os.remove(filename) except: pass raise SysrootManagerError("Error while downloading sysroot: {0}".format(e.reason)) try: f = SysrootArchive(filename) except InvalidSysrootArchiveError as e: os.remove(filename) raise SysrootManagerError("Invalid sysroot archive: {0}".format(e.message)) if not version.is_compatible(f.version): os.remove(filename) raise SysrootManagerError("Mismatch between downloaded sysroot ({0}) and expected one".format(f.version)) # Don't fail on this check as it's broken for current images if f.version != version: logging.warning("Mismatch between downloaded version ({0}) and expected one ({1})".format(f.version, version)) return f class Project: def __init__(self, path=None, bundle_id=None): self.bundle_id = None self.name = None self.version = None if not path: path = os.getcwd() self.root = self.find_root(path) self.parse_info() # Override project properties if specified if bundle_id: self.bundle_id = bundle_id self.check_validity() def check_validity(self): if not self.bundle_id: raise InvalidProjectError("Property 'BundleId' is not specified") if not self.version: raise InvalidProjectError("Property 'Version' is not specified") def find_root(self, path): p = pathlib.Path(path) p.resolve() while str(p) != p.root: for filename in ['autogen.sh', 'configure']: if p.joinpath(filename).exists() and \ p.joinpath(filename).lstat().st_mode & stat.S_IEXEC: return str(p) p = p.parent raise InvalidProjectError("No configuration script found") def parse_info(self): path = os.path.join(self.root, 'SDKConfig', 'ProjectInfo.json') if os.path.exists(path): try: with open(path) as f: info = json.load(f) self.bundle_id = info['BundleId'] self.name = info['Name'] self.version = info['App_Version'] except: pass def is_configured(self): return os.path.exists(os.path.join(self.root, "config.status")) def autoreconf(self): env = os.environ.copy() env['NOCONFIGURE'] = '1' args = ["./autogen.sh"] p = subprocess.Popen(args, cwd=self.root, env=env) p.wait() def configure(self, target, debug=False, force=False, cflags=[], ldflags=[], args=[]): triplet = TargetTriplet(target.version.arch) env = os.environ.copy() args = ["./configure"] args += ["--prefix=/Applications/" + self.bundle_id] args += ["--localstatedir=/var/Applications/" + self.bundle_id + "/var"] if debug: cflags += ["-g"] cflags += ["-O0"] # Extra configure options are needed if configuring for a sysroot if isinstance(target, Sysroot): cflags += ["--sysroot=" + target.path] cflags += ["-I" + os.path.join(target.path, 'usr', 'include')] ldflags += ["--sysroot=" + target.path] args += ["--host=" + triplet.triplet] args += ["--with-sysroot=" + target.path] self.set_pkg_config_vars(env, target) if not os.path.exists(os.path.join(self.root, "configure")): self.autoreconf() env['CFLAGS'] = ' '.join(cflags) env['LDFLAGS'] = ' '.join(ldflags) env['CC'] = "{}-{}".format(triplet.triplet, "gcc") env['LD'] = "{}-{}".format(triplet.triplet, "ld") p = subprocess.Popen(args, cwd=self.root, env=env) p.wait() def make(self, target='all', env=dict()): if not self.is_configured(): raise NotConfiguredError p = subprocess.Popen(['make', target], cwd=self.root, env=env) p.wait() def build(self, verbose=False): if not self.is_configured(): raise NotConfiguredError args = ["make"] if verbose: args += ["V=1"] p = subprocess.Popen(args, cwd=self.root) p.wait() def install(self, path=None): env = os.environ.copy() if path: env['DESTDIR'] = path self.make('install', env) def set_pkg_config_vars(self, env, sysroot): def join_paths(paths): triplet = TargetTriplet(sysroot.version.arch).triplet return ":".join(paths).replace("${SYSROOT}", sysroot.path) \ .replace("${TRIPLET}", triplet) # Abort is env already contains PKG_CONFIG_LIBDIR if os.getenv('PKG_CONFIG_LIBDIR'): return pkgconfig_dir = ["${SYSROOT}/usr/lib/${TRIPLET}/pkgconfig", "${SYSROOT}/usr/share/pkgconfig"] env['PKG_CONFIG_DIR'] = join_paths(pkgconfig_dir) pkgconfig_path = ["${SYSROOT}/usr/lib/${TRIPLET}/pkgconfig", "${SYSROOT}/usr/share/pkgconfig", "${SYSROOT}/usr/lib/pkgconfig", "/usr/lib/${TRIPLET}/pkgconfig", "/usr/${TRIPLET}/lib/pkgconfig"] env['PKG_CONFIG_PATH'] = join_paths(pkgconfig_path) pkgconfig_libdir = ["${SYSROOT}/usr/lib/${TRIPLET}/pkgconfig", "${SYSROOT}/usr/share/pkgconfig"] env['PKG_CONFIG_LIBDIR'] = join_paths(pkgconfig_libdir) pkgconfig_sysroot = ["${SYSROOT}"] env['PKG_CONFIG_SYSROOT_DIR'] = join_paths(pkgconfig_sysroot) class Bundle: def __init__(self, path, bundle_id=None, version=None): self.path = path self.id = bundle_id self.version = version def from_project(project, destdir=None): if destdir: destdir = os.path.expanduser(destdir) else: destdir = os.getcwd() filename = "{0}-{1}.bundle".format(project.bundle_id, project.version) path = os.path.join(destdir, filename) bundle = Bundle(path, project.bundle_id, project.version) with tempfile.TemporaryDirectory() as tmpdir: builddir = os.path.join(tmpdir, 'build') bundledir = os.path.join(tmpdir, 'bundle') repodir = os.path.join(tmpdir, 'repo') os.makedirs(builddir) os.makedirs(bundledir) with open(os.path.join(bundledir, 'metadata'), 'w') as f: f.write("[Application]\n") f.write("name={0}\n".format(bundle.id)) f.write("X-Apertis-BundleVersion={0}".format(bundle.version)) os.makedirs(os.path.join(bundledir, 'export')) project.install(builddir) shutil.copytree(os.path.join(builddir, 'Applications', bundle.id), os.path.join(bundledir, 'files')) cmd = ['flatpak', 'build-export', repodir, bundledir] ret = subprocess.run(cmd) if ret.returncode: sys.exit(ret.returncode) cmd = ['flatpak', 'build-bundle', repodir, bundle.path, bundle.id] ret = subprocess.run(cmd) if ret.returncode: sys.exit(ret.returncode) return bundle def from_file(path): bundle_file = Gio.File.new_for_path(path) m = GLib.MappedFile.new(path, False) b = m.get_bytes() m.unref() varianttype = GLib.VariantType.new('(a{sv}tayay(a{sv}aya(say)sstayay)aya(uayttay)a(yaytt))') bundle = GLib.Variant.new_from_bytes(varianttype, b, False) bundle.ref_sink() metadata = bundle.get_child_value(0).lookup_value('metadata', None).get_string() parser = configparser.ConfigParser() parser.read_string(metadata) return Bundle(path, parser['Application']['name'], parser['Application']['X-Apertis-BundleVersion']) class Ade: def __init__(self): self.command = '' self.subcommand = '' self.config = None self.url = None self.path = None self.file = None self.dest = None self.sdk = False self.simulator = False self.sysroot = None self.device = None self.project = False self.bundle = None self.bundle_id = None self.sysroot_version = None self.distro = None self.release = None self.arch = None self.user = None self.password = '' self.force = False self.debug = False self.verbose = False def get_sdk(self): try: return SDK() except NotInstalledError: return None except Exception as e: seld.die("Invalid SDK installation: {0}".format(e)) def find_configuration(self): if not self.config: self.config = xdg.BaseDirectory.load_first_config('ade', 'sysroot.conf') def validate_sysroot_version(self): # XXX If only URL is specified, try to obtain distro and arch from it. # XXX Allow project-specific config file to specify these info if not self.distro or not self.release: try: sdk = SDK() if not self.distro: self.info("* No distribution specified, defaulting to host distribution") self.distro = sdk.version.distro if not self.release: self.info("* No release version specified, defaulting to host release version") self.release = sdk.version.release if sdk.version.distro != self.distro: self.die("Mismatch between host distro and specified distro") except: self.die("No distribution/release specified") if not self.arch: self.info("* No architecture specified, defaulting to 'armhf'") self.arch = 'armhf' self.sysroot_version = SysrootVersion(self.distro, self.release, self.arch) def get_sysroot_manager(self): return SysrootManager(self.path, self.url, self.user, self.password, config=self.config) def get_object(self, classes=[Bundle, Device, Project, SDK, Simulator, Sysroot]): if self.sdk and SDK in classes: try: obj = SDK() except NotInstalledError: self.die("Not running in a SDK distribution") except Exception as e: self.die("Invalid SDK installation: {0}".format(e)) elif self.simulator and Simulator in classes: try: obj = Simulator() except NotInstalledError: self.die("Couldn't use simulator; not running in a SDK distribution") elif self.sysroot and Sysroot in classes: try: version = SysrootVersion.from_id(self.sysroot) obj = self.get_sysroot_manager().get_installed(version) if not obj: self.die("No sysroot currently installed for {0}{1}{2}" \ .format(Colors.OKBLUE, version.get_name(), Colors.ENDC)) except ValueError: self.die("Invalid sysroot tag format") except InvalidSysrootError: self.die("Invalid sysroot installed for {0}{1}{2}" \ .format(Colors.OKBLUE, version.get_name(), Colors.ENDC)) elif self.device and Device in classes: try: obj = Device.from_uri(self.device) obj.load_sysroot_version() except InvalidDeviceError as e: self.die("Invalid device: {0}".format(e)) elif self.project and Project in classes: try: obj = Project() except InvalidProjectError as e: self.die("Invalid project: {0}".format(e)) elif self.bundle and Bundle in classes: try: obj = Bundle.from_file(self.bundle) except InvalidBundleError as e: self.die("Invalid bundle: {0}".format(e)) else: return None return obj def get_target(self): target = self.get_object([Simulator, Sysroot, Device]) if not target: self.die("No target (simulator, sysroot or device) specified") return target def unpack_sysroot(self, target): if isinstance(target, Device): sdk = self.get_sdk() version = target.version if sdk and version.is_compatible(sdk.version): return sdk # Native compilation; SDK version matches device image version else: sysroot = self.get_sysroot_manager().get_installed(version) if not sysroot: self.die("No sysroot currently installed for {0}{1}{2}" .format(Colors.OKBLUE, version.get_name(), Colors.ENDC)) return sysroot return target def do_sysroot_list(self): manager = self.get_sysroot_manager() versions = manager.get_versions() if not versions: self.info("{0}No sysroot installed in directory {1}.{2}" .format(Colors.WARNING, manager.path, Colors.ENDC)) else: versions.sort() for version in versions: self.info("* {0}".format(version)) if self.format == 'parseable': l = [version.get_tag() for version in versions] print('InstalledSysroots:' + ';'.join(l)) def do_sysroot_installed(self): self.validate_sysroot_version() manager = self.get_sysroot_manager() sysroot = manager.get_installed(self.sysroot_version) if sysroot: self.info("* Retrieved current version: {0}{1}{2}".format(Colors.WARNING, sysroot.version, Colors.ENDC)) if self.format == 'parseable': print('InstalledVersion:' + sysroot.version.get_tag()) else: self.info("* Sysroot {0}{1}{2} is not currently installed" .format(Colors.OKBLUE, self.sysroot_version.get_name(), Colors.ENDC)) def do_sysroot_latest(self): self.validate_sysroot_version() self.info("* Checking latest version available for {0}{1}{2}" \ .format(Colors.OKBLUE, self.sysroot_version.get_name(), Colors.ENDC)) manager = self.get_sysroot_manager() version = manager.get_latest(self.sysroot_version) self.info("* Retrieved latest version: {0}{1}{2}" \ .format(Colors.OKGREEN, version, Colors.ENDC)) self.info("* Download URL: {0}".format(version.url)) if self.format == 'parseable': print('LatestVersion:' + version.get_tag()) print('LatestURL:' + version.url) def do_sysroot_download(self): self.validate_sysroot_version() manager = self.get_sysroot_manager() version = manager.get_latest(self.sysroot_version) f = manager.download(version, self.dest, progress=(self.format == 'friendly')) if not self.dest: self.dest = manager.get_download_dir() template = os.path.join(self.dest, "sysroot-{0}-{1}-{2}_{3}.{4}{5}.tar.gz") tfilename = template.format(version.distro, version.release, version.arch, version.date, version.build, "{0}") filename = tfilename.format("") if os.path.exists(filename): for i in range(1, 1000): filename = tfilename.format('(' + str(i) + ')') if not os.path.exists(filename): break self.info("* Moving downloaded sysroot to '{0}'".format(filename)) shutil.move(f.filename, filename) if self.format == 'parseable': print('DownloadedArchive:' + filename) def _verify_sysroot_archive(self, path): self.info("* Verifying sysroot archive '{0}'".format(self.file)) try: archive = SysrootArchive(self.file) new_version = archive.version self.info("* Sysroot archive has version: {0}{1}{2}".format(Colors.OKGREEN, new_version, Colors.ENDC)) except Exception as e: self.die("Invalid sysroot archive: {0}".format(e)) return archive def do_sysroot_verify(self): archive = self._verify_sysroot_archive(self.file) if self.format == 'parseable': print('VerifiedVersion:' + archive.version.get_tag()) def do_sysroot_install(self): archive = None manager = self.get_sysroot_manager() if not self.file: self.validate_sysroot_version() new_version = manager.get_latest(self.sysroot_version) else: if self.distro or self.release or self.arch: self.die("Incompatible arguments given: --file and --distro/--release/--arch") archive = self._verify_sysroot_archive(self.file) new_version = archive.version installed = manager.get_installed(new_version) if not installed: self.info("* Installing version {0}{1}{2}".format(Colors.OKGREEN, new_version, Colors.ENDC)) elif self.force: prefix = '' if new_version == installed.version: prefix = 're' self.info("* Forcing {0}installation of version {1}{2}{3}".format(prefix, Colors.OKGREEN, new_version, Colors.ENDC)) else: color = Colors.WARNING if new_version == installed.version: color = Colors.OKGREEN self.die("Sysroot {0}{1}{2} is already installed".format(color, installed.version, Colors.ENDC)) try: if not self.file: archive = manager.download(new_version, progress=(self.format == 'friendly')) if installed: manager.uninstall(installed) installed = manager.install(archive) finally: if archive and not self.file: os.remove(archive.filename) self.info("* Installation has been completed") if self.format == 'parseable': print('InstalledVersion:' + installed.version.get_tag()) def do_sysroot_update(self): self.validate_sysroot_version() manager = self.get_sysroot_manager() new_version = manager.get_latest(self.sysroot_version) installed = manager.get_installed(self.sysroot_version) if not installed: self.die("No sysroot currently installed for {0}{1}{2}" \ .format(Colors.OKBLUE, self.sysroot_version.get_name(), Colors.ENDC)) elif installed.version < new_version: self.info("* Upgrading from {0}{1}{2} to version {3}{4}{5}" \ .format(Colors.WARNING, installed.version, Colors.ENDC, Colors.OKGREEN, new_version, Colors.ENDC)) elif self.force: self.info("* Forcing installation of version {0}{1}{2}" \ .format(Colors.OKBLUE, new_version, Colors.ENDC)) elif installed.version == new_version: self.info("* Installed version {0}{1}{2} is already up-to-date" \ .format(Colors.OKGREEN, new_version, Colors.ENDC)) if self.format == 'parseable': print('InstalledVersion:' + installed.version.get_tag()) return elif installed.version > new_version: self.die("Installed version {0}{1}{2} is more recent than {3}{4}{5}" \ .format(Colors.WARNING, installed.version, Colors.ENDC, Colors.OKGREEN, new_version, Colors.ENDC)) try: archive = manager.download(new_version, progress=(self.format == 'friendly')) manager.uninstall(installed) installed = manager.install(archive) finally: if archive: os.remove(archive.filename) self.info("* Update has been completed") if self.format == 'parseable': print('InstalledVersion:' + installed.version.get_tag()) def do_sysroot_uninstall(self): self.validate_sysroot_version() manager = self.get_sysroot_manager() installed = manager.get_installed(self.sysroot_version) if not installed: self.die("No sysroot currently installed for {0}{1}{2}" \ .format(Colors.OKBLUE, self.sysroot_version.get_name(), Colors.ENDC)) else: self.info("* Uninstalling sysroot {0}{1}{2}" \ .format(Colors.WARNING, installed.version, Colors.ENDC)) manager.uninstall(installed) self.info("* Sysroot for {0}{1}{2} has been uninstalled" \ .format(Colors.OKBLUE, self.sysroot_version.get_name(), Colors.ENDC)) def do_info(self): obj = self.get_object() if isinstance(obj, SDK): self.info("* SDK version is {0}{1}{2}" .format(Colors.WARNING, obj.version, Colors.ENDC)) if self.format == 'parseable': print("SDKVersion:{0}".format(obj.version.get_tag())) if isinstance(obj, Sysroot): self.info("* Sysroot version {0}{1}{2} is installed at '{3}'" .format(Colors.WARNING, obj.version, Colors.ENDC, obj.path)) if self.format == 'parseable': print("SysrootVersion:{0}".format(obj.version.get_tag())) print("SysrootPath:{0}".format(obj.path)) elif isinstance(obj, Device): self.info("* Device has image version {0}{1}{2}" .format(Colors.WARNING, obj.version, Colors.ENDC)) if self.format == 'parseable': print("ImageVersion:{0}".format(obj.version.get_tag())) elif isinstance(obj, Project): self.info("* Project: {0}{1}{2}".format(Colors.WARNING, obj.name, Colors.ENDC)) self.info("* BundleId: {0}".format(obj.bundle_id)) self.info("* Version: {0}".format(obj.version)) if self.format == 'parseable': print("ProjectName:{0}".format(obj.name)) print("BundleId:{0}".format(obj.bundle_id)) print("AppVersion:{0}".format(obj.version)) elif isinstance(obj, Bundle): self.info("* BundleId: {0}".format(obj.id)) self.info("* Version: {0}".format(obj.version)) if self.format == 'parseable': print("BundleId:{0}".format(obj.id)) print("AppVersion:{0}".format(obj.version)) def do_configure(self): target = self.unpack_sysroot(self.get_target()) try: project = Project(bundle_id=self.bundle_id) project.configure(target, debug=self.debug, force=self.force, args=self.args) except Exception as e: self.die("Couldn't configure project: {0}".format(e)) def do_build(self): try: project = Project() project.build(self.verbose) except InvalidProjectError as e: self.die("Invalid project: {0}".format(e)) except NotConfiguredError: self.die("Project is not configured; run 'configure' command first") def do_export(self): try: project = Project() bundle = Bundle.from_project(project, self.dest) except NotConfiguredError: self.die("Project is not configured; run 'configure' command first") except InvalidProjectError as e: self.die("Invalid project: {0}".format(e)) except Exception as e: self.die("Couldn't create bundle: {0}".format(e)) def do_install(self): target = self.get_target() try: with tempfile.TemporaryDirectory() as tmpdir: if self.bundle: bundle = Bundle.from_file(self.bundle) else: project = Project() bundle = Bundle.from_project(project, tmpdir) target.install(bundle) except InvalidBundleError as e: self.die("Invalid bundle: {0}".format(e)) except Exception as e: self.die("Couldn't install application: {0}".format(e)) def do_uninstall(self): target = self.get_target() try: if not self.bundle_id: project = Project() self.bundle_id = project.bundle_id target.uninstall(self.bundle_id) except Exception as e: self.die("Couldn't uninstall application: {0}".format(e)) def do_run(self): target = self.get_target() target.run(self.app, *self.args) def info(self, message): if self.format == 'friendly': print(message) def die(self, message): logging.error(message) sys.exit(1) def run(self): if self.format != 'friendly': Colors.disable() self.find_configuration() method = 'do_' + self.command.replace('-', '_') if self.subcommand: method += '_' + self.subcommand.replace('-', '_') getattr(self, method)() if __name__ == '__main__': root_parser = argparse.ArgumentParser(description='ADE - Apertis Development Environment') root_parser.add_argument('--format', choices=['friendly', 'parseable'], default='friendly', help="Output format") argcomplete.autocomplete(root_parser) subparsers = root_parser.add_subparsers(dest='command') subparsers.required = True # Info parser info_parser = subparsers.add_parser('info', help="Retrieve information about an object") group = info_parser.add_mutually_exclusive_group() group.add_argument('--sdk', help="Use SDK as target", action='store_true') group.add_argument('--sysroot', help="Use sysroot as target (e.g. apertis-16.12-armhf)") group.add_argument('--device', help="Use device as target (e.g. user:pass@apertis)") group.add_argument('--project', help="Use current directory project as target", action='store_true') group.add_argument('--bundle', help="Use bundle file as target") # Sysroot parser sysroot_parser = subparsers.add_parser('sysroot', help='Sysroot related commands') sysroot_parser.add_argument('--config', help="Sysroot configuration file") sysroot_parser.add_argument('--path', help="Sysroot installation directory") sysroot_subparsers = sysroot_parser.add_subparsers(dest='subcommand') sysroot_subparsers.required = True # Common sysroot parsers sysroot_id_parser = argparse.ArgumentParser(add_help=False) sysroot_id_parser.add_argument('--distro', help="Distribution name (e.g. apertis)") sysroot_id_parser.add_argument('--release', help="Distribution release version (e.g. 16.09)") sysroot_id_parser.add_argument('--arch', help="Sysroot architecture", choices=['armhf', 'arm64']) sysroot_url_parser = argparse.ArgumentParser(add_help=False) sysroot_url_parser.add_argument('--url', help="Sysroot download URL") sysroot_auth_parser = argparse.ArgumentParser(add_help=False) sysroot_auth_parser.add_argument('--user', help="Sysroot remote server user") sysroot_auth_parser.add_argument('--password', help="Sysroot remote server password") # Sysroot subcommands parser = sysroot_subparsers.add_parser('list', help='List all installed sysroots') parser = sysroot_subparsers.add_parser('installed', help='Retrieve version of currently installed sysroot', parents=[sysroot_id_parser]) parser = sysroot_subparsers.add_parser('latest', help='Retrieve version of latest available sysroot', parents=[sysroot_id_parser, sysroot_url_parser, sysroot_auth_parser]) parser = sysroot_subparsers.add_parser('download', help='Download latest sysroot archive', parents=[sysroot_id_parser, sysroot_url_parser, sysroot_auth_parser]) parser.add_argument('--dest', help="Download destination directory") parser = sysroot_subparsers.add_parser('verify', help='Check sysroot archive for validity') parser.add_argument('--file', required=True, help='Path to the archive file to verify') parser = sysroot_subparsers.add_parser('install', help='Install new sysroot', parents=[sysroot_id_parser, sysroot_url_parser, sysroot_auth_parser]) parser.add_argument('--file', help="Path to (already downloaded) archive to install") parser.add_argument('--force', help="Force sysroot installation", action='store_true') parser = sysroot_subparsers.add_parser('update', help='Update sysroot to latest version', parents=[sysroot_id_parser, sysroot_url_parser, sysroot_auth_parser]) parser.add_argument('--force', help="Force sysroot update", action='store_true') parser = sysroot_subparsers.add_parser('uninstall', help='Uninstall sysroot', parents=[sysroot_id_parser]) # Configure parser configure_parser = subparsers.add_parser('configure', help="Configure application") configure_parser.add_argument('--force', help="Force configuration", action='store_true') configure_parser.add_argument('--debug', help="Enable debug symbols", action='store_true') configure_parser.add_argument('--bundle-id', help="Apertis bundle ID (e.g. org.apertis.App)") group = configure_parser.add_mutually_exclusive_group() group.add_argument('--simulator', '--native', help="Use simulator as target", action='store_true', dest='simulator') group.add_argument('--sysroot', help="Use sysroot as target (e.g. apertis-16.09-armhf)") group.add_argument('--device', help="Use device as target (e.g. user:pass@192.168.1.98)") configure_parser.add_argument('args', help="Configure options", nargs=argparse.REMAINDER) # Build parser build_parser = subparsers.add_parser('build', help="Build application") build_parser.add_argument('--verbose', help="Verbose output", action='store_true') build_parser.add_argument('args', help="Configure options", nargs=argparse.REMAINDER) # Export parser export_parser = subparsers.add_parser('export', help="Create an application bundle") export_parser.add_argument('--dest', help="Bundle destination directory") # Install parser install_parser = subparsers.add_parser('install', help="Install an application") install_parser.add_argument('--bundle', help="Path to bundle to install") group = install_parser.add_mutually_exclusive_group() group.add_argument('--simulator', help="Use simulator as target", action='store_true') group.add_argument('--device', help="Use device as target (e.g. user@apertis)") # Uninstall parser uninstall_parser = subparsers.add_parser('uninstall', help="Uninstall an application") uninstall_parser.add_argument('--bundle-id', help="Bundle to uninstall") group = uninstall_parser.add_mutually_exclusive_group() group.add_argument('--simulator', help="Use simulator as target", action='store_true') group.add_argument('--device', help="Use device as target (e.g. user@apertis)") # Run parser run_parser = subparsers.add_parser('run', help="Run application") run_parser.add_argument('--app', help="Remote path to application to run") group = run_parser.add_mutually_exclusive_group() group.add_argument('--simulator', help="Use simulator as target", action='store_true') group.add_argument('--device', help="Use device as target (e.g. user@apertis)") run_parser.add_argument('args', help="Arguments to pass to application", nargs=argparse.REMAINDER) argcomplete.autocomplete(root_parser) obj = Ade() args, extra = root_parser.parse_known_args(namespace=obj) if hasattr(obj, 'args'): obj.args += extra obj.run()