From f28971ebf8fabcdfcccd0c01c0e9ea5886387cf3 Mon Sep 17 00:00:00 2001
From: Julian Bouzas <julian.bouzas@collabora.com>
Date: Tue, 2 Jul 2019 14:06:56 -0400
Subject: [PATCH] softdsp-endpoint: add multiple streams support

---
 modules/meson.build                           |   3 +-
 modules/module-pw-audio-softdsp-endpoint.c    | 509 +++----------
 .../module-pw-audio-softdsp-endpoint/dsp.c    | 680 ++++++++++++++++++
 .../module-pw-audio-softdsp-endpoint/dsp.h    |  33 +
 4 files changed, 813 insertions(+), 412 deletions(-)
 create mode 100644 modules/module-pw-audio-softdsp-endpoint/dsp.c
 create mode 100644 modules/module-pw-audio-softdsp-endpoint/dsp.h

diff --git a/modules/meson.build b/modules/meson.build
index 9b7b2d5b..662142ed 100644
--- a/modules/meson.build
+++ b/modules/meson.build
@@ -54,7 +54,8 @@ shared_library(
   'wireplumber-module-pw-audio-softdsp-endpoint',
   [
     'module-pw-audio-softdsp-endpoint.c',
-  ],
+    'module-pw-audio-softdsp-endpoint/dsp.c',
+ ],
   c_args : [common_c_args, '-DG_LOG_DOMAIN="m-pw-audio-softdsp-endpoint"'],
   install : true,
   install_dir : wireplumber_module_dir,
diff --git a/modules/module-pw-audio-softdsp-endpoint.c b/modules/module-pw-audio-softdsp-endpoint.c
index 49a5c6c0..b87f0c2f 100644
--- a/modules/module-pw-audio-softdsp-endpoint.c
+++ b/modules/module-pw-audio-softdsp-endpoint.c
@@ -19,15 +19,26 @@
 #include <spa/pod/builder.h>
 #include <spa/param/props.h>
 
+#include "module-pw-audio-softdsp-endpoint/dsp.h"
+
 #define MIN_QUANTUM_SIZE  64
 #define MAX_QUANTUM_SIZE  1024
 
