Skip to content
Snippets Groups Projects
Commit ebab6716 authored by Emanuele Aina's avatar Emanuele Aina
Browse files

Fix thread_pool() signature now that num_retries is no longer used


The new tenacity-based code limits the overall time interval during
which it retries failed calls and no longer limits the actual number of
retries on its own.

Since in some cases the called function may not be retry-safe, just
expose a boolean to make retrying opt-in and avoid some potential foot
shooting in the future.

Signed-off-by: Emanuele Aina <emanuele.aina@collabora.com>
parent 09e15175
No related branches found
No related tags found
1 merge request: !181 "Fix thread_pool() signature now that num_retries is no longer used"
Pipeline #628709 passed
......@@ -184,7 +184,7 @@ class DownstreamFetcher:
iterator=True, per_page=per_page, **kwargs
).total_pages
thread_pool(
num_worker_threads, _fetch_page, range(1, total_pages + 1), num_retries=2
num_worker_threads, _fetch_page, range(1, total_pages + 1), retryable=True
)
projects = [
p for p in projects if fnmatch.fnmatch(p.path_with_namespace, filterglob)
......@@ -214,7 +214,7 @@ class DownstreamFetcher:
}
thread_pool(
num_worker_threads, _fetch_branches_and_tags, projects, num_retries=2
num_worker_threads, _fetch_branches_and_tags, projects, retryable=True
)
def fetch_pipelines(self, projects):
......@@ -236,7 +236,7 @@ class DownstreamFetcher:
"status": pipeline.status,
}
thread_pool(num_worker_threads, _fetch_pipelines, projects, num_retries=2)
thread_pool(num_worker_threads, _fetch_pipelines, projects, retryable=True)
def fetch_descendants(self, projects, cache=None):
num_worker_threads = 10
......@@ -275,7 +275,7 @@ class DownstreamFetcher:
]
project_tags = [(p, t) for p in filtered_projects for t in p.tags.values()]
refs = project_branches + project_tags
thread_pool(num_worker_threads, _fetch_descendants, refs, num_retries=2)
thread_pool(num_worker_threads, _fetch_descendants, refs, retryable=True)
for project in filtered_projects:
for branch in project.branches.values():
......@@ -437,7 +437,7 @@ class DownstreamFetcher:
filtered_projects = projects
thread_pool(
num_worker_threads, _fetch_license_report, filtered_projects, num_retries=2
num_worker_threads, _fetch_license_report, filtered_projects, retryable=True
)
def fetch_lintian_job(self, projects):
......@@ -467,7 +467,7 @@ class DownstreamFetcher:
)
project.lintian_job = True
thread_pool(num_worker_threads, _fetch_lintian_job, projects, num_retries=2)
thread_pool(num_worker_threads, _fetch_lintian_job, projects, retryable=True)
if __name__ == "__main__":
......
......@@ -108,7 +108,7 @@ class ObsFetcher:
"obs", {}
)[projectname] = entry
thread_pool(num_worker_threads, _find_packages, projectnames, num_retries=2)
thread_pool(num_worker_threads, _find_packages, projectnames, retryable=True)
logging.info(
f"retrieved {sum(len(p['obs']) for p in packages.values())} packages"
)
......@@ -127,7 +127,7 @@ class ObsFetcher:
)
entry.files = listing
thread_pool(num_worker_threads, _find_files, entries, num_retries=2)
thread_pool(num_worker_threads, _find_files, entries, retryable=True)
logging.info(f"retrieved {sum(len(entry.files) for entry in entries)} files")
......
......@@ -6,11 +6,11 @@ import traceback
import tenacity
def _retrying_on_exception(timeout, exception_type=Exception):
    """Build a ``tenacity.Retrying`` controller bounded by a time budget.

    Args:
        timeout: Overall retry budget in seconds; retrying stops once this
            delay has elapsed rather than after a fixed attempt count.
            A timeout of 0 effectively disables retries (first failure
            propagates).
        exception_type: Exception class (or tuple of classes) that triggers
            a retry; any other exception propagates immediately.

    Returns:
        A ``tenacity.Retrying`` instance, iterable as attempt contexts.
    """
    # Exponential backoff with jitter, capped at a tenth of the total
    # budget so that several attempts can fit inside the timeout window.
    retrying = tenacity.Retrying(
        stop=tenacity.stop_after_delay(timeout),
        retry=tenacity.retry_if_exception_type(exception_type),
        wait=tenacity.wait_exponential_jitter(max=timeout / 10),
    )
    return retrying
......@@ -33,10 +33,10 @@ def item_id(item):
return itemid
def thread_pool(num_workers, func, items, num_retries=0):
def thread_pool(num_workers, func, items, *, retryable=False):
def func_with_retries(item, counter, total):
itemid = item_id(item)
for attempt in _retrying_on_exception():
for attempt in _retrying_on_exception(600 if retryable else 0):
i = attempt.retry_state.attempt_number
logging.debug(f"Fetching item {counter} of {total} ({itemid}), round #{i}")
with attempt:
......@@ -45,7 +45,7 @@ def thread_pool(num_workers, func, items, num_retries=0):
logging.error(
f"Failed processing item {counter} of {total} ({itemid}), round #{i}"
)
if logging.DEBUG >= logging.root.level and i != num_retries:
if logging.DEBUG >= logging.root.level:
ex = attempt.retry_state.outcome.exception()
traceback.print_exception(type(ex), ex, ex.__traceback__)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment