tests: qrexec service argument

QubesOS/qubes-issues#1876
This commit is contained in:
Marek Marczykowski-Górecki 2016-03-28 01:19:23 +02:00
parent b396629d44
commit 5566f31a42
No known key found for this signature in database
GPG Key ID: 063938BA42CFA724

View File

@ -529,6 +529,115 @@ class TC_00_AppVMMixin(qubes.tests.SystemTestsMixin):
self.fail("Timeout, probably deadlock")
self.assertEqual(result.value, 0, "Service call failed")
def test_080_qrexec_service_argument_allow_default(self):
"""Qrexec service call with argument"""
self.testvm1.start()
self.testvm2.start()
p = self.testvm2.run("cat > /etc/qubes-rpc/test.Argument", user="root",
passio_popen=True)
p.communicate("/bin/echo $1")
with open("/etc/qubes-rpc/policy/test.Argument", "w") as policy:
policy.write("%s %s allow" % (self.testvm1.name, self.testvm2.name))
self.addCleanup(os.unlink, "/etc/qubes-rpc/policy/test.Argument")
p = self.testvm1.run("/usr/lib/qubes/qrexec-client-vm {} "
"test.Argument+argument".format(self.testvm2.name),
passio_popen=True)
(stdout, stderr) = p.communicate()
self.assertEqual(stdout, "argument\n")
def test_081_qrexec_service_argument_allow_specific(self):
"""Qrexec service call with argument - allow only specific value"""
self.testvm1.start()
self.testvm2.start()
p = self.testvm2.run("cat > /etc/qubes-rpc/test.Argument", user="root",
passio_popen=True)
p.communicate("/bin/echo $1")
with open("/etc/qubes-rpc/policy/test.Argument", "w") as policy:
policy.write("$anyvm $anyvm deny")
self.addCleanup(os.unlink, "/etc/qubes-rpc/policy/test.Argument")
with open("/etc/qubes-rpc/policy/test.Argument+argument", "w") as \
policy:
policy.write("%s %s allow" % (self.testvm1.name, self.testvm2.name))
self.addCleanup(os.unlink,
"/etc/qubes-rpc/policy/test.Argument+argument")
p = self.testvm1.run("/usr/lib/qubes/qrexec-client-vm {} "
"test.Argument+argument".format(self.testvm2.name),
passio_popen=True)
(stdout, stderr) = p.communicate()
self.assertEqual(stdout, "argument\n")
def test_082_qrexec_service_argument_deny_specific(self):
"""Qrexec service call with argument - deny specific value"""
self.testvm1.start()
self.testvm2.start()
p = self.testvm2.run("cat > /etc/qubes-rpc/test.Argument", user="root",
passio_popen=True)
p.communicate("/bin/echo $1")
with open("/etc/qubes-rpc/policy/test.Argument", "w") as policy:
policy.write("$anyvm $anyvm allow")
self.addCleanup(os.unlink, "/etc/qubes-rpc/policy/test.Argument")
with open("/etc/qubes-rpc/policy/test.Argument+argument", "w") as \
policy:
policy.write("%s %s deny" % (self.testvm1.name, self.testvm2.name))
self.addCleanup(os.unlink,
"/etc/qubes-rpc/policy/test.Argument+argument")
p = self.testvm1.run("/usr/lib/qubes/qrexec-client-vm {} "
"test.Argument+argument".format(self.testvm2.name),
passio_popen=True)
(stdout, stderr) = p.communicate()
self.assertEqual(stdout, "")
self.assertEqual(p.returncode, 1, "Service request should be denied")
def test_083_qrexec_service_argument_specific_implementation(self):
"""Qrexec service call with argument - argument specific
implementatation"""
self.testvm1.start()
self.testvm2.start()
p = self.testvm2.run("cat > /etc/qubes-rpc/test.Argument", user="root",
passio_popen=True)
p.communicate("/bin/echo $1")
p = self.testvm2.run("cat > /etc/qubes-rpc/test.Argument+argument",
user="root", passio_popen=True)
p.communicate("/bin/echo specific: $1")
with open("/etc/qubes-rpc/policy/test.Argument", "w") as policy:
policy.write("%s %s allow" % (self.testvm1.name, self.testvm2.name))
self.addCleanup(os.unlink, "/etc/qubes-rpc/policy/test.Argument")
p = self.testvm1.run("/usr/lib/qubes/qrexec-client-vm {} "
"test.Argument+argument".format(self.testvm2.name),
passio_popen=True)
(stdout, stderr) = p.communicate()
self.assertEqual(stdout, "specific: argument\n")
def test_084_qrexec_service_argument_extra_env(self):
"""Qrexec service call with argument - extra env variables"""
self.testvm1.start()
self.testvm2.start()
p = self.testvm2.run("cat > /etc/qubes-rpc/test.Argument", user="root",
passio_popen=True)
p.communicate("/bin/echo $QREXEC_SERVICE_FULL_NAME "
"$QREXEC_SERVICE_ARGUMENT")
with open("/etc/qubes-rpc/policy/test.Argument", "w") as policy:
policy.write("%s %s allow" % (self.testvm1.name, self.testvm2.name))
self.addCleanup(os.unlink, "/etc/qubes-rpc/policy/test.Argument")
p = self.testvm1.run("/usr/lib/qubes/qrexec-client-vm {} "
"test.Argument+argument".format(self.testvm2.name),
passio_popen=True)
(stdout, stderr) = p.communicate()
self.assertEqual(stdout, "test.Argument+argument argument\n")
def test_100_qrexec_filecopy(self):
self.testvm1.start()
self.testvm2.start()