#!/usr/bin/env python3
# SPDX-License-Identifier: MPL-2.0
#
# Copyright © 2019 Collabora Ltd
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
"""Pull updates from the upstream repositories."""

import argparse
import json
import os
import re
import shlex
import subprocess
import sys
import tempfile
import urllib.error
import urllib.parse
import urllib.request
from functools import cache
from itertools import chain

import yaml
from debian.changelog import Changelog
from debian.debian_support import Version
from sh import __version__ as sh_version
from sh.contrib import git

# Extra keyword arguments for the git() calls below that read `.exit_code`
# from the result; only set for sh releases other than 1.x.
RETURN_CMD = {} if sh_version.startswith("1.") else {"_return_cmd": True}

# Hosts queried by get_remote_dsc_path() for the snapshot "srcfiles" listing.
SNAPSHOT_HOST = "snapshot.debian.org"
SNAPSHOT_MIRROR_HOST = "snapshot-mlm-01.debian.org"


def debian_branch(suite):
    """Return the packaging branch name for a Debian suite

    >>> debian_branch('buster-security')
    'debian/buster-security'
    """
    return "debian/" + suite


def upstream_branch(suite):
    """Return an upstream source branch name for a Debian suite

    >>> upstream_branch('buster')
    'upstream/buster'
    >>> upstream_branch('buster-security')
    'upstream/buster'
    >>> upstream_branch('buster-backports')
    'upstream/buster'
    >>> upstream_branch('buster-proposed-updates')
    'upstream/buster'
    >>> upstream_branch('unstable')
    'upstream/unstable'
    """
    return "upstream/" + suite.split("-")[0]


def parse_ref(ref: str) -> str:
    """Return the output of `git rev-parse -q --verify <ref>^{commit}`

    Exit status 1 is tolerated; callers treat an empty result as
    "this ref does not exist".
    """
    return git("rev-parse", "-q", "--verify", ref + "^{commit}", _ok_code=[0, 1]).strip(
        "\n"
    )


def is_ancestor(this: str, other: str) -> bool:
    """Return True when `git merge-base --is-ancestor this other` exits with 0"""
    return (
        git(
            "merge-base",
            "--is-ancestor",
            this,
            other,
            **RETURN_CMD,
            _ok_code=[0, 1],
        ).exit_code
        == 0
    )


def force_branch(name: str, commit: str) -> str:
    """Point the local branch `name` at `commit` and return `commit`

    A message is printed when the branch already resolved to a commit.
    """
    old_commit = parse_ref(name)
    if old_commit:
        print(f"Moving branch {name} to {commit:.7}, was: {old_commit:.7}")
    git("update-ref", f"refs/heads/{name}", commit)
    return commit


def run(cmd, **kwargs):
    """Print the shell-quoted command line, then run it with subprocess.run()

    `kwargs` are passed to subprocess.run() unchanged and its result is returned.
    """
    quoted = " ".join(shlex.quote(i) for i in cmd)
    print("running", quoted)
    return subprocess.run(cmd, **kwargs)


def ensure_suite_branches(suite, allow_missing=False):
    """Set up the local packaging and upstream branches for `suite`

    With `allow_missing`, a suite without an `origin/debian/<suite>` branch
    is skipped instead of making `git branch` fail.
    """
    upstream_packaging = debian_branch(suite)
    upstream = upstream_branch(suite)
    # if there is an upstream packaging branch, set a local tracking branch to it
    # if there's no branch but we can skip it, skip it
    if parse_ref(f"origin/{upstream_packaging}") or not allow_missing:
        run(
            [
                "git",
                "branch",
                "--track",
                "-f",
                upstream_packaging,
                f"origin/{upstream_packaging}",
            ],
            check=True,
        )
    # ensure the local "upstream" branch is in sync with the upstream one
    # but only reset it if it's out of sync
    # NOTE(review): no check=True here, so a failure of this command is not
    # fatal -- confirm that this is intentional.
    if parse_ref(upstream) != parse_ref(f"origin/{upstream}"):
        run(["git", "branch", "--track", "-f", upstream, f"origin/{upstream}"])


def configure_git_user(name, email):
    """Set `user.name` and `user.email` through `git config`"""
    git("config", "user.email", email)
    git("config", "user.name", name)


def prepare_git_repo(upstream_suite):
    """Set up the branches of `upstream_suite` and its pockets, and the git identity

    Only the main suite is mandatory; the -security, -updates, -backports and
    -proposed-updates branches may be missing.
    """
    ensure_suite_branches(upstream_suite)
    for pocket in ("security", "updates", "backports", "proposed-updates"):
        ensure_suite_branches(f"{upstream_suite}-{pocket}", allow_missing=True)
    configure_git_user("Apertis CI", "devel@lists.apertis.org")


