Skip to content
GitLab
Explore
Sign in
Register
Primary navigation
Search or go to…
Project
P
pipewire-evaluation
Manage
Activity
Members
Labels
Code
Merge requests
Repository
Branches
Commits
Tags
Repository graph
Compare revisions
Snippets
Build
Pipelines
Jobs
Pipeline schedules
Artifacts
Deploy
Releases
Package Registry
Model registry
Operate
Environments
Terraform modules
Analyze
Value stream analytics
Contributor analytics
CI/CD analytics
Repository analytics
Model experiments
Help
Help
Support
GitLab documentation
Compare GitLab plans
Community forum
Contribute to GitLab
Provide feedback
Terms and privacy
Keyboard shortcuts
?
Snippets
Groups
Projects
Show more breadcrumbs
docs
pipewire-evaluation
Commits
eb0bd9d5
Commit
eb0bd9d5
authored
1 year ago
by
Dylan Aïssi
Browse files
Options
Downloads
Patches
Plain Diff
Add test-case-3-run.py
Signed-off-by:
Dylan Aïssi
<
dylan.aissi@collabora.com
>
parent
9603544d
No related branches found
Branches containing commit
No related tags found
Tags containing commit
No related merge requests found
Pipeline
#625253
passed
1 year ago
Stage: build
Changes
1
Pipelines
1
Hide whitespace changes
Inline
Side-by-side
Showing
1 changed file
scripts/test-case-3-run.py
+298
-0
298 additions, 0 deletions
scripts/test-case-3-run.py
with
298 additions
and
0 deletions
scripts/test-case-3-run.py
0 → 100755
+
298
−
0
View file @
eb0bd9d5
#!/usr/bin/env python3
from
itertools
import
cycle
import
json
import
select
import
subprocess
import
sys
import
time
def
pw_dump
(
name
=
None
):
cmd_pw_dump
=
[
"
pw-dump
"
]
p
=
subprocess
.
run
(
cmd_pw_dump
,
stdout
=
subprocess
.
PIPE
)
my_pw_dump
=
json
.
loads
(
p
.
stdout
)
if
name
!=
None
:
with
open
(
name
,
"
w
"
)
as
f
:
json
.
dump
(
my_pw_dump
,
f
,
indent
=
2
)
return
my_pw_dump
def
pw_dot
(
name
):
subprocess
.
run
([
"
pw-dot
"
,
"
-o
"
,
"
pw-dot-temp.dot
"
],
stderr
=
subprocess
.
DEVNULL
)
subprocess
.
run
([
"
dot
"
,
"
-Tpng
"
,
"
pw-dot-temp.dot
"
,
"
-o
"
,
name
],
stderr
=
subprocess
.
DEVNULL
)
def
pw_link
(
out_pattern
,
in_pattern
):
cmd_pw_link
=
[
"
pw-link
"
,
str
(
out_pattern
),
str
(
in_pattern
)]
subprocess
.
run
(
cmd_pw_link
)
def
pw_link_delete
(
out_pattern
,
in_pattern
):
cmd_pw_link_delete
=
[
"
pw-link
"
,
"
-d
"
,
str
(
out_pattern
),
str
(
in_pattern
)]
subprocess
.
run
(
cmd_pw_link_delete
)
def
pw_metadata
(
out_pattern
,
in_pattern
):
cmd_pw_metadata
=
[
"
pw-metadata
"
,
str
(
out_pattern
),
"
target.object
"
,
in_pattern
]
subprocess
.
run
(
cmd_pw_metadata
,
stdout
=
subprocess
.
DEVNULL
,
stderr
=
subprocess
.
DEVNULL
)
def
create_loopback_device
(
name
):
print
(
f
"
Creating a virtual device named
{
name
}
"
)
cmd_pw_loopback
=
"
pw-loopback -m
'
[FL FR]
'
--capture-props=
'
media.class=Audio/Sink node.name=
"
+
name
+
"'"
subprocess
.
Popen
(
cmd_pw_loopback
,
shell
=
True
)
time
.
sleep
(
1
)
# Wait 1 sec to be sure the loopback is up
def
create_loopback_input_device
(
name
):
print
(
f
"
Creating a virtual device named
{
name
}
"
)
cmd_pw_loopback
=
"
pw-loopback -m
'
[FL FR]
'
--capture-props=
'
media.class=Audio/Sink node.name=
"
+
name
+
"'
--playback-props=
'
node.name=
"
+
name
+
"'"
subprocess
.
Popen
(
cmd_pw_loopback
,
shell
=
True
)
time
.
sleep
(
1
)
# Wait 1 sec to be sure the loopback is up
def
start_jackplay
(
my_sound
):
print
(
f
"
Starting playing music with jackplay
"
)
cmd_jackplay
=
[
"
pw-jack
"
,
"
sndfile-jackplay
"
,
my_sound
,
"
-l
"
,
"
0
"
]
subprocess
.
Popen
(
cmd_jackplay
,
stderr
=
subprocess
.
DEVNULL
)
def
delete_useless_pw_links
(
my_pw_dump
,
output_ports
,
input_ports
):
my_useless_ports_id
=
[]
for
pw_obj
in
my_pw_dump
:
if
pw_obj
[
"
type
"
]
!=
"
PipeWire:Interface:Link
"
:
continue
if
pw_obj
[
"
info
"
][
"
output-port-id
"
]
not
in
output_ports
:
continue
if
pw_obj
[
"
info
"
][
"
input-port-id
"
]
in
input_ports
:
continue
print
(
f
"
Destroying link between
{
pw_obj
[
'
info
'
][
'
output-port-id
'
]
}
and
{
pw_obj
[
'
info
'
][
'
input-port-id
'
]
}
"
)
pw_link_delete
(
pw_obj
[
"
info
"
][
"
output-port-id
"
],
pw_obj
[
"
info
"
][
"
input-port-id
"
])
def
kill_my_sinks
():
print
(
"
Killing all sndfile-jackplay and pw-loopback
"
)
subprocess
.
run
([
"
killall
"
,
"
sndfile-jackplay
"
],
stderr
=
subprocess
.
DEVNULL
)
subprocess
.
run
([
"
killall
"
,
"
pw-loopback
"
],
stderr
=
subprocess
.
DEVNULL
)
def
start_sink
(
my_sink_name
,
my_wav
,
device_playback
,
my_id_used
=
None
):
create_loopback_device
(
my_sink_name
)
start_jackplay
(
my_wav
)
time
.
sleep
(
5
)
# Wait 5 sec to be sure it started
my_pw_dump
=
pw_dump
(
"
pw-dump-temp-
"
+
my_sink_name
+
"
-1.json
"
)
pw_dot
(
"
pw-dot-temp-
"
+
my_sink_name
+
"
-1.png
"
)
# Get node id of "jackplay"
my_jackplay_id
=
[]
for
pw_obj
in
my_pw_dump
:
if
pw_obj
[
"
type
"
]
!=
"
PipeWire:Interface:Node
"
:
continue
if
pw_obj
[
"
info
"
][
"
props
"
][
"
node.name
"
]
!=
"
jackplay
"
:
continue
if
my_id_used
:
if
pw_obj
[
"
id
"
]
in
my_id_used
:
continue
my_jackplay_id
.
append
(
pw_obj
[
"
id
"
])
# Get associated ports id of "jackplay"
my_jackplay_ports_id
=
[]
for
pw_obj
in
my_pw_dump
:
if
pw_obj
[
"
type
"
]
!=
"
PipeWire:Interface:Port
"
:
continue
if
pw_obj
[
"
info
"
][
"
props
"
][
"
node.id
"
]
!=
my_jackplay_id
[
0
]:
continue
my_jackplay_ports_id
.
append
(
pw_obj
[
"
id
"
])
print
(
f
"
.. DBG: my_jackplay_ports_id
{
my_jackplay_ports_id
}
"
)
# Get node id of my_sink_name
my_sink_id
=
[]
my_sink_node_group
=
[]
for
pw_obj
in
my_pw_dump
:
if
pw_obj
[
"
type
"
]
!=
"
PipeWire:Interface:Node
"
:
continue
if
pw_obj
[
"
info
"
][
"
props
"
][
"
node.name
"
]
!=
my_sink_name
:
continue
my_sink_id
.
append
(
pw_obj
[
"
id
"
])
my_sink_node_group
.
append
(
pw_obj
[
"
info
"
][
"
props
"
][
"
node.group
"
])
print
(
f
"
.. DBG: my_sink_id
{
my_sink_id
}
"
)
print
(
f
"
.. DBG: my_sink_node_group
{
my_sink_node_group
}
"
)
# Get associated ports id of my_sink_name
my_sink_object_path
=
my_sink_node_group
[
0
]
+
"
:playback
"
print
(
f
"
.. DBG: my_sink_object_path
{
my_sink_object_path
}
"
)
my_sink_ports_id
=
[]
for
pw_obj
in
my_pw_dump
:
if
pw_obj
[
"
type
"
]
!=
"
PipeWire:Interface:Port
"
:
continue
if
"
object.path
"
not
in
pw_obj
[
"
info
"
][
"
props
"
]:
continue
if
not
my_sink_object_path
in
pw_obj
[
"
info
"
][
"
props
"
][
"
object.path
"
]:
continue
my_sink_ports_id
.
append
(
pw_obj
[
"
id
"
])
print
(
f
"
.. DBG: my_sink_ports_id
{
my_sink_ports_id
}
"
)
# Move stream to the virtual node
pw_link
(
my_jackplay_ports_id
[
0
],
my_sink_ports_id
[
0
])
pw_link
(
my_jackplay_ports_id
[
1
],
my_sink_ports_id
[
1
])
my_pw_dump
=
pw_dump
(
"
pw-dump-temp-
"
+
my_sink_name
+
"
-3.json
"
)
pw_dot
(
"
pw-dot-temp-
"
+
my_sink_name
+
"
-3.png
"
)
# Get id of virtual node
my_sink_node_group
=
None
for
pw_obj
in
my_pw_dump
:
if
pw_obj
[
"
type
"
]
!=
"
PipeWire:Interface:Node
"
:
continue
if
pw_obj
[
"
id
"
]
!=
my_sink_id
[
0
]:
continue
my_sink_node_group
=
pw_obj
[
"
info
"
][
"
props
"
][
"
node.group
"
]
my_loopback_output
=
None
for
pw_obj
in
my_pw_dump
:
if
pw_obj
[
"
type
"
]
!=
"
PipeWire:Interface:Node
"
:
continue
if
pw_obj
[
"
info
"
][
"
props
"
][
"
node.name
"
]
!=
(
"
output.
"
+
my_sink_node_group
):
continue
my_loopback_output
=
pw_obj
[
'
id
'
]
# Assign virtual node to device_playback
print
(
f
"
Send sound to
{
device_playback
}
"
)
pw_metadata
(
my_loopback_output
,
device_playback
)
my_pw_dump
=
pw_dump
(
"
pw-dump-temp-
"
+
my_sink_name
+
"
-3.json
"
)
pw_dot
(
"
pw-dot-temp-
"
+
my_sink_name
+
"
-3.png
"
)
# Delete previous links
delete_useless_pw_links
(
my_pw_dump
,
my_jackplay_ports_id
,
my_sink_ports_id
)
my_pw_dump
=
pw_dump
(
"
pw-dump-temp-
"
+
my_sink_name
+
"
-4.json
"
)
pw_dot
(
"
pw-dot-temp-
"
+
my_sink_name
+
"
-4.png
"
)
# This is required to identify which jackplay are already processsed
return
my_jackplay_id
def
find_outputs
():
dump
=
pw_dump
()
outputs
=
[]
for
obj
in
dump
:
if
obj
[
"
type
"
]
!=
"
PipeWire:Interface:Node
"
:
continue
if
obj
[
"
info
"
][
"
props
"
].
get
(
"
media.class
"
)
!=
"
Audio/Sink
"
:
continue
outputs
.
append
(
obj
[
"
info
"
][
"
props
"
][
"
node.name
"
])
return
outputs
def
start_capture_inputs
(
my_inputs
,
my_sound_cards
):
##########
## Input 1 will be redirected to LEFT side of the two outputs
my_loopback_input_1
=
"
input_1
"
create_loopback_input_device
(
my_loopback_input_1
)
pw_dot
(
"
pw-dot-temp-input-10.png
"
)
pw_link
(
my_inputs
[
0
],
my_loopback_input_1
)
pw_dot
(
"
pw-dot-temp-input-11.png
"
)
# Merge R in L
pw_link
(
my_loopback_input_1
+
"
:output_FR
"
,
my_sound_cards
[
0
]
+
"
:playback_FL
"
)
# Drop R
pw_link_delete
(
my_loopback_input_1
+
"
:output_FR
"
,
my_sound_cards
[
0
]
+
"
:playback_FR
"
)
pw_dot
(
"
pw-dot-temp-input-12.png
"
)
# Sink to the second output device, FL side only
pw_link
(
my_loopback_input_1
+
"
:output_FL
"
,
my_sound_cards
[
1
]
+
"
:playback_FL
"
)
pw_link
(
my_loopback_input_1
+
"
:output_FR
"
,
my_sound_cards
[
1
]
+
"
:playback_FL
"
)
pw_dot
(
"
pw-dot-temp-input-13.png
"
)
##########
## Input 2 will be redirected to RIGHT side of the two outputs
my_loopback_input_2
=
"
input_2
"
create_loopback_input_device
(
my_loopback_input_2
)
pw_dot
(
"
pw-dot-temp-input-20.png
"
)
pw_link
(
my_inputs
[
1
],
my_loopback_input_2
)
pw_dot
(
"
pw-dot-temp-input-21.png
"
)
# Merge L in R
pw_link
(
my_loopback_input_2
+
"
:output_FL
"
,
my_sound_cards
[
0
]
+
"
:playback_FR
"
)
# Drop L
pw_link_delete
(
my_loopback_input_2
+
"
:output_FL
"
,
my_sound_cards
[
0
]
+
"
:playback_FL
"
)
pw_dot
(
"
pw-dot-temp-input-22.png
"
)
# Sink to the second output device, FR side only
pw_link
(
my_loopback_input_2
+
"
:output_FL
"
,
my_sound_cards
[
1
]
+
"
:playback_FR
"
)
pw_link
(
my_loopback_input_2
+
"
:output_FR
"
,
my_sound_cards
[
1
]
+
"
:playback_FR
"
)
pw_dot
(
"
pw-dot-temp-input-23.png
"
)
##########
# Graph is now installed. Time to stop capture and recapture in loop
print
(
"
\n
Everything is ready for capture!
"
)
print
(
"
Inputs capture will start and stop repeatedly
"
)
print
(
"
Press any key to start the test...
"
)
print
(
"
... then press any key to stop the test
"
)
input
()
for
icycle
in
cycle
([
1
,
2
]):
if
icycle
==
1
:
print
(
"
Stop capturing inputs
"
)
pw_link_delete
(
my_inputs
[
0
],
my_loopback_input_1
)
pw_link_delete
(
my_inputs
[
1
],
my_loopback_input_2
)
pw_dot
(
"
pw-dot-temp-input-stop.png
"
)
else
:
print
(
"
Start capturing inputs
"
)
pw_link
(
my_inputs
[
0
],
my_loopback_input_1
)
pw_link
(
my_inputs
[
1
],
my_loopback_input_2
)
pw_dot
(
"
pw-dot-temp-input-start.png
"
)
a
,
b
,
c
=
select
.
select
([
sys
.
stdin
],
[],
[],
5
)
if
(
a
):
break
if
__name__
==
"
__main__
"
:
print
(
"
Run test-case 3
"
)
# To avoid interference from previous run
kill_my_sinks
()
#my_sound_cards = find_outputs()
my_sound_cards
=
[
"
alsa_output.usb-0d8c_USB_Sound_Device-00.analog-stereo
"
,
"
alsa_output.platform-sound.stereo-fallback
"
]
# FIXME
my_inputs
=
[
"
alsa_input.usb-0d8c_USB_Sound_Device-00.analog-stereo
"
,
"
alsa_input.platform-sound.stereo-fallback
"
]
# FIXME
# Make sound with USB sound card
my_jackplay_id_used_A
=
start_sink
(
"
my-sink-A
"
,
"
test300.wav
"
,
my_sound_cards
[
0
])
# Make sound with onboard card
my_jackplay_id_used_B
=
start_sink
(
"
my-sink-B
"
,
"
test800.wav
"
,
my_sound_cards
[
1
],
my_jackplay_id_used_A
)
# Capture inputs
start_capture_inputs
(
my_inputs
,
my_sound_cards
)
# End of test case 3
print
(
"
Press any key to stop the playback...
"
)
input
()
kill_my_sinks
()
This diff is collapsed.
Click to expand it.
Preview
0%
Loading
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Save comment
Cancel
Please
register
or
sign in
to comment