Skip to content
Snippets Groups Projects
Commit d47d105a authored by Frederic Danis's avatar Frederic Danis
Browse files

Add FOSSology to ci-license-scan


To improve the license scan process, we add a license scan using a `FOSSology`
server.

The `fossology.py` module adds an `ApertisFossology` class which is able to:
- request upload of source code from Git server to FOSSology server,
- retrieve references of previous scan for this source code,
- request scan of the source code,
- download the analysis report,
- remove the FOSSology header, strip the project's name from file paths and sort the files.

This class is used in `ci-license-scan` to generate the
`debian/apertis/copyright.fossology` file.

Signed-off-by: Frédéric Danis <frederic.danis@collabora.com>
parent 6f3d5ed2
No related branches found
No related tags found
2 merge requests!154Add FOSSology to ci-license-scan,!93WIP: documentation-builder: Rebase on Apertis instead of Debian Buster
Pipeline #181026 passed
......@@ -16,6 +16,7 @@ import sys
import yaml
import textwrap
import unicodedata
import fossology
# this is necessary to eliminate references in the generated YAML
# Perl tools use YAML::Tiny which doesn’t support references.
......@@ -332,6 +333,11 @@ def main():
parser.add_argument('--blacklist-license', dest='blacklist', metavar='LICENSE', action='append', default=[], help='license to blacklist')
parser.add_argument('--blacklist-licenses', dest='blacklist', metavar='LICENSES', type=str, help='space-separated licenses to blacklist')
parser.add_argument('--extra-whitelist', dest='whitelists', metavar='WHITELIST', action='append', default=['debian/apertis/copyright.whitelist'], help='extra file whitelist')
parser.add_argument('--fossology-host', dest='fossology_host', help='FOSSology host URL to use')
parser.add_argument('--fossology-username', dest='fossology_username', help='FOSSology username')
parser.add_argument('--fossology-password', dest='fossology_password', help='FOSSology password')
parser.add_argument('--source-url', dest='source_url', help='git source URL to scan')
parser.add_argument('--source-branch', dest='source_branch', help='git source branch to scan')
args = parser.parse_args()
print("%s fail on change" % ("Will" if args.fail_on_change else "Will not"))
disallowlist = args.blacklist
......@@ -348,6 +354,19 @@ def main():
with open('debian/apertis/copyright.new', 'wt') as f:
print('Format: https://www.debian.org/doc/packaging-manuals/copyright-format/1.0/\n', file=f)
scan_copyrights(_out=f)
if args.fossology_host and args.fossology_username and args.fossology_password and args.source_url and args.source_branch:
with open('debian/apertis/copyright.fossology', 'wt') as f:
args.source_url = args.source_url.rstrip('/')
name = args.source_url.split('/')[-1]
suffix = '.git'
if name.endswith(suffix):
name = name[:-len(suffix)]
foss_project = fossology.ApertisFossology(args.fossology_host, args.fossology_username, args.fossology_password)
foss_project.upload(args.source_url, args.source_branch, name)
reuse_id, reuse_group = foss_project.get_previous_upload_analysis_ids(name)
foss_project.analyse(reuse_id, reuse_group)
report = foss_project.get_report('dep5')
print(report, file=f)
# open for parsing as binary since copyrights may (incorrectly) contain binary data
bad_licenses = set()
unknown_licensed = False
......
#!/usr/bin/python3
from datetime import datetime, timedelta
import json
import re
import requests
import sys
import time
def print_err(*args, **kwargs):
    """Write a message to standard error.

    Takes the same positional and keyword arguments as the builtin
    ``print`` (``sep``, ``end``, ``flush``), except ``file``, which is
    always the current ``sys.stderr``.
    """
    stream = sys.stderr
    print(*args, **kwargs, file=stream)
class ApertisFossology:
    """Small client for a FOSSology server's REST API.

    An instance authenticates on construction and can then:

    - request the upload of a Git branch (``upload``),
    - look up an earlier upload of the same project so its analysis can be
      reused (``get_previous_upload_analysis_ids``),
    - schedule the scanners on the upload (``analyse``),
    - generate, download and tidy up a report (``get_report``).

    Progress is reported on standard error through ``print_err``; every
    unexpected HTTP status is reported by raising ``Exception``.
    """

    def __init__(self, server, username, password):
        """Create a session and authenticate it with a fresh API token.

        Args:
            server: base URL of the FOSSology host; ``/repo/api/v1`` is
                appended to it (a trailing ``/`` is ignored).
            username: FOSSology account name.
            password: FOSSology account password.

        Raises:
            Exception: if ``POST /tokens`` does not answer with HTTP 201.
        """
        # Avoid building "host//repo/api/v1" when the URL ends with a slash.
        base_url = server.rstrip('/')
        self.server = f"{base_url}/repo/api/v1"
        # 0 means "nothing uploaded yet"; upload() replaces it with a string.
        self.upload_id = 0
        self.session = requests.Session()

        now = datetime.now()
        data = {
            "username": username,
            "password": password,
            # Timestamped so every run asks for a distinctly named token.
            "token_name": "ci-cd_{}".format(now.strftime("%Y%m%d-%H%M%S")),
            "token_scope": "write",
            "token_expire": (now + timedelta(days=2)).strftime("%Y-%m-%d"),
        }
        resp = self.session.post(f"{self.server}/tokens", data=data)
        self._expect_status(resp, 201, 'POST /tokens')

        # Normalise the answer: drop a "Bearer " prefix if the server already
        # included one, then add it back exactly once.
        token = re.sub("Bearer ", "", resp.json()["Authorization"])
        self.session.headers.update({"Authorization": f"Bearer {token}"})

    @staticmethod
    def _expect_status(resp, expected, request):
        """Raise ``Exception('<request> <status>')`` unless *resp* has the
        *expected* HTTP status code."""
        if resp.status_code != expected:
            raise Exception('{} {}'.format(request, resp.status_code))

    def _wait_for_completion(self):
        """Poll the jobs of the current upload until the last one finishes.

        Prints a dot on standard error for every poll (one per second) and a
        final newline once the last listed job is ``Completed``.

        Raises:
            Exception: if the job list cannot be fetched (status != 200) or
                the last listed job is ``Failed``.
        """
        while True:
            resp = self.session.get(f"{self.server}/jobs?upload={self.upload_id}")
            self._expect_status(resp, 200, 'GET /jobs?upload={}'.format(self.upload_id))
            jobs = resp.json()
            # An empty list is treated as "no job listed yet" and simply
            # polled again instead of crashing on jobs[-1].
            if jobs:
                last_job = jobs[-1]
                status = last_job["status"]
                if status == "Completed":
                    print_err("")
                    return
                if status == "Failed":
                    raise Exception('job {} failed for upload {}'.format(
                        last_job["id"], last_job["uploadId"]))
            print_err(".", end='', flush=True)
            time.sleep(1)

    @staticmethod
    def _find_reusable_upload(jobs, name, current_upload_id):
        """Pick the upload whose analysis can be reused.

        Walks *jobs* from last to first and returns ``(uploadId, groupId)``
        of the first job named *name* whose upload is neither
        *current_upload_id* nor the target of a ``Delete`` job seen earlier
        in that walk. Returns ``(0, 0)`` when there is none.
        """
        # IDs are compared as strings: upload() stores the current ID as a
        # string while the JSON job list may carry integers.
        current = str(current_upload_id)
        # Every deleted upload is remembered, not only the last one seen.
        deleted = set()
        for job in reversed(jobs):
            if job["name"] == "Delete":
                deleted.add(str(job["uploadId"]))
            elif job["name"] == name:
                candidate = str(job["uploadId"])
                if candidate != current and candidate not in deleted:
                    return job["uploadId"], job["groupId"]
        return 0, 0

    def get_previous_upload_analysis_ids(self, name):
        """Return ``(upload_id, group_id)`` of a previous upload of *name*.

        The job list of the server is searched from the end for a job
        carrying the project *name* that belongs to another, non-deleted
        upload.

        Returns:
            The ``uploadId`` and ``groupId`` of the matching job, or
            ``(0, 0)`` if no such job exists.

        Raises:
            Exception: if ``GET /jobs`` does not answer with HTTP 200.
        """
        resp = self.session.get(f"{self.server}/jobs")
        self._expect_status(resp, 200, 'GET /jobs')
        return self._find_reusable_upload(resp.json(), name, self.upload_id)

    def upload(self, url, branch, name):
        """Ask the server to fetch *branch* of the Git repository *url*.

        The upload is created in folder ``1`` under the name *name*; its ID
        is stored (as a string) in ``self.upload_id`` and the method blocks
        until the related job has completed.

        Raises:
            Exception: if ``POST /uploads`` does not answer with HTTP 201,
                or if waiting for the job fails.
        """
        print_err("Uploading {} branch {}".format(url, branch))
        headers = {
            "folderId": '1',
            "uploadDescription": "{} - {}".format(url, branch),
            "public": 'protected',
            "ignoreScm": 'true',
            "uploadType": 'vcs',
        }
        data = {
            "vcsType": "git",
            "vcsUrl": url,
            "vcsBranch": branch,
            "vcsName": name,
        }
        resp = self.session.post(f"{self.server}/uploads", headers=headers, data=data)
        self._expect_status(resp, 201, 'POST /uploads')
        # Kept as a string because it is sent back as an HTTP header value.
        self.upload_id = str(resp.json()["message"])
        self._wait_for_completion()
        print_err("Upload done (id={})".format(self.upload_id))

    def analyse(self, reuse_upload_id, reuse_group):
        """Schedule the scanners on the current upload and wait for them.

        Args:
            reuse_upload_id: ID of an earlier upload whose results should be
                reused, as returned by ``get_previous_upload_analysis_ids``
                (``0`` when there is none).
            reuse_group: group ID going with *reuse_upload_id*.

        Raises:
            Exception: if ``POST /jobs`` does not answer with HTTP 201, or
                if waiting for the job fails.
        """
        print_err("Starting analysis")
        if reuse_upload_id:
            print_err("Reusing analysis from upload {}".format(reuse_upload_id))
        else:
            # Do not claim to reuse "upload 0" when nothing was found.
            print_err("No previous upload analysis to reuse")
        headers = {
            'folderId': '1',
            'uploadId': self.upload_id,
            'Content-Type': 'application/json',
        }
        data = {
            "analysis": {
                "bucket": True,
                "copyright_email_author": True,
                "ecc": True,
                "keyword": True,
                "mime": True,
                "monk": True,
                "nomos": True,
                "ojo": True,
                "package": True,
            },
            "decider": {
                "nomos_monk": True,
                "bulk_reused": True,
                "new_scanner": True,
                "ojo_decider": True,
            },
            "reuse": {
                "reuse_upload": reuse_upload_id,
                "reuse_group": reuse_group,
                "reuse_main": True,
                "reuse_enhanced": True,
            },
        }
        resp = self.session.post(f"{self.server}/jobs", headers=headers, data=json.dumps(data))
        self._expect_status(resp, 201, 'POST /jobs')
        self._wait_for_completion()
        print_err("Analysis done")

    @staticmethod
    def _clean_file_list(block):
        """Rewrite the value of one ``Files:`` field.

        Each non-blank line is stripped, loses everything up to and
        including its first ``/`` (the FOSSology project name), and the
        resulting paths are sorted and emitted one per line with a leading
        space. A path without any ``/`` is kept unchanged.
        """
        entries = []
        for line in block.splitlines():
            path = line.strip()
            if not path:
                continue
            _project, separator, remainder = path.partition('/')
            entries.append(remainder if separator else path)
        entries.sort()
        return "".join(" {}\n".format(entry) for entry in entries)

    def _report_cleanup(self, str):
        """Return the report text *str* tidied up for storage.

        - removes the header part from ``Upstream-Name:`` up to
          ``with FOSSology``,
        - turns ``License: NoLicenseConcluded`` followed by the scanners'
          findings into a plain ``License:`` field,
        - strips the project name from the paths of every ``Files:`` field
          and sorts them,
        - strips leading and trailing newlines.
        """
        # The parameter name shadows the builtin; it is kept for interface
        # compatibility and rebound to a local name right away.
        text = str
        # Remove part of the header, from 'Upstream-Name:' to 'with FOSSology\n'
        text = re.sub('Upstream-Name:.*with FOSSology\n', '', text, flags=re.S)
        # Use FOSSology assertion as License
        text = re.sub('License: NoLicenseConcluded\nComment: scanners found:', 'License:', text, flags=re.S)
        # Split on the field names (kept in the result thanks to the capture
        # group), so the text following a 'Files:' item is that field's value.
        parts = re.split('^(Files:|Copyright:|License:)', text, flags=re.MULTILINE)
        in_files = False
        for index, part in enumerate(parts):
            if part in ("Copyright:", "License:"):
                # No more in 'Files:' paragraph
                in_files = False
            elif part == "Files:":
                # Next item will be part of the 'Files:' paragraph
                in_files = True
            elif in_files:
                parts[index] = self._clean_file_list(part)
        return "".join(parts).strip('\n')

    def get_report(self, format):
        """Generate a report for the current upload and return its text.

        Args:
            format: report format name sent as the ``reportFormat`` header
                (callers in this file pass ``'dep5'``).

        Returns:
            The downloaded report after ``_report_cleanup``.

        Raises:
            Exception: if the generation request does not answer with HTTP
                201, the download does not answer with HTTP 200, or waiting
                for the job fails.
        """
        print_err("Generating report")
        headers = {
            'uploadId': self.upload_id,
            'reportFormat': format,
        }
        resp = self.session.get(f"{self.server}/report", headers=headers)
        self._expect_status(resp, 201, 'GET /report')
        # The answer's message ends with the report ID as last path element.
        report_id = resp.json()["message"].split('/')[-1]
        self._wait_for_completion()
        print_err("Downloading report (report id={})".format(report_id))
        resp = self.session.get(f"{self.server}/report/{report_id}")
        self._expect_status(resp, 200, 'GET /report/{}'.format(report_id))
        return self._report_cleanup(resp.text)
if __name__ == "__main__":
    # Stand-alone use: upload a Git branch to FOSSology, analyse it and print
    # the cleaned-up 'dep5' report on standard output.
    def usage():
        """Print the expected command line on standard error."""
        print_err("{} fossology_host username password url branch".format(sys.argv[0]))

    if len(sys.argv) != 6:
        print_err("Error: Need the FOSSology host URL, username, password, source URL and branch to check.")
        usage()
        # sys.exit() is always available, whereas the bare exit() helper is
        # only injected by the 'site' module (absent with 'python -S').
        sys.exit(1)

    fossology_host, fossology_username, fossology_password, source_url, source_branch = sys.argv[1:6]

    # The project name is the last path element of the URL, without a
    # trailing '/' and without the '.git' suffix.
    source_url = source_url.rstrip('/')
    name = source_url.split('/')[-1]
    suffix = '.git'
    if name.endswith(suffix):
        name = name[:-len(suffix)]

    project = ApertisFossology(fossology_host, fossology_username, fossology_password)
    project.upload(source_url, source_branch, name)
    # Reuse the results of an earlier upload of the same project, if any.
    reuse_id, reuse_group = project.get_previous_upload_analysis_ids(name)
    project.analyse(reuse_id, reuse_group)
    report = project.get_report('dep5')
    print(report)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment