From 5f2979173d6f946d4ff91fd57e599f1a403ca3b5 Mon Sep 17 00:00:00 2001
From: Andrej Shadura <andrew.shadura@collabora.co.uk>
Date: Fri, 21 Apr 2023 10:25:26 +0200
Subject: [PATCH 1/4] Add docstrings to a few functions lacking them

Signed-off-by: Andrej Shadura <andrew.shadura@collabora.co.uk>
---
 obs_proxy/server.py | 3 +++
 obs_proxy/utils.py  | 6 ++++++
 obs_proxy/worker.py | 1 +
 3 files changed, 10 insertions(+)

diff --git a/obs_proxy/server.py b/obs_proxy/server.py
index 409ad8e..6858b37 100755
--- a/obs_proxy/server.py
+++ b/obs_proxy/server.py
@@ -80,6 +80,7 @@ jobs = bidict()
 
 
 def require_auth(f):
+    """Require authentication when accessed over the external server port."""
     @functools.wraps(f)
     async def wrapper(*args, **kwargs):
         ctx = request if has_request_context() else websocket
@@ -111,6 +112,7 @@ def require_auth(f):
 
 
 def handout_port(worker_id):
+    """Assign an available port to a worker or return None if no ports are available."""
     if worker_id in port_cache:
         port = port_cache[worker_id]
         if ports[port] is None:
@@ -120,6 +122,7 @@ def handout_port(worker_id):
 
 
 def find_worker(worker_id):
+    """If not set, find the worker ID associated with the port that the request came from."""
     if worker_id:
         return worker_id
     _, port = request.scope['server']
diff --git a/obs_proxy/utils.py b/obs_proxy/utils.py
index c91dd0a..4204558 100644
--- a/obs_proxy/utils.py
+++ b/obs_proxy/utils.py
@@ -154,6 +154,11 @@ async def upload_data(
     headers=None,
     **kv,
 ):
+    """
+    Upload data to a specified URI and return the response.
+
+    Upload is performed either as a single request or (if over a certain size) as a chunked upload.
+    """
     if headers is None:
         headers = {}
     for header in ('content-type', 'content-length'):
@@ -232,6 +237,7 @@ async def upload_file(
     headers: Mapping[str, str] | None = None,
     **kv,
 ) -> httpx.Response:
+    """Upload a file asynchronously and return a response."""
 
     async def upload_bytes():
         chunk = await file.read(UPLOAD_CHUNK_SIZE)
diff --git a/obs_proxy/worker.py b/obs_proxy/worker.py
index f520ebb..6b19c59 100644
--- a/obs_proxy/worker.py
+++ b/obs_proxy/worker.py
@@ -35,6 +35,7 @@ class Worker:
         return hash(self.workerid)
 
     def asxml(self, pure=False, meta=True):
+        """Return an XML representation of a worker object."""
        meta = self.meta.copy() if meta else {}
         meta.update({
             '@port': self.port,
-- 
GitLab

From 44715a7416d161bd61239d8047ad83d048f95438 Mon Sep 17 00:00:00 2001
From: Andrej Shadura <andrew.shadura@collabora.co.uk>
Date: Wed, 26 Apr 2023 14:35:35 +0200
Subject: [PATCH 2/4] Add [mypy] extra dependency set for type checking

Signed-off-by: Andrej Shadura <andrew.shadura@collabora.co.uk>
---
 setup.cfg | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/setup.cfg b/setup.cfg
index 2c61991..aa7e935 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -47,6 +47,9 @@ lint =
     flake8-builtins
     flake8-isort
     flake8-pytest-style
+mypy =
+    mypy
+    types-xmltodict ~= 0.12
 
 [options.data_files]
 share/doc/obs-proxy/examples = proxy.conf
-- 
GitLab

From db08e31ebe09513ad119a99d62001b91b512b84d Mon Sep 17 00:00:00 2001
From: Andrej Shadura <andrew.shadura@collabora.co.uk>
Date: Wed, 30 Mar 2022 09:44:29 +0200
Subject: [PATCH 3/4] Set up structured logging framework

Pipe all logs through structlog, including stdlib logs. When run on a
tty, use colours, otherwise logfmt.

Convert some log prints to structured logs.

Bump Quart dependency to 0.18.
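
For illustration, the conversion pattern looks like this (both forms are
taken from the obs_proxy.server hunks below):

    # before: values interpolated into the message, opaque to filtering
    logger.info(f"  mapping {worker_id}/{orig_job_id} => {job_id} done")

    # after: a constant event name plus key/value pairs
    logger.info(
        "mapping job done",
        worker_id=worker_id,
        orig_job_id=orig_job_id,
        job_id=job_id,
    )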
Signed-off-by: Andrej Shadura <andrew.shadura@collabora.co.uk> --- obs_proxy/client.py | 13 ++--- obs_proxy/repserver.py | 5 +- obs_proxy/server.py | 112 +++++++++++++++++++++++++-------------- obs_proxy/utils.py | 88 ++++++++++++++++++++++++++++-- obs_proxy/wsclient.py | 4 +- setup.cfg | 4 +- tests/upload/conftest.py | 8 ++- 7 files changed, 174 insertions(+), 60 deletions(-) diff --git a/obs_proxy/client.py b/obs_proxy/client.py index 5ec9c54..8a34276 100755 --- a/obs_proxy/client.py +++ b/obs_proxy/client.py @@ -12,25 +12,22 @@ # file, You can obtain one at http://mozilla.org/MPL/2.0/. import asyncio -import logging import os import signal import sys +import structlog + from .config import config from .repserver import repserver -from .utils import run_multiserver +from .utils import configure_logging, run_multiserver from .wsclient import queue, wsclient -logger = logging.getLogger(__name__) +logger = structlog.get_logger() def run(): - logging.basicConfig( - level=logging.DEBUG if config.debug else logging.INFO, - style='{', - format='[{asctime}] {levelname[0]}: {message}', - ) + configure_logging() if config.server.insecure: logger.warning("Not verifying certificates when connecting to the OBS proxy server") diff --git a/obs_proxy/repserver.py b/obs_proxy/repserver.py index 67eed74..99fe51f 100644 --- a/obs_proxy/repserver.py +++ b/obs_proxy/repserver.py @@ -12,10 +12,10 @@ # License, v. 2.0. If a copy of the MPL was not distributed with this # file, You can obtain one at http://mozilla.org/MPL/2.0/. -import logging from typing import Optional import httpx +import structlog from jsonrpc.jsonrpc import JSONRPC20Request from quart import Quart, make_response, request @@ -24,7 +24,7 @@ from .utils import filterdict, upload_data from .worker import Worker from .wsclient import first_connect, queue, workers # noqa: F401 -logger = logging.getLogger(__name__) +logger = structlog.get_logger() repserver = Quart(__name__) repserver.config['MAX_CONTENT_LENGTH'] = None @@ -74,6 +74,7 @@ def build_auth(): @repserver.route('/<upstream>/<path:worker_id>/source/<project>') @repserver.route('/<upstream>/<path:worker_id>/source/<project>/<package>') async def proxy_request(upstream=None, worker_id=None, **kv): + logger.info("proxy_request", upstream=upstream, worker_id=worker_id, kv=kv) path_parts = request.path.split('/') if upstream: uri = proxy_server_uri('/'.join(path_parts[3:]), upstream, worker_id) diff --git a/obs_proxy/server.py b/obs_proxy/server.py index 6858b37..16c0462 100755 --- a/obs_proxy/server.py +++ b/obs_proxy/server.py @@ -15,7 +15,6 @@ import asyncio import functools import hashlib import json -import logging import os import signal import ssl @@ -25,6 +24,7 @@ from typing import Optional import aiofiles import httpx +import structlog import xmltodict from bidict import bidict from jsonrpc import Dispatcher @@ -45,18 +45,20 @@ from .chunked_uploads import chunked_uploads from .config import HTTP_TIMEOUT, config from .rpcqueue import RPCQueue from .utils import ( + configure_logging, filterdict, + format_ws_request, + format_ws_response, job_alias, job_trace, open_cache, - print_ws_message, run_multiserver, upload_data, upload_file, ) from .worker import ProxiedWorker, Worker -logger = logging.getLogger(__name__) +logger = structlog.get_logger() app = Quart(__name__) app.config['MAX_CONTENT_LENGTH'] = None @@ -85,6 +87,9 @@ def require_auth(f): async def wrapper(*args, **kwargs): ctx = request if has_request_context() else websocket _, port = ctx.scope['server'] + 
structlog.contextvars.bind_contextvars( + http_port=port, + ) if port == config.server.port: auth_header = ctx.headers.get("Authorization") if not auth_header or not ( @@ -123,12 +128,15 @@ def handout_port(worker_id): def find_worker(worker_id): """If not set, find the worker ID associated with the port that the request came from.""" - if worker_id: - return worker_id - _, port = request.scope['server'] - if not worker_id and ports.get(port): - worker_id = ports[int(port)].workerid - logger.debug(f"detected {worker_id = }") + if not worker_id: + _, port = request.scope['server'] + if not worker_id and ports.get(port): + worker_id = ports[int(port)].workerid + known_worker = worker_id in ws_connections + structlog.contextvars.bind_contextvars( + worker_id=worker_id, + known_worker=known_worker, + ) return worker_id @@ -142,32 +150,52 @@ def per_worker(f): def forget_jobid_mapping(worker_id, job_id): - if (worker_id, job_id) in jobs: - logger.info(f" forgetting mapping for {worker_id}/{job_id}") - _ = jobs.pop((worker_id, job_id)) - else: - logger.warning(f" NOT forgetting {worker_id}/{job_id}!") + known = (worker_id, job_id) in jobs + if known: + del jobs[(worker_id, job_id)] + logger.info( + "forgetting mapping", + worker_id=worker_id, + job_id=job_id, + known=known, + ) def map_jobid_to_orig(worker_id, job_id): if (worker_id, job_id) in jobs: - logger.info(f" found job mapping for {worker_id}/{job_id}") _, orig_job_id = jobs.get((worker_id, job_id)) - logger.info(f" mapping to the orig job id {orig_job_id}") + logger.info( + "mapping job", + worker_id=worker_id, + job_id=job_id, + orig_job_id=orig_job_id, + ) return orig_job_id else: - logger.warning(f" NOT mapping {worker_id}/{job_id}!") + logger.warning( + "not mapping unknown job", + worker_id=worker_id, + job_id=job_id, + ) return job_id def map_jobid_to_internal(worker_id, orig_job_id): if (worker_id, orig_job_id) in jobs.inverse: - logger.info(f" found job mapping for {worker_id}/orig_id:{orig_job_id}") _, job_id = jobs.inverse.get((worker_id, orig_job_id)) - logger.info(f" mapping to the internal job id {job_id}") + logger.info( + "mapping job", + worker_id=worker_id, + orig_job_id=orig_job_id, + job_id=job_id, + ) return job_id else: - logger.warning(f" NOT mapping {worker_id}/orig_id:{orig_job_id}!") + logger.warning( + "not mapping unknown job", + worker_id=worker_id, + orig_job_id=orig_job_id, + ) return orig_job_id @@ -176,8 +204,8 @@ async def worker_ping(worker, state=None, jobid=None): if w.workerid not in workers: w.externalport = handout_port(w.workerid) or 0 if not w.externalport: - print(f"{workers = }") - logger.debug(f" alloc {w.externalport}") + logger.debug("no external port?", workers=workers) + logger.debug("allocated external port", port=w.externalport) workers[w.workerid] = w ports[w.externalport] = w if w.externalport: @@ -201,20 +229,19 @@ async def post_worker_state(worker_id: str, state: str): 'port': w.externalport, 'workerid': w.workerid, } - logger.info(f" {w.hostarch}:{w.workerid} {state} on {w.port}") + logger.info("updating worker state", **params) async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client: try: resp = await client.post( f"{config.backend.repserver_uri}/worker", params=params, content=xml, ) - logger.info("==> [worker_ping]") - except httpx.RequestError: - logger.info("<!= [worker_pong (error)]") + except httpx.RequestError as e: + logger.error("update failed", error=e) raise except httpx.HTTPStatusError as e: - logger.info(f"<!= [worker_pong {resp.status_code} 
{resp.reason_phrase}]") + logger.error("update failed", code=resp.status_code, reason=resp.reason_phrase) raise Exception(f"HTTP error: {e}") - logger.info(f"<== [worker_pong {resp.status_code} {resp.reason_phrase}]") + logger.info("updated", code=resp.status_code, reason=resp.reason_phrase) return ( 'ok' if resp.status_code == 200 else f"{resp.status_code} {resp.reason_phrase}" ) @@ -225,20 +252,27 @@ async def job_started(orig_job_id, job_id, worker_id): return f"job mapping {worker_id}/{orig_job_id} => {job_id} already known" jobs[(worker_id, job_id)] = (worker_id, orig_job_id) job_alias(orig_job_id, job_id) - logger.info(f" mapping {worker_id}/{orig_job_id} => {job_id} done") + logger.info( + "mapping job done", + worker_id=worker_id, + orig_job_id=orig_job_id, + job_id=job_id, + ) return 'ok' @app.route('/ping') async def ping(): + logger.info("ping") return "All OK", 200 @app.route('/worker/<path:worker_id>/config') @require_auth -async def dump_config(worker_id = None): +async def dump_config(worker_id=None): return filterdict(config.as_dict(), ['backend']) + @app.route('/logfile') @app.route('/worker/<path:worker_id>/logfile', methods=['GET']) @require_auth @@ -288,7 +322,7 @@ async def worker_events_ws(worker_id: str): new_worker_id = kwargs.pop('worker_id', None) if new_worker_id and new_worker_id != worker_id: ws_connections[new_worker_id] = queue - logger.info(f"(o) {new_worker_id} online sharing connection with {worker_id}") + logger.info("new worker", worker_id=new_worker_id, shared_with=worker_id) return await worker_ping(**kwargs) dispatcher = Dispatcher() @@ -304,21 +338,21 @@ async def worker_events_ws(worker_id: str): @copy_current_websocket_context async def recv(queue): while True: - print("... awaiting") + logger.info("... awaiting") data = await websocket.receive() try: message = json.loads(data) except (TypeError, ValueError): - logger.error(f"### invalid JSON of length {len(data)}") + logger.error(f"### invalid JSON of length {len(data)}", json=data) continue if 'id' in message and ('result' in message or 'error' in message): - print_ws_message('->>', message) + logger.info(">>>", kind="call.result", **format_ws_response(message)) response = JSONRPC20Response(_id=message['id'], **message) await queue.reply(response) elif 'method' in message: - print_ws_message('>>>', message) + logger.info(">>>", kind="call", **format_ws_request(message)) response = await AsyncJSONRPCResponseManager.handle(data, dispatcher) - print_ws_message('<x-', response) + logger.info("<x-", kind="call.result.unsent", **format_ws_response(response.data)) logger.info(f"(o) {worker_id} online") ws_connections[worker_id] = queue @@ -588,11 +622,7 @@ async def proxy_src_request(worker_id: str, **kv): def run(): - logging.basicConfig( - level=logging.DEBUG if config.debug else logging.INFO, - style='{', - format='[{asctime}] {levelname[0]}: {message}', - ) + configure_logging() try: port_cache = open_cache("port_cache") diff --git a/obs_proxy/utils.py b/obs_proxy/utils.py index 4204558..cf09f28 100644 --- a/obs_proxy/utils.py +++ b/obs_proxy/utils.py @@ -16,18 +16,19 @@ from __future__ import annotations import datetime import logging import shelve +import sys from logging import debug, error, info from pathlib import Path from typing import Mapping import httpx +import structlog from aiofiles.tempfile import NamedTemporaryFile from aiofiles.threadpool import AsyncFileIO from hypercorn.asyncio import serve from hypercorn.config import Config as HyperConfig from jsonrpc.jsonrpc2 import 
JSONRPC20Request, JSONRPC20Response
 from quart import request, stream_with_context
-from quart.logging import create_serving_logger
 
 from .chunked_uploads.client import (
     ChunkedUploadError,
@@ -39,6 +40,85 @@ from .config import config, find_cache
 
 MAXIMUM_PAYLOAD = 2 * 1024 * 1024 * 1024
 
+# TODO: merge with print_ws_request
+def format_ws_request(message):
+    return message.get('params', {}) | {
+        # notifications carry no 'id' key, so avoid a KeyError here
+        'id': message.get('id') or '()',
+        'method': message['method'],
+    }
+
+
+# TODO: merge with print_ws_response
+def format_ws_response(message):
+    return message | {
+        'id': message['id'] or '()',
+    }
+
+
+def configure_logging():
+    timestamper = structlog.processors.TimeStamper(fmt="%Y-%m-%dT%H:%M:%SZ", utc=True)
+    shared_processors = [
+        structlog.stdlib.add_log_level,
+        timestamper,
+    ]
+
+    structlog.configure(
+        processors=shared_processors + [
+            structlog.processors.CallsiteParameterAdder(
+                [structlog.processors.CallsiteParameter.FUNC_NAME],
+                additional_ignores=[],
+            ),
+            structlog.stdlib.ProcessorFormatter.wrap_for_formatter,
+        ],
+        logger_factory=structlog.stdlib.LoggerFactory(),
+        cache_logger_on_first_use=True,
+    )
+
+    def add_extra(_, __, event_dict):
+        # currently a no-op
+        return event_dict
+
+    formatter = structlog.stdlib.ProcessorFormatter(
+        # These run ONLY on `logging` entries that do NOT originate within
+        # structlog.
+        foreign_pre_chain=shared_processors,
+        # These run on ALL entries after the pre_chain is done.
+        processors=[
+            structlog.stdlib.add_logger_name,
+            structlog.stdlib.PositionalArgumentsFormatter(),
+            add_extra,
+            structlog.processors.StackInfoRenderer(),
+            # Remove _record & _from_structlog.
+            structlog.stdlib.ProcessorFormatter.remove_processors_meta,
+            structlog.dev.ConsoleRenderer() if sys.stdout.isatty() else structlog.processors.LogfmtRenderer(
+                key_order=(
+                    'timestamp',
+                    'event',
+                    'kind',
+                    'level',
+                ),
+                drop_missing=True,
+            ),
+        ],
+    )
+
+    handler = logging.StreamHandler()
+    # Use OUR `ProcessorFormatter` to format all `logging` entries.
+    handler.setFormatter(formatter)
+    root_logger = logging.getLogger()
+    root_logger.addHandler(handler)
+
+    root_logger.setLevel(logging.DEBUG if config.debug else logging.INFO)
+
+    # Too verbose for us
+    ws_logger = logging.getLogger("websockets.client")
+    ws_logger.setLevel(logging.INFO)
+
+    http_logger = logging.getLogger("http.server")
+    http_logger.addHandler(handler)
+    http_logger.propagate = False  # the root handler would otherwise emit these a second time
+
+
 def shorten(text: str, width: int = 512, placeholder: str = "...") -> str:
     r"""
     Collapse and truncate the given text to fit in the given width.
@@ -115,9 +195,9 @@ def run_multiserver( ): """Run a Quart app on multiple ports using Hypercorn""" config = HyperConfig() - config.access_log_format = "%(h)s %(r)s %(s)s %(b)s %(D)s" - config.accesslog = create_serving_logger() - config.accesslog.propagate = False + config.access_log_format = "%(h)s %(R)s %(s)s %(b)s %(D)s" + config.accesslog = logging.getLogger("http.server") + config.bind = [f'{host}:{port}' for port in https_ports] or [ f'{host}:{port}' for port in ports ] diff --git a/obs_proxy/wsclient.py b/obs_proxy/wsclient.py index beac381..146c392 100644 --- a/obs_proxy/wsclient.py +++ b/obs_proxy/wsclient.py @@ -15,11 +15,11 @@ import asyncio import hashlib import json -import logging import random import ssl import httpx +import structlog import websockets import xmltodict from hypercorn.utils import raise_shutdown @@ -32,7 +32,7 @@ from .worker import Worker random.seed() -logger = logging.getLogger(__name__) +logger = structlog.get_logger() queue = asyncio.Queue() diff --git a/setup.cfg b/setup.cfg index aa7e935..2e4e7d5 100644 --- a/setup.cfg +++ b/setup.cfg @@ -26,11 +26,13 @@ install_requires = bidict ~= 0.18 httpx ~= 0.22 json-rpc ~= 1.12 - Quart >= 0.11, < 0.18 + Quart ~= 0.18 websockets ~= 10.0 xmltodict ~= 0.13 aiofiles ~= 0.8 jinja2 ~= 3.0.0 + structlog ~= 22.3 + rich tests_require = obs-proxy[test] [options.extras_require] diff --git a/tests/upload/conftest.py b/tests/upload/conftest.py index 2a35401..71fde29 100644 --- a/tests/upload/conftest.py +++ b/tests/upload/conftest.py @@ -31,6 +31,9 @@ def config_dict(): }, 'workers': { }, + 'debug': { + 'enabled': True, + }, 'auth': { 'token': 'token', }, @@ -80,12 +83,13 @@ async def capture_stdout_until(p: subprocess.Popen, text: str) -> str: @pytest.fixture async def spawn_client(mock_config: MockConfig): + print(f"{mock_config = }") p = subprocess.Popen( [ "python3", "-m", "obs_proxy.client", ], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, ) - await capture_stdout_until(p, "Running on http") + await capture_stdout_until(p, "Waiting for the worker") yield p, mock_config p.terminate() p.wait(timeout=5) @@ -102,7 +106,7 @@ async def spawn_server(mock_config): "python3", "-m", "obs_proxy.server", ], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, ) - await capture_stdout_until(p, "Running on http") + await capture_stdout_until(p, "Will listen on worker ports") yield p, mock_config p.terminate() p.wait(timeout=5) -- GitLab From 665b5399cc304021b014efa1cd38809ea5b551f3 Mon Sep 17 00:00:00 2001 From: Andrej Shadura <andrew.shadura@collabora.co.uk> Date: Wed, 26 Apr 2023 16:10:33 +0200 Subject: [PATCH 4/4] Make config parser and its tests flake8-clean Drop unused imports, drop extra empty lines, replace a wildcard import with a series of specific imports. 
Fixes: abd77e4db6ac ("Add configuration de/serialisation code") Signed-off-by: Andrej Shadura <andrew.shadura@collabora.co.uk> --- obs_proxy/config.py | 5 +---- tests/test_config.py | 19 ++++++++++++------- 2 files changed, 13 insertions(+), 11 deletions(-) diff --git a/obs_proxy/config.py b/obs_proxy/config.py index e6f2444..9435a84 100644 --- a/obs_proxy/config.py +++ b/obs_proxy/config.py @@ -17,7 +17,7 @@ from configparser import ConfigParser from dataclasses import dataclass, is_dataclass from logging import error from pathlib import Path -from typing import Iterable, Mapping +from typing import Mapping HTTP_TIMEOUT = 20 * 60 @@ -184,12 +184,10 @@ class Config: cachedir=cachedir, ) - def __post_init__(self): if self.tracedir: self.tracedir.mkdir(parents=True, exist_ok=True) - def as_dict(self): def serialize(o): if is_dataclass(o): @@ -204,7 +202,6 @@ class Config: return serialize(self) - def update_from_dict(self, d): def deserialize(o, c, d): if is_dataclass(o): diff --git a/tests/test_config.py b/tests/test_config.py index c8eaaf4..0972f2d 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -1,8 +1,12 @@ import io -import pytest -from obs_proxy.config import * - +from obs_proxy.config import ( + AuthConfig, + BackendConfig, + ClientConfig, + Config, + ServerConfig, +) basic_config = """ [server] @@ -111,6 +115,7 @@ username = luser password = drosswap """ + def test_mini_config(): f = io.StringIO(mini_config) c = Config.parse_file(f) @@ -124,12 +129,12 @@ def test_mini_config(): insecure=False, buffer_uploads=False, keyfile=None, - certfile=None + certfile=None, ), client=ClientConfig( host='0.0.0.0', port=5000, - buffer_uploads=True + buffer_uploads=True, ), auth=AuthConfig( username='luser', @@ -138,12 +143,12 @@ def test_mini_config(): ), backend=BackendConfig( srcserver_uri=None, - repserver_uri=None + repserver_uri=None, ), worker_ports=None, debug=False, tracedir=None, - cachedir=None + cachedir=None, ) -- GitLab
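
The renderer selection configure_logging() performs in patch 3
(ConsoleRenderer on a tty, LogfmtRenderer otherwise) can be tried in
isolation with a minimal sketch along these lines. It is an
approximation, not the series' code: it assumes only structlog ~= 22.3
as pinned in setup.cfg, and the event and key names are made up rather
than taken from obs-proxy:

    import sys

    import structlog

    structlog.configure(
        processors=[
            # Same shared processors as in the patch: level and UTC timestamp.
            structlog.stdlib.add_log_level,
            structlog.processors.TimeStamper(fmt="%Y-%m-%dT%H:%M:%SZ", utc=True),
            # The renderer branch from configure_logging(): colours for
            # humans on a terminal, logfmt for log collectors otherwise.
            structlog.dev.ConsoleRenderer()
            if sys.stdout.isatty()
            else structlog.processors.LogfmtRenderer(drop_missing=True),
        ],
    )

    logger = structlog.get_logger()
    logger.info("updating worker state", port=5000, workerid="worker-1")

Piped through cat, the event comes out as logfmt, roughly
port=5000 workerid=worker-1 event="updating worker state" level=info
timestamp=...; on a terminal the same event is rendered in the coloured,
human-readable console format.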