diff --git a/obs_proxy/chunked_uploads/client.py b/obs_proxy/chunked_uploads/client.py
index 8d83745141bcf64b55231ae9470ebe2892b114cf..413ba669ace4b427c050143d619122efdd830cf1 100644
--- a/obs_proxy/chunked_uploads/client.py
+++ b/obs_proxy/chunked_uploads/client.py
@@ -11,13 +11,15 @@
 import hashlib
 
 from dataclasses import dataclass
-from logging import debug
 
 import httpx
+import structlog
 from aiofiles.threadpool import AsyncFileIO
 from werkzeug.datastructures import ContentRange
 from werkzeug.http import unquote_etag
 
+logger = structlog.get_logger()
+
 UPLOAD_CHUNK_SIZE = 512 * 1024 * 1024
 MAX_ATTEMPTS = 5
 
@@ -78,28 +80,31 @@ async def upload_chunked(
     exc = None
     for attempt in range(MAX_ATTEMPTS):
         try:
-            debug(f"patching {uri} ({len(chunk)} {content_range})")
+            logger.debug("patching", uri=uri, size=len(chunk), content_range=content_range, attempt=attempt)
             r = await client.patch(
                 uri,
                 content=chunk,
                 headers=chunk_headers,
                 **kv,
             )
+            logger.debug("patch result", result=r)
             if r.is_success:
                 break
             else:
                 r.raise_for_status()
         except httpx.HTTPStatusError as e:
             if not e.response.is_server_error:
+                logger.debug("not server error, not retrying", resp=e.response)
                 raise
-            debug(f"retrying ({attempt})")
+            logger.exception("retrying", attempt=attempt)
             exc = e
         except httpx.HTTPError as e:
-            debug(f"retrying ({attempt})")
+            logger.exception("retrying", attempt=attempt)
             exc = e
     else:
         raise exc
 
+    logger.debug("get etag", uri=uri)
     # now verify the hash
     r = await client.head(
         uri,
@@ -108,7 +113,7 @@
     calculated_hash = file_hasher.hexdigest()
     etag, _ = unquote_etag(r.headers.get("etag"))
 
-    debug("f{etag = } vs {calculated_hash = }")
+    logger.debug("check upload", etag=etag, calculated_hash=calculated_hash)
 
     if etag != calculated_hash:
         raise ChunkedUploadVerificationError(
diff --git a/obs_proxy/repserver.py b/obs_proxy/repserver.py
index 99fe51f6c216fdc2b543a16a9750b98a2da0fcd4..f49337e5130f21bca8adf7d897e6412052e66f5a 100644
--- a/obs_proxy/repserver.py
+++ b/obs_proxy/repserver.py
@@ -56,6 +56,14 @@ def build_auth():
     return authenticate
 
 
+@repserver.before_request
+async def log_port():
+    _, port = request.scope['server']
+    structlog.contextvars.bind_contextvars(
+        http_port=port,
+    )
+
+
 @repserver.route('/getbuildcode')
 @repserver.route('/getworkercode')
 @repserver.route('/<upstream>/<path:worker_id>/badpackagebinaryversionlist')
@@ -74,7 +82,11 @@
 @repserver.route('/<upstream>/<path:worker_id>/source/<project>')
 @repserver.route('/<upstream>/<path:worker_id>/source/<project>/<package>')
 async def proxy_request(upstream=None, worker_id=None, **kv):
-    logger.info("proxy_request", upstream=upstream, worker_id=worker_id, kv=kv)
+    structlog.contextvars.bind_contextvars(
+        upstream=upstream,
+        worker_id=worker_id,
+    )
+    logger.info("proxy_request", kv=kv)
     path_parts = request.path.split('/')
     if upstream:
         uri = proxy_server_uri('/'.join(path_parts[3:]), upstream, worker_id)
@@ -112,6 +124,13 @@ async def proxy_request(upstream=None, worker_id=None, **kv):
 async def putjob(worker_id=None, **kv):
     uri = proxy_server_uri('putjob', 'repserver', worker_id)
     job_id = request.args.get('jobid')
+    structlog.contextvars.bind_contextvars(
+        upstream='repserver',
+        worker_id=worker_id,
+        job_id=job_id,
+    )
+    if 'job' in kv:
+        structlog.contextvars.bind_contextvars(job_name=kv['job'])
     chunked_upload_uri = proxy_server_uri(f'jobs/{job_id}', 'repserver', worker_id)
 
     logger.info(f"Proxying {uri}")
@@ -143,6 +162,10 @@ async def worker_ping(worker_id=None):
         return "Invalid port number", 400
     host_arch = request.args.get('arch')
     worker_id = request.args.get('workerid', worker_id or (f"{request.remote_addr}:{peer_port}" if peer_port else None))
+    structlog.contextvars.bind_contextvars(
+        upstream='repserver',
+        worker_id=worker_id,
+    )
     if data:
         worker = Worker.fromxml(
             data,
diff --git a/obs_proxy/server.py b/obs_proxy/server.py
index b6e9c1feabf01225404d984496640f95cd7f4779..5780373270496aabde9c1cf380b3a414c36aff8c 100755
--- a/obs_proxy/server.py
+++ b/obs_proxy/server.py
@@ -240,6 +240,9 @@ async def post_worker_state(worker_id: str, state: str):
         'port': w.externalport,
         'workerid': w.workerid,
     }
+    if state == 'building':
+        logger.info("not updating worker state 'building', OBS does not care", **params)
+        return
     logger.info("updating worker state", **params)
     async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client:
         try:
@@ -420,19 +423,13 @@
     return workers[worker_id].state
 
 
-@app.route('/worker')
-@ext_resources.route('/worker/<path:worker_id>/info')
-@require_auth
-@per_worker
-async def worker_info(worker_id: str = None):
-    if worker_id not in ws_connections:
-        return 'not found', 404
-    params = dict(request.args)
+async def update_worker_info(params):
+    worker_id = params['worker_id']
     orig_job_id = params.get('jobid', None)
     params['jobid'] = map_jobid_to_internal(worker_id, orig_job_id)
-    params['worker_id'] = worker_id
 
     job_trace("server", orig_job_id, worker_id, f"=> info {params['jobid']}")
+    logger.debug("=> info", params=params)
     resp = await ws_connections[worker_id].call(
         JSONRPC20Request(
             method='worker_info',
@@ -441,14 +438,33 @@
     )
     job_trace("server", orig_job_id, worker_id, f"<= {resp.data}")
     if resp.error:
-        return resp.error, 500
+        logger.error("error in worker update", error=resp.error)
+        return
     w = Worker.fromdict(resp.result['worker'])
+    logger.debug("updating worker", info=w)
     workers[worker_id].update(w)
+
+
+@app.route('/worker')
+@ext_resources.route('/worker/<path:worker_id>/info')
+@require_auth
+@per_worker
+async def worker_info(worker_id: str = None):
+    if worker_id not in ws_connections:
+        return 'not found', 404
+    params = dict(request.args)
+
+    if worker_id not in workers:
+        logger.warning("worker in ws_connections but not in workers?!")
+        return 'not found', 404
+
+    params['worker_id'] = worker_id
+
+    logger.debug("updating worker in background", params=params)
+    app.add_background_task(update_worker_info, params)
+
     w = workers[worker_id]
-    if resp.result['code'] == 200:
-        return f"{w.external().asxml(pure=True, meta=False)}", {'content-type': 'text/xml'}
-    else:
-        return resp.result.get('content', ''), resp.result['code']
+    return w.external().asxml(pure=True, meta=False), {'content-type': 'text/xml'}
 
 
 @app.route('/worker/<path:worker_id>/status')
diff --git a/obs_proxy/utils.py b/obs_proxy/utils.py
index 9406c063e44bfd6e116ebf6d4aaeb10479e79cb9..715a769f63e986ae5b521f25b39b98142feea4a6 100644
--- a/obs_proxy/utils.py
+++ b/obs_proxy/utils.py
@@ -113,13 +113,14 @@ def configure_logging():
     root_logger.setLevel(logging.DEBUG if config.debug else logging.INFO)
 
     # Too verbose for us
-    ws_logger = logging.getLogger("websockets.client")
-    ws_logger.setLevel(logging.INFO)
-    httpcore_logger = logging.getLogger("httpcore")
-    httpcore_logger.setLevel(logging.INFO)
-
-    http_logger = logging.getLogger("http.server")
-    http_logger.addHandler(handler)
+    quiet_loggers = [
+        "websockets.client",
+        "httpcore",
+        "hpack.hpack",
+        "hpack.table",
+    ]
+    for logger_name in quiet_loggers:
+        logging.getLogger(logger_name).setLevel(logging.INFO)
 
 
 def shorten(text: str, width: int = 512, placeholder: str = "...") -> str:
diff --git a/obs_proxy/wsclient.py b/obs_proxy/wsclient.py
index 146c392c54459ad1a048856de9cf487723d2a4ca..e0944cd327ec22f8dd1d21b68f51f89c336de063 100644
--- a/obs_proxy/wsclient.py
+++ b/obs_proxy/wsclient.py
@@ -78,17 +78,22 @@ async def worker_log(jobid=None, worker_id=None, **args):
         worker = workers[worker_id]
         uri = worker_uri(worker, 'logfile')
         try:
-            logger.info(f"==> [log {args.get('start', '-')}:{args.get('end', '-')} {args.get('view', 'log')}]")
+            logger.info(
+                f"==> [log {args.get('start', '-')}:{args.get('end', '-')} {args.get('view', 'log')}]",
+                job_id=jobid,
+                worker_id=worker_id,
+                known=jobs.get(worker_id),
+            )
             args['nostream'] = 1
             if jobid:
                 args['jobid'] = jobid
             resp = await client.get(uri, params=args)
         except httpx.ReadTimeout:
-            logger.info(" (timed out)")
+            logger.info(" (timed out)", job_id=jobid, worker_id=worker_id)
             return {'content': '', 'code': 200}
         except httpx.HTTPError:
             raise
-        logger.info(f"<== [{resp.status_code} {resp.reason_phrase}]")
+        logger.info(f"<== [{resp.status_code} {resp.reason_phrase}]", job_id=jobid, worker_id=worker_id)
         return {'content': resp.text, 'code': resp.status_code}
     else:
         raise Exception(
@@ -106,16 +111,26 @@ async def worker_action(action=None, jobid=None, worker_id=None):
     global workers
     if worker_id and worker_id in workers:
         worker = workers[worker_id]
+        if worker_id in jobs:
+            orig_job_id, new_job_id = jobs[worker_id]
+            if orig_job_id == jobid:
+                logger.warning("found unmapped jobid!", orig_job_id=orig_job_id, new_job_id=new_job_id)
         uri = worker_uri(worker, action)
         try:
-            logger.info(f"==> [{action}]")
+            logger.info(
+                f"==> [{action}]",
+                job_id=jobid,
+                worker_id=worker_id,
+                known=jobs.get(worker_id),
+            )
             job_trace("client", jobid, worker_id, f"=> {action}")
             resp = await client.get(uri, params={'jobid': jobid} if jobid else None)
         except httpx.HTTPError as e:
+            logger.exception(f"=!> [{action}]", job_id=jobid, worker_id=worker_id)
             job_trace("client", jobid, worker_id, f"<= {e}")
             raise
         job_trace("client", jobid, worker_id, f"<= {resp.status_code} {resp.reason_phrase}")
-        logger.info(f"<== [{resp.status_code} {resp.reason_phrase}]")
+        logger.info(f"<== [{resp.status_code} {resp.reason_phrase}]", job_id=jobid, worker_id=worker_id)
         return {'content': resp.text, 'code': resp.status_code}
     else:
         job_trace("client", jobid, worker_id, "<= [no such worker]")
@@ -134,13 +149,24 @@ async def worker_info(jobid=None, worker_id=None):
     from .repserver import workers
     if worker_id and worker_id in workers:
         worker = workers[worker_id]
+        if worker_id in jobs:
+            orig_job_id, new_job_id = jobs[worker_id]
+            if orig_job_id == jobid:
+                logger.warning("found unmapped jobid!", orig_job_id=orig_job_id, new_job_id=new_job_id)
+
         action = 'worker'
         uri = worker_uri(worker, action)
         try:
-            logger.info(f"==> [{action}{' job %s' % jobid if jobid else ''}]")
+            logger.info(
+                f"==> [{action}]",
+                job_id=jobid,
+                worker_id=worker_id,
+                known=jobs.get(worker_id),
+            )
             job_trace("client", jobid, worker_id, f"=> {action}")
             resp = await client.get(uri, params={'jobid': jobid} if jobid else None)
         except httpx.HTTPError as e:
+            logger.exception(f"=!> [{action}]", job_id=jobid, worker_id=worker_id)
             job_trace("client", jobid, worker_id, f"<= {e}")
             raise
         job_trace("client", jobid, worker_id, f"<= {resp.status_code} {resp.reason_phrase}")
@@ -148,14 +174,14 @@
             # update cached data
             w = Worker.fromxml(resp.text)
             worker.update(w)
-            logger.info(f"<== [{resp.status_code} {resp.reason_phrase}]")
+            logger.info(f"<== [{resp.status_code} {resp.reason_phrase}]", job_id=jobid, worker_id=worker_id)
             return {
                 'worker': worker.asdict(),
                 'code': resp.status_code,
                 'reason': resp.reason_phrase,
             }
         else:
-            logger.info(f"<== [{resp.status_code} {resp.reason_phrase} {resp.text}]")
+            logger.info(f"<== [{resp.status_code} {resp.reason_phrase} {resp.text}]", job_id=jobid, worker_id=worker_id)
             return {
                 'worker': worker.asdict(),
                 'code': resp.status_code,
@@ -201,7 +227,7 @@ async def submit_job(job=None, jobid=None, worker_id=None, extra=None):
         resp = await client.put(uri, params=extra, content=job_xml)
         if jobid:
             new_jobid = hashlib.md5(job_xml).hexdigest()
-            logger.info(f"==> [build {worker_id}/{jobid} => {new_jobid}]")
+            logger.info(f"==> [build {worker_id}/{jobid} => {new_jobid}]", orig_job_id=jobid, new_job_id=new_jobid, worker_id=worker_id)
 
             notification = JSONRPC20Request(
                 method='job_started',
@@ -209,10 +235,11 @@
             )
             job_alias(jobid, new_jobid)
     except httpx.HTTPError as e:
+        logger.exception("submit_job", orig_job_id=jobid, new_job_id=new_jobid, worker_id=worker_id)
         job_trace("client", jobid, worker_id, f"<= {e}")
         raise Exception(f"HTTP error: {e}", uri)
     job_trace("client", jobid, worker_id, f"<= {resp.status_code} {resp.reason_phrase}")
-    logger.info(f"<== [{resp.status_code} {resp.reason_phrase}]")
+    logger.info(f"<== [{resp.status_code} {resp.reason_phrase}]", orig_job_id=jobid, new_job_id=new_jobid, worker_id=worker_id)
     if resp.status_code == 200 and notification:
         queue.put_nowait(notification)
     jobs[worker_id] = (jobid, new_jobid)