qubesvm.py 63 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602
  1. # pylint: disable=protected-access
  2. #
  3. # The Qubes OS Project, https://www.qubes-os.org/
  4. #
  5. # Copyright (C) 2014-2015 Joanna Rutkowska <joanna@invisiblethingslab.com>
  6. # Copyright (C) 2014-2015 Wojtek Porczyk <woju@invisiblethingslab.com>
  7. #
  8. # This library is free software; you can redistribute it and/or
  9. # modify it under the terms of the GNU Lesser General Public
  10. # License as published by the Free Software Foundation; either
  11. # version 2.1 of the License, or (at your option) any later version.
  12. #
  13. # This library is distributed in the hope that it will be useful,
  14. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  15. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  16. # Lesser General Public License for more details.
  17. #
  18. # You should have received a copy of the GNU Lesser General Public
  19. # License along with this library; if not, see <https://www.gnu.org/licenses/>.
  20. #
  21. import base64
  22. import os
  23. import subprocess
  24. import tempfile
  25. import unittest
  26. import uuid
  27. import datetime
  28. import asyncio
  29. import functools
  30. import lxml.etree
  31. import unittest.mock
  32. import shutil
  33. import qubes
  34. import qubes.exc
  35. import qubes.config
  36. import qubes.vm
  37. import qubes.vm.qubesvm
  38. import qubes.tests
  39. import qubes.tests.vm
  40. class TestApp(object):
  41. labels = {1: qubes.Label(1, '0xcc0000', 'red')}
  42. def __init__(self):
  43. self.domains = {}
  44. self.host = unittest.mock.Mock()
  45. self.host.memory_total = 4096 * 1024
  46. class TestFeatures(dict):
  47. def __init__(self, vm, **kwargs) -> None:
  48. self.vm = vm
  49. super().__init__(**kwargs)
  50. def check_with_template(self, feature, default):
  51. vm = self.vm
  52. while vm is not None:
  53. try:
  54. return vm.features[feature]
  55. except KeyError:
  56. vm = getattr(vm, 'template', None)
  57. return default
  58. class TestProp(object):
  59. # pylint: disable=too-few-public-methods
  60. __name__ = 'testprop'
  61. class TestDeviceCollection(object):
  62. def __init__(self):
  63. self._list = []
  64. def persistent(self):
  65. return self._list
  66. class TestQubesDB(object):
  67. def __init__(self):
  68. self.data = {}
  69. def write(self, path, value):
  70. self.data[path] = value
  71. def rm(self, path):
  72. if path.endswith('/'):
  73. for key in [x for x in self.data if x.startswith(path)]:
  74. del self.data[key]
  75. else:
  76. self.data.pop(path, None)
  77. class TestVM(object):
  78. # pylint: disable=too-few-public-methods
  79. app = TestApp()
  80. def __init__(self, **kwargs):
  81. self.running = False
  82. self.installed_by_rpm = False
  83. for k, v in kwargs.items():
  84. setattr(self, k, v)
  85. self.devices = {'pci': TestDeviceCollection()}
  86. self.features = TestFeatures(self)
  87. def is_running(self):
  88. return self.running
  89. class TC_00_setters(qubes.tests.QubesTestCase):
  90. def setUp(self):
  91. super().setUp()
  92. self.vm = TestVM()
  93. self.prop = TestProp()
  94. def test_000_setter_qid(self):
  95. self.assertEqual(
  96. qubes.vm._setter_qid(self.vm, self.prop, 5), 5)
  97. def test_001_setter_qid_lt_0(self):
  98. with self.assertRaises(ValueError):
  99. qubes.vm._setter_qid(self.vm, self.prop, -1)
  100. def test_002_setter_qid_gt_max(self):
  101. with self.assertRaises(ValueError):
  102. qubes.vm._setter_qid(self.vm,
  103. self.prop, qubes.config.max_qid + 5)
  104. @unittest.skip('test not implemented')
  105. def test_020_setter_kernel(self):
  106. pass
  107. def test_030_setter_label_object(self):
  108. label = TestApp.labels[1]
  109. self.assertIs(label,
  110. qubes.vm.setter_label(self.vm, self.prop, label))
  111. def test_031_setter_label_getitem(self):
  112. label = TestApp.labels[1]
  113. self.assertIs(label,
  114. qubes.vm.setter_label(self.vm, self.prop, 'label-1'))
  115. # there is no check for self.app.get_label()
  116. def test_040_setter_virt_mode(self):
  117. self.assertEqual(
  118. qubes.vm.qubesvm._setter_virt_mode(self.vm, self.prop, 'hvm'),
  119. 'hvm')
  120. self.assertEqual(
  121. qubes.vm.qubesvm._setter_virt_mode(self.vm, self.prop, 'HVM'),
  122. 'hvm')
  123. self.assertEqual(
  124. qubes.vm.qubesvm._setter_virt_mode(self.vm, self.prop, 'PV'),
  125. 'pv')
  126. self.assertEqual(
  127. qubes.vm.qubesvm._setter_virt_mode(self.vm, self.prop, 'pvh'),
  128. 'pvh')
  129. self.vm.devices['pci']._list.append(object())
  130. with self.assertRaises(ValueError):
  131. qubes.vm.qubesvm._setter_virt_mode(self.vm, self.prop, 'pvh')
  132. with self.assertRaises(ValueError):
  133. qubes.vm.qubesvm._setter_virt_mode(self.vm, self.prop, 'True')
  134. class TC_10_default(qubes.tests.QubesTestCase):
  135. def setUp(self):
  136. super().setUp()
  137. self.app = TestApp()
  138. self.vm = TestVM(app=self.app)
  139. self.prop = TestProp()
  140. def test_000_default_with_template_simple(self):
  141. default_getter = qubes.vm.qubesvm._default_with_template('kernel',
  142. 'dfl-kernel')
  143. self.assertEqual(default_getter(self.vm), 'dfl-kernel')
  144. self.vm.template = None
  145. self.assertEqual(default_getter(self.vm), 'dfl-kernel')
  146. self.vm.template = unittest.mock.Mock()
  147. self.vm.template.kernel = 'template-kernel'
  148. self.assertEqual(default_getter(self.vm), 'template-kernel')
  149. def test_001_default_with_template_callable(self):
  150. default_getter = qubes.vm.qubesvm._default_with_template('kernel',
  151. lambda x: x.app.default_kernel)
  152. self.app.default_kernel = 'global-dfl-kernel'
  153. self.assertEqual(default_getter(self.vm), 'global-dfl-kernel')
  154. self.vm.template = None
  155. self.assertEqual(default_getter(self.vm), 'global-dfl-kernel')
  156. self.vm.template = unittest.mock.Mock()
  157. self.vm.template.kernel = 'template-kernel'
  158. self.assertEqual(default_getter(self.vm), 'template-kernel')
  159. def test_010_default_virt_mode(self):
  160. self.assertEqual(qubes.vm.qubesvm._default_virt_mode(self.vm),
  161. 'pvh')
  162. self.vm.template = unittest.mock.Mock()
  163. self.vm.template.virt_mode = 'hvm'
  164. self.assertEqual(qubes.vm.qubesvm._default_virt_mode(self.vm),
  165. 'hvm')
  166. self.vm.template = None
  167. self.assertEqual(qubes.vm.qubesvm._default_virt_mode(self.vm),
  168. 'pvh')
  169. self.vm.devices['pci'].persistent().append('some-dev')
  170. self.assertEqual(qubes.vm.qubesvm._default_virt_mode(self.vm),
  171. 'hvm')
  172. def test_020_default_maxmem(self):
  173. default_maxmem = 2048
  174. self.vm.is_memory_balancing_possible = \
  175. lambda: qubes.vm.qubesvm.QubesVM.is_memory_balancing_possible(
  176. self.vm)
  177. self.vm.virt_mode = 'pvh'
  178. self.assertEqual(qubes.vm.qubesvm._default_maxmem(self.vm),
  179. default_maxmem)
  180. self.vm.virt_mode = 'hvm'
  181. # HVM without qubes tools
  182. self.assertEqual(qubes.vm.qubesvm._default_maxmem(self.vm), 0)
  183. # just 'qrexec' feature
  184. self.vm.features['qrexec'] = True
  185. print(self.vm.features.check_with_template('qrexec', False))
  186. self.assertEqual(qubes.vm.qubesvm._default_maxmem(self.vm),
  187. default_maxmem)
  188. # some supported-service.*, but not meminfo-writer
  189. self.vm.features['supported-service.qubes-firewall'] = True
  190. self.assertEqual(qubes.vm.qubesvm._default_maxmem(self.vm), 0)
  191. # then add meminfo-writer
  192. self.vm.features['supported-service.meminfo-writer'] = True
  193. self.assertEqual(qubes.vm.qubesvm._default_maxmem(self.vm),
  194. default_maxmem)
  195. def test_021_default_maxmem_with_pcidevs(self):
  196. self.vm.is_memory_balancing_possible = \
  197. lambda: qubes.vm.qubesvm.QubesVM.is_memory_balancing_possible(
  198. self.vm)
  199. self.vm.devices['pci'].persistent().append('00_00.0')
  200. self.assertEqual(qubes.vm.qubesvm._default_maxmem(self.vm), 0)
  201. def test_022_default_maxmem_linux(self):
  202. self.vm.is_memory_balancing_possible = \
  203. lambda: qubes.vm.qubesvm.QubesVM.is_memory_balancing_possible(
  204. self.vm)
  205. self.vm.virt_mode = 'pvh'
  206. self.vm.memory = 400
  207. self.vm.features['os'] = 'Linux'
  208. self.assertEqual(qubes.vm.qubesvm._default_maxmem(self.vm), 2048)
  209. self.vm.memory = 100
  210. self.assertEqual(qubes.vm.qubesvm._default_maxmem(self.vm), 1000)
  211. class QubesVMTestsMixin(object):
  212. property_no_default = object()
  213. def setUp(self):
  214. super(QubesVMTestsMixin, self).setUp()
  215. self.app = qubes.tests.vm.TestApp()
  216. self.app.vmm.offline_mode = True
  217. self.app.default_kernel = None
  218. # when full test run is called, extensions are loaded by earlier
  219. # tests, but if just this test class is run, load them manually here,
  220. # to have the same behaviour
  221. qubes.ext.get_extensions()
  222. def tearDown(self):
  223. try:
  224. self.app.domains.close()
  225. except AttributeError:
  226. pass
  227. super(QubesVMTestsMixin, self).tearDown()
  228. def get_vm(self, name='test', cls=qubes.vm.qubesvm.QubesVM, **kwargs):
  229. vm = cls(self.app, None,
  230. qid=kwargs.pop('qid', 1), name=qubes.tests.VMPREFIX + name,
  231. **kwargs)
  232. self.app.domains[vm.qid] = vm
  233. self.app.domains[vm.uuid] = vm
  234. self.app.domains[vm.name] = vm
  235. self.app.domains[vm] = vm
  236. self.addCleanup(vm.close)
  237. return vm
  238. def assertPropertyValue(self, vm, prop_name, set_value, expected_value,
  239. expected_xml_content=None):
  240. # FIXME: any better exception list? or maybe all of that should be a
  241. # single exception?
  242. with self.assertNotRaises((ValueError, TypeError, KeyError)):
  243. setattr(vm, prop_name, set_value)
  244. self.assertEqual(getattr(vm, prop_name), expected_value)
  245. if expected_xml_content is not None:
  246. xml = vm.__xml__()
  247. prop_xml = xml.xpath(
  248. './properties/property[@name=\'{}\']'.format(prop_name))
  249. self.assertEqual(len(prop_xml), 1, "Property not found in XML")
  250. self.assertEqual(prop_xml[0].text, expected_xml_content)
  251. def assertPropertyInvalidValue(self, vm, prop_name, set_value):
  252. orig_value_set = True
  253. orig_value = None
  254. try:
  255. orig_value = getattr(vm, prop_name)
  256. except AttributeError:
  257. orig_value_set = False
  258. # FIXME: any better exception list? or maybe all of that should be a
  259. # single exception?
  260. with self.assertRaises((ValueError, TypeError, KeyError)):
  261. setattr(vm, prop_name, set_value)
  262. if orig_value_set:
  263. self.assertEqual(getattr(vm, prop_name), orig_value)
  264. else:
  265. with self.assertRaises(AttributeError):
  266. getattr(vm, prop_name)
  267. def assertPropertyDefaultValue(self, vm, prop_name,
  268. expected_default=property_no_default):
  269. if expected_default is self.property_no_default:
  270. with self.assertRaises(AttributeError):
  271. getattr(vm, prop_name)
  272. else:
  273. with self.assertNotRaises(AttributeError):
  274. self.assertEqual(getattr(vm, prop_name), expected_default)
  275. xml = vm.__xml__()
  276. prop_xml = xml.xpath(
  277. './properties/property[@name=\'{}\']'.format(prop_name))
  278. self.assertEqual(len(prop_xml), 0, "Property still found in XML")
  279. def _test_generic_bool_property(self, vm, prop_name, default=False):
  280. self.assertPropertyDefaultValue(vm, prop_name, default)
  281. self.assertPropertyValue(vm, prop_name, False, False, 'False')
  282. self.assertPropertyValue(vm, prop_name, True, True, 'True')
  283. delattr(vm, prop_name)
  284. self.assertPropertyDefaultValue(vm, prop_name, default)
  285. self.assertPropertyValue(vm, prop_name, 'True', True, 'True')
  286. self.assertPropertyValue(vm, prop_name, 'False', False, 'False')
  287. self.assertPropertyInvalidValue(vm, prop_name, 'xxx')
  288. self.assertPropertyValue(vm, prop_name, 123, True)
  289. self.assertPropertyInvalidValue(vm, prop_name, '')
  290. class TC_90_QubesVM(QubesVMTestsMixin, qubes.tests.QubesTestCase):
  291. def test_000_init(self):
  292. self.get_vm()
  293. def test_001_init_no_qid_or_name(self):
  294. with self.assertRaises(AssertionError):
  295. qubes.vm.qubesvm.QubesVM(self.app, None,
  296. name=qubes.tests.VMPREFIX + 'test')
  297. with self.assertRaises(AssertionError):
  298. qubes.vm.qubesvm.QubesVM(self.app, None,
  299. qid=1)
  300. def test_003_init_fire_domain_init(self):
  301. class TestVM2(qubes.vm.qubesvm.QubesVM):
  302. event_fired = False
  303. @qubes.events.handler('domain-init')
  304. def on_domain_init(self, event): # pylint: disable=unused-argument
  305. self.__class__.event_fired = True
  306. TestVM2(self.app, None, qid=1, name=qubes.tests.VMPREFIX + 'test')
  307. self.assertTrue(TestVM2.event_fired)
  308. def test_004_uuid_autogen(self):
  309. vm = self.get_vm()
  310. self.assertTrue(hasattr(vm, 'uuid'))
  311. def test_100_qid(self):
  312. vm = self.get_vm()
  313. self.assertIsInstance(vm.qid, int)
  314. with self.assertRaises(AttributeError):
  315. vm.qid = 2
  316. def test_110_name(self):
  317. vm = self.get_vm()
  318. self.assertIsInstance(vm.name, str)
  319. def test_120_uuid(self):
  320. my_uuid = uuid.uuid4()
  321. vm = self.get_vm(uuid=my_uuid)
  322. self.assertIsInstance(vm.uuid, uuid.UUID)
  323. self.assertIs(vm.uuid, my_uuid)
  324. with self.assertRaises(AttributeError):
  325. vm.uuid = uuid.uuid4()
  326. @unittest.skip('TODO: how to not fail on making an icon symlink here?')
  327. def test_130_label(self):
  328. vm = self.get_vm()
  329. self.assertPropertyDefaultValue(vm, 'label')
  330. self.assertPropertyValue(vm, 'label', self.app.labels[1],
  331. self.app.labels[1], 'label-1')
  332. del vm.label
  333. self.assertPropertyDefaultValue(vm, 'label')
  334. self.assertPropertyValue(vm, 'label', 'red',
  335. self.app.labels[1], 'label-1')
  336. self.assertPropertyValue(vm, 'label', 'label-1',
  337. self.app.labels[1], 'label-1')
  338. def test_131_label_invalid(self):
  339. vm = self.get_vm()
  340. self.assertPropertyInvalidValue(vm, 'label', 'invalid')
  341. self.assertPropertyInvalidValue(vm, 'label', 123)
  342. def test_160_memory(self):
  343. vm = self.get_vm()
  344. self.assertPropertyDefaultValue(vm, 'memory', 400)
  345. self.assertPropertyValue(vm, 'memory', 500, 500, '500')
  346. del vm.memory
  347. self.assertPropertyDefaultValue(vm, 'memory', 400)
  348. self.assertPropertyValue(vm, 'memory', '500', 500, '500')
  349. def test_161_memory_invalid(self):
  350. vm = self.get_vm()
  351. self.assertPropertyInvalidValue(vm, 'memory', -100)
  352. self.assertPropertyInvalidValue(vm, 'memory', '-100')
  353. self.assertPropertyInvalidValue(vm, 'memory', '')
  354. # TODO: higher than maxmem
  355. # TODO: human readable setter (500M, 4G)?
  356. def test_170_maxmem(self):
  357. vm = self.get_vm()
  358. self.assertPropertyDefaultValue(vm, 'maxmem',
  359. self.app.host.memory_total / 1024 / 2)
  360. self.assertPropertyValue(vm, 'maxmem', 500, 500, '500')
  361. del vm.maxmem
  362. self.assertPropertyDefaultValue(vm, 'maxmem',
  363. self.app.host.memory_total / 1024 / 2)
  364. self.assertPropertyValue(vm, 'maxmem', '500', 500, '500')
  365. def test_171_maxmem_invalid(self):
  366. vm = self.get_vm()
  367. self.assertPropertyInvalidValue(vm, 'maxmem', -100)
  368. self.assertPropertyInvalidValue(vm, 'maxmem', '-100')
  369. self.assertPropertyInvalidValue(vm, 'maxmem', '')
  370. # TODO: lower than memory
  371. # TODO: human readable setter (500M, 4G)?
  372. def test_190_vcpus(self):
  373. vm = self.get_vm()
  374. self.assertPropertyDefaultValue(vm, 'vcpus', 2)
  375. self.assertPropertyValue(vm, 'vcpus', 3, 3, '3')
  376. del vm.vcpus
  377. self.assertPropertyDefaultValue(vm, 'vcpus', 2)
  378. self.assertPropertyValue(vm, 'vcpus', '3', 3, '3')
  379. def test_191_vcpus_invalid(self):
  380. vm = self.get_vm()
  381. self.assertPropertyInvalidValue(vm, 'vcpus', 0)
  382. self.assertPropertyInvalidValue(vm, 'vcpus', -2)
  383. self.assertPropertyInvalidValue(vm, 'vcpus', '-2')
  384. self.assertPropertyInvalidValue(vm, 'vcpus', '')
  385. def test_200_debug(self):
  386. vm = self.get_vm()
  387. self._test_generic_bool_property(vm, 'debug', False)
  388. def test_210_installed_by_rpm(self):
  389. vm = self.get_vm()
  390. self._test_generic_bool_property(vm, 'installed_by_rpm', False)
  391. def test_220_include_in_backups(self):
  392. vm = self.get_vm()
  393. self._test_generic_bool_property(vm, 'include_in_backups', True)
  394. @qubes.tests.skipUnlessDom0
  395. def test_250_kernel(self):
  396. kernels = os.listdir(os.path.join(
  397. qubes.config.qubes_base_dir,
  398. qubes.config.system_path['qubes_kernels_base_dir']))
  399. if not len(kernels):
  400. self.skipTest('Needs at least one kernel installed')
  401. self.app.default_kernel = kernels[0]
  402. vm = self.get_vm()
  403. self.assertPropertyDefaultValue(vm, 'kernel', kernels[0])
  404. self.assertPropertyValue(vm, 'kernel', kernels[-1], kernels[-1],
  405. kernels[-1])
  406. del vm.kernel
  407. self.assertPropertyDefaultValue(vm, 'kernel', kernels[0])
  408. @qubes.tests.skipUnlessDom0
  409. def test_251_kernel_invalid(self):
  410. vm = self.get_vm()
  411. self.assertPropertyInvalidValue(vm, 'kernel', 123)
  412. self.assertPropertyInvalidValue(vm, 'kernel', 'invalid')
  413. def test_252_kernel_empty(self):
  414. vm = self.get_vm()
  415. self.assertPropertyValue(vm, 'kernel', '', '', '')
  416. self.assertPropertyValue(vm, 'kernel', None, '', '')
  417. @unittest.mock.patch.dict(qubes.config.system_path,
  418. {'qubes_kernels_base_dir': '/tmp'})
  419. def test_260_kernelopts(self):
  420. d = tempfile.mkdtemp(prefix='/tmp/')
  421. self.addCleanup(shutil.rmtree, d)
  422. open(d + '/vmlinuz', 'w').close()
  423. open(d + '/initramfs', 'w').close()
  424. vm = self.get_vm()
  425. vm.kernel = os.path.basename(d)
  426. self.assertPropertyDefaultValue(vm, 'kernelopts',
  427. qubes.config.defaults['kernelopts'])
  428. self.assertPropertyValue(vm, 'kernelopts', 'some options',
  429. 'some options', 'some options')
  430. del vm.kernelopts
  431. self.assertPropertyDefaultValue(vm, 'kernelopts',
  432. qubes.config.defaults['kernelopts'])
  433. self.assertPropertyValue(vm, 'kernelopts', '',
  434. '', '')
  435. # TODO?
  436. # self.assertPropertyInvalidValue(vm, 'kernelopts', None),
  437. @unittest.skip('test not implemented')
  438. def test_261_kernelopts_pcidevs(self):
  439. vm = self.get_vm()
  440. # how to do that here? use dummy DeviceManager/DeviceCollection?
  441. # Disable events?
  442. vm.devices['pci'].attach('something')
  443. self.assertPropertyDefaultValue(vm, 'kernelopts',
  444. qubes.config.defaults['kernelopts_pcidevs'])
  445. @unittest.mock.patch.dict(qubes.config.system_path,
  446. {'qubes_kernels_base_dir': '/tmp'})
  447. def test_262_kernelopts(self):
  448. d = tempfile.mkdtemp(prefix='/tmp/')
  449. self.addCleanup(shutil.rmtree, d)
  450. open(d + '/vmlinuz', 'w').close()
  451. open(d + '/initramfs', 'w').close()
  452. with open(d + '/default-kernelopts-nopci.txt', 'w') as f:
  453. f.write('some default options')
  454. vm = self.get_vm()
  455. vm.kernel = os.path.basename(d)
  456. self.assertPropertyDefaultValue(vm, 'kernelopts',
  457. 'some default options')
  458. self.assertPropertyValue(vm, 'kernelopts', 'some options',
  459. 'some options', 'some options')
  460. del vm.kernelopts
  461. self.assertPropertyDefaultValue(vm, 'kernelopts',
  462. 'some default options')
  463. self.assertPropertyValue(vm, 'kernelopts', '',
  464. '', '')
  465. def test_270_qrexec_timeout(self):
  466. vm = self.get_vm()
  467. self.assertPropertyDefaultValue(vm, 'qrexec_timeout', 60)
  468. self.assertPropertyValue(vm, 'qrexec_timeout', 3, 3, '3')
  469. del vm.qrexec_timeout
  470. self.assertPropertyDefaultValue(vm, 'qrexec_timeout', 60)
  471. self.assertPropertyValue(vm, 'qrexec_timeout', '3', 3, '3')
  472. def test_271_qrexec_timeout_invalid(self):
  473. vm = self.get_vm()
  474. self.assertPropertyInvalidValue(vm, 'qrexec_timeout', -2)
  475. self.assertPropertyInvalidValue(vm, 'qrexec_timeout', '-2')
  476. self.assertPropertyInvalidValue(vm, 'qrexec_timeout', '')
  477. def test_272_qrexec_timeout_global_changed(self):
  478. self.app.default_qrexec_timeout = 123
  479. vm = self.get_vm()
  480. self.assertPropertyDefaultValue(vm, 'qrexec_timeout', 123)
  481. self.assertPropertyValue(vm, 'qrexec_timeout', 3, 3, '3')
  482. del vm.qrexec_timeout
  483. self.assertPropertyDefaultValue(vm, 'qrexec_timeout', 123)
  484. self.assertPropertyValue(vm, 'qrexec_timeout', '3', 3, '3')
  485. def test_280_autostart(self):
  486. vm = self.get_vm()
  487. # FIXME any better idea to not involve systemctl call at this stage?
  488. vm.events_enabled = False
  489. self._test_generic_bool_property(vm, 'autostart', False)
  490. @qubes.tests.skipUnlessDom0
  491. def test_281_autostart_systemd(self):
  492. vm = self.get_vm()
  493. self.assertFalse(os.path.exists(
  494. '/etc/systemd/system/multi-user.target.wants/'
  495. 'qubes-vm@{}.service'.format(vm.name)),
  496. "systemd service enabled before setting autostart")
  497. vm.autostart = True
  498. self.assertTrue(os.path.exists(
  499. '/etc/systemd/system/multi-user.target.wants/'
  500. 'qubes-vm@{}.service'.format(vm.name)),
  501. "systemd service not enabled by autostart=True")
  502. vm.autostart = False
  503. self.assertFalse(os.path.exists(
  504. '/etc/systemd/system/multi-user.target.wants/'
  505. 'qubes-vm@{}.service'.format(vm.name)),
  506. "systemd service not disabled by autostart=False")
  507. vm.autostart = True
  508. del vm.autostart
  509. self.assertFalse(os.path.exists(
  510. '/etc/systemd/system/multi-user.target.wants/'
  511. 'qubes-vm@{}.service'.format(vm.name)),
  512. "systemd service not disabled by resetting autostart")
  513. def test_290_management_dispvm(self):
  514. vm = self.get_vm()
  515. vm2 = self.get_vm('test2', qid=2)
  516. self.app.management_dispvm = None
  517. self.assertPropertyDefaultValue(vm, 'management_dispvm', None)
  518. self.app.management_dispvm = vm
  519. try:
  520. self.assertPropertyDefaultValue(vm, 'management_dispvm', vm)
  521. self.assertPropertyValue(vm, 'management_dispvm',
  522. 'test-inst-test2', vm2)
  523. finally:
  524. self.app.management_dispvm = None
  525. def test_291_management_dispvm_template_based(self):
  526. tpl = self.get_vm(name='tpl', cls=qubes.vm.templatevm.TemplateVM)
  527. vm = self.get_vm(cls=qubes.vm.appvm.AppVM, template=tpl, qid=2)
  528. vm2 = self.get_vm('test2', qid=3)
  529. del vm.volumes
  530. self.app.management_dispvm = None
  531. try:
  532. self.assertPropertyDefaultValue(vm, 'management_dispvm', None)
  533. self.app.management_dispvm = vm
  534. self.assertPropertyDefaultValue(vm, 'management_dispvm', vm)
  535. tpl.management_dispvm = vm2
  536. self.assertPropertyDefaultValue(vm, 'management_dispvm', vm2)
  537. self.assertPropertyValue(vm, 'management_dispvm',
  538. 'test-inst-test2', vm2)
  539. finally:
  540. self.app.management_dispvm = None
  541. @unittest.skip('TODO')
  542. def test_320_seamless_gui_mode(self):
  543. vm = self.get_vm()
  544. self._test_generic_bool_property(vm, 'seamless_gui_mode')
  545. # TODO: reject setting to True when guiagent_installed is false
  546. def test_330_mac(self):
  547. vm = self.get_vm()
  548. # TODO: calculate proper default here
  549. default_mac = vm.mac
  550. self.assertIsNotNone(default_mac)
  551. self.assertPropertyDefaultValue(vm, 'mac', default_mac)
  552. self.assertPropertyValue(vm, 'mac', '00:11:22:33:44:55',
  553. '00:11:22:33:44:55', '00:11:22:33:44:55')
  554. del vm.mac
  555. self.assertPropertyDefaultValue(vm, 'mac', default_mac)
  556. def test_331_mac_invalid(self):
  557. vm = self.get_vm()
  558. self.assertPropertyInvalidValue(vm, 'mac', 123)
  559. self.assertPropertyInvalidValue(vm, 'mac', 'invalid')
  560. self.assertPropertyInvalidValue(vm, 'mac', '00:11:22:33:44:55:66')
  561. def test_340_default_user(self):
  562. vm = self.get_vm()
  563. self.assertPropertyDefaultValue(vm, 'default_user', 'user')
  564. self.assertPropertyValue(vm, 'default_user', 'someuser', 'someuser',
  565. 'someuser')
  566. del vm.default_user
  567. self.assertPropertyDefaultValue(vm, 'default_user', 'user')
  568. self.assertPropertyValue(vm, 'default_user', 123, '123', '123')
  569. vm.default_user = 'user'
  570. # TODO: check propagation for template-based VMs
  571. @unittest.skip('TODO')
  572. def test_350_timezone(self):
  573. vm = self.get_vm()
  574. self.assertPropertyDefaultValue(vm, 'timezone', 'localtime')
  575. self.assertPropertyValue(vm, 'timezone', 0, 0, '0')
  576. del vm.timezone
  577. self.assertPropertyDefaultValue(vm, 'timezone', 'localtime')
  578. self.assertPropertyValue(vm, 'timezone', '0', 0, '0')
  579. self.assertPropertyValue(vm, 'timezone', -3600, -3600, '-3600')
  580. self.assertPropertyValue(vm, 'timezone', 7200, 7200, '7200')
  581. @unittest.skip('TODO')
  582. def test_350_timezone_invalid(self):
  583. vm = self.get_vm()
  584. self.assertPropertyInvalidValue(vm, 'timezone', 'xxx')
  585. @unittest.skip('TODO')
  586. def test_360_drive(self):
  587. vm = self.get_vm()
  588. self.assertPropertyDefaultValue(vm, 'drive', None)
  589. # self.execute_tests('drive', [
  590. # ('hd:dom0:/tmp/drive.img', 'hd:dom0:/tmp/drive.img', True),
  591. # ('hd:/tmp/drive.img', 'hd:dom0:/tmp/drive.img', True),
  592. # ('cdrom:dom0:/tmp/drive.img', 'cdrom:dom0:/tmp/drive.img', True),
  593. # ('cdrom:/tmp/drive.img', 'cdrom:dom0:/tmp/drive.img', True),
  594. # ('/tmp/drive.img', 'cdrom:dom0:/tmp/drive.img', True),
  595. # ('hd:drive.img', '', False),
  596. # ('drive.img', '', False),
  597. # ])
  598. def test_400_backup_timestamp(self):
  599. vm = self.get_vm()
  600. timestamp = datetime.datetime(2016, 1, 1, 12, 14, 2)
  601. timestamp_str = timestamp.strftime('%s')
  602. self.assertPropertyDefaultValue(vm, 'backup_timestamp', None)
  603. self.assertPropertyValue(vm, 'backup_timestamp', int(timestamp_str),
  604. int(timestamp_str), timestamp_str)
  605. del vm.backup_timestamp
  606. self.assertPropertyDefaultValue(vm, 'backup_timestamp', None)
  607. self.assertPropertyValue(vm, 'backup_timestamp', timestamp_str,
  608. int(timestamp_str))
  609. def test_401_backup_timestamp_invalid(self):
  610. vm = self.get_vm()
  611. self.assertPropertyInvalidValue(vm, 'backup_timestamp', 'xxx')
  612. self.assertPropertyInvalidValue(vm, 'backup_timestamp', None)
  613. def test_500_property_migrate_virt_mode(self):
  614. xml_template = '''
  615. <domain class="QubesVM" id="domain-1">
  616. <properties>
  617. <property name="qid">1</property>
  618. <property name="name">testvm</property>
  619. <property name="label" ref="label-1" />
  620. <property name="hvm">{hvm_value}</property>
  621. </properties>
  622. </domain>
  623. '''
  624. xml = lxml.etree.XML(xml_template.format(hvm_value='True'))
  625. vm = qubes.vm.qubesvm.QubesVM(self.app, xml)
  626. self.assertEqual(vm.virt_mode, 'hvm')
  627. with self.assertRaises(AttributeError):
  628. vm.hvm
  629. xml = lxml.etree.XML(xml_template.format(hvm_value='False'))
  630. vm = qubes.vm.qubesvm.QubesVM(self.app, xml)
  631. self.assertEqual(vm.virt_mode, 'pv')
  632. with self.assertRaises(AttributeError):
  633. vm.hvm
  634. def test_600_libvirt_xml_pv(self):
  635. expected = '''<domain type="xen">
  636. <name>test-inst-test</name>
  637. <uuid>7db78950-c467-4863-94d1-af59806384ea</uuid>
  638. <memory unit="MiB">500</memory>
  639. <currentMemory unit="MiB">400</currentMemory>
  640. <vcpu placement="static">2</vcpu>
  641. <os>
  642. <type arch="x86_64" machine="xenpv">linux</type>
  643. <kernel>/tmp/kernel/vmlinuz</kernel>
  644. <initrd>/tmp/kernel/initramfs</initrd>
  645. <cmdline>root=/dev/mapper/dmroot ro nomodeset console=hvc0 rd_NO_PLYMOUTH rd.plymouth.enable=0 plymouth.enable=0 nopat</cmdline>
  646. </os>
  647. <features>
  648. </features>
  649. <clock offset='utc' adjustment='reset'>
  650. <timer name="tsc" mode="native"/>
  651. </clock>
  652. <on_poweroff>destroy</on_poweroff>
  653. <on_reboot>destroy</on_reboot>
  654. <on_crash>destroy</on_crash>
  655. <devices>
  656. <disk type="block" device="disk">
  657. <driver name="phy" />
  658. <source dev="/tmp/kernel/modules.img" />
  659. <target dev="xvdd" />
  660. <backenddomain name="dom0" />
  661. </disk>
  662. <console type="pty">
  663. <target type="xen" port="0"/>
  664. </console>
  665. </devices>
  666. </domain>
  667. '''
  668. my_uuid = '7db78950-c467-4863-94d1-af59806384ea'
  669. vm = self.get_vm(uuid=my_uuid)
  670. vm.netvm = None
  671. vm.virt_mode = 'pv'
  672. with unittest.mock.patch('qubes.config.qubes_base_dir',
  673. '/tmp/qubes-test'):
  674. kernel_dir = '/tmp/qubes-test/vm-kernels/dummy'
  675. os.makedirs(kernel_dir, exist_ok=True)
  676. open(os.path.join(kernel_dir, 'vmlinuz'), 'w').close()
  677. open(os.path.join(kernel_dir, 'initramfs'), 'w').close()
  678. self.addCleanup(shutil.rmtree, '/tmp/qubes-test')
  679. vm.kernel = 'dummy'
  680. # tests for storage are later
  681. vm.volumes['kernel'] = unittest.mock.Mock(**{
  682. 'kernels_dir': '/tmp/kernel',
  683. 'block_device.return_value.domain': 'dom0',
  684. 'block_device.return_value.script': None,
  685. 'block_device.return_value.path': '/tmp/kernel/modules.img',
  686. 'block_device.return_value.devtype': 'disk',
  687. 'block_device.return_value.name': 'kernel',
  688. })
  689. libvirt_xml = vm.create_config_file()
  690. self.assertXMLEqual(lxml.etree.XML(libvirt_xml),
  691. lxml.etree.XML(expected))
  692. def test_600_libvirt_xml_hvm(self):
  693. expected = '''<domain type="xen">
  694. <name>test-inst-test</name>
  695. <uuid>7db78950-c467-4863-94d1-af59806384ea</uuid>
  696. <memory unit="MiB">400</memory>
  697. <currentMemory unit="MiB">400</currentMemory>
  698. <vcpu placement="static">2</vcpu>
  699. <cpu mode='host-passthrough'>
  700. <!-- disable nested HVM -->
  701. <feature name='vmx' policy='disable'/>
  702. <feature name='svm' policy='disable'/>
  703. <!-- disable SMAP inside VM, because of Linux bug -->
  704. <feature name='smap' policy='disable'/>
  705. </cpu>
  706. <os>
  707. <type arch="x86_64" machine="xenfv">hvm</type>
  708. <!--
  709. For the libxl backend libvirt switches between OVMF (UEFI)
  710. and SeaBIOS based on the loader type. This has nothing to
  711. do with the hvmloader binary.
  712. -->
  713. <loader type="rom">hvmloader</loader>
  714. <boot dev="cdrom" />
  715. <boot dev="hd" />
  716. </os>
  717. <features>
  718. <pae/>
  719. <acpi/>
  720. <apic/>
  721. <viridian/>
  722. </features>
  723. <clock offset="variable" adjustment="0" basis="localtime" />
  724. <on_poweroff>destroy</on_poweroff>
  725. <on_reboot>destroy</on_reboot>
  726. <on_crash>destroy</on_crash>
  727. <devices>
  728. <!-- server_ip is the address of stubdomain. It hosts it's own DNS server. -->
  729. <emulator type="stubdom-linux" />
  730. <input type="tablet" bus="usb"/>
  731. <video>
  732. <model type="vga"/>
  733. </video>
  734. <graphics type="qubes"/>
  735. <console type="pty">
  736. <target type="xen" port="0"/>
  737. </console>
  738. </devices>
  739. </domain>
  740. '''
  741. my_uuid = '7db78950-c467-4863-94d1-af59806384ea'
  742. vm = self.get_vm(uuid=my_uuid)
  743. vm.netvm = None
  744. vm.virt_mode = 'hvm'
  745. libvirt_xml = vm.create_config_file()
  746. self.assertXMLEqual(lxml.etree.XML(libvirt_xml),
  747. lxml.etree.XML(expected))
  748. def test_600_libvirt_xml_hvm_dom0_kernel(self):
  749. expected = '''<domain type="xen">
  750. <name>test-inst-test</name>
  751. <uuid>7db78950-c467-4863-94d1-af59806384ea</uuid>
  752. <memory unit="MiB">500</memory>
  753. <currentMemory unit="MiB">400</currentMemory>
  754. <vcpu placement="static">2</vcpu>
  755. <cpu mode='host-passthrough'>
  756. <!-- disable nested HVM -->
  757. <feature name='vmx' policy='disable'/>
  758. <feature name='svm' policy='disable'/>
  759. <!-- disable SMAP inside VM, because of Linux bug -->
  760. <feature name='smap' policy='disable'/>
  761. </cpu>
  762. <os>
  763. <type arch="x86_64" machine="xenfv">hvm</type>
  764. <!--
  765. For the libxl backend libvirt switches between OVMF (UEFI)
  766. and SeaBIOS based on the loader type. This has nothing to
  767. do with the hvmloader binary.
  768. -->
  769. <loader type="rom">hvmloader</loader>
  770. <boot dev="cdrom" />
  771. <boot dev="hd" />
  772. <cmdline>root=/dev/mapper/dmroot ro nomodeset console=hvc0 rd_NO_PLYMOUTH rd.plymouth.enable=0 plymouth.enable=0 nopat</cmdline>
  773. </os>
  774. <features>
  775. <pae/>
  776. <acpi/>
  777. <apic/>
  778. <viridian/>
  779. </features>
  780. <clock offset="variable" adjustment="0" basis="localtime" />
  781. <on_poweroff>destroy</on_poweroff>
  782. <on_reboot>destroy</on_reboot>
  783. <on_crash>destroy</on_crash>
  784. <devices>
  785. <!-- server_ip is the address of stubdomain. It hosts it's own DNS server. -->
  786. <emulator type="stubdom-linux" />
  787. <input type="tablet" bus="usb"/>
  788. <video>
  789. <model type="vga"/>
  790. </video>
  791. <graphics type="qubes"/>
  792. <console type="pty">
  793. <target type="xen" port="0"/>
  794. </console>
  795. </devices>
  796. </domain>
  797. '''
  798. my_uuid = '7db78950-c467-4863-94d1-af59806384ea'
  799. vm = self.get_vm(uuid=my_uuid)
  800. vm.netvm = None
  801. vm.virt_mode = 'hvm'
  802. vm.features['qrexec'] = True
  803. with unittest.mock.patch('qubes.config.qubes_base_dir',
  804. '/tmp/qubes-test'):
  805. kernel_dir = '/tmp/qubes-test/vm-kernels/dummy'
  806. os.makedirs(kernel_dir, exist_ok=True)
  807. open(os.path.join(kernel_dir, 'vmlinuz'), 'w').close()
  808. open(os.path.join(kernel_dir, 'initramfs'), 'w').close()
  809. self.addCleanup(shutil.rmtree, '/tmp/qubes-test')
  810. vm.kernel = 'dummy'
  811. libvirt_xml = vm.create_config_file()
  812. self.assertXMLEqual(lxml.etree.XML(libvirt_xml),
  813. lxml.etree.XML(expected))
  814. def test_600_libvirt_xml_hvm_dom0_kernel_kernelopts(self):
  815. expected = '''<domain type="xen">
  816. <name>test-inst-test</name>
  817. <uuid>7db78950-c467-4863-94d1-af59806384ea</uuid>
  818. <memory unit="MiB">500</memory>
  819. <currentMemory unit="MiB">400</currentMemory>
  820. <vcpu placement="static">2</vcpu>
  821. <cpu mode='host-passthrough'>
  822. <!-- disable nested HVM -->
  823. <feature name='vmx' policy='disable'/>
  824. <feature name='svm' policy='disable'/>
  825. <!-- disable SMAP inside VM, because of Linux bug -->
  826. <feature name='smap' policy='disable'/>
  827. </cpu>
  828. <os>
  829. <type arch="x86_64" machine="xenfv">hvm</type>
  830. <!--
  831. For the libxl backend libvirt switches between OVMF (UEFI)
  832. and SeaBIOS based on the loader type. This has nothing to
  833. do with the hvmloader binary.
  834. -->
  835. <loader type="rom">hvmloader</loader>
  836. <boot dev="cdrom" />
  837. <boot dev="hd" />
  838. <cmdline>kernel specific options nopat</cmdline>
  839. </os>
  840. <features>
  841. <pae/>
  842. <acpi/>
  843. <apic/>
  844. <viridian/>
  845. </features>
  846. <clock offset="variable" adjustment="0" basis="localtime" />
  847. <on_poweroff>destroy</on_poweroff>
  848. <on_reboot>destroy</on_reboot>
  849. <on_crash>destroy</on_crash>
  850. <devices>
  851. <!-- server_ip is the address of stubdomain. It hosts it's own DNS server. -->
  852. <emulator type="stubdom-linux" />
  853. <input type="tablet" bus="usb"/>
  854. <video>
  855. <model type="vga"/>
  856. </video>
  857. <graphics type="qubes"/>
  858. <console type="pty">
  859. <target type="xen" port="0"/>
  860. </console>
  861. </devices>
  862. </domain>
  863. '''
  864. my_uuid = '7db78950-c467-4863-94d1-af59806384ea'
  865. vm = self.get_vm(uuid=my_uuid)
  866. vm.netvm = None
  867. vm.virt_mode = 'hvm'
  868. vm.features['qrexec'] = True
  869. with unittest.mock.patch('qubes.config.qubes_base_dir',
  870. '/tmp/qubes-test'):
  871. kernel_dir = '/tmp/qubes-test/vm-kernels/dummy'
  872. os.makedirs(kernel_dir, exist_ok=True)
  873. open(os.path.join(kernel_dir, 'vmlinuz'), 'w').close()
  874. open(os.path.join(kernel_dir, 'initramfs'), 'w').close()
  875. with open(os.path.join(kernel_dir,
  876. 'default-kernelopts-common.txt'), 'w') as f:
  877. f.write('kernel specific options \n')
  878. self.addCleanup(shutil.rmtree, '/tmp/qubes-test')
  879. vm.kernel = 'dummy'
  880. libvirt_xml = vm.create_config_file()
  881. self.assertXMLEqual(lxml.etree.XML(libvirt_xml),
  882. lxml.etree.XML(expected))
  883. def test_600_libvirt_xml_pvh(self):
  884. expected = '''<domain type="xen">
  885. <name>test-inst-test</name>
  886. <uuid>7db78950-c467-4863-94d1-af59806384ea</uuid>
  887. <memory unit="MiB">500</memory>
  888. <currentMemory unit="MiB">400</currentMemory>
  889. <vcpu placement="static">2</vcpu>
  890. <cpu mode='host-passthrough'>
  891. <!-- disable nested HVM -->
  892. <feature name='vmx' policy='disable'/>
  893. <feature name='svm' policy='disable'/>
  894. <!-- disable SMAP inside VM, because of Linux bug -->
  895. <feature name='smap' policy='disable'/>
  896. </cpu>
  897. <os>
  898. <type arch="x86_64" machine="xenpvh">xenpvh</type>
  899. <kernel>/tmp/kernel/vmlinuz</kernel>
  900. <initrd>/tmp/kernel/initramfs</initrd>
  901. <cmdline>root=/dev/mapper/dmroot ro nomodeset console=hvc0 rd_NO_PLYMOUTH rd.plymouth.enable=0 plymouth.enable=0 nopat</cmdline>
  902. </os>
  903. <features>
  904. <pae/>
  905. <acpi/>
  906. <apic/>
  907. <viridian/>
  908. </features>
  909. <clock offset='utc' adjustment='reset'>
  910. <timer name="tsc" mode="native"/>
  911. </clock>
  912. <on_poweroff>destroy</on_poweroff>
  913. <on_reboot>destroy</on_reboot>
  914. <on_crash>destroy</on_crash>
  915. <devices>
  916. <disk type="block" device="disk">
  917. <driver name="phy" />
  918. <source dev="/tmp/kernel/modules.img" />
  919. <target dev="xvdd" />
  920. <backenddomain name="dom0" />
  921. </disk>
  922. <console type="pty">
  923. <target type="xen" port="0"/>
  924. </console>
  925. </devices>
  926. </domain>
  927. '''
  928. my_uuid = '7db78950-c467-4863-94d1-af59806384ea'
  929. vm = self.get_vm(uuid=my_uuid)
  930. vm.netvm = None
  931. vm.virt_mode = 'pvh'
  932. with unittest.mock.patch('qubes.config.qubes_base_dir',
  933. '/tmp/qubes-test'):
  934. kernel_dir = '/tmp/qubes-test/vm-kernels/dummy'
  935. os.makedirs(kernel_dir, exist_ok=True)
  936. open(os.path.join(kernel_dir, 'vmlinuz'), 'w').close()
  937. open(os.path.join(kernel_dir, 'initramfs'), 'w').close()
  938. self.addCleanup(shutil.rmtree, '/tmp/qubes-test')
  939. vm.kernel = 'dummy'
  940. # tests for storage are later
  941. vm.volumes['kernel'] = unittest.mock.Mock(**{
  942. 'kernels_dir': '/tmp/kernel',
  943. 'block_device.return_value.domain': 'dom0',
  944. 'block_device.return_value.script': None,
  945. 'block_device.return_value.path': '/tmp/kernel/modules.img',
  946. 'block_device.return_value.devtype': 'disk',
  947. 'block_device.return_value.name': 'kernel',
  948. })
  949. libvirt_xml = vm.create_config_file()
  950. self.assertXMLEqual(lxml.etree.XML(libvirt_xml),
  951. lxml.etree.XML(expected))
  952. def test_600_libvirt_xml_pvh_no_membalance(self):
  953. expected = '''<domain type="xen">
  954. <name>test-inst-test</name>
  955. <uuid>7db78950-c467-4863-94d1-af59806384ea</uuid>
  956. <memory unit="MiB">400</memory>
  957. <currentMemory unit="MiB">400</currentMemory>
  958. <vcpu placement="static">2</vcpu>
  959. <cpu mode='host-passthrough'>
  960. <!-- disable nested HVM -->
  961. <feature name='vmx' policy='disable'/>
  962. <feature name='svm' policy='disable'/>
  963. <!-- disable SMAP inside VM, because of Linux bug -->
  964. <feature name='smap' policy='disable'/>
  965. </cpu>
  966. <os>
  967. <type arch="x86_64" machine="xenpvh">xenpvh</type>
  968. <kernel>/tmp/kernel/vmlinuz</kernel>
  969. <initrd>/tmp/kernel/initramfs</initrd>
  970. <cmdline>root=/dev/mapper/dmroot ro nomodeset console=hvc0 rd_NO_PLYMOUTH rd.plymouth.enable=0 plymouth.enable=0 nopat</cmdline>
  971. </os>
  972. <features>
  973. <pae/>
  974. <acpi/>
  975. <apic/>
  976. <viridian/>
  977. </features>
  978. <clock offset='utc' adjustment='reset'>
  979. <timer name="tsc" mode="native"/>
  980. </clock>
  981. <on_poweroff>destroy</on_poweroff>
  982. <on_reboot>destroy</on_reboot>
  983. <on_crash>destroy</on_crash>
  984. <devices>
  985. <disk type="block" device="disk">
  986. <driver name="phy" />
  987. <source dev="/tmp/kernel/modules.img" />
  988. <target dev="xvdd" />
  989. <backenddomain name="dom0" />
  990. </disk>
  991. <console type="pty">
  992. <target type="xen" port="0"/>
  993. </console>
  994. </devices>
  995. </domain>
  996. '''
  997. my_uuid = '7db78950-c467-4863-94d1-af59806384ea'
  998. vm = self.get_vm(uuid=my_uuid)
  999. vm.netvm = None
  1000. vm.virt_mode = 'pvh'
  1001. vm.maxmem = 0
  1002. with unittest.mock.patch('qubes.config.qubes_base_dir',
  1003. '/tmp/qubes-test'):
  1004. kernel_dir = '/tmp/qubes-test/vm-kernels/dummy'
  1005. os.makedirs(kernel_dir, exist_ok=True)
  1006. open(os.path.join(kernel_dir, 'vmlinuz'), 'w').close()
  1007. open(os.path.join(kernel_dir, 'initramfs'), 'w').close()
  1008. self.addCleanup(shutil.rmtree, '/tmp/qubes-test')
  1009. vm.kernel = 'dummy'
  1010. # tests for storage are later
  1011. vm.volumes['kernel'] = unittest.mock.Mock(**{
  1012. 'kernels_dir': '/tmp/kernel',
  1013. 'block_device.return_value.domain': 'dom0',
  1014. 'block_device.return_value.script': None,
  1015. 'block_device.return_value.path': '/tmp/kernel/modules.img',
  1016. 'block_device.return_value.devtype': 'disk',
  1017. 'block_device.return_value.name': 'kernel',
  1018. })
  1019. libvirt_xml = vm.create_config_file()
  1020. self.assertXMLEqual(lxml.etree.XML(libvirt_xml),
  1021. lxml.etree.XML(expected))
  1022. def test_600_libvirt_xml_hvm_pcidev(self):
  1023. expected = '''<domain type="xen">
  1024. <name>test-inst-test</name>
  1025. <uuid>7db78950-c467-4863-94d1-af59806384ea</uuid>
  1026. <memory unit="MiB">400</memory>
  1027. <currentMemory unit="MiB">400</currentMemory>
  1028. <vcpu placement="static">2</vcpu>
  1029. <cpu mode='host-passthrough'>
  1030. <!-- disable nested HVM -->
  1031. <feature name='vmx' policy='disable'/>
  1032. <feature name='svm' policy='disable'/>
  1033. <!-- disable SMAP inside VM, because of Linux bug -->
  1034. <feature name='smap' policy='disable'/>
  1035. </cpu>
  1036. <os>
  1037. <type arch="x86_64" machine="xenfv">hvm</type>
  1038. <!--
  1039. For the libxl backend libvirt switches between OVMF (UEFI)
  1040. and SeaBIOS based on the loader type. This has nothing to
  1041. do with the hvmloader binary.
  1042. -->
  1043. <loader type="rom">hvmloader</loader>
  1044. <boot dev="cdrom" />
  1045. <boot dev="hd" />
  1046. </os>
  1047. <features>
  1048. <pae/>
  1049. <acpi/>
  1050. <apic/>
  1051. <viridian/>
  1052. <xen>
  1053. <e820_host state="on"/>
  1054. </xen>
  1055. </features>
  1056. <clock offset="variable" adjustment="0" basis="localtime" />
  1057. <on_poweroff>destroy</on_poweroff>
  1058. <on_reboot>destroy</on_reboot>
  1059. <on_crash>destroy</on_crash>
  1060. <devices>
  1061. <hostdev type="pci" managed="yes">
  1062. <source>
  1063. <address
  1064. bus="0x00"
  1065. slot="0x00"
  1066. function="0x0" />
  1067. </source>
  1068. </hostdev>
  1069. <!-- server_ip is the address of stubdomain. It hosts it's own DNS server. -->
  1070. <emulator type="stubdom-linux" />
  1071. <input type="tablet" bus="usb"/>
  1072. <video>
  1073. <model type="vga"/>
  1074. </video>
  1075. <graphics type="qubes"/>
  1076. <console type="pty">
  1077. <target type="xen" port="0"/>
  1078. </console>
  1079. </devices>
  1080. </domain>
  1081. '''
  1082. my_uuid = '7db78950-c467-4863-94d1-af59806384ea'
  1083. # required for PCI devices listing
  1084. self.app.vmm.offline_mode = False
  1085. vm = self.get_vm(uuid=my_uuid)
  1086. vm.netvm = None
  1087. vm.virt_mode = 'hvm'
  1088. vm.kernel = None
  1089. # even with meminfo-writer enabled, should have memory==maxmem
  1090. vm.features['service.meminfo-writer'] = True
  1091. assignment = qubes.devices.DeviceAssignment(
  1092. vm, # this is violation of API, but for PCI the argument
  1093. # is unused
  1094. '00_00.0',
  1095. bus='pci',
  1096. persistent=True)
  1097. vm.devices['pci']._set.add(
  1098. assignment)
  1099. libvirt_xml = vm.create_config_file()
  1100. self.assertXMLEqual(lxml.etree.XML(libvirt_xml),
  1101. lxml.etree.XML(expected))
  1102. def test_610_libvirt_xml_network(self):
  1103. expected = '''<domain type="xen">
  1104. <name>test-inst-test</name>
  1105. <uuid>7db78950-c467-4863-94d1-af59806384ea</uuid>
  1106. <memory unit="MiB">500</memory>
  1107. <currentMemory unit="MiB">400</currentMemory>
  1108. <vcpu placement="static">2</vcpu>
  1109. <cpu mode='host-passthrough'>
  1110. <!-- disable nested HVM -->
  1111. <feature name='vmx' policy='disable'/>
  1112. <feature name='svm' policy='disable'/>
  1113. <!-- disable SMAP inside VM, because of Linux bug -->
  1114. <feature name='smap' policy='disable'/>
  1115. </cpu>
  1116. <os>
  1117. <type arch="x86_64" machine="xenfv">hvm</type>
  1118. <!--
  1119. For the libxl backend libvirt switches between OVMF (UEFI)
  1120. and SeaBIOS based on the loader type. This has nothing to
  1121. do with the hvmloader binary.
  1122. -->
  1123. <loader type="rom">hvmloader</loader>
  1124. <boot dev="cdrom" />
  1125. <boot dev="hd" />
  1126. </os>
  1127. <features>
  1128. <pae/>
  1129. <acpi/>
  1130. <apic/>
  1131. <viridian/>
  1132. </features>
  1133. <clock offset="variable" adjustment="0" basis="localtime" />
  1134. <on_poweroff>destroy</on_poweroff>
  1135. <on_reboot>destroy</on_reboot>
  1136. <on_crash>destroy</on_crash>
  1137. <devices>
  1138. <interface type="ethernet">
  1139. <mac address="00:16:3e:5e:6c:00" />
  1140. <ip address="10.137.0.1" />
  1141. {extra_ip}
  1142. <backenddomain name="test-inst-netvm" />
  1143. <script path="vif-route-qubes" />
  1144. </interface>
  1145. <!-- server_ip is the address of stubdomain. It hosts it's own DNS server. -->
  1146. <emulator type="stubdom-linux" />
  1147. <input type="tablet" bus="usb"/>
  1148. <video>
  1149. <model type="vga"/>
  1150. </video>
  1151. <graphics type="qubes"/>
  1152. <console type="pty">
  1153. <target type="xen" port="0"/>
  1154. </console>
  1155. </devices>
  1156. </domain>
  1157. '''
  1158. my_uuid = '7db78950-c467-4863-94d1-af59806384ea'
  1159. netvm = self.get_vm(qid=2, name='netvm', provides_network=True)
  1160. vm = self.get_vm(uuid=my_uuid)
  1161. vm.netvm = netvm
  1162. vm.virt_mode = 'hvm'
  1163. vm.features['qrexec'] = True
  1164. with self.subTest('ipv4_only'):
  1165. libvirt_xml = vm.create_config_file()
  1166. self.assertXMLEqual(lxml.etree.XML(libvirt_xml),
  1167. lxml.etree.XML(expected.format(extra_ip='')))
  1168. with self.subTest('ipv6'):
  1169. netvm.features['ipv6'] = True
  1170. libvirt_xml = vm.create_config_file()
  1171. self.assertXMLEqual(lxml.etree.XML(libvirt_xml),
  1172. lxml.etree.XML(expected.format(
  1173. extra_ip='<ip address="{}::a89:1" family=\'ipv6\'/>'.format(
  1174. qubes.config.qubes_ipv6_prefix.replace(':0000', '')))))
  1175. @unittest.mock.patch('qubes.utils.get_timezone')
  1176. @unittest.mock.patch('qubes.utils.urandom')
  1177. @unittest.mock.patch('qubes.vm.qubesvm.QubesVM.untrusted_qdb')
  1178. def test_620_qdb_standalone(self, mock_qubesdb, mock_urandom,
  1179. mock_timezone):
  1180. mock_urandom.return_value = b'A' * 64
  1181. mock_timezone.return_value = 'UTC'
  1182. vm = self.get_vm(cls=qubes.vm.standalonevm.StandaloneVM)
  1183. vm.netvm = None
  1184. vm.events_enabled = True
  1185. test_qubesdb = TestQubesDB()
  1186. mock_qubesdb.write.side_effect = test_qubesdb.write
  1187. mock_qubesdb.rm.side_effect = test_qubesdb.rm
  1188. vm.create_qdb_entries()
  1189. self.maxDiff = None
  1190. iptables_header = (
  1191. '# Generated by Qubes Core on {}\n'
  1192. '*filter\n'
  1193. ':INPUT DROP [0:0]\n'
  1194. ':FORWARD DROP [0:0]\n'
  1195. ':OUTPUT ACCEPT [0:0]\n'
  1196. '-A INPUT -i vif+ -p udp -m udp --dport 68 -j DROP\n'
  1197. '-A INPUT -m conntrack --ctstate '
  1198. 'RELATED,ESTABLISHED -j ACCEPT\n'
  1199. '-A INPUT -p icmp -j ACCEPT\n'
  1200. '-A INPUT -i lo -j ACCEPT\n'
  1201. '-A INPUT -j REJECT --reject-with '
  1202. 'icmp-host-prohibited\n'
  1203. '-A FORWARD -m conntrack --ctstate '
  1204. 'RELATED,ESTABLISHED -j ACCEPT\n'
  1205. '-A FORWARD -i vif+ -o vif+ -j DROP\n'
  1206. 'COMMIT\n'.format(datetime.datetime.now().ctime()))
  1207. self.assertEqual(test_qubesdb.data, {
  1208. '/name': 'test-inst-test',
  1209. '/type': 'StandaloneVM',
  1210. '/default-user': 'user',
  1211. '/qubes-vm-type': 'AppVM',
  1212. '/qubes-debug-mode': '0',
  1213. '/qubes-base-template': '',
  1214. '/qubes-timezone': 'UTC',
  1215. '/qubes-random-seed': base64.b64encode(b'A' * 64),
  1216. '/qubes-vm-persistence': 'full',
  1217. '/qubes-vm-updateable': 'True',
  1218. '/qubes-block-devices': '',
  1219. '/qubes-usb-devices': '',
  1220. '/qubes-iptables': 'reload',
  1221. '/qubes-iptables-error': '',
  1222. '/qubes-iptables-header': iptables_header,
  1223. '/qubes-service/qubes-update-check': '0',
  1224. '/qubes-service/meminfo-writer': '1',
  1225. })
  1226. @unittest.mock.patch('datetime.datetime')
  1227. @unittest.mock.patch('qubes.utils.get_timezone')
  1228. @unittest.mock.patch('qubes.utils.urandom')
  1229. @unittest.mock.patch('qubes.vm.qubesvm.QubesVM.untrusted_qdb')
  1230. def test_621_qdb_vm_with_network(self, mock_qubesdb, mock_urandom,
  1231. mock_timezone, mock_datetime):
  1232. mock_urandom.return_value = b'A' * 64
  1233. mock_timezone.return_value = 'UTC'
  1234. template = self.get_vm(cls=qubes.vm.templatevm.TemplateVM, name='template')
  1235. template.netvm = None
  1236. netvm = self.get_vm(cls=qubes.vm.appvm.AppVM, template=template,
  1237. name='netvm', qid=2, provides_network=True)
  1238. vm = self.get_vm(cls=qubes.vm.appvm.AppVM, template=template,
  1239. name='appvm', qid=3)
  1240. vm.netvm = netvm
  1241. vm.kernel = None
  1242. # pretend the VM is running...
  1243. vm._qubesprop_xid = 3
  1244. netvm.kernel = None
  1245. test_qubesdb = TestQubesDB()
  1246. mock_qubesdb.write.side_effect = test_qubesdb.write
  1247. mock_qubesdb.rm.side_effect = test_qubesdb.rm
  1248. self.maxDiff = None
  1249. mock_datetime.now.returnvalue = \
  1250. datetime.datetime(2019, 2, 27, 15, 12, 15, 385822)
  1251. iptables_header = (
  1252. '# Generated by Qubes Core on {}\n'
  1253. '*filter\n'
  1254. ':INPUT DROP [0:0]\n'
  1255. ':FORWARD DROP [0:0]\n'
  1256. ':OUTPUT ACCEPT [0:0]\n'
  1257. '-A INPUT -i vif+ -p udp -m udp --dport 68 -j DROP\n'
  1258. '-A INPUT -m conntrack --ctstate '
  1259. 'RELATED,ESTABLISHED -j ACCEPT\n'
  1260. '-A INPUT -p icmp -j ACCEPT\n'
  1261. '-A INPUT -i lo -j ACCEPT\n'
  1262. '-A INPUT -j REJECT --reject-with '
  1263. 'icmp-host-prohibited\n'
  1264. '-A FORWARD -m conntrack --ctstate '
  1265. 'RELATED,ESTABLISHED -j ACCEPT\n'
  1266. '-A FORWARD -i vif+ -o vif+ -j DROP\n'
  1267. 'COMMIT\n'.format(datetime.datetime.now().ctime()))
  1268. expected = {
  1269. '/name': 'test-inst-appvm',
  1270. '/type': 'AppVM',
  1271. '/default-user': 'user',
  1272. '/qubes-vm-type': 'AppVM',
  1273. '/qubes-debug-mode': '0',
  1274. '/qubes-base-template': 'test-inst-template',
  1275. '/qubes-timezone': 'UTC',
  1276. '/qubes-random-seed': base64.b64encode(b'A' * 64),
  1277. '/qubes-vm-persistence': 'rw-only',
  1278. '/qubes-vm-updateable': 'False',
  1279. '/qubes-block-devices': '',
  1280. '/qubes-usb-devices': '',
  1281. '/qubes-iptables': 'reload',
  1282. '/qubes-iptables-error': '',
  1283. '/qubes-iptables-header': iptables_header,
  1284. '/qubes-service/qubes-update-check': '0',
  1285. '/qubes-service/meminfo-writer': '1',
  1286. '/qubes-mac': '00:16:3e:5e:6c:00',
  1287. '/qubes-ip': '10.137.0.3',
  1288. '/qubes-netmask': '255.255.255.255',
  1289. '/qubes-gateway': '10.137.0.2',
  1290. '/qubes-primary-dns': '10.139.1.1',
  1291. '/qubes-secondary-dns': '10.139.1.2',
  1292. }
  1293. with self.subTest('ipv4'):
  1294. vm.create_qdb_entries()
  1295. self.assertEqual(test_qubesdb.data, expected)
  1296. test_qubesdb.data.clear()
  1297. with self.subTest('ipv6'):
  1298. netvm.features['ipv6'] = True
  1299. expected['/qubes-ip6'] = \
  1300. qubes.config.qubes_ipv6_prefix.replace(':0000', '') + \
  1301. '::a89:3'
  1302. expected['/qubes-gateway6'] = expected['/qubes-ip6'][:-1] + '2'
  1303. vm.create_qdb_entries()
  1304. self.assertEqual(test_qubesdb.data, expected)
  1305. test_qubesdb.data.clear()
  1306. with self.subTest('ipv6_just_appvm'):
  1307. del netvm.features['ipv6']
  1308. vm.features['ipv6'] = True
  1309. expected['/qubes-ip6'] = \
  1310. qubes.config.qubes_ipv6_prefix.replace(':0000', '') + \
  1311. '::a89:3'
  1312. del expected['/qubes-gateway6']
  1313. vm.create_qdb_entries()
  1314. self.assertEqual(test_qubesdb.data, expected)
  1315. test_qubesdb.data.clear()
  1316. with self.subTest('proxy_ipv4'):
  1317. del vm.features['ipv6']
  1318. expected['/name'] = 'test-inst-netvm'
  1319. expected['/qubes-vm-type'] = 'NetVM'
  1320. del expected['/qubes-ip']
  1321. del expected['/qubes-gateway']
  1322. del expected['/qubes-netmask']
  1323. del expected['/qubes-ip6']
  1324. del expected['/qubes-primary-dns']
  1325. del expected['/qubes-secondary-dns']
  1326. del expected['/qubes-mac']
  1327. expected['/qubes-netvm-primary-dns'] = '10.139.1.1'
  1328. expected['/qubes-netvm-secondary-dns'] = '10.139.1.2'
  1329. expected['/qubes-netvm-network'] = '10.137.0.2'
  1330. expected['/qubes-netvm-gateway'] = '10.137.0.2'
  1331. expected['/qubes-netvm-netmask'] = '255.255.255.255'
  1332. expected['/qubes-iptables-domainrules/3'] = \
  1333. '*filter\n' \
  1334. '-A FORWARD -s 10.137.0.3 -j ACCEPT\n' \
  1335. '-A FORWARD -s 10.137.0.3 -j DROP\n' \
  1336. 'COMMIT\n'
  1337. expected['/mapped-ip/10.137.0.3/visible-ip'] = '10.137.0.3'
  1338. expected['/mapped-ip/10.137.0.3/visible-gateway'] = '10.137.0.2'
  1339. expected['/qubes-firewall/10.137.0.3'] = ''
  1340. expected['/qubes-firewall/10.137.0.3/0000'] = 'action=accept'
  1341. expected['/qubes-firewall/10.137.0.3/policy'] = 'drop'
  1342. with unittest.mock.patch('qubes.vm.qubesvm.QubesVM.is_running',
  1343. lambda _: True):
  1344. netvm.create_qdb_entries()
  1345. self.assertEqual(test_qubesdb.data, expected)
  1346. test_qubesdb.data.clear()
  1347. with self.subTest('proxy_ipv6'):
  1348. netvm.features['ipv6'] = True
  1349. ip6 = qubes.config.qubes_ipv6_prefix.replace(
  1350. ':0000', '') + '::a89:3'
  1351. expected['/qubes-netvm-gateway6'] = ip6[:-1] + '2'
  1352. expected['/qubes-firewall/' + ip6] = ''
  1353. expected['/qubes-firewall/' + ip6 + '/0000'] = 'action=accept'
  1354. expected['/qubes-firewall/' + ip6 + '/policy'] = 'drop'
  1355. with unittest.mock.patch('qubes.vm.qubesvm.QubesVM.is_running',
  1356. lambda _: True):
  1357. netvm.create_qdb_entries()
  1358. self.assertEqual(test_qubesdb.data, expected)
  1359. @asyncio.coroutine
  1360. def coroutine_mock(self, mock, *args, **kwargs):
  1361. return mock(*args, **kwargs)
  1362. @unittest.mock.patch('asyncio.create_subprocess_exec')
  1363. def test_700_run_service(self, mock_subprocess):
  1364. func_mock = unittest.mock.Mock()
  1365. mock_subprocess.side_effect = functools.partial(
  1366. self.coroutine_mock, func_mock)
  1367. start_mock = unittest.mock.Mock()
  1368. vm = self.get_vm(cls=qubes.vm.standalonevm.StandaloneVM,
  1369. name='vm')
  1370. vm.is_running = lambda: True
  1371. vm.is_qrexec_running = lambda: True
  1372. vm.start = functools.partial(self.coroutine_mock, start_mock)
  1373. with self.subTest('running'):
  1374. self.loop.run_until_complete(vm.run_service('test.service'))
  1375. func_mock.assert_called_once_with(
  1376. '/usr/bin/qrexec-client', '-d', 'test-inst-vm',
  1377. 'user:QUBESRPC test.service dom0')
  1378. self.assertFalse(start_mock.called)
  1379. func_mock.reset_mock()
  1380. start_mock.reset_mock()
  1381. with self.subTest('not_running'):
  1382. vm.is_running = lambda: False
  1383. with self.assertRaises(qubes.exc.QubesVMNotRunningError):
  1384. self.loop.run_until_complete(vm.run_service('test.service'))
  1385. self.assertFalse(func_mock.called)
  1386. func_mock.reset_mock()
  1387. start_mock.reset_mock()
  1388. with self.subTest('autostart'):
  1389. vm.is_running = lambda: False
  1390. self.loop.run_until_complete(vm.run_service(
  1391. 'test.service', autostart=True))
  1392. func_mock.assert_called_once_with(
  1393. '/usr/bin/qrexec-client', '-d', 'test-inst-vm',
  1394. 'user:QUBESRPC test.service dom0')
  1395. self.assertTrue(start_mock.called)
  1396. func_mock.reset_mock()
  1397. start_mock.reset_mock()
  1398. with self.subTest('no_qrexec'):
  1399. vm.is_running = lambda: True
  1400. vm.is_qrexec_running = lambda: False
  1401. with self.assertRaises(qubes.exc.QubesVMError):
  1402. self.loop.run_until_complete(vm.run_service('test.service'))
  1403. self.assertFalse(start_mock.called)
  1404. self.assertFalse(func_mock.called)
  1405. func_mock.reset_mock()
  1406. start_mock.reset_mock()
  1407. with self.subTest('other_user'):
  1408. vm.is_running = lambda: True
  1409. vm.is_qrexec_running = lambda: True
  1410. self.loop.run_until_complete(vm.run_service('test.service',
  1411. user='other'))
  1412. func_mock.assert_called_once_with(
  1413. '/usr/bin/qrexec-client', '-d', 'test-inst-vm',
  1414. 'other:QUBESRPC test.service dom0')
  1415. self.assertFalse(start_mock.called)
  1416. func_mock.reset_mock()
  1417. start_mock.reset_mock()
  1418. with self.subTest('other_source'):
  1419. vm.is_running = lambda: True
  1420. vm.is_qrexec_running = lambda: True
  1421. self.loop.run_until_complete(vm.run_service('test.service',
  1422. source='test-inst-vm'))
  1423. func_mock.assert_called_once_with(
  1424. '/usr/bin/qrexec-client', '-d', 'test-inst-vm',
  1425. 'user:QUBESRPC test.service test-inst-vm')
  1426. self.assertFalse(start_mock.called)
  1427. @unittest.mock.patch('qubes.vm.qubesvm.QubesVM.run_service')
  1428. def test_710_run_service_for_stdio(self, mock_run_service):
  1429. vm = self.get_vm(cls=qubes.vm.standalonevm.StandaloneVM,
  1430. name='vm')
  1431. func_mock = unittest.mock.Mock()
  1432. mock_run_service.side_effect = functools.partial(
  1433. self.coroutine_mock, func_mock)
  1434. communicate_mock = unittest.mock.Mock()
  1435. func_mock.return_value.communicate.side_effect = functools.partial(
  1436. self.coroutine_mock, communicate_mock)
  1437. communicate_mock.return_value = (b'stdout', b'stderr')
  1438. func_mock.return_value.returncode = 0
  1439. with self.subTest('default'):
  1440. value = self.loop.run_until_complete(
  1441. vm.run_service_for_stdio('test.service'))
  1442. func_mock.assert_called_once_with(
  1443. 'test.service',
  1444. stdout=subprocess.PIPE,
  1445. stderr=subprocess.PIPE,
  1446. stdin=subprocess.PIPE)
  1447. communicate_mock.assert_called_once_with(input=None)
  1448. self.assertEqual(value, (b'stdout', b'stderr'))
  1449. func_mock.reset_mock()
  1450. communicate_mock.reset_mock()
  1451. with self.subTest('with_input'):
  1452. value = self.loop.run_until_complete(
  1453. vm.run_service_for_stdio('test.service', input=b'abc'))
  1454. func_mock.assert_called_once_with(
  1455. 'test.service',
  1456. stdout=subprocess.PIPE,
  1457. stderr=subprocess.PIPE,
  1458. stdin=subprocess.PIPE)
  1459. communicate_mock.assert_called_once_with(input=b'abc')
  1460. self.assertEqual(value, (b'stdout', b'stderr'))
  1461. func_mock.reset_mock()
  1462. communicate_mock.reset_mock()
  1463. with self.subTest('error'):
  1464. func_mock.return_value.returncode = 1
  1465. with self.assertRaises(subprocess.CalledProcessError) as exc:
  1466. self.loop.run_until_complete(
  1467. vm.run_service_for_stdio('test.service'))
  1468. func_mock.assert_called_once_with(
  1469. 'test.service',
  1470. stdout=subprocess.PIPE,
  1471. stderr=subprocess.PIPE,
  1472. stdin=subprocess.PIPE)
  1473. communicate_mock.assert_called_once_with(input=None)
  1474. self.assertEqual(exc.exception.returncode, 1)
  1475. self.assertEqual(exc.exception.output, b'stdout')
  1476. self.assertEqual(exc.exception.stderr, b'stderr')