Skip to content
Snippets Groups Projects
apertis-pkg-pull-updates 15.29 KiB
#!/usr/bin/env python3
# SPDX-License-Identifier: MPL-2.0
#
# Copyright © 2019 Collabora Ltd
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.

import argparse
import json
import os
import re
import shlex
import subprocess
import sys
import tempfile
import urllib.parse
import urllib.request
from functools import cache
from itertools import chain

import yaml
from debian.changelog import Changelog
from debian.debian_support import Version
from sh import __version__ as sh_version
from sh.contrib import git

# Extra keyword arguments splatted into the `git(...)` calls below that read
# `.exit_code` on their result: nothing for sh 1.x, `_return_cmd=True` for
# any other sh version.
# NOTE(review): presumably needed because newer sh releases return the plain
# command output instead of the command object by default — confirm against
# the sh changelog.
RETURN_CMD = {} if sh_version.startswith("1.") else {"_return_cmd": True}


def debian_branch(suite):
    """Return the Debian packaging branch name for a Debian suite

    >>> debian_branch('buster')
    'debian/buster'
    >>> debian_branch('buster-security')
    'debian/buster-security'
    """
    prefix = "debian/"
    return prefix + suite


def upstream_branch(suite):
    """Return an upstream source branch name for a Debian suite

    Everything from the first dash onwards is dropped, so all the pockets
    of a release map to the same upstream branch.

    >>> upstream_branch('buster')
    'upstream/buster'
    >>> upstream_branch('buster-security')
    'upstream/buster'
    >>> upstream_branch('buster-backports')
    'upstream/buster'
    >>> upstream_branch('buster-proposed-updates')
    'upstream/buster'
    >>> upstream_branch('unstable')
    'upstream/unstable'
    """
    release, _dash, _pocket = suite.partition("-")
    return "upstream/" + release


def parse_ref(ref: str) -> str:
    """Resolve *ref* to a commit using `git rev-parse -q --verify`.

    Returns the command output with surrounding newlines stripped. Exit
    status 1 is tolerated in addition to 0; callers in this file treat a
    falsy (empty) result as "the ref does not exist".
    """
    commit_spec = ref + "^{commit}"
    resolved = git("rev-parse", "-q", "--verify", commit_spec, _ok_code=[0, 1])
    return resolved.strip("\n")


def is_ancestor(this: str, other: str):
    """Return True when `git merge-base --is-ancestor this other` exits with 0.

    Exit status 1 is accepted as a regular "no" answer and yields False.
    """
    outcome = git(
        "merge-base",
        "--is-ancestor",
        this,
        other,
        _ok_code=[0, 1],
        **RETURN_CMD,
    )
    return outcome.exit_code == 0


def force_branch(name: str, commit: str) -> str:
    """Point the local branch *name* at *commit* and return *commit*.

    When the branch already resolves to a commit, the move is announced on
    stdout first, with both values truncated to seven characters.
    """
    old_commit = parse_ref(name)
    if old_commit:
        print(f"Moving branch {name} to {commit:.7}, was: {old_commit:.7}")
    branch_ref = f"refs/heads/{name}"
    git("update-ref", branch_ref, commit)
    return commit


def run(cmd, **kwargs):
    """Print *cmd* shell-quoted on stdout, then execute it.

    All keyword arguments are forwarded untouched to `subprocess.run`, whose
    result is returned.
    """
    print("running", " ".join(map(shlex.quote, cmd)))
    return subprocess.run(cmd, **kwargs)


def ensure_suite_branches(suite, allow_missing=False):
    """Set up the local packaging and upstream branches for *suite*.

    The packaging branch is force-created to track its `origin/`
    counterpart. With *allow_missing* set, this step is skipped when the
    remote branch does not resolve; otherwise the `git branch` failure is
    fatal (`check=True`).

    The upstream branch is reset to its `origin/` counterpart only when the
    two currently resolve to different commits; the result of that command
    is not checked.
    """
    packaging = debian_branch(suite)
    sources = upstream_branch(suite)
    remote_packaging = f"origin/{packaging}"
    remote_sources = f"origin/{sources}"

    # a missing remote packaging branch may only be ignored when the caller
    # explicitly allowed it
    remote_packaging_missing = not parse_ref(remote_packaging)
    if not (remote_packaging_missing and allow_missing):
        track_packaging = [
            "git",
            "branch",
            "--track",
            "-f",
            packaging,
            remote_packaging,
        ]
        run(track_packaging, check=True)

    # leave the local upstream branch alone when it already matches the remote
    if parse_ref(sources) != parse_ref(remote_sources):
        run(["git", "branch", "--track", "-f", sources, remote_sources])


