From ed3001da995ebaa9f64a66c5f465c7d58ac79cf8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marek=20Marczykowski-G=C3=B3recki?= Date: Thu, 28 Sep 2017 02:55:24 +0200 Subject: [PATCH] tests: fix asyncio usage cont... - Add missing loop.run_until_complete() calls. - Convert subprocess.Popen to asyncio.create_subprocess_exec where needed (when called process needs to communicate with qubesd). - Cleanup processes (call .wait()). --- qubes/tests/integ/dispvm.py | 62 +++++++------- qubes/tests/integ/dom0_update.py | 136 ++++++++++++++++++------------- 2 files changed, 112 insertions(+), 86 deletions(-) diff --git a/qubes/tests/integ/dispvm.py b/qubes/tests/integ/dispvm.py index b370159a..5a1a318e 100644 --- a/qubes/tests/integ/dispvm.py +++ b/qubes/tests/integ/dispvm.py @@ -27,6 +27,8 @@ import unittest from distutils import spawn +import asyncio + import qubes.tests class TC_04_DispVM(qubes.tests.SystemTestCase): @@ -38,23 +40,22 @@ class TC_04_DispVM(qubes.tests.SystemTestCase): name=self.make_vm_name('dvm'), label='red', ) - self.disp_base.create_on_disk() + self.loop.run_until_complete(self.disp_base.create_on_disk()) self.app.default_dispvm = self.disp_base self.testvm = self.app.add_new_vm(qubes.vm.appvm.AppVM, name=self.make_vm_name('vm'), label='red', ) - self.testvm.create_on_disk() + self.loop.run_until_complete(self.testvm.create_on_disk()) self.app.save() @unittest.expectedFailure def test_002_cleanup(self): - self.testvm.start() + self.loop.run_until_complete(self.testvm.start()) - p = self.testvm.run("qvm-run --dispvm bash", passio_popen=True) - (stdout, _) = p.communicate(input=b"echo test; qubesdb-read /name; " - b"echo ERROR\n") - self.assertEqual(p.returncode, 0) + (stdout, _) = self.loop.run_until_complete( + self.testvm.run_for_stdio("qvm-run --dispvm bash", + input=b"echo test; qubesdb-read /name; echo ERROR\n")) lines = stdout.decode('ascii').splitlines() self.assertEqual(lines[0], "test") dispvm_name = lines[1] @@ -69,23 +70,21 @@ class TC_04_DispVM(qubes.tests.SystemTestCase): :return: """ - self.testvm.start() + self.loop.run_until_complete(self.testvm.start()) - p = self.testvm.run("qvm-run --dispvm bash; true", passio_popen=True) + p = self.loop.run_until_complete( + self.testvm.run("qvm-run --dispvm bash; true", + stdin=subprocess.PIPE, stdout=subprocess.PIPE)) p.stdin.write(b"qubesdb-read /name\n") p.stdin.write(b"echo ERROR\n") p.stdin.write(b"sudo poweroff\n") # do not close p.stdin on purpose - wait to automatic disconnect when # domain is destroyed timeout = 30 - while timeout > 0: - if p.poll(): - break - time.sleep(1) - timeout -= 1 - # includes check for None - timeout - self.assertEqual(p.returncode, 0) - lines = p.stdout.read().splitlines() + lines_task = asyncio.ensure_future(p.stdout.read()) + self.loop.run_until_complete(asyncio.wait_for(p.wait(), timeout)) + self.loop.run_until_complete(lines_task) + lines = lines_task.result().splitlines() self.assertTrue(lines, 'No output received from DispVM') dispvm_name = lines[0] self.assertNotEquals(dispvm_name, b"ERROR") @@ -110,9 +109,9 @@ class TC_20_DispVMMixin(object): qubes.vm.dispvm.DispVM.from_appvm(self.disp_base)) try: self.loop.run_until_complete(dispvm.start()) - p = self.loop.run_until_complete( - dispvm.run_service('qubes.VMShell', passio_popen=True)) - (stdout, _) = p.communicate(input=b"echo test") + (stdout, _) = self.loop.run_until_complete( + dispvm.run_service_for_stdio('qubes.VMShell', + input=b"echo test")) self.assertEqual(stdout, b"test\n") finally: self.loop.run_until_complete(dispvm.cleanup()) @@ -125,10 +124,12 @@ class TC_20_DispVMMixin(object): try: self.loop.run_until_complete(dispvm.start()) p = self.loop.run_until_complete( - dispvm.run_service('qubes.VMShell', passio_popen=True)) + dispvm.run_service('qubes.VMShell', + stdin=subprocess.PIPE, + stdout=subprocess.PIPE)) # wait for DispVM startup: p.stdin.write(b"echo test\n") - p.stdin.flush() + self.loop.run_until_complete(p.stdin.drain()) l = self.loop.run_until_complete(p.stdout.readline()) self.assertEqual(l, b"test\n") @@ -227,13 +228,14 @@ class TC_20_DispVMMixin(object): wait_count = 0 winid = None while True: - search = subprocess.Popen(['xdotool', 'search', - '--onlyvisible', '--class', 'disp*'], - stdout=subprocess.PIPE, - stderr=open(os.path.devnull, 'w')) - retcode = search.wait() - if retcode == 0: - winid = search.stdout.read().strip() + search = self.loop.run_until_complete( + asyncio.create_subprocess_exec( + 'xdotool', 'search', '--onlyvisible', '--class', 'disp*', + stdout=subprocess.PIPE, + stderr=subprocess.DEVNULL)) + stdout, _ = self.loop.run_until_complete(search.communicate()) + if p.returncode == 0: + winid = stdout.strip() # get window title (window_title, _) = subprocess.Popen( ['xdotool', 'getwindowname', winid], stdout=subprocess.PIPE). \ @@ -247,7 +249,7 @@ class TC_20_DispVMMixin(object): wait_count += 1 if wait_count > 100: self.fail("Timeout while waiting for editor window") - time.sleep(0.3) + self.loop.run_until_complete(asyncio.sleep(0.3)) time.sleep(0.5) self._handle_editor(winid) diff --git a/qubes/tests/integ/dom0_update.py b/qubes/tests/integ/dom0_update.py index 177f4331..7898d527 100644 --- a/qubes/tests/integ/dom0_update.py +++ b/qubes/tests/integ/dom0_update.py @@ -25,6 +25,8 @@ import subprocess import tempfile import unittest +import asyncio + import qubes import qubes.tests @@ -118,8 +120,13 @@ enabled = 1 os.path.join(self.tmpdir, 'pubkey.asc')]) self.loop.run_until_complete(self.updatevm.start()) self.repo_running = False + self.repo_proc = None def tearDown(self): + if self.repo_proc: + self.repo_proc.terminate() + self.loop.run_until_complete(self.repo_proc.wait()) + del self.repo_proc super(TC_00_Dom0UpgradeMixin, self).tearDown() subprocess.call(['sudo', 'rpm', '-e', self.pkg_name], stderr=open( @@ -190,7 +197,7 @@ Test package def start_repo(self): if self.repo_running: return - self.loop.run_until_complete(self.updatevm.run( + self.repo_proc = self.loop.run_until_complete(self.updatevm.run( 'cd /tmp/repo && python -m SimpleHTTPServer 8080')) self.repo_running = True @@ -209,14 +216,17 @@ Test package open(self.update_flag_path, 'a').close() logpath = os.path.join(self.tmpdir, 'dom0-update-output.txt') - try: - subprocess.check_call(['sudo', '-E', 'qubes-dom0-update', '-y'] + - self.dom0_update_common_opts, - stdout=open(logpath, 'w'), - stderr=subprocess.STDOUT) - except subprocess.CalledProcessError: - self.fail("qubes-dom0-update failed: " + open( - logpath).read()) + with open(logpath, 'w') as f_log: + proc = self.loop.run_until_complete(asyncio.create_subprocess_exec( + 'qubes-dom0-update', '-y', *self.dom0_update_common_opts, + stdout=f_log, + stderr=subprocess.STDOUT)) + self.loop.run_until_complete(proc.wait()) + if proc.returncode: + del proc + with open(logpath) as f_log: + self.fail("qubes-dom0-update failed: " + f_log.read()) + del proc retcode = subprocess.call(['rpm', '-q', '{}-1.0'.format( self.pkg_name)], stdout=open(os.devnull, 'w')) @@ -238,14 +248,17 @@ Test package open(self.update_flag_path, 'a').close() logpath = os.path.join(self.tmpdir, 'dom0-update-output.txt') - try: - subprocess.check_call(['sudo', '-E', 'qubes-dom0-update', '-y'] + - self.dom0_update_common_opts, - stdout=open(logpath, 'w'), - stderr=subprocess.STDOUT) - except subprocess.CalledProcessError: - self.fail("qubes-dom0-update failed: " + open( - logpath).read()) + with open(logpath, 'w') as f_log: + proc = self.loop.run_until_complete(asyncio.create_subprocess_exec( + 'qubes-dom0-update', '-y', *self.dom0_update_common_opts, + stdout=f_log, + stderr=subprocess.STDOUT)) + self.loop.run_until_complete(proc.wait()) + if proc.returncode: + del proc + with open(logpath) as f_log: + self.fail("qubes-dom0-update failed: " + f_log.read()) + del proc with open(logpath) as f: dom0_update_output = f.read() @@ -269,15 +282,18 @@ Test package if os.path.exists('/var/lib/qubes/updates/repodata'): shutil.rmtree('/var/lib/qubes/updates/repodata') logpath = os.path.join(self.tmpdir, 'dom0-update-output.txt') - try: - subprocess.check_call(['sudo', '-E', 'qubes-dom0-update', '-y', - '--clean'] + - self.dom0_update_common_opts, - stdout=open(logpath, 'w'), - stderr=subprocess.STDOUT) - except subprocess.CalledProcessError: - self.fail("qubes-dom0-update failed: " + open( - logpath).read()) + with open(logpath, 'w') as f_log: + proc = self.loop.run_until_complete(asyncio.create_subprocess_exec( + 'qubes-dom0-update', '-y', '--clean', + *self.dom0_update_common_opts, + stdout=f_log, + stderr=subprocess.STDOUT)) + self.loop.run_until_complete(proc.wait()) + if proc.returncode: + del proc + with open(logpath) as f_log: + self.fail("qubes-dom0-update failed: " + f_log.read()) + del proc with open(logpath) as f: dom0_update_output = f.read() @@ -294,15 +310,18 @@ Test package self.send_pkg(filename) logpath = os.path.join(self.tmpdir, 'dom0-update-output.txt') - try: - subprocess.check_call(['sudo', '-E', 'qubes-dom0-update', '-y'] + - self.dom0_update_common_opts + [ - self.pkg_name], - stdout=open(logpath, 'w'), - stderr=subprocess.STDOUT) - except subprocess.CalledProcessError: - self.fail("qubes-dom0-update failed: " + open( - logpath).read()) + with open(logpath, 'w') as f_log: + proc = self.loop.run_until_complete(asyncio.create_subprocess_exec( + 'qubes-dom0-update', '-y', *self.dom0_update_common_opts, + self.pkg_name, + stdout=f_log, + stderr=subprocess.STDOUT)) + self.loop.run_until_complete(proc.wait()) + if proc.returncode: + del proc + with open(logpath) as f_log: + self.fail("qubes-dom0-update failed: " + f_log.read()) + del proc retcode = subprocess.call(['rpm', '-q', '{}-1.0'.format( self.pkg_name)], stdout=open('/dev/null', 'w')) @@ -316,16 +335,19 @@ Test package self.send_pkg(filename) logpath = os.path.join(self.tmpdir, 'dom0-update-output.txt') - try: - subprocess.check_call(['sudo', '-E', 'qubes-dom0-update', '-y'] + - self.dom0_update_common_opts + [ - self.pkg_name], - stdout=open(logpath, 'w'), - stderr=subprocess.STDOUT) - self.fail("qubes-dom0-update unexpectedly succeeded: " + open( - logpath).read()) - except subprocess.CalledProcessError: - pass + with open(logpath, 'w') as f_log: + proc = self.loop.run_until_complete(asyncio.create_subprocess_exec( + 'qubes-dom0-update', '-y', *self.dom0_update_common_opts, + self.pkg_name, + stdout=f_log, + stderr=subprocess.STDOUT)) + self.loop.run_until_complete(proc.wait()) + if not proc.returncode: + del proc + with open(logpath) as f_log: + self.fail("qubes-dom0-update unexpectedly succeeded: " + + f_log.read()) + del proc retcode = subprocess.call(['rpm', '-q', '{}-1.0'.format( self.pkg_name)], stdout=open('/dev/null', 'w')) @@ -341,17 +363,19 @@ Test package self.send_pkg(filename) logpath = os.path.join(self.tmpdir, 'dom0-update-output.txt') - try: - subprocess.check_call(['sudo', '-E', 'qubes-dom0-update', '-y'] + - self.dom0_update_common_opts + - [self.pkg_name], - stdout=open(logpath, 'w'), - stderr=subprocess.STDOUT - ) - self.fail("qubes-dom0-update unexpectedly succeeded: " + open( - logpath).read()) - except subprocess.CalledProcessError: - pass + with open(logpath, 'w') as f_log: + proc = self.loop.run_until_complete(asyncio.create_subprocess_exec( + 'qubes-dom0-update', '-y', *self.dom0_update_common_opts, + self.pkg_name, + stdout=f_log, + stderr=subprocess.STDOUT)) + self.loop.run_until_complete(proc.wait()) + if not proc.returncode: + del proc + with open(logpath) as f_log: + self.fail("qubes-dom0-update unexpectedly succeeded: " + + f_log.read()) + del proc retcode = subprocess.call(['rpm', '-q', '{}-1.0'.format( self.pkg_name)], stdout=open('/dev/null', 'w'))