diff --git a/scripts/test-case-3-run.py b/scripts/test-case-3-run.py new file mode 100755 index 0000000000000000000000000000000000000000..4afbd559da3c41a4efb651e634f09c39f3100275 --- /dev/null +++ b/scripts/test-case-3-run.py @@ -0,0 +1,298 @@ +#!/usr/bin/env python3 + +from itertools import cycle +import json +import select +import subprocess +import sys +import time + + +def pw_dump(name = None ): + cmd_pw_dump = ["pw-dump"] + p = subprocess.run(cmd_pw_dump, stdout=subprocess.PIPE) + my_pw_dump = json.loads(p.stdout) + + if name != None: + with open(name, "w") as f: + json.dump(my_pw_dump, f, indent=2) + + return my_pw_dump + + +def pw_dot(name): + subprocess.run(["pw-dot","-o","pw-dot-temp.dot"], stderr=subprocess.DEVNULL) + subprocess.run(["dot","-Tpng","pw-dot-temp.dot","-o",name], stderr=subprocess.DEVNULL) + + +def pw_link(out_pattern, in_pattern): + cmd_pw_link = ["pw-link",str(out_pattern),str(in_pattern)] + subprocess.run(cmd_pw_link) + + +def pw_link_delete(out_pattern, in_pattern): + cmd_pw_link_delete = ["pw-link","-d",str(out_pattern),str(in_pattern)] + subprocess.run(cmd_pw_link_delete) + + +def pw_metadata(out_pattern, in_pattern): + cmd_pw_metadata = ["pw-metadata", str(out_pattern), + "target.object", in_pattern] + subprocess.run(cmd_pw_metadata, + stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) + + +def create_loopback_device(name): + print(f"Creating a virtual device named {name}") + + cmd_pw_loopback = "pw-loopback -m '[FL FR]' --capture-props='media.class=Audio/Sink node.name="+name+"'" + subprocess.Popen(cmd_pw_loopback, shell=True) + time.sleep(1) # Wait 1 sec to be sure the loopback is up + + +def create_loopback_input_device(name): + print(f"Creating a virtual device named {name}") + + cmd_pw_loopback = "pw-loopback -m '[FL FR]' --capture-props='media.class=Audio/Sink node.name="+name+"' --playback-props='node.name="+name+"'" + subprocess.Popen(cmd_pw_loopback, shell=True) + time.sleep(1) # Wait 1 sec to be sure the loopback is up + + +def start_jackplay(my_sound): + print(f"Starting playing music with jackplay") + + cmd_jackplay = [ + "pw-jack", + "sndfile-jackplay", + my_sound, + "-l", + "0" + ] + subprocess.Popen(cmd_jackplay, stderr=subprocess.DEVNULL) + + +def delete_useless_pw_links(my_pw_dump, output_ports, input_ports): + my_useless_ports_id = [] + for pw_obj in my_pw_dump: + if pw_obj["type"] != "PipeWire:Interface:Link": + continue + if pw_obj["info"]["output-port-id"] not in output_ports: + continue + if pw_obj["info"]["input-port-id"] in input_ports: + continue + print(f"Destroying link between {pw_obj['info']['output-port-id']} and {pw_obj['info']['input-port-id']}") + pw_link_delete(pw_obj["info"]["output-port-id"], + pw_obj["info"]["input-port-id"]) + + +def kill_my_sinks(): + print("Killing all sndfile-jackplay and pw-loopback") + subprocess.run(["killall","sndfile-jackplay"], stderr=subprocess.DEVNULL) + subprocess.run(["killall","pw-loopback"], stderr=subprocess.DEVNULL) + + +def start_sink(my_sink_name, my_wav, device_playback, my_id_used=None): + create_loopback_device(my_sink_name) + start_jackplay(my_wav) + time.sleep(5) # Wait 5 sec to be sure it started + + my_pw_dump = pw_dump("pw-dump-temp-"+my_sink_name+"-1.json") + pw_dot("pw-dot-temp-"+my_sink_name+"-1.png") + + # Get node id of "jackplay" + my_jackplay_id = [] + for pw_obj in my_pw_dump: + if pw_obj["type"] != "PipeWire:Interface:Node": + continue + if pw_obj["info"]["props"]["node.name"] != "jackplay": + continue + if my_id_used: + if pw_obj["id"] in my_id_used: + continue + my_jackplay_id.append(pw_obj["id"]) + + # Get associated ports id of "jackplay" + my_jackplay_ports_id = [] + for pw_obj in my_pw_dump: + if pw_obj["type"] != "PipeWire:Interface:Port": + continue + if pw_obj["info"]["props"]["node.id"] != my_jackplay_id[0]: + continue + my_jackplay_ports_id.append(pw_obj["id"]) + print(f".. DBG: my_jackplay_ports_id {my_jackplay_ports_id}") + + # Get node id of my_sink_name + my_sink_id = [] + my_sink_node_group = [] + for pw_obj in my_pw_dump: + if pw_obj["type"] != "PipeWire:Interface:Node": + continue + if pw_obj["info"]["props"]["node.name"] != my_sink_name: + continue + my_sink_id.append(pw_obj["id"]) + my_sink_node_group.append(pw_obj["info"]["props"]["node.group"]) + print(f".. DBG: my_sink_id {my_sink_id}") + print(f".. DBG: my_sink_node_group {my_sink_node_group}") + + # Get associated ports id of my_sink_name + my_sink_object_path = my_sink_node_group[0]+":playback" + print(f".. DBG: my_sink_object_path {my_sink_object_path}") + my_sink_ports_id = [] + for pw_obj in my_pw_dump: + if pw_obj["type"] != "PipeWire:Interface:Port": + continue + if "object.path" not in pw_obj["info"]["props"]: + continue + if not my_sink_object_path in pw_obj["info"]["props"]["object.path"]: + continue + my_sink_ports_id.append(pw_obj["id"]) + print(f".. DBG: my_sink_ports_id {my_sink_ports_id}") + + # Move stream to the virtual node + pw_link(my_jackplay_ports_id[0], my_sink_ports_id[0]) + pw_link(my_jackplay_ports_id[1], my_sink_ports_id[1]) + + my_pw_dump = pw_dump("pw-dump-temp-"+my_sink_name+"-3.json") + pw_dot("pw-dot-temp-"+my_sink_name+"-3.png") + + # Get id of virtual node + my_sink_node_group = None + for pw_obj in my_pw_dump: + if pw_obj["type"] != "PipeWire:Interface:Node": + continue + if pw_obj["id"] != my_sink_id[0]: + continue + my_sink_node_group = pw_obj["info"]["props"]["node.group"] + + my_loopback_output = None + for pw_obj in my_pw_dump: + if pw_obj["type"] != "PipeWire:Interface:Node": + continue + if pw_obj["info"]["props"]["node.name"] != ("output."+my_sink_node_group): + continue + my_loopback_output = pw_obj['id'] + + # Assign virtual node to device_playback + print(f"Send sound to {device_playback}") + pw_metadata(my_loopback_output, device_playback) + + my_pw_dump = pw_dump("pw-dump-temp-"+my_sink_name+"-3.json") + pw_dot("pw-dot-temp-"+my_sink_name+"-3.png") + + # Delete previous links + delete_useless_pw_links(my_pw_dump, my_jackplay_ports_id, my_sink_ports_id) + + my_pw_dump = pw_dump("pw-dump-temp-"+my_sink_name+"-4.json") + pw_dot("pw-dot-temp-"+my_sink_name+"-4.png") + + # This is required to identify which jackplay are already processsed + return my_jackplay_id + + +def find_outputs(): + dump = pw_dump() + outputs = [] + for obj in dump: + if obj["type"] != "PipeWire:Interface:Node": + continue + if obj["info"]["props"].get("media.class") != "Audio/Sink": + continue + outputs.append(obj["info"]["props"]["node.name"]) + return outputs + + +def start_capture_inputs(my_inputs, my_sound_cards): + + ########## + ## Input 1 will be redirected to LEFT side of the two outputs + my_loopback_input_1 = "input_1" + + create_loopback_input_device(my_loopback_input_1) + pw_dot("pw-dot-temp-input-10.png") + + pw_link(my_inputs[0], my_loopback_input_1) + pw_dot("pw-dot-temp-input-11.png") + + # Merge R in L + pw_link(my_loopback_input_1+":output_FR", my_sound_cards[0]+":playback_FL") + + # Drop R + pw_link_delete(my_loopback_input_1+":output_FR", my_sound_cards[0]+":playback_FR") + pw_dot("pw-dot-temp-input-12.png") + + # Sink to the second output device, FL side only + pw_link(my_loopback_input_1+":output_FL", my_sound_cards[1]+":playback_FL") + pw_link(my_loopback_input_1+":output_FR", my_sound_cards[1]+":playback_FL") + pw_dot("pw-dot-temp-input-13.png") + + ########## + ## Input 2 will be redirected to RIGHT side of the two outputs + my_loopback_input_2 = "input_2" + + create_loopback_input_device(my_loopback_input_2) + pw_dot("pw-dot-temp-input-20.png") + + pw_link(my_inputs[1], my_loopback_input_2) + pw_dot("pw-dot-temp-input-21.png") + + # Merge L in R + pw_link(my_loopback_input_2+":output_FL", my_sound_cards[0]+":playback_FR") + + # Drop L + pw_link_delete(my_loopback_input_2+":output_FL", my_sound_cards[0]+":playback_FL") + pw_dot("pw-dot-temp-input-22.png") + + # Sink to the second output device, FR side only + pw_link(my_loopback_input_2+":output_FL", my_sound_cards[1]+":playback_FR") + pw_link(my_loopback_input_2+":output_FR", my_sound_cards[1]+":playback_FR") + pw_dot("pw-dot-temp-input-23.png") + + ########## + # Graph is now installed. Time to stop capture and recapture in loop + print("\nEverything is ready for capture!") + print("Inputs capture will start and stop repeatedly") + print("Press any key to start the test...") + print("... then press any key to stop the test") + input() + for icycle in cycle([1,2]): + if icycle == 1: + print("Stop capturing inputs") + pw_link_delete(my_inputs[0], my_loopback_input_1) + pw_link_delete(my_inputs[1], my_loopback_input_2) + pw_dot("pw-dot-temp-input-stop.png") + else: + print("Start capturing inputs") + pw_link(my_inputs[0], my_loopback_input_1) + pw_link(my_inputs[1], my_loopback_input_2) + pw_dot("pw-dot-temp-input-start.png") + a, b, c = select.select([sys.stdin], [], [], 5) + if (a): + break + + +if __name__ == "__main__": + print("Run test-case 3") + + # To avoid interference from previous run + kill_my_sinks() + + #my_sound_cards = find_outputs() + my_sound_cards = ["alsa_output.usb-0d8c_USB_Sound_Device-00.analog-stereo", + "alsa_output.platform-sound.stereo-fallback"] # FIXME + + my_inputs = ["alsa_input.usb-0d8c_USB_Sound_Device-00.analog-stereo", + "alsa_input.platform-sound.stereo-fallback"] # FIXME + + # Make sound with USB sound card + my_jackplay_id_used_A = start_sink("my-sink-A", "test300.wav", my_sound_cards[0]) + + # Make sound with onboard card + my_jackplay_id_used_B = start_sink("my-sink-B", "test800.wav", my_sound_cards[1], my_jackplay_id_used_A) + + # Capture inputs + start_capture_inputs(my_inputs, my_sound_cards) + + # End of test case 3 + print("Press any key to stop the playback...") + input() + kill_my_sinks()