Browse Source

tests: fix asyncio usage cont...

- Add missing loop.run_until_complete() calls.
- Convert subprocess.Popen to asyncio.create_subprocess_exec where
needed (when called process needs to communicate with qubesd).
- Cleanup processes (call .wait()).
Marek Marczykowski-Górecki 6 years ago
parent
commit
ed3001da99
2 changed files with 112 additions and 86 deletions
  1. 32 30
      qubes/tests/integ/dispvm.py
  2. 80 56
      qubes/tests/integ/dom0_update.py

+ 32 - 30
qubes/tests/integ/dispvm.py

@@ -27,6 +27,8 @@ import unittest
 
 from distutils import spawn
 
+import asyncio
+
 import qubes.tests
 
 class TC_04_DispVM(qubes.tests.SystemTestCase):
@@ -38,23 +40,22 @@ class TC_04_DispVM(qubes.tests.SystemTestCase):
             name=self.make_vm_name('dvm'),
             label='red',
         )
-        self.disp_base.create_on_disk()
+        self.loop.run_until_complete(self.disp_base.create_on_disk())
         self.app.default_dispvm = self.disp_base
         self.testvm = self.app.add_new_vm(qubes.vm.appvm.AppVM,
             name=self.make_vm_name('vm'),
             label='red',
         )
-        self.testvm.create_on_disk()
+        self.loop.run_until_complete(self.testvm.create_on_disk())
         self.app.save()
 
     @unittest.expectedFailure
     def test_002_cleanup(self):
-        self.testvm.start()
+        self.loop.run_until_complete(self.testvm.start())
 
-        p = self.testvm.run("qvm-run --dispvm bash", passio_popen=True)
-        (stdout, _) = p.communicate(input=b"echo test; qubesdb-read /name; "
-                                          b"echo ERROR\n")
-        self.assertEqual(p.returncode, 0)
+        (stdout, _) = self.loop.run_until_complete(
+            self.testvm.run_for_stdio("qvm-run --dispvm bash",
+                input=b"echo test; qubesdb-read /name; echo ERROR\n"))
         lines = stdout.decode('ascii').splitlines()
         self.assertEqual(lines[0], "test")
         dispvm_name = lines[1]
