api_internal.py 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164
  1. # -*- encoding: utf-8 -*-
  2. #
  3. # The Qubes OS Project, http://www.qubes-os.org
  4. #
  5. # Copyright (C) 2019 Marek Marczykowski-Górecki
  6. # <marmarek@invisiblethingslab.com>
  7. #
  8. # This program is free software; you can redistribute it and/or modify
  9. # it under the terms of the GNU General Public License as published by
  10. # the Free Software Foundation; either version 2 of the License, or
  11. # (at your option) any later version.
  12. #
  13. # This program is distributed in the hope that it will be useful,
  14. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  15. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  16. # GNU General Public License for more details.
  17. #
  18. # You should have received a copy of the GNU General Public License along
  19. # with this program; if not, see <http://www.gnu.org/licenses/>.
  20. import asyncio
  21. import qubes.api.internal
  22. import qubes.tests
  23. import qubes.vm.adminvm
  24. from unittest import mock
  25. def mock_coro(f):
  26. @asyncio.coroutine
  27. def coro_f(*args, **kwargs):
  28. return f(*args, **kwargs)
  29. return coro_f
  30. class TC_00_API_Misc(qubes.tests.QubesTestCase):
  31. def setUp(self):
  32. super(TC_00_API_Misc, self).setUp()
  33. self.tpl = mock.NonCallableMagicMock(name='template')
  34. del self.tpl.template
  35. self.src = mock.NonCallableMagicMock(name='appvm',
  36. template=self.tpl)
  37. self.app = mock.NonCallableMock()
  38. self.dest = mock.NonCallableMock()
  39. self.dest.name = 'dom0'
  40. self.app.configure_mock(domains={
  41. 'dom0': self.dest,
  42. 'test-vm': self.src,
  43. })
  44. def configure_qdb(self, entries):
  45. self.src.configure_mock(**{
  46. 'untrusted_qdb.read.side_effect': (
  47. lambda path: entries.get(path, None)),
  48. 'untrusted_qdb.list.side_effect': (
  49. lambda path: sorted(entries.keys())),
  50. })
  51. def create_mockvm(self, features=None):
  52. if features is None:
  53. features = {}
  54. vm = mock.Mock()
  55. vm.features.check_with_template.side_effect = features.get
  56. vm.run_service.return_value.wait = mock_coro(
  57. vm.run_service.return_value.wait)
  58. vm.run_service = mock_coro(vm.run_service)
  59. vm.suspend = mock_coro(vm.suspend)
  60. vm.resume = mock_coro(vm.resume)
  61. return vm
  62. def call_mgmt_func(self, method, arg=b'', payload=b''):
  63. mgmt_obj = qubes.api.internal.QubesInternalAPI(self.app,
  64. b'dom0', method, b'dom0', arg)
  65. loop = asyncio.get_event_loop()
  66. response = loop.run_until_complete(
  67. mgmt_obj.execute(untrusted_payload=payload))
  68. return response
  69. def test_000_suspend_pre(self):
  70. dom0 = mock.NonCallableMock(spec=qubes.vm.adminvm.AdminVM)
  71. running_vm = self.create_mockvm(features={'qrexec': True})
  72. running_vm.is_running.return_value = True
  73. not_running_vm = self.create_mockvm(features={'qrexec': True})
  74. not_running_vm.is_running.return_value = False
  75. no_qrexec_vm = self.create_mockvm()
  76. no_qrexec_vm.is_running.return_value = True
  77. domains_dict = {
  78. 'dom0': dom0,
  79. 'running': running_vm,
  80. 'not-running': not_running_vm,
  81. 'no-qrexec': no_qrexec_vm,
  82. }
  83. self.addCleanup(domains_dict.clear)
  84. self.app.domains = mock.MagicMock(**{
  85. '__iter__': lambda _: iter(domains_dict.values()),
  86. '__getitem__': domains_dict.get,
  87. })
  88. ret = self.call_mgmt_func(b'internal.SuspendPre')
  89. self.assertIsNone(ret)
  90. self.assertFalse(dom0.called)
  91. self.assertNotIn(('run_service', ('qubes.SuspendPreAll',), mock.ANY),
  92. not_running_vm.mock_calls)
  93. self.assertNotIn(('suspend', (), {}),
  94. not_running_vm.mock_calls)
  95. self.assertIn(('run_service', ('qubes.SuspendPreAll',), mock.ANY),
  96. running_vm.mock_calls)
  97. self.assertIn(('suspend', (), {}),
  98. running_vm.mock_calls)
  99. self.assertNotIn(('run_service', ('qubes.SuspendPreAll',), mock.ANY),
  100. no_qrexec_vm.mock_calls)
  101. self.assertIn(('suspend', (), {}),
  102. no_qrexec_vm.mock_calls)
  103. def test_001_suspend_post(self):
  104. dom0 = mock.NonCallableMock(spec=qubes.vm.adminvm.AdminVM)
  105. running_vm = self.create_mockvm(features={'qrexec': True})
  106. running_vm.is_running.return_value = True
  107. running_vm.get_power_state.return_value = 'Suspended'
  108. not_running_vm = self.create_mockvm(features={'qrexec': True})
  109. not_running_vm.is_running.return_value = False
  110. not_running_vm.get_power_state.return_value = 'Halted'
  111. no_qrexec_vm = self.create_mockvm()
  112. no_qrexec_vm.is_running.return_value = True
  113. no_qrexec_vm.get_power_state.return_value = 'Suspended'
  114. domains_dict = {
  115. 'dom0': dom0,
  116. 'running': running_vm,
  117. 'not-running': not_running_vm,
  118. 'no-qrexec': no_qrexec_vm,
  119. }
  120. self.addCleanup(domains_dict.clear)
  121. self.app.domains = mock.MagicMock(**{
  122. '__iter__': lambda _: iter(domains_dict.values()),
  123. '__getitem__': domains_dict.get,
  124. })
  125. ret = self.call_mgmt_func(b'internal.SuspendPost')
  126. self.assertIsNone(ret)
  127. self.assertFalse(dom0.called)
  128. self.assertNotIn(('run_service', ('qubes.SuspendPostAll',), mock.ANY),
  129. not_running_vm.mock_calls)
  130. self.assertNotIn(('resume', (), {}),
  131. not_running_vm.mock_calls)
  132. self.assertIn(('run_service', ('qubes.SuspendPostAll',), mock.ANY),
  133. running_vm.mock_calls)
  134. self.assertIn(('resume', (), {}),
  135. running_vm.mock_calls)
  136. self.assertNotIn(('run_service', ('qubes.SuspendPostAll',), mock.ANY),
  137. no_qrexec_vm.mock_calls)
  138. self.assertIn(('resume', (), {}),
  139. no_qrexec_vm.mock_calls)