diff --git a/debian/control b/debian/control index 69275d05442a5c856b5b734d3760949a95ea7156..a68d39c98dc3766589852dfe53a43e8faafe1992 100644 --- a/debian/control +++ b/debian/control @@ -34,6 +34,9 @@ Depends: python3-paramiko, python3-gi, python3-gitlab, + python3-bs4, + python3-requests, + python3-tqdm, systemd-container Suggests: ribchester (>= 0.1612.7) diff --git a/tools/Makefile b/tools/Makefile index dc1c48e149e00846fb1927c488f6dd701442445c..6a78f4cc9b459c24fae28d08971b705feed756d7 100644 --- a/tools/Makefile +++ b/tools/Makefile @@ -6,6 +6,7 @@ ROOT_TOOLDIR ?= $(DESTDIR)$(bindir) TOOLS = \ ade \ devroot-enter \ + apertis-images \ apertis-pkg-merge-updates \ apertis-pkg-pull-updates \ apertis-switch-coreutils-rust \ diff --git a/tools/apertis-images b/tools/apertis-images new file mode 100755 index 0000000000000000000000000000000000000000..445c6d580bc91773428ff862d7567d0a229c62ad --- /dev/null +++ b/tools/apertis-images @@ -0,0 +1,1057 @@ +#!/usr/bin/env python3 +from __future__ import annotations + +from dataclasses import dataclass +from pathlib import Path +from typing import ( + IO, + Iterable, + Iterator, + Optional, + Protocol, + Type, + TypeVar, + Union, + runtime_checkable, +) + +import argparse +import contextlib +import enum +import hashlib +import logging +import os +import re +import platform +import shlex +import shutil +import tempfile +import urllib.parse +import urllib.request +import zlib + +from bs4 import BeautifulSoup +from debian.debfile import DebFile +from tqdm import tqdm + +import requests + +logger = logging.getLogger() + + +@dataclass +class ApertisImagesException(Exception): + message: str + + +@dataclass +class DebianPackage: + name: str + source: str + version: str + sha256: str + + +@dataclass +class DebianPackageEntry: + package: DebianPackage + filename: str + + +@dataclass +class UBoot: + qemu_arch: str + + @property + def debian_package_entry(self) -> DebianPackageEntry: + return DebianPackageEntry( + DebianPackage( + name='u-boot-qemu', + source='u-boot', + version='2022.04+dfsg-2', + sha256='a7458375b012b7fcab1e816a577f83643529da2e32f5169a73afb9de05ddb415', + ), + filename=f'usr/lib/u-boot/qemu_{self.qemu_arch}/u-boot.bin', + ) + + +class Ovmf: + @property + def debian_package_entry(self) -> DebianPackageEntry: + return DebianPackageEntry( + DebianPackage( + name='ovmf', + source='edk2', + version='2022.05-2', + sha256='9a97e831667dff6ff85f2b6565c1b58f8cbcf2d8999e395ef3214df0c3eea52c', + ), + 'usr/share/OVMF/OVMF_CODE.fd', + ) + + +Bootloader = Union[UBoot, Ovmf] + + +class Deployment(enum.Enum): + APT = 'apt' + OSTREE = 'ostree' + + +class Arch(enum.Enum): + AMD64 = 'amd64' + ARM64 = 'arm64' + ARMHF = 'armhf' + + @staticmethod + def host() -> Arch: + host = platform.machine() + if host == 'x86_64': + return Arch.AMD64 + elif host == 'aarch64': + return Arch.ARM64 + elif host.startswith('arm'): + return Arch.ARMHF + else: + raise ApertisImagesException(f'Unknown host architecture: {host}') + + @property + def bootloader(self) -> Bootloader: + if self == Arch.AMD64: + return Ovmf() + else: + return UBoot(qemu_arch='arm64' if self == Arch.ARM64 else 'arm') + + +class SpecialVersions: + RELEASE = 'release' + DAILY = 'daily' + + +@dataclass +class ImageReference: + _FILENAME_RE = re.compile( + r"""apertis + (?P<ostree>_ostree)? + _ + (?P<release>[^-]+) + - + (?P<variant>[^-]+) + - + (?P<arch>[^-]+) + - + [^_]+ # boot + _ + (?P<version>.*) + $ + """, + re.X, + ) + + class ParseError(ApertisImagesException): + pass + + release: str + version: str + variant: str + arch: Arch + deployment: Deployment + + @dataclass(frozen=True) + class Partial: + release: Optional[str] + version: Optional[str] + variant: Optional[str] + arch: Optional[Arch] + deployment: Optional[Deployment] + + @staticmethod + def empty() -> ImageReference.Partial: + return ImageReference.Partial( + release=None, version=None, variant=None, arch=None, deployment=None + ) + + @staticmethod + def from_url(s: str) -> ImageReference: + url = urllib.parse.urlparse(s) + parts = url.path.split('/')[-5:] + if len(parts) != 5: + raise ImageReference.ParseError('URL should point to an image file') + + return ImageReference( + release=parts[0], + version=parts[1], + arch=Arch[parts[2].upper()], + variant=parts[3], + deployment=Deployment.OSTREE if 'ostree' in parts[4] else Deployment.APT, + ) + + @staticmethod + def from_filename(name: str) -> ImageReference: + match = ImageReference._FILENAME_RE.match(name.removesuffix('.img')) + if match is None: + raise ImageReference.ParseError('Failed to match filename') + + return ImageReference( + release=match.group('release'), + version=match.group('version'), + arch=Arch[match.group('arch').upper()], + variant=match.group('variant'), + deployment=Deployment.OSTREE if match.group('ostree') else Deployment.APT, + ) + + @property + def name(self) -> str: + if self.variant == 'sdk': + boot_type = 'sdk' + elif self.arch == Arch.AMD64: + boot_type = 'uefi' + else: + boot_type = 'uboot' + + parts = ['apertis'] + if self.deployment == Deployment.OSTREE: + parts.append('ostree') + + parts.extend( + ( + '-'.join( + ( + self.release, + self.variant, + self.arch.name.lower(), + boot_type, + ) + ), + self.version, + ) + ) + + return '_'.join(parts) + + +@contextlib.contextmanager +def atomic_write(*, dest: Path, tmp_dir: Optional[Path]) -> Iterator[IO[bytes]]: + if tmp_dir is not None: + tmp_dir.mkdir(parents=True, exist_ok=True) + + dest.parent.mkdir(parents=True, exist_ok=True) + + with tempfile.NamedTemporaryFile(dir=tmp_dir, delete=False) as fp: + try: + yield fp + + fp.flush() + os.rename(fp.name, dest) + except: # noqa: E722 + os.unlink(fp.name) + raise + + +def download_file( + url: str, + *, + description: str, + sha256: str, + dest: IO[bytes], + gunzip: bool = False, +) -> None: + CHUNK_SIZE = 4 * 1024 * 1024 + + logger.debug('Download %s to %s', url, dest) + + with requests.Session() as session: + response = session.get(url, stream=True) + response.raise_for_status() + + file_size = int(response.headers['content-length']) + + hasher = hashlib.new('sha256') + total_bytes_downloaded = 0 + + if gunzip: + decompressor = zlib.decompressobj(wbits=zlib.MAX_WBITS | 16) + else: + decompressor = None + + with tqdm( # type: ignore + desc=f'Downloading {description}', + total=file_size, + unit='B', + unit_scale=True, + unit_divisor=1024, + ) as progress: + for chunk in response.iter_content(CHUNK_SIZE): + hasher.update(chunk) + + if decompressor is not None: + decompressed = decompressor.decompress(chunk) + dest.write(decompressed) + else: + dest.write(chunk) + + bytes_downloaded = response.raw.tell() - total_bytes_downloaded + progress.update(bytes_downloaded) + total_bytes_downloaded += bytes_downloaded + + if decompressor is not None: + dest.write(decompressor.flush()) + + if (actual_sha256 := hasher.hexdigest()) != sha256: + raise ValueError(f'Bad sha256: expected {sha256}, got {actual_sha256}') + + +class ImageRepository: + _BASE = 'https://images.apertis.org' + + _RELEASE_CHANNEL = 'release' + _DAILY_CHANNEL = 'daily' + + _RC_RE = re.compile(r'rc(\d+)$') + + @dataclass + class Query: + variant: str + arch: Arch + deployment: Deployment + + def __init__(self) -> None: + pass + + def _get_gz_url(self, image: ImageReference) -> str: + return '/'.join( + ( + self._BASE, + self._RELEASE_CHANNEL + if image.version.startswith('v') + else self._DAILY_CHANNEL, + image.release, + image.version, + image.arch.name.lower(), + image.variant, + image.name + '.img.gz', + ), + ) + + def _get_sha256_url(self, image: ImageReference) -> str: + return self._get_gz_url(image) + '.sha256' + + def _list_versions_in_channel(self, channel: str, release: str) -> Iterable[str]: + response = requests.get(f'{self._BASE}/{channel}/{release}') + response.raise_for_status() + soup = BeautifulSoup(response.text, 'html.parser') + + found_one = False + + for tag in soup.find_all('a'): + href = tag.get('href') + if not href.startswith('.') and href.endswith('/'): + found_one = True + yield href.rstrip('/') + + if not found_one: + raise ApertisImagesException( + f'Failed to find any release at {response.url}' + ) + + def _contains_image( + self, release: str, version: str, test: Optional[ImageRepository.Query] + ) -> bool: + if test is None: + return False + + image = ImageReference( + release=release, + version=version, + variant=test.variant, + arch=test.arch, + deployment=test.deployment, + ) + return self.image_exists(image) + + def get_latest_release( + self, + release: str, + *, + containing: Optional[ImageRepository.Query] = None, + ) -> Optional[str]: + versions = sorted( + self._list_versions_in_channel('release', release), reverse=True + ) + for version in versions: + # After reverse, releases are ordered newest-to-oldest, but, rc's are listed + # *before* their release. + if (match := self._RC_RE.search(version)) is not None: + # If the stable release is out, try that first. + non_rc = version[: match.start(0)] + if non_rc in versions and self._contains_image( + release, version, containing + ): + return non_rc + + if self._contains_image(release, version, containing): + return version + + return None + + def get_latest_daily( + self, + release: str, + *, + containing: Optional[ImageRepository.Query] = None, + ) -> Optional[str]: + versions = sorted( + self._list_versions_in_channel('daily', release), reverse=True + ) + for version in versions: + if self._contains_image(release, version, containing): + return version + + return None + + def image_exists(self, image: ImageReference) -> bool: + for url in (self._get_gz_url(image), self._get_sha256_url(image)): + response = requests.head(url) + if response.status_code == 404: + return False + elif not (200 <= response.status_code <= 299): + response.raise_for_status() + + return True + + def download(self, *, image: ImageReference, dest: Path, tmp_dir: Path) -> None: + url = self._get_gz_url(image) + sha256_url = self._get_sha256_url(image) + + logger.debug('Get sha256 from %s', sha256_url) + sha256_response = requests.get(sha256_url) + sha256_response.raise_for_status() + sha256 = sha256_response.text.split()[0] + + with atomic_write(dest=dest, tmp_dir=tmp_dir) as fp: + download_file( + url, description=dest.name, sha256=sha256, dest=fp, gunzip=True + ) + + +class DebianPackagePool: + _SNAPSHOT = '20220616T152532Z' + + def __init__(self) -> None: + pass + + def _get_package_url(self, package: DebianPackage) -> str: + return '/'.join( + ( + 'https://snapshot.debian.org/archive/debian', + self._SNAPSHOT, + 'pool/main', + package.source[0], + package.source, + f'{package.name}_{package.version}_all.deb', + ) + ) + + def download_entry( + self, + *, + entry: DebianPackageEntry, + dest: Path, + tmp_dir: Path, + ) -> None: + url = self._get_package_url(entry.package) + + with tempfile.TemporaryFile() as deb_fp: + download_file( + url, description=dest.name, sha256=entry.package.sha256, dest=deb_fp + ) + + deb_fp.seek(0) + # Note: need 'type: ignore' because fileobj's type is more restrictive than + # required (BinaryIO vs IO[bytes]). + deb = DebFile(fileobj=deb_fp) # type: ignore + + source = deb.data.get_file(entry.filename) + + with atomic_write(dest=dest, tmp_dir=tmp_dir) as fp: + shutil.copyfileobj(source, fp) + + +class Downloads: + @dataclass + class ImageFile: + image: ImageReference + location: Path + + def __init__(self) -> None: + xdg_data_home = os.environ.get('XDG_DATA_HOME') or os.path.expanduser( + '~/.local/share' + ) + self.root = Path(xdg_data_home) / 'apertis-images' + + @property + def tmp_dir(self) -> Path: + return self.root / 'tmp' + + @property + def _images_dir(self) -> Path: + return self.root / 'images' + + @property + def _bootloaders_dir(self) -> Path: + return self.root / 'bootloaders' + + def get_image(self, image: ImageReference) -> Downloads.ImageFile: + return Downloads.ImageFile( + image=image, + location=self._images_dir / (image.name + '.img'), + ) + + def list_images( + self, + *, + filter: ImageReference.Partial = ImageReference.Partial.empty(), + ) -> Iterable[Downloads.ImageFile]: + for path in self._images_dir.glob('*.img'): + image = ImageReference.from_filename(path.name) + + if ( + (filter.release is not None and filter.release != image.release) + or (filter.version is not None and filter.version != image.version) + or (filter.variant is not None and filter.variant != image.variant) + or (filter.arch is not None and filter.arch != image.arch) + or ( + filter.deployment is not None + and filter.deployment != image.deployment + ) + ): + continue + + yield Downloads.ImageFile( + image=image, + location=path, + ) + + def get_bootloader(self, bootloader: Bootloader) -> Path: + version = bootloader.debian_package_entry.package.version + + if isinstance(bootloader, UBoot): + return ( + self._bootloaders_dir + / f'u-boot-qemu-{bootloader.qemu_arch}-{version}.bin' + ) + elif isinstance(bootloader, Ovmf): + return self._bootloaders_dir / f'ovmf-code-{version}.fd' + + +def prepare_qemu_environment() -> None: + # Workaround for USB mouse cursor jumping: + # https://wiki.archlinux.org/title/QEMU#Mouse_cursor_is_jittery_or_erratic + os.environ['SDL_VIDEO_X11_DGAMOUSE'] = '0' + + +@dataclass +class FilesystemShare: + source: str + dest: str + rw: bool + + @staticmethod + def parse(s: str) -> FilesystemShare: + parts = s.split(':') + if len(parts) == 2: + rw = False + elif len(parts) == 3: + if parts[2] == 'rw': + rw = True + elif parts[2] == 'ro': + rw = False + else: + raise ValueError('Suffix of filesystem share may only be :rw or :ro') + else: + raise ValueError('Invalid # of components in filesystem share spec') + + return FilesystemShare( + source=parts[0], + dest=parts[1], + rw=rw, + ) + + +class QemuBuilder: + def __init__(self, qemu: str) -> None: + self._args = [qemu] + + def add( + self, arg: str, value: Union[str, dict[str, Optional[str]], None] = None + ) -> None: + self._args.append(f'-{arg}') + if value is not None: + if isinstance(value, dict): + self._args.append( + ','.join( + f'{k}={self._escape(v)}' if v is not None else k + for k, v in value.items() + ) + ) + else: + self._args.append(value) + + def _escape(self, arg: str) -> str: + return arg.replace(',', r'\,') + + @property + def args(self) -> list[str]: + return self._args[:] + + +class QemuCpu: + AUTO = 'auto' + HOST = 'host' + # Defaults used when the host system does not match the arch being run. + NON_NATIVE_BY_ARCH = { + # From: https://qemu-project.gitlab.io/qemu/system/qemu-cpu-models.html#preferred-cpu-models-for-intel-x86-hosts + Arch.AMD64: 'max', + Arch.ARMHF: 'cortex-a15', + Arch.ARM64: 'cortex-a72', + } + + VALUES = {HOST, *NON_NATIVE_BY_ARCH.values()} + + +def build_qemu_command( + *, + arch: Arch, + disk: Path, + default_bootloader: Path, + cpu: str, + memory: int, + fs_shares: list[FilesystemShare], + serial_console: bool, +) -> list[str]: + QEMU_ARCH = { + Arch.AMD64: 'x86_64', + Arch.ARMHF: 'arm', + Arch.ARM64: 'aarch64', + } + + builder = QemuBuilder(f'qemu-system-{QEMU_ARCH[arch]}') + builder.add('bios', str(default_bootloader)) + + can_use_kvm = arch in (Arch.AMD64, Arch.ARM64) and arch == Arch.host() + if can_use_kvm: + builder.add('enable-kvm') + + if cpu == QemuCpu.AUTO: + cpu = QemuCpu.HOST if can_use_kvm else QemuCpu.NON_NATIVE_BY_ARCH[arch] + builder.add('cpu', cpu) + + if arch != Arch.AMD64: + builder.add('machine', 'virt') + + if serial_console: + builder.add('serial', 'mon:stdio') + + builder.add('m', str(memory)) + builder.add('drive', {'format': 'raw', 'file': str(disk)}) + + for dev in ('usb-ehci', 'usb-kbd', 'usb-mouse', 'VGA'): + builder.add('device', dev) + + for fs in fs_shares: + builder.add( + 'virtfs', + { + 'local': None, + 'path': fs.source, + 'mount_tag': fs.dest, + 'security_model': 'passthrough', + 'readonly': 'off' if fs.rw else 'on', + }, + ) + + return builder.args + + +@runtime_checkable +class FilterSharedArguments(Protocol): + release: Optional[str] + version: Optional[str] + variant: Optional[str] + arch: Optional[Arch] + deployment: Optional[Deployment] + + +def filter_args_to_partial(args: FilterSharedArguments) -> ImageReference.Partial: + return ImageReference.Partial( + release=args.release, + version=args.version, + variant=args.variant, + arch=args.arch, + deployment=args.deployment, + ) + + +@runtime_checkable +class PurgeDownloadsArguments(FilterSharedArguments, Protocol): + force: bool + + +def do_purge_downloads(args: PurgeDownloadsArguments) -> None: + to_delete = list(Downloads().list_images(filter=filter_args_to_partial(args))) + + if not to_delete: + logger.info('No matching images found.') + return + + if not args.force: + print('Going to delete:') + + for image_file in to_delete: + print(' -', image_file.image.name) + + while True: + try: + answer = input('Are you sure? (y/n) ').lower() + if answer == 'y': + break + elif answer == 'n': + return + else: + print('Invalid answer.') + except (KeyboardInterrupt, EOFError): + print() + return + + for image_file in to_delete: + logger.info('Deleting %s...', image_file.image.name) + image_file.location.unlink() + + logger.info('Deleted %d image(s).', len(to_delete)) + + +@runtime_checkable +class ListDownloadsArguments(FilterSharedArguments, Protocol): + pass + + +def do_list_downloads(args: ListDownloadsArguments) -> None: + matches: list[dict[str, str]] = [] + column_widths: dict[str, int] = {} + + for image_file in Downloads().list_images(filter=filter_args_to_partial(args)): + image = image_file.image + + match = { + 'name': image.name, + 'release': image.release, + 'version': image.version, + 'variant': image.variant, + 'arch': image.arch.value, + 'deployment': image.deployment.value, + } + + matches.append(match) + + if column_widths: + for k, v in match.items(): + column_widths[k] = max(column_widths[k], len(v)) + else: + for k, v in match.items(): + column_widths[k] = max(len(k), len(v)) + + if not matches: + print('No matches.') + return + + matches.sort(key=lambda m: m['name']) + + print( + '|', ' | '.join(c.capitalize().ljust(w) for c, w in column_widths.items()), '|' + ) + + for match in matches: + print('|', ' | '.join(v.ljust(column_widths[k]) for k, v in match.items()), '|') + + +@runtime_checkable +class RunSharedArguments(Protocol): + cpu: str + memory: int + fs_shares: list[FilesystemShare] + serial_console: bool + + +def run_impl( + image: ImageReference, + repository: ImageRepository, + args: RunSharedArguments, +) -> None: + downloads = Downloads() + + default_bootloader = downloads.get_bootloader(image.arch.bootloader) + if not default_bootloader.exists(): + pool = DebianPackagePool() + pool.download_entry( + entry=image.arch.bootloader.debian_package_entry, + dest=default_bootloader, + tmp_dir=downloads.tmp_dir, + ) + + image_file = downloads.get_image(image) + if not image_file.location.exists(): + repository.download( + image=image_file.image, dest=image_file.location, tmp_dir=downloads.tmp_dir + ) + + prepare_qemu_environment() + command = build_qemu_command( + arch=image.arch, + disk=image_file.location, + cpu=args.cpu, + memory=args.memory, + fs_shares=args.fs_shares, + serial_console=args.serial_console, + default_bootloader=default_bootloader, + ) + logger.debug('Running: %s', ' '.join(map(shlex.quote, command))) + + logger.info('Running QEMU...') + os.execvp(command[0], command) + + +@runtime_checkable +class RunArguments(RunSharedArguments, Protocol): + name_or_url: str + + +def do_run(args: RunArguments) -> None: + repository = ImageRepository() + if args.name_or_url.startswith(('http://', 'https://', 'images.apertis.org')): + image = ImageReference.from_url(args.name_or_url) + else: + image = ImageReference.from_filename(args.name_or_url) + + run_impl(image, repository, args) + + +@runtime_checkable +class RunReleaseArguments(RunSharedArguments, Protocol): + release: str + version: str + variant: str + arch: Arch + deployment: Deployment + + +def do_run_release(args: RunReleaseArguments) -> None: + repository = ImageRepository() + + version: Optional[str] = args.version + query = ImageRepository.Query( + variant=args.variant, + arch=args.arch, + deployment=args.deployment, + ) + + if version == SpecialVersions.RELEASE: + version = repository.get_latest_release(args.release, containing=query) + elif version == SpecialVersions.DAILY: + version = repository.get_latest_daily(args.release, containing=query) + + if version is None: + raise ApertisImagesException(f'No images found matching the given arguments.') + + image = ImageReference( + release=args.release, + version=version, + variant=args.variant, + arch=args.arch, + deployment=args.deployment, + ) + + run_impl(image, repository, args) + + +EnumType = TypeVar('EnumType', bound=enum.Enum) + + +def argparse_enum_choices(ty: Type[EnumType]) -> list[str]: + return [k.name.lower() for k in ty] + + +def main() -> None: + RELEASE_EXAMPLE = 'v2022' + VERSION_EXAMPLE = 'v2022.0rc1, 20220523.0118' + VARIANT_EXAMPLE = 'fixedfunction, hmi' + + DEFAULT_MEMORY = 2048 + + parser = argparse.ArgumentParser() + parser.add_argument('--debug', action='store_true') + + subparsers = parser.add_subparsers(dest='command', required=True) + + filter_shared_parser = argparse.ArgumentParser(add_help=False) + filter_shared_parser.add_argument( + '--release', + '-r', + help=f'only include the given release (e.g. {RELEASE_EXAMPLE})', + ) + filter_shared_parser.add_argument( + '--version', + '-V', + help=f'only include the given version (e.g. {VERSION_EXAMPLE})', + ) + filter_shared_parser.add_argument( + '--variant', + '-t', + help=f'only include the given variant (e.g. {VARIANT_EXAMPLE})', + ) + filter_shared_parser.add_argument( + '--arch', + '-a', + choices=argparse_enum_choices(Arch), + help='only include the given architecture', + ) + filter_shared_parser.add_argument( + '--deployment', + '-d', + choices=argparse_enum_choices(Deployment), + help='only include the given deployment type', + ) + + subparsers.add_parser( + 'list-downloads', + parents=[filter_shared_parser], + help='Lists all the currently downloaded images,' + ' or only those matching the given arguments.', + ) + + purge_downloads_parser = subparsers.add_parser( + 'purge-downloads', + parents=[filter_shared_parser], + help='Deletes all the currently downloaded images,' + ' or only those matching the given arguments.', + ) + purge_downloads_parser.add_argument( + '--force', + action='store_true', + help="don't ask for conformation", + ) + + run_shared_parser = argparse.ArgumentParser(add_help=False) + run_shared_parser.add_argument( + '--cpu', + default=QemuCpu.AUTO, + help='CPU for the VM' + f' (if auto, defaults to one of: {", ".join(QemuCpu.VALUES)})', + ) + run_shared_parser.add_argument( + '--memory', + type=int, + default=DEFAULT_MEMORY, + help='amount of memory for the VM, in bytes', + ) + run_shared_parser.add_argument( + '--share-fs', + type=FilesystemShare.parse, + dest='fs_shares', + nargs='*', + default=[], + help='share the given filesystems with the VM via 9P' + ' (format is SOURCE:DEST[:ro|:rw], defaults to :ro)', + ) + run_shared_parser.add_argument( + '--serial-console', + action='store_true', + default=True, + help='show the serial console in the current terminal', + ) + run_shared_parser.add_argument( + '--no-serial-console', + action='store_false', + dest='serial_console', + help='do not show the serial console in the current terminal', + ) + + run_parser = subparsers.add_parser( + 'run', + parents=[run_shared_parser], + help='Runs the given Apertis image, by name or URL.', + ) + run_parser.add_argument( + 'name_or_url', + help='the name or URL to run (e.g. apertis_v2022-hmi-amd64-uefi_20220523.0118)', + ) + + run_release_parser = subparsers.add_parser( + 'run-release', + parents=[run_shared_parser], + help='Runs the Apertis release matching the given arguments.', + ) + run_release_parser.add_argument( + 'release', help=f'the release to run (e.g. {RELEASE_EXAMPLE})' + ) + run_release_parser.add_argument( + '--version', + '-V', + default=SpecialVersions.DAILY, + help=f'the version to run (e.g. {VERSION_EXAMPLE});' + f" use '{SpecialVersions.DAILY}' for the latest daily," + f" '{SpecialVersions.RELEASE}' for the latest release", + ) + run_release_parser.add_argument( + '--variant', + '-t', + default='hmi', + help=f'the variant to run (e.g. {VARIANT_EXAMPLE})', + ) + run_release_parser.add_argument( + '--arch', + '-a', + default=Arch.host().name.lower(), + choices=argparse_enum_choices(Arch), + help='the architecture to run', + ) + run_release_parser.add_argument( + '--deployment', + '-d', + default='apt', + choices=argparse_enum_choices(Deployment), + help='the deployment to run', + ) + + args = parser.parse_args() + + handler = logging.StreamHandler() + formatter = logging.Formatter('[%(levelname)s] %(message)s') + + handler.setFormatter(formatter) + logger.addHandler(handler) + + logger.setLevel(logging.DEBUG if args.debug else logging.INFO) + + if getattr(args, 'arch', None) is not None: + args.arch = Arch[args.arch.upper()] + if getattr(args, 'deployment', None) is not None: + args.deployment = Deployment[args.deployment.upper()] + + command: str = args.command + if command == 'list-downloads': + assert isinstance(args, ListDownloadsArguments), args + do_list_downloads(args) + elif command == 'purge-downloads': + assert isinstance(args, PurgeDownloadsArguments), args + do_purge_downloads(args) + elif command == 'run': + assert isinstance(args, RunArguments), args + do_run(args) + elif command == 'run-release': + assert isinstance(args, RunReleaseArguments), args + do_run_release(args) + else: + assert False, command + + +if __name__ == '__main__': + main()