@@ -69,23 +70,21 @@ class TC_04_DispVM(qubes.tests.SystemTestCase):
         :return:
         """
 
-        self.testvm.start()
+        self.loop.run_until_complete(self.testvm.start())
 
-        p = self.testvm.run("qvm-run --dispvm bash; true", passio_popen=True)
+        p = self.loop.run_until_complete(
+            self.testvm.run("qvm-run --dispvm bash; true",
+                stdin=subprocess.PIPE, stdout=subprocess.PIPE))
         p.stdin.write(b"qubesdb-read /name\n")
         p.stdin.write(b"echo ERROR\n")
         p.stdin.write(b"sudo poweroff\n")
         # do not close p.stdin on purpose - wait to automatic disconnect when
         #  domain is destroyed
         timeout = 30
-        while timeout > 0:
-            if p.poll():
-                break
-            time.sleep(1)
-            timeout -= 1
-        # includes check for None - timeout
-        self.assertEqual(p.returncode, 0)
-        lines = p.stdout.read().splitlines()
+        lines_task = asyncio.ensure_future(p.stdout.read())
+        self.loop.run_until_complete(asyncio.wait_for(p.wait(), timeout))
+        self.loop.run_until_complete(lines_task)
+        lines = lines_task.result().splitlines()
         self.assertTrue(lines, 'No output received from DispVM')
         dispvm_name = lines[0]
         self.assertNotEquals(dispvm_name, b"ERROR")
@@ -110,9 +109,9 @@ class TC_20_DispVMMixin(object):
             qubes.vm.dispvm.DispVM.from_appvm(self.disp_base))
         try:
             self.loop.run_until_complete(dispvm.start())
-            p = self.loop.run_until_complete(
-                dispvm.run_service('qubes.VMShell', passio_popen=True))
-            (stdout, _) = p.communicate(input=b"echo test")
+            (stdout, _) = self.loop.run_until_complete(
+                dispvm.run_service_for_stdio('qubes.VMShell',
+                    input=b"echo test"))
             self.assertEqual(stdout, b"test\n")
         finally:
             self.loop.run_until_complete(dispvm.cleanup())
@@ -125,10 +124,12 @@ class TC_20_DispVMMixin(object):
         try:
             self.loop.run_until_complete(dispvm.start())
             p = self.loop.run_until_complete(
-                dispvm.run_service('qubes.VMShell', passio_popen=True))
+                dispvm.run_service('qubes.VMShell',
+                    stdin=subprocess.PIPE,
+                    stdout=subprocess.PIPE))
             # wait for DispVM startup:
             p.stdin.write(b"echo test\n")
-            p.stdin.flush()
+            self.loop.run_until_complete(p.stdin.drain())
             l = self.loop.run_until_complete(p.stdout.readline())
             self.assertEqual(l, b"test\n")
 
@@ -227,13 +228,14 @@ class TC_20_DispVMMixin(object):
         wait_count = 0
         winid = None
         while True:
-            search = subprocess.Popen(['xdotool', 'search',
-                                       '--onlyvisible', '--class', 'disp*'],
-                                      stdout=subprocess.PIPE,
-                                      stderr=open(os.path.devnull, 'w'))
-            retcode = search.wait()
-            if retcode == 0:
-                winid = search.stdout.read().strip()
+            search = self.loop.run_until_complete(
+                asyncio.create_subprocess_exec(
+                    'xdotool', 'search', '--onlyvisible', '--class', 'disp*',
+                    stdout=subprocess.PIPE,
+                    stderr=subprocess.DEVNULL))
+            stdout, _ = self.loop.run_until_complete(search.communicate())
+            if p.returncode == 0:
+                winid = stdout.strip()
                 # get window title
                 (window_title, _) = subprocess.Popen(
                     ['xdotool', 'getwindowname', winid], stdout=subprocess.PIPE). \
@@ -247,7 +249,7 @@ class TC_20_DispVMMixin(object):
             wait_count += 1
             if wait_count > 100:
                 self.fail("Timeout while waiting for editor window")
-            time.sleep(0.3)
+            self.loop.run_until_complete(asyncio.sleep(0.3))
 
         time.sleep(0.5)
         self._handle_editor(winid)

+ 80 - 56
qubes/tests/integ/dom0_update.py

@@ -25,6 +25,8 @@ import subprocess
 import tempfile
 import unittest
 
+import asyncio
+
 import qubes
 import qubes.tests
 
@@ -118,8 +120,13 @@ enabled = 1
                                os.path.join(self.tmpdir, 'pubkey.asc')])
         self.loop.run_until_complete(self.updatevm.start())
         self.repo_running = False
+        self.repo_proc = None
 
     def tearDown(self):
+        if self.repo_proc:
+            self.repo_proc.terminate()
+            self.loop.run_until_complete(self.repo_proc.wait())
+            del self.repo_proc
         super(TC_00_Dom0UpgradeMixin, self).tearDown()
 
         subprocess.call(['sudo', 'rpm', '-e', self.pkg_name], stderr=open(
@@ -190,7 +197,7 @@ Test package
     def start_repo(self):
         if self.repo_running:
             return
-        self.loop.run_until_complete(self.updatevm.run(
+        self.repo_proc = self.loop.run_until_complete(self.updatevm.run(
             'cd /tmp/repo && python -m SimpleHTTPServer 8080'))
         self.repo_running = True
 
@@ -209,14 +216,17 @@ Test package
         open(self.update_flag_path, 'a').close()
 
         logpath = os.path.join(self.tmpdir, 'dom0-update-output.txt')
-        try:
-            subprocess.check_call(['sudo', '-E', 'qubes-dom0-update', '-y'] +
-                                  self.dom0_update_common_opts,
-                                  stdout=open(logpath, 'w'),
-                                  stderr=subprocess.STDOUT)
-        except subprocess.CalledProcessError:
-            self.fail("qubes-dom0-update failed: " + open(
-                logpath).read())
+        with open(logpath, 'w') as f_log:
+            proc = self.loop.run_until_complete(asyncio.create_subprocess_exec(
+                'qubes-dom0-update', '-y', *self.dom0_update_common_opts,
+                stdout=f_log,
+                stderr=subprocess.STDOUT))
+        self.loop.run_until_complete(proc.wait())
+        if proc.returncode:
+            del proc
+            with open(logpath) as f_log:
+                self.fail("qubes-dom0-update failed: " + f_log.read())
+        del proc
 
         retcode = subprocess.call(['rpm', '-q', '{}-1.0'.format(
             self.pkg_name)], stdout=open(os.devnull, 'w'))
@@ -238,14 +248,17 @@ Test package
         open(self.update_flag_path, 'a').close()
 
         logpath = os.path.join(self.tmpdir, 'dom0-update-output.txt')
-        try:
-            subprocess.check_call(['sudo', '-E', 'qubes-dom0-update', '-y'] +
-                                  self.dom0_update_common_opts,
-                                  stdout=open(logpath, 'w'),
-                                  stderr=subprocess.STDOUT)
-        except subprocess.CalledProcessError:
-            self.fail("qubes-dom0-update failed: " + open(
-                logpath).read())
+        with open(logpath, 'w') as f_log:
+            proc = self.loop.run_until_complete(asyncio.create_subprocess_exec(
+                'qubes-dom0-update', '-y', *self.dom0_update_common_opts,
+                stdout=f_log,
+                stderr=subprocess.STDOUT))
+        self.loop.run_until_complete(proc.wait())
+        if proc.returncode:
+            del proc
+            with open(logpath) as f_log:
+                self.fail("qubes-dom0-update failed: " + f_log.read())
+        del proc
 
         with open(logpath) as f:
             dom0_update_output = f.read()
@@ -269,15 +282,18 @@ Test package
         if os.path.exists('/var/lib/qubes/updates/repodata'):
             shutil.rmtree('/var/lib/qubes/updates/repodata')
         logpath = os.path.join(self.tmpdir, 'dom0-update-output.txt')
-        try:
-            subprocess.check_call(['sudo', '-E', 'qubes-dom0-update', '-y',
-                                   '--clean'] +
-                                  self.dom0_update_common_opts,
-                                  stdout=open(logpath, 'w'),
-                                  stderr=subprocess.STDOUT)
-        except subprocess.CalledProcessError:
-            self.fail("qubes-dom0-update failed: " + open(
-                logpath).read())
+        with open(logpath, 'w') as f_log:
+            proc = self.loop.run_until_complete(asyncio.create_subprocess_exec(
+                'qubes-dom0-update', '-y', '--clean',
+                *self.dom0_update_common_opts,
+                stdout=f_log,
+                stderr=subprocess.STDOUT))
+        self.loop.run_until_complete(proc.wait())
+        if proc.returncode:
+            del proc
+            with open(logpath) as f_log:
+                self.fail("qubes-dom0-update failed: " + f_log.read())
+        del proc
 
         with open(logpath) as f:
             dom0_update_output = f.read()
@@ -294,15 +310,18 @@ Test package
         self.send_pkg(filename)
 
         logpath = os.path.join(self.tmpdir, 'dom0-update-output.txt')
-        try:
-            subprocess.check_call(['sudo', '-E', 'qubes-dom0-update', '-y'] +
-                                  self.dom0_update_common_opts + [
-                self.pkg_name],
-                                  stdout=open(logpath, 'w'),
-                                  stderr=subprocess.STDOUT)
-        except subprocess.CalledProcessError:
-            self.fail("qubes-dom0-update failed: " + open(
-                logpath).read())
+        with open(logpath, 'w') as f_log:
+            proc = self.loop.run_until_complete(asyncio.create_subprocess_exec(
+                'qubes-dom0-update', '-y', *self.dom0_update_common_opts,
+                self.pkg_name,
+                stdout=f_log,
+                stderr=subprocess.STDOUT))
+        self.loop.run_until_complete(proc.wait())
+        if proc.returncode:
+            del proc
+            with open(logpath) as f_log:
+                self.fail("qubes-dom0-update failed: " + f_log.read())
+        del proc
 
         retcode = subprocess.call(['rpm', '-q', '{}-1.0'.format(
             self.pkg_name)], stdout=open('/dev/null', 'w'))
@@ -316,16 +335,19 @@ Test package
         self.send_pkg(filename)
 
         logpath = os.path.join(self.tmpdir, 'dom0-update-output.txt')
-        try:
-            subprocess.check_call(['sudo', '-E', 'qubes-dom0-update', '-y'] +
-                                  self.dom0_update_common_opts + [
-                self.pkg_name],
-                                  stdout=open(logpath, 'w'),
-                                  stderr=subprocess.STDOUT)
-            self.fail("qubes-dom0-update unexpectedly succeeded: " + open(
-                logpath).read())
-        except subprocess.CalledProcessError:
-            pass
+        with open(logpath, 'w') as f_log:
+            proc = self.loop.run_until_complete(asyncio.create_subprocess_exec(
+                'qubes-dom0-update', '-y', *self.dom0_update_common_opts,
+                self.pkg_name,
+                stdout=f_log,
+                stderr=subprocess.STDOUT))
+        self.loop.run_until_complete(proc.wait())
+        if not proc.returncode:
+            del proc
+            with open(logpath) as f_log:
+                self.fail("qubes-dom0-update unexpectedly succeeded: " +
+                          f_log.read())
+        del proc
 
         retcode = subprocess.call(['rpm', '-q', '{}-1.0'.format(
             self.pkg_name)], stdout=open('/dev/null', 'w'))
@@ -341,17 +363,19 @@ Test package
         self.send_pkg(filename)
 
         logpath = os.path.join(self.tmpdir, 'dom0-update-output.txt')
-        try:
-            subprocess.check_call(['sudo', '-E', 'qubes-dom0-update', '-y'] +
-                                  self.dom0_update_common_opts +
-                                  [self.pkg_name],
-                                  stdout=open(logpath, 'w'),
-                                  stderr=subprocess.STDOUT
-                                  )
-            self.fail("qubes-dom0-update unexpectedly succeeded: " + open(
-                logpath).read())
-        except subprocess.CalledProcessError:
-            pass
+        with open(logpath, 'w') as f_log:
+            proc = self.loop.run_until_complete(asyncio.create_subprocess_exec(
+                'qubes-dom0-update', '-y', *self.dom0_update_common_opts,
+                self.pkg_name,
+                stdout=f_log,
+                stderr=subprocess.STDOUT))
+        self.loop.run_until_complete(proc.wait())
+        if not proc.returncode:
+            del proc
+            with open(logpath) as f_log:
+                self.fail("qubes-dom0-update unexpectedly succeeded: " +
+                          f_log.read())
+        del proc
 
         retcode = subprocess.call(['rpm', '-q', '{}-1.0'.format(
             self.pkg_name)], stdout=open('/dev/null', 'w'))