#!/usr/bin/python3
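"""Upload a git branch to a FOSSology server, run the license analysis and
print the resulting 'dep5' report on stdout."""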

from datetime import datetime, timedelta
import json
import re
import requests
import sys
import time

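# Progress and diagnostic messages go to stderr so that stdout only carries
# the final report.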
def print_err(*args, **kwargs):
    print(*args, file=sys.stderr, **kwargs)

class ApertisFossology:
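    """Minimal client for the FOSSology REST API (v1).

    Wraps the token, upload, job and report endpoints needed to scan a git
    branch and retrieve a cleaned-up copyright and licensing report.
    """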
    def __init__(self, server, username, password):
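        """Obtain a short-lived (2 day) write token and use it for all later requests."""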
        self.server = f"{server}/repo/api/v1"
        self.upload_id = 0
        self.session = requests.Session()

        now = datetime.now()
        data = {"username": username,
                "password": password,
                "token_name": "ci-cd_{}".format(now.strftime("%Y%m%d-%H%M%S")),
                "token_scope": "write",
                "token_expire": "{}".format((now + timedelta(days = 2)).strftime("%Y-%m-%d"))}
        resp = self.session.post(f"{self.server}/tokens", data=data)
        if resp.status_code != 201:
            raise Exception('POST /tokens {}'.format(resp.status_code))
        token = re.sub("Bearer ", "", resp.json()["Authorization"])
        self.session.headers.update({"Authorization": f"Bearer {token}"})

    def _wait_for_completion(self):
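        """Poll the most recent job for the current upload until it completes or fails."""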
        while True:
            resp = self.session.get(f"{self.server}/jobs?upload={self.upload_id}")
            if resp.status_code != 200:
                raise Exception('GET /jobs?upload={} {}'.format(self.upload_id, resp.status_code))
            job = resp.json()[-1]
            status = job["status"]

            if status == "Completed":
                print_err("")
                return
            elif status == "Failed":
                raise Exception('job {} failed for upload {}'.format(job["id"], job["uploadId"]))

            print_err(".", end='', flush=True)
            time.sleep(1)

    def get_group_id(self):
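        """Return the group id of the most recent job for the current upload."""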
        resp = self.session.get(f"{self.server}/jobs?upload={self.upload_id}")
        if resp.status_code != 200:
            raise Exception('GET /jobs?upload={} {}'.format(self.upload_id, resp.status_code))
        return resp.json()[-1]["groupId"]

    def get_previous_upload_analysis_id(self, name, group_id):
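        """Return the id of the most recent other upload with the same name and group.

        Jobs are walked newest-first; the current upload and the most recently
        deleted upload are skipped. Returns 0 if no reusable upload is found.
        """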
        last_deleted_upload_id = 0
        resp = self.session.get(f"{self.server}/jobs")
        if resp.status_code != 200:
            raise Exception('GET /jobs {}'.format(resp.status_code))

        for job in reversed(resp.json()):
            if job["name"] == "Delete":
                last_deleted_upload_id = job["uploadId"]
            elif (job["name"] == name and
                    job["groupId"] == group_id and
                    job["uploadId"] != self.upload_id and
                    job["uploadId"] != last_deleted_upload_id):
                return job["uploadId"]

        return 0

    def upload(self, url, branch, name):
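        """Upload the given git branch into folder 1 and wait for the upload jobs to finish."""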
        print_err("Uploading {} branch {}".format(url, branch))
        headers = {"folderId": '1',
                   "uploadDescription": "{} - {}".format(url, branch),
                   "public": 'protected',
                   "ignoreScm": 'true',
                   "uploadType": 'vcs'}
        data = {"vcsType": "git",
                "vcsUrl": url,
                "vcsBranch": branch,
                "vcsName": name}
        resp = self.session.post(f"{self.server}/uploads", headers=headers, data=data)
        if resp.status_code != 201:
            raise Exception('POST /uploads {}'.format(resp.status_code))

        self.upload_id = str(resp.json()["message"])
        self._wait_for_completion()
        print_err("Upload done (id={})".format(self.upload_id))

    def analyse(self, reuse_upload_id, reuse_group):
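        """Run the scanner, decider and reuse agents on the current upload and wait for completion."""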
        print_err("Starting analysis")
        print_err("Reusing analysis from upload {}".format(reuse_upload_id))
        headers = {'folderId': '1',
                   'uploadId': self.upload_id,
                   'Content-Type': 'application/json'}
        data = {"analysis":{
                   "bucket": True,
                   "copyright_email_author": True,
                   "ecc": True,
                   "keyword": True,
                   "mime": True,
                   "monk": True,
                   "nomos": True,
                   "ojo": True,
                   "package": True},
                "decider":{
                   "nomos_monk": True,
                   "bulk_reused": True,
                   "new_scanner": True,
                   "ojo_decider": True},
                "reuse":{
                   "reuse_upload": reuse_upload_id,
                   "reuse_group": reuse_group,
                   "reuse_main": True,
                   "reuse_enhanced": True}}
        resp = self.session.post(f"{self.server}/jobs", headers=headers, data=json.dumps(data))
        if resp.status_code != 201:
            raise Exception('POST /jobs {}'.format(resp.status_code))
        self._wait_for_completion()
        print_err("Analysis done")

    def _report_cleanup(self, report):
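        """Clean up the raw FOSSology report.

        Trims the generated header, promotes the scanners' findings to the
        License field, strips the project name from file paths and sorts them.
        """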
        # Remove part of the header, from 'Upstream-Name:' to 'with FOSSology\n'
        report = re.sub('Upstream-Name:.*with FOSSology\n', '', report, flags=re.S)

        # Use the licenses found by the FOSSology scanners as the concluded License
        report = re.sub('License: NoLicenseConcluded\nComment: scanners found:', 'License:', report, flags=re.S)

        # Remove the FOSSology project name from each file path, and sort the files
        mysplit = re.split('^(Files:|Copyright:|License:)', report, flags=re.MULTILINE)
        in_files = False
        for index in range(len(mysplit)):
            if mysplit[index] == "Copyright:" or mysplit[index] == "License:":
                # No longer inside the 'Files:' paragraph
                in_files = False

            if in_files:
                list_files = mysplit[index].splitlines()
                for index_files in range(len(list_files)):
                    list_files[index_files] = list_files[index_files].strip().split('/', 1)[1]
                list_files.sort()
                mysplit[index] = ""
                for index_files in range(len(list_files)):
                    mysplit[index] = mysplit[index] + " " + list_files[index_files] + "\n"

            if mysplit[index] == "Files:":
                # Next index will be part of the 'Files:' paragraph
                in_files = True

        return "".join(mysplit).strip('\n')

    def get_report(self, report_format):
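        """Schedule a report in the requested format, wait for it, then download and clean it up."""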
        print_err("Generating report")
        headers = {'uploadId': self.upload_id,
                   'reportFormat': report_format}
        resp = self.session.get(f"{self.server}/report", headers=headers)
        if resp.status_code != 201:
            raise Exception('GET /report {}'.format(resp.status_code))
        report_id = resp.json()["message"].split('/')[-1]
        self._wait_for_completion()

        print_err("Downloading report (report id={})".format(report_id))
        resp = self.session.get(f"{self.server}/report/{report_id}")
        if resp.status_code != 200:
            raise Exception('GET /report/{} {}'.format(report_id, resp.status_code))
        return self._report_cleanup(resp.text)

if __name__ == "__main__":

    def usage():
        print_err("{} fossology_host username password url branch".format(sys.argv[0]))
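        # Example invocation (hypothetical host and repository values):
        #   ./fossology.py https://fossology.example.org ci-user secret \
        #       https://gitlab.example.org/pkg/example main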

    if len(sys.argv) != 6:
        print_err("Error: Need the FOSSology host URL, username, password, source URL and branch to check.")
        usage()
        sys.exit(1)

    fossology_host = sys.argv[1]
    fossology_username = sys.argv[2]
    fossology_password = sys.argv[3]
    source_url = sys.argv[4]
    source_branch = sys.argv[5]

    source_url = source_url.rstrip('/')
    name = source_url.split('/')[-1]
    suffix = '.git'
    if name.endswith(suffix):
        name = name[:-len(suffix)]

    project = ApertisFossology(fossology_host, fossology_username, fossology_password)

    project.upload(source_url, source_branch, name)

    group_id = project.get_group_id()

    reuse_id = project.get_previous_upload_analysis_id(name, group_id)

    project.analyse(reuse_id, group_id)

    report = project.get_report('dep5')
    print(report)