Browse Source

tests: reenable some qrexec tests, convert them to py3k/asyncio

Marek Marczykowski-Górecki 5 years ago
parent
commit
ac8b8a3ad4
1 changed files with 83 additions and 55 deletions
  1. 83 55
      qubes/tests/integ/vm_qrexec_gui.py

+ 83 - 55
qubes/tests/integ/vm_qrexec_gui.py

@@ -223,7 +223,6 @@ class TC_00_AppVMMixin(object):
         self.assertFalse(stderr,
             'Some data was printed to stderr')
 
-    @unittest.skip('#2851, because there is no GUI in vm')
     def test_051_qrexec_simple_eof_reverse(self):
         """Test for EOF transmission VM->dom0"""
 
@@ -241,7 +240,7 @@ class TC_00_AppVMMixin(object):
             p.stdin.write(TEST_DATA)
             yield from p.stdin.drain()
             p.stdin.close()
-            self.assertEqual(stdout.strip(), 'test',
+            self.assertEqual(stdout.strip(), b'test',
                 'Received data differs from what was expected')
             # this may hang in some buggy cases
             self.assertFalse((yield from p.stderr.read()),
@@ -254,15 +253,18 @@ class TC_00_AppVMMixin(object):
                     "probably EOF wasn't transferred from the VM process")
 
         self.loop.run_until_complete(self.testvm1.start())
+        self.loop.run_until_complete(self.wait_for_session(self.testvm1))
         self.loop.run_until_complete(run(self))
 
-    @unittest.skip('#2851, because there is no GUI in vm')
     def test_052_qrexec_vm_service_eof(self):
         """Test for EOF transmission VM(src)->VM(dst)"""
 
         self.loop.run_until_complete(asyncio.wait([
             self.testvm1.start(),
             self.testvm2.start()]))
+        self.loop.run_until_complete(asyncio.wait([
+            self.wait_for_session(self.testvm1),
+            self.wait_for_session(self.testvm2)]))
         self.loop.run_until_complete(self.testvm2.run_for_stdio(
             'cat > /etc/qubes-rpc/test.EOF',
             user='root',
@@ -279,7 +281,7 @@ class TC_00_AppVMMixin(object):
             except asyncio.TimeoutError:
                 self.fail("Timeout, probably EOF wasn't transferred")
 
-        self.assertEqual(stdout, b'test',
+        self.assertEqual(stdout, b'test\n',
             'Received data differs from what was expected')
 
     @unittest.expectedFailure
@@ -400,23 +402,14 @@ class TC_00_AppVMMixin(object):
             except asyncio.TimeoutError:
                 self.fail('Timeout, probably deadlock')
 
-    @unittest.skip('localcmd= argument went away')
     def test_071_qrexec_dom0_simultaneous_write(self):
         """Test for simultaneous write in dom0(src)->VM(dst) connection
 
             Similar to test_070_qrexec_vm_simultaneous_write, but with dom0
             as a source.
         """
-        def run(self):
-            result.value = self.testvm2.run_service(
-                "test.write", localcmd="/bin/sh -c '"
-                # first write a lot of data to fill all the buffers
-                "dd if=/dev/zero bs=993 count=10000 iflag=fullblock & "
-                # then after some time start reading
-                "sleep 1; "
-                "dd of=/dev/null bs=993 count=10000 iflag=fullblock; "
-                "wait"
-                "'")
+
+        self.loop.run_until_complete(self.testvm2.start())
 
         self.create_remote_file(self.testvm2, '/etc/qubes-rpc/test.write', '''\
             # first write a lot of data
@@ -424,58 +417,93 @@ class TC_00_AppVMMixin(object):
             # and only then read something
             dd of=/dev/null bs=993 count=10000 iflag=fullblock
             ''')
-        self.create_local_file('/etc/qubes-rpc/policy/test.write',
-            '{} {} allow'.format(self.testvm1.name, self.testvm2.name))
-
-        t = multiprocessing.Process(target=run, args=(self,))
-        t.start()
-        t.join(timeout=10)
-        if t.is_alive():
-            t.terminate()
+
+        # can't use subprocess.PIPE, because asyncio will claim those FDs
+        pipe1_r, pipe1_w = os.pipe()
+        pipe2_r, pipe2_w = os.pipe()
+        try:
+            local_proc = self.loop.run_until_complete(
+                asyncio.create_subprocess_shell(
+                    # first write a lot of data to fill all the buffers
+                    "dd if=/dev/zero bs=993 count=10000 iflag=fullblock & "
+                    # then after some time start reading
+                    "sleep 1; "
+                    "dd of=/dev/null bs=993 count=10000 iflag=fullblock; "
+                    "wait", stdin=pipe1_r, stdout=pipe2_w))
+
+            service_proc = self.loop.run_until_complete(self.testvm2.run_service(
+                "test.write", stdin=pipe2_r, stdout=pipe1_w))
+        finally:
+            os.close(pipe1_r)
+            os.close(pipe1_w)
+            os.close(pipe2_r)
+            os.close(pipe2_w)
+
+        try:
+            self.loop.run_until_complete(
+                asyncio.wait_for(service_proc.wait(), timeout=10))
+        except asyncio.TimeoutError:
             self.fail("Timeout, probably deadlock")
-        self.assertEqual(result.value, 0, "Service call failed")
+        else:
+            self.assertEqual(service_proc.returncode, 0,
+                "Service call failed")
+        finally:
+            try:
+                service_proc.terminate()
+            except ProcessLookupError:
+                pass
 
-    @unittest.skip('localcmd= argument went away')
     def test_072_qrexec_to_dom0_simultaneous_write(self):
         """Test for simultaneous write in dom0(src)<-VM(dst) connection
 
             Similar to test_071_qrexec_dom0_simultaneous_write, but with dom0
             as a "hanging" side.
         """
-        result = multiprocessing.Value('i', -1)
-
-        def run(self):
-            result.value = self.testvm2.run_service(
-                "test.write", localcmd="/bin/sh -c '"
-                # first write a lot of data to fill all the buffers
-                "dd if=/dev/zero bs=993 count=10000 iflag=fullblock "
-                # then, only when all written, read something
-                "dd of=/dev/null bs=993 count=10000 iflag=fullblock; "
-                "'")
 
         self.loop.run_until_complete(self.testvm2.start())
-        p = self.testvm2.run("cat > /etc/qubes-rpc/test.write", user="root",
-                             passio_popen=True)
-        # first write a lot of data
-        p.stdin.write(b"dd if=/dev/zero bs=993 count=10000 iflag=fullblock &\n")
-        # and only then read something
-        p.stdin.write(b"dd of=/dev/null bs=993 count=10000 iflag=fullblock\n")
-        p.stdin.write(b"sleep 1; \n")
-        p.stdin.write(b"wait\n")
-        p.stdin.close()
-        p.wait()
-        policy = open("/etc/qubes-rpc/policy/test.write", "w")
-        policy.write("%s %s allow" % (self.testvm1.name, self.testvm2.name))
-        policy.close()
-        self.addCleanup(os.unlink, "/etc/qubes-rpc/policy/test.write")
 
-        t = multiprocessing.Process(target=run, args=(self,))
-        t.start()
-        t.join(timeout=10)
-        if t.is_alive():
-            t.terminate()
+        self.create_remote_file(self.testvm2, '/etc/qubes-rpc/test.write', '''\
+            # first write a lot of data
+            dd if=/dev/zero bs=993 count=10000 iflag=fullblock &
+            # and only then read something
+            dd of=/dev/null bs=993 count=10000 iflag=fullblock
+            sleep 1;
+            wait
+            ''')
+
+        # can't use subprocess.PIPE, because asyncio will claim those FDs
+        pipe1_r, pipe1_w = os.pipe()
+        pipe2_r, pipe2_w = os.pipe()
+        try:
+            local_proc = self.loop.run_until_complete(
+                asyncio.create_subprocess_shell(
+                    # first write a lot of data to fill all the buffers
+                    "dd if=/dev/zero bs=993 count=10000 iflag=fullblock & "
+                    # then, only when all written, read something
+                    "dd of=/dev/null bs=993 count=10000 iflag=fullblock; ",
+                    stdin=pipe1_r, stdout=pipe2_w))
+
+            service_proc = self.loop.run_until_complete(self.testvm2.run_service(
+                "test.write", stdin=pipe2_r, stdout=pipe1_w))
+        finally:
+            os.close(pipe1_r)
+            os.close(pipe1_w)
+            os.close(pipe2_r)
+            os.close(pipe2_w)
+
+        try:
+            self.loop.run_until_complete(
+                asyncio.wait_for(service_proc.wait(), timeout=10))
+        except asyncio.TimeoutError:
             self.fail("Timeout, probably deadlock")
-        self.assertEqual(result.value, 0, "Service call failed")
+        else:
+            self.assertEqual(service_proc.returncode, 0,
+                "Service call failed")
+        finally:
+            try:
+                service_proc.terminate()
+            except ProcessLookupError:
+                pass
 
     def test_080_qrexec_service_argument_allow_default(self):
         """Qrexec service call with argument"""