diff --git a/tests/vm_qrexec_gui.py b/tests/vm_qrexec_gui.py index 1a924b21..db514249 100644 --- a/tests/vm_qrexec_gui.py +++ b/tests/vm_qrexec_gui.py @@ -529,6 +529,115 @@ class TC_00_AppVMMixin(qubes.tests.SystemTestsMixin): self.fail("Timeout, probably deadlock") self.assertEqual(result.value, 0, "Service call failed") + def test_080_qrexec_service_argument_allow_default(self): + """Qrexec service call with argument""" + self.testvm1.start() + self.testvm2.start() + p = self.testvm2.run("cat > /etc/qubes-rpc/test.Argument", user="root", + passio_popen=True) + p.communicate("/bin/echo $1") + + with open("/etc/qubes-rpc/policy/test.Argument", "w") as policy: + policy.write("%s %s allow" % (self.testvm1.name, self.testvm2.name)) + self.addCleanup(os.unlink, "/etc/qubes-rpc/policy/test.Argument") + + p = self.testvm1.run("/usr/lib/qubes/qrexec-client-vm {} " + "test.Argument+argument".format(self.testvm2.name), + passio_popen=True) + (stdout, stderr) = p.communicate() + self.assertEqual(stdout, "argument\n") + + def test_081_qrexec_service_argument_allow_specific(self): + """Qrexec service call with argument - allow only specific value""" + self.testvm1.start() + self.testvm2.start() + p = self.testvm2.run("cat > /etc/qubes-rpc/test.Argument", user="root", + passio_popen=True) + p.communicate("/bin/echo $1") + + with open("/etc/qubes-rpc/policy/test.Argument", "w") as policy: + policy.write("$anyvm $anyvm deny") + self.addCleanup(os.unlink, "/etc/qubes-rpc/policy/test.Argument") + + with open("/etc/qubes-rpc/policy/test.Argument+argument", "w") as \ + policy: + policy.write("%s %s allow" % (self.testvm1.name, self.testvm2.name)) + self.addCleanup(os.unlink, + "/etc/qubes-rpc/policy/test.Argument+argument") + + p = self.testvm1.run("/usr/lib/qubes/qrexec-client-vm {} " + "test.Argument+argument".format(self.testvm2.name), + passio_popen=True) + (stdout, stderr) = p.communicate() + self.assertEqual(stdout, "argument\n") + + def test_082_qrexec_service_argument_deny_specific(self): + """Qrexec service call with argument - deny specific value""" + self.testvm1.start() + self.testvm2.start() + p = self.testvm2.run("cat > /etc/qubes-rpc/test.Argument", user="root", + passio_popen=True) + p.communicate("/bin/echo $1") + + with open("/etc/qubes-rpc/policy/test.Argument", "w") as policy: + policy.write("$anyvm $anyvm allow") + self.addCleanup(os.unlink, "/etc/qubes-rpc/policy/test.Argument") + + with open("/etc/qubes-rpc/policy/test.Argument+argument", "w") as \ + policy: + policy.write("%s %s deny" % (self.testvm1.name, self.testvm2.name)) + self.addCleanup(os.unlink, + "/etc/qubes-rpc/policy/test.Argument+argument") + + p = self.testvm1.run("/usr/lib/qubes/qrexec-client-vm {} " + "test.Argument+argument".format(self.testvm2.name), + passio_popen=True) + (stdout, stderr) = p.communicate() + self.assertEqual(stdout, "") + self.assertEqual(p.returncode, 1, "Service request should be denied") + + def test_083_qrexec_service_argument_specific_implementation(self): + """Qrexec service call with argument - argument specific + implementatation""" + self.testvm1.start() + self.testvm2.start() + p = self.testvm2.run("cat > /etc/qubes-rpc/test.Argument", user="root", + passio_popen=True) + p.communicate("/bin/echo $1") + + p = self.testvm2.run("cat > /etc/qubes-rpc/test.Argument+argument", + user="root", passio_popen=True) + p.communicate("/bin/echo specific: $1") + + with open("/etc/qubes-rpc/policy/test.Argument", "w") as policy: + policy.write("%s %s allow" % (self.testvm1.name, self.testvm2.name)) + self.addCleanup(os.unlink, "/etc/qubes-rpc/policy/test.Argument") + + p = self.testvm1.run("/usr/lib/qubes/qrexec-client-vm {} " + "test.Argument+argument".format(self.testvm2.name), + passio_popen=True) + (stdout, stderr) = p.communicate() + self.assertEqual(stdout, "specific: argument\n") + + def test_084_qrexec_service_argument_extra_env(self): + """Qrexec service call with argument - extra env variables""" + self.testvm1.start() + self.testvm2.start() + p = self.testvm2.run("cat > /etc/qubes-rpc/test.Argument", user="root", + passio_popen=True) + p.communicate("/bin/echo $QREXEC_SERVICE_FULL_NAME " + "$QREXEC_SERVICE_ARGUMENT") + + with open("/etc/qubes-rpc/policy/test.Argument", "w") as policy: + policy.write("%s %s allow" % (self.testvm1.name, self.testvm2.name)) + self.addCleanup(os.unlink, "/etc/qubes-rpc/policy/test.Argument") + + p = self.testvm1.run("/usr/lib/qubes/qrexec-client-vm {} " + "test.Argument+argument".format(self.testvm2.name), + passio_popen=True) + (stdout, stderr) = p.communicate() + self.assertEqual(stdout, "test.Argument+argument argument\n") + def test_100_qrexec_filecopy(self): self.testvm1.start() self.testvm2.start()