def configure_git_user(name, email):
    """Store *email* and then *name* as the git user identity via `git config`."""
    identity = (("user.email", email), ("user.name", name))
    for key, value in identity:
        git("config", key, value)


def prepare_git_repo(upstream_suite):
    """Prepare the local branches and the committer identity for a run.

    The branches of the base suite are mandatory; those of the security,
    updates, backports and proposed-updates pockets are set up only when
    present on the remote.
    """
    ensure_suite_branches(upstream_suite)
    optional_suites = (
        f"{upstream_suite}-security",
        f"{upstream_suite}-updates",
        f"{upstream_suite}-backports",
        f"{upstream_suite}-proposed-updates",
    )
    for pocket_suite in optional_suites:
        ensure_suite_branches(pocket_suite, allow_missing=True)
    configure_git_user("Apertis CI", "devel@lists.apertis.org")


@cache
def get_remote_version(suite, package):
    """Request the package version for the Debian suite from Madison

    Madison returns a YAML response in the following format:

        ---
        dash:
          0.5.7-4:
            jessie:
              - source
          0.5.7-4+b1:
            jessie:
              - amd64
              - armel
              - armhf
              - i386

    Only versions that list `source` among their architectures are
    considered, and the highest of them is returned. Results are memoised
    per (suite, package) pair.

    Raises KeyError(suite) when the package is absent from the response or
    has no sourceful version, and a plain Exception when the response
    carries an `error` entry.

    >>> get_remote_version('jessie', 'dash')
    fetch https://qa.debian.org/madison.php?package=dash&yaml=on&s=jessie
    Version('0.5.7-4')
    >>> get_remote_version('jessie', 'gtk+3.0')
    fetch https://qa.debian.org/madison.php?package=gtk%2B3.0&yaml=on&s=jessie
    Version('3.14.5-1+deb8u1')
    """
    quoted_package = urllib.parse.quote(package)
    url = (
        f"https://qa.debian.org/madison.php?package={quoted_package}&yaml=on&s={suite}"
    )
    print("fetch", url)
    with urllib.request.urlopen(url) as response:
        payload = response.read().decode("utf-8")
    data = yaml.safe_load(payload)

    if "error" in data:
        raise Exception(
            "failed to retrieve remote upstream version:", data.get("error")
        )
    if package not in data:
        raise KeyError(suite)

    # flatten the per-suite architecture lists of each version, e.g.
    # {'0.5.7-4': ['source'], ...}
    versions = {}
    for raw_version, per_suite in data[package].items():
        architectures = list(chain.from_iterable(per_suite.values()))
        versions[Version(raw_version)] = architectures

    sourceful_versions = []
    for version, architectures in versions.items():
        if "source" in architectures:
            sourceful_versions.append(version)
    if not sourceful_versions:
        raise KeyError(suite)
    return max(sourceful_versions)


def get_package_name():
    """Return the source package name from the top entry of debian/changelog.

    The changelog is read from `debian/changelog` relative to the current
    working directory; only the first block is parsed.
    """
    # Debian Policy requires changelogs to be UTF-8 encoded, so decode them
    # explicitly instead of relying on whatever the locale default is.
    with open("debian/changelog", encoding="utf-8") as f:
        ch = Changelog(f, max_blocks=1)
    return ch.package


def get_git_branch_version(branch: str):
    """Return the version of the top debian/changelog entry on *branch*.

    The changelog is read with `git show`, so the branch does not need to
    be checked out.
    """
    changelog_text = git.show(f"{branch}:debian/changelog")
    top_entry = Changelog(changelog_text, max_blocks=1)
    return top_entry.version


