diff --git a/debian/patches/apertis/fuzzer-add-varlink-fuzzer.patch b/debian/patches/apertis/fuzzer-add-varlink-fuzzer.patch new file mode 100644 index 0000000000000000000000000000000000000000..f5e3ab8b0c4461e721acc08388da45d2e8fdc804 Binary files /dev/null and b/debian/patches/apertis/fuzzer-add-varlink-fuzzer.patch differ diff --git a/debian/patches/apertis/journalctl-add-new-relinquish-and-smart-relinquish-option.patch b/debian/patches/apertis/journalctl-add-new-relinquish-and-smart-relinquish-option.patch new file mode 100644 index 0000000000000000000000000000000000000000..14e45074e2e061b4a5fac439e9d081c804ceb9b1 --- /dev/null +++ b/debian/patches/apertis/journalctl-add-new-relinquish-and-smart-relinquish-option.patch @@ -0,0 +1,236 @@ +From: Lennart Poettering <lennart@poettering.net> +Date: Fri, 5 Apr 2019 18:21:02 +0200 +Subject: journalctl: add new --relinquish and --smart-relinquish options + +The latter is identical to the former, but becomes a NOP if +/var/log/journal is on the same mount as /, and thus during shutdown +unmounting /var is not necessary and hence we can keep logging until the +very end. + +Signed-off-by: Martyn Welch <martyn.welch@collabora.com> +[Martyn Welch: Backported to systemd v241] +--- + src/journal/journalctl.c | 161 ++++++++++++++++++++++++++++++----------------- + 1 file changed, 103 insertions(+), 58 deletions(-) + +diff --git a/src/journal/journalctl.c b/src/journal/journalctl.c +index 910aced..5bde51d 100644 +--- a/src/journal/journalctl.c ++++ b/src/journal/journalctl.c +@@ -51,6 +51,7 @@ + #include "log.h" + #include "logs-show.h" + #include "mkdir.h" ++#include "mountpoint-util.h" + #include "pager.h" + #include "parse-util.h" + #include "path-util.h" +@@ -165,6 +166,7 @@ static enum { + ACTION_UPDATE_CATALOG, + ACTION_LIST_BOOTS, + ACTION_FLUSH, ++ ACTION_RELINQUISH_VAR, + ACTION_SYNC, + ACTION_ROTATE, + ACTION_VACUUM, +@@ -361,6 +363,8 @@ static int help(void) { + " --vacuum-time=TIME Remove journal files older than specified time\n" + " --verify Verify journal file consistency\n" + " --sync Synchronize unwritten journal messages to disk\n" ++ " --relinquish-var Stop logging to disk, log to temporary file system\n" ++ " --smart-relinquish-var Similar, but NOP if log directory is on root mount\n" + " --flush Flush all journal data from /run into /var\n" + " --rotate Request immediate rotation of the journal files\n" + " --header Show journal header information\n" +@@ -407,6 +411,8 @@ static int parse_argv(int argc, char *argv[]) { + ARG_UTC, + ARG_SYNC, + ARG_FLUSH, ++ ARG_RELINQUISH_VAR, ++ ARG_SMART_RELINQUISH_VAR, + ARG_ROTATE, + ARG_VACUUM_SIZE, + ARG_VACUUM_FILES, +@@ -416,64 +422,66 @@ static int parse_argv(int argc, char *argv[]) { + }; + + static const struct option options[] = { +- { "help", no_argument, NULL, 'h' }, +- { "version" , no_argument, NULL, ARG_VERSION }, +- { "no-pager", no_argument, NULL, ARG_NO_PAGER }, +- { "pager-end", no_argument, NULL, 'e' }, +- { "follow", no_argument, NULL, 'f' }, +- { "force", no_argument, NULL, ARG_FORCE }, +- { "output", required_argument, NULL, 'o' }, +- { "all", no_argument, NULL, 'a' }, +- { "full", no_argument, NULL, 'l' }, +- { "no-full", no_argument, NULL, ARG_NO_FULL }, +- { "lines", optional_argument, NULL, 'n' }, +- { "no-tail", no_argument, NULL, ARG_NO_TAIL }, +- { "new-id128", no_argument, NULL, ARG_NEW_ID128 }, /* deprecated */ +- { "quiet", no_argument, NULL, 'q' }, +- { "merge", no_argument, NULL, 'm' }, +- { "this-boot", no_argument, NULL, ARG_THIS_BOOT }, /* deprecated */ +- { "boot", optional_argument, NULL, 'b' }, +- { "list-boots", no_argument, NULL, ARG_LIST_BOOTS }, +- { "dmesg", no_argument, NULL, 'k' }, +- { "system", no_argument, NULL, ARG_SYSTEM }, +- { "user", no_argument, NULL, ARG_USER }, +- { "directory", required_argument, NULL, 'D' }, +- { "file", required_argument, NULL, ARG_FILE }, +- { "root", required_argument, NULL, ARG_ROOT }, +- { "header", no_argument, NULL, ARG_HEADER }, +- { "identifier", required_argument, NULL, 't' }, +- { "priority", required_argument, NULL, 'p' }, +- { "grep", required_argument, NULL, 'g' }, +- { "case-sensitive", optional_argument, NULL, ARG_CASE_SENSITIVE }, +- { "setup-keys", no_argument, NULL, ARG_SETUP_KEYS }, +- { "interval", required_argument, NULL, ARG_INTERVAL }, +- { "verify", no_argument, NULL, ARG_VERIFY }, +- { "verify-key", required_argument, NULL, ARG_VERIFY_KEY }, +- { "disk-usage", no_argument, NULL, ARG_DISK_USAGE }, +- { "cursor", required_argument, NULL, 'c' }, +- { "after-cursor", required_argument, NULL, ARG_AFTER_CURSOR }, +- { "show-cursor", no_argument, NULL, ARG_SHOW_CURSOR }, +- { "since", required_argument, NULL, 'S' }, +- { "until", required_argument, NULL, 'U' }, +- { "unit", required_argument, NULL, 'u' }, +- { "user-unit", required_argument, NULL, ARG_USER_UNIT }, +- { "field", required_argument, NULL, 'F' }, +- { "fields", no_argument, NULL, 'N' }, +- { "catalog", no_argument, NULL, 'x' }, +- { "list-catalog", no_argument, NULL, ARG_LIST_CATALOG }, +- { "dump-catalog", no_argument, NULL, ARG_DUMP_CATALOG }, +- { "update-catalog", no_argument, NULL, ARG_UPDATE_CATALOG }, +- { "reverse", no_argument, NULL, 'r' }, +- { "machine", required_argument, NULL, 'M' }, +- { "utc", no_argument, NULL, ARG_UTC }, +- { "flush", no_argument, NULL, ARG_FLUSH }, +- { "sync", no_argument, NULL, ARG_SYNC }, +- { "rotate", no_argument, NULL, ARG_ROTATE }, +- { "vacuum-size", required_argument, NULL, ARG_VACUUM_SIZE }, +- { "vacuum-files", required_argument, NULL, ARG_VACUUM_FILES }, +- { "vacuum-time", required_argument, NULL, ARG_VACUUM_TIME }, +- { "no-hostname", no_argument, NULL, ARG_NO_HOSTNAME }, +- { "output-fields", required_argument, NULL, ARG_OUTPUT_FIELDS }, ++ { "help", no_argument, NULL, 'h' }, ++ { "version" , no_argument, NULL, ARG_VERSION }, ++ { "no-pager", no_argument, NULL, ARG_NO_PAGER }, ++ { "pager-end", no_argument, NULL, 'e' }, ++ { "follow", no_argument, NULL, 'f' }, ++ { "force", no_argument, NULL, ARG_FORCE }, ++ { "output", required_argument, NULL, 'o' }, ++ { "all", no_argument, NULL, 'a' }, ++ { "full", no_argument, NULL, 'l' }, ++ { "no-full", no_argument, NULL, ARG_NO_FULL }, ++ { "lines", optional_argument, NULL, 'n' }, ++ { "no-tail", no_argument, NULL, ARG_NO_TAIL }, ++ { "new-id128", no_argument, NULL, ARG_NEW_ID128 }, /* deprecated */ ++ { "quiet", no_argument, NULL, 'q' }, ++ { "merge", no_argument, NULL, 'm' }, ++ { "this-boot", no_argument, NULL, ARG_THIS_BOOT }, /* deprecated */ ++ { "boot", optional_argument, NULL, 'b' }, ++ { "list-boots", no_argument, NULL, ARG_LIST_BOOTS }, ++ { "dmesg", no_argument, NULL, 'k' }, ++ { "system", no_argument, NULL, ARG_SYSTEM }, ++ { "user", no_argument, NULL, ARG_USER }, ++ { "directory", required_argument, NULL, 'D' }, ++ { "file", required_argument, NULL, ARG_FILE }, ++ { "root", required_argument, NULL, ARG_ROOT }, ++ { "header", no_argument, NULL, ARG_HEADER }, ++ { "identifier", required_argument, NULL, 't' }, ++ { "priority", required_argument, NULL, 'p' }, ++ { "grep", required_argument, NULL, 'g' }, ++ { "case-sensitive", optional_argument, NULL, ARG_CASE_SENSITIVE }, ++ { "setup-keys", no_argument, NULL, ARG_SETUP_KEYS }, ++ { "interval", required_argument, NULL, ARG_INTERVAL }, ++ { "verify", no_argument, NULL, ARG_VERIFY }, ++ { "verify-key", required_argument, NULL, ARG_VERIFY_KEY }, ++ { "disk-usage", no_argument, NULL, ARG_DISK_USAGE }, ++ { "cursor", required_argument, NULL, 'c' }, ++ { "after-cursor", required_argument, NULL, ARG_AFTER_CURSOR }, ++ { "show-cursor", no_argument, NULL, ARG_SHOW_CURSOR }, ++ { "since", required_argument, NULL, 'S' }, ++ { "until", required_argument, NULL, 'U' }, ++ { "unit", required_argument, NULL, 'u' }, ++ { "user-unit", required_argument, NULL, ARG_USER_UNIT }, ++ { "field", required_argument, NULL, 'F' }, ++ { "fields", no_argument, NULL, 'N' }, ++ { "catalog", no_argument, NULL, 'x' }, ++ { "list-catalog", no_argument, NULL, ARG_LIST_CATALOG }, ++ { "dump-catalog", no_argument, NULL, ARG_DUMP_CATALOG }, ++ { "update-catalog", no_argument, NULL, ARG_UPDATE_CATALOG }, ++ { "reverse", no_argument, NULL, 'r' }, ++ { "machine", required_argument, NULL, 'M' }, ++ { "utc", no_argument, NULL, ARG_UTC }, ++ { "flush", no_argument, NULL, ARG_FLUSH }, ++ { "relinquish-var", no_argument, NULL, ARG_RELINQUISH_VAR }, ++ { "smart-relinquish-var", no_argument, NULL, ARG_SMART_RELINQUISH_VAR }, ++ { "sync", no_argument, NULL, ARG_SYNC }, ++ { "rotate", no_argument, NULL, ARG_ROTATE }, ++ { "vacuum-size", required_argument, NULL, ARG_VACUUM_SIZE }, ++ { "vacuum-files", required_argument, NULL, ARG_VACUUM_FILES }, ++ { "vacuum-time", required_argument, NULL, ARG_VACUUM_TIME }, ++ { "no-hostname", no_argument, NULL, ARG_NO_HOSTNAME }, ++ { "output-fields", required_argument, NULL, ARG_OUTPUT_FIELDS }, + {} + }; + +@@ -897,6 +905,35 @@ static int parse_argv(int argc, char *argv[]) { + arg_action = ACTION_FLUSH; + break; + ++ case ARG_SMART_RELINQUISH_VAR: { ++ int root_mnt_id, log_mnt_id; ++ ++ /* Try to be smart about relinquishing access to /var/log/journal/ during shutdown: ++ * if it's on the same mount as the root file system there's no point in ++ * relinquishing access and we can leave journald write to it until the very last ++ * moment. */ ++ ++ r = path_get_mnt_id("/", &root_mnt_id); ++ if (r < 0) ++ log_debug_errno(r, "Failed to get root mount ID, ignoring: %m"); ++ else { ++ r = path_get_mnt_id("/var/log/journal/", &log_mnt_id); ++ if (r < 0) ++ log_debug_errno(r, "Failed to get journal directory mount ID, ignoring: %m"); ++ else if (root_mnt_id == log_mnt_id) { ++ log_debug("/var/log/journal/ is on root file system, not relinquishing access to /var."); ++ return 0; ++ } else ++ log_debug("/var/log/journal/ is not on the root file system, relinquishing access to it."); ++ } ++ ++ _fallthrough_; ++ } ++ ++ case ARG_RELINQUISH_VAR: ++ arg_action = ACTION_RELINQUISH_VAR; ++ break; ++ + case ARG_ROTATE: + arg_action = arg_action == ACTION_VACUUM ? ACTION_ROTATE_AND_VACUUM : ACTION_ROTATE; + break; +@@ -1907,6 +1944,10 @@ static int flush_to_var(void) { + return simple_varlink_call("--flush", "io.systemd.Journal.FlushToVar"); + } + ++static int relinquish_var(void) { ++ return simple_varlink_call("--relinquish-var", "io.systemd.Journal.RelinquishVar"); ++} ++ + static int rotate(void) { + return simple_varlink_call("--rotate", "io.systemd.Journal.Rotate"); + } +@@ -2020,6 +2061,10 @@ int main(int argc, char *argv[]) { + r = flush_to_var(); + goto finish; + ++ case ACTION_RELINQUISH_VAR: ++ r = relinquish_var(); ++ goto finish; ++ + case ACTION_SYNC: + r = sync_journal(); + goto finish; diff --git a/debian/patches/apertis/journalctl-port-flush-sync-rotate-to-use-varlink-method-c.patch b/debian/patches/apertis/journalctl-port-flush-sync-rotate-to-use-varlink-method-c.patch new file mode 100644 index 0000000000000000000000000000000000000000..fd22f1a65fe2057b6f04ba9987072262b2fe0278 --- /dev/null +++ b/debian/patches/apertis/journalctl-port-flush-sync-rotate-to-use-varlink-method-c.patch @@ -0,0 +1,273 @@ +From: Lennart Poettering <lennart@poettering.net> +Date: Thu, 4 Apr 2019 19:41:33 +0200 +Subject: journalctl: port --flush/--sync/--rotate to use varlink method calls + +Signed-off-by: Martyn Welch <martyn.welch@collabora.com> +[Martyn Welch: Backported to systemd v241] +--- + src/journal/journalctl.c | 149 ++++-------------------------------------- + src/journal/journald-server.c | 58 +++++++++++++++- + 2 files changed, 70 insertions(+), 137 deletions(-) + +diff --git a/src/journal/journalctl.c b/src/journal/journalctl.c +index 14a02ed..910aced 100644 +--- a/src/journal/journalctl.c ++++ b/src/journal/journalctl.c +@@ -65,6 +65,7 @@ + #include "tmpfile-util.h" + #include "unit-name.h" + #include "user-util.h" ++#include "varlink.h" + + #define DEFAULT_FSS_INTERVAL_USEC (15*USEC_PER_MINUTE) + +@@ -1883,157 +1884,35 @@ static int verify(sd_journal *j) { + return r; + } + +-static int flush_to_var(void) { +- _cleanup_(sd_bus_error_free) sd_bus_error error = SD_BUS_ERROR_NULL; +- _cleanup_(sd_bus_flush_close_unrefp) sd_bus *bus = NULL; +- _cleanup_close_ int watch_fd = -1; ++static int simple_varlink_call(const char *option, const char *method) { ++ _cleanup_(varlink_flush_close_unrefp) Varlink *link = NULL; ++ const char *error; + int r; + +- if (arg_machine) { +- log_error("--flush is not supported in conjunction with --machine=."); +- return -EOPNOTSUPP; +- } +- +- /* Quick exit */ +- if (access("/run/systemd/journal/flushed", F_OK) >= 0) +- return 0; ++ if (arg_machine) ++ return log_error_errno(SYNTHETIC_ERRNO(EOPNOTSUPP), "%s is not supported in conjunction with --machine=.", option); + +- /* OK, let's actually do the full logic, send SIGUSR1 to the +- * daemon and set up inotify to wait for the flushed file to appear */ +- r = bus_connect_system_systemd(&bus); +- if (r < 0) +- return log_error_errno(r, "Failed to get D-Bus connection: %m"); +- +- r = sd_bus_call_method( +- bus, +- "org.freedesktop.systemd1", +- "/org/freedesktop/systemd1", +- "org.freedesktop.systemd1.Manager", +- "KillUnit", +- &error, +- NULL, +- "ssi", "systemd-journald.service", "main", SIGUSR1); ++ r = varlink_connect_address(&link, "/run/systemd/journal/io.systemd.journal"); + if (r < 0) +- return log_error_errno(r, "Failed to kill journal service: %s", bus_error_message(&error, r)); +- +- mkdir_p("/run/systemd/journal", 0755); ++ return log_error_errno(r, "Failed to connect to journal: %m"); + +- watch_fd = inotify_init1(IN_NONBLOCK|IN_CLOEXEC); +- if (watch_fd < 0) +- return log_error_errno(errno, "Failed to create inotify watch: %m"); +- +- r = inotify_add_watch(watch_fd, "/run/systemd/journal", IN_CREATE|IN_DONT_FOLLOW|IN_ONLYDIR); ++ r = varlink_call(link, method, NULL, NULL, &error, NULL); + if (r < 0) +- return log_error_errno(errno, "Failed to watch journal directory: %m"); +- +- for (;;) { +- if (access("/run/systemd/journal/flushed", F_OK) >= 0) +- break; +- +- if (errno != ENOENT) +- return log_error_errno(errno, "Failed to check for existence of /run/systemd/journal/flushed: %m"); +- +- r = fd_wait_for_event(watch_fd, POLLIN, USEC_INFINITY); +- if (r < 0) +- return log_error_errno(r, "Failed to wait for event: %m"); +- +- r = flush_fd(watch_fd); +- if (r < 0) +- return log_error_errno(r, "Failed to flush inotify events: %m"); +- } ++ return log_error_errno(r, "Failed to execute operation: %s", error); + + return 0; + } + +-static int send_signal_and_wait(int sig, const char *watch_path) { +- _cleanup_(sd_bus_flush_close_unrefp) sd_bus *bus = NULL; +- _cleanup_close_ int watch_fd = -1; +- usec_t start; +- int r; +- +- if (arg_machine) { +- log_error("--sync and --rotate are not supported in conjunction with --machine=."); +- return -EOPNOTSUPP; +- } +- +- start = now(CLOCK_MONOTONIC); +- +- /* This call sends the specified signal to journald, and waits +- * for acknowledgment by watching the mtime of the specified +- * flag file. This is used to trigger syncing or rotation and +- * then wait for the operation to complete. */ +- +- for (;;) { +- usec_t tstamp; +- +- /* See if a sync happened by now. */ +- r = read_timestamp_file(watch_path, &tstamp); +- if (r < 0 && r != -ENOENT) +- return log_error_errno(r, "Failed to read %s: %m", watch_path); +- if (r >= 0 && tstamp >= start) +- return 0; +- +- /* Let's ask for a sync, but only once. */ +- if (!bus) { +- _cleanup_(sd_bus_error_free) sd_bus_error error = SD_BUS_ERROR_NULL; +- +- r = bus_connect_system_systemd(&bus); +- if (r < 0) +- return log_error_errno(r, "Failed to get D-Bus connection: %m"); +- +- r = sd_bus_call_method( +- bus, +- "org.freedesktop.systemd1", +- "/org/freedesktop/systemd1", +- "org.freedesktop.systemd1.Manager", +- "KillUnit", +- &error, +- NULL, +- "ssi", "systemd-journald.service", "main", sig); +- if (r < 0) +- return log_error_errno(r, "Failed to kill journal service: %s", bus_error_message(&error, r)); +- +- continue; +- } +- +- /* Let's install the inotify watch, if we didn't do that yet. */ +- if (watch_fd < 0) { +- +- mkdir_p("/run/systemd/journal", 0755); +- +- watch_fd = inotify_init1(IN_NONBLOCK|IN_CLOEXEC); +- if (watch_fd < 0) +- return log_error_errno(errno, "Failed to create inotify watch: %m"); +- +- r = inotify_add_watch(watch_fd, "/run/systemd/journal", IN_MOVED_TO|IN_DONT_FOLLOW|IN_ONLYDIR); +- if (r < 0) +- return log_error_errno(errno, "Failed to watch journal directory: %m"); +- +- /* Recheck the flag file immediately, so that we don't miss any event since the last check. */ +- continue; +- } +- +- /* OK, all preparatory steps done, let's wait until +- * inotify reports an event. */ +- +- r = fd_wait_for_event(watch_fd, POLLIN, USEC_INFINITY); +- if (r < 0) +- return log_error_errno(r, "Failed to wait for event: %m"); +- +- r = flush_fd(watch_fd); +- if (r < 0) +- return log_error_errno(r, "Failed to flush inotify events: %m"); +- } +- +- return 0; ++static int flush_to_var(void) { ++ return simple_varlink_call("--flush", "io.systemd.Journal.FlushToVar"); + } + + static int rotate(void) { +- return send_signal_and_wait(SIGUSR2, "/run/systemd/journal/rotated"); ++ return simple_varlink_call("--rotate", "io.systemd.Journal.Rotate"); + } + + static int sync_journal(void) { +- return send_signal_and_wait(SIGRTMIN+1, "/run/systemd/journal/synced"); ++ return simple_varlink_call("--sync", "io.systemd.Journal.Synchronize"); + } + + static int wait_for_change(sd_journal *j, int poll_fd) { +diff --git a/src/journal/journald-server.c b/src/journal/journald-server.c +index dc4b259..2aad8e9 100644 +--- a/src/journal/journald-server.c ++++ b/src/journal/journald-server.c +@@ -1835,8 +1835,37 @@ static int server_connect_notify(Server *s) { + return 0; + } + ++static int synchronize_second_half(sd_event_source *event_source, void *userdata) { ++ Varlink *link = userdata; ++ Server *s; ++ int r; ++ ++ assert(link); ++ assert_se(s = varlink_get_userdata(link)); ++ ++ /* This is the "second half" of the Synchronize() varlink method. This function is called as deferred ++ * event source at a low priority to ensure the synchronization completes after all queued log ++ * messages are processed. */ ++ server_full_sync(s); ++ ++ /* Let's get rid of the event source now, by marking it as non-floating again. It then has no ref ++ * anymore and is immediately destroyed after we return from this function, i.e. from this event ++ * source handler at the end. */ ++ r = sd_event_source_set_floating(event_source, false); ++ if (r < 0) ++ return log_error_errno(r, "Failed to mark event source as non-floating: %m"); ++ ++ return varlink_reply(link, NULL); ++} ++ ++static void synchronize_destroy(void *userdata) { ++ varlink_unref(userdata); ++} ++ + static int vl_method_synchronize(Varlink *link, JsonVariant *parameters, VarlinkMethodFlags flags, void *userdata) { ++ _cleanup_(sd_event_source_unrefp) sd_event_source *event_source = NULL; + Server *s = userdata; ++ int r; + + assert(link); + assert(s); +@@ -1845,9 +1874,34 @@ static int vl_method_synchronize(Varlink *link, JsonVariant *parameters, Varlink + return varlink_error_invalid_parameter(link, parameters); + + log_info("Received client request to rotate journal."); +- server_full_sync(s); + +- return varlink_reply(link, NULL); ++ /* We don't do the main work now, but instead enqueue a deferred event loop job which will do ++ * it. That job is scheduled at low priority, so that we return from this method call only after all ++ * queued but not processed log messages are written to disk, so that this method call returning can ++ * be used as nice synchronization point. */ ++ r = sd_event_add_defer(s->event, &event_source, synchronize_second_half, link); ++ if (r < 0) ++ return log_error_errno(r, "Failed to allocate defer event source: %m"); ++ ++ r = sd_event_source_set_destroy_callback(event_source, synchronize_destroy); ++ if (r < 0) ++ return log_error_errno(r, "Failed to set event source destroy callback: %m"); ++ ++ varlink_ref(link); /* The varlink object is now left to the destroy callack to unref */ ++ ++ r = sd_event_source_set_priority(event_source, SD_EVENT_PRIORITY_NORMAL+15); ++ if (r < 0) ++ return log_error_errno(r, "Failed to set defer event source priority: %m"); ++ ++ /* Give up ownership of this event source. It will now be destroyed along with event loop itself, ++ * unless it destroys itself earlier. */ ++ r = sd_event_source_set_floating(event_source, true); ++ if (r < 0) ++ return log_error_errno(r, "Failed to mark event source as floating: %m"); ++ ++ (void) sd_event_source_set_description(event_source, "deferred-sync"); ++ ++ return 0; + } + + static int vl_method_rotate(Varlink *link, JsonVariant *parameters, VarlinkMethodFlags flags, void *userdata) { diff --git a/debian/patches/apertis/journald-add-API-to-move-logging-from-var-to-run-again.patch b/debian/patches/apertis/journald-add-API-to-move-logging-from-var-to-run-again.patch new file mode 100644 index 0000000000000000000000000000000000000000..a3ec8a88149506c0685cd0683b3b3077e1b93474 --- /dev/null +++ b/debian/patches/apertis/journald-add-API-to-move-logging-from-var-to-run-again.patch @@ -0,0 +1,197 @@ +From: Lennart Poettering <lennart@poettering.net> +Date: Fri, 5 Apr 2019 18:20:25 +0200 +Subject: journald: add API to move logging from /var to /run again + +We now have this nice little Varlink API, let's beef it up a bit. + +Signed-off-by: Martyn Welch <martyn.welch@collabora.com> +[Martyn Welch: Backported to systemd v241] +--- + src/journal/journald-server.c | 76 ++++++++++++++++++++++++++++++++----------- + 1 file changed, 57 insertions(+), 19 deletions(-) + +diff --git a/src/journal/journald-server.c b/src/journal/journald-server.c +index 2aad8e9..77eecee 100644 +--- a/src/journal/journald-server.c ++++ b/src/journal/journald-server.c +@@ -286,13 +286,14 @@ static bool flushed_flag_is_set(void) { + return access("/run/systemd/journal/flushed", F_OK) >= 0; + } + +-static int system_journal_open(Server *s, bool flush_requested) { ++static int system_journal_open(Server *s, bool flush_requested, bool relinquish_requested) { + const char *fn; + int r = 0; + + if (!s->system_journal && + IN_SET(s->storage, STORAGE_PERSISTENT, STORAGE_AUTO) && +- (flush_requested || flushed_flag_is_set())) { ++ (flush_requested || flushed_flag_is_set()) && ++ !relinquish_requested) { + + /* If in auto mode: first try to create the machine + * path, but not the prefix. +@@ -334,7 +335,7 @@ static int system_journal_open(Server *s, bool flush_requested) { + + fn = strjoina(s->runtime_storage.path, "/system.journal"); + +- if (s->system_journal) { ++ if (s->system_journal && !relinquish_requested) { + + /* Try to open the runtime journal, but only + * if it already exists, so that we can flush +@@ -389,7 +390,7 @@ static JournalFile* find_journal(Server *s, uid_t uid) { + * else that's left the journals as NULL). + * + * Fixes https://github.com/systemd/systemd/issues/3968 */ +- (void) system_journal_open(s, false); ++ (void) system_journal_open(s, false, false); + + /* We split up user logs only on /var, not on /run. If the + * runtime file is open, we write to it exclusively, in order +@@ -1117,7 +1118,7 @@ int server_flush_to_var(Server *s, bool require_flag_file) { + char ts[FORMAT_TIMESPAN_MAX]; + usec_t start; + unsigned n = 0; +- int r; ++ int r, k; + + assert(s); + +@@ -1130,7 +1131,7 @@ int server_flush_to_var(Server *s, bool require_flag_file) { + if (require_flag_file && !flushed_flag_is_set()) + return 0; + +- (void) system_journal_open(s, true); ++ (void) system_journal_open(s, true, false); + + if (!s->system_journal) + return 0; +@@ -1209,9 +1210,36 @@ finish: + n), + NULL); + ++ k = touch("/run/systemd/journal/flushed"); ++ if (k < 0) ++ log_warning_errno(k, "Failed to touch /run/systemd/journal/flushed, ignoring: %m"); ++ + return r; + } + ++static int server_relinquish_var(Server *s) { ++ assert(s); ++ ++ if (s->storage == STORAGE_NONE) ++ return 0; ++ ++ if (s->runtime_journal && !s->system_journal) ++ return 0; ++ ++ log_debug("Relinquishing /var..."); ++ ++ (void) system_journal_open(s, false, true); ++ ++ s->system_journal = journal_file_close(s->system_journal); ++ ordered_hashmap_clear_with_destructor(s->user_journals, journal_file_close); ++ set_clear_with_destructor(s->deferred_closes, journal_file_close); ++ ++ if (unlink("/run/systemd/journal/flushed") < 0 && errno != ENOENT) ++ log_warning_errno(errno, "Failed to unlink /run/systemd/journal/flushed, ignoring: %m"); ++ ++ return 0; ++} ++ + int server_process_datagram(sd_event_source *es, int fd, uint32_t revents, void *userdata) { + Server *s = userdata; + struct ucred *ucred = NULL; +@@ -1331,18 +1359,12 @@ int server_process_datagram(sd_event_source *es, int fd, uint32_t revents, void + } + + static void server_full_flush(Server *s) { +- int r; +- + assert(s); + + (void) server_flush_to_var(s, false); + server_sync(s); + server_vacuum(s, false); + +- r = touch("/run/systemd/journal/flushed"); +- if (r < 0) +- log_warning_errno(r, "Failed to touch /run/systemd/journal/flushed, ignoring: %m"); +- + server_space_usage_message(s, NULL); + } + +@@ -1351,7 +1373,7 @@ static int dispatch_sigusr1(sd_event_source *es, const struct signalfd_siginfo * + + assert(s); + +- log_info("Received request to flush runtime journal from PID " PID_FMT, si->ssi_pid); ++ log_info("Received SIGUSR1 signal from PID " PID_FMT ", as request to flush runtime journal.", si->ssi_pid); + server_full_flush(s); + + return 0; +@@ -1381,7 +1403,7 @@ static int dispatch_sigusr2(sd_event_source *es, const struct signalfd_siginfo * + + assert(s); + +- log_info("Received request to rotate journal from PID " PID_FMT, si->ssi_pid); ++ log_info("Received SIGUSR2 signal from PID " PID_FMT ", as request to rotate journal.", si->ssi_pid); + server_full_rotate(s); + + return 0; +@@ -1418,7 +1440,7 @@ static int dispatch_sigrtmin1(sd_event_source *es, const struct signalfd_siginfo + + assert(s); + +- log_debug("Received request to sync from PID " PID_FMT, si->ssi_pid); ++ log_debug("Received SIGRTMIN1 signal from PID " PID_FMT ", as request to sync.", si->ssi_pid ); + server_full_sync(s); + + return 0; +@@ -1934,6 +1956,21 @@ static int vl_method_flush_to_var(Varlink *link, JsonVariant *parameters, Varlin + return varlink_reply(link, NULL); + } + ++static int vl_method_relinquish_var(Varlink *link, JsonVariant *parameters, VarlinkMethodFlags flags, void *userdata) { ++ Server *s = userdata; ++ ++ assert(link); ++ assert(s); ++ ++ if (json_variant_elements(parameters) > 0) ++ return varlink_error_invalid_parameter(link, parameters); ++ ++ log_info("Received client request to relinquish /var access."); ++ server_relinquish_var(s); ++ ++ return varlink_reply(link, NULL); ++} ++ + static int server_open_varlink(Server *s) { + int r; + +@@ -1947,9 +1984,10 @@ static int server_open_varlink(Server *s) { + + r = varlink_server_bind_method_many( + s->varlink_server, +- "io.systemd.Journal.Synchronize", vl_method_synchronize, +- "io.systemd.Journal.Rotate", vl_method_rotate, +- "io.systemd.Journal.FlushToVar", vl_method_flush_to_var); ++ "io.systemd.Journal.Synchronize", vl_method_synchronize, ++ "io.systemd.Journal.Rotate", vl_method_rotate, ++ "io.systemd.Journal.FlushToVar", vl_method_flush_to_var, ++ "io.systemd.Journal.RelinquishVar", vl_method_relinquish_var); + if (r < 0) + return r; + +@@ -2164,7 +2202,7 @@ int server_init(Server *s) { + + (void) client_context_acquire_default(s); + +- return system_journal_open(s, false); ++ return system_journal_open(s, false, false); + } + + void server_maybe_append_tags(Server *s) { diff --git a/debian/patches/apertis/journald-also-offer-flush-rotate-sync-as-varlink-method-c.patch b/debian/patches/apertis/journald-also-offer-flush-rotate-sync-as-varlink-method-c.patch new file mode 100644 index 0000000000000000000000000000000000000000..faf3db351102d85af1deb7ecd97fb137a7badbb4 --- /dev/null +++ b/debian/patches/apertis/journald-also-offer-flush-rotate-sync-as-varlink-method-c.patch @@ -0,0 +1,236 @@ +From: Lennart Poettering <lennart@poettering.net> +Date: Thu, 4 Apr 2019 19:38:18 +0200 +Subject: journald: also offer flush/rotate/sync as varlink method calls + +This makes the operations previously available via asynchronous signals +also available as regular varlink method calls, i.e. with sane +completion. + +Signed-off-by: Martyn Welch <martyn.welch@collabora.com> +[Martyn Welch: Backported to systemd v241] +--- + src/journal/journald-server.c | 125 ++++++++++++++++++++++++++++++++++++++---- + src/journal/journald-server.h | 3 + + 2 files changed, 117 insertions(+), 11 deletions(-) + +diff --git a/src/journal/journald-server.c b/src/journal/journald-server.c +index 7fe0f82..dc4b259 100644 +--- a/src/journal/journald-server.c ++++ b/src/journal/journald-server.c +@@ -1330,14 +1330,11 @@ int server_process_datagram(sd_event_source *es, int fd, uint32_t revents, void + return 0; + } + +-static int dispatch_sigusr1(sd_event_source *es, const struct signalfd_siginfo *si, void *userdata) { +- Server *s = userdata; ++static void server_full_flush(Server *s) { + int r; + + assert(s); + +- log_info("Received request to flush runtime journal from PID " PID_FMT, si->ssi_pid); +- + (void) server_flush_to_var(s, false); + server_sync(s); + server_vacuum(s, false); +@@ -1347,16 +1344,24 @@ static int dispatch_sigusr1(sd_event_source *es, const struct signalfd_siginfo * + log_warning_errno(r, "Failed to touch /run/systemd/journal/flushed, ignoring: %m"); + + server_space_usage_message(s, NULL); +- return 0; + } + +-static int dispatch_sigusr2(sd_event_source *es, const struct signalfd_siginfo *si, void *userdata) { ++static int dispatch_sigusr1(sd_event_source *es, const struct signalfd_siginfo *si, void *userdata) { + Server *s = userdata; ++ ++ assert(s); ++ ++ log_info("Received request to flush runtime journal from PID " PID_FMT, si->ssi_pid); ++ server_full_flush(s); ++ ++ return 0; ++} ++ ++static void server_full_rotate(Server *s) { + int r; + + assert(s); + +- log_info("Received request to rotate journal from PID " PID_FMT, si->ssi_pid); + server_rotate(s); + server_vacuum(s, true); + +@@ -1369,6 +1374,15 @@ static int dispatch_sigusr2(sd_event_source *es, const struct signalfd_siginfo * + r = write_timestamp_file_atomic("/run/systemd/journal/rotated", now(CLOCK_MONOTONIC)); + if (r < 0) + log_warning_errno(r, "Failed to write /run/systemd/journal/rotated, ignoring: %m"); ++} ++ ++static int dispatch_sigusr2(sd_event_source *es, const struct signalfd_siginfo *si, void *userdata) { ++ Server *s = userdata; ++ ++ assert(s); ++ ++ log_info("Received request to rotate journal from PID " PID_FMT, si->ssi_pid); ++ server_full_rotate(s); + + return 0; + } +@@ -1384,14 +1398,11 @@ static int dispatch_sigterm(sd_event_source *es, const struct signalfd_siginfo * + return 0; + } + +-static int dispatch_sigrtmin1(sd_event_source *es, const struct signalfd_siginfo *si, void *userdata) { +- Server *s = userdata; ++static void server_full_sync(Server *s) { + int r; + + assert(s); + +- log_debug("Received request to sync from PID " PID_FMT, si->ssi_pid); +- + server_sync(s); + + /* Let clients know when the most recent sync happened. */ +@@ -1399,6 +1410,17 @@ static int dispatch_sigrtmin1(sd_event_source *es, const struct signalfd_siginfo + if (r < 0) + log_warning_errno(r, "Failed to write /run/systemd/journal/synced, ignoring: %m"); + ++ return; ++} ++ ++static int dispatch_sigrtmin1(sd_event_source *es, const struct signalfd_siginfo *si, void *userdata) { ++ Server *s = userdata; ++ ++ assert(s); ++ ++ log_debug("Received request to sync from PID " PID_FMT, si->ssi_pid); ++ server_full_sync(s); ++ + return 0; + } + +@@ -1813,6 +1835,81 @@ static int server_connect_notify(Server *s) { + return 0; + } + ++static int vl_method_synchronize(Varlink *link, JsonVariant *parameters, VarlinkMethodFlags flags, void *userdata) { ++ Server *s = userdata; ++ ++ assert(link); ++ assert(s); ++ ++ if (json_variant_elements(parameters) > 0) ++ return varlink_error_invalid_parameter(link, parameters); ++ ++ log_info("Received client request to rotate journal."); ++ server_full_sync(s); ++ ++ return varlink_reply(link, NULL); ++} ++ ++static int vl_method_rotate(Varlink *link, JsonVariant *parameters, VarlinkMethodFlags flags, void *userdata) { ++ Server *s = userdata; ++ ++ assert(link); ++ assert(s); ++ ++ if (json_variant_elements(parameters) > 0) ++ return varlink_error_invalid_parameter(link, parameters); ++ ++ log_info("Received client request to rotate journal."); ++ server_full_rotate(s); ++ ++ return varlink_reply(link, NULL); ++} ++ ++static int vl_method_flush_to_var(Varlink *link, JsonVariant *parameters, VarlinkMethodFlags flags, void *userdata) { ++ Server *s = userdata; ++ ++ assert(link); ++ assert(s); ++ ++ if (json_variant_elements(parameters) > 0) ++ return varlink_error_invalid_parameter(link, parameters); ++ ++ log_info("Received client request to flush runtime journal."); ++ server_full_flush(s); ++ ++ return varlink_reply(link, NULL); ++} ++ ++static int server_open_varlink(Server *s) { ++ int r; ++ ++ assert(s); ++ ++ r = varlink_server_new(&s->varlink_server, VARLINK_SERVER_ROOT_ONLY); ++ if (r < 0) ++ return r; ++ ++ varlink_server_set_userdata(s->varlink_server, s); ++ ++ r = varlink_server_bind_method_many( ++ s->varlink_server, ++ "io.systemd.Journal.Synchronize", vl_method_synchronize, ++ "io.systemd.Journal.Rotate", vl_method_rotate, ++ "io.systemd.Journal.FlushToVar", vl_method_flush_to_var); ++ if (r < 0) ++ return r; ++ ++ r = varlink_server_listen_address(s->varlink_server, "/run/systemd/journal/io.systemd.journal", 0600); ++ if (r < 0) ++ return r; ++ ++ r = varlink_server_attach_event(s->varlink_server, s->event, SD_EVENT_PRIORITY_NORMAL); ++ if (r < 0) ++ return r; ++ ++ return 0; ++} ++ + int server_init(Server *s) { + _cleanup_fdset_free_ FDSet *fds = NULL; + int n, r, fd; +@@ -1973,6 +2070,10 @@ int server_init(Server *s) { + return r; + } + ++ r = server_open_varlink(s); ++ if (r < 0) ++ return r; ++ + r = server_open_kernel_seqnum(s); + if (r < 0) + return r; +@@ -2046,6 +2147,8 @@ void server_done(Server *s) { + + ordered_hashmap_free_with_destructor(s->user_journals, journal_file_close); + ++ varlink_server_unref(s->varlink_server); ++ + sd_event_source_unref(s->syslog_event_source); + sd_event_source_unref(s->native_event_source); + sd_event_source_unref(s->stdout_event_source); +diff --git a/src/journal/journald-server.h b/src/journal/journald-server.h +index 3f6b42d..b44e658 100644 +--- a/src/journal/journald-server.h ++++ b/src/journal/journald-server.h +@@ -16,6 +16,7 @@ typedef struct Server Server; + #include "journald-stream.h" + #include "list.h" + #include "prioq.h" ++#include "varlink.h" + + typedef enum Storage { + STORAGE_AUTO, +@@ -165,6 +166,8 @@ struct Server { + + ClientContext *my_context; /* the context of journald itself */ + ClientContext *pid1_context; /* the context of PID 1 */ ++ ++ VarlinkServer *varlink_server; + }; + + #define SERVER_MACHINE_ID(s) ((s)->machine_id_field + STRLEN("_MACHINE_ID=")) diff --git a/debian/patches/apertis/shared-add-minimal-varlink-implementation.patch b/debian/patches/apertis/shared-add-minimal-varlink-implementation.patch new file mode 100644 index 0000000000000000000000000000000000000000..437278cc263c8a53ac86c7715fc99300d64d3d13 --- /dev/null +++ b/debian/patches/apertis/shared-add-minimal-varlink-implementation.patch @@ -0,0 +1,2650 @@ +From: Lennart Poettering <lennart@poettering.net> +Date: Thu, 11 Apr 2019 18:46:54 +0200 +Subject: shared: add minimal varlink implementation + +This adds a minimal Varlink (https://varlink.org/) implementation to our +tree. Given that we already have a JSON logic it's an easy thing to add. + +Why bother? + +We currently have major problems with IPC before dbus-daemon is up, and +in all components that dbus-daemon itself makes use of (such as various +NSS modules to resolve users as well as the journal which dbus-daemon +logs to). Because of that we so far ended up creating various (usually +crappy) work-arounds either coming up with secondary IPC systems or +sharing data statelessly in /run or similar. Let's clean this up, and +instead use a clean, well-defined, broker-less IPC for cases like that. + +This is a minimal implementation of Varlink, i.e. the most basic logic +only. Stuff that's missing is left out on purpose: there's no +introspection/validation and there's no name service. It might make +sense to add that later, but for now let's only do the minimum buy-in we +can get away with. In particular as I'd assume that at least initially +we only use this IPC for our internal communication avoiding +introspection and the name service should be fine. + +Specifically, I'd expect that we add IPC interfaces to the following +concepts with this scheme: + +1. nss-resolve (so that hostname lookups with resolved work before + resolved is up) +2. journald (so that IPC calls to journald don't have to go through + dbus-daemon thus creating a cyclic dependency between journald and + dbus-daemon) +3. nss-systemd (so that dynamic user lookups via PID 1 work sanely even + inside of dbus-daemon, because otherwise we'd want to use dbus to run + dbus which causes deadlocks) +4. networkd (to make sure one can talk to it in the initrd already, + long before dbus is around) + +And there might be other cases similar to this. + +Signed-off-by: Martyn Welch <martyn.welch@collabora.com> +[Martyn Welch: Backported to systemd v241] +--- + src/shared/meson.build | 2 + + src/shared/varlink.c | 2412 ++++++++++++++++++++++++++++++++++++++++++++++++ + src/shared/varlink.h | 162 ++++ + 3 files changed, 2576 insertions(+) + create mode 100644 src/shared/varlink.c + create mode 100644 src/shared/varlink.h + +diff --git a/src/shared/meson.build b/src/shared/meson.build +index f6d1092..3a43168 100644 +--- a/src/shared/meson.build ++++ b/src/shared/meson.build +@@ -153,6 +153,8 @@ shared_sources = files(''' + uid-range.c + uid-range.h + utmp-wtmp.h ++ varlink.c ++ varlink.h + verbs.c + verbs.h + vlan-util.c +diff --git a/src/shared/varlink.c b/src/shared/varlink.c +new file mode 100644 +index 0000000..2a89f88 +--- /dev/null ++++ b/src/shared/varlink.c +@@ -0,0 +1,2412 @@ ++/* SPDX-License-Identifier: LGPL-2.1+ */ ++ ++#include <sys/poll.h> ++ ++#include "alloc-util.h" ++#include "fd-util.h" ++#include "hashmap.h" ++#include "list.h" ++#include "process-util.h" ++#include "set.h" ++#include "socket-util.h" ++#include "string-table.h" ++#include "string-util.h" ++#include "strv.h" ++#include "time-util.h" ++#include "umask-util.h" ++#include "user-util.h" ++#include "util.h" ++#include "varlink.h" ++ ++/* Transient errors we might get on accept() that we should ignore. As per error handling comment in ++ * the accept(2) man page. */ ++static inline bool ERRNO_IS_ACCEPT_AGAIN(int r) { ++ return ERRNO_IS_DISCONNECT(r) || ++ IN_SET(abs(r), ++ EAGAIN, ++ EINTR, ++ EOPNOTSUPP); ++} ++ ++#define VARLINK_DEFAULT_CONNECTIONS_MAX 4096U ++#define VARLINK_DEFAULT_CONNECTIONS_PER_UID_MAX 1024U ++ ++#define VARLINK_DEFAULT_TIMEOUT_USEC (45U*USEC_PER_SEC) ++#define VARLINK_BUFFER_MAX (16U*1024U*1024U) ++#define VARLINK_READ_SIZE (64U*1024U) ++ ++typedef enum VarlinkState { ++ /* Client side states */ ++ VARLINK_IDLE_CLIENT, ++ VARLINK_AWAITING_REPLY, ++ VARLINK_CALLING, ++ VARLINK_CALLED, ++ VARLINK_PROCESSING_REPLY, ++ ++ /* Server side states */ ++ VARLINK_IDLE_SERVER, ++ VARLINK_PROCESSING_METHOD, ++ VARLINK_PROCESSING_METHOD_MORE, ++ VARLINK_PROCESSING_METHOD_ONEWAY, ++ VARLINK_PROCESSED_METHOD, ++ VARLINK_PROCESSED_METHOD_MORE, ++ VARLINK_PENDING_METHOD, ++ VARLINK_PENDING_METHOD_MORE, ++ ++ /* Common states (only during shutdown) */ ++ VARLINK_PENDING_DISCONNECT, ++ VARLINK_PENDING_TIMEOUT, ++ VARLINK_PROCESSING_DISCONNECT, ++ VARLINK_PROCESSING_TIMEOUT, ++ VARLINK_PROCESSING_FAILURE, ++ VARLINK_DISCONNECTED, ++ ++ _VARLINK_STATE_MAX, ++ _VARLINK_STATE_INVALID = -1 ++} VarlinkState; ++ ++/* Tests whether we are not yet disconnected. Note that this is true during all states where the connection ++ * is still good for something, and false only when it's dead for good. This means: when we are ++ * asynchronously connecting to a peer and the connect() is still pending, then this will return 'true', as ++ * the connection is still good, and we are likely to be able to properly operate on it soon. */ ++#define VARLINK_STATE_IS_ALIVE(state) \ ++ IN_SET(state, \ ++ VARLINK_IDLE_CLIENT, \ ++ VARLINK_AWAITING_REPLY, \ ++ VARLINK_CALLING, \ ++ VARLINK_CALLED, \ ++ VARLINK_PROCESSING_REPLY, \ ++ VARLINK_IDLE_SERVER, \ ++ VARLINK_PROCESSING_METHOD, \ ++ VARLINK_PROCESSING_METHOD_MORE, \ ++ VARLINK_PROCESSING_METHOD_ONEWAY, \ ++ VARLINK_PROCESSED_METHOD, \ ++ VARLINK_PROCESSED_METHOD_MORE, \ ++ VARLINK_PENDING_METHOD, \ ++ VARLINK_PENDING_METHOD_MORE) ++ ++struct Varlink { ++ unsigned n_ref; ++ ++ VarlinkServer *server; ++ ++ VarlinkState state; ++ bool connecting; /* This boolean indicates whether the socket fd we are operating on is currently ++ * processing an asynchronous connect(). In that state we watch the socket for ++ * EPOLLOUT, but we refrain from calling read() or write() on the socket as that ++ * will trigger ENOTCONN. Note that this boolean is kept separate from the ++ * VarlinkState above on purpose: while the connect() is still not complete we ++ * already want to allow queuing of messages and similar. Thus it's nice to keep ++ * these two state concepts separate: the VarlinkState encodes what our own view of ++ * the connection is, i.e. whether we think it's a server, a client, and has ++ * something queued already, while 'connecting' tells us a detail about the ++ * transport used below, that should have no effect on how we otherwise accept and ++ * process operations from the user. ++ * ++ * Or to say this differently: VARLINK_STATE_IS_ALIVE(state) tells you whether the ++ * connection is good to use, even if it might not be fully connected ++ * yet. connecting=true then informs you that actually we are still connecting, and ++ * the connection is actually not established yet and thus any requests you enqueue ++ * now will still work fine but will be queued only, not sent yet, but that ++ * shouldn't stop you from using the connection, since eventually whatever you queue ++ * *will* be sent. ++ * ++ * Or to say this even differently: 'state' is a high-level ("application layer" ++ * high, if you so will) state, while 'conecting' is a low-level ("transport layer" ++ * low, if you so will) state, and while they are not entirely unrelated and ++ * sometimes propagate effects to each other they are only asynchronously connected ++ * at most. */ ++ unsigned n_pending; ++ ++ int fd; ++ ++ char *input_buffer; /* valid data starts at input_buffer_index, ends at input_buffer_index+input_buffer_size */ ++ size_t input_buffer_allocated; ++ size_t input_buffer_index; ++ size_t input_buffer_size; ++ size_t input_buffer_unscanned; ++ ++ char *output_buffer; /* valid data starts at output_buffer_index, ends at output_buffer_index+output_buffer_size */ ++ size_t output_buffer_allocated; ++ size_t output_buffer_index; ++ size_t output_buffer_size; ++ ++ VarlinkReply reply_callback; ++ ++ JsonVariant *current; ++ JsonVariant *reply; ++ ++ struct ucred ucred; ++ bool ucred_acquired:1; ++ ++ bool write_disconnected:1; ++ bool read_disconnected:1; ++ bool prefer_read_write:1; ++ bool got_pollhup:1; ++ ++ usec_t timestamp; ++ usec_t timeout; ++ ++ void *userdata; ++ char *description; ++ ++ sd_event *event; ++ sd_event_source *io_event_source; ++ sd_event_source *time_event_source; ++ sd_event_source *quit_event_source; ++ sd_event_source *defer_event_source; ++}; ++ ++typedef struct VarlinkServerSocket VarlinkServerSocket; ++ ++struct VarlinkServerSocket { ++ VarlinkServer *server; ++ ++ int fd; ++ char *address; ++ ++ sd_event_source *event_source; ++ ++ LIST_FIELDS(VarlinkServerSocket, sockets); ++}; ++ ++struct VarlinkServer { ++ unsigned n_ref; ++ VarlinkServerFlags flags; ++ ++ LIST_HEAD(VarlinkServerSocket, sockets); ++ ++ Hashmap *methods; ++ VarlinkConnect connect_callback; ++ ++ sd_event *event; ++ int64_t event_priority; ++ ++ unsigned n_connections; ++ Hashmap *by_uid; ++ ++ void *userdata; ++ char *description; ++ ++ unsigned connections_max; ++ unsigned connections_per_uid_max; ++}; ++ ++static const char* const varlink_state_table[_VARLINK_STATE_MAX] = { ++ [VARLINK_IDLE_CLIENT] = "idle-client", ++ [VARLINK_AWAITING_REPLY] = "awaiting-reply", ++ [VARLINK_CALLING] = "calling", ++ [VARLINK_CALLED] = "called", ++ [VARLINK_PROCESSING_REPLY] = "processing-reply", ++ [VARLINK_IDLE_SERVER] = "idle-server", ++ [VARLINK_PROCESSING_METHOD] = "processing-method", ++ [VARLINK_PROCESSING_METHOD_MORE] = "processing-method-more", ++ [VARLINK_PROCESSING_METHOD_ONEWAY] = "processing-method-oneway", ++ [VARLINK_PROCESSED_METHOD] = "processed-method", ++ [VARLINK_PROCESSED_METHOD_MORE] = "processed-method-more", ++ [VARLINK_PENDING_METHOD] = "pending-method", ++ [VARLINK_PENDING_METHOD_MORE] = "pending-method-more", ++ [VARLINK_PENDING_DISCONNECT] = "pending-disconnect", ++ [VARLINK_PENDING_TIMEOUT] = "pending-timeout", ++ [VARLINK_PROCESSING_DISCONNECT] = "processing-disconnect", ++ [VARLINK_PROCESSING_TIMEOUT] = "processing-timeout", ++ [VARLINK_PROCESSING_FAILURE] = "processing-failure", ++ [VARLINK_DISCONNECTED] = "disconnected", ++}; ++ ++DEFINE_PRIVATE_STRING_TABLE_LOOKUP_TO_STRING(varlink_state, VarlinkState); ++ ++#define varlink_log_errno(v, error, fmt, ...) \ ++ log_debug_errno(error, "%s: " fmt, varlink_description(v), ##__VA_ARGS__) ++ ++#define varlink_log(v, fmt, ...) \ ++ log_debug("%s: " fmt, varlink_description(v), ##__VA_ARGS__) ++ ++#define varlink_server_log_errno(s, error, fmt, ...) \ ++ log_debug_errno(error, "%s: " fmt, varlink_server_description(s), ##__VA_ARGS__) ++ ++#define varlink_server_log(s, fmt, ...) \ ++ log_debug("%s: " fmt, varlink_server_description(s), ##__VA_ARGS__) ++ ++static inline const char *varlink_description(Varlink *v) { ++ return strna(v ? v->description : NULL); ++} ++ ++static inline const char *varlink_server_description(VarlinkServer *s) { ++ return strna(s ? s->description : NULL); ++} ++ ++static void varlink_set_state(Varlink *v, VarlinkState state) { ++ assert(v); ++ ++ varlink_log(v, "varlink: changing state %s → %s", ++ varlink_state_to_string(v->state), ++ varlink_state_to_string(state)); ++ ++ v->state = state; ++} ++ ++static int varlink_new(Varlink **ret) { ++ Varlink *v; ++ ++ assert(ret); ++ ++ v = new(Varlink, 1); ++ if (!v) ++ return -ENOMEM; ++ ++ *v = (Varlink) { ++ .n_ref = 1, ++ .fd = -1, ++ ++ .state = _VARLINK_STATE_INVALID, ++ ++ .ucred.uid = UID_INVALID, ++ .ucred.gid = GID_INVALID, ++ ++ .timestamp = USEC_INFINITY, ++ .timeout = VARLINK_DEFAULT_TIMEOUT_USEC ++ }; ++ ++ *ret = v; ++ return 0; ++} ++ ++int varlink_connect_address(Varlink **ret, const char *address) { ++ _cleanup_(varlink_unrefp) Varlink *v = NULL; ++ union sockaddr_union sockaddr; ++ int r; ++ ++ assert_return(ret, -EINVAL); ++ assert_return(address, -EINVAL); ++ ++ r = sockaddr_un_set_path(&sockaddr.un, address); ++ if (r < 0) ++ return r; ++ ++ r = varlink_new(&v); ++ if (r < 0) ++ return r; ++ ++ v->fd = socket(AF_UNIX, SOCK_STREAM|SOCK_CLOEXEC|SOCK_NONBLOCK, 0); ++ if (v->fd < 0) ++ return -errno; ++ ++ if (connect(v->fd, &sockaddr.sa, SOCKADDR_UN_LEN(sockaddr.un)) < 0) { ++ if (!IN_SET(errno, EAGAIN, EINPROGRESS)) ++ return -errno; ++ ++ v->connecting = true; /* We are asynchronously connecting, i.e. the connect() is being ++ * processed in the background. As long as that's the case the socket ++ * is in a special state: it's there, we can poll it for EPOLLOUT, but ++ * if we attempt to write() to it before we see EPOLLOUT we'll get ++ * ENOTCONN (and not EAGAIN, like we would for a normal connected ++ * socket that isn't writable at the moment). Since ENOTCONN on write() ++ * hence can mean two different things (i.e. connection not complete ++ * yet vs. already disconnected again), we store as a boolean whether ++ * we are still in connect(). */ ++ } ++ ++ varlink_set_state(v, VARLINK_IDLE_CLIENT); ++ ++ *ret = TAKE_PTR(v); ++ return r; ++} ++ ++int varlink_connect_fd(Varlink **ret, int fd) { ++ Varlink *v; ++ int r; ++ ++ assert_return(ret, -EINVAL); ++ assert_return(fd >= 0, -EBADF); ++ ++ r = fd_nonblock(fd, true); ++ if (r < 0) ++ return r; ++ ++ r = varlink_new(&v); ++ if (r < 0) ++ return r; ++ ++ v->fd = fd; ++ varlink_set_state(v, VARLINK_IDLE_CLIENT); ++ ++ /* Note that if this function is called we assume the passed socket (if it is one) is already ++ * properly connected, i.e. any asynchronous connect() done on it already completed. Because of that ++ * we'll not set the 'connecting' boolean here, i.e. we don't need to avoid write()ing to the socket ++ * until the connection is fully set up. Behaviour here is hence a bit different from ++ * varlink_connect_address() above, as there we do handle asynchronous connections ourselves and ++ * avoid doing write() on it before we saw EPOLLOUT for the first time. */ ++ ++ *ret = v; ++ return 0; ++} ++ ++static void varlink_detach_event_sources(Varlink *v) { ++ assert(v); ++ ++ if (v->io_event_source) { ++ (void) sd_event_source_set_enabled(v->io_event_source, SD_EVENT_OFF); ++ v->io_event_source = sd_event_source_unref(v->io_event_source); ++ } ++ ++ if (v->time_event_source) { ++ (void) sd_event_source_set_enabled(v->time_event_source, SD_EVENT_OFF); ++ v->time_event_source = sd_event_source_unref(v->time_event_source); ++ } ++ ++ if (v->quit_event_source) { ++ (void) sd_event_source_set_enabled(v->quit_event_source, SD_EVENT_OFF); ++ v->quit_event_source = sd_event_source_unref(v->quit_event_source); ++ } ++ ++ if (v->defer_event_source) { ++ (void) sd_event_source_set_enabled(v->defer_event_source, SD_EVENT_OFF); ++ v->defer_event_source = sd_event_source_unref(v->defer_event_source); ++ } ++} ++ ++static void varlink_clear(Varlink *v) { ++ assert(v); ++ ++ varlink_detach_event_sources(v); ++ ++ v->fd = safe_close(v->fd); ++ ++ v->input_buffer = mfree(v->input_buffer); ++ v->output_buffer = mfree(v->output_buffer); ++ ++ v->current = json_variant_unref(v->current); ++ v->reply = json_variant_unref(v->reply); ++ ++ v->event = sd_event_unref(v->event); ++} ++ ++static Varlink* varlink_destroy(Varlink *v) { ++ if (!v) ++ return NULL; ++ ++ /* If this is called the server object must already been unreffed here. Why that? because when we ++ * linked up the varlink connection with the server object we took one ref in each direction */ ++ assert(!v->server); ++ ++ varlink_clear(v); ++ ++ free(v->description); ++ return mfree(v); ++} ++ ++DEFINE_TRIVIAL_REF_UNREF_FUNC(Varlink, varlink, varlink_destroy); ++ ++static int varlink_test_disconnect(Varlink *v) { ++ assert(v); ++ ++ /* Tests whether we the the connection has been terminated. We are careful to not stop processing it ++ * prematurely, since we want to handle half-open connections as well as possible and want to flush ++ * out and read data before we close down if we can. */ ++ ++ /* Already disconnected? */ ++ if (!VARLINK_STATE_IS_ALIVE(v->state)) ++ return 0; ++ ++ /* Wait until connection setup is complete, i.e. until asynchronous connect() completes */ ++ if (v->connecting) ++ return 0; ++ ++ /* Still something to write and we can write? Stay around */ ++ if (v->output_buffer_size > 0 && !v->write_disconnected) ++ return 0; ++ ++ /* Both sides gone already? Then there's no need to stick around */ ++ if (v->read_disconnected && v->write_disconnected) ++ goto disconnect; ++ ++ /* If we are waiting for incoming data but the read side is shut down, disconnect. */ ++ if (IN_SET(v->state, VARLINK_AWAITING_REPLY, VARLINK_CALLING, VARLINK_IDLE_SERVER) && v->read_disconnected) ++ goto disconnect; ++ ++ /* Similar, if are a client that hasn't written anything yet but the write side is dead, also ++ * disconnect. We also explicitly check for POLLHUP here since we likely won't notice the write side ++ * being down if we never wrote anything. */ ++ if (IN_SET(v->state, VARLINK_IDLE_CLIENT) && (v->write_disconnected || v->got_pollhup)) ++ goto disconnect; ++ ++ return 0; ++ ++disconnect: ++ varlink_set_state(v, VARLINK_PENDING_DISCONNECT); ++ return 1; ++} ++ ++static int varlink_write(Varlink *v) { ++ ssize_t n; ++ ++ assert(v); ++ ++ if (!VARLINK_STATE_IS_ALIVE(v->state)) ++ return 0; ++ if (v->connecting) /* Writing while we are still wait for a non-blocking connect() to complete will ++ * result in ENOTCONN, hence exit early here */ ++ return 0; ++ if (v->output_buffer_size == 0) ++ return 0; ++ if (v->write_disconnected) ++ return 0; ++ ++ assert(v->fd >= 0); ++ ++ /* We generally prefer recv()/send() (mostly because of MSG_NOSIGNAL) but also want to be compatible ++ * with non-socket IO, hence fall back automatically */ ++ if (!v->prefer_read_write) { ++ n = send(v->fd, v->output_buffer + v->output_buffer_index, v->output_buffer_size, MSG_DONTWAIT|MSG_NOSIGNAL); ++ if (n < 0 && errno == ENOTSOCK) ++ v->prefer_read_write = true; ++ } ++ if (v->prefer_read_write) ++ n = write(v->fd, v->output_buffer + v->output_buffer_index, v->output_buffer_size); ++ if (n < 0) { ++ if (errno == EAGAIN) ++ return 0; ++ ++ if (ERRNO_IS_DISCONNECT(errno)) { ++ /* If we get informed about a disconnect on write, then let's remember that, but not ++ * act on it just yet. Let's wait for read() to report the issue first. */ ++ v->write_disconnected = true; ++ return 1; ++ } ++ ++ return -errno; ++ } ++ ++ v->output_buffer_size -= n; ++ ++ if (v->output_buffer_size == 0) ++ v->output_buffer_index = 0; ++ else ++ v->output_buffer_index += n; ++ ++ v->timestamp = now(CLOCK_MONOTONIC); ++ return 1; ++} ++ ++static int varlink_read(Varlink *v) { ++ size_t rs; ++ ssize_t n; ++ ++ assert(v); ++ ++ if (!IN_SET(v->state, VARLINK_AWAITING_REPLY, VARLINK_CALLING, VARLINK_IDLE_SERVER)) ++ return 0; ++ if (v->connecting) /* read() on a socket while we are in connect() will fail with EINVAL, hence exit early here */ ++ return 0; ++ if (v->current) ++ return 0; ++ if (v->input_buffer_unscanned > 0) ++ return 0; ++ if (v->read_disconnected) ++ return 0; ++ ++ if (v->input_buffer_size >= VARLINK_BUFFER_MAX) ++ return -ENOBUFS; ++ ++ assert(v->fd >= 0); ++ ++ if (v->input_buffer_allocated <= v->input_buffer_index + v->input_buffer_size) { ++ size_t add; ++ ++ add = MIN(VARLINK_BUFFER_MAX - v->input_buffer_size, VARLINK_READ_SIZE); ++ ++ if (v->input_buffer_index == 0) { ++ ++ if (!GREEDY_REALLOC(v->input_buffer, v->input_buffer_allocated, v->input_buffer_size + add)) ++ return -ENOMEM; ++ ++ } else { ++ char *b; ++ ++ b = new(char, v->input_buffer_size + add); ++ if (!b) ++ return -ENOMEM; ++ ++ memcpy(b, v->input_buffer + v->input_buffer_index, v->input_buffer_size); ++ ++ free_and_replace(v->input_buffer, b); ++ ++ v->input_buffer_allocated = v->input_buffer_size + add; ++ v->input_buffer_index = 0; ++ } ++ } ++ ++ rs = v->input_buffer_allocated - (v->input_buffer_index + v->input_buffer_size); ++ ++ if (!v->prefer_read_write) { ++ n = recv(v->fd, v->input_buffer + v->input_buffer_index + v->input_buffer_size, rs, MSG_DONTWAIT); ++ if (n < 0 && errno == ENOTSOCK) ++ v->prefer_read_write = true; ++ } ++ if (v->prefer_read_write) ++ n = read(v->fd, v->input_buffer + v->input_buffer_index + v->input_buffer_size, rs); ++ if (n < 0) { ++ if (errno == EAGAIN) ++ return 0; ++ ++ if (ERRNO_IS_DISCONNECT(errno)) { ++ v->read_disconnected = true; ++ return 1; ++ } ++ ++ return -errno; ++ } ++ if (n == 0) { /* EOF */ ++ v->read_disconnected = true; ++ return 1; ++ } ++ ++ v->input_buffer_size += n; ++ v->input_buffer_unscanned += n; ++ ++ return 1; ++} ++ ++static int varlink_parse_message(Varlink *v) { ++ const char *e, *begin; ++ size_t sz; ++ int r; ++ ++ assert(v); ++ ++ if (v->current) ++ return 0; ++ if (v->input_buffer_unscanned <= 0) ++ return 0; ++ ++ assert(v->input_buffer_unscanned <= v->input_buffer_size); ++ assert(v->input_buffer_index + v->input_buffer_size <= v->input_buffer_allocated); ++ ++ begin = v->input_buffer + v->input_buffer_index; ++ ++ e = memchr(begin + v->input_buffer_size - v->input_buffer_unscanned, 0, v->input_buffer_unscanned); ++ if (!e) { ++ v->input_buffer_unscanned = 0; ++ return 0; ++ } ++ ++ sz = e - begin + 1; ++ ++ varlink_log(v, "New incoming message: %s", begin); ++ ++ r = json_parse(begin, &v->current, NULL, NULL); ++ if (r < 0) ++ return r; ++ ++ v->input_buffer_size -= sz; ++ ++ if (v->input_buffer_size == 0) ++ v->input_buffer_index = 0; ++ else ++ v->input_buffer_index += sz; ++ ++ v->input_buffer_unscanned = v->input_buffer_size; ++ return 1; ++} ++ ++static int varlink_test_timeout(Varlink *v) { ++ assert(v); ++ ++ if (!IN_SET(v->state, VARLINK_AWAITING_REPLY, VARLINK_CALLING)) ++ return 0; ++ if (v->timeout == USEC_INFINITY) ++ return 0; ++ ++ if (now(CLOCK_MONOTONIC) < usec_add(v->timestamp, v->timeout)) ++ return 0; ++ ++ varlink_set_state(v, VARLINK_PENDING_TIMEOUT); ++ ++ return 1; ++} ++ ++static int varlink_dispatch_local_error(Varlink *v, const char *error) { ++ int r; ++ ++ assert(v); ++ assert(error); ++ ++ if (!v->reply_callback) ++ return 0; ++ ++ r = v->reply_callback(v, NULL, error, VARLINK_REPLY_ERROR|VARLINK_REPLY_LOCAL, v->userdata); ++ if (r < 0) ++ log_debug_errno(r, "Reply callback returned error, ignoring: %m"); ++ ++ return 1; ++} ++ ++static int varlink_dispatch_timeout(Varlink *v) { ++ assert(v); ++ ++ if (v->state != VARLINK_PENDING_TIMEOUT) ++ return 0; ++ ++ varlink_set_state(v, VARLINK_PROCESSING_TIMEOUT); ++ varlink_dispatch_local_error(v, VARLINK_ERROR_TIMEOUT); ++ varlink_close(v); ++ ++ return 1; ++} ++ ++static int varlink_dispatch_disconnect(Varlink *v) { ++ assert(v); ++ ++ if (v->state != VARLINK_PENDING_DISCONNECT) ++ return 0; ++ ++ varlink_set_state(v, VARLINK_PROCESSING_DISCONNECT); ++ varlink_dispatch_local_error(v, VARLINK_ERROR_DISCONNECTED); ++ varlink_close(v); ++ ++ return 1; ++} ++ ++static int varlink_sanitize_parameters(JsonVariant **v) { ++ assert(v); ++ ++ /* Varlink always wants a parameters list, hence make one if the caller doesn't want any */ ++ if (!*v) ++ return json_variant_new_object(v, NULL, 0); ++ else if (!json_variant_is_object(*v)) ++ return -EINVAL; ++ ++ return 0; ++} ++ ++static int varlink_dispatch_reply(Varlink *v) { ++ _cleanup_(json_variant_unrefp) JsonVariant *parameters = NULL; ++ VarlinkReplyFlags flags = 0; ++ const char *error = NULL; ++ JsonVariant *k, *e; ++ int r; ++ ++ assert(v); ++ ++ if (!IN_SET(v->state, VARLINK_AWAITING_REPLY, VARLINK_CALLING)) ++ return 0; ++ if (!v->current) ++ return 0; ++ ++ assert(v->n_pending > 0); ++ ++ if (!json_variant_is_object(v->current)) ++ goto invalid; ++ ++ JSON_VARIANT_OBJECT_FOREACH(k, e, v->current) { ++ const char *n; ++ ++ assert_se(n = json_variant_string(k)); ++ ++ if (streq(n, "error")) { ++ if (error) ++ goto invalid; ++ if (!json_variant_is_string(e)) ++ goto invalid; ++ ++ error = json_variant_string(e); ++ flags |= VARLINK_REPLY_ERROR; ++ ++ } else if (streq(n, "parameters")) { ++ if (parameters) ++ goto invalid; ++ if (!json_variant_is_object(e)) ++ goto invalid; ++ ++ parameters = json_variant_ref(e); ++ ++ } else if (streq(n, "continues")) { ++ if (FLAGS_SET(flags, VARLINK_REPLY_CONTINUES)) ++ goto invalid; ++ ++ if (!json_variant_is_boolean(e)) ++ goto invalid; ++ ++ if (json_variant_boolean(e)) ++ flags |= VARLINK_REPLY_CONTINUES; ++ } else ++ goto invalid; ++ } ++ ++ if (error && FLAGS_SET(flags, VARLINK_REPLY_CONTINUES)) ++ goto invalid; ++ ++ r = varlink_sanitize_parameters(¶meters); ++ if (r < 0) ++ goto invalid; ++ ++ if (v->state == VARLINK_AWAITING_REPLY) { ++ varlink_set_state(v, VARLINK_PROCESSING_REPLY); ++ ++ if (v->reply_callback) { ++ r = v->reply_callback(v, parameters, error, flags, v->userdata); ++ if (r < 0) ++ log_debug_errno(r, "Reply callback returned error, ignoring: %m"); ++ } ++ ++ v->current = json_variant_unref(v->current); ++ ++ if (v->state == VARLINK_PROCESSING_REPLY) { ++ assert(v->n_pending > 0); ++ v->n_pending--; ++ ++ varlink_set_state(v, v->n_pending == 0 ? VARLINK_IDLE_CLIENT : VARLINK_AWAITING_REPLY); ++ } ++ } else { ++ assert(v->state == VARLINK_CALLING); ++ ++ if (FLAGS_SET(flags, VARLINK_REPLY_CONTINUES)) ++ goto invalid; ++ ++ varlink_set_state(v, VARLINK_CALLED); ++ } ++ ++ return 1; ++ ++invalid: ++ varlink_set_state(v, VARLINK_PROCESSING_FAILURE); ++ varlink_dispatch_local_error(v, VARLINK_ERROR_PROTOCOL); ++ varlink_close(v); ++ ++ return 1; ++} ++ ++static int varlink_dispatch_method(Varlink *v) { ++ _cleanup_(json_variant_unrefp) JsonVariant *parameters = NULL; ++ VarlinkMethodFlags flags = 0; ++ const char *method = NULL, *error; ++ JsonVariant *k, *e; ++ VarlinkMethod callback; ++ int r; ++ ++ assert(v); ++ ++ if (v->state != VARLINK_IDLE_SERVER) ++ return 0; ++ if (!v->current) ++ return 0; ++ ++ if (!json_variant_is_object(v->current)) ++ goto invalid; ++ ++ JSON_VARIANT_OBJECT_FOREACH(k, e, v->current) { ++ const char *n; ++ ++ assert_se(n = json_variant_string(k)); ++ ++ if (streq(n, "method")) { ++ if (method) ++ goto invalid; ++ if (!json_variant_is_string(e)) ++ goto invalid; ++ ++ method = json_variant_string(e); ++ ++ } else if (streq(n, "parameters")) { ++ if (parameters) ++ goto invalid; ++ if (!json_variant_is_object(e)) ++ goto invalid; ++ ++ parameters = json_variant_ref(e); ++ ++ } else if (streq(n, "oneway")) { ++ ++ if ((flags & (VARLINK_METHOD_ONEWAY|VARLINK_METHOD_MORE)) != 0) ++ goto invalid; ++ ++ if (!json_variant_is_boolean(e)) ++ goto invalid; ++ ++ if (json_variant_boolean(e)) ++ flags |= VARLINK_METHOD_ONEWAY; ++ ++ } else if (streq(n, "more")) { ++ ++ if ((flags & (VARLINK_METHOD_ONEWAY|VARLINK_METHOD_MORE)) != 0) ++ goto invalid; ++ ++ if (!json_variant_is_boolean(e)) ++ goto invalid; ++ ++ if (json_variant_boolean(e)) ++ flags |= VARLINK_METHOD_MORE; ++ ++ } else ++ goto invalid; ++ } ++ ++ if (!method) ++ goto invalid; ++ ++ r = varlink_sanitize_parameters(¶meters); ++ if (r < 0) ++ goto fail; ++ ++ varlink_set_state(v, (flags & VARLINK_METHOD_MORE) ? VARLINK_PROCESSING_METHOD_MORE : ++ (flags & VARLINK_METHOD_ONEWAY) ? VARLINK_PROCESSING_METHOD_ONEWAY : ++ VARLINK_PROCESSING_METHOD); ++ ++ assert(v->server); ++ ++ if (STR_IN_SET(method, "org.varlink.service.GetInfo", "org.varlink.service.GetInterface")) { ++ /* For now, we don't implement a single of varlink's own methods */ ++ callback = NULL; ++ error = VARLINK_ERROR_METHOD_NOT_IMPLEMENTED; ++ } else if (startswith(method, "org.varlink.service.")) { ++ callback = NULL; ++ error = VARLINK_ERROR_METHOD_NOT_FOUND; ++ } else { ++ callback = hashmap_get(v->server->methods, method); ++ error = VARLINK_ERROR_METHOD_NOT_FOUND; ++ } ++ ++ if (callback) { ++ r = callback(v, parameters, flags, v->userdata); ++ if (r < 0) { ++ log_debug_errno(r, "Callback for %s returned error: %m", method); ++ ++ /* We got an error back from the callback. Propagate it to the client if the method call remains unanswered. */ ++ if (!FLAGS_SET(flags, VARLINK_METHOD_ONEWAY)) { ++ r = varlink_errorb(v, VARLINK_ERROR_SYSTEM, JSON_BUILD_OBJECT(JSON_BUILD_PAIR("errno", JSON_BUILD_INTEGER(-r)))); ++ if (r < 0) ++ return r; ++ } ++ } ++ } else if (!FLAGS_SET(flags, VARLINK_METHOD_ONEWAY)) { ++ assert(error); ++ ++ r = varlink_errorb(v, error, JSON_BUILD_OBJECT(JSON_BUILD_PAIR("method", JSON_BUILD_STRING(method)))); ++ if (r < 0) ++ return r; ++ } ++ ++ switch (v->state) { ++ ++ case VARLINK_PROCESSED_METHOD: /* Method call is fully processed */ ++ case VARLINK_PROCESSING_METHOD_ONEWAY: /* ditto */ ++ v->current = json_variant_unref(v->current); ++ varlink_set_state(v, VARLINK_IDLE_SERVER); ++ break; ++ ++ case VARLINK_PROCESSING_METHOD: /* Method call wasn't replied to, will be replied to later */ ++ varlink_set_state(v, VARLINK_PENDING_METHOD); ++ break; ++ ++ case VARLINK_PROCESSED_METHOD_MORE: /* One reply for a "more" message was sent, more to come */ ++ case VARLINK_PROCESSING_METHOD_MORE: /* No reply for a "more" message was sent, more to come */ ++ varlink_set_state(v, VARLINK_PENDING_METHOD_MORE); ++ break; ++ ++ default: ++ assert_not_reached("Unexpected state"); ++ ++ } ++ ++ return r; ++ ++invalid: ++ r = -EINVAL; ++ ++fail: ++ varlink_set_state(v, VARLINK_PROCESSING_FAILURE); ++ varlink_dispatch_local_error(v, VARLINK_ERROR_PROTOCOL); ++ varlink_close(v); ++ ++ return r; ++} ++ ++int varlink_process(Varlink *v) { ++ int r; ++ ++ assert_return(v, -EINVAL); ++ ++ if (v->state == VARLINK_DISCONNECTED) ++ return -ENOTCONN; ++ ++ varlink_ref(v); ++ ++ r = varlink_write(v); ++ if (r != 0) ++ goto finish; ++ ++ r = varlink_dispatch_reply(v); ++ if (r != 0) ++ goto finish; ++ ++ r = varlink_dispatch_method(v); ++ if (r != 0) ++ goto finish; ++ ++ r = varlink_parse_message(v); ++ if (r != 0) ++ goto finish; ++ ++ r = varlink_read(v); ++ if (r != 0) ++ goto finish; ++ ++ r = varlink_test_disconnect(v); ++ if (r != 0) ++ goto finish; ++ ++ r = varlink_dispatch_disconnect(v); ++ if (r != 0) ++ goto finish; ++ ++ r = varlink_test_timeout(v); ++ if (r != 0) ++ goto finish; ++ ++ r = varlink_dispatch_timeout(v); ++ if (r != 0) ++ goto finish; ++ ++finish: ++ if (r >= 0 && v->defer_event_source) { ++ int q; ++ ++ /* If we did some processing, make sure we are called again soon */ ++ q = sd_event_source_set_enabled(v->defer_event_source, r > 0 ? SD_EVENT_ON : SD_EVENT_OFF); ++ if (q < 0) ++ r = q; ++ } ++ ++ if (r < 0) { ++ if (VARLINK_STATE_IS_ALIVE(v->state)) ++ /* Initiate disconnection */ ++ varlink_set_state(v, VARLINK_PENDING_DISCONNECT); ++ else ++ /* We failed while disconnecting, in that case close right away */ ++ varlink_close(v); ++ } ++ ++ varlink_unref(v); ++ return r; ++} ++ ++static void handle_revents(Varlink *v, int revents) { ++ assert(v); ++ ++ if (v->connecting) { ++ /* If we have seen POLLOUT or POLLHUP on a socket we are asynchronously waiting a connect() ++ * to complete on, we know we are ready. We don't read the connection error here though, ++ * we'll get the error on the next read() or write(). */ ++ if ((revents & (POLLOUT|POLLHUP)) == 0) ++ return; ++ ++ varlink_log(v, "Anynchronous connection completed."); ++ v->connecting = false; ++ } else { ++ /* Note that we don't care much about POLLIN/POLLOUT here, we'll just try reading and writing ++ * what we can. However, we do care about POLLHUP to detect connection termination even if we ++ * momentarily don't want to read nor write anything. */ ++ ++ if (!FLAGS_SET(revents, POLLHUP)) ++ return; ++ ++ varlink_log(v, "Got POLLHUP from socket."); ++ v->got_pollhup = true; ++ } ++} ++ ++int varlink_wait(Varlink *v, usec_t timeout) { ++ struct timespec ts; ++ struct pollfd pfd; ++ int r, fd, events; ++ usec_t t; ++ ++ assert_return(v, -EINVAL); ++ assert_return(!v->server, -ENOTTY); ++ ++ if (v->state == VARLINK_DISCONNECTED) ++ return -ENOTCONN; ++ ++ r = varlink_get_timeout(v, &t); ++ if (r < 0) ++ return r; ++ if (t != USEC_INFINITY) { ++ usec_t n; ++ ++ n = now(CLOCK_MONOTONIC); ++ if (t < n) ++ t = 0; ++ else ++ t = usec_sub_unsigned(t, n); ++ } ++ ++ if (timeout != USEC_INFINITY && ++ (t == USEC_INFINITY || timeout < t)) ++ t = timeout; ++ ++ fd = varlink_get_fd(v); ++ if (fd < 0) ++ return fd; ++ ++ events = varlink_get_events(v); ++ if (events < 0) ++ return events; ++ ++ pfd = (struct pollfd) { ++ .fd = fd, ++ .events = events, ++ }; ++ ++ r = ppoll(&pfd, 1, ++ t == USEC_INFINITY ? NULL : timespec_store(&ts, t), ++ NULL); ++ if (r < 0) ++ return -errno; ++ ++ handle_revents(v, pfd.revents); ++ ++ return r > 0 ? 1 : 0; ++} ++ ++int varlink_get_fd(Varlink *v) { ++ ++ assert_return(v, -EINVAL); ++ ++ if (v->state == VARLINK_DISCONNECTED) ++ return -ENOTCONN; ++ if (v->fd < 0) ++ return -EBADF; ++ ++ return v->fd; ++} ++ ++int varlink_get_events(Varlink *v) { ++ int ret = 0; ++ ++ assert_return(v, -EINVAL); ++ ++ if (v->state == VARLINK_DISCONNECTED) ++ return -ENOTCONN; ++ ++ if (v->connecting) /* When processing an asynchronous connect(), we only wait for EPOLLOUT, which ++ * tells us that the connection is now complete. Before that we should neither ++ * write() or read() from the fd. */ ++ return EPOLLOUT; ++ ++ if (!v->read_disconnected && ++ IN_SET(v->state, VARLINK_AWAITING_REPLY, VARLINK_CALLING, VARLINK_IDLE_SERVER) && ++ !v->current && ++ v->input_buffer_unscanned <= 0) ++ ret |= EPOLLIN; ++ ++ if (!v->write_disconnected && ++ v->output_buffer_size > 0) ++ ret |= EPOLLOUT; ++ ++ return ret; ++} ++ ++int varlink_get_timeout(Varlink *v, usec_t *ret) { ++ assert_return(v, -EINVAL); ++ ++ if (v->state == VARLINK_DISCONNECTED) ++ return -ENOTCONN; ++ ++ if (IN_SET(v->state, VARLINK_AWAITING_REPLY, VARLINK_CALLING) && ++ v->timeout != USEC_INFINITY) { ++ if (ret) ++ *ret = usec_add(v->timestamp, v->timeout); ++ return 1; ++ } else { ++ if (ret) ++ *ret = USEC_INFINITY; ++ return 0; ++ } ++} ++ ++int varlink_flush(Varlink *v) { ++ int ret = 0, r; ++ ++ assert_return(v, -EINVAL); ++ ++ if (v->state == VARLINK_DISCONNECTED) ++ return -ENOTCONN; ++ ++ for (;;) { ++ struct pollfd pfd; ++ ++ if (v->output_buffer_size == 0) ++ break; ++ if (v->write_disconnected) ++ return -ECONNRESET; ++ ++ r = varlink_write(v); ++ if (r < 0) ++ return r; ++ if (r > 0) { ++ ret = 1; ++ continue; ++ } ++ ++ pfd = (struct pollfd) { ++ .fd = v->fd, ++ .events = POLLOUT, ++ }; ++ ++ if (poll(&pfd, 1, -1) < 0) ++ return -errno; ++ ++ handle_revents(v, pfd.revents); ++ } ++ ++ return ret; ++} ++ ++static void varlink_detach_server(Varlink *v) { ++ assert(v); ++ ++ if (!v->server) ++ return; ++ ++ if (v->server->by_uid && ++ v->ucred_acquired && ++ uid_is_valid(v->ucred.uid)) { ++ unsigned c; ++ ++ c = PTR_TO_UINT(hashmap_get(v->server->by_uid, UID_TO_PTR(v->ucred.uid))); ++ assert(c > 0); ++ ++ if (c == 1) ++ (void) hashmap_remove(v->server->by_uid, UID_TO_PTR(v->ucred.uid)); ++ else ++ (void) hashmap_replace(v->server->by_uid, UID_TO_PTR(v->ucred.uid), UINT_TO_PTR(c - 1)); ++ } ++ ++ assert(v->server->n_connections > 0); ++ v->server->n_connections--; ++ ++ /* If this is a connection associated to a server, then let's disconnect the server and the ++ * connection from each other. This drops the dangling reference that connect_callback() set up. */ ++ v->server = varlink_server_unref(v->server); ++ varlink_unref(v); ++} ++ ++int varlink_close(Varlink *v) { ++ ++ assert_return(v, -EINVAL); ++ ++ if (v->state == VARLINK_DISCONNECTED) ++ return 0; ++ ++ varlink_set_state(v, VARLINK_DISCONNECTED); ++ ++ /* Let's take a reference first, since varlink_detach_server() might drop the final (dangling) ref ++ * which would destroy us before we can call varlink_clear() */ ++ varlink_ref(v); ++ varlink_detach_server(v); ++ varlink_clear(v); ++ varlink_unref(v); ++ ++ return 1; ++} ++ ++Varlink* varlink_flush_close_unref(Varlink *v) { ++ ++ if (!v) ++ return NULL; ++ ++ (void) varlink_flush(v); ++ (void) varlink_close(v); ++ ++ return varlink_unref(v); ++} ++ ++static int varlink_enqueue_json(Varlink *v, JsonVariant *m) { ++ _cleanup_free_ char *text = NULL; ++ int r; ++ ++ assert(v); ++ assert(m); ++ ++ r = json_variant_format(m, 0, &text); ++ if (r < 0) ++ return r; ++ ++ if (v->output_buffer_size + r + 1 > VARLINK_BUFFER_MAX) ++ return -ENOBUFS; ++ ++ varlink_log(v, "Sending message: %s", text); ++ ++ if (v->output_buffer_size == 0) { ++ ++ free_and_replace(v->output_buffer, text); ++ ++ v->output_buffer_size = v->output_buffer_allocated = r + 1; ++ v->output_buffer_index = 0; ++ ++ } else if (v->output_buffer_index == 0) { ++ ++ if (!GREEDY_REALLOC(v->output_buffer, v->output_buffer_allocated, v->output_buffer_size + r + 1)) ++ return -ENOMEM; ++ ++ memcpy(v->output_buffer + v->output_buffer_size, text, r + 1); ++ v->output_buffer_size += r + 1; ++ ++ } else { ++ char *n; ++ ++ n = new(char, v->output_buffer_size + r + 1); ++ if (!n) ++ return -ENOMEM; ++ ++ memcpy(mempcpy(n, v->output_buffer + v->output_buffer_index, v->output_buffer_size), text, r + 1); ++ ++ free_and_replace(v->output_buffer, n); ++ v->output_buffer_size += r + 1; ++ v->output_buffer_index = 0; ++ } ++ ++ return 0; ++} ++ ++int varlink_send(Varlink *v, const char *method, JsonVariant *parameters) { ++ _cleanup_(json_variant_unrefp) JsonVariant *m = NULL; ++ int r; ++ ++ assert_return(v, -EINVAL); ++ assert_return(method, -EINVAL); ++ ++ if (v->state == VARLINK_DISCONNECTED) ++ return -ENOTCONN; ++ if (!IN_SET(v->state, VARLINK_IDLE_CLIENT, VARLINK_AWAITING_REPLY)) ++ return -EBUSY; ++ ++ r = varlink_sanitize_parameters(¶meters); ++ if (r < 0) ++ return r; ++ ++ r = json_build(&m, JSON_BUILD_OBJECT( ++ JSON_BUILD_PAIR("method", JSON_BUILD_STRING(method)), ++ JSON_BUILD_PAIR("parameters", JSON_BUILD_VARIANT(parameters)), ++ JSON_BUILD_PAIR("oneway", JSON_BUILD_BOOLEAN(true)))); ++ if (r < 0) ++ return r; ++ ++ r = varlink_enqueue_json(v, m); ++ if (r < 0) ++ return r; ++ ++ /* No state change here, this is one-way only after all */ ++ v->timestamp = now(CLOCK_MONOTONIC); ++ return 0; ++} ++ ++int varlink_sendb(Varlink *v, const char *method, ...) { ++ _cleanup_(json_variant_unrefp) JsonVariant *parameters = NULL; ++ va_list ap; ++ int r; ++ ++ assert_return(v, -EINVAL); ++ ++ va_start(ap, method); ++ r = json_buildv(¶meters, ap); ++ va_end(ap); ++ ++ if (r < 0) ++ return r; ++ ++ return varlink_send(v, method, parameters); ++} ++ ++int varlink_invoke(Varlink *v, const char *method, JsonVariant *parameters) { ++ _cleanup_(json_variant_unrefp) JsonVariant *m = NULL; ++ int r; ++ ++ assert_return(v, -EINVAL); ++ assert_return(method, -EINVAL); ++ ++ if (v->state == VARLINK_DISCONNECTED) ++ return -ENOTCONN; ++ if (!IN_SET(v->state, VARLINK_IDLE_CLIENT, VARLINK_AWAITING_REPLY)) ++ return -EBUSY; ++ ++ r = varlink_sanitize_parameters(¶meters); ++ if (r < 0) ++ return r; ++ ++ r = json_build(&m, JSON_BUILD_OBJECT( ++ JSON_BUILD_PAIR("method", JSON_BUILD_STRING(method)), ++ JSON_BUILD_PAIR("parameters", JSON_BUILD_VARIANT(parameters)))); ++ if (r < 0) ++ return r; ++ ++ r = varlink_enqueue_json(v, m); ++ if (r < 0) ++ return r; ++ ++ varlink_set_state(v, VARLINK_AWAITING_REPLY); ++ v->n_pending++; ++ v->timestamp = now(CLOCK_MONOTONIC); ++ ++ return 0; ++} ++ ++int varlink_invokeb(Varlink *v, const char *method, ...) { ++ _cleanup_(json_variant_unrefp) JsonVariant *parameters = NULL; ++ va_list ap; ++ int r; ++ ++ assert_return(v, -EINVAL); ++ ++ va_start(ap, method); ++ r = json_buildv(¶meters, ap); ++ va_end(ap); ++ ++ if (r < 0) ++ return r; ++ ++ return varlink_invoke(v, method, parameters); ++} ++ ++int varlink_call( ++ Varlink *v, ++ const char *method, ++ JsonVariant *parameters, ++ JsonVariant **ret_parameters, ++ const char **ret_error_id, ++ VarlinkReplyFlags *ret_flags) { ++ ++ _cleanup_(json_variant_unrefp) JsonVariant *m = NULL; ++ int r; ++ ++ assert_return(v, -EINVAL); ++ assert_return(method, -EINVAL); ++ ++ if (v->state == VARLINK_DISCONNECTED) ++ return -ENOTCONN; ++ if (!IN_SET(v->state, VARLINK_IDLE_CLIENT)) ++ return -EBUSY; ++ ++ assert(v->n_pending == 0); /* n_pending can't be > 0 if we are in VARLINK_IDLE_CLIENT state */ ++ ++ r = varlink_sanitize_parameters(¶meters); ++ if (r < 0) ++ return r; ++ ++ r = json_build(&m, JSON_BUILD_OBJECT( ++ JSON_BUILD_PAIR("method", JSON_BUILD_STRING(method)), ++ JSON_BUILD_PAIR("parameters", JSON_BUILD_VARIANT(parameters)))); ++ if (r < 0) ++ return r; ++ ++ r = varlink_enqueue_json(v, m); ++ if (r < 0) ++ return r; ++ ++ varlink_set_state(v, VARLINK_CALLING); ++ v->n_pending++; ++ v->timestamp = now(CLOCK_MONOTONIC); ++ ++ while (v->state == VARLINK_CALLING) { ++ ++ r = varlink_process(v); ++ if (r < 0) ++ return r; ++ if (r > 0) ++ continue; ++ ++ r = varlink_wait(v, USEC_INFINITY); ++ if (r < 0) ++ return r; ++ } ++ ++ switch (v->state) { ++ ++ case VARLINK_CALLED: ++ assert(v->current); ++ ++ json_variant_unref(v->reply); ++ v->reply = TAKE_PTR(v->current); ++ ++ varlink_set_state(v, VARLINK_IDLE_CLIENT); ++ assert(v->n_pending == 1); ++ v->n_pending--; ++ ++ if (ret_parameters) ++ *ret_parameters = json_variant_by_key(v->reply, "parameters"); ++ if (ret_error_id) ++ *ret_error_id = json_variant_string(json_variant_by_key(v->reply, "error")); ++ if (ret_flags) ++ *ret_flags = 0; ++ ++ return 1; ++ ++ case VARLINK_PENDING_DISCONNECT: ++ case VARLINK_DISCONNECTED: ++ return -ECONNRESET; ++ ++ case VARLINK_PENDING_TIMEOUT: ++ return -ETIME; ++ ++ default: ++ assert_not_reached("Unexpected state after method call."); ++ } ++} ++ ++int varlink_callb( ++ Varlink *v, ++ const char *method, ++ JsonVariant **ret_parameters, ++ const char **ret_error_id, ++ VarlinkReplyFlags *ret_flags, ...) { ++ ++ _cleanup_(json_variant_unrefp) JsonVariant *parameters = NULL; ++ va_list ap; ++ int r; ++ ++ assert_return(v, -EINVAL); ++ ++ va_start(ap, ret_flags); ++ r = json_buildv(¶meters, ap); ++ va_end(ap); ++ ++ if (r < 0) ++ return r; ++ ++ return varlink_call(v, method, parameters, ret_parameters, ret_error_id, ret_flags); ++} ++ ++int varlink_reply(Varlink *v, JsonVariant *parameters) { ++ _cleanup_(json_variant_unrefp) JsonVariant *m = NULL; ++ int r; ++ ++ assert_return(v, -EINVAL); ++ ++ if (v->state == VARLINK_DISCONNECTED) ++ return -ENOTCONN; ++ if (!IN_SET(v->state, ++ VARLINK_PROCESSING_METHOD, VARLINK_PROCESSING_METHOD_MORE, ++ VARLINK_PENDING_METHOD, VARLINK_PENDING_METHOD_MORE)) ++ return -EBUSY; ++ ++ r = varlink_sanitize_parameters(¶meters); ++ if (r < 0) ++ return r; ++ ++ r = json_build(&m, JSON_BUILD_OBJECT(JSON_BUILD_PAIR("parameters", JSON_BUILD_VARIANT(parameters)))); ++ if (r < 0) ++ return r; ++ ++ r = varlink_enqueue_json(v, m); ++ if (r < 0) ++ return r; ++ ++ if (IN_SET(v->state, VARLINK_PENDING_METHOD, VARLINK_PENDING_METHOD_MORE)) { ++ /* We just replied to a method call that was let hanging for a while (i.e. we were outside of ++ * the varlink_dispatch_method() stack frame), which means with this reply we are ready to ++ * process further messages. */ ++ v->current = json_variant_unref(v->current); ++ varlink_set_state(v, VARLINK_IDLE_SERVER); ++ } else ++ /* We replied to a method call from within the varlink_dispatch_method() stack frame), which ++ * means we should it handle the rest of the state engine. */ ++ varlink_set_state(v, VARLINK_PROCESSED_METHOD); ++ ++ return 1; ++} ++ ++int varlink_replyb(Varlink *v, ...) { ++ _cleanup_(json_variant_unrefp) JsonVariant *parameters = NULL; ++ va_list ap; ++ int r; ++ ++ assert_return(v, -EINVAL); ++ ++ va_start(ap, v); ++ r = json_buildv(¶meters, ap); ++ va_end(ap); ++ ++ if (r < 0) ++ return r; ++ ++ return varlink_reply(v, parameters); ++} ++ ++int varlink_error(Varlink *v, const char *error_id, JsonVariant *parameters) { ++ _cleanup_(json_variant_unrefp) JsonVariant *m = NULL; ++ int r; ++ ++ assert_return(v, -EINVAL); ++ assert_return(error_id, -EINVAL); ++ ++ if (v->state == VARLINK_DISCONNECTED) ++ return -ENOTCONN; ++ if (!IN_SET(v->state, ++ VARLINK_PROCESSING_METHOD, VARLINK_PROCESSING_METHOD_MORE, ++ VARLINK_PENDING_METHOD, VARLINK_PENDING_METHOD_MORE)) ++ return -EBUSY; ++ ++ r = varlink_sanitize_parameters(¶meters); ++ if (r < 0) ++ return r; ++ ++ r = json_build(&m, JSON_BUILD_OBJECT( ++ JSON_BUILD_PAIR("error", JSON_BUILD_STRING(error_id)), ++ JSON_BUILD_PAIR("parameters", JSON_BUILD_VARIANT(parameters)))); ++ if (r < 0) ++ return r; ++ ++ r = varlink_enqueue_json(v, m); ++ if (r < 0) ++ return r; ++ ++ if (IN_SET(v->state, VARLINK_PENDING_METHOD, VARLINK_PENDING_METHOD_MORE)) { ++ v->current = json_variant_unref(v->current); ++ varlink_set_state(v, VARLINK_IDLE_SERVER); ++ } else ++ varlink_set_state(v, VARLINK_PROCESSED_METHOD); ++ ++ return 1; ++} ++ ++int varlink_errorb(Varlink *v, const char *error_id, ...) { ++ _cleanup_(json_variant_unrefp) JsonVariant *parameters = NULL; ++ va_list ap; ++ int r; ++ ++ assert_return(v, -EINVAL); ++ assert_return(error_id, -EINVAL); ++ ++ va_start(ap, error_id); ++ r = json_buildv(¶meters, ap); ++ va_end(ap); ++ ++ if (r < 0) ++ return r; ++ ++ return varlink_error(v, error_id, parameters); ++} ++ ++int varlink_error_invalid_parameter(Varlink *v, JsonVariant *parameters) { ++ ++ assert_return(v, -EINVAL); ++ assert_return(parameters, -EINVAL); ++ ++ /* We expect to be called in one of two ways: the 'parameters' argument is a string variant in which ++ * case it is the parameter key name that is invalid. Or the 'parameters' argument is an object ++ * variant in which case we'll pull out the first key. The latter mode is useful in functions that ++ * don't expect any arguments. */ ++ ++ if (json_variant_is_string(parameters)) ++ return varlink_error(v, VARLINK_ERROR_INVALID_PARAMETER, parameters); ++ ++ if (json_variant_is_object(parameters) && ++ json_variant_elements(parameters) > 0) ++ return varlink_error(v, VARLINK_ERROR_INVALID_PARAMETER, ++ json_variant_by_index(parameters, 0)); ++ ++ return -EINVAL; ++} ++ ++int varlink_notify(Varlink *v, JsonVariant *parameters) { ++ _cleanup_(json_variant_unrefp) JsonVariant *m = NULL; ++ int r; ++ ++ assert_return(v, -EINVAL); ++ ++ if (v->state == VARLINK_DISCONNECTED) ++ return -ENOTCONN; ++ if (!IN_SET(v->state, VARLINK_PROCESSING_METHOD_MORE, VARLINK_PENDING_METHOD_MORE)) ++ return -EBUSY; ++ ++ r = varlink_sanitize_parameters(¶meters); ++ if (r < 0) ++ return r; ++ ++ r = json_build(&m, JSON_BUILD_OBJECT( ++ JSON_BUILD_PAIR("parameters", JSON_BUILD_VARIANT(parameters)), ++ JSON_BUILD_PAIR("continues", JSON_BUILD_BOOLEAN(true)))); ++ if (r < 0) ++ return r; ++ ++ r = varlink_enqueue_json(v, m); ++ if (r < 0) ++ return r; ++ ++ /* No state change, as more is coming */ ++ return 1; ++} ++ ++int varlink_notifyb(Varlink *v, ...) { ++ _cleanup_(json_variant_unrefp) JsonVariant *parameters = NULL; ++ va_list ap; ++ int r; ++ ++ assert_return(v, -EINVAL); ++ ++ va_start(ap, v); ++ r = json_buildv(¶meters, ap); ++ va_end(ap); ++ ++ if (r < 0) ++ return r; ++ ++ return varlink_notify(v, parameters); ++} ++ ++int varlink_bind_reply(Varlink *v, VarlinkReply callback) { ++ assert_return(v, -EINVAL); ++ ++ if (callback && v->reply_callback && callback != v->reply_callback) ++ return -EBUSY; ++ ++ v->reply_callback = callback; ++ ++ return 0; ++} ++ ++void* varlink_set_userdata(Varlink *v, void *userdata) { ++ void *old; ++ ++ assert_return(v, NULL); ++ ++ old = v->userdata; ++ v->userdata = userdata; ++ ++ return old; ++} ++ ++void* varlink_get_userdata(Varlink *v) { ++ assert_return(v, NULL); ++ ++ return v->userdata; ++} ++ ++static int varlink_acquire_ucred(Varlink *v) { ++ int r; ++ ++ assert(v); ++ ++ if (v->ucred_acquired) ++ return 0; ++ ++ r = getpeercred(v->fd, &v->ucred); ++ if (r < 0) ++ return r; ++ ++ v->ucred_acquired = true; ++ return 0; ++} ++ ++int varlink_get_peer_uid(Varlink *v, uid_t *ret) { ++ int r; ++ ++ assert_return(v, -EINVAL); ++ assert_return(ret, -EINVAL); ++ ++ r = varlink_acquire_ucred(v); ++ if (r < 0) ++ return r; ++ ++ if (!uid_is_valid(v->ucred.uid)) ++ return -ENODATA; ++ ++ *ret = v->ucred.uid; ++ return 0; ++} ++ ++int varlink_get_peer_pid(Varlink *v, pid_t *ret) { ++ int r; ++ ++ assert_return(v, -EINVAL); ++ assert_return(ret, -EINVAL); ++ ++ r = varlink_acquire_ucred(v); ++ if (r < 0) ++ return r; ++ ++ if (!pid_is_valid(v->ucred.pid)) ++ return -ENODATA; ++ ++ *ret = v->ucred.pid; ++ return 0; ++} ++ ++int varlink_set_relative_timeout(Varlink *v, usec_t timeout) { ++ assert_return(v, -EINVAL); ++ assert_return(timeout > 0, -EINVAL); ++ ++ v->timeout = timeout; ++ return 0; ++} ++ ++VarlinkServer *varlink_get_server(Varlink *v) { ++ assert_return(v, NULL); ++ ++ return v->server; ++} ++ ++int varlink_set_description(Varlink *v, const char *description) { ++ assert_return(v, -EINVAL); ++ ++ return free_and_strdup(&v->description, description); ++} ++ ++static int io_callback(sd_event_source *s, int fd, uint32_t revents, void *userdata) { ++ Varlink *v = userdata; ++ ++ assert(s); ++ assert(v); ++ ++ handle_revents(v, revents); ++ (void) varlink_process(v); ++ ++ return 1; ++} ++ ++static int time_callback(sd_event_source *s, uint64_t usec, void *userdata) { ++ Varlink *v = userdata; ++ ++ assert(s); ++ assert(v); ++ ++ (void) varlink_process(v); ++ return 1; ++} ++ ++static int defer_callback(sd_event_source *s, void *userdata) { ++ Varlink *v = userdata; ++ ++ assert(s); ++ assert(v); ++ ++ (void) varlink_process(v); ++ return 1; ++} ++ ++static int prepare_callback(sd_event_source *s, void *userdata) { ++ Varlink *v = userdata; ++ int r, e; ++ usec_t until; ++ ++ assert(s); ++ assert(v); ++ ++ e = varlink_get_events(v); ++ if (e < 0) ++ return e; ++ ++ r = sd_event_source_set_io_events(v->io_event_source, e); ++ if (r < 0) ++ return r; ++ ++ r = varlink_get_timeout(v, &until); ++ if (r < 0) ++ return r; ++ if (r > 0) { ++ r = sd_event_source_set_time(v->time_event_source, until); ++ if (r < 0) ++ return r; ++ } ++ ++ r = sd_event_source_set_enabled(v->time_event_source, r > 0 ? SD_EVENT_ON : SD_EVENT_OFF); ++ if (r < 0) ++ return r; ++ ++ return 1; ++} ++ ++static int quit_callback(sd_event_source *event, void *userdata) { ++ Varlink *v = userdata; ++ ++ assert(event); ++ assert(v); ++ ++ varlink_flush(v); ++ varlink_close(v); ++ ++ return 1; ++} ++ ++int varlink_attach_event(Varlink *v, sd_event *e, int64_t priority) { ++ int r; ++ ++ assert_return(v, -EINVAL); ++ assert_return(!v->event, -EBUSY); ++ ++ if (e) ++ v->event = sd_event_ref(e); ++ else { ++ r = sd_event_default(&v->event); ++ if (r < 0) ++ return r; ++ } ++ ++ r = sd_event_add_time(v->event, &v->time_event_source, CLOCK_MONOTONIC, 0, 0, time_callback, v); ++ if (r < 0) ++ goto fail; ++ ++ r = sd_event_source_set_priority(v->time_event_source, priority); ++ if (r < 0) ++ goto fail; ++ ++ (void) sd_event_source_set_description(v->time_event_source, "varlink-time"); ++ ++ r = sd_event_add_exit(v->event, &v->quit_event_source, quit_callback, v); ++ if (r < 0) ++ goto fail; ++ ++ r = sd_event_source_set_priority(v->quit_event_source, priority); ++ if (r < 0) ++ goto fail; ++ ++ (void) sd_event_source_set_description(v->quit_event_source, "varlink-quit"); ++ ++ r = sd_event_add_io(v->event, &v->io_event_source, v->fd, 0, io_callback, v); ++ if (r < 0) ++ goto fail; ++ ++ r = sd_event_source_set_prepare(v->io_event_source, prepare_callback); ++ if (r < 0) ++ goto fail; ++ ++ r = sd_event_source_set_priority(v->io_event_source, priority); ++ if (r < 0) ++ goto fail; ++ ++ (void) sd_event_source_set_description(v->io_event_source, "varlink-io"); ++ ++ r = sd_event_add_defer(v->event, &v->defer_event_source, defer_callback, v); ++ if (r < 0) ++ goto fail; ++ ++ r = sd_event_source_set_priority(v->defer_event_source, priority); ++ if (r < 0) ++ goto fail; ++ ++ (void) sd_event_source_set_description(v->defer_event_source, "varlink-defer"); ++ ++ return 0; ++ ++fail: ++ varlink_detach_event(v); ++ return r; ++} ++ ++ ++void varlink_detach_event(Varlink *v) { ++ if (!v) ++ return; ++ ++ varlink_detach_event_sources(v); ++ ++ v->event = sd_event_unref(v->event); ++} ++ ++sd_event *varlink_get_event(Varlink *v) { ++ assert_return(v, NULL); ++ ++ return v->event; ++} ++ ++int varlink_server_new(VarlinkServer **ret, VarlinkServerFlags flags) { ++ VarlinkServer *s; ++ ++ assert_return(ret, -EINVAL); ++ assert_return((flags & ~_VARLINK_SERVER_FLAGS_ALL) == 0, -EINVAL); ++ ++ s = new(VarlinkServer, 1); ++ if (!s) ++ return -ENOMEM; ++ ++ *s = (VarlinkServer) { ++ .n_ref = 1, ++ .flags = flags, ++ .connections_max = varlink_server_connections_max(NULL), ++ .connections_per_uid_max = varlink_server_connections_per_uid_max(NULL), ++ }; ++ ++ *ret = s; ++ return 0; ++} ++ ++static VarlinkServer* varlink_server_destroy(VarlinkServer *s) { ++ char *m; ++ ++ if (!s) ++ return NULL; ++ ++ varlink_server_shutdown(s); ++ ++ while ((m = hashmap_steal_first_key(s->methods))) ++ free(m); ++ ++ hashmap_free(s->methods); ++ hashmap_free(s->by_uid); ++ ++ sd_event_unref(s->event); ++ ++ free(s->description); ++ ++ return mfree(s); ++} ++ ++DEFINE_TRIVIAL_REF_UNREF_FUNC(VarlinkServer, varlink_server, varlink_server_destroy); ++ ++static int validate_connection(VarlinkServer *server, const struct ucred *ucred) { ++ int allowed = -1; ++ ++ assert(server); ++ assert(ucred); ++ ++ if (FLAGS_SET(server->flags, VARLINK_SERVER_ROOT_ONLY)) ++ allowed = ucred->uid == 0; ++ ++ if (FLAGS_SET(server->flags, VARLINK_SERVER_MYSELF_ONLY)) ++ allowed = allowed > 0 || ucred->uid == getuid(); ++ ++ if (allowed == 0) { /* Allow access when it is explicitly allowed or when neither ++ * VARLINK_SERVER_ROOT_ONLY nor VARLINK_SERVER_MYSELF_ONLY are specified. */ ++ varlink_server_log(server, "Unprivileged client attempted connection, refusing."); ++ return 0; ++ } ++ ++ if (server->n_connections >= server->connections_max) { ++ varlink_server_log(server, "Connection limit of %u reached, refusing.", server->connections_max); ++ return 0; ++ } ++ ++ if (FLAGS_SET(server->flags, VARLINK_SERVER_ACCOUNT_UID)) { ++ unsigned c; ++ ++ if (!uid_is_valid(ucred->uid)) { ++ varlink_server_log(server, "Client with invalid UID attempted connection, refusing."); ++ return 0; ++ } ++ ++ c = PTR_TO_UINT(hashmap_get(server->by_uid, UID_TO_PTR(ucred->uid))); ++ if (c >= server->connections_per_uid_max) { ++ varlink_server_log(server, "Per-UID connection limit of %u reached, refusing.", ++ server->connections_per_uid_max); ++ return 0; ++ } ++ } ++ ++ return 1; ++} ++ ++static int count_connection(VarlinkServer *server, struct ucred *ucred) { ++ unsigned c; ++ int r; ++ ++ assert(server); ++ assert(ucred); ++ ++ server->n_connections++; ++ ++ if (FLAGS_SET(server->flags, VARLINK_SERVER_ACCOUNT_UID)) { ++ r = hashmap_ensure_allocated(&server->by_uid, NULL); ++ if (r < 0) ++ return log_debug_errno(r, "Failed to allocate UID hash table: %m"); ++ ++ c = PTR_TO_UINT(hashmap_get(server->by_uid, UID_TO_PTR(ucred->uid))); ++ ++ varlink_server_log(server, "Connections of user " UID_FMT ": %u (of %u max)", ++ ucred->uid, c, server->connections_per_uid_max); ++ ++ r = hashmap_replace(server->by_uid, UID_TO_PTR(ucred->uid), UINT_TO_PTR(c + 1)); ++ if (r < 0) ++ return log_debug_errno(r, "Failed to increment counter in UID hash table: %m"); ++ } ++ ++ return 0; ++} ++ ++int varlink_server_add_connection(VarlinkServer *server, int fd, Varlink **ret) { ++ _cleanup_(varlink_unrefp) Varlink *v = NULL; ++ bool ucred_acquired; ++ struct ucred ucred; ++ int r; ++ ++ assert_return(server, -EINVAL); ++ assert_return(fd >= 0, -EBADF); ++ ++ if ((server->flags & (VARLINK_SERVER_ROOT_ONLY|VARLINK_SERVER_ACCOUNT_UID)) != 0) { ++ r = getpeercred(fd, &ucred); ++ if (r < 0) ++ return varlink_server_log_errno(server, r, "Failed to acquire peer credentials of incoming socket, refusing: %m"); ++ ++ ucred_acquired = true; ++ ++ r = validate_connection(server, &ucred); ++ if (r < 0) ++ return r; ++ if (r == 0) ++ return -EPERM; ++ } else ++ ucred_acquired = false; ++ ++ r = varlink_new(&v); ++ if (r < 0) ++ return varlink_server_log_errno(server, r, "Failed to allocate connection object: %m"); ++ ++ r = count_connection(server, &ucred); ++ if (r < 0) ++ return r; ++ ++ v->fd = fd; ++ v->userdata = server->userdata; ++ if (ucred_acquired) { ++ v->ucred = ucred; ++ v->ucred_acquired = true; ++ } ++ ++ (void) asprintf(&v->description, "%s-%i", server->description ?: "varlink", v->fd); ++ ++ /* Link up the server and the connection, and take reference in both directions. Note that the ++ * reference on the connection is left dangling. It will be dropped when the connection is closed, ++ * which happens in varlink_close(), including in the event loop quit callback. */ ++ v->server = varlink_server_ref(server); ++ varlink_ref(v); ++ ++ varlink_set_state(v, VARLINK_IDLE_SERVER); ++ ++ r = varlink_attach_event(v, server->event, server->event_priority); ++ if (r < 0) { ++ varlink_log_errno(v, r, "Failed to attach new connection: %m"); ++ v->fd = -1; /* take the fd out of the connection again */ ++ varlink_close(v); ++ return r; ++ } ++ ++ if (ret) ++ *ret = v; ++ ++ return 0; ++} ++ ++static int connect_callback(sd_event_source *source, int fd, uint32_t revents, void *userdata) { ++ VarlinkServerSocket *ss = userdata; ++ _cleanup_close_ int cfd = -1; ++ Varlink *v = NULL; ++ int r; ++ ++ assert(source); ++ assert(ss); ++ ++ varlink_server_log(ss->server, "New incoming connection."); ++ ++ cfd = accept4(fd, NULL, NULL, SOCK_NONBLOCK|SOCK_CLOEXEC); ++ if (cfd < 0) { ++ if (ERRNO_IS_ACCEPT_AGAIN(errno)) ++ return 0; ++ ++ return varlink_server_log_errno(ss->server, errno, "Failed to accept incoming socket: %m"); ++ } ++ ++ r = varlink_server_add_connection(ss->server, cfd, &v); ++ if (r < 0) ++ return 0; ++ ++ TAKE_FD(cfd); ++ ++ if (ss->server->connect_callback) { ++ r = ss->server->connect_callback(ss->server, v, ss->server->userdata); ++ if (r < 0) { ++ varlink_log_errno(v, r, "Connection callback returned error, disconnecting client: %m"); ++ varlink_close(v); ++ return 0; ++ } ++ } ++ ++ return 0; ++} ++ ++int varlink_server_listen_fd(VarlinkServer *s, int fd) { ++ _cleanup_free_ VarlinkServerSocket *ss = NULL; ++ int r; ++ ++ assert_return(s, -EINVAL); ++ assert_return(fd >= 0, -EBADF); ++ ++ r = fd_nonblock(fd, true); ++ if (r < 0) ++ return r; ++ ++ ss = new(VarlinkServerSocket, 1); ++ if (!ss) ++ return -ENOMEM; ++ ++ *ss = (VarlinkServerSocket) { ++ .server = s, ++ .fd = fd, ++ }; ++ ++ if (s->event) { ++ _cleanup_(sd_event_source_unrefp) sd_event_source *es = NULL; ++ ++ r = sd_event_add_io(s->event, &es, fd, EPOLLIN, connect_callback, ss); ++ if (r < 0) ++ return r; ++ ++ r = sd_event_source_set_priority(ss->event_source, s->event_priority); ++ if (r < 0) ++ return r; ++ } ++ ++ LIST_PREPEND(sockets, s->sockets, TAKE_PTR(ss)); ++ return 0; ++} ++ ++int varlink_server_listen_address(VarlinkServer *s, const char *address, mode_t m) { ++ union sockaddr_union sockaddr; ++ _cleanup_close_ int fd = -1; ++ int r; ++ ++ assert_return(s, -EINVAL); ++ assert_return(address, -EINVAL); ++ assert_return((m & ~0777) == 0, -EINVAL); ++ ++ r = sockaddr_un_set_path(&sockaddr.un, address); ++ if (r < 0) ++ return r; ++ ++ fd = socket(AF_UNIX, SOCK_STREAM|SOCK_CLOEXEC|SOCK_NONBLOCK, 0); ++ if (fd < 0) ++ return -errno; ++ ++ (void) sockaddr_un_unlink(&sockaddr.un); ++ ++ RUN_WITH_UMASK(~m & 0777) ++ if (bind(fd, &sockaddr.sa, SOCKADDR_UN_LEN(sockaddr.un)) < 0) ++ return -errno; ++ ++ if (listen(fd, SOMAXCONN) < 0) ++ return -errno; ++ ++ r = varlink_server_listen_fd(s, fd); ++ if (r < 0) ++ return r; ++ ++ TAKE_FD(fd); ++ return 0; ++} ++ ++void* varlink_server_set_userdata(VarlinkServer *s, void *userdata) { ++ void *ret; ++ ++ assert_return(s, NULL); ++ ++ ret = s->userdata; ++ s->userdata = userdata; ++ ++ return ret; ++} ++ ++void* varlink_server_get_userdata(VarlinkServer *s) { ++ assert_return(s, NULL); ++ ++ return s->userdata; ++} ++ ++static VarlinkServerSocket* varlink_server_socket_destroy(VarlinkServerSocket *ss) { ++ if (!ss) ++ return NULL; ++ ++ if (ss->server) ++ LIST_REMOVE(sockets, ss->server->sockets, ss); ++ ++ if (ss->event_source) { ++ (void) sd_event_source_set_enabled(ss->event_source, SD_EVENT_OFF); ++ sd_event_source_unref(ss->event_source); ++ } ++ ++ free(ss->address); ++ safe_close(ss->fd); ++ ++ return mfree(ss); ++} ++ ++int varlink_server_shutdown(VarlinkServer *s) { ++ assert_return(s, -EINVAL); ++ ++ while (s->sockets) ++ varlink_server_socket_destroy(s->sockets); ++ ++ return 0; ++} ++ ++int varlink_server_attach_event(VarlinkServer *s, sd_event *e, int64_t priority) { ++ VarlinkServerSocket *ss; ++ int r; ++ ++ assert_return(s, -EINVAL); ++ assert_return(!s->event, -EBUSY); ++ ++ if (e) ++ s->event = sd_event_ref(e); ++ else { ++ r = sd_event_default(&s->event); ++ if (r < 0) ++ return r; ++ } ++ ++ LIST_FOREACH(sockets, ss, s->sockets) { ++ assert(!ss->event_source); ++ ++ r = sd_event_add_io(s->event, &ss->event_source, ss->fd, EPOLLIN, connect_callback, ss); ++ if (r < 0) ++ goto fail; ++ ++ r = sd_event_source_set_priority(ss->event_source, priority); ++ if (r < 0) ++ goto fail; ++ } ++ ++ s->event_priority = priority; ++ return 0; ++ ++fail: ++ varlink_server_detach_event(s); ++ return r; ++} ++ ++int varlink_server_detach_event(VarlinkServer *s) { ++ VarlinkServerSocket *ss; ++ ++ assert_return(s, -EINVAL); ++ ++ LIST_FOREACH(sockets, ss, s->sockets) { ++ ++ if (!ss->event_source) ++ continue; ++ ++ (void) sd_event_source_set_enabled(ss->event_source, SD_EVENT_OFF); ++ ss->event_source = sd_event_source_unref(ss->event_source); ++ } ++ ++ sd_event_unref(s->event); ++ return 0; ++} ++ ++sd_event *varlink_server_get_event(VarlinkServer *s) { ++ assert_return(s, NULL); ++ ++ return s->event; ++} ++ ++int varlink_server_bind_method(VarlinkServer *s, const char *method, VarlinkMethod callback) { ++ char *m; ++ int r; ++ ++ assert_return(s, -EINVAL); ++ assert_return(method, -EINVAL); ++ assert_return(callback, -EINVAL); ++ ++ if (startswith(method, "org.varlink.service.")) ++ return -EEXIST; ++ ++ r = hashmap_ensure_allocated(&s->methods, &string_hash_ops); ++ if (r < 0) ++ return r; ++ ++ m = strdup(method); ++ if (!m) ++ return -ENOMEM; ++ ++ r = hashmap_put(s->methods, m, callback); ++ if (r < 0) { ++ free(m); ++ return r; ++ } ++ ++ return 0; ++} ++ ++int varlink_server_bind_method_many_internal(VarlinkServer *s, ...) { ++ va_list ap; ++ int r; ++ ++ assert_return(s, -EINVAL); ++ ++ va_start(ap, s); ++ for (;;) { ++ VarlinkMethod callback; ++ const char *method; ++ ++ method = va_arg(ap, const char *); ++ if (!method) ++ break; ++ ++ callback = va_arg(ap, VarlinkMethod); ++ ++ r = varlink_server_bind_method(s, method, callback); ++ if (r < 0) ++ return r; ++ } ++ ++ return 0; ++} ++ ++int varlink_server_bind_connect(VarlinkServer *s, VarlinkConnect callback) { ++ assert_return(s, -EINVAL); ++ ++ if (callback && s->connect_callback && callback != s->connect_callback) ++ return -EBUSY; ++ ++ s->connect_callback = callback; ++ return 0; ++} ++ ++unsigned varlink_server_connections_max(VarlinkServer *s) { ++ struct rlimit rl; ++ ++ /* If a server is specified, return the setting for that server, otherwise the default value */ ++ if (s) ++ return s->connections_max; ++ ++ assert_se(getrlimit(RLIMIT_NOFILE, &rl) >= 0); ++ ++ /* Make sure we never use up more than ¾th of RLIMIT_NOFILE for IPC */ ++ if (VARLINK_DEFAULT_CONNECTIONS_MAX > rl.rlim_cur / 4 * 3) ++ return rl.rlim_cur / 4 * 3; ++ ++ return VARLINK_DEFAULT_CONNECTIONS_MAX; ++} ++ ++unsigned varlink_server_connections_per_uid_max(VarlinkServer *s) { ++ unsigned m; ++ ++ if (s) ++ return s->connections_per_uid_max; ++ ++ /* Make sure to never use up more than ¾th of available connections for a single user */ ++ m = varlink_server_connections_max(NULL); ++ if (VARLINK_DEFAULT_CONNECTIONS_PER_UID_MAX > m) ++ return m / 4 * 3; ++ ++ return VARLINK_DEFAULT_CONNECTIONS_PER_UID_MAX; ++} ++ ++int varlink_server_set_connections_per_uid_max(VarlinkServer *s, unsigned m) { ++ assert_return(s, -EINVAL); ++ assert_return(m > 0, -EINVAL); ++ ++ s->connections_per_uid_max = m; ++ return 0; ++} ++ ++int varlink_server_set_connections_max(VarlinkServer *s, unsigned m) { ++ assert_return(s, -EINVAL); ++ assert_return(m > 0, -EINVAL); ++ ++ s->connections_max = m; ++ return 0; ++} ++ ++int varlink_server_set_description(VarlinkServer *s, const char *description) { ++ assert_return(s, -EINVAL); ++ ++ return free_and_strdup(&s->description, description); ++} +diff --git a/src/shared/varlink.h b/src/shared/varlink.h +new file mode 100644 +index 0000000..d96fa93 +--- /dev/null ++++ b/src/shared/varlink.h +@@ -0,0 +1,162 @@ ++/* SPDX-License-Identifier: LGPL-2.1+ */ ++#pragma once ++ ++#include "sd-event.h" ++ ++#include "json.h" ++#include "time-util.h" ++ ++/* A minimal Varlink implementation. We only implement the minimal, obvious bits here though. No validation, ++ * no introspection, no name service, just the stuff actually needed. ++ * ++ * You might wonder why we aren't using libvarlink here? Varlink is a very simple protocol, which allows us ++ * to write our own implementation relatively easily. However, the main reasons are these: ++ * ++ * • We want to use our own JSON subsystem, with all the benefits that brings (i.e. accurate unsigned+signed ++ * 64bit integers, full fuzzing, logging during parsing and so on). If we'd want to use that with ++ * libvarlink we'd have to serialize and deserialize all the time from its own representation which is ++ * inefficient and nasty. ++ * ++ * • We want integration into sd-event, but also synchronous event-loop-less operation ++ * ++ * • We need proper per-UID accounting and access control, since we want to allow communication between ++ * unprivileged clients and privileged servers. ++ * ++ * • And of course, we don't want the name service and introspection stuff for now (though that might ++ * change). ++ */ ++ ++typedef struct Varlink Varlink; ++typedef struct VarlinkServer VarlinkServer; ++ ++typedef enum VarlinkReplyFlags { ++ VARLINK_REPLY_ERROR = 1 << 0, ++ VARLINK_REPLY_CONTINUES = 1 << 1, ++ VARLINK_REPLY_LOCAL = 1 << 2, ++} VarlinkReplyFlags; ++ ++typedef enum VarlinkMethodFlags { ++ VARLINK_METHOD_ONEWAY = 1 << 0, ++ VARLINK_METHOD_MORE = 2 << 1, ++} VarlinkMethodFlags; ++ ++typedef enum VarlinkServerFlags { ++ VARLINK_SERVER_ROOT_ONLY = 1 << 0, /* Only accessible by root */ ++ VARLINK_SERVER_MYSELF_ONLY = 1 << 1, /* Only accessible by our own UID */ ++ VARLINK_SERVER_ACCOUNT_UID = 1 << 2, /* Do per user accounting */ ++ ++ _VARLINK_SERVER_FLAGS_ALL = (1 << 3) - 1, ++} VarlinkServerFlags; ++ ++typedef int (*VarlinkMethod)(Varlink *link, JsonVariant *parameters, VarlinkMethodFlags flags, void *userdata); ++typedef int (*VarlinkReply)(Varlink *link, JsonVariant *parameters, const char *error_id, VarlinkReplyFlags flags, void *userdata); ++typedef int (*VarlinkConnect)(VarlinkServer *server, Varlink *link, void *userdata); ++ ++int varlink_connect_address(Varlink **ret, const char *address); ++int varlink_connect_fd(Varlink **ret, int fd); ++ ++Varlink* varlink_ref(Varlink *link); ++Varlink* varlink_unref(Varlink *v); ++ ++int varlink_get_fd(Varlink *v); ++int varlink_get_events(Varlink *v); ++int varlink_get_timeout(Varlink *v, usec_t *ret); ++ ++int varlink_attach_event(Varlink *v, sd_event *e, int64_t priority); ++void varlink_detach_event(Varlink *v); ++sd_event *varlink_get_event(Varlink *v); ++ ++int varlink_process(Varlink *v); ++int varlink_wait(Varlink *v, usec_t timeout); ++ ++int varlink_flush(Varlink *v); ++int varlink_close(Varlink *v); ++ ++Varlink* varlink_flush_close_unref(Varlink *v); ++ ++/* Enqueue method call, not expecting a reply */ ++int varlink_send(Varlink *v, const char *method, JsonVariant *parameters); ++int varlink_sendb(Varlink *v, const char *method, ...); ++ ++/* Send method call and wait for reply */ ++int varlink_call(Varlink *v, const char *method, JsonVariant *parameters, JsonVariant **ret_parameters, const char **ret_error_id, VarlinkReplyFlags *ret_flags); ++int varlink_callb(Varlink *v, const char *method, JsonVariant **ret_parameters, const char **ret_error_id, VarlinkReplyFlags *ret_flags, ...); ++ ++/* Enqueue method call, expect a reply, which is eventually delivered to the reply callback */ ++int varlink_invoke(Varlink *v, const char *method, JsonVariant *parameters); ++int varlink_invokeb(Varlink *v, const char *method, ...); ++ ++/* Enqueue a final reply */ ++int varlink_reply(Varlink *v, JsonVariant *parameters); ++int varlink_replyb(Varlink *v, ...); ++ ++/* Enqueue a (final) error */ ++int varlink_error(Varlink *v, const char *error_id, JsonVariant *parameters); ++int varlink_errorb(Varlink *v, const char *error_id, ...); ++int varlink_error_invalid_parameter(Varlink *v, JsonVariant *parameters); ++ ++/* Enqueue a "more" reply */ ++int varlink_notify(Varlink *v, JsonVariant *parameters); ++int varlink_notifyb(Varlink *v, ...); ++ ++/* Bind a disconnect, reply or timeout callback */ ++int varlink_bind_reply(Varlink *v, VarlinkReply reply); ++ ++void* varlink_set_userdata(Varlink *v, void *userdata); ++void* varlink_get_userdata(Varlink *v); ++ ++int varlink_get_peer_uid(Varlink *v, uid_t *ret); ++int varlink_get_peer_pid(Varlink *v, pid_t *ret); ++ ++int varlink_set_relative_timeout(Varlink *v, usec_t usec); ++ ++VarlinkServer* varlink_get_server(Varlink *v); ++ ++int varlink_set_description(Varlink *v, const char *d); ++ ++/* Create a varlink server */ ++int varlink_server_new(VarlinkServer **ret, VarlinkServerFlags flags); ++VarlinkServer *varlink_server_ref(VarlinkServer *s); ++VarlinkServer *varlink_server_unref(VarlinkServer *s); ++ ++/* Add addresses or fds to listen on */ ++int varlink_server_listen_address(VarlinkServer *s, const char *address, mode_t mode); ++int varlink_server_listen_fd(VarlinkServer *s, int fd); ++int varlink_server_add_connection(VarlinkServer *s, int fd, Varlink **ret); ++ ++/* Bind callbacks */ ++int varlink_server_bind_method(VarlinkServer *s, const char *method, VarlinkMethod callback); ++int varlink_server_bind_method_many_internal(VarlinkServer *s, ...); ++#define varlink_server_bind_method_many(s, ...) varlink_server_bind_method_many_internal(s, __VA_ARGS__, NULL) ++int varlink_server_bind_connect(VarlinkServer *s, VarlinkConnect connect); ++ ++void* varlink_server_set_userdata(VarlinkServer *s, void *userdata); ++void* varlink_server_get_userdata(VarlinkServer *s); ++ ++int varlink_server_attach_event(VarlinkServer *v, sd_event *e, int64_t priority); ++int varlink_server_detach_event(VarlinkServer *v); ++sd_event *varlink_server_get_event(VarlinkServer *v); ++ ++int varlink_server_shutdown(VarlinkServer *server); ++ ++unsigned varlink_server_connections_max(VarlinkServer *s); ++unsigned varlink_server_connections_per_uid_max(VarlinkServer *s); ++ ++int varlink_server_set_connections_per_uid_max(VarlinkServer *s, unsigned m); ++int varlink_server_set_connections_max(VarlinkServer *s, unsigned m); ++ ++int varlink_server_set_description(VarlinkServer *s, const char *description); ++ ++DEFINE_TRIVIAL_CLEANUP_FUNC(Varlink *, varlink_unref); ++DEFINE_TRIVIAL_CLEANUP_FUNC(Varlink *, varlink_flush_close_unref); ++DEFINE_TRIVIAL_CLEANUP_FUNC(VarlinkServer *, varlink_server_unref); ++ ++#define VARLINK_ERROR_DISCONNECTED "io.systemd.Disconnected" ++#define VARLINK_ERROR_TIMEOUT "io.systemd.TimedOut" ++#define VARLINK_ERROR_PROTOCOL "io.systemd.Protocol" ++#define VARLINK_ERROR_SYSTEM "io.systemd.System" ++ ++#define VARLINK_ERROR_INTERFACE_NOT_FOUND "org.varlink.service.InterfaceNotFound" ++#define VARLINK_ERROR_METHOD_NOT_FOUND "org.varlink.service.MethodNotFound" ++#define VARLINK_ERROR_METHOD_NOT_IMPLEMENTED "org.varlink.service.MethodNotImplemented" ++#define VARLINK_ERROR_INVALID_PARAMETER "org.varlink.service.InvalidParameter" diff --git a/debian/patches/apertis/tests-add-varlink-test.patch b/debian/patches/apertis/tests-add-varlink-test.patch new file mode 100644 index 0000000000000000000000000000000000000000..021084f06bea7419488645f050d65a070f10ae4d --- /dev/null +++ b/debian/patches/apertis/tests-add-varlink-test.patch @@ -0,0 +1,272 @@ +From: Lennart Poettering <lennart@poettering.net> +Date: Thu, 11 Apr 2019 18:47:10 +0200 +Subject: tests: add varlink test + +Signed-off-by: Martyn Welch <martyn.welch@collabora.com> +[Martyn Welch: Backported to systemd v241] +--- + src/test/meson.build | 4 + + src/test/test-varlink.c | 239 ++++++++++++++++++++++++++++++++++++++++++++++++ + 2 files changed, 243 insertions(+) + create mode 100644 src/test/test-varlink.c + +diff --git a/src/test/meson.build b/src/test/meson.build +index 40154cf..9a50dc4 100644 +--- a/src/test/meson.build ++++ b/src/test/meson.build +@@ -567,6 +567,10 @@ tests += [ + libmount, + libblkid]], + ++ [['src/test/test-varlink.c'], ++ [], ++ [threads]], ++ + [['src/test/test-cgroup-util.c'], + [], + []], +diff --git a/src/test/test-varlink.c b/src/test/test-varlink.c +new file mode 100644 +index 0000000..fbfc72c +--- /dev/null ++++ b/src/test/test-varlink.c +@@ -0,0 +1,239 @@ ++/* SPDX-License-Identifier: LGPL-2.1+ */ ++ ++#include <fcntl.h> ++#include <poll.h> ++#include <pthread.h> ++ ++#include "sd-event.h" ++ ++#include "fd-util.h" ++#include "json.h" ++#include "rm-rf.h" ++#include "strv.h" ++#include "tmpfile-util.h" ++#include "user-util.h" ++#include "varlink.h" ++ ++/* Let's pick some high value, that is higher than the largest listen() backlog, but leaves enough room below ++ the typical RLIMIT_NOFILE value of 1024 so that we can process both sides of each socket in our ++ process. Or in other words: "OVERLOAD_CONNECTIONS * 2 + x < 1024" should hold, for some small x that ++ should cover any auxiliary fds, the listener server fds, stdin/stdout/stderr and whatever else. */ ++#define OVERLOAD_CONNECTIONS 333 ++ ++static int n_done = 0; ++static int block_write_fd = -1; ++ ++static int method_something(Varlink *link, JsonVariant *parameters, VarlinkMethodFlags flags, void *userdata) { ++ _cleanup_(json_variant_unrefp) JsonVariant *ret = NULL; ++ JsonVariant *a, *b; ++ intmax_t x, y; ++ int r; ++ ++ a = json_variant_by_key(parameters, "a"); ++ if (!a) ++ return varlink_error(link, "io.test.BadParameters", NULL); ++ ++ x = json_variant_integer(a); ++ ++ b = json_variant_by_key(parameters, "b"); ++ if (!b) ++ return varlink_error(link, "io.test.BadParameters", NULL); ++ ++ y = json_variant_integer(b); ++ ++ r = json_build(&ret, JSON_BUILD_OBJECT(JSON_BUILD_PAIR("sum", JSON_BUILD_INTEGER(x + y)))); ++ if (r < 0) ++ return r; ++ ++ return varlink_reply(link, ret); ++} ++ ++static int method_done(Varlink *link, JsonVariant *parameters, VarlinkMethodFlags flags, void *userdata) { ++ ++ if (++n_done == 2) ++ sd_event_exit(varlink_get_event(link), EXIT_FAILURE); ++ ++ return 0; ++} ++ ++static int reply(Varlink *link, JsonVariant *parameters, const char *error_id, VarlinkReplyFlags flags, void *userdata) { ++ JsonVariant *sum; ++ ++ sum = json_variant_by_key(parameters, "sum"); ++ ++ assert_se(json_variant_integer(sum) == 7+22); ++ ++ if (++n_done == 2) ++ sd_event_exit(varlink_get_event(link), EXIT_FAILURE); ++ ++ return 0; ++} ++ ++static int on_connect(VarlinkServer *s, Varlink *link, void *userdata) { ++ uid_t uid = UID_INVALID; ++ ++ assert(s); ++ assert(link); ++ ++ assert_se(varlink_get_peer_uid(link, &uid) >= 0); ++ assert_se(getuid() == uid); ++ ++ return 0; ++} ++ ++static int overload_reply(Varlink *link, JsonVariant *parameters, const char *error_id, VarlinkReplyFlags flags, void *userdata) { ++ ++ /* This method call reply should always be called with a disconnection, since the method call should ++ * be talking to an overloaded server */ ++ ++ log_debug("Over reply triggered with error: %s", strna(error_id)); ++ assert_se(streq(error_id, VARLINK_ERROR_DISCONNECTED)); ++ sd_event_exit(varlink_get_event(link), 0); ++ ++ return 0; ++} ++ ++static void flood_test(const char *address) { ++ _cleanup_(varlink_flush_close_unrefp) Varlink *c = NULL; ++ _cleanup_(sd_event_unrefp) sd_event *e = NULL; ++ _cleanup_free_ Varlink **connections = NULL; ++ size_t k; ++ char x = 'x'; ++ ++ log_debug("Flooding server..."); ++ ++ /* Block the main event loop while we flood */ ++ assert_se(write(block_write_fd, &x, sizeof(x)) == sizeof(x)); ++ ++ assert_se(sd_event_default(&e) >= 0); ++ ++ /* Flood the server with connections */ ++ assert_se(connections = new0(Varlink*, OVERLOAD_CONNECTIONS)); ++ for (k = 0; k < OVERLOAD_CONNECTIONS; k++) { ++ _cleanup_free_ char *t = NULL; ++ log_debug("connection %zu", k); ++ assert_se(varlink_connect_address(connections + k, address) >= 0); ++ ++ assert_se(asprintf(&t, "flood-%zu", k) >= 0); ++ assert_se(varlink_set_description(connections[k], t) >= 0); ++ assert_se(varlink_attach_event(connections[k], e, k) >= 0); ++ assert_se(varlink_sendb(connections[k], "io.test.Rubbish", JSON_BUILD_OBJECT(JSON_BUILD_PAIR("id", JSON_BUILD_INTEGER(k)))) >= 0); ++ } ++ ++ /* Then, create one more, which should fail */ ++ log_debug("Creating overload connection..."); ++ assert_se(varlink_connect_address(&c, address) >= 0); ++ assert_se(varlink_set_description(c, "overload-client") >= 0); ++ assert_se(varlink_attach_event(c, e, k) >= 0); ++ assert_se(varlink_bind_reply(c, overload_reply) >= 0); ++ assert_se(varlink_invokeb(c, "io.test.Overload", JSON_BUILD_OBJECT(JSON_BUILD_PAIR("foo", JSON_BUILD_STRING("bar")))) >= 0); ++ ++ /* Unblock it */ ++ log_debug("Unblocking server..."); ++ block_write_fd = safe_close(block_write_fd); ++ ++ /* This loop will terminate as soon as the overload reply callback is called */ ++ assert_se(sd_event_loop(e) >= 0); ++ ++ /* And close all connections again */ ++ for (k = 0; k < OVERLOAD_CONNECTIONS; k++) ++ connections[k] = varlink_unref(connections[k]); ++} ++ ++static void *thread(void *arg) { ++ _cleanup_(varlink_flush_close_unrefp) Varlink *c = NULL; ++ _cleanup_(json_variant_unrefp) JsonVariant *i = NULL; ++ JsonVariant *o = NULL; ++ const char *e; ++ ++ assert_se(json_build(&i, JSON_BUILD_OBJECT(JSON_BUILD_PAIR("a", JSON_BUILD_INTEGER(88)), ++ JSON_BUILD_PAIR("b", JSON_BUILD_INTEGER(99)))) >= 0); ++ ++ assert_se(varlink_connect_address(&c, arg) >= 0); ++ assert_se(varlink_set_description(c, "thread-client") >= 0); ++ ++ assert_se(varlink_call(c, "io.test.DoSomething", i, &o, &e, NULL) >= 0); ++ assert_se(json_variant_integer(json_variant_by_key(o, "sum")) == 88 + 99); ++ assert_se(!e); ++ ++ assert_se(varlink_callb(c, "io.test.IDontExist", &o, &e, NULL, JSON_BUILD_OBJECT(JSON_BUILD_PAIR("x", JSON_BUILD_REAL(5.5)))) >= 0); ++ assert_se(streq_ptr(json_variant_string(json_variant_by_key(o, "method")), "io.test.IDontExist")); ++ assert_se(streq(e, VARLINK_ERROR_METHOD_NOT_FOUND)); ++ ++ flood_test(arg); ++ ++ assert_se(varlink_send(c, "io.test.Done", NULL) >= 0); ++ ++ return NULL; ++} ++ ++static int block_fd_handler(sd_event_source *s, int fd, uint32_t revents, void *userdata) { ++ char c; ++ ++ assert_se(fd_nonblock(fd, false) >= 0); ++ ++ assert_se(read(fd, &c, sizeof(c)) == sizeof(c)); ++ /* When a character is written to this pipe we'll block until the pipe is closed. */ ++ ++ assert_se(read(fd, &c, sizeof(c)) == 0); ++ ++ assert_se(fd_nonblock(fd, true) >= 0); ++ ++ assert_se(sd_event_source_set_enabled(s, SD_EVENT_OFF) >= 0); ++ ++ return 0; ++} ++ ++int main(int argc, char *argv[]) { ++ _cleanup_(sd_event_source_unrefp) sd_event_source *block_event = NULL; ++ _cleanup_(varlink_server_unrefp) VarlinkServer *s = NULL; ++ _cleanup_(varlink_flush_close_unrefp) Varlink *c = NULL; ++ _cleanup_(rm_rf_physical_and_freep) char *tmpdir = NULL; ++ _cleanup_(json_variant_unrefp) JsonVariant *v = NULL; ++ _cleanup_(sd_event_unrefp) sd_event *e = NULL; ++ _cleanup_(close_pairp) int block_fds[2] = { -1, -1 }; ++ pthread_t t; ++ const char *sp; ++ ++ log_set_max_level(LOG_DEBUG); ++ log_open(); ++ ++ assert_se(mkdtemp_malloc("/tmp/varlink-test-XXXXXX", &tmpdir) >= 0); ++ sp = strjoina(tmpdir, "/socket"); ++ ++ assert_se(sd_event_default(&e) >= 0); ++ ++ assert_se(pipe2(block_fds, O_NONBLOCK|O_CLOEXEC) >= 0); ++ assert_se(sd_event_add_io(e, &block_event, block_fds[0], EPOLLIN, block_fd_handler, NULL) >= 0); ++ assert_se(sd_event_source_set_priority(block_event, SD_EVENT_PRIORITY_IMPORTANT) >= 0); ++ block_write_fd = TAKE_FD(block_fds[1]); ++ ++ assert_se(varlink_server_new(&s, VARLINK_SERVER_ACCOUNT_UID) >= 0); ++ assert_se(varlink_server_set_description(s, "our-server") >= 0); ++ ++ assert_se(varlink_server_bind_method(s, "io.test.DoSomething", method_something) >= 0); ++ assert_se(varlink_server_bind_method(s, "io.test.Done", method_done) >= 0); ++ assert_se(varlink_server_bind_connect(s, on_connect) >= 0); ++ assert_se(varlink_server_listen_address(s, sp, 0600) >= 0); ++ assert_se(varlink_server_attach_event(s, e, 0) >= 0); ++ assert_se(varlink_server_set_connections_max(s, OVERLOAD_CONNECTIONS) >= 0); ++ ++ assert_se(varlink_connect_address(&c, sp) >= 0); ++ assert_se(varlink_set_description(c, "main-client") >= 0); ++ assert_se(varlink_bind_reply(c, reply) >= 0); ++ ++ assert_se(json_build(&v, JSON_BUILD_OBJECT(JSON_BUILD_PAIR("a", JSON_BUILD_INTEGER(7)), ++ JSON_BUILD_PAIR("b", JSON_BUILD_INTEGER(22)))) >= 0); ++ ++ assert_se(varlink_invoke(c, "io.test.DoSomething", v) >= 0); ++ ++ assert_se(varlink_attach_event(c, e, 0) >= 0); ++ ++ assert_se(pthread_create(&t, NULL, thread, (void*) sp) == 0); ++ ++ assert_se(sd_event_loop(e) >= 0); ++ ++ assert_se(pthread_join(t, NULL) == 0); ++ ++ return 0; ++} diff --git a/debian/patches/apertis/units-automatically-revert-to-run-logging-on-shutdown-if-.patch b/debian/patches/apertis/units-automatically-revert-to-run-logging-on-shutdown-if-.patch new file mode 100644 index 0000000000000000000000000000000000000000..c8fa3d00f551fc7e7424200bd169fb902cdd3400 --- /dev/null +++ b/debian/patches/apertis/units-automatically-revert-to-run-logging-on-shutdown-if-.patch @@ -0,0 +1,24 @@ +From: Lennart Poettering <lennart@poettering.net> +Date: Fri, 5 Apr 2019 18:22:31 +0200 +Subject: units: automatically revert to /run logging on shutdown if necessary + +Fixes: #867 + +Signed-off-by: Martyn Welch <martyn.welch@collabora.com> +[Martyn Welch: Backported to systemd v241] +--- + units/systemd-journal-flush.service.in | 1 + + 1 file changed, 1 insertion(+) + +diff --git a/units/systemd-journal-flush.service.in b/units/systemd-journal-flush.service.in +index bacfe51..29b006c 100644 +--- a/units/systemd-journal-flush.service.in ++++ b/units/systemd-journal-flush.service.in +@@ -18,6 +18,7 @@ RequiresMountsFor=/var/log/journal + + [Service] + ExecStart=@rootbindir@/journalctl --flush ++ExecStop=@rootbindir@/journalctl --smart-relinquish-var + Type=oneshot + RemainAfterExit=yes + TimeoutSec=90s diff --git a/debian/patches/series b/debian/patches/series index 95e5f7f2b8c4d8f1ab259d62d194b94a18ebcfba..77b26816e56ed704075cc435f3cd070d30bf03e1 100644 --- a/debian/patches/series +++ b/debian/patches/series @@ -72,3 +72,11 @@ apertis/0001-Remove-bashisms-from-the-depmod-wrapper.patch apertis/0002-Remove-bashisms-from-the-UEFI-entries-generator.patch apertis/0003-Reworked-kernel-install-script.patch disable-failing-tests.patch +apertis/shared-add-minimal-varlink-implementation.patch +apertis/tests-add-varlink-test.patch +apertis/fuzzer-add-varlink-fuzzer.patch +apertis/journald-also-offer-flush-rotate-sync-as-varlink-method-c.patch +apertis/journalctl-port-flush-sync-rotate-to-use-varlink-method-c.patch +apertis/journald-add-API-to-move-logging-from-var-to-run-again.patch +apertis/journalctl-add-new-relinquish-and-smart-relinquish-option.patch +apertis/units-automatically-revert-to-run-logging-on-shutdown-if-.patch