@cache
def get_remote_version(suite, package):
    """Request the package version for the Debian suite from Madison

    Madison returns a YAML response in the following format:

    ---
    dash:
      0.5.7-4:
        jessie:
        - source
      0.5.7-4+b1:
        jessie:
        - amd64
        - armel
        - armhf
        - i386

    Raises KeyError(suite) when the response has no entry for the package or
    no version listing "source", and Exception when it carries an "error" key.

    >>> get_remote_version('jessie', 'dash')
    fetch https://qa.debian.org/madison.php?package=dash&yaml=on&s=jessie
    Version('0.5.7-4')
    >>> get_remote_version('jessie', 'gtk+3.0')
    fetch https://qa.debian.org/madison.php?package=gtk%2B3.0&yaml=on&s=jessie
    Version('3.14.5-1+deb8u1')
    """
    quoted_package = urllib.parse.quote(package)
    url = (
        f"https://qa.debian.org/madison.php?package={quoted_package}&yaml=on&s={suite}"
    )
    print("fetch", url)
    with urllib.request.urlopen(url) as response:
        # an empty YAML document loads as None: handle it like an empty
        # mapping so that it ends up in the "package not found" path below
        data = yaml.safe_load(response.read().decode("utf-8")) or {}
    if "error" in data:
        raise Exception(
            "failed to retrieve remote upstream version:", data.get("error")
        )
    if package not in data:
        raise KeyError(suite)
    # create version -> arch list mapping: {'0.5.7-4': ['source'], ...}
    versions = {
        Version(v): list(chain.from_iterable(s.values()))
        for v, s in data[package].items()
    }
    sourceful_versions = [v for v, a in versions.items() if "source" in a]
    if not sourceful_versions:
        raise KeyError(suite)
    return max(sourceful_versions)


def get_package_name():
    """Return the package name of the first block of ./debian/changelog"""
    with open("debian/changelog") as f:
        ch = Changelog(f, max_blocks=1)
    return ch.package


def get_git_branch_version(branch: str):
    """Return the version of the first debian/changelog block on `branch`"""
    ch = Changelog(git.show(f"{branch}:debian/changelog"), max_blocks=1)
    return ch.version


def create_merge(target: str, *branches):
    """Create a merge commit of `branches` and return the output of `git commit-tree`

    Every branch becomes a parent, while the tree is taken from the first one.
    `target` is only used in the commit message; no ref is updated here.
    """
    this, *others = branches
    msg = f"Merge {' '.join(others)} to {target}"
    print(msg)
    parents = chain.from_iterable(("-p", b) for b in branches)
    return git(
        "commit-tree",
        *parents,
        f"{this}^{{tree}}",
        _in=msg,
    ).strip("\n")


def get_newest_branch_version(release: str, branches=None):
    """Return the (branch, version) pair carrying the highest changelog version

    When `branches` is empty or None, the existing branches of `release`
    are inspected.
    """
    versions = {
        branch: get_git_branch_version(branch)
        for branch in branches or existing_branches(release)
    }
    newest_branch, newest_version = sorted(versions.items(), key=lambda kv: kv[1]).pop()
    return newest_branch, newest_version


def existing_branches(release: str):
    """Return the set of debian/<release>* branches for which parse_ref() finds a commit"""
    candidates = (
        f"debian/{release}-backports",
        f"debian/{release}-proposed-updates",
        f"debian/{release}-security",
        f"debian/{release}-updates",
        f"debian/{release}",
    )
    return {branch for branch in candidates if parse_ref(branch)}


def prepare_target_branch(release: str, target: str):
    """Move `debian/<target>` onto the newest local packaging state of `release`

    When some branch of the release is not an ancestor of the newest one, the
    target branch is set to a merge commit of all of them; otherwise it is
    pointed at the newest branch.
    """
    branches = existing_branches(release)
    target_branch = f"debian/{target}"
    newest_branch, _ = get_newest_branch_version(release, branches)
    branches.remove(newest_branch)
    resolve_target = parse_ref(target_branch)
    resolve_newest = parse_ref(newest_branch)
    # We still want a merge if importing a new stable release if security/proposed-updates have diverged
    if resolve_target == resolve_newest and f"debian/{release}" != target_branch:
        print("Will import to the newest branch, no merge necessary")
        return
    # query git only once per branch
    diverged = [b for b in branches if not is_ancestor(b, newest_branch)]
    for branch in diverged:
        print(f"Branch {branch} is not an ancestor of {newest_branch}")
    if diverged:
        print("Merge needed")
        force_branch(
            target_branch, create_merge(target_branch, newest_branch, *branches)
        )
    else:
        # pass the resolved commit rather than the branch name, so that the
        # message printed by force_branch() shows an abbreviated commit id
        force_branch(target_branch, resolve_newest)


def should_update(upstream_suite, package_name, local_version, missing_is_fatal=True):
    """Return the remote version when it is newer than `local_version`, else None

    When no remote version is found for the suite, the script exits with
    status 1 if `missing_is_fatal`, otherwise None is returned.
    """
    try:
        # get_remote_version() is cached: wrap the result in a new Version
        remote_version = Version(get_remote_version(upstream_suite, package_name))
    except KeyError:
        if missing_is_fatal:
            print(
                f"fatal: no version found in Debian for release {upstream_suite}",
                file=sys.stderr,
            )
            sys.exit(1)
        return None
    print("remote version:", remote_version)
    return remote_version if remote_version > local_version else None


def get_remote_dsc_path(package_name, version, mirror=False):
    """Return the `<archive>/<path>/<name>` location of the .dsc of a source version

    The file list is requested from snapshot.debian.org, or from
    snapshot-mlm-01.debian.org when `mirror` is true. Only files in the
    "debian" and "debian-security" archives are considered.

    Raises KeyError((package_name, version)) when no such .dsc is listed.
    """
    host = SNAPSHOT_MIRROR_HOST if mirror else SNAPSHOT_HOST
    url = f"https://{host}/mr/package/{package_name}/{version}/srcfiles?fileinfo=1"
    print("fetch", url)
    with urllib.request.urlopen(url) as response:
        data = json.loads(response.read().decode("utf-8"))
    for fileinfo in data["fileinfo"].values():
        for i in fileinfo:
            if i["name"].endswith(".dsc") and i["archive_name"] in (
                "debian",
                "debian-security",
            ):
                return i["archive_name"] + i["path"] + "/" + i["name"]
    raise KeyError((package_name, version))


def build_dsc_url(mirror: str, remote_dsc_path: str) -> str:
    """Return the URL of `remote_dsc_path` on `mirror`

    `remote_dsc_path` already starts with the archive name (see
    get_remote_dsc_path), so a trailing `/debian` component of the mirror
    path is dropped. Only the end of the URL path is touched: the host name
    is never rewritten.

    >>> build_dsc_url('http://deb.debian.org/debian', 'debian/pool/main/d/dash/dash_0.5.7-4.dsc')
    'http://deb.debian.org/debian/pool/main/d/dash/dash_0.5.7-4.dsc'
    >>> build_dsc_url('https://debian.example.org/debian/', 'debian-security/pool/x.dsc')
    'https://debian.example.org/debian-security/pool/x.dsc'
    """
    parts = urllib.parse.urlsplit(mirror)
    base_path = re.sub(r"(/debian)?/*$", "", parts.path, count=1)
    return urllib.parse.urlunsplit(
        parts._replace(path=f"{base_path}/{remote_dsc_path}")
    )


def get_remote_sources(remote_dsc, tmpdir):
    """Download `remote_dsc` with dget into `tmpdir` and return the local .dsc path"""
    # FIXME: drop --allow-unauthenticated
    run(
        ["dget", "--download-only", "--allow-unauthenticated", remote_dsc],
        cwd=tmpdir,
        check=True,
    )
    return os.path.join(tmpdir, os.path.basename(remote_dsc))


def import_sources(local_dsc, upstream_suite):
    """Import `local_dsc` with gbp into the branches of `upstream_suite`

    The import commit is then amended to keep only the first line of its
    message, and the version tag is re-created on the amended commit.
    """
    # prevent the debian/gbp.conf in packages from interfering
    # NOTE(review): subprocess.run(env=...) replaces the whole environment, so
    # gbp runs with only this variable set -- confirm that this is intended.
    gbp_env = {"GBP_CONF_FILES": "/dev/null"}
    git("checkout", debian_branch(upstream_suite))
    run(
        [
            "gbp",
            "import-dsc",
            local_dsc,
            "--author-is-committer",
            "--author-date-is-committer-date",
            "--upstream-branch=" + upstream_branch(upstream_suite),
            "--debian-branch=" + debian_branch(upstream_suite),
            "--debian-tag=debian/%(version)s",
            "--no-sign-tags",
            "--no-pristine-tar",
        ],
        env=gbp_env,
        check=True,
    )
    # gbp puts all the new changelog entries in the commit message, generating
    # big walls of text when, for instance, importing the version from bullseye
    # on top of the buster one
    # GitLab then puts the whole log message in the CI_COMMIT_MESSAGE env var,
    # which is passed on the docker command line, resulting in a error:
    #   standard_init_linux.go:219: exec user process caused: argument list too long
    # https://gitlab.com/gitlab-org/gitlab-runner/-/issues/26624#note_529234097
    # to avoid that, trim the message to only keep the first line
    shortmessage = git("log", "--format=%s", "-n1")
    git("commit", "--amend", f"--message={shortmessage}")
    # run `gbp tag --retag` to update the version tag to point to the amended
    # commit rather than the original one
    run(
        [
            "gbp",
            "tag",
            "--retag",
            "--debian-branch=" + debian_branch(upstream_suite),
            "--debian-tag=debian/%(version)s",
            "--no-sign-tags",
        ],
        env=gbp_env,
        check=True,
    )


