-
Frederic Danis authored
License scan for `pkg/dash` and `tests\dash` should not re-use the scan results from a previous scan of the other project. Separate license scan using different group id, which is done using different users.
Frederic Danis authoredLicense scan for `pkg/dash` and `tests\dash` should not re-use the scan results from a previous scan of the other project. Separate license scan using different group id, which is done using different users.
fossology.py 7.73 KiB
#!/usr/bin/python3
from datetime import datetime, timedelta
import json
import re
import requests
import sys
import time
def print_err(*args, **kwargs):
print(*args, file=sys.stderr, **kwargs)
class ApertisFossology:
def __init__(self, server, username, password):
self.server = f"{server}/repo/api/v1"
self.upload_id = 0
self.session = requests.Session()
now = datetime.now()
data = {"username": username,
"password": password,
"token_name": "ci-cd_{}".format(now.strftime("%Y%m%d-%H%M%S")),
"token_scope": "write",
"token_expire": "{}".format((now + timedelta(days = 2)).strftime("%Y-%m-%d"))}
resp = self.session.post(f"{self.server}/tokens", data = data)
if resp.status_code != 201:
raise Exception('POST /tokens {}'.format(resp.status_code))
token = re.sub("Bearer ", "", resp.json()["Authorization"])
self.session.headers.update({"Authorization": f"Bearer {token}"})
def _wait_for_completion(self):
while (True):
resp = self.session.get(f"{self.server}/jobs?upload={self.upload_id}")
if resp.status_code != 200:
raise Exception('GET /jobs?upload={} {}'.format(self.upload_id, resp.status_code))
status = resp.json()[-1]["status"]
if status == "Completed":
print_err("")
return
elif status == "Failed":
raise Exception('job {} failed for upload {}'.format(resp.json()[-1]["id"], resp.json()[-1]["uploadId"]))
print_err(".", end = '', flush = True)
time.sleep(1)
def get_group_id(self):
resp = self.session.get(f"{self.server}/jobs?upload={self.upload_id}")
if resp.status_code != 200:
raise Exception('GET /jobs?upload={} {}'.format(self.upload_id, resp.status_code))
return resp.json()[-1]["groupId"]
def get_previous_upload_analysis_id(self, name, group_id):
last_deleted_upload_id = 0
resp = self.session.get(f"{self.server}/jobs")
if resp.status_code != 200:
raise Exception('GET /jobs {}'.format(resp.status_code))
for job in reversed(resp.json()):
if job["name"] == "Delete":
last_deleted_upload_id = job["uploadId"]
elif (job["name"] == name and
job["groupId"] == group_id and
job["uploadId"] != self.upload_id and
job["uploadId"] != last_deleted_upload_id):
return job["uploadId"]
return 0
def upload(self, url, branch, name):
print_err("Uploading {} branch {}".format(url, branch))
headers = {"folderId": '1',
"uploadDescription": "{} - {}".format(url, branch),
"public": 'protected',
"ignoreScm": 'true',
"uploadType": 'vcs'}
data = {"vcsType": "git",
"vcsUrl": url,
"vcsBranch": branch,
"vcsName": name}
resp = self.session.post(f"{self.server}/uploads", headers = headers, data = data)
if resp.status_code != 201:
raise Exception('POST /uploads {}'.format(resp.status_code))
self.upload_id = str(resp.json()["message"])
self._wait_for_completion()
print_err("Upload done (id={})".format(self.upload_id))
def analyse(self, reuse_upload_id, reuse_group):
print_err("Starting analysis")
print_err("Reusing analysis from upload {}".format(reuse_upload_id))
headers = {'folderId': '1',
'uploadId': self.upload_id,
'Content-Type': 'application/json'}
data = {"analysis":{
"bucket": True,
"copyright_email_author": True,
"ecc": True,
"keyword": True,
"mime": True,
"monk": True,
"nomos": True,
"ojo": True,
"package": True},
"decider":{
"nomos_monk": True,
"bulk_reused": True,
"new_scanner": True,
"ojo_decider": True},
"reuse":{
"reuse_upload": reuse_upload_id,
"reuse_group": reuse_group,
"reuse_main": True,
"reuse_enhanced": True}}
resp = self.session.post(f"{self.server}/jobs", headers = headers, data = json.dumps(data))
if resp.status_code != 201:
raise Exception('POST /jobs {}'.format(resp.status_code))
self._wait_for_completion()
print_err("Analysis done")
def _report_cleanup(self, str):
# Remove part of the header, from 'Upstream-Name:' to 'with FOSSology\n'
str = re.sub('Upstream-Name:.*with FOSSology\n', '', str, flags=re.S)
# Use FOSSology assertion as License
str = re.sub('License: NoLicenseConcluded\nComment: scanners found:', 'License:', str, flags=re.S)
# Remove FOSSology project's name from files path, and sort files
mysplit = re.split('^(Files:|Copyright:|License:)', str, flags=re.MULTILINE)
in_files = False
for index in range(len(mysplit)):
if mysplit[index] == "Copyright:" or mysplit[index] == "License:":
# No more in 'Files:' paragraph
in_files = False
if in_files:
list_files = mysplit[index].splitlines()
for index_files in range(len(list_files)):
list_files[index_files] = list_files[index_files].strip().split('/', 1)[1]
list_files.sort()
mysplit[index] = ""
for index_files in range(len(list_files)):
mysplit[index] = mysplit[index] + " " + list_files[index_files] + "\n"
if mysplit[index] == "Files:":
# Next index will be part of the 'Files:' paragraph
in_files = True
return "".join(mysplit).strip('\n')
def get_report(self, format):
print_err("Generating report")
headers = {'uploadId': self.upload_id,
'reportFormat': format}
resp = self.session.get(f"{self.server}/report", headers = headers)
if resp.status_code != 201:
raise Exception('GET /report {}'.format(resp.status_code))
report_id = resp.json()["message"].split('/')[-1]
self._wait_for_completion()
print_err("Downloading report (report id={})".format(report_id))
resp = self.session.get(f"{self.server}/report/{report_id}")
if resp.status_code != 200:
raise Exception('GET /report/{} {}'.format(report_id, resp.status_code))
return self._report_cleanup(resp.text)
if __name__ == "__main__":
def usage():
print_err("{} fossology_host username password url branch".format(sys.argv[0]))
if len(sys.argv) != 6:
print_err("Error: Need the FOSSology host URL, username, password, source URL and branch to check.")
usage()
exit(1)
fossology_host = sys.argv[1]
fossology_username = sys.argv[2]
fossology_password = sys.argv[3]
source_url = sys.argv[4]
source_branch = sys.argv[5]
source_url = source_url.rstrip('/')
name = source_url.split('/')[-1]
suffix = '.git'
if name.endswith(suffix):
name = name[:-len(suffix)]
project = ApertisFossology(fossology_host, fossology_username, fossology_password)
project.upload(source_url, source_branch, name)
group_id = project.get_group_id()
reuse_id = project.get_previous_upload_analysis_id(name, group_id)
project.analyse(reuse_id, group_id)
report = project.get_report('dep5')
print(report)