def create_merge(target: str, *branches):
    """Create a merge commit object with `git commit-tree` and return its id.

    Every entry of *branches* becomes a parent of the new commit, while the
    tree is taken from the first one. *target* only appears in the commit
    message; no ref is updated here.
    """
    this, *others = branches
    msg = "Merge %s to %s" % (" ".join(others), target)
    print(msg)

    parent_args = []
    for parent in branches:
        parent_args.extend(("-p", parent))
    tree = f"{this}^{{tree}}"
    commit_id = git("commit-tree", *parent_args, tree, _in=msg)
    return commit_id.strip("\n")


def get_newest_branch_version(release: str, branches=None):
    """Return the (branch, version) pair with the highest changelog version.

    *branches* defaults to the existing packaging branches of *release*
    when it is empty or not given.
    """
    versions = {}
    for branch in branches or existing_branches(release):
        versions[branch] = get_git_branch_version(branch)
    # ascending stable sort: the winner is the last item
    ranked = sorted(versions.items(), key=lambda item: item[1])
    newest_branch, newest_version = ranked[-1]
    return newest_branch, newest_version


def existing_branches(release: str):
    """Return the set of packaging branches of *release* that resolve locally.

    The base suite and its backports, proposed-updates, security and updates
    pockets are probed with parse_ref(); the result is a plain, unordered
    set.
    """
    candidates = (
        f"debian/{release}-backports",
        f"debian/{release}-proposed-updates",
        f"debian/{release}-security",
        f"debian/{release}-updates",
        f"debian/{release}",
    )
    return {branch for branch in candidates if parse_ref(branch)}


def prepare_target_branch(release: str, target: str):
    """Move `debian/<target>` to the point the next import should build on.

    Among the existing packaging branches of *release*, the one carrying the
    newest changelog version is the base. Nothing is done when the target
    branch already sits on it, unless the target is the base suite branch
    itself. Otherwise the target branch is forced either to the newest
    branch directly, or, when some other branch is not an ancestor of it, to
    a freshly created merge commit of all the branches.
    """
    branches = existing_branches(release)
    target_branch = f"debian/{target}"
    newest_branch, _newest_version = get_newest_branch_version(release, branches)
    branches.remove(newest_branch)
    resolve_target = parse_ref(target_branch)
    resolve_newest = parse_ref(newest_branch)

    # We still want a merge if importing a new stable release if security/proposed-updates have diverged
    if resolve_target == resolve_newest and f"debian/{release}" != target_branch:
        print("Will import to the newest branch, no merge necessary")
        return

    # Each ancestry check spawns a git process, so run it once per branch and
    # reuse the answer for both the report and the merge decision.
    diverged = []
    for branch in branches:
        if not is_ancestor(branch, newest_branch):
            print(f"Branch {branch} is not an ancestor of {newest_branch}")
            diverged.append(branch)

    if diverged:
        print("Merge needed")
        force_branch(
            target_branch, create_merge(target_branch, newest_branch, *branches)
        )
    else:
        force_branch(target_branch, newest_branch)


def should_update(upstream_suite, package_name, local_version, missing_is_fatal=True):
    """Return the remote version when it is newer than *local_version*.

    Returns None when the remote version is not newer. When the remote
    lookup raises KeyError, the process exits with status 1 if
    *missing_is_fatal* is set, and None is returned otherwise.
    """
    try:
        remote_version = Version(get_remote_version(upstream_suite, package_name))
        print("remote version:", remote_version)
    except KeyError:
        if not missing_is_fatal:
            return None
        print(
            f"fatal: no version found in Debian for release {upstream_suite}",
            file=sys.stderr,
        )
        sys.exit(1)
    return remote_version if remote_version > local_version else None


def get_remote_dsc_path(package_name, version, mirror=False):
    """Look up the archive-relative path of the .dsc for a source version.

    The snapshot service is asked for the source files of
    *package_name*/*version*; with *mirror* set the query goes to the
    snapshot-mlm-01 host instead of the main one. The first `.dsc` entry
    that belongs to the `debian` or `debian-security` archive wins and is
    returned as `<archive_name><path>/<name>`.

    Raises KeyError((package_name, version)) when no such entry is listed.
    """
    if mirror:
        template = "https://snapshot-mlm-01.debian.org/mr/package/{}/{}/srcfiles?fileinfo=1"
    else:
        template = "https://snapshot.debian.org/mr/package/{}/{}/srcfiles?fileinfo=1"
    url = template.format(package_name, version)
    print("fetch", url)
    with urllib.request.urlopen(url) as response:
        data = json.loads(response.read().decode("utf-8"))

    accepted_archives = ("debian", "debian-security")
    for entries in data["fileinfo"].values():
        for entry in entries:
            if not entry["name"].endswith(".dsc"):
                continue
            if entry["archive_name"] in accepted_archives:
                return entry["archive_name"] + entry["path"] + "/" + entry["name"]
    raise KeyError((package_name, version))