def main():
    """Check every suite of the upstream release and import any newer version"""
    parser = argparse.ArgumentParser(
        description="Pull updates from the upstream repositories"
    )
    # TODO: figure this out from the repo
    parser.add_argument(
        "--package", dest="package", type=str, help="the package name (e.g. glib2.0)"
    )
    parser.add_argument(
        "--upstream",
        dest="upstream",
        type=str,
        required=True,
        help="the upstream suite (e.g. buster)",
    )
    parser.add_argument(
        "--mirror",
        dest="mirror",
        type=str,
        required=True,
        help="the upstream mirror (e.g. http://deb.debian.org/debian)",
    )
    parser.add_argument(
        "--backports", action="store_true", help="check backports repository"
    )
    parser.add_argument(
        "--proposed-updates",
        action="store_true",
        help="check proposed-updates repository",
    )
    args = parser.parse_args()

    # buster-security → buster,
    # buster-updates → buster,
    # buster → buster
    upstream_suite = args.upstream.split("-")[0]
    mirror = args.mirror
    package_name = args.package or get_package_name()
    print("source package", package_name)

    prepare_git_repo(upstream_suite)

    list_suites = [
        f"{upstream_suite}-security",
        f"{upstream_suite}-updates",
        upstream_suite,
    ]
    if args.backports:
        list_suites.append(f"{upstream_suite}-backports")
    if args.proposed_updates:
        list_suites.append(f"{upstream_suite}-proposed-updates")

    for suite in list_suites:
        # recomputed on every iteration: a previous import may have moved branches
        local_version_branch, local_version = get_newest_branch_version(upstream_suite)
        print("local version:", local_version)
        # a missing version is only fatal for the main suite
        remote_version = should_update(
            suite, package_name, local_version, upstream_suite == suite
        )
        if remote_version:
            print("update to", remote_version)
            try:
                remote_dsc_path = get_remote_dsc_path(package_name, remote_version)
            except urllib.error.HTTPError:
                print(
                    "⚠️ Fail to get dsc from main snapshot.debian.org, trying from a mirror"
                )
                remote_dsc_path = get_remote_dsc_path(
                    package_name, remote_version, mirror=True
                )
            dsc = build_dsc_url(mirror, remote_dsc_path)
            print("download", dsc)
            with tempfile.TemporaryDirectory(prefix="pull-updates") as tmpdir:
                local_dsc = get_remote_sources(dsc, tmpdir)
                prepare_target_branch(upstream_suite, suite)
                import_sources(local_dsc, suite)
                if (
                    local_version.upstream_version != remote_version.upstream_version
                ) and (remote_version.debian_revision is not None):
                    # NOTE(review): no check=True, a pristine-lfs failure is
                    # not fatal -- confirm that this is intentional.
                    run(["pristine-lfs", "import-dsc", local_dsc])

        # Keep the debian/{suite} (e.g. debian/bullseye) branches up-to-date after a point release.
        # i.e. Updates from {upstream_suite}-security, {upstream_suite}-backports and
        # {upstream_suite}-proposed-updates fall into the main repo {upstream_suite}.
        # We need to sync the corresponding branch with the one used previously.
        if suite == upstream_suite:
            local_suite_version = Version(
                get_git_branch_version(debian_branch(upstream_suite))
            )
            remote_suite_version = should_update(
                upstream_suite, package_name, local_suite_version
            )
            if remote_suite_version and remote_suite_version == local_version:
                upstream_suite_branch = debian_branch(upstream_suite)
                print(
                    upstream_suite_branch,
                    "needs a fast-forward from",
                    local_version_branch,
                    "for",
                    remote_suite_version,
                )
                git("checkout", upstream_suite_branch)
                # NOTE(review): no _ok_code is given, so a failing merge
                # presumably raises before the exit code is inspected and the
                # "failed" branch below is never reached -- confirm.
                o = git(
                    "merge",
                    "--ff-only",
                    local_version_branch,
                    **RETURN_CMD,
                    _out="/dev/stdout",
                    _err="/dev/stderr",
                )
                if o.exit_code == 0:
                    print(
                        f"⏩ Successfully fast-forwarded {local_version_branch} to {upstream_suite_branch}"
                    )
                else:
                    print("🛑 Fast-forward failed")


if __name__ == "__main__":
    main()