__init__.py 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175
  1. # -*- encoding: utf8 -*-
  2. #
  3. # The Qubes OS Project, http://www.qubes-os.org
  4. #
  5. # Copyright (C) 2017 Marek Marczykowski-Górecki
  6. # <marmarek@invisiblethingslab.com>
  7. #
  8. # This program is free software; you can redistribute it and/or modify
  9. # it under the terms of the GNU Lesser General Public License as published by
  10. # the Free Software Foundation; either version 2.1 of the License, or
  11. # (at your option) any later version.
  12. #
  13. # This program is distributed in the hope that it will be useful,
  14. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  15. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  16. # GNU Lesser General Public License for more details.
  17. #
  18. # You should have received a copy of the GNU Lesser General Public License along
  19. # with this program; if not, see <http://www.gnu.org/licenses/>.
  20. import traceback
  21. import unittest
  22. import io
  23. import qubesadmin
  24. import qubesadmin.app
  25. class TestVM(object):
  26. def __init__(self, name, **kwargs):
  27. self.name = name
  28. for key, value in kwargs.items():
  29. setattr(self, key, value)
  30. def get_power_state(self):
  31. return getattr(self, 'power_state', 'Running')
  32. def __str__(self):
  33. return self.name
  34. def __lt__(self, other):
  35. if isinstance(other, TestVM):
  36. return self.name < other.name
  37. return NotImplemented
  38. class TestVMCollection(dict):
  39. def __iter__(self):
  40. return iter(self.values())
  41. class TestProcess(object):
  42. def __init__(self, input_callback=None, stdout=None, stderr=None):
  43. self.input_callback = input_callback
  44. self.stdin = io.BytesIO()
  45. # don't let anyone close it, before we get the value
  46. self.stdin_close = self.stdin.close
  47. if self.input_callback:
  48. self.stdin.close = (
  49. lambda: self.input_callback(self.stdin.getvalue()))
  50. else:
  51. self.stdin.close = lambda: None
  52. self.stdout = stdout
  53. self.stderr = stderr
  54. self.returncode = 0
  55. def communicate(self, input=None):
  56. self.stdin.write(input)
  57. self.stdin.close()
  58. self.stdin_close()
  59. return self.stdout, self.stderr
  60. def wait(self):
  61. self.stdin_close()
  62. return 0
  63. class _AssertNotRaisesContext(object):
  64. """A context manager used to implement TestCase.assertNotRaises methods.
  65. Stolen from unittest and hacked. Regexp support stripped.
  66. """ # pylint: disable=too-few-public-methods
  67. def __init__(self, expected, test_case, expected_regexp=None):
  68. if expected_regexp is not None:
  69. raise NotImplementedError('expected_regexp is unsupported')
  70. self.expected = expected
  71. self.exception = None
  72. self.failureException = test_case.failureException
  73. def __enter__(self):
  74. return self
  75. def __exit__(self, exc_type, exc_value, tb):
  76. if exc_type is None:
  77. return True
  78. if issubclass(exc_type, self.expected):
  79. raise self.failureException(
  80. "{!r} raised, traceback:\n{!s}".format(
  81. exc_value, ''.join(traceback.format_tb(tb))))
  82. else:
  83. # pass through
  84. return False
  85. class QubesTest(qubesadmin.app.QubesBase):
  86. expected_calls = None
  87. actual_calls = None
  88. service_calls = None
  89. def __init__(self):
  90. super(QubesTest, self).__init__()
  91. #: expected calls and saved replies for them
  92. self.expected_calls = {}
  93. #: actual calls made
  94. self.actual_calls = []
  95. #: rpc service calls
  96. self.service_calls = []
  97. def qubesd_call(self, dest, method, arg=None, payload=None):
  98. call_key = (dest, method, arg, payload)
  99. self.actual_calls.append(call_key)
  100. if call_key not in self.expected_calls:
  101. raise AssertionError('Unexpected call {!r}'.format(call_key))
  102. return_data = self.expected_calls[call_key]
  103. return self._parse_qubesd_response(return_data)
  104. def run_service(self, dest, service, **kwargs):
  105. self.service_calls.append((dest, service, kwargs))
  106. return TestProcess(lambda input: self.service_calls.append((dest,
  107. service, input)))
  108. class QubesTestCase(unittest.TestCase):
  109. def setUp(self):
  110. super(QubesTestCase, self).setUp()
  111. self.app = QubesTest()
  112. def assertAllCalled(self):
  113. self.assertEqual(
  114. set(self.app.expected_calls.keys()),
  115. set(self.app.actual_calls))
  116. def assertNotRaises(self, excClass, callableObj=None, *args, **kwargs):
  117. """Fail if an exception of class excClass is raised
  118. by callableObj when invoked with arguments args and keyword
  119. arguments kwargs. If a different type of exception is
  120. raised, it will not be caught, and the test case will be
  121. deemed to have suffered an error, exactly as for an
  122. unexpected exception.
  123. If called with callableObj omitted or None, will return a
  124. context object used like this::
  125. with self.assertRaises(SomeException):
  126. do_something()
  127. The context manager keeps a reference to the exception as
  128. the 'exception' attribute. This allows you to inspect the
  129. exception after the assertion::
  130. with self.assertRaises(SomeException) as cm:
  131. do_something()
  132. the_exception = cm.exception
  133. self.assertEqual(the_exception.error_code, 3)
  134. """
  135. context = _AssertNotRaisesContext(excClass, self)
  136. if callableObj is None:
  137. return context
  138. with context:
  139. callableObj(*args, **kwargs)