def get_remote_sources(remote_dsc, tmpdir):
    """Download the source package described by *remote_dsc* into *tmpdir*.

    Runs `dget --download-only` with *tmpdir* as working directory (a
    failure raises, since `check=True`) and returns the expected local path
    of the .dsc file.
    """
    # FIXME: drop --allow-unauthenticated
    dget_cmd = ["dget", "--download-only", "--allow-unauthenticated", remote_dsc]
    run(dget_cmd, cwd=tmpdir, check=True)
    dsc_name = os.path.basename(remote_dsc)
    return os.path.join(tmpdir, dsc_name)


def import_sources(local_dsc, upstream_suite):
    """Import *local_dsc* on the branches of *upstream_suite* with gbp.

    The packaging branch is checked out, `gbp import-dsc` is run, the
    resulting commit message is trimmed to its subject line and the version
    tag is re-created with `gbp tag --retag` so that it follows the amended
    commit.
    """
    packaging_branch = debian_branch(upstream_suite)
    sources_branch = upstream_branch(upstream_suite)
    # prevent the debian/gbp.conf in packages from interfering
    # NOTE: this mapping is the complete environment handed to gbp, the
    # caller's environment is not inherited
    gbp_env = {"GBP_CONF_FILES": "/dev/null"}

    git("checkout", packaging_branch)
    import_cmd = [
        "gbp",
        "import-dsc",
        local_dsc,
        "--author-is-committer",
        "--author-date-is-committer-date",
        "--upstream-branch=" + sources_branch,
        "--debian-branch=" + packaging_branch,
        "--debian-tag=debian/%(version)s",
        "--no-sign-tags",
        "--no-pristine-tar",
    ]
    run(import_cmd, env=gbp_env, check=True)

    # gbp copies every new changelog entry into the commit message, which
    # yields huge messages when e.g. the bullseye version is imported on top
    # of the buster one.
    # GitLab exports the whole message as CI_COMMIT_MESSAGE, which ends up on
    # the docker command line and makes it fail with:
    #   standard_init_linux.go:219: exec user process caused: argument list too long
    # https://gitlab.com/gitlab-org/gitlab-runner/-/issues/26624#note_529234097
    # Keeping only the subject line avoids that.
    shortmessage = git("log", "--format=%s", "-n1")
    git("commit", "--amend", f"--message={shortmessage}")

    # amending created a new commit: move the version tag over to it
    retag_cmd = [
        "gbp",
        "tag",
        "--retag",
        "--debian-branch=" + packaging_branch,
        "--debian-tag=debian/%(version)s",
        "--no-sign-tags",
    ]
    run(retag_cmd, env=gbp_env, check=True)


