qubesvm.py 32 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830
  1. # pylint: disable=protected-access
  2. #
  3. # The Qubes OS Project, https://www.qubes-os.org/
  4. #
  5. # Copyright (C) 2014-2015 Joanna Rutkowska <joanna@invisiblethingslab.com>
  6. # Copyright (C) 2014-2015 Wojtek Porczyk <woju@invisiblethingslab.com>
  7. #
  8. # This library is free software; you can redistribute it and/or
  9. # modify it under the terms of the GNU Lesser General Public
  10. # License as published by the Free Software Foundation; either
  11. # version 2.1 of the License, or (at your option) any later version.
  12. #
  13. # This library is distributed in the hope that it will be useful,
  14. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  15. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  16. # Lesser General Public License for more details.
  17. #
  18. # You should have received a copy of the GNU Lesser General Public
  19. # License along with this library; if not, see <https://www.gnu.org/licenses/>.
  20. #
  21. import base64
  22. import os
  23. import unittest
  24. import uuid
  25. import datetime
  26. import lxml.etree
  27. import unittest.mock
  28. import qubes
  29. import qubes.exc
  30. import qubes.config
  31. import qubes.vm
  32. import qubes.vm.qubesvm
  33. import qubes.tests
  34. import qubes.tests.vm
  35. class TestApp(object):
  36. labels = {1: qubes.Label(1, '0xcc0000', 'red')}
  37. def __init__(self):
  38. self.domains = {}
  39. class TestProp(object):
  40. # pylint: disable=too-few-public-methods
  41. __name__ = 'testprop'
  42. class TestDeviceCollection(object):
  43. def __init__(self):
  44. self._list = []
  45. def persistent(self):
  46. return self._list
  47. class TestQubesDB(object):
  48. def __init__(self):
  49. self.data = {}
  50. def write(self, path, value):
  51. self.data[path] = value
  52. def rm(self, path):
  53. if path.endswith('/'):
  54. for key in [x for x in self.data if x.startswith(path)]:
  55. del self.data[key]
  56. else:
  57. self.data.pop(path, None)
  58. class TestVM(object):
  59. # pylint: disable=too-few-public-methods
  60. app = TestApp()
  61. def __init__(self, **kwargs):
  62. self.running = False
  63. self.installed_by_rpm = False
  64. for k, v in kwargs.items():
  65. setattr(self, k, v)
  66. self.devices = {'pci': TestDeviceCollection()}
  67. def is_running(self):
  68. return self.running
  69. class TC_00_setters(qubes.tests.QubesTestCase):
  70. def setUp(self):
  71. super().setUp()
  72. self.vm = TestVM()
  73. self.prop = TestProp()
  74. def test_000_setter_qid(self):
  75. self.assertEqual(
  76. qubes.vm._setter_qid(self.vm, self.prop, 5), 5)
  77. def test_001_setter_qid_lt_0(self):
  78. with self.assertRaises(ValueError):
  79. qubes.vm._setter_qid(self.vm, self.prop, -1)
  80. def test_002_setter_qid_gt_max(self):
  81. with self.assertRaises(ValueError):
  82. qubes.vm._setter_qid(self.vm,
  83. self.prop, qubes.config.max_qid + 5)
  84. @unittest.skip('test not implemented')
  85. def test_020_setter_kernel(self):
  86. pass
  87. def test_030_setter_label_object(self):
  88. label = TestApp.labels[1]
  89. self.assertIs(label,
  90. qubes.vm.setter_label(self.vm, self.prop, label))
  91. def test_031_setter_label_getitem(self):
  92. label = TestApp.labels[1]
  93. self.assertIs(label,
  94. qubes.vm.setter_label(self.vm, self.prop, 'label-1'))
  95. # there is no check for self.app.get_label()
  96. def test_040_setter_virt_mode(self):
  97. self.assertEqual(
  98. qubes.vm.qubesvm._setter_virt_mode(self.vm, self.prop, 'hvm'),
  99. 'hvm')
  100. self.assertEqual(
  101. qubes.vm.qubesvm._setter_virt_mode(self.vm, self.prop, 'HVM'),
  102. 'hvm')
  103. self.assertEqual(
  104. qubes.vm.qubesvm._setter_virt_mode(self.vm, self.prop, 'PV'),
  105. 'pv')
  106. self.assertEqual(
  107. qubes.vm.qubesvm._setter_virt_mode(self.vm, self.prop, 'pvh'),
  108. 'pvh')
  109. self.vm.devices['pci']._list.append(object())
  110. with self.assertRaises(ValueError):
  111. qubes.vm.qubesvm._setter_virt_mode(self.vm, self.prop, 'pvh')
  112. with self.assertRaises(ValueError):
  113. qubes.vm.qubesvm._setter_virt_mode(self.vm, self.prop, 'True')
  114. class QubesVMTestsMixin(object):
  115. property_no_default = object()
  116. def setUp(self):
  117. super(QubesVMTestsMixin, self).setUp()
  118. self.app = qubes.tests.vm.TestApp()
  119. self.app.vmm.offline_mode = True
  120. # when full test run is called, extensions are loaded by earlier
  121. # tests, but if just this test class is run, load them manually here,
  122. # to have the same behaviour
  123. qubes.ext.get_extensions()
  124. def tearDown(self):
  125. try:
  126. self.app.domains.close()
  127. except AttributeError:
  128. pass
  129. super(QubesVMTestsMixin, self).tearDown()
  130. def get_vm(self, name='test', cls=qubes.vm.qubesvm.QubesVM, **kwargs):
  131. vm = cls(self.app, None,
  132. qid=kwargs.pop('qid', 1), name=qubes.tests.VMPREFIX + name,
  133. **kwargs)
  134. self.app.domains[vm.qid] = vm
  135. self.app.domains[vm.uuid] = vm
  136. self.app.domains[vm] = vm
  137. self.addCleanup(vm.close)
  138. return vm
  139. def assertPropertyValue(self, vm, prop_name, set_value, expected_value,
  140. expected_xml_content=None):
  141. # FIXME: any better exception list? or maybe all of that should be a
  142. # single exception?
  143. with self.assertNotRaises((ValueError, TypeError, KeyError)):
  144. setattr(vm, prop_name, set_value)
  145. self.assertEqual(getattr(vm, prop_name), expected_value)
  146. if expected_xml_content is not None:
  147. xml = vm.__xml__()
  148. prop_xml = xml.xpath(
  149. './properties/property[@name=\'{}\']'.format(prop_name))
  150. self.assertEqual(len(prop_xml), 1, "Property not found in XML")
  151. self.assertEqual(prop_xml[0].text, expected_xml_content)
  152. def assertPropertyInvalidValue(self, vm, prop_name, set_value):
  153. orig_value_set = True
  154. orig_value = None
  155. try:
  156. orig_value = getattr(vm, prop_name)
  157. except AttributeError:
  158. orig_value_set = False
  159. # FIXME: any better exception list? or maybe all of that should be a
  160. # single exception?
  161. with self.assertRaises((ValueError, TypeError, KeyError)):
  162. setattr(vm, prop_name, set_value)
  163. if orig_value_set:
  164. self.assertEqual(getattr(vm, prop_name), orig_value)
  165. else:
  166. with self.assertRaises(AttributeError):
  167. getattr(vm, prop_name)
  168. def assertPropertyDefaultValue(self, vm, prop_name,
  169. expected_default=property_no_default):
  170. if expected_default is self.property_no_default:
  171. with self.assertRaises(AttributeError):
  172. getattr(vm, prop_name)
  173. else:
  174. with self.assertNotRaises(AttributeError):
  175. self.assertEqual(getattr(vm, prop_name), expected_default)
  176. xml = vm.__xml__()
  177. prop_xml = xml.xpath(
  178. './properties/property[@name=\'{}\']'.format(prop_name))
  179. self.assertEqual(len(prop_xml), 0, "Property still found in XML")
  180. def _test_generic_bool_property(self, vm, prop_name, default=False):
  181. self.assertPropertyDefaultValue(vm, prop_name, default)
  182. self.assertPropertyValue(vm, prop_name, False, False, 'False')
  183. self.assertPropertyValue(vm, prop_name, True, True, 'True')
  184. delattr(vm, prop_name)
  185. self.assertPropertyDefaultValue(vm, prop_name, default)
  186. self.assertPropertyValue(vm, prop_name, 'True', True, 'True')
  187. self.assertPropertyValue(vm, prop_name, 'False', False, 'False')
  188. self.assertPropertyInvalidValue(vm, prop_name, 'xxx')
  189. self.assertPropertyValue(vm, prop_name, 123, True)
  190. self.assertPropertyInvalidValue(vm, prop_name, '')
  191. class TC_90_QubesVM(QubesVMTestsMixin, qubes.tests.QubesTestCase):
  192. def test_000_init(self):
  193. self.get_vm()
  194. def test_001_init_no_qid_or_name(self):
  195. with self.assertRaises(AssertionError):
  196. qubes.vm.qubesvm.QubesVM(self.app, None,
  197. name=qubes.tests.VMPREFIX + 'test')
  198. with self.assertRaises(AssertionError):
  199. qubes.vm.qubesvm.QubesVM(self.app, None,
  200. qid=1)
  201. def test_003_init_fire_domain_init(self):
  202. class TestVM2(qubes.vm.qubesvm.QubesVM):
  203. event_fired = False
  204. @qubes.events.handler('domain-init')
  205. def on_domain_init(self, event): # pylint: disable=unused-argument
  206. self.__class__.event_fired = True
  207. TestVM2(self.app, None, qid=1, name=qubes.tests.VMPREFIX + 'test')
  208. self.assertTrue(TestVM2.event_fired)
  209. def test_004_uuid_autogen(self):
  210. vm = self.get_vm()
  211. self.assertTrue(hasattr(vm, 'uuid'))
  212. def test_100_qid(self):
  213. vm = self.get_vm()
  214. self.assertIsInstance(vm.qid, int)
  215. with self.assertRaises(AttributeError):
  216. vm.qid = 2
  217. def test_110_name(self):
  218. vm = self.get_vm()
  219. self.assertIsInstance(vm.name, str)
  220. def test_120_uuid(self):
  221. my_uuid = uuid.uuid4()
  222. vm = self.get_vm(uuid=my_uuid)
  223. self.assertIsInstance(vm.uuid, uuid.UUID)
  224. self.assertIs(vm.uuid, my_uuid)
  225. with self.assertRaises(AttributeError):
  226. vm.uuid = uuid.uuid4()
  227. @unittest.skip('TODO: how to not fail on making an icon symlink here?')
  228. def test_130_label(self):
  229. vm = self.get_vm()
  230. self.assertPropertyDefaultValue(vm, 'label')
  231. self.assertPropertyValue(vm, 'label', self.app.labels[1],
  232. self.app.labels[1], 'label-1')
  233. del vm.label
  234. self.assertPropertyDefaultValue(vm, 'label')
  235. self.assertPropertyValue(vm, 'label', 'red',
  236. self.app.labels[1], 'label-1')
  237. self.assertPropertyValue(vm, 'label', 'label-1',
  238. self.app.labels[1], 'label-1')
  239. def test_131_label_invalid(self):
  240. vm = self.get_vm()
  241. self.assertPropertyInvalidValue(vm, 'label', 'invalid')
  242. self.assertPropertyInvalidValue(vm, 'label', 123)
  243. def test_160_memory(self):
  244. vm = self.get_vm()
  245. self.assertPropertyDefaultValue(vm, 'memory', 400)
  246. self.assertPropertyValue(vm, 'memory', 500, 500, '500')
  247. del vm.memory
  248. self.assertPropertyDefaultValue(vm, 'memory', 400)
  249. self.assertPropertyValue(vm, 'memory', '500', 500, '500')
  250. def test_161_memory_invalid(self):
  251. vm = self.get_vm()
  252. self.assertPropertyInvalidValue(vm, 'memory', -100)
  253. self.assertPropertyInvalidValue(vm, 'memory', '-100')
  254. self.assertPropertyInvalidValue(vm, 'memory', '')
  255. # TODO: higher than maxmem
  256. # TODO: human readable setter (500M, 4G)?
  257. def test_170_maxmem(self):
  258. vm = self.get_vm()
  259. self.assertPropertyDefaultValue(vm, 'maxmem',
  260. self.app.host.memory_total / 1024 / 2)
  261. self.assertPropertyValue(vm, 'maxmem', 500, 500, '500')
  262. del vm.maxmem
  263. self.assertPropertyDefaultValue(vm, 'maxmem',
  264. self.app.host.memory_total / 1024 / 2)
  265. self.assertPropertyValue(vm, 'maxmem', '500', 500, '500')
  266. def test_171_maxmem_invalid(self):
  267. vm = self.get_vm()
  268. self.assertPropertyInvalidValue(vm, 'maxmem', -100)
  269. self.assertPropertyInvalidValue(vm, 'maxmem', '-100')
  270. self.assertPropertyInvalidValue(vm, 'maxmem', '')
  271. # TODO: lower than memory
  272. # TODO: human readable setter (500M, 4G)?
  273. def test_190_vcpus(self):
  274. vm = self.get_vm()
  275. self.assertPropertyDefaultValue(vm, 'vcpus', 2)
  276. self.assertPropertyValue(vm, 'vcpus', 3, 3, '3')
  277. del vm.vcpus
  278. self.assertPropertyDefaultValue(vm, 'vcpus', 2)
  279. self.assertPropertyValue(vm, 'vcpus', '3', 3, '3')
  280. def test_191_vcpus_invalid(self):
  281. vm = self.get_vm()
  282. self.assertPropertyInvalidValue(vm, 'vcpus', 0)
  283. self.assertPropertyInvalidValue(vm, 'vcpus', -2)
  284. self.assertPropertyInvalidValue(vm, 'vcpus', '-2')
  285. self.assertPropertyInvalidValue(vm, 'vcpus', '')
  286. def test_200_debug(self):
  287. vm = self.get_vm()
  288. self._test_generic_bool_property(vm, 'debug', False)
  289. def test_210_installed_by_rpm(self):
  290. vm = self.get_vm()
  291. self._test_generic_bool_property(vm, 'installed_by_rpm', False)
  292. def test_220_include_in_backups(self):
  293. vm = self.get_vm()
  294. self._test_generic_bool_property(vm, 'include_in_backups', True)
  295. @qubes.tests.skipUnlessDom0
  296. def test_250_kernel(self):
  297. kernels = os.listdir(os.path.join(
  298. qubes.config.qubes_base_dir,
  299. qubes.config.system_path['qubes_kernels_base_dir']))
  300. if not len(kernels):
  301. self.skipTest('Needs at least one kernel installed')
  302. self.app.default_kernel = kernels[0]
  303. vm = self.get_vm()
  304. self.assertPropertyDefaultValue(vm, 'kernel', kernels[0])
  305. self.assertPropertyValue(vm, 'kernel', kernels[-1], kernels[-1],
  306. kernels[-1])
  307. del vm.kernel
  308. self.assertPropertyDefaultValue(vm, 'kernel', kernels[0])
  309. @qubes.tests.skipUnlessDom0
  310. def test_251_kernel_invalid(self):
  311. vm = self.get_vm()
  312. self.assertPropertyInvalidValue(vm, 'kernel', 123)
  313. self.assertPropertyInvalidValue(vm, 'kernel', 'invalid')
  314. def test_252_kernel_empty(self):
  315. vm = self.get_vm()
  316. self.assertPropertyValue(vm, 'kernel', '', '', '')
  317. self.assertPropertyValue(vm, 'kernel', None, '', '')
  318. def test_260_kernelopts(self):
  319. vm = self.get_vm()
  320. self.assertPropertyDefaultValue(vm, 'kernelopts',
  321. qubes.config.defaults['kernelopts'])
  322. self.assertPropertyValue(vm, 'kernelopts', 'some options',
  323. 'some options', 'some options')
  324. del vm.kernelopts
  325. self.assertPropertyDefaultValue(vm, 'kernelopts',
  326. qubes.config.defaults['kernelopts'])
  327. self.assertPropertyValue(vm, 'kernelopts', '',
  328. '', '')
  329. # TODO?
  330. # self.assertPropertyInvalidValue(vm, 'kernelopts', None),
  331. @unittest.skip('test not implemented')
  332. def test_261_kernelopts_pcidevs(self):
  333. vm = self.get_vm()
  334. # how to do that here? use dummy DeviceManager/DeviceCollection?
  335. # Disable events?
  336. vm.devices['pci'].attach('something')
  337. self.assertPropertyDefaultValue(vm, 'kernelopts',
  338. qubes.config.defaults['kernelopts_pcidevs'])
  339. def test_270_qrexec_timeout(self):
  340. vm = self.get_vm()
  341. self.assertPropertyDefaultValue(vm, 'qrexec_timeout', 60)
  342. self.assertPropertyValue(vm, 'qrexec_timeout', 3, 3, '3')
  343. del vm.qrexec_timeout
  344. self.assertPropertyDefaultValue(vm, 'qrexec_timeout', 60)
  345. self.assertPropertyValue(vm, 'qrexec_timeout', '3', 3, '3')
  346. def test_271_qrexec_timeout_invalid(self):
  347. vm = self.get_vm()
  348. self.assertPropertyInvalidValue(vm, 'qrexec_timeout', -2)
  349. self.assertPropertyInvalidValue(vm, 'qrexec_timeout', '-2')
  350. self.assertPropertyInvalidValue(vm, 'qrexec_timeout', '')
  351. def test_280_autostart(self):
  352. vm = self.get_vm()
  353. # FIXME any better idea to not involve systemctl call at this stage?
  354. vm.events_enabled = False
  355. self._test_generic_bool_property(vm, 'autostart', False)
  356. @qubes.tests.skipUnlessDom0
  357. def test_281_autostart_systemd(self):
  358. vm = self.get_vm()
  359. self.assertFalse(os.path.exists(
  360. '/etc/systemd/system/multi-user.target.wants/'
  361. 'qubes-vm@{}.service'.format(vm.name)),
  362. "systemd service enabled before setting autostart")
  363. vm.autostart = True
  364. self.assertTrue(os.path.exists(
  365. '/etc/systemd/system/multi-user.target.wants/'
  366. 'qubes-vm@{}.service'.format(vm.name)),
  367. "systemd service not enabled by autostart=True")
  368. vm.autostart = False
  369. self.assertFalse(os.path.exists(
  370. '/etc/systemd/system/multi-user.target.wants/'
  371. 'qubes-vm@{}.service'.format(vm.name)),
  372. "systemd service not disabled by autostart=False")
  373. vm.autostart = True
  374. del vm.autostart
  375. self.assertFalse(os.path.exists(
  376. '/etc/systemd/system/multi-user.target.wants/'
  377. 'qubes-vm@{}.service'.format(vm.name)),
  378. "systemd service not disabled by resetting autostart")
  379. @unittest.skip('TODO')
  380. def test_320_seamless_gui_mode(self):
  381. vm = self.get_vm()
  382. self._test_generic_bool_property(vm, 'seamless_gui_mode')
  383. # TODO: reject setting to True when guiagent_installed is false
  384. def test_330_mac(self):
  385. vm = self.get_vm()
  386. # TODO: calculate proper default here
  387. default_mac = vm.mac
  388. self.assertIsNotNone(default_mac)
  389. self.assertPropertyDefaultValue(vm, 'mac', default_mac)
  390. self.assertPropertyValue(vm, 'mac', '00:11:22:33:44:55',
  391. '00:11:22:33:44:55', '00:11:22:33:44:55')
  392. del vm.mac
  393. self.assertPropertyDefaultValue(vm, 'mac', default_mac)
  394. def test_331_mac_invalid(self):
  395. vm = self.get_vm()
  396. self.assertPropertyInvalidValue(vm, 'mac', 123)
  397. self.assertPropertyInvalidValue(vm, 'mac', 'invalid')
  398. self.assertPropertyInvalidValue(vm, 'mac', '00:11:22:33:44:55:66')
  399. def test_340_default_user(self):
  400. vm = self.get_vm()
  401. self.assertPropertyDefaultValue(vm, 'default_user', 'user')
  402. self.assertPropertyValue(vm, 'default_user', 'someuser', 'someuser',
  403. 'someuser')
  404. del vm.default_user
  405. self.assertPropertyDefaultValue(vm, 'default_user', 'user')
  406. self.assertPropertyValue(vm, 'default_user', 123, '123', '123')
  407. # TODO: check propagation for template-based VMs
  408. @unittest.skip('TODO')
  409. def test_350_timezone(self):
  410. vm = self.get_vm()
  411. self.assertPropertyDefaultValue(vm, 'timezone', 'localtime')
  412. self.assertPropertyValue(vm, 'timezone', 0, 0, '0')
  413. del vm.timezone
  414. self.assertPropertyDefaultValue(vm, 'timezone', 'localtime')
  415. self.assertPropertyValue(vm, 'timezone', '0', 0, '0')
  416. self.assertPropertyValue(vm, 'timezone', -3600, -3600, '-3600')
  417. self.assertPropertyValue(vm, 'timezone', 7200, 7200, '7200')
  418. @unittest.skip('TODO')
  419. def test_350_timezone_invalid(self):
  420. vm = self.get_vm()
  421. self.assertPropertyInvalidValue(vm, 'timezone', 'xxx')
  422. @unittest.skip('TODO')
  423. def test_360_drive(self):
  424. vm = self.get_vm()
  425. self.assertPropertyDefaultValue(vm, 'drive', None)
  426. # self.execute_tests('drive', [
  427. # ('hd:dom0:/tmp/drive.img', 'hd:dom0:/tmp/drive.img', True),
  428. # ('hd:/tmp/drive.img', 'hd:dom0:/tmp/drive.img', True),
  429. # ('cdrom:dom0:/tmp/drive.img', 'cdrom:dom0:/tmp/drive.img', True),
  430. # ('cdrom:/tmp/drive.img', 'cdrom:dom0:/tmp/drive.img', True),
  431. # ('/tmp/drive.img', 'cdrom:dom0:/tmp/drive.img', True),
  432. # ('hd:drive.img', '', False),
  433. # ('drive.img', '', False),
  434. # ])
  435. def test_400_backup_timestamp(self):
  436. vm = self.get_vm()
  437. timestamp = datetime.datetime(2016, 1, 1, 12, 14, 2)
  438. timestamp_str = timestamp.strftime('%s')
  439. self.assertPropertyDefaultValue(vm, 'backup_timestamp', None)
  440. self.assertPropertyValue(vm, 'backup_timestamp', timestamp,
  441. timestamp, timestamp_str)
  442. del vm.backup_timestamp
  443. self.assertPropertyDefaultValue(vm, 'backup_timestamp', None)
  444. self.assertPropertyValue(vm, 'backup_timestamp', timestamp_str,
  445. timestamp)
  446. def test_401_backup_timestamp_invalid(self):
  447. vm = self.get_vm()
  448. self.assertPropertyInvalidValue(vm, 'backup_timestamp', 'xxx')
  449. self.assertPropertyInvalidValue(vm, 'backup_timestamp', None)
  450. def test_500_property_migrate_virt_mode(self):
  451. xml_template = '''
  452. <domain class="QubesVM" id="domain-1">
  453. <properties>
  454. <property name="qid">1</property>
  455. <property name="name">testvm</property>
  456. <property name="label" ref="label-1" />
  457. <property name="hvm">{hvm_value}</property>
  458. </properties>
  459. </domain>
  460. '''
  461. xml = lxml.etree.XML(xml_template.format(hvm_value='True'))
  462. vm = qubes.vm.qubesvm.QubesVM(self.app, xml)
  463. self.assertEqual(vm.virt_mode, 'hvm')
  464. with self.assertRaises(AttributeError):
  465. vm.hvm
  466. xml = lxml.etree.XML(xml_template.format(hvm_value='False'))
  467. vm = qubes.vm.qubesvm.QubesVM(self.app, xml)
  468. self.assertEqual(vm.virt_mode, 'pv')
  469. with self.assertRaises(AttributeError):
  470. vm.hvm
  471. def test_600_libvirt_xml_pv(self):
  472. expected = '''<domain type="xen">
  473. <name>test-inst-test</name>
  474. <uuid>7db78950-c467-4863-94d1-af59806384ea</uuid>
  475. <memory unit="MiB">500</memory>
  476. <currentMemory unit="MiB">400</currentMemory>
  477. <vcpu placement="static">2</vcpu>
  478. <os>
  479. <type arch="x86_64" machine="xenpv">linux</type>
  480. <kernel>/tmp/kernel/vmlinuz</kernel>
  481. <initrd>/tmp/kernel/initramfs</initrd>
  482. <cmdline>root=/dev/mapper/dmroot ro nomodeset console=hvc0 rd_NO_PLYMOUTH rd.plymouth.enable=0 plymouth.enable=0 nopat</cmdline>
  483. </os>
  484. <features>
  485. </features>
  486. <clock offset='utc' adjustment='reset'>
  487. <timer name="tsc" mode="native"/>
  488. </clock>
  489. <on_poweroff>destroy</on_poweroff>
  490. <on_reboot>destroy</on_reboot>
  491. <on_crash>destroy</on_crash>
  492. <devices>
  493. <disk type="block" device="disk">
  494. <driver name="phy" />
  495. <source dev="/tmp/kernel/modules.img" />
  496. <target dev="xvdd" />
  497. <backenddomain name="dom0" />
  498. </disk>
  499. <console type="pty">
  500. <target type="xen" port="0"/>
  501. </console>
  502. </devices>
  503. </domain>
  504. '''
  505. my_uuid = '7db78950-c467-4863-94d1-af59806384ea'
  506. vm = self.get_vm(uuid=my_uuid)
  507. vm.netvm = None
  508. vm.virt_mode = 'pv'
  509. # tests for storage are later
  510. vm.volumes['kernel'] = unittest.mock.Mock(**{
  511. 'kernels_dir': '/tmp/kernel',
  512. 'block_device.return_value.domain': 'dom0',
  513. 'block_device.return_value.script': None,
  514. 'block_device.return_value.path': '/tmp/kernel/modules.img',
  515. 'block_device.return_value.devtype': 'disk',
  516. 'block_device.return_value.name': 'kernel',
  517. })
  518. libvirt_xml = vm.create_config_file()
  519. self.assertXMLEqual(lxml.etree.XML(libvirt_xml),
  520. lxml.etree.XML(expected))
  521. def test_600_libvirt_xml_hvm(self):
  522. expected = '''<domain type="xen">
  523. <name>test-inst-test</name>
  524. <uuid>7db78950-c467-4863-94d1-af59806384ea</uuid>
  525. <memory unit="MiB">500</memory>
  526. <currentMemory unit="MiB">400</currentMemory>
  527. <vcpu placement="static">2</vcpu>
  528. <cpu mode='host-passthrough'>
  529. <!-- disable nested HVM -->
  530. <feature name='vmx' policy='disable'/>
  531. <feature name='svm' policy='disable'/>
  532. <!-- disable SMAP inside VM, because of Linux bug -->
  533. <feature name='smap' policy='disable'/>
  534. </cpu>
  535. <os>
  536. <type arch="x86_64" machine="xenfv">hvm</type>
  537. <!--
  538. For the libxl backend libvirt switches between OVMF (UEFI)
  539. and SeaBIOS based on the loader type. This has nothing to
  540. do with the hvmloader binary.
  541. -->
  542. <loader type="rom">hvmloader</loader>
  543. <boot dev="cdrom" />
  544. <boot dev="hd" />
  545. <!-- server_ip is the address of stubdomain. It hosts it's own DNS server. -->
  546. <cmdline>root=/dev/mapper/dmroot ro nomodeset console=hvc0 rd_NO_PLYMOUTH rd.plymouth.enable=0 plymouth.enable=0 nopat</cmdline>
  547. </os>
  548. <features>
  549. <pae/>
  550. <acpi/>
  551. <apic/>
  552. <viridian/>
  553. </features>
  554. <clock offset="variable" adjustment="0" basis="localtime" />
  555. <on_poweroff>destroy</on_poweroff>
  556. <on_reboot>destroy</on_reboot>
  557. <on_crash>destroy</on_crash>
  558. <devices>
  559. <emulator type="stubdom-linux" />
  560. <input type="tablet" bus="usb"/>
  561. <video>
  562. <model type="vga"/>
  563. </video>
  564. <graphics type="qubes"/>
  565. </devices>
  566. </domain>
  567. '''
  568. my_uuid = '7db78950-c467-4863-94d1-af59806384ea'
  569. vm = self.get_vm(uuid=my_uuid)
  570. vm.netvm = None
  571. vm.virt_mode = 'hvm'
  572. libvirt_xml = vm.create_config_file()
  573. self.assertXMLEqual(lxml.etree.XML(libvirt_xml),
  574. lxml.etree.XML(expected))
  575. def test_600_libvirt_xml_pvh(self):
  576. expected = '''<domain type="xen">
  577. <name>test-inst-test</name>
  578. <uuid>7db78950-c467-4863-94d1-af59806384ea</uuid>
  579. <memory unit="MiB">500</memory>
  580. <currentMemory unit="MiB">400</currentMemory>
  581. <vcpu placement="static">2</vcpu>
  582. <cpu mode='host-passthrough'>
  583. <!-- disable nested HVM -->
  584. <feature name='vmx' policy='disable'/>
  585. <feature name='svm' policy='disable'/>
  586. <!-- disable SMAP inside VM, because of Linux bug -->
  587. <feature name='smap' policy='disable'/>
  588. </cpu>
  589. <os>
  590. <type arch="x86_64" machine="xenfv">hvm</type>
  591. <kernel>/tmp/kernel/vmlinuz</kernel>
  592. <initrd>/tmp/kernel/initramfs</initrd>
  593. <cmdline>root=/dev/mapper/dmroot ro nomodeset console=hvc0 rd_NO_PLYMOUTH rd.plymouth.enable=0 plymouth.enable=0 nopat</cmdline>
  594. </os>
  595. <features>
  596. <pae/>
  597. <acpi/>
  598. <apic/>
  599. <viridian/>
  600. </features>
  601. <clock offset='utc' adjustment='reset'>
  602. <timer name="tsc" mode="native"/>
  603. </clock>
  604. <on_poweroff>destroy</on_poweroff>
  605. <on_reboot>destroy</on_reboot>
  606. <on_crash>destroy</on_crash>
  607. <devices>
  608. <disk type="block" device="disk">
  609. <driver name="phy" />
  610. <source dev="/tmp/kernel/modules.img" />
  611. <target dev="xvdd" />
  612. <backenddomain name="dom0" />
  613. </disk>
  614. <emulator type="none" />
  615. <console type="pty">
  616. <target type="xen" port="0"/>
  617. </console>
  618. </devices>
  619. </domain>
  620. '''
  621. my_uuid = '7db78950-c467-4863-94d1-af59806384ea'
  622. vm = self.get_vm(uuid=my_uuid)
  623. vm.netvm = None
  624. vm.virt_mode = 'pvh'
  625. # tests for storage are later
  626. vm.volumes['kernel'] = unittest.mock.Mock(**{
  627. 'kernels_dir': '/tmp/kernel',
  628. 'block_device.return_value.domain': 'dom0',
  629. 'block_device.return_value.script': None,
  630. 'block_device.return_value.path': '/tmp/kernel/modules.img',
  631. 'block_device.return_value.devtype': 'disk',
  632. 'block_device.return_value.name': 'kernel',
  633. })
  634. libvirt_xml = vm.create_config_file()
  635. self.assertXMLEqual(lxml.etree.XML(libvirt_xml),
  636. lxml.etree.XML(expected))
  637. @unittest.mock.patch('qubes.utils.get_timezone')
  638. @unittest.mock.patch('qubes.utils.urandom')
  639. @unittest.mock.patch('qubes.vm.qubesvm.QubesVM.untrusted_qdb')
  640. def test_620_qdb_standalone(self, mock_qubesdb, mock_urandom,
  641. mock_timezone):
  642. mock_urandom.return_value = b'A' * 64
  643. mock_timezone.return_value = 'UTC'
  644. vm = self.get_vm(cls=qubes.vm.standalonevm.StandaloneVM)
  645. vm.netvm = None
  646. vm.events_enabled = True
  647. test_qubesdb = TestQubesDB()
  648. mock_qubesdb.write.side_effect = test_qubesdb.write
  649. mock_qubesdb.rm.side_effect = test_qubesdb.rm
  650. vm.create_qdb_entries()
  651. self.maxDiff = None
  652. iptables_header = (
  653. '# Generated by Qubes Core on {}\n'
  654. '*filter\n'
  655. ':INPUT DROP [0:0]\n'
  656. ':FORWARD DROP [0:0]\n'
  657. ':OUTPUT ACCEPT [0:0]\n'
  658. '-A INPUT -i vif+ -p udp -m udp --dport 68 -j DROP\n'
  659. '-A INPUT -m conntrack --ctstate '
  660. 'RELATED,ESTABLISHED -j ACCEPT\n'
  661. '-A INPUT -p icmp -j ACCEPT\n'
  662. '-A INPUT -i lo -j ACCEPT\n'
  663. '-A INPUT -j REJECT --reject-with '
  664. 'icmp-host-prohibited\n'
  665. '-A FORWARD -m conntrack --ctstate '
  666. 'RELATED,ESTABLISHED -j ACCEPT\n'
  667. '-A FORWARD -i vif+ -o vif+ -j DROP\n'
  668. 'COMMIT\n'.format(datetime.datetime.now().ctime()))
  669. self.assertEqual(test_qubesdb.data, {
  670. '/name': 'test-inst-test',
  671. '/type': 'StandaloneVM',
  672. '/qubes-vm-type': 'AppVM',
  673. '/qubes-debug-mode': '0',
  674. '/qubes-base-template': '',
  675. '/qubes-timezone': 'UTC',
  676. '/qubes-random-seed': base64.b64encode(b'A' * 64),
  677. '/qubes-vm-persistence': 'full',
  678. '/qubes-vm-updateable': 'True',
  679. '/qubes-block-devices': '',
  680. '/qubes-usb-devices': '',
  681. '/qubes-iptables': 'reload',
  682. '/qubes-iptables-error': '',
  683. '/qubes-iptables-header': iptables_header,
  684. '/qubes-service/qubes-update-check': '0',
  685. })
  686. @unittest.mock.patch('qubes.utils.get_timezone')
  687. @unittest.mock.patch('qubes.utils.urandom')
  688. @unittest.mock.patch('qubes.vm.qubesvm.QubesVM.untrusted_qdb')
  689. def test_621_qdb_appvm_with_network(self, mock_qubesdb, mock_urandom,
  690. mock_timezone):
  691. mock_urandom.return_value = b'A' * 64
  692. mock_timezone.return_value = 'UTC'
  693. template = self.get_vm(cls=qubes.vm.templatevm.TemplateVM, name='template')
  694. template.netvm = None
  695. netvm = self.get_vm(cls=qubes.vm.standalonevm.StandaloneVM,
  696. name='netvm', qid=2, provides_network=True)
  697. vm = self.get_vm(cls=qubes.vm.appvm.AppVM, template=template,
  698. name='appvm', qid=3)
  699. vm.netvm = netvm
  700. test_qubesdb = TestQubesDB()
  701. mock_qubesdb.write.side_effect = test_qubesdb.write
  702. mock_qubesdb.rm.side_effect = test_qubesdb.rm
  703. self.maxDiff = None
  704. iptables_header = (
  705. '# Generated by Qubes Core on {}\n'
  706. '*filter\n'
  707. ':INPUT DROP [0:0]\n'
  708. ':FORWARD DROP [0:0]\n'
  709. ':OUTPUT ACCEPT [0:0]\n'
  710. '-A INPUT -i vif+ -p udp -m udp --dport 68 -j DROP\n'
  711. '-A INPUT -m conntrack --ctstate '
  712. 'RELATED,ESTABLISHED -j ACCEPT\n'
  713. '-A INPUT -p icmp -j ACCEPT\n'
  714. '-A INPUT -i lo -j ACCEPT\n'
  715. '-A INPUT -j REJECT --reject-with '
  716. 'icmp-host-prohibited\n'
  717. '-A FORWARD -m conntrack --ctstate '
  718. 'RELATED,ESTABLISHED -j ACCEPT\n'
  719. '-A FORWARD -i vif+ -o vif+ -j DROP\n'
  720. 'COMMIT\n'.format(datetime.datetime.now().ctime()))
  721. expected = {
  722. '/name': 'test-inst-appvm',
  723. '/type': 'AppVM',
  724. '/qubes-vm-type': 'AppVM',
  725. '/qubes-debug-mode': '0',
  726. '/qubes-base-template': 'test-inst-template',
  727. '/qubes-timezone': 'UTC',
  728. '/qubes-random-seed': base64.b64encode(b'A' * 64),
  729. '/qubes-vm-persistence': 'rw-only',
  730. '/qubes-vm-updateable': 'False',
  731. '/qubes-block-devices': '',
  732. '/qubes-usb-devices': '',
  733. '/qubes-iptables': 'reload',
  734. '/qubes-iptables-error': '',
  735. '/qubes-iptables-header': iptables_header,
  736. '/qubes-service/qubes-update-check': '0',
  737. '/qubes-ip': '10.137.0.3',
  738. '/qubes-netmask': '255.255.255.255',
  739. '/qubes-gateway': '10.137.0.2',
  740. '/qubes-primary-dns': '10.139.1.1',
  741. '/qubes-secondary-dns': '10.139.1.2',
  742. }
  743. vm.create_qdb_entries()
  744. self.assertEqual(test_qubesdb.data, expected)