#!/usr/bin/python3
"""Upload a git branch to a FOSSology server, analyse it and print a report.

Usage: <script> fossology_host username password url branch

The cleaned-up 'dep5' report is written to stdout; progress and error
messages are written to stderr.
"""

from datetime import datetime, timedelta
import json
import re
import sys
import time

import requests


def print_err(*args, **kwargs):
    """Print to stderr; accepts the same arguments as ``print``."""
    print(*args, file=sys.stderr, **kwargs)


class ApertisFossology:
    """Small client for the FOSSology REST API (``<server>/repo/api/v1``).

    An instance tracks a single upload (``self.upload_id``) through the
    upload -> analyse -> report sequence.
    """

    def __init__(self, server, username, password):
        """Open a session and request a short-lived write token.

        Args:
            server: Base URL of the FOSSology host; ``/repo/api/v1`` is
                appended to it.
            username: Account used to request the token.
            password: Password of that account.

        Raises:
            Exception: If ``POST /tokens`` does not answer 201.
        """
        self.server = f"{server}/repo/api/v1"
        self.upload_id = 0
        self.session = requests.Session()

        now = datetime.now()
        data = {
            "username": username,
            "password": password,
            "token_name": "ci-cd_{}".format(now.strftime("%Y%m%d-%H%M%S")),
            "token_scope": "write",
            "token_expire": (now + timedelta(days=2)).strftime("%Y-%m-%d"),
        }
        resp = self.session.post(f"{self.server}/tokens", data=data)
        if resp.status_code != 201:
            raise Exception('POST /tokens {}'.format(resp.status_code))

        # Strip any "Bearer " prefix from the answer so that the header set
        # below carries exactly one.
        token = re.sub("Bearer ", "", resp.json()["Authorization"])
        self.session.headers.update({"Authorization": f"Bearer {token}"})

    def _upload_jobs(self):
        """Return the decoded job list of the current upload.

        Raises:
            Exception: If ``GET /jobs?upload=<id>`` does not answer 200.
        """
        resp = self.session.get(f"{self.server}/jobs?upload={self.upload_id}")
        if resp.status_code != 200:
            raise Exception('GET /jobs?upload={} {}'.format(self.upload_id, resp.status_code))
        return resp.json()

    def _wait_for_completion(self):
        """Poll once per second until the last job of the upload completes.

        A dot is printed to stderr for every poll and a newline once the job
        is "Completed". An empty job list is treated as "not finished yet".

        Raises:
            Exception: If the request fails or the last job is "Failed".
        """
        while True:
            jobs = self._upload_jobs()
            if jobs:
                latest = jobs[-1]
                status = latest["status"]
                if status == "Completed":
                    print_err("")
                    return
                if status == "Failed":
                    raise Exception('job {} failed for upload {}'.format(
                        latest["id"], latest["uploadId"]))
            print_err(".", end='', flush=True)
            time.sleep(1)

    def get_group_id(self):
        """Return the ``groupId`` of the last job of the current upload.

        Raises:
            Exception: If the request fails or the upload has no job.
        """
        jobs = self._upload_jobs()
        if not jobs:
            raise Exception('GET /jobs?upload={} returned no job'.format(self.upload_id))
        return jobs[-1]["groupId"]

    def get_previous_upload_analysis_id(self, name, group_id):
        """Find an earlier upload with the same name and group to reuse.

        The job list is walked from its last entry to its first. Uploads
        referenced by a "Delete" job seen earlier in that walk are skipped,
        as is the current upload.

        Args:
            name: Job name to look for.
            group_id: Group the job must belong to.

        Returns:
            The ``uploadId`` of the first matching job, or 0 if none matches.

        Raises:
            Exception: If ``GET /jobs`` does not answer 200.
        """
        resp = self.session.get(f"{self.server}/jobs")
        if resp.status_code != 200:
            raise Exception('GET /jobs {}'.format(resp.status_code))

        # Ids are compared as strings: self.upload_id is stored as a str
        # while the JSON payload may carry either strings or integers.
        current_id = str(self.upload_id)
        # Remember every deleted upload, not only the most recent one,
        # otherwise two deletions in a row let a deleted upload through.
        deleted_ids = set()
        for job in reversed(resp.json()):
            if job["name"] == "Delete":
                deleted_ids.add(str(job["uploadId"]))
                continue
            if job["name"] != name or job["groupId"] != group_id:
                continue
            candidate = str(job["uploadId"])
            if candidate != current_id and candidate not in deleted_ids:
                return job["uploadId"]
        return 0

    def upload(self, url, branch, name):
        """Ask the server to fetch a git branch and wait until it is done.

        Sets ``self.upload_id`` to the id returned by the server.

        Args:
            url: URL of the git repository.
            branch: Branch to upload.
            name: Name given to the upload.

        Raises:
            Exception: If ``POST /uploads`` does not answer 201 or the
                upload job fails.
        """
        print_err("Uploading {} branch {}".format(url, branch))
        headers = {
            "folderId": '1',
            "uploadDescription": "{} - {}".format(url, branch),
            "public": 'protected',
            "ignoreScm": 'true',
            "uploadType": 'vcs',
        }
        data = {
            "vcsType": "git",
            "vcsUrl": url,
            "vcsBranch": branch,
            "vcsName": name,
        }
        resp = self.session.post(f"{self.server}/uploads", headers=headers, data=data)
        if resp.status_code != 201:
            raise Exception('POST /uploads {}'.format(resp.status_code))
        self.upload_id = str(resp.json()["message"])
        self._wait_for_completion()
        print_err("Upload done (id={})".format(self.upload_id))

    def analyse(self, reuse_upload_id, reuse_group):
        """Schedule the analysis of the current upload and wait for it.

        Args:
            reuse_upload_id: Upload whose analysis should be reused
                (0 when there is none, see get_previous_upload_analysis_id).
            reuse_group: Group owning the reused upload.

        Raises:
            Exception: If ``POST /jobs`` does not answer 201 or the
                analysis job fails.
        """
        print_err("Starting analysis")
        if reuse_upload_id:
            print_err("Reusing analysis from upload {}".format(reuse_upload_id))
        else:
            print_err("No previous analysis to reuse")

        headers = {
            'folderId': '1',
            'uploadId': self.upload_id,
            'Content-Type': 'application/json',
        }
        data = {
            "analysis": {
                "bucket": True,
                "copyright_email_author": True,
                "ecc": True,
                "keyword": True,
                "mime": True,
                "monk": True,
                "nomos": True,
                "ojo": True,
                "package": True,
            },
            "decider": {
                "nomos_monk": True,
                "bulk_reused": True,
                "new_scanner": True,
                "ojo_decider": True,
            },
            "reuse": {
                "reuse_upload": reuse_upload_id,
                "reuse_group": reuse_group,
                "reuse_main": True,
                "reuse_enhanced": True,
            },
        }
        resp = self.session.post(f"{self.server}/jobs", headers=headers, data=json.dumps(data))
        if resp.status_code != 201:
            raise Exception('POST /jobs {}'.format(resp.status_code))
        self._wait_for_completion()
        print_err("Analysis done")

    @staticmethod
    def _sorted_files_paragraph(paragraph):
        """Return the body of a 'Files:' paragraph, sorted and without prefix.

        Each entry loses its first path component (up to and including the
        first '/'); entries without a '/' (for instance '*') are kept as
        they are. Entries are emitted one per line, indented by one space.
        A paragraph without any entry is returned unchanged.
        """
        lines = paragraph.splitlines()
        entries = []
        for line in lines:
            path = line.strip()
            if not path:
                continue
            _, slash, remainder = path.partition('/')
            entries.append(remainder if slash else path)
        if not entries:
            return paragraph
        entries.sort()

        # Keep the blank lines that followed the last entry so that the
        # next paragraph stays separated from this one.
        trailing_blank = 0
        for line in reversed(lines):
            if line.strip():
                break
            trailing_blank += 1
        return "".join(" {}\n".format(entry) for entry in entries) + "\n" * trailing_blank

    def _report_cleanup(self, str):
        """Tidy up the text of a downloaded report.

        Removes the header span from 'Upstream-Name:' to 'with FOSSology',
        promotes the scanner findings to the 'License:' field, and strips
        the leading path component from every 'Files:' entry before sorting
        the entries.

        Args:
            str: Raw report text. The name shadows the builtin and is only
                kept so that the signature stays unchanged.

        Returns:
            The cleaned report, without leading or trailing newlines.
        """
        report = str
        # Remove part of the header, from 'Upstream-Name:' to 'with FOSSology\n'
        report = re.sub('Upstream-Name:.*with FOSSology\n', '', report, flags=re.S)
        # Use FOSSology assertion as License
        report = re.sub('License: NoLicenseConcluded\nComment: scanners found:',
                        'License:', report, flags=re.S)

        # re.split() with a capturing group yields
        # [preamble, field, text, field, text, ...]: field names sit at odd
        # indexes and the text following each of them at the next even one.
        parts = re.split('^(Files:|Copyright:|License:)', report, flags=re.MULTILINE)
        for index in range(2, len(parts), 2):
            if parts[index - 1] == "Files:":
                parts[index] = self._sorted_files_paragraph(parts[index])
        return "".join(parts).strip('\n')

    def get_report(self, format):
        """Generate, download and clean up a report for the current upload.

        Args:
            format: Report format passed in the 'reportFormat' header. The
                name shadows the builtin and is only kept so that the
                signature stays unchanged.

        Returns:
            The report text after ``_report_cleanup``.

        Raises:
            Exception: If scheduling (expects 201) or downloading
                (expects 200) the report fails, or the report job fails.
        """
        print_err("Generating report")
        headers = {'uploadId': self.upload_id, 'reportFormat': format}
        resp = self.session.get(f"{self.server}/report", headers=headers)
        if resp.status_code != 201:
            raise Exception('GET /report {}'.format(resp.status_code))
        # The report id is the last path component of the returned message.
        report_id = resp.json()["message"].split('/')[-1]
        self._wait_for_completion()

        print_err("Downloading report (report id={})".format(report_id))
        resp = self.session.get(f"{self.server}/report/{report_id}")
        if resp.status_code != 200:
            raise Exception('GET /report/{} {}'.format(report_id, resp.status_code))
        return self._report_cleanup(resp.text)


def usage():
    """Print the expected command line to stderr."""
    print_err("{} fossology_host username password url branch".format(sys.argv[0]))


def main():
    """Run upload, analysis and report generation for the given branch."""
    if len(sys.argv) != 6:
        print_err("Error: Need the FOSSology host URL, username, password, source URL and branch to check.")
        usage()
        sys.exit(1)

    fossology_host = sys.argv[1]
    fossology_username = sys.argv[2]
    fossology_password = sys.argv[3]
    source_url = sys.argv[4]
    source_branch = sys.argv[5]

    # The upload name is the last component of the URL without '.git'.
    source_url = source_url.rstrip('/')
    name = source_url.split('/')[-1]
    suffix = '.git'
    if name.endswith(suffix):
        name = name[:-len(suffix)]

    project = ApertisFossology(fossology_host, fossology_username, fossology_password)
    project.upload(source_url, source_branch, name)
    group_id = project.get_group_id()
    reuse_id = project.get_previous_upload_analysis_id(name, group_id)
    project.analyse(reuse_id, group_id)
    report = project.get_report('dep5')
    print(report)


if __name__ == "__main__":
    main()