def main():
    """Command-line entry point: pull newer package versions from Debian.

    After preparing the local branches, each suite of the release is
    processed in turn: <release>-security, <release>-updates, <release>
    itself and, on request, <release>-backports and
    <release>-proposed-updates. When Debian carries a version newer than the
    newest local one, its source package is downloaded and imported on the
    matching `debian/<suite>` branch.

    For the base suite, the `debian/<release>` branch is additionally
    fast-forwarded when Debian reports for it a version equal to the newest
    local one while the branch itself is still behind.
    """
    parser = argparse.ArgumentParser(
        description="Pull updates from the upstream repositories"
    )
    parser.add_argument(
        "--package", dest="package", type=str, help="the package name (e.g. glib2.0)"
    )  # TODO: figure this out from the repo
    parser.add_argument(
        "--upstream",
        dest="upstream",
        type=str,
        required=True,
        help="the upstream suite (e.g. buster)",
    )
    parser.add_argument(
        "--mirror",
        dest="mirror",
        type=str,
        required=True,
        help="the upstream mirror (e.g. http://deb.debian.org/debian)",
    )
    parser.add_argument(
        "--backports", action="store_true", help="check backports repository"
    )
    parser.add_argument(
        "--proposed-updates",
        action="store_true",
        help="check proposed-updates repository",
    )
    args = parser.parse_args()
    # superseded a few lines below, where the changelog is used as fallback
    package_name = args.package
    # buster-security → buster,
    # buster-updates  → buster,
    # buster          → buster
    upstream_suite = args.upstream.split("-")[0]
    mirror = args.mirror

    # without --package, the name is read from the local debian/changelog
    package_name = args.package or get_package_name()
    print("source package", package_name)
    prepare_git_repo(upstream_suite)
    # suites are checked in this order; the optional ones come last
    list_suites = [
        f"{upstream_suite}-security",
        f"{upstream_suite}-updates",
        upstream_suite,
    ]
    if args.backports:
        list_suites.append(f"{upstream_suite}-backports")
    if args.proposed_updates:
        list_suites.append(f"{upstream_suite}-proposed-updates")
    for suite in list_suites:
        # recomputed on every iteration, since a previous import may have
        # moved the branches
        local_version_branch, local_version = get_newest_branch_version(upstream_suite)
        print("local version:", local_version)
        # a package missing from Debian is fatal only for the base suite
        remote_version = should_update(
            suite, package_name, local_version, upstream_suite == suite
        )
        if remote_version:
            print("update to", remote_version)

            try:
                remote_dsc_path = get_remote_dsc_path(package_name, remote_version)
            except urllib.error.HTTPError:
                print(
                    "⚠️ Fail to get dsc from main snapshot.debian.org, trying from a mirror"
                )
                remote_dsc_path = get_remote_dsc_path(
                    package_name, remote_version, mirror=True
                )

            # remote_dsc_path already starts with the archive name, so the
            # "/debian" component (and any slashes following it) is replaced
            # by a single "/" in the mirror URL before the two are joined
            dsc = re.sub(r"/debian/*", "/", mirror) + "/" + remote_dsc_path
            print("download", dsc)
            with tempfile.TemporaryDirectory(prefix="pull-updates") as tmpdir:
                local_dsc = get_remote_sources(dsc, tmpdir)
                prepare_target_branch(upstream_suite, suite)
                import_sources(local_dsc, suite)
                # only when the upstream version changed and the new version
                # carries a Debian revision; the result of pristine-lfs is
                # not checked
                if (
                    local_version.upstream_version != remote_version.upstream_version
                ) and (remote_version.debian_revision is not None):
                    run(["pristine-lfs", "import-dsc", local_dsc])

        # Keep the debian/{suite} (e.g. debian/bullseye) branches up-to-date after a point release.
        # i.e. Updates from {upstream_suite}-security, {upstream_suite}-backports and {upstream_suite}-proposed-updates fall into
        # the main repo {upstream_suite}. We need to sync the corresponding branch with the one used previously.
        if suite == upstream_suite:
            local_suite_version = Version(
                get_git_branch_version(debian_branch(upstream_suite))
            )
            remote_suite_version = should_update(
                upstream_suite, package_name, local_suite_version
            )
            # local_version and local_version_branch still hold the values
            # computed at the top of this iteration, before any import
            if remote_suite_version and remote_suite_version == local_version:
                upstream_suite_branch = debian_branch(upstream_suite)
                print(
                    upstream_suite_branch,
                    "needs a fast-forward from",
                    local_version_branch,
                    "for",
                    remote_suite_version,
                )
                git("checkout", upstream_suite_branch)
                # NOTE(review): no _ok_code is given here, so a failing merge
                # presumably raises from sh before the exit code check below
                # is reached — confirm
                o = git(
                    "merge",
                    "--ff-only",
                    local_version_branch,
                    **RETURN_CMD,
                    _out="/dev/stdout",
                    _err="/dev/stderr",
                )
                if o.exit_code == 0:
                    print(
                        f"⏩ Successfully fast-forwarded {local_version_branch} to {upstream_suite_branch}"
                    )
                else:
                    print("🛑 Fast-forward failed")


# Run the command-line interface only when executed as a script, so that
# importing the module (e.g. for its doctests) has no side effects.
if __name__ == "__main__":
    main()