+static const char * streams[] = {
+  "multimedia",
+  "navigation",
+  "communication",
+  "emergency",
+};
+#define N_STREAMS (sizeof (streams) / sizeof (const char *))
+
 struct _WpPwAudioSoftdspEndpoint
 {
   WpEndpoint parent;
 
-  /* The global-id this endpoint refers to */
+  /* Properties */
   guint global_id;
+  guint stream_count;
 
   /* The task to signal the endpoint is initialized */
   GTask *init_task;
@@ -35,29 +46,16 @@ struct _WpPwAudioSoftdspEndpoint
   /* The remote pipewire */
   WpRemotePipewire *remote_pipewire;
 
-  /* Handler */
-  gulong proxy_dsp_done_handler_id;
-
-  /* temporary method to select which endpoint
-   * is going to be the default input/output */
-  gboolean selected;
-
   /* Direction */
   enum pw_direction direction;
 
   /* Proxies */
   WpProxyNode *proxy_node;
   WpProxyPort *proxy_port;
-  WpProxyNode *proxy_dsp;
-  GPtrArray *proxies_dsp_port;
-
-  /* Volume */
-  gfloat master_volume;
-  gboolean master_mute;
 
-  /* DSP */
-  struct spa_hook dsp_listener;
-  struct pw_proxy *link_proxy;
+  /* Audio Dsp */
+  WpPwAudioDsp *converter;
+  WpPwAudioDsp *streams[N_STREAMS];
 };
 
 enum {
@@ -65,12 +63,6 @@ enum {
   PROP_GLOBAL_ID,
 };
 
-enum {
-  CONTROL_VOLUME = 0,
-  CONTROL_MUTE,
-  CONTROL_SELECTED,
-};
-
 static GAsyncInitableIface *wp_endpoint_parent_interface = NULL;
 static void wp_endpoint_async_initable_init (gpointer iface,
     gpointer iface_data);
@@ -82,174 +74,27 @@ G_DEFINE_TYPE_WITH_CODE (WpPwAudioSoftdspEndpoint, endpoint, WP_TYPE_ENDPOINT,
     G_IMPLEMENT_INTERFACE (G_TYPE_ASYNC_INITABLE,
                            wp_endpoint_async_initable_init))
 
-static void
-proxies_dsp_port_foreach_func(gpointer data, gpointer user_data)
-{
-  GVariantBuilder *b = user_data;
-  g_variant_builder_add (b, "t", data);
-}
-
 static gboolean
 endpoint_prepare_link (WpEndpoint * ep, guint32 stream_id,
     WpEndpointLink * link, GVariant ** properties, GError ** error)
 {
   WpPwAudioSoftdspEndpoint *self = WP_PW_AUDIO_SOFTDSP_ENDPOINT (ep);
-  const struct pw_node_info *dsp_info = NULL;
-  GVariantBuilder b, *b_ports;
-  GVariant *v_ports;
-
-  /* Get the dsp info */
-  dsp_info = wp_proxy_node_get_info(self->proxy_dsp);
-  g_return_val_if_fail (dsp_info, FALSE);
-
-  /* Create a variant array with all the ports */
-  b_ports = g_variant_builder_new (G_VARIANT_TYPE ("at"));
-  g_ptr_array_foreach(self->proxies_dsp_port, proxies_dsp_port_foreach_func,
-      b_ports);
-  v_ports = g_variant_builder_end (b_ports);
-
-  /* Set the properties */
-  g_variant_builder_init (&b, G_VARIANT_TYPE_VARDICT);
-  g_variant_builder_add (&b, "{sv}", "node-id",
-      g_variant_new_uint32 (dsp_info->id));
-  g_variant_builder_add (&b, "{sv}", "ports", v_ports);
-  *properties = g_variant_builder_end (&b);
-
-  return TRUE;
-}
+  WpPwAudioDsp *stream = NULL;
 
-static void
-on_dsp_running(WpPwAudioSoftdspEndpoint *self)
-{
-  struct pw_properties *props;
-  const struct pw_node_info *node_info = NULL;
-  const struct pw_node_info *dsp_info = NULL;
-
-  /* Return if the node has already been linked */
-  g_return_if_fail (!self->link_proxy);
-
-  /* Get the node info */
-  node_info = wp_proxy_node_get_info(self->proxy_node);
-  g_return_if_fail (node_info);
-
-  /* Get the dsp info */
-  dsp_info = wp_proxy_node_get_info(self->proxy_dsp);
-  g_return_if_fail (dsp_info);
-
-  /* Create new properties */
-  props = pw_properties_new(NULL, NULL);
-
-  /* Set the new properties */
-  pw_properties_set(props, PW_LINK_PROP_PASSIVE, "true");
-  if (self->direction == PW_DIRECTION_OUTPUT) {
-    pw_properties_setf(props, PW_LINK_OUTPUT_NODE_ID, "%d", dsp_info->id);
-    pw_properties_setf(props, PW_LINK_OUTPUT_PORT_ID, "%d", -1);
-    pw_properties_setf(props, PW_LINK_INPUT_NODE_ID, "%d", node_info->id);
-    pw_properties_setf(props, PW_LINK_INPUT_PORT_ID, "%d", -1);
-  } else {
-    pw_properties_setf(props, PW_LINK_OUTPUT_NODE_ID, "%d", node_info->id);
-    pw_properties_setf(props, PW_LINK_OUTPUT_PORT_ID, "%d", -1);
-    pw_properties_setf(props, PW_LINK_INPUT_NODE_ID, "%d", dsp_info->id);
-    pw_properties_setf(props, PW_LINK_INPUT_PORT_ID, "%d", -1);
-  }
-
-  g_debug ("%p linking DSP to node", self);
+  /* Make sure the stream Id is valid */
+  g_return_val_if_fail(stream_id < N_STREAMS, FALSE);
 
-  /* Create the link */
-  self->link_proxy = wp_remote_pipewire_create_object(self->remote_pipewire,
-      "link-factory", PW_TYPE_INTERFACE_Link, &props->dict);
+  /* Make sure the stream is valid */
+  stream = self->streams[stream_id];
+  g_return_val_if_fail(stream, FALSE);
 
-  /* Clean up */
-  pw_properties_free(props);
+  /* Prepare the link */
+  return wp_pw_audio_dsp_prepare_link (stream, properties, error);
 }
 
 static void
-on_dsp_idle (WpPwAudioSoftdspEndpoint *self)
+finish_endpoint_creation(WpPwAudioSoftdspEndpoint *self)
 {
-  if (self->link_proxy != NULL) {
-    g_debug ("%p unlinking DSP from node", self);
-    pw_proxy_destroy (self->link_proxy);
-    self->link_proxy = NULL;
-  }
-}
-
-static void
-dsp_node_event_info (void *data, const struct pw_node_info *info)
-{
-  WpPwAudioSoftdspEndpoint *self = data;
-
-  /* Handle the different states */
-  switch (info->state) {
-  case PW_NODE_STATE_IDLE:
-    on_dsp_idle (self);
-    break;
-  case PW_NODE_STATE_RUNNING:
-    on_dsp_running (self);
-    break;
-  case PW_NODE_STATE_SUSPENDED:
-    break;
-  default:
-    break;
-  }
-}
-
-static void
-dsp_node_event_param (void *object, int seq, uint32_t id,
-    uint32_t index, uint32_t next, const struct spa_pod *param)
-{
-  WpPwAudioSoftdspEndpoint *self = WP_PW_AUDIO_SOFTDSP_ENDPOINT (object);
-
-  switch (id) {
-    case SPA_PARAM_Props:
-    {
-      struct spa_pod_prop *prop;
-      struct spa_pod_object *obj = (struct spa_pod_object *) param;
-      float volume = self->master_volume;
-      bool mute = self->master_mute;
-
-      SPA_POD_OBJECT_FOREACH(obj, prop) {
-        switch (prop->key) {
-        case SPA_PROP_volume:
-          spa_pod_get_float(&prop->value, &volume);
-          break;
-        case SPA_PROP_mute:
-          spa_pod_get_bool(&prop->value, &mute);
-          break;
-        default:
-          break;
-        }
-      }
-
-      g_debug ("WpEndpoint:%p param event, vol:(%lf -> %f) mute:(%d -> %d)",
-          self, self->master_volume, volume, self->master_mute, mute);
-
-      if (self->master_volume != volume) {
-        self->master_volume = volume;
-        wp_endpoint_notify_control_value (WP_ENDPOINT (self), CONTROL_VOLUME);
-      }
-      if (self->master_mute != mute) {
-        self->master_mute = mute;
-        wp_endpoint_notify_control_value (WP_ENDPOINT (self), CONTROL_MUTE);
-      }
-
-      break;
-    }
-    default:
-      break;
-  }
-}
-
-static const struct pw_node_proxy_events dsp_node_events = {
-  PW_VERSION_NODE_PROXY_EVENTS,
-  .info = dsp_node_event_info,
-  .param = dsp_node_event_param,
-};
-
-static void
-on_proxy_dsp_done(WpProxy *proxy, gpointer data)
-{
-  WpPwAudioSoftdspEndpoint *self = data;
-
   /* Don't do anything if the endpoint has already been initialized */
   if (!self->init_task)
     return;
@@ -260,81 +105,54 @@ on_proxy_dsp_done(WpProxy *proxy, gpointer data)
 }
 
 static void
-on_proxy_dsp_created(GObject *initable, GAsyncResult *res, gpointer data)
+on_audio_dsp_stream_created(GObject *initable, GAsyncResult *res, gpointer data)
 {
   WpPwAudioSoftdspEndpoint *self = data;
-  struct pw_node_proxy *dsp_proxy = NULL;
-  const struct spa_audio_info_raw *port_format;
-  struct spa_audio_info_raw format;
-  uint8_t buf[1024];
-  struct spa_pod_builder pod_builder = { 0, };
-  struct spa_pod *param;
-
-  /* Get the proxy dsp */
-  self->proxy_dsp = wp_proxy_node_new_finish(initable, res, NULL);
-  g_return_if_fail (self->proxy_dsp);
-
-  /* Add a custom dsp listener */
-  dsp_proxy = wp_proxy_get_pw_proxy(WP_PROXY(self->proxy_dsp));
-  g_return_if_fail (dsp_proxy);
-  pw_node_proxy_add_listener(dsp_proxy, &self->dsp_listener,
-      &dsp_node_events, self);
-
-  /* Emit the props param */
-  pw_node_proxy_enum_params (dsp_proxy, 0, SPA_PARAM_Props, 0, -1, NULL);
-
-  /* Get the port format */
-  port_format = wp_proxy_port_get_format(self->proxy_port);
-  g_return_if_fail (port_format);
-  format = *port_format;
-
-  /* Build the param profile */
-  spa_pod_builder_init(&pod_builder, buf, sizeof(buf));
-  param = spa_format_audio_raw_build(&pod_builder, SPA_PARAM_Format, &format);
-  param = spa_pod_builder_add_object(&pod_builder,
-      SPA_TYPE_OBJECT_ParamProfile, SPA_PARAM_Profile,
-      SPA_PARAM_PROFILE_direction,  SPA_POD_Id(pw_direction_reverse(self->direction)),
-      SPA_PARAM_PROFILE_format,     SPA_POD_Pod(param));
-
-  /* Set the param profile to emit the dsp ports */
-  pw_node_proxy_set_param(dsp_proxy, SPA_PARAM_Profile, 0, param);
+  WpPwAudioDsp *stream = NULL;
+  guint stream_id = 0;
+
+  /* Get the stream */
+  stream = wp_pw_audio_dsp_new_finish(initable, res, NULL);
+  g_return_if_fail (stream);
+
+  /* Get the stream id */
+  g_object_get (stream, "id", &stream_id, NULL);
+  g_return_if_fail (stream_id >= 0);
+  g_return_if_fail (stream_id < N_STREAMS);
+
+  /* Set the streams */
+  self->streams[stream_id] = stream;
+
+  /* Finish the endpoint creation when all the streams are created */
+  self->stream_count++;
+  if (self->stream_count == N_STREAMS)
+    finish_endpoint_creation(self);
 }
 
 static void
-emit_audio_dsp_node (WpPwAudioSoftdspEndpoint *self)
+on_audio_dsp_converter_created(GObject *initable, GAsyncResult *res,
+    gpointer data)
 {
-  struct pw_properties *props;
-  const char *dsp_name = NULL;
-  struct pw_node_proxy *dsp_proxy = NULL;
-  const struct pw_node_info *node_info;
-
-  /* Get the node info */
-  node_info = wp_proxy_node_get_info(self->proxy_node);
-  g_return_if_fail (node_info);
-
-  /* Get the properties */
-  props = pw_properties_new_dict(node_info->props);
-  g_return_if_fail (props);
-
-  /* Get the DSP name */
-  dsp_name = pw_properties_get(props, "device.nick");
-  if (!dsp_name)
-    dsp_name = node_info->name;
-
-  /* Set the properties */
-  pw_properties_set(props, "audio-dsp.name", dsp_name);
-  pw_properties_setf(props, "audio-dsp.direction", "%d", self->direction);
-  pw_properties_setf(props, "audio-dsp.maxbuffer", "%ld",
-      MAX_QUANTUM_SIZE * sizeof(float));
-
-  /* Create the proxy dsp async */
-  dsp_proxy = wp_remote_pipewire_create_object(self->remote_pipewire,
-      "audio-dsp", PW_TYPE_INTERFACE_Node, &props->dict);
-  wp_proxy_node_new(pw_proxy_get_id((struct pw_proxy *)dsp_proxy), dsp_proxy,
-      on_proxy_dsp_created, self);
-
-  /* Clean up */
-  pw_properties_free(props);
+  WpPwAudioSoftdspEndpoint *self = data;
+  g_autoptr (WpCore) core = wp_endpoint_get_core(WP_ENDPOINT(self));
+  const struct pw_node_info *target = NULL;
+  const struct spa_audio_info_raw *format = NULL;
+
+  /* Get the proxy dsp converter */
+  self->converter = wp_pw_audio_dsp_new_finish(initable, res, NULL);
+  g_return_if_fail (self->converter);
+
+  /* Get the target and format */
+  target = wp_pw_audio_dsp_get_info (self->converter);
+  g_return_if_fail (target);
+  g_object_get (self->converter, "format", &format, NULL);
+  g_return_if_fail (format);
+
+  /* Create the audio dsp streams */
+  for (int i = 0; i < N_STREAMS; i++) {
+    wp_pw_audio_dsp_new (WP_ENDPOINT(self), i, streams[i], self->direction,
+        target, format, on_audio_dsp_stream_created, self);
+  }
 }
 
 static void
@@ -342,8 +160,11 @@ on_proxy_node_created(GObject *initable, GAsyncResult *res, gpointer data)
 {
   WpPwAudioSoftdspEndpoint *self = data;
   GVariantDict d;
+  g_autoptr (WpCore) core = wp_endpoint_get_core(WP_ENDPOINT(self));
   g_autofree gchar *name = NULL;
   const struct spa_dict *props;
+  const struct pw_node_info *target = NULL;
+  const struct spa_audio_info_raw *format = NULL;
 
   /* Get the proxy node */
   self->proxy_node = wp_proxy_node_new_finish(initable, res, NULL);
@@ -358,38 +179,21 @@ on_proxy_node_created(GObject *initable, GAsyncResult *res, gpointer data)
       wp_proxy_node_get_info (self->proxy_node)->id);
   g_object_set (self, "name", name, NULL);
 
-  /* Emit the audio DSP node */
-  emit_audio_dsp_node(self);
-
+  /* Register the stream */
   g_variant_dict_init (&d, NULL);
   g_variant_dict_insert (&d, "id", "u", 0);
   g_variant_dict_insert (&d, "name", "s", "default");
   wp_endpoint_register_stream (WP_ENDPOINT (self), g_variant_dict_end (&d));
 
-  self->master_volume = 1.0;
-  g_variant_dict_init (&d, NULL);
-  g_variant_dict_insert (&d, "id", "u", CONTROL_VOLUME);
-  g_variant_dict_insert (&d, "name", "s", "volume");
-  g_variant_dict_insert (&d, "type", "s", "d");
-  g_variant_dict_insert (&d, "range", "(dd)", 0.0, 1.0);
-  g_variant_dict_insert (&d, "default-value", "d", self->master_volume);
-  wp_endpoint_register_control (WP_ENDPOINT (self), g_variant_dict_end (&d));
-
-  self->master_mute = FALSE;
-  g_variant_dict_init (&d, NULL);
-  g_variant_dict_insert (&d, "id", "u", CONTROL_MUTE);
-  g_variant_dict_insert (&d, "name", "s", "mute");
-  g_variant_dict_insert (&d, "type", "s", "b");
-  g_variant_dict_insert (&d, "default-value", "b", self->master_mute);
-  wp_endpoint_register_control (WP_ENDPOINT (self), g_variant_dict_end (&d));
-
-  self->selected = FALSE;
-  g_variant_dict_init (&d, NULL);
-  g_variant_dict_insert (&d, "id", "u", CONTROL_SELECTED);
-  g_variant_dict_insert (&d, "name", "s", "selected");
-  g_variant_dict_insert (&d, "type", "s", "b");
-  g_variant_dict_insert (&d, "default-value", "b", self->selected);
-  wp_endpoint_register_control (WP_ENDPOINT (self), g_variant_dict_end (&d));
+  /* Create the converter proxy */
+  target = wp_proxy_node_get_info (self->proxy_node);
+  g_return_if_fail (target);
+  format = wp_proxy_port_get_format (self->proxy_port);
+  g_return_if_fail (format);
+  /* TODO: For now we create convert as a stream because convert mode does not
+   * generate any ports, not sure why */
+  wp_pw_audio_dsp_new (WP_ENDPOINT(self), WP_STREAM_ID_NONE, "master",
+      self->direction, target, format, on_audio_dsp_converter_created, self);
 }
 
 static void
@@ -410,11 +214,16 @@ on_proxy_port_created(GObject *initable, GAsyncResult *res, gpointer data)
 }
 
 static void
-handle_node_port(WpPwAudioSoftdspEndpoint *self, guint id, guint parent_id,
-  const struct spa_dict *props)
+on_port_added(WpRemotePipewire *rp, guint id, guint parent_id, gconstpointer p,
+    gpointer d)
 {
+  WpPwAudioSoftdspEndpoint *self = d;
   struct pw_port_proxy *port_proxy = NULL;
 
+  /* Check if it is a node port and handle it */
+  if (self->global_id != parent_id)
+    return;
+
   /* Alsa nodes should have 1 port only, so make sure proxy_port is not set */
   if (self->proxy_port != 0)
     return;
@@ -426,83 +235,23 @@ handle_node_port(WpPwAudioSoftdspEndpoint *self, guint id, guint parent_id,
   wp_proxy_port_new(id, port_proxy, on_proxy_port_created, self);
 }
 
-static void
-on_proxy_dsp_port_created(GObject *initable, GAsyncResult *res, gpointer data)
-{
-  WpPwAudioSoftdspEndpoint *self = data;
-  WpProxyPort *proxy_dsp_port = NULL;
-
-  /* Get the proxy dsp port */
-  proxy_dsp_port = wp_proxy_port_new_finish(initable, res, NULL);
-  g_return_if_fail (proxy_dsp_port);
-
-  /* Add the proxy dsp port to the array */
-  g_return_if_fail (self->proxies_dsp_port);
-  g_ptr_array_add(self->proxies_dsp_port, proxy_dsp_port);
-
-  /* Register a callback to know when all the dsp ports have been emitted */
-  if (!self->proxy_dsp_done_handler_id) {
-    self->proxy_dsp_done_handler_id = g_signal_connect_object(self->proxy_dsp,
-        "done", (GCallback)on_proxy_dsp_done, self, 0);
-    wp_proxy_sync (WP_PROXY(self->proxy_dsp));
-  }
-}
-
-static void
-handle_dsp_port(WpPwAudioSoftdspEndpoint *self, guint id, guint parent_id,
-  const struct spa_dict *props)
-{
-  struct pw_port_proxy *port_proxy = NULL;
-
-  /* Create the proxy dsp port async */
-  port_proxy = wp_remote_pipewire_proxy_bind (self->remote_pipewire, id,
-      PW_TYPE_INTERFACE_Port);
-  g_return_if_fail(port_proxy);
-  wp_proxy_port_new(id, port_proxy, on_proxy_dsp_port_created, self);
-}
-
-static void
-on_port_added(WpRemotePipewire *rp, guint id, guint parent_id, gconstpointer p,
-    gpointer d)
-{
-  WpPwAudioSoftdspEndpoint *self = d;
-  const struct spa_dict *props = p;
-  const struct pw_node_info *dsp_info = NULL;
-
-  /* Check if it is a node port and handle it */
-  if (self->global_id == parent_id) {
-    handle_node_port(self, id, parent_id, props);
-    return;
-  }
-
-  /* Otherwise, check if it is a dsp port and handle it */
-  if (!self->proxy_dsp)
-    return;
-  dsp_info = wp_proxy_node_get_info (self->proxy_dsp);
-  if (!dsp_info || dsp_info->id != parent_id)
-    return;
-  handle_dsp_port(self, id, parent_id, props);
-}
-
 static void
 endpoint_finalize (GObject * object)
 {
   WpPwAudioSoftdspEndpoint *self = WP_PW_AUDIO_SOFTDSP_ENDPOINT (object);
 
-  /* Destroy the proxies port */
-  if (self->proxies_dsp_port) {
-    g_ptr_array_free(self->proxies_dsp_port, TRUE);
-    self->proxies_dsp_port = NULL;
-  }
-
   /* Destroy the proxy node */
   g_clear_object(&self->proxy_node);
 
   /* Destroy the proxy port */
   g_clear_object(&self->proxy_port);
 
-  /* Destroy the proxy dsp */
-  g_clear_object(&self->proxy_dsp);
+  /* Destroy the proxy dsp converter */
+  g_clear_object(&self->converter);
+
+  /* Destroy all the proxy dsp streams */
+  for (int i = 0; i < N_STREAMS; i++)
+    g_clear_object(&self->streams[i]);
 
   /* Destroy the done task */
   g_clear_object(&self->init_task);
@@ -547,17 +296,9 @@ endpoint_get_control_value (WpEndpoint * ep, guint32 control_id)
 {
   WpPwAudioSoftdspEndpoint *self = WP_PW_AUDIO_SOFTDSP_ENDPOINT (ep);
 
-  switch (control_id) {
-    case CONTROL_VOLUME:
-      return g_variant_new_double (self->master_volume);
-    case CONTROL_MUTE:
-      return g_variant_new_boolean (self->master_mute);
-    case CONTROL_SELECTED:
-      return g_variant_new_boolean (self->selected);
-    default:
-      g_warning ("Unknown control id %u", control_id);
-      return NULL;
-  }
+  /* TODO: We always set the controls in the converter. This needs to change
+   * and select the proper stream once the stream id is passed as a parameter */
+  return wp_pw_audio_dsp_get_control_value (self->converter, control_id);
 }
 
 static gboolean
@@ -565,61 +306,10 @@ endpoint_set_control_value (WpEndpoint * ep, guint32 control_id,
     GVariant * value)
 {
   WpPwAudioSoftdspEndpoint *self = WP_PW_AUDIO_SOFTDSP_ENDPOINT (ep);
-  char buf[1024];
-  struct spa_pod_builder b = SPA_POD_BUILDER_INIT(buf, sizeof(buf));
-  struct pw_node_proxy *dsp_proxy = NULL;
-  float volume;
-  bool mute;
-
-  /* Get the pipewire dsp proxy */
-  g_return_val_if_fail (self->proxy_dsp, FALSE);
-  dsp_proxy = wp_proxy_get_pw_proxy (WP_PROXY(self->proxy_dsp));
-  g_return_val_if_fail (dsp_proxy, FALSE);
-
-  switch (control_id) {
-    case CONTROL_VOLUME:
-      volume = g_variant_get_double (value);
-
-      g_debug("WpEndpoint:%p set volume control (%u) value, vol:%f", self,
-          control_id, volume);
-
-      pw_node_proxy_set_param (dsp_proxy,
-          SPA_PARAM_Props, 0,
-          spa_pod_builder_add_object (&b,
-              SPA_TYPE_OBJECT_Props, SPA_PARAM_Props,
-              SPA_PROP_volume, SPA_POD_Float(volume),
-              NULL));
-      pw_node_proxy_enum_params (dsp_proxy, 0, SPA_PARAM_Props, 0, -1,
-          NULL);
-      break;
-
-    case CONTROL_MUTE:
-      mute = g_variant_get_boolean (value);
-
-      g_debug("WpEndpoint:%p set mute control (%u) value, mute:%d", self,
-          control_id, mute);
-
-      pw_node_proxy_set_param (dsp_proxy,
-          SPA_PARAM_Props, 0,
-          spa_pod_builder_add_object (&b,
-              SPA_TYPE_OBJECT_Props, SPA_PARAM_Props,
-              SPA_PROP_mute, SPA_POD_Bool(mute),
-              NULL));
-      pw_node_proxy_enum_params (dsp_proxy, 0, SPA_PARAM_Props, 0, -1,
-          NULL);
-      break;
-
-    case CONTROL_SELECTED:
-      self->selected = g_variant_get_boolean (value);
-      wp_endpoint_notify_control_value (ep, CONTROL_SELECTED);
-      break;
-
-    default:
-      g_warning ("Unknown control id %u", control_id);
-      return FALSE;
-  }
 
-  return TRUE;
+  /* TODO: We always set the controls in the converter. This needs to change
+   * and select the proper stream once the stream id is passed as a parameter */
+  return wp_pw_audio_dsp_set_control_value (self->converter, control_id, value);
 }
 
 static void
@@ -633,9 +323,6 @@ wp_endpoint_init_async (GAsyncInitable *initable, int io_priority,
   /* Create the async task */
   self->init_task = g_task_new (initable, cancellable, callback, data);
 
-  /* Init the proxies_dsp_port array */
-  self->proxies_dsp_port = g_ptr_array_new_full(4, (GDestroyNotify)g_object_unref);
-
   /* Set the direction */
   if (g_str_has_suffix (media_class, "Source"))
     self->direction = PW_DIRECTION_INPUT;
diff --git a/modules/module-pw-audio-softdsp-endpoint/dsp.c b/modules/module-pw-audio-softdsp-endpoint/dsp.c
new file mode 100644
index 00000000..f9e51334
--- /dev/null
+++ b/modules/module-pw-audio-softdsp-endpoint/dsp.c
@@ -0,0 +1,680 @@
+/* WirePlumber
+ *
+ * Copyright © 2019 Collabora Ltd.
+ *    @author Julian Bouzas <julian.bouzas@collabora.com>
+ *
+ * SPDX-License-Identifier: MIT
+ */
+
+#include <pipewire/pipewire.h>
+#include <spa/param/audio/format-utils.h>
+#include <spa/pod/builder.h>
+#include <spa/param/props.h>
+
+#include "dsp.h"
+
+#define MIN_QUANTUM_SIZE  64
+#define MAX_QUANTUM_SIZE  1024
+
+enum {
+  PROP_0,
+  PROP_ENDPOINT,
+  PROP_ID,
+  PROP_NAME,
+  PROP_DIRECTION,
+  PROP_TARGET,
+  PROP_FORMAT,
+};
+
+enum {
+  CONTROL_VOLUME = 0,
+  CONTROL_MUTE,
+  CONTROL_SELECTED,
+  N_CONTROLS,
+};
+
+struct _WpPwAudioDsp
+{
+  GObject parent;
+
+  /* The task to signal the audio dsp is initialized */
+  GTask *init_task;
+
+  /* The remote pipewire */
+  WpRemotePipewire *remote_pipewire;
+
+  /* Handler */
+  gulong proxy_done_handler_id;
+
+  /* Props */
+  GWeakRef endpoint;
+  guint id;
+  gchar *name;
+  enum pw_direction direction;
+  const struct pw_node_info *target;
+  const struct spa_audio_info_raw *format;
+
+  /* Proxies */
+  WpProxyNode *proxy;
+  GPtrArray *port_proxies;
+  struct pw_proxy *link_proxy;
+
+  /* Listener */
+  struct spa_hook listener;
+
+  /* Volume */
+  gfloat volume;
+  gboolean mute;
+  gboolean selected;
+};
+
+static void wp_pw_audio_dsp_async_initable_init (gpointer iface,
+    gpointer iface_data);
+
+G_DEFINE_TYPE_WITH_CODE (WpPwAudioDsp, wp_pw_audio_dsp, G_TYPE_OBJECT,
+    G_IMPLEMENT_INTERFACE (G_TYPE_ASYNC_INITABLE,
+                           wp_pw_audio_dsp_async_initable_init))
+
+static void
+register_controls (WpPwAudioDsp * self)
+{
+  GVariantDict d;
+  g_autoptr (WpEndpoint) ep = g_weak_ref_get (&self->endpoint);
+  g_return_if_fail (ep);
+
+  /* Register the volume control */
+  g_variant_dict_init (&d, NULL);
+  g_variant_dict_insert (&d, "id", "u", CONTROL_VOLUME);
+  g_variant_dict_insert (&d, "stream-id", "u", self->id);
+  g_variant_dict_insert (&d, "name", "s", "volume");
+  g_variant_dict_insert (&d, "type", "s", "d");
+  g_variant_dict_insert (&d, "range", "(dd)", 0.0, 1.0);
+  g_variant_dict_insert (&d, "default-value", "d", self->volume);
+  wp_endpoint_register_control (ep, g_variant_dict_end (&d));
+
+  /* Register the mute control */
+  g_variant_dict_init (&d, NULL);
+  g_variant_dict_insert (&d, "id", "u", CONTROL_MUTE);
+  g_variant_dict_insert (&d, "stream-id", "u", self->id);
+  g_variant_dict_insert (&d, "name", "s", "mute");
+  g_variant_dict_insert (&d, "type", "s", "b");
+  g_variant_dict_insert (&d, "default-value", "b", self->mute);
+  wp_endpoint_register_control (ep, g_variant_dict_end (&d));
+
+  /* Register the selected control only if it is the master converter */
+  if (self->id == WP_STREAM_ID_NONE) {
+    g_variant_dict_init (&d, NULL);
+    g_variant_dict_insert (&d, "id", "u", CONTROL_SELECTED);
+    g_variant_dict_insert (&d, "name", "s", "selected");
+    g_variant_dict_insert (&d, "type", "s", "b");
+    g_variant_dict_insert (&d, "default-value", "b", self->selected);
+    wp_endpoint_register_control (ep, g_variant_dict_end (&d));
+  }
+}
+
+static void
+on_audio_dsp_done(WpProxy *proxy, gpointer data)
+{
+  WpPwAudioDsp *self = data;
+
+  /* Don't do anything if the endpoint has already been initialized */
+  if (!self->init_task)
+    return;
+
+  /* Register the controls */
+  register_controls (self);
+
+  /* Finish the creation of the audio dsp */
+  g_task_return_boolean (self->init_task, TRUE);
+  g_clear_object(&self->init_task);
+}
+
+static void
+on_audio_dsp_port_created(GObject *initable, GAsyncResult *res,
+    gpointer data)
+{
+  WpPwAudioDsp *self = data;
+  WpProxyPort *port_proxy = NULL;
+
+  /* Get the proxy port */
+  port_proxy = wp_proxy_port_new_finish(initable, res, NULL);
+  g_return_if_fail (port_proxy);
+
+  /* Add the proxy port to the array */
+  g_return_if_fail (self->port_proxies);
+  g_ptr_array_add(self->port_proxies, port_proxy);
+
+  /* Register a callback to know when all the dsp ports have been emitted */
+  if (!self->proxy_done_handler_id) {
+    self->proxy_done_handler_id = g_signal_connect_object(self->proxy,
+        "done", (GCallback)on_audio_dsp_done, self, 0);
+    wp_proxy_sync (WP_PROXY(self->proxy));
+  }
+}
+
+static void
+on_audio_dsp_port_added(WpRemotePipewire *rp, guint id, guint parent_id,
+    gconstpointer p, gpointer d)
+{
+  WpPwAudioDsp *self = d;
+  const struct pw_node_info *dsp_info = NULL;
+  struct pw_port_proxy *port_proxy = NULL;
+
+  /* Make sure the port belongs to this audio dsp */
+  if (!self->proxy)
+    return;
+  dsp_info = wp_proxy_node_get_info (self->proxy);
+  if (!dsp_info || dsp_info->id != parent_id)
+    return;
+
+  /* Create the audio dsp port async */
+  port_proxy = wp_remote_pipewire_proxy_bind (self->remote_pipewire, id,
+      PW_TYPE_INTERFACE_Port);
+  g_return_if_fail(port_proxy);
+  wp_proxy_port_new(id, port_proxy, on_audio_dsp_port_created, self);
+}
+
+static void
+on_audio_dsp_running(WpPwAudioDsp *self)
+{
+  struct pw_properties *props;
+  const struct pw_node_info *dsp_info = NULL;
+
+  /* Return if the node has already been linked */
+  if (self->link_proxy)
+    return;
+
+  /* Get the dsp info */
+  dsp_info = wp_proxy_node_get_info(self->proxy);
+  g_return_if_fail (dsp_info);
+
+  /* Create new properties */
+  props = pw_properties_new(NULL, NULL);
+
+  /* Set the new properties */
+  pw_properties_set(props, PW_LINK_PROP_PASSIVE, "true");
+  if (self->direction == PW_DIRECTION_OUTPUT) {
+    pw_properties_setf(props, PW_LINK_OUTPUT_NODE_ID, "%d", dsp_info->id);
+    pw_properties_setf(props, PW_LINK_OUTPUT_PORT_ID, "%d", -1);
+    pw_properties_setf(props, PW_LINK_INPUT_NODE_ID, "%d", self->target->id);
+    pw_properties_setf(props, PW_LINK_INPUT_PORT_ID, "%d", -1);
+  } else {
+    pw_properties_setf(props, PW_LINK_OUTPUT_NODE_ID, "%d", self->target->id);
+    pw_properties_setf(props, PW_LINK_OUTPUT_PORT_ID, "%d", -1);
+    pw_properties_setf(props, PW_LINK_INPUT_NODE_ID, "%d", dsp_info->id);
+    pw_properties_setf(props, PW_LINK_INPUT_PORT_ID, "%d", -1);
+  }
+
+  g_debug ("%p linking DSP to node", self);
+
+  /* Create the link */
+  self->link_proxy = wp_remote_pipewire_create_object(self->remote_pipewire,
+      "link-factory", PW_TYPE_INTERFACE_Link, &props->dict);
+
+  /* Clean up */
+  pw_properties_free(props);
+}
+
+static void
+on_audio_dsp_idle (WpPwAudioDsp *self)
+{
+  if (self->link_proxy != NULL) {
+    pw_proxy_destroy (self->link_proxy);
+    self->link_proxy = NULL;
+  }
+}
+
+static void
+audio_dsp_event_info (void *data, const struct pw_node_info *info)
+{
+  WpPwAudioDsp *self = data;
+
+  /* Handle the different states */
+  switch (info->state) {
+  case PW_NODE_STATE_IDLE:
+    on_audio_dsp_idle (self);
+    break;
+  case PW_NODE_STATE_RUNNING:
+    on_audio_dsp_running (self);
+    break;
+  case PW_NODE_STATE_SUSPENDED:
+    break;
+  default:
+    break;
+  }
+}
+
+static void
+audio_dsp_event_param (void *object, int seq, uint32_t id,
+    uint32_t index, uint32_t next, const struct spa_pod *param)
+{
+  WpPwAudioDsp *self = WP_PW_AUDIO_DSP (object);
+
+  switch (id) {
+    case SPA_PARAM_Props:
+    {
+      struct spa_pod_prop *prop;
+      struct spa_pod_object *obj = (struct spa_pod_object *) param;
+      float volume = self->volume;
+      bool mute = self->mute;
+
+      SPA_POD_OBJECT_FOREACH(obj, prop) {
+        switch (prop->key) {
+        case SPA_PROP_volume:
+          spa_pod_get_float(&prop->value, &volume);
+          break;
+        case SPA_PROP_mute:
+          spa_pod_get_bool(&prop->value, &mute);
+          break;
+        default:
+          break;
+        }
+      }
+
+      g_debug ("WpPwAudioDsp:%p param event, vol:(%lf -> %f) mute:(%d -> %d)",
+          self, self->volume, volume, self->mute, mute);
+
+      if (self->volume != volume) {
+        self->volume = volume;
+        wp_endpoint_notify_control_value (WP_ENDPOINT (self), CONTROL_VOLUME);
+      }
+      if (self->mute != mute) {
+        self->mute = mute;
+        wp_endpoint_notify_control_value (WP_ENDPOINT (self), CONTROL_MUTE);
+      }
+
+      break;
+    }
+    default:
+      break;
+  }
+}
+
+static const struct pw_node_proxy_events audio_dsp_proxy_events = {
+  PW_VERSION_NODE_PROXY_EVENTS,
+  .info = audio_dsp_event_info,
+  .param = audio_dsp_event_param,
+};
+
+static void
+on_audio_dsp_proxy_created(GObject *initable, GAsyncResult *res,
+    gpointer data)
+{
+  WpPwAudioDsp *self = data;
+  struct pw_node_proxy *pw_proxy = NULL;
+  struct spa_audio_info_raw format;
+  uint8_t buf[1024];
+  struct spa_pod_builder pod_builder = { 0, };
+  struct spa_pod *param;
+
+  /* Get the audio dsp proxy */
+  self->proxy = wp_proxy_node_new_finish(initable, res, NULL);
+  g_return_if_fail (self->proxy);
+
+  /* Add a custom dsp listener */
+  pw_proxy = wp_proxy_get_pw_proxy(WP_PROXY(self->proxy));
+  g_return_if_fail (pw_proxy);
+  pw_node_proxy_add_listener(pw_proxy, &self->listener,
+      &audio_dsp_proxy_events, self);
+
+  /* Emit the props param */
+  pw_node_proxy_enum_params (pw_proxy, 0, SPA_PARAM_Props, 0, -1, NULL);
+
+  /* Get the port format */
+  g_return_if_fail (self->format);
+  format = *self->format;
+
+  /* Emit the ports */
+  spa_pod_builder_init(&pod_builder, buf, sizeof(buf));
+  param = spa_format_audio_raw_build(&pod_builder, SPA_PARAM_Format, &format);
+  param = spa_pod_builder_add_object(&pod_builder,
+      SPA_TYPE_OBJECT_ParamProfile, SPA_PARAM_Profile,
+      SPA_PARAM_PROFILE_direction,  SPA_POD_Id(pw_direction_reverse(self->direction)),
+      SPA_PARAM_PROFILE_format,     SPA_POD_Pod(param));
+  pw_node_proxy_set_param(pw_proxy, SPA_PARAM_Profile, 0, param);
+}
+
+static void
+wp_pw_audio_dsp_finalize (GObject * object)
+{
+  WpPwAudioDsp *self = WP_PW_AUDIO_DSP (object);
+
+  /* Props */
+  g_weak_ref_clear (&self->endpoint);
+  g_free (self->name);
+
+  /* Destroy the init task */
+  g_clear_object(&self->init_task);
+
+  /* Destroy the proxy dsp */
+  g_clear_object(&self->proxy);
+
+  /* Destroy the proxies port */
+  if (self->port_proxies) {
+    g_ptr_array_free(self->port_proxies, TRUE);
+    self->port_proxies = NULL;
+  }
+
+  G_OBJECT_CLASS (wp_pw_audio_dsp_parent_class)->finalize (object);
+}
+
+static void
+wp_pw_audio_dsp_set_property (GObject * object, guint property_id,
+    const GValue * value, GParamSpec * pspec)
+{
+  WpPwAudioDsp *self = WP_PW_AUDIO_DSP (object);
+
+  switch (property_id) {
+  case PROP_ENDPOINT:
+    g_weak_ref_set (&self->endpoint, g_value_get_object (value));
+    break;
+  case PROP_ID:
+    self->id = g_value_get_uint(value);
+    break;
+  case PROP_NAME:
+    self->name = g_value_dup_string (value);
+    break;
+  case PROP_DIRECTION:
+    self->direction = g_value_get_uint(value);
+    break;
+  case PROP_TARGET:
+    self->target = g_value_get_pointer(value);
+    break;
+  case PROP_FORMAT:
+    self->format = g_value_get_pointer(value);
+    break;
+  default:
+    G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
+    break;
+  }
+}
+
+static void
+wp_pw_audio_dsp_get_property (GObject * object, guint property_id,
+    GValue * value, GParamSpec * pspec)
+{
+  WpPwAudioDsp *self = WP_PW_AUDIO_DSP (object);
+
+  switch (property_id) {
+    case PROP_ENDPOINT:
+    g_value_take_object (value, g_weak_ref_get (&self->endpoint));
+    break;
+  case PROP_ID:
+    g_value_set_uint (value, self->id);
+    break;
+  case PROP_NAME:
+    g_value_set_string (value, self->name);
+    break;
+  case PROP_DIRECTION:
+    g_value_set_uint (value, self->direction);
+    break;
+  case PROP_TARGET:
+    g_value_set_pointer (value, (gpointer)self->target);
+    break;
+  case PROP_FORMAT:
+    g_value_set_pointer (value, (gpointer)self->format);
+    break;
+  default:
+    G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
+    break;
+  }
+}
+
+static void
+wp_pw_audio_dsp_init_async (GAsyncInitable *initable, int io_priority,
+    GCancellable *cancellable, GAsyncReadyCallback callback, gpointer data)
+{
+  WpPwAudioDsp *self = WP_PW_AUDIO_DSP (initable);
+  struct pw_properties *props;
+  struct pw_node_proxy *proxy;
+
+  /* Set the remote pipewire */
+  g_autoptr (WpEndpoint) ep = g_weak_ref_get (&self->endpoint);
+  g_return_if_fail(ep);
+  g_autoptr (WpCore) wp_core = wp_endpoint_get_core(ep);
+  g_return_if_fail(wp_core);
+  self->remote_pipewire =
+      wp_core_get_global (wp_core, WP_GLOBAL_REMOTE_PIPEWIRE);
+  g_return_if_fail(self->remote_pipewire);
+
+  /* Create the async task */
+  self->init_task = g_task_new (initable, cancellable, callback, data);
+
+  /* Init the list of port proxies */
+  self->port_proxies = g_ptr_array_new_full(4, (GDestroyNotify)g_object_unref);
+
+  /* Set the volume */
+  self->volume = 1.0;
+  self->mute = FALSE;
+  self->selected = FALSE;
+
+  /* Create the properties */
+  props = pw_properties_new_dict(self->target->props);
+  g_return_if_fail (props);
+
+  /* Set the properties */
+  pw_properties_set(props, "audio-dsp.name",
+      self->name ? self->name : "Audio-DSP");
+  pw_properties_setf(props, "audio-dsp.direction", "%d", self->direction);
+  pw_properties_setf(props, "audio-dsp.maxbuffer", "%ld",
+      MAX_QUANTUM_SIZE * sizeof(float));
+
+  /* Register a port_added callback */
+  g_signal_connect_object(self->remote_pipewire, "global-added::port",
+      (GCallback)on_audio_dsp_port_added, self, 0);
+
+  /* Create the proxy async */
+  proxy = wp_remote_pipewire_create_object(self->remote_pipewire,
+      "audio-dsp", PW_TYPE_INTERFACE_Node, &props->dict);
+  wp_proxy_node_new(pw_proxy_get_id((struct pw_proxy *)proxy), proxy,
+      on_audio_dsp_proxy_created, self);
+
+  /* Clean up */
+  pw_properties_free(props);
+}
+
+static gboolean
+wp_pw_audio_dsp_init_finish (GAsyncInitable *initable, GAsyncResult *result,
+    GError **error)
+{
+  g_return_val_if_fail (g_task_is_valid (result, initable), FALSE);
+
+  return g_task_propagate_boolean (G_TASK (result), error);
+}
+
+static void
+wp_pw_audio_dsp_async_initable_init (gpointer iface, gpointer iface_data)
+{
+  GAsyncInitableIface *ai_iface = iface;
+
+  ai_iface->init_async = wp_pw_audio_dsp_init_async;
+  ai_iface->init_finish = wp_pw_audio_dsp_init_finish;
+}
+
+static void
+wp_pw_audio_dsp_init (WpPwAudioDsp * self)
+{
+}
+
+static void
+wp_pw_audio_dsp_class_init (WpPwAudioDspClass * klass)
+{
+  GObjectClass *object_class = (GObjectClass *) klass;
+
+  object_class->finalize = wp_pw_audio_dsp_finalize;
+  object_class->set_property = wp_pw_audio_dsp_set_property;
+  object_class->get_property = wp_pw_audio_dsp_get_property;
+
+  /* Install the properties */
+  g_object_class_install_property (object_class, PROP_ENDPOINT,
+      g_param_spec_object ("endpoint", "endpoint",
+          "The endpoint this audio DSP belongs to", WP_TYPE_ENDPOINT,
+          G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY | G_PARAM_STATIC_STRINGS));
+  g_object_class_install_property (object_class, PROP_ID,
+      g_param_spec_uint ("id", "id", "The Id of the audio DSP", 0, G_MAXUINT, 0,
+          G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY | G_PARAM_STATIC_STRINGS));
+  g_object_class_install_property (object_class, PROP_NAME,
+      g_param_spec_string ("name", "name", "The name of the audio DSP", NULL,
+          G_PARAM_READWRITE | G_PARAM_CONSTRUCT | G_PARAM_STATIC_STRINGS));
+  g_object_class_install_property (object_class, PROP_DIRECTION,
+      g_param_spec_uint ("direction", "direction",
+          "The direction of the audio DSP", 0, 1, 0,
+          G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY | G_PARAM_STATIC_STRINGS));
+  g_object_class_install_property (object_class, PROP_TARGET,
+      g_param_spec_pointer ("target", "target",
+          "The target node info of the audio DSP",
+          G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY | G_PARAM_STATIC_STRINGS));
+  g_object_class_install_property (object_class, PROP_FORMAT,
+      g_param_spec_pointer ("format", "format",
+          "The format of the audio DSP ports",
+          G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY | G_PARAM_STATIC_STRINGS));
+}
+
+void
+wp_pw_audio_dsp_new (WpEndpoint *endpoint, guint id, const char *name,
+    enum pw_direction direction, const struct pw_node_info *target,
+    const struct spa_audio_info_raw *format, GAsyncReadyCallback callback,
+    gpointer user_data)
+{
+  g_async_initable_new_async (
+      wp_pw_audio_dsp_get_type (), G_PRIORITY_DEFAULT, NULL,
+      callback, user_data,
+      "endpoint", endpoint,
+      "id", id,
+      "name", name,
+      "direction", direction,
+      "target", target,
+      "format", format,
+      NULL);
+}
+
+WpPwAudioDsp *
+wp_pw_audio_dsp_new_finish (GObject *initable, GAsyncResult *res,
+    GError **error)
+{
+  GAsyncInitable *ai = G_ASYNC_INITABLE(initable);
+  return WP_PW_AUDIO_DSP(g_async_initable_new_finish(ai, res, error));
+}
+
+const struct pw_node_info *
+wp_pw_audio_dsp_get_info (WpPwAudioDsp * self)
+{
+  return wp_proxy_node_get_info(self->proxy);
+}
+
+static void
+port_proxies_foreach_func(gpointer data, gpointer user_data)
+{
+  GVariantBuilder *b = user_data;
+  g_variant_builder_add (b, "t", data);
+}
+
+gboolean
+wp_pw_audio_dsp_prepare_link (WpPwAudioDsp * self, GVariant ** properties,
+    GError ** error) {
+  const struct pw_node_info *info = NULL;
+  GVariantBuilder b, *b_ports;
+  GVariant *v_ports;
+
+  /* Get the proxy node info */
+  info = wp_proxy_node_get_info(self->proxy);
+  g_return_val_if_fail (info, FALSE);
+
+  /* Create a variant array with all the ports */
+  b_ports = g_variant_builder_new (G_VARIANT_TYPE ("at"));
+  g_ptr_array_foreach(self->port_proxies, port_proxies_foreach_func,
+      b_ports);
+  v_ports = g_variant_builder_end (b_ports);
+
+  /* Set the properties */
+  g_variant_builder_init (&b, G_VARIANT_TYPE_VARDICT);
+  g_variant_builder_add (&b, "{sv}", "node-id",
+      g_variant_new_uint32 (info->id));
+  g_variant_builder_add (&b, "{sv}", "ports", v_ports);
+  *properties = g_variant_builder_end (&b);
+
+  return TRUE;
+}
+
+GVariant *
+wp_pw_audio_dsp_get_control_value (WpPwAudioDsp * self, guint32 control_id)
+{
+  switch (control_id) {
+    case CONTROL_VOLUME:
+      return g_variant_new_double (self->volume);
+    case CONTROL_MUTE:
+      return g_variant_new_boolean (self->mute);
+    case CONTROL_SELECTED:
+      if (self->id == WP_STREAM_ID_NONE)
+        return g_variant_new_boolean (self->selected);
+      return NULL;
+    default:
+      g_warning ("Unknown control id %u", control_id);
+      return NULL;
+  }
+}
+
+gboolean
+wp_pw_audio_dsp_set_control_value (WpPwAudioDsp * self, guint32 control_id,
+    GVariant * value)
+{
+  char buf[1024];
+  struct spa_pod_builder b = SPA_POD_BUILDER_INIT(buf, sizeof(buf));
+  struct pw_node_proxy *pw_proxy = NULL;
+  float volume;
+  bool mute;
+  g_autoptr (WpEndpoint) ep = g_weak_ref_get (&self->endpoint);
+  g_return_val_if_fail (ep, FALSE);
+
+  /* Get the pipewire dsp proxy */
+  g_return_val_if_fail (self->proxy, FALSE);
+  pw_proxy = wp_proxy_get_pw_proxy (WP_PROXY(self->proxy));
+  g_return_val_if_fail (pw_proxy, FALSE);
+
+  switch (control_id) {
+    case CONTROL_VOLUME:
+      volume = g_variant_get_double (value);
+
+      g_debug("WpPwAudioDsp:%p set volume control (%u) value, vol:%f", self,
+          control_id, volume);
+
+      pw_node_proxy_set_param (pw_proxy,
+          SPA_PARAM_Props, 0,
+          spa_pod_builder_add_object (&b,
+              SPA_TYPE_OBJECT_Props, SPA_PARAM_Props,
+              SPA_PROP_volume, SPA_POD_Float(volume),
+              NULL));
+      pw_node_proxy_enum_params (pw_proxy, 0, SPA_PARAM_Props, 0, -1,
+          NULL);
+      break;
+
+    case CONTROL_MUTE:
+      mute = g_variant_get_boolean (value);
+
+      g_debug("WpPwAudioDsp:%p set mute control (%u) value, mute:%d", self,
+          control_id, mute);
+
+      pw_node_proxy_set_param (pw_proxy,
+          SPA_PARAM_Props, 0,
+          spa_pod_builder_add_object (&b,
+              SPA_TYPE_OBJECT_Props, SPA_PARAM_Props,
+              SPA_PROP_mute, SPA_POD_Bool(mute),
+              NULL));
+      pw_node_proxy_enum_params (pw_proxy, 0, SPA_PARAM_Props, 0, -1,
+          NULL);
+      break;
+
+    case CONTROL_SELECTED:
+      if (self->id == WP_STREAM_ID_NONE) {
+        self->selected = g_variant_get_boolean (value);
+        wp_endpoint_notify_control_value (ep, CONTROL_SELECTED);
+      }
+      break;
+
+    default:
+      g_warning ("Unknown control id %u", control_id);
+      return FALSE;
+  }
+
+  return TRUE;
+}
diff --git a/modules/module-pw-audio-softdsp-endpoint/dsp.h b/modules/module-pw-audio-softdsp-endpoint/dsp.h
new file mode 100644
index 00000000..5535832c
--- /dev/null
+++ b/modules/module-pw-audio-softdsp-endpoint/dsp.h
@@ -0,0 +1,33 @@
+/* WirePlumber
+ *
+ * Copyright © 2019 Collabora Ltd.
+ *    @author Julian Bouzas <julian.bouzas@collabora.com>
+ *
+ * SPDX-License-Identifier: MIT
+ */
+
+#include <gio/gio.h>
+#include <wp/wp.h>
+
+#ifndef __WP_PW_AUDIO_DSP_H__
+#define __WP_PW_AUDIO_DSP_H__
+
+G_DECLARE_FINAL_TYPE (WpPwAudioDsp, wp_pw_audio_dsp,
+    WP_PW, AUDIO_DSP, GObject)
+
+void wp_pw_audio_dsp_new (WpEndpoint *endpoint, guint id, const char *name,
+    enum pw_direction direction, const struct pw_node_info *target,
+    const struct spa_audio_info_raw *format, GAsyncReadyCallback callback,
+    gpointer user_data);
+WpPwAudioDsp * wp_pw_audio_dsp_new_finish (GObject *initable, GAsyncResult *res,
+    GError **error);
+
+const struct pw_node_info *wp_pw_audio_dsp_get_info (WpPwAudioDsp * self);
+gboolean wp_pw_audio_dsp_prepare_link (WpPwAudioDsp * self,
+    GVariant ** properties, GError ** error);
+GVariant * wp_pw_audio_dsp_get_control_value (WpPwAudioDsp * self,
+    guint32 control_id);
+gboolean wp_pw_audio_dsp_set_control_value (WpPwAudioDsp * self,
+    guint32 control_id, GVariant * value);
+
+#endif
-- 
GitLab