Audio: rework audio tests
Replace constant audio bytearray with generated sine wave sample to get more robust results across test environments. Use zero-crossings as an audio fingerprint and compare it with sine wave frequency.
This commit is contained in:
parent
c4739b7ed0
commit
fd7d1267d7
@ -36,6 +36,8 @@ import qubes.tests
|
|||||||
import qubes.vm.appvm
|
import qubes.vm.appvm
|
||||||
import qubes.vm.templatevm
|
import qubes.vm.templatevm
|
||||||
|
|
||||||
|
import numpy as np
|
||||||
|
|
||||||
|
|
||||||
class TC_00_AppVMMixin(object):
|
class TC_00_AppVMMixin(object):
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
@ -422,16 +424,13 @@ class TC_00_AppVMMixin(object):
|
|||||||
local_user = grp.getgrnam('qubes').gr_mem[0]
|
local_user = grp.getgrnam('qubes').gr_mem[0]
|
||||||
p = self.loop.run_until_complete(asyncio.create_subprocess_shell(
|
p = self.loop.run_until_complete(asyncio.create_subprocess_shell(
|
||||||
"sudo -E -u {} timeout 30s sh -c '"
|
"sudo -E -u {} timeout 30s sh -c '"
|
||||||
"while ! pactl list sink-inputs | grep -q :{}; do sleep 1; done'".format(
|
"while ! pactl list sink-inputs | grep -q :{}; do sleep 1; done'"
|
||||||
local_user, vm.name)))
|
.format(local_user, vm.name)))
|
||||||
self.loop.run_until_complete(p.wait())
|
self.loop.run_until_complete(p.wait())
|
||||||
# and some more...
|
# and some more...
|
||||||
self.loop.run_until_complete(asyncio.sleep(1))
|
self.loop.run_until_complete(asyncio.sleep(1))
|
||||||
|
|
||||||
|
def is_vm_suitable(self):
|
||||||
@unittest.skipUnless(spawn.find_executable('parecord'),
|
|
||||||
"pulseaudio-utils not installed in dom0")
|
|
||||||
def test_220_audio_playback(self):
|
|
||||||
if 'whonix-gw' in self.template:
|
if 'whonix-gw' in self.template:
|
||||||
self.skipTest('whonix-gw have no audio')
|
self.skipTest('whonix-gw have no audio')
|
||||||
self.loop.run_until_complete(self.testvm1.start())
|
self.loop.run_until_complete(self.testvm1.start())
|
||||||
@ -441,21 +440,30 @@ class TC_00_AppVMMixin(object):
|
|||||||
except subprocess.CalledProcessError:
|
except subprocess.CalledProcessError:
|
||||||
self.skipTest('pulseaudio-utils not installed in VM')
|
self.skipTest('pulseaudio-utils not installed in VM')
|
||||||
|
|
||||||
|
def common_audio_playback(self):
|
||||||
|
self.is_vm_suitable()
|
||||||
self.wait_for_pulseaudio_startup(self.testvm1)
|
self.wait_for_pulseaudio_startup(self.testvm1)
|
||||||
# generate some "audio" data
|
# sine frequency
|
||||||
audio_in = b'\x20' * 44100
|
sfreq = 4400
|
||||||
self.loop.run_until_complete(
|
# generate signal
|
||||||
self.testvm1.run_for_stdio('cat > audio_in.raw', input=audio_in))
|
audio_in = np.sin(2*np.pi*np.arange(44100)*sfreq/44100)
|
||||||
|
self.loop.run_until_complete(self.testvm1.run_for_stdio(
|
||||||
|
'pacat --raw --rate=44100 --channels=1 \
|
||||||
|
--format=float32le > audio_in.raw',
|
||||||
|
input=audio_in.astype(np.float32).tobytes()))
|
||||||
local_user = grp.getgrnam('qubes').gr_mem[0]
|
local_user = grp.getgrnam('qubes').gr_mem[0]
|
||||||
with tempfile.NamedTemporaryFile() as recorded_audio:
|
with tempfile.NamedTemporaryFile() as recorded_audio:
|
||||||
os.chmod(recorded_audio.name, 0o666)
|
os.chmod(recorded_audio.name, 0o666)
|
||||||
# FIXME: -d 0 assumes only one audio device
|
# FIXME: -d 0 assumes only one audio device
|
||||||
p = subprocess.Popen(['sudo', '-E', '-u', local_user,
|
p = subprocess.Popen(['sudo', '-E', '-u', local_user,
|
||||||
'parecord', '-d', '0', '--raw', recorded_audio.name],
|
'parecord', '-d', '0', '--raw',
|
||||||
stdout=subprocess.PIPE)
|
'--format=float32le', '--rate=44100', '--channels=1',
|
||||||
|
recorded_audio.name], stdout=subprocess.PIPE)
|
||||||
try:
|
try:
|
||||||
self.loop.run_until_complete(
|
self.loop.run_until_complete(
|
||||||
self.testvm1.run_for_stdio('paplay --raw audio_in.raw'))
|
self.testvm1.run_for_stdio(
|
||||||
|
'paplay --format=float32le --rate=44100 \
|
||||||
|
--channels=1 --raw audio_in.raw'))
|
||||||
except subprocess.CalledProcessError as err:
|
except subprocess.CalledProcessError as err:
|
||||||
self.fail('{} stderr: {}'.format(str(err), err.stderr))
|
self.fail('{} stderr: {}'.format(str(err), err.stderr))
|
||||||
# wait for possible parecord buffering
|
# wait for possible parecord buffering
|
||||||
@ -464,15 +472,16 @@ class TC_00_AppVMMixin(object):
|
|||||||
# for some reason sudo do not relay SIGTERM sent above
|
# for some reason sudo do not relay SIGTERM sent above
|
||||||
subprocess.check_call(['pkill', 'parecord'])
|
subprocess.check_call(['pkill', 'parecord'])
|
||||||
p.wait()
|
p.wait()
|
||||||
# allow up to 20ms missing, don't use assertIn, to avoid printing
|
rec = np.fromstring(recorded_audio.file.read(), dtype=np.float32)
|
||||||
# the whole data in error message
|
# find zero crossings
|
||||||
recorded_audio = recorded_audio.file.read()
|
crossings = np.nonzero((rec[1:] > 0) & (rec[:-1] < 0))[0]
|
||||||
if audio_in[:-3528] not in recorded_audio:
|
np.seterr('raise')
|
||||||
found_bytes = recorded_audio.count(audio_in[0])
|
# compare against sine wave frequency
|
||||||
all_bytes = len(audio_in)
|
rec_freq = len(rec)/np.mean(np.diff(crossings))
|
||||||
|
if not sfreq*0.8 < rec_freq < sfreq*1.2:
|
||||||
self.fail('played sound not found in dom0, '
|
self.fail('played sound not found in dom0, '
|
||||||
'missing {} bytes out of {}'.format(
|
'frequency {} not in specified range'
|
||||||
all_bytes-found_bytes, all_bytes))
|
.format(rec_freq))
|
||||||
|
|
||||||
def _configure_audio_recording(self, vm):
|
def _configure_audio_recording(self, vm):
|
||||||
'''Connect VM's output-source to sink monitor instead of mic'''
|
'''Connect VM's output-source to sink monitor instead of mic'''
|
||||||
@ -497,18 +506,8 @@ class TC_00_AppVMMixin(object):
|
|||||||
subprocess.check_call(sudo +
|
subprocess.check_call(sudo +
|
||||||
['pacmd', 'move-source-output', last_index, '0'])
|
['pacmd', 'move-source-output', last_index, '0'])
|
||||||
|
|
||||||
@unittest.skipUnless(spawn.find_executable('parecord'),
|
def common_audio_record_muted(self):
|
||||||
"pulseaudio-utils not installed in dom0")
|
self.is_vm_suitable()
|
||||||
def test_221_audio_record_muted(self):
|
|
||||||
if 'whonix-gw' in self.template:
|
|
||||||
self.skipTest('whonix-gw have no audio')
|
|
||||||
self.loop.run_until_complete(self.testvm1.start())
|
|
||||||
try:
|
|
||||||
self.loop.run_until_complete(
|
|
||||||
self.testvm1.run_for_stdio('which parecord'))
|
|
||||||
except subprocess.CalledProcessError:
|
|
||||||
self.skipTest('pulseaudio-utils not installed in VM')
|
|
||||||
|
|
||||||
self.wait_for_pulseaudio_startup(self.testvm1)
|
self.wait_for_pulseaudio_startup(self.testvm1)
|
||||||
# connect VM's recording source output monitor (instead of mic)
|
# connect VM's recording source output monitor (instead of mic)
|
||||||
self._configure_audio_recording(self.testvm1)
|
self._configure_audio_recording(self.testvm1)
|
||||||
@ -535,53 +534,79 @@ class TC_00_AppVMMixin(object):
|
|||||||
if audio_in[:32] in recorded_audio:
|
if audio_in[:32] in recorded_audio:
|
||||||
self.fail('VM recorded something, even though mic disabled')
|
self.fail('VM recorded something, even though mic disabled')
|
||||||
|
|
||||||
@unittest.skipUnless(spawn.find_executable('parecord'),
|
def common_audio_record_unmuted(self):
|
||||||
"pulseaudio-utils not installed in dom0")
|
self.is_vm_suitable()
|
||||||
def test_222_audio_record_unmuted(self):
|
|
||||||
if 'whonix-gw' in self.template:
|
|
||||||
self.skipTest('whonix-gw have no audio')
|
|
||||||
self.loop.run_until_complete(self.testvm1.start())
|
|
||||||
try:
|
|
||||||
self.loop.run_until_complete(
|
|
||||||
self.testvm1.run_for_stdio('which parecord'))
|
|
||||||
except subprocess.CalledProcessError:
|
|
||||||
self.skipTest('pulseaudio-utils not installed in VM')
|
|
||||||
|
|
||||||
self.wait_for_pulseaudio_startup(self.testvm1)
|
self.wait_for_pulseaudio_startup(self.testvm1)
|
||||||
da = qubes.devices.DeviceAssignment(self.app.domains[0], 'mic')
|
deva = qubes.devices.DeviceAssignment(self.app.domains[0], 'mic')
|
||||||
self.loop.run_until_complete(
|
self.loop.run_until_complete(
|
||||||
self.testvm1.devices['mic'].attach(da))
|
self.testvm1.devices['mic'].attach(deva))
|
||||||
# connect VM's recording source output monitor (instead of mic)
|
# connect VM's recording source output monitor (instead of mic)
|
||||||
self._configure_audio_recording(self.testvm1)
|
self._configure_audio_recording(self.testvm1)
|
||||||
|
sfreq = 4400
|
||||||
# generate some "audio" data
|
audio_in = np.sin(2*np.pi*np.arange(44100)*sfreq/44100)
|
||||||
audio_in = b'\x20' * 44100
|
|
||||||
local_user = grp.getgrnam('qubes').gr_mem[0]
|
local_user = grp.getgrnam('qubes').gr_mem[0]
|
||||||
record = self.loop.run_until_complete(
|
record = self.loop.run_until_complete(self.testvm1.run(
|
||||||
self.testvm1.run('parecord --raw audio_rec.raw'))
|
'parecord --raw --format=float32le --rate=44100 \
|
||||||
|
--channels=1 audio_rec.raw'))
|
||||||
# give it time to start recording
|
# give it time to start recording
|
||||||
self.loop.run_until_complete(asyncio.sleep(0.5))
|
|
||||||
p = subprocess.Popen(['sudo', '-E', '-u', local_user,
|
p = subprocess.Popen(['sudo', '-E', '-u', local_user,
|
||||||
'paplay', '--raw'],
|
'paplay', '--raw', '--format=float32le',
|
||||||
|
'--rate=44100', '--channels=1'],
|
||||||
stdin=subprocess.PIPE)
|
stdin=subprocess.PIPE)
|
||||||
p.communicate(audio_in)
|
p.communicate(audio_in.astype(np.float32).tobytes())
|
||||||
# wait for possible parecord buffering
|
|
||||||
self.loop.run_until_complete(asyncio.sleep(1))
|
|
||||||
self.loop.run_until_complete(
|
self.loop.run_until_complete(
|
||||||
self.testvm1.run_for_stdio('pkill parecord || :'))
|
self.testvm1.run_for_stdio('pkill parecord || :'))
|
||||||
_, record_stderr = self.loop.run_until_complete(record.communicate())
|
_, record_stderr = self.loop.run_until_complete(record.communicate())
|
||||||
if record_stderr:
|
if record_stderr:
|
||||||
self.fail('parecord printed something on stderr: {}'.format(
|
self.fail('parecord printed something on stderr: {}'.format(
|
||||||
record_stderr))
|
record_stderr))
|
||||||
|
|
||||||
recorded_audio, _ = self.loop.run_until_complete(
|
recorded_audio, _ = self.loop.run_until_complete(
|
||||||
self.testvm1.run_for_stdio('cat audio_rec.raw'))
|
self.testvm1.run_for_stdio('cat audio_rec.raw'))
|
||||||
# allow up to 20ms to be missing
|
rec = np.fromstring(recorded_audio, dtype=np.float32)
|
||||||
if audio_in[:-3528] not in recorded_audio:
|
crossings = np.nonzero((rec[1:] > 0) & (rec[:-1] < 0))[0]
|
||||||
found_bytes = recorded_audio.count(audio_in[0])
|
np.seterr('raise')
|
||||||
all_bytes = len(audio_in)
|
rec_freq = len(rec)/np.mean(np.diff(crossings))
|
||||||
|
if not sfreq*0.8 < rec_freq < sfreq*1.2:
|
||||||
self.fail('VM not recorded expected data, '
|
self.fail('VM not recorded expected data, '
|
||||||
'missing {} bytes out of {}'.format(
|
'frequency {} not in specified range'
|
||||||
all_bytes-found_bytes, all_bytes))
|
.format(rec_freq))
|
||||||
|
|
||||||
|
@unittest.skipUnless(spawn.find_executable('parecord'),
|
||||||
|
"pulseaudio-utils not installed in dom0")
|
||||||
|
def test_220_audio_play(self):
|
||||||
|
self.common_audio_playback()
|
||||||
|
|
||||||
|
@unittest.skipUnless(spawn.find_executable('parecord'),
|
||||||
|
"pulseaudio-utils not installed in dom0")
|
||||||
|
def test_221_audio_rec_muted(self):
|
||||||
|
self.common_audio_record_muted()
|
||||||
|
|
||||||
|
@unittest.skipUnless(spawn.find_executable('parecord'),
|
||||||
|
"pulseaudio-utils not installed in dom0")
|
||||||
|
def test_222_audio_rec_unmuted(self):
|
||||||
|
self.common_audio_record_unmuted()
|
||||||
|
|
||||||
|
@unittest.skipUnless(spawn.find_executable('parecord'),
|
||||||
|
"pulseaudio-utils not installed in dom0")
|
||||||
|
def test_223_audio_play_hvm(self):
|
||||||
|
self.testvm1.virt_mode = 'hvm'
|
||||||
|
self.testvm1.features['audio-model'] = 'ich6'
|
||||||
|
self.common_audio_playback()
|
||||||
|
|
||||||
|
@unittest.skipUnless(spawn.find_executable('parecord'),
|
||||||
|
"pulseaudio-utils not installed in dom0")
|
||||||
|
def test_224_audio_rec_muted_hvm(self):
|
||||||
|
self.testvm1.virt_mode = 'hvm'
|
||||||
|
self.testvm1.features['audio-model'] = 'ich6'
|
||||||
|
self.common_audio_record_muted()
|
||||||
|
|
||||||
|
@unittest.skipUnless(spawn.find_executable('parecord'),
|
||||||
|
"pulseaudio-utils not installed in dom0")
|
||||||
|
def test_225_audio_rec_unmuted_hvm(self):
|
||||||
|
self.testvm1.virt_mode = 'hvm'
|
||||||
|
self.testvm1.features['audio-model'] = 'ich6'
|
||||||
|
self.common_audio_record_unmuted()
|
||||||
|
|
||||||
def test_250_resize_private_img(self):
|
def test_250_resize_private_img(self):
|
||||||
"""
|
"""
|
||||||
|
Loading…
Reference in New Issue
Block a user