adminvm.py 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193
  1. #
  2. # The Qubes OS Project, https://www.qubes-os.org/
  3. #
  4. # Copyright (C) 2014-2015 Joanna Rutkowska <joanna@invisiblethingslab.com>
  5. # Copyright (C) 2014-2015 Wojtek Porczyk <woju@invisiblethingslab.com>
  6. #
  7. # This library is free software; you can redistribute it and/or
  8. # modify it under the terms of the GNU Lesser General Public
  9. # License as published by the Free Software Foundation; either
  10. # version 2.1 of the License, or (at your option) any later version.
  11. #
  12. # This library is distributed in the hope that it will be useful,
  13. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  14. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  15. # Lesser General Public License for more details.
  16. #
  17. # You should have received a copy of the GNU Lesser General Public
  18. # License along with this library; if not, see <https://www.gnu.org/licenses/>.
  19. #
  20. import subprocess
  21. import unittest
  22. import unittest.mock
  23. import functools
  24. import asyncio
  25. import qubes
  26. import qubes.exc
  27. import qubes.vm
  28. import qubes.vm.adminvm
  29. import qubes.tests
  30. class TC_00_AdminVM(qubes.tests.QubesTestCase):
  31. def setUp(self):
  32. super().setUp()
  33. try:
  34. self.app = qubes.tests.vm.TestApp()
  35. with unittest.mock.patch.object(
  36. qubes.vm.adminvm.AdminVM, 'start_qdb_watch') as mock_qdb:
  37. self.vm = qubes.vm.adminvm.AdminVM(self.app,
  38. xml=None)
  39. mock_qdb.assert_called_once_with()
  40. self.addCleanup(self.cleanup_adminvm)
  41. except: # pylint: disable=bare-except
  42. if self.id().endswith('.test_000_init'):
  43. raise
  44. self.skipTest('setup failed')
  45. def tearDown(self) -> None:
  46. self.app.domains.clear()
  47. def add_vm(self, name, cls=qubes.vm.qubesvm.QubesVM, **kwargs):
  48. vm = cls(self.app, None,
  49. qid=kwargs.pop('qid', 1), name=qubes.tests.VMPREFIX + name,
  50. **kwargs)
  51. self.app.domains[vm.qid] = vm
  52. self.app.domains[vm.uuid] = vm
  53. self.app.domains[vm.name] = vm
  54. self.app.domains[vm] = vm
  55. self.addCleanup(vm.close)
  56. return vm
  57. @asyncio.coroutine
  58. def coroutine_mock(self, mock, *args, **kwargs):
  59. return mock(*args, **kwargs)
  60. def cleanup_adminvm(self):
  61. self.vm.close()
  62. del self.vm
  63. def test_000_init(self):
  64. pass
  65. def test_001_property_icon(self):
  66. self.assertEqual(self.vm.icon, 'adminvm-black')
  67. def test_100_xid(self):
  68. self.assertEqual(self.vm.xid, 0)
  69. def test_101_libvirt_domain(self):
  70. with unittest.mock.patch.object(self.app, 'vmm') as mock_vmm:
  71. self.assertIsNotNone(self.vm.libvirt_domain)
  72. self.assertEqual(mock_vmm.mock_calls, [
  73. ('libvirt_conn.lookupByID', (0,), {}),
  74. ])
  75. def test_300_is_running(self):
  76. self.assertTrue(self.vm.is_running())
  77. def test_301_get_power_state(self):
  78. self.assertEqual(self.vm.get_power_state(), 'Running')
  79. def test_302_get_mem(self):
  80. self.assertGreater(self.vm.get_mem(), 0)
  81. @unittest.skip('mock object does not support this')
  82. def test_303_get_mem_static_max(self):
  83. self.assertGreater(self.vm.get_mem_static_max(), 0)
  84. def test_310_start(self):
  85. with self.assertRaises(qubes.exc.QubesException):
  86. self.vm.start()
  87. @unittest.skip('this functionality is undecided')
  88. def test_311_suspend(self):
  89. with self.assertRaises(qubes.exc.QubesException):
  90. self.vm.suspend()
  91. @unittest.mock.patch('asyncio.create_subprocess_exec')
  92. def test_700_run_service(self, mock_subprocess):
  93. func_mock = unittest.mock.Mock()
  94. mock_subprocess.side_effect = functools.partial(
  95. self.coroutine_mock, func_mock)
  96. self.add_vm('vm')
  97. with self.subTest('running'):
  98. self.loop.run_until_complete(self.vm.run_service('test.service'))
  99. func_mock.assert_called_once_with(
  100. '/usr/lib/qubes/qubes-rpc-multiplexer',
  101. 'test.service', 'dom0', 'name', 'dom0')
  102. func_mock.reset_mock()
  103. with self.subTest('other_user'):
  104. self.loop.run_until_complete(
  105. self.vm.run_service('test.service', user='other'))
  106. func_mock.assert_called_once_with(
  107. 'runuser', '-u', 'other', '--',
  108. '/usr/lib/qubes/qubes-rpc-multiplexer',
  109. 'test.service', 'dom0', 'name', 'dom0')
  110. func_mock.reset_mock()
  111. with self.subTest('other_source'):
  112. self.loop.run_until_complete(
  113. self.vm.run_service('test.service', source='test-inst-vm'))
  114. func_mock.assert_called_once_with(
  115. '/usr/lib/qubes/qubes-rpc-multiplexer',
  116. 'test.service', 'test-inst-vm', 'name', 'dom0')
  117. @unittest.mock.patch('qubes.vm.adminvm.AdminVM.run_service')
  118. def test_710_run_service_for_stdio(self, mock_run_service):
  119. func_mock = unittest.mock.Mock()
  120. mock_run_service.side_effect = functools.partial(
  121. self.coroutine_mock, func_mock)
  122. communicate_mock = unittest.mock.Mock()
  123. func_mock.return_value.communicate.side_effect = functools.partial(
  124. self.coroutine_mock, communicate_mock)
  125. communicate_mock.return_value = (b'stdout', b'stderr')
  126. func_mock.return_value.returncode = 0
  127. with self.subTest('default'):
  128. value = self.loop.run_until_complete(
  129. self.vm.run_service_for_stdio('test.service'))
  130. func_mock.assert_called_once_with(
  131. 'test.service',
  132. stdout=subprocess.PIPE,
  133. stderr=subprocess.PIPE,
  134. stdin=subprocess.PIPE)
  135. communicate_mock.assert_called_once_with(input=None)
  136. self.assertEqual(value, (b'stdout', b'stderr'))
  137. func_mock.reset_mock()
  138. communicate_mock.reset_mock()
  139. with self.subTest('with_input'):
  140. value = self.loop.run_until_complete(
  141. self.vm.run_service_for_stdio('test.service', input=b'abc'))
  142. func_mock.assert_called_once_with(
  143. 'test.service',
  144. stdout=subprocess.PIPE,
  145. stderr=subprocess.PIPE,
  146. stdin=subprocess.PIPE)
  147. communicate_mock.assert_called_once_with(input=b'abc')
  148. self.assertEqual(value, (b'stdout', b'stderr'))
  149. func_mock.reset_mock()
  150. communicate_mock.reset_mock()
  151. with self.subTest('error'):
  152. func_mock.return_value.returncode = 1
  153. with self.assertRaises(subprocess.CalledProcessError) as exc:
  154. self.loop.run_until_complete(
  155. self.vm.run_service_for_stdio('test.service'))
  156. func_mock.assert_called_once_with(
  157. 'test.service',
  158. stdout=subprocess.PIPE,
  159. stderr=subprocess.PIPE,
  160. stdin=subprocess.PIPE)
  161. communicate_mock.assert_called_once_with(input=None)
  162. self.assertEqual(exc.exception.returncode, 1)
  163. self.assertEqual(exc.exception.output, b'stdout')
  164. self.assertEqual(exc.exception.stderr, b'stderr')