api_admin.py 71 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665
  1. # -*- encoding: utf8 -*-
  2. #
  3. # The Qubes OS Project, http://www.qubes-os.org
  4. #
  5. # Copyright (C) 2017 Marek Marczykowski-Górecki
  6. # <marmarek@invisiblethingslab.com>
  7. #
  8. # This program is free software; you can redistribute it and/or modify
  9. # it under the terms of the GNU General Public License as published by
  10. # the Free Software Foundation; either version 2 of the License, or
  11. # (at your option) any later version.
  12. #
  13. # This program is distributed in the hope that it will be useful,
  14. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  15. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  16. # GNU General Public License for more details.
  17. #
  18. # You should have received a copy of the GNU General Public License along
  19. # with this program; if not, see <http://www.gnu.org/licenses/>.
  20. ''' Tests for management calls endpoints '''
  21. import asyncio
  22. import os
  23. import shutil
  24. import unittest.mock
  25. import libvirt
  26. import qubes
  27. import qubes.api.admin
  28. import qubes.tests
  29. # properties defined in API
  30. volume_properties = [
  31. 'pool', 'vid', 'size', 'usage', 'rw', 'internal', 'source',
  32. 'save_on_stop', 'snap_on_start']
  33. class AdminAPITestCase(qubes.tests.QubesTestCase):
  34. def setUp(self):
  35. super().setUp()
  36. app = qubes.Qubes('/tmp/qubes-test.xml', load=False)
  37. app.vmm = unittest.mock.Mock(spec=qubes.app.VMMConnection)
  38. app.load_initial_values()
  39. app.default_kernel = '1.0'
  40. app.default_netvm = None
  41. self.template = app.add_new_vm('TemplateVM', label='black',
  42. name='test-template')
  43. app.default_template = 'test-template'
  44. with qubes.tests.substitute_entry_points('qubes.storage',
  45. 'qubes.tests.storage'):
  46. app.add_pool('test', driver='test')
  47. app.save = unittest.mock.Mock()
  48. self.vm = app.add_new_vm('AppVM', label='red', name='test-vm1',
  49. template='test-template')
  50. self.app = app
  51. libvirt_attrs = {
  52. 'libvirt_conn.lookupByUUID.return_value.isActive.return_value':
  53. False,
  54. 'libvirt_conn.lookupByUUID.return_value.state.return_value':
  55. [libvirt.VIR_DOMAIN_SHUTOFF],
  56. }
  57. app.vmm.configure_mock(**libvirt_attrs)
  58. self.emitter = qubes.tests.TestEmitter()
  59. self.app.domains[0].fire_event = self.emitter.fire_event
  60. self.app.domains[0].fire_event_pre = self.emitter.fire_event_pre
  61. self.test_base_dir = '/tmp/qubes-test-dir'
  62. self.base_dir_patch = unittest.mock.patch.dict(qubes.config.system_path,
  63. {'qubes_base_dir': self.test_base_dir})
  64. self.base_dir_patch.start()
  65. def tearDown(self):
  66. self.base_dir_patch.stop()
  67. if os.path.exists(self.test_base_dir):
  68. shutil.rmtree(self.test_base_dir)
  69. super(AdminAPITestCase, self).tearDown()
  70. def call_mgmt_func(self, method, dest, arg=b'', payload=b''):
  71. mgmt_obj = qubes.api.admin.QubesAdminAPI(self.app, b'dom0', method, dest, arg)
  72. loop = asyncio.get_event_loop()
  73. response = loop.run_until_complete(
  74. mgmt_obj.execute(untrusted_payload=payload))
  75. self.assertEventFired(self.emitter,
  76. 'mgmt-permission:' + method.decode('ascii'))
  77. return response
  78. class TC_00_VMs(AdminAPITestCase):
  79. def test_000_vm_list(self):
  80. value = self.call_mgmt_func(b'admin.vm.List', b'dom0')
  81. self.assertEqual(value,
  82. 'dom0 class=AdminVM state=Running\n'
  83. 'test-template class=TemplateVM state=Halted\n'
  84. 'test-vm1 class=AppVM state=Halted\n')
  85. def test_001_vm_list_single(self):
  86. value = self.call_mgmt_func(b'admin.vm.List', b'test-vm1')
  87. self.assertEqual(value,
  88. 'test-vm1 class=AppVM state=Halted\n')
  89. def test_010_vm_property_list(self):
  90. # this test is kind of stupid, but at least check if appropriate
  91. # mgmt-permission event is fired
  92. value = self.call_mgmt_func(b'admin.vm.property.List', b'test-vm1')
  93. properties = self.app.domains['test-vm1'].property_list()
  94. self.assertEqual(value,
  95. ''.join('{}\n'.format(prop.__name__) for prop in properties))
  96. def test_020_vm_property_get_str(self):
  97. value = self.call_mgmt_func(b'admin.vm.property.Get', b'test-vm1',
  98. b'name')
  99. self.assertEqual(value, 'default=False type=str test-vm1')
  100. def test_021_vm_property_get_int(self):
  101. value = self.call_mgmt_func(b'admin.vm.property.Get', b'test-vm1',
  102. b'vcpus')
  103. self.assertEqual(value, 'default=True type=int 42')
  104. def test_022_vm_property_get_bool(self):
  105. value = self.call_mgmt_func(b'admin.vm.property.Get', b'test-vm1',
  106. b'provides_network')
  107. self.assertEqual(value, 'default=True type=bool False')
  108. def test_023_vm_property_get_label(self):
  109. value = self.call_mgmt_func(b'admin.vm.property.Get', b'test-vm1',
  110. b'label')
  111. self.assertEqual(value, 'default=False type=label red')
  112. def test_024_vm_property_get_vm(self):
  113. value = self.call_mgmt_func(b'admin.vm.property.Get', b'test-vm1',
  114. b'template')
  115. self.assertEqual(value, 'default=False type=vm test-template')
  116. def test_025_vm_property_get_vm_none(self):
  117. value = self.call_mgmt_func(b'admin.vm.property.Get', b'test-vm1',
  118. b'netvm')
  119. self.assertEqual(value, 'default=True type=vm ')
  120. def test_030_vm_property_set_vm(self):
  121. netvm = self.app.add_new_vm('AppVM', label='red', name='test-net',
  122. template='test-template', provides_network=True)
  123. with unittest.mock.patch('qubes.vm.VMProperty.__set__') as mock:
  124. value = self.call_mgmt_func(b'admin.vm.property.Set', b'test-vm1',
  125. b'netvm', b'test-net')
  126. self.assertIsNone(value)
  127. mock.assert_called_once_with(self.vm, 'test-net')
  128. self.app.save.assert_called_once_with()
  129. def test_032_vm_property_set_vm_invalid1(self):
  130. with unittest.mock.patch('qubes.vm.VMProperty.__set__') as mock:
  131. with self.assertRaises(qubes.exc.QubesValueError):
  132. self.call_mgmt_func(b'admin.vm.property.Set', b'test-vm1',
  133. b'netvm', b'forbidden-chars/../!')
  134. self.assertFalse(mock.called)
  135. self.assertFalse(self.app.save.called)
  136. def test_033_vm_property_set_vm_invalid2(self):
  137. with unittest.mock.patch('qubes.vm.VMProperty.__set__') as mock:
  138. with self.assertRaises(qubes.exc.QubesValueError):
  139. self.call_mgmt_func(b'admin.vm.property.Set', b'test-vm1',
  140. b'netvm', b'\x80\x90\xa0')
  141. self.assertFalse(mock.called)
  142. self.assertFalse(self.app.save.called)
  143. def test_034_vm_propert_set_bool_true(self):
  144. with unittest.mock.patch('qubes.property.__set__') as mock:
  145. value = self.call_mgmt_func(b'admin.vm.property.Set', b'test-vm1',
  146. b'autostart', b'True')
  147. self.assertIsNone(value)
  148. mock.assert_called_once_with(self.vm, True)
  149. self.app.save.assert_called_once_with()
  150. def test_035_vm_propert_set_bool_false(self):
  151. with unittest.mock.patch('qubes.property.__set__') as mock:
  152. value = self.call_mgmt_func(b'admin.vm.property.Set', b'test-vm1',
  153. b'autostart', b'False')
  154. self.assertIsNone(value)
  155. mock.assert_called_once_with(self.vm, False)
  156. self.app.save.assert_called_once_with()
  157. def test_036_vm_propert_set_bool_invalid1(self):
  158. with unittest.mock.patch('qubes.property.__set__') as mock:
  159. with self.assertRaises(qubes.exc.QubesValueError):
  160. self.call_mgmt_func(b'admin.vm.property.Set', b'test-vm1',
  161. b'autostart', b'some string')
  162. self.assertFalse(mock.called)
  163. self.assertFalse(self.app.save.called)
  164. def test_037_vm_propert_set_bool_invalid2(self):
  165. with unittest.mock.patch('qubes.property.__set__') as mock:
  166. with self.assertRaises(qubes.exc.QubesValueError):
  167. self.call_mgmt_func(b'admin.vm.property.Set', b'test-vm1',
  168. b'autostart', b'\x80\x90@#$%^&*(')
  169. self.assertFalse(mock.called)
  170. self.assertFalse(self.app.save.called)
  171. def test_038_vm_propert_set_str(self):
  172. with unittest.mock.patch('qubes.property.__set__') as mock:
  173. value = self.call_mgmt_func(b'admin.vm.property.Set', b'test-vm1',
  174. b'kernel', b'1.0')
  175. self.assertIsNone(value)
  176. mock.assert_called_once_with(self.vm, '1.0')
  177. self.app.save.assert_called_once_with()
  178. def test_039_vm_propert_set_str_invalid1(self):
  179. with unittest.mock.patch('qubes.property.__set__') as mock:
  180. with self.assertRaises(qubes.exc.QubesValueError):
  181. self.call_mgmt_func(b'admin.vm.property.Set', b'test-vm1',
  182. b'kernel', b'some, non-ASCII: \x80\xd2')
  183. self.assertFalse(mock.called)
  184. self.assertFalse(self.app.save.called)
  185. def test_040_vm_propert_set_int(self):
  186. with unittest.mock.patch('qubes.property.__set__') as mock:
  187. value = self.call_mgmt_func(b'admin.vm.property.Set', b'test-vm1',
  188. b'maxmem', b'1024000')
  189. self.assertIsNone(value)
  190. mock.assert_called_once_with(self.vm, 1024000)
  191. self.app.save.assert_called_once_with()
  192. def test_041_vm_propert_set_int_invalid1(self):
  193. with unittest.mock.patch('qubes.property.__set__') as mock:
  194. with self.assertRaises(qubes.exc.QubesValueError):
  195. self.call_mgmt_func(b'admin.vm.property.Set', b'test-vm1',
  196. b'maxmem', b'fourty two')
  197. self.assertFalse(mock.called)
  198. self.assertFalse(self.app.save.called)
  199. def test_042_vm_propert_set_label(self):
  200. with unittest.mock.patch('qubes.property.__set__') as mock:
  201. value = self.call_mgmt_func(b'admin.vm.property.Set', b'test-vm1',
  202. b'label', b'green')
  203. self.assertIsNone(value)
  204. mock.assert_called_once_with(self.vm, 'green')
  205. self.app.save.assert_called_once_with()
  206. def test_043_vm_propert_set_label_invalid1(self):
  207. with unittest.mock.patch('qubes.property.__set__') as mock:
  208. with self.assertRaises(qubes.exc.QubesValueError):
  209. self.call_mgmt_func(b'admin.vm.property.Set', b'test-vm1',
  210. b'maxmem', b'some, non-ASCII: \x80\xd2')
  211. self.assertFalse(mock.called)
  212. self.assertFalse(self.app.save.called)
  213. @unittest.skip('label existence not checked before actual setter yet')
  214. def test_044_vm_propert_set_label_invalid2(self):
  215. with unittest.mock.patch('qubes.property.__set__') as mock:
  216. with self.assertRaises(qubes.exc.QubesValueError):
  217. self.call_mgmt_func(b'admin.vm.property.Set', b'test-vm1',
  218. b'maxmem', b'non-existing-color')
  219. self.assertFalse(mock.called)
  220. self.assertFalse(self.app.save.called)
  221. def test_050_vm_property_help(self):
  222. value = self.call_mgmt_func(b'admin.vm.property.Help', b'test-vm1',
  223. b'label')
  224. self.assertEqual(value,
  225. 'Colourful label assigned to VM. This is where the colour of the '
  226. 'padlock is set.')
  227. self.assertFalse(self.app.save.called)
  228. def test_052_vm_property_help_invalid_property(self):
  229. with self.assertRaises(AssertionError):
  230. self.call_mgmt_func(b'admin.vm.property.Help', b'test-vm1',
  231. b'no-such-property')
  232. self.assertFalse(self.app.save.called)
  233. def test_060_vm_property_reset(self):
  234. with unittest.mock.patch('qubes.property.__delete__') as mock:
  235. value = self.call_mgmt_func(b'admin.vm.property.Reset', b'test-vm1',
  236. b'default_user')
  237. mock.assert_called_with(self.vm)
  238. self.assertIsNone(value)
  239. self.app.save.assert_called_once_with()
  240. def test_062_vm_property_reset_invalid_property(self):
  241. with unittest.mock.patch('qubes.property.__delete__') as mock:
  242. with self.assertRaises(AssertionError):
  243. self.call_mgmt_func(b'admin.vm.property.Help', b'test-vm1',
  244. b'no-such-property')
  245. self.assertFalse(mock.called)
  246. self.assertFalse(self.app.save.called)
  247. def test_070_vm_volume_list(self):
  248. self.vm.volumes = unittest.mock.Mock()
  249. volumes_conf = {
  250. 'keys.return_value': ['root', 'private', 'volatile', 'kernel']
  251. }
  252. self.vm.volumes.configure_mock(**volumes_conf)
  253. value = self.call_mgmt_func(b'admin.vm.volume.List', b'test-vm1')
  254. self.assertEqual(value, 'root\nprivate\nvolatile\nkernel\n')
  255. # check if _only_ keys were accessed
  256. self.assertEqual(self.vm.volumes.mock_calls,
  257. [unittest.mock.call.keys()])
  258. def test_080_vm_volume_info(self):
  259. self.vm.volumes = unittest.mock.MagicMock()
  260. volumes_conf = {
  261. 'keys.return_value': ['root', 'private', 'volatile', 'kernel']
  262. }
  263. for prop in volume_properties:
  264. volumes_conf[
  265. '__getitem__.return_value.{}'.format(prop)] = prop +'-value'
  266. self.vm.volumes.configure_mock(**volumes_conf)
  267. value = self.call_mgmt_func(b'admin.vm.volume.Info', b'test-vm1',
  268. b'private')
  269. self.assertEqual(value,
  270. ''.join('{p}={p}-value\n'.format(p=p) for p in volume_properties))
  271. self.assertEqual(self.vm.volumes.mock_calls,
  272. [unittest.mock.call.keys(),
  273. unittest.mock.call.__getattr__('__getitem__')('private')])
  274. def test_080_vm_volume_info_invalid_volume(self):
  275. self.vm.volumes = unittest.mock.MagicMock()
  276. volumes_conf = {
  277. 'keys.return_value': ['root', 'private', 'volatile', 'kernel']
  278. }
  279. self.vm.volumes.configure_mock(**volumes_conf)
  280. with self.assertRaises(AssertionError):
  281. self.call_mgmt_func(b'admin.vm.volume.Info', b'test-vm1',
  282. b'no-such-volume')
  283. self.assertEqual(self.vm.volumes.mock_calls,
  284. [unittest.mock.call.keys()])
  285. def test_090_vm_volume_listsnapshots(self):
  286. self.vm.volumes = unittest.mock.MagicMock()
  287. volumes_conf = {
  288. 'keys.return_value': ['root', 'private', 'volatile', 'kernel'],
  289. '__getitem__.return_value.revisions': ['rev1', 'rev2'],
  290. }
  291. self.vm.volumes.configure_mock(**volumes_conf)
  292. value = self.call_mgmt_func(b'admin.vm.volume.ListSnapshots',
  293. b'test-vm1', b'private')
  294. self.assertEqual(value,
  295. 'rev1\nrev2\n')
  296. self.assertEqual(self.vm.volumes.mock_calls,
  297. [unittest.mock.call.keys(),
  298. unittest.mock.call.__getattr__('__getitem__')('private')])
  299. def test_090_vm_volume_listsnapshots_invalid_volume(self):
  300. self.vm.volumes = unittest.mock.MagicMock()
  301. volumes_conf = {
  302. 'keys.return_value': ['root', 'private', 'volatile', 'kernel']
  303. }
  304. self.vm.volumes.configure_mock(**volumes_conf)
  305. with self.assertRaises(AssertionError):
  306. self.call_mgmt_func(b'admin.vm.volume.ListSnapshots', b'test-vm1',
  307. b'no-such-volume')
  308. self.assertEqual(self.vm.volumes.mock_calls,
  309. [unittest.mock.call.keys()])
  310. @unittest.skip('method not implemented yet')
  311. def test_100_vm_volume_snapshot(self):
  312. pass
  313. @unittest.skip('method not implemented yet')
  314. def test_100_vm_volume_snapshot_invlid_volume(self):
  315. self.vm.volumes = unittest.mock.MagicMock()
  316. volumes_conf = {
  317. 'keys.return_value': ['root', 'private', 'volatile', 'kernel'],
  318. '__getitem__.return_value.revisions': ['rev1', 'rev2'],
  319. }
  320. self.vm.volumes.configure_mock(**volumes_conf)
  321. with self.assertRaises(AssertionError):
  322. self.call_mgmt_func(b'admin.vm.volume.Snapshots',
  323. b'test-vm1', b'no-such-volume')
  324. self.assertEqual(self.vm.volumes.mock_calls,
  325. [unittest.mock.call.keys()])
  326. @unittest.skip('method not implemented yet')
  327. def test_100_vm_volume_snapshot_invalid_revision(self):
  328. self.vm.volumes = unittest.mock.MagicMock()
  329. volumes_conf = {
  330. 'keys.return_value': ['root', 'private', 'volatile', 'kernel']
  331. }
  332. self.vm.volumes.configure_mock(**volumes_conf)
  333. with self.assertRaises(AssertionError):
  334. self.call_mgmt_func(b'admin.vm.volume.Snapshots',
  335. b'test-vm1', b'private', b'no-such-rev')
  336. self.assertEqual(self.vm.volumes.mock_calls,
  337. [unittest.mock.call.keys(),
  338. unittest.mock.call.__getattr__('__getitem__')('private')])
  339. def test_110_vm_volume_revert(self):
  340. self.vm.volumes = unittest.mock.MagicMock()
  341. volumes_conf = {
  342. 'keys.return_value': ['root', 'private', 'volatile', 'kernel'],
  343. '__getitem__.return_value.revisions': ['rev1', 'rev2'],
  344. }
  345. self.vm.volumes.configure_mock(**volumes_conf)
  346. self.vm.storage = unittest.mock.Mock()
  347. value = self.call_mgmt_func(b'admin.vm.volume.Revert',
  348. b'test-vm1', b'private', b'rev1')
  349. self.assertIsNone(value)
  350. self.assertEqual(self.vm.volumes.mock_calls,
  351. [unittest.mock.call.keys(),
  352. unittest.mock.call.__getattr__('__getitem__')('private')])
  353. self.assertEqual(self.vm.storage.mock_calls,
  354. [unittest.mock.call.get_pool(self.vm.volumes['private']),
  355. unittest.mock.call.get_pool().revert('rev1')])
  356. def test_110_vm_volume_revert_invalid_rev(self):
  357. self.vm.volumes = unittest.mock.MagicMock()
  358. volumes_conf = {
  359. 'keys.return_value': ['root', 'private', 'volatile', 'kernel'],
  360. '__getitem__.return_value.revisions': ['rev1', 'rev2'],
  361. }
  362. self.vm.volumes.configure_mock(**volumes_conf)
  363. self.vm.storage = unittest.mock.Mock()
  364. with self.assertRaises(AssertionError):
  365. self.call_mgmt_func(b'admin.vm.volume.Revert',
  366. b'test-vm1', b'private', b'no-such-rev')
  367. self.assertEqual(self.vm.volumes.mock_calls,
  368. [unittest.mock.call.keys(),
  369. unittest.mock.call.__getattr__('__getitem__')('private')])
  370. self.assertFalse(self.vm.storage.called)
  371. def test_120_vm_volume_resize(self):
  372. self.vm.volumes = unittest.mock.MagicMock()
  373. volumes_conf = {
  374. 'keys.return_value': ['root', 'private', 'volatile', 'kernel'],
  375. }
  376. self.vm.volumes.configure_mock(**volumes_conf)
  377. self.vm.storage = unittest.mock.Mock()
  378. value = self.call_mgmt_func(b'admin.vm.volume.Resize',
  379. b'test-vm1', b'private', b'1024000000')
  380. self.assertIsNone(value)
  381. self.assertEqual(self.vm.volumes.mock_calls,
  382. [unittest.mock.call.keys()])
  383. self.assertEqual(self.vm.storage.mock_calls,
  384. [unittest.mock.call.resize('private', 1024000000)])
  385. def test_120_vm_volume_resize_invalid_size1(self):
  386. self.vm.volumes = unittest.mock.MagicMock()
  387. volumes_conf = {
  388. 'keys.return_value': ['root', 'private', 'volatile', 'kernel'],
  389. }
  390. self.vm.volumes.configure_mock(**volumes_conf)
  391. self.vm.storage = unittest.mock.Mock()
  392. with self.assertRaises(AssertionError):
  393. self.call_mgmt_func(b'admin.vm.volume.Resize',
  394. b'test-vm1', b'private', b'no-int-size')
  395. self.assertEqual(self.vm.volumes.mock_calls,
  396. [unittest.mock.call.keys()])
  397. self.assertFalse(self.vm.storage.called)
  398. def test_120_vm_volume_resize_invalid_size2(self):
  399. self.vm.volumes = unittest.mock.MagicMock()
  400. volumes_conf = {
  401. 'keys.return_value': ['root', 'private', 'volatile', 'kernel'],
  402. }
  403. self.vm.volumes.configure_mock(**volumes_conf)
  404. self.vm.storage = unittest.mock.Mock()
  405. with self.assertRaises(AssertionError):
  406. self.call_mgmt_func(b'admin.vm.volume.Resize',
  407. b'test-vm1', b'private', b'-1')
  408. self.assertEqual(self.vm.volumes.mock_calls,
  409. [unittest.mock.call.keys()])
  410. self.assertFalse(self.vm.storage.called)
  411. def test_130_pool_list(self):
  412. self.app.pools = ['file', 'lvm']
  413. value = self.call_mgmt_func(b'admin.pool.List', b'dom0')
  414. self.assertEqual(value, 'file\nlvm\n')
  415. self.assertFalse(self.app.save.called)
  416. @unittest.mock.patch('qubes.storage.pool_drivers')
  417. @unittest.mock.patch('qubes.storage.driver_parameters')
  418. def test_140_pool_listdrivers(self, mock_parameters, mock_drivers):
  419. self.app.pools = ['file', 'lvm']
  420. mock_drivers.return_value = ['driver1', 'driver2']
  421. mock_parameters.side_effect = \
  422. lambda driver: {
  423. 'driver1': ['param1', 'param2'],
  424. 'driver2': ['param3', 'param4']
  425. }[driver]
  426. value = self.call_mgmt_func(b'admin.pool.ListDrivers', b'dom0')
  427. self.assertEqual(value,
  428. 'driver1 param1 param2\ndriver2 param3 param4\n')
  429. self.assertEqual(mock_drivers.mock_calls, [unittest.mock.call()])
  430. self.assertEqual(mock_parameters.mock_calls,
  431. [unittest.mock.call('driver1'), unittest.mock.call('driver2')])
  432. self.assertFalse(self.app.save.called)
  433. def test_150_pool_info(self):
  434. self.app.pools = {
  435. 'pool1': unittest.mock.Mock(config={
  436. 'param1': 'value1', 'param2': 'value2'})
  437. }
  438. value = self.call_mgmt_func(b'admin.pool.Info', b'dom0', b'pool1')
  439. self.assertEqual(value, 'param1=value1\nparam2=value2\n')
  440. self.assertFalse(self.app.save.called)
  441. @unittest.mock.patch('qubes.storage.pool_drivers')
  442. @unittest.mock.patch('qubes.storage.driver_parameters')
  443. def test_160_pool_add(self, mock_parameters, mock_drivers):
  444. self.app.pools = {
  445. 'file': unittest.mock.Mock(),
  446. 'lvm': unittest.mock.Mock()
  447. }
  448. mock_drivers.return_value = ['driver1', 'driver2']
  449. mock_parameters.side_effect = \
  450. lambda driver: {
  451. 'driver1': ['param1', 'param2'],
  452. 'driver2': ['param3', 'param4']
  453. }[driver]
  454. self.app.add_pool = unittest.mock.Mock()
  455. value = self.call_mgmt_func(b'admin.pool.Add', b'dom0', b'driver1',
  456. b'name=test-pool\nparam1=some-value\n')
  457. self.assertIsNone(value)
  458. self.assertEqual(mock_drivers.mock_calls, [unittest.mock.call()])
  459. self.assertEqual(mock_parameters.mock_calls,
  460. [unittest.mock.call('driver1')])
  461. self.assertEqual(self.app.add_pool.mock_calls,
  462. [unittest.mock.call(name='test-pool', driver='driver1',
  463. param1='some-value')])
  464. self.assertTrue(self.app.save.called)
  465. @unittest.mock.patch('qubes.storage.pool_drivers')
  466. @unittest.mock.patch('qubes.storage.driver_parameters')
  467. def test_160_pool_add_invalid_driver(self, mock_parameters, mock_drivers):
  468. self.app.pools = {
  469. 'file': unittest.mock.Mock(),
  470. 'lvm': unittest.mock.Mock()
  471. }
  472. mock_drivers.return_value = ['driver1', 'driver2']
  473. mock_parameters.side_effect = \
  474. lambda driver: {
  475. 'driver1': ['param1', 'param2'],
  476. 'driver2': ['param3', 'param4']
  477. }[driver]
  478. self.app.add_pool = unittest.mock.Mock()
  479. with self.assertRaises(AssertionError):
  480. self.call_mgmt_func(b'admin.pool.Add', b'dom0',
  481. b'no-such-driver', b'name=test-pool\nparam1=some-value\n')
  482. self.assertEqual(mock_drivers.mock_calls, [unittest.mock.call()])
  483. self.assertEqual(mock_parameters.mock_calls, [])
  484. self.assertEqual(self.app.add_pool.mock_calls, [])
  485. self.assertFalse(self.app.save.called)
  486. @unittest.mock.patch('qubes.storage.pool_drivers')
  487. @unittest.mock.patch('qubes.storage.driver_parameters')
  488. def test_160_pool_add_invalid_param(self, mock_parameters, mock_drivers):
  489. self.app.pools = {
  490. 'file': unittest.mock.Mock(),
  491. 'lvm': unittest.mock.Mock()
  492. }
  493. mock_drivers.return_value = ['driver1', 'driver2']
  494. mock_parameters.side_effect = \
  495. lambda driver: {
  496. 'driver1': ['param1', 'param2'],
  497. 'driver2': ['param3', 'param4']
  498. }[driver]
  499. self.app.add_pool = unittest.mock.Mock()
  500. with self.assertRaises(AssertionError):
  501. self.call_mgmt_func(b'admin.pool.Add', b'dom0',
  502. b'driver1', b'name=test-pool\nparam3=some-value\n')
  503. self.assertEqual(mock_drivers.mock_calls, [unittest.mock.call()])
  504. self.assertEqual(mock_parameters.mock_calls,
  505. [unittest.mock.call('driver1')])
  506. self.assertEqual(self.app.add_pool.mock_calls, [])
  507. self.assertFalse(self.app.save.called)
  508. @unittest.mock.patch('qubes.storage.pool_drivers')
  509. @unittest.mock.patch('qubes.storage.driver_parameters')
  510. def test_160_pool_add_missing_name(self, mock_parameters, mock_drivers):
  511. self.app.pools = {
  512. 'file': unittest.mock.Mock(),
  513. 'lvm': unittest.mock.Mock()
  514. }
  515. mock_drivers.return_value = ['driver1', 'driver2']
  516. mock_parameters.side_effect = \
  517. lambda driver: {
  518. 'driver1': ['param1', 'param2'],
  519. 'driver2': ['param3', 'param4']
  520. }[driver]
  521. self.app.add_pool = unittest.mock.Mock()
  522. with self.assertRaises(AssertionError):
  523. self.call_mgmt_func(b'admin.pool.Add', b'dom0',
  524. b'driver1', b'param1=value\nparam2=some-value\n')
  525. self.assertEqual(mock_drivers.mock_calls, [unittest.mock.call()])
  526. self.assertEqual(mock_parameters.mock_calls, [])
  527. self.assertEqual(self.app.add_pool.mock_calls, [])
  528. self.assertFalse(self.app.save.called)
  529. @unittest.mock.patch('qubes.storage.pool_drivers')
  530. @unittest.mock.patch('qubes.storage.driver_parameters')
  531. def test_160_pool_add_existing_pool(self, mock_parameters, mock_drivers):
  532. self.app.pools = {
  533. 'file': unittest.mock.Mock(),
  534. 'lvm': unittest.mock.Mock()
  535. }
  536. mock_drivers.return_value = ['driver1', 'driver2']
  537. mock_parameters.side_effect = \
  538. lambda driver: {
  539. 'driver1': ['param1', 'param2'],
  540. 'driver2': ['param3', 'param4']
  541. }[driver]
  542. self.app.add_pool = unittest.mock.Mock()
  543. with self.assertRaises(AssertionError):
  544. self.call_mgmt_func(b'admin.pool.Add', b'dom0',
  545. b'driver1', b'name=file\nparam1=value\nparam2=some-value\n')
  546. self.assertEqual(mock_drivers.mock_calls, [unittest.mock.call()])
  547. self.assertEqual(mock_parameters.mock_calls, [])
  548. self.assertEqual(self.app.add_pool.mock_calls, [])
  549. self.assertFalse(self.app.save.called)
  @unittest.mock.patch('qubes.storage.pool_drivers')
  @unittest.mock.patch('qubes.storage.driver_parameters')
  def test_160_pool_add_invalid_config_format(self, mock_parameters,
          mock_drivers):
      # The payload contains an option name with a space ('param 1') and a
      # line without '='. The call is expected to fail with AssertionError
      # with the driver list consulted exactly once and without querying
      # driver parameters, adding a pool or saving the app.
      self.app.pools = {
          name: unittest.mock.Mock() for name in ('file', 'lvm')}
      params_by_driver = {
          'driver1': ['param1', 'param2'],
          'driver2': ['param3', 'param4'],
      }

      def lookup_parameters(driver):
          return params_by_driver[driver]

      mock_drivers.return_value = ['driver1', 'driver2']
      mock_parameters.side_effect = lookup_parameters
      self.app.add_pool = unittest.mock.Mock()

      payload = b'name=test-pool\nparam 1=value\n_param2\n'
      with self.assertRaises(AssertionError):
          self.call_mgmt_func(
              b'admin.pool.Add', b'dom0', b'driver1', payload)

      self.assertEqual(mock_drivers.mock_calls, [unittest.mock.call()])
      self.assertEqual(mock_parameters.mock_calls, [])
      self.assertEqual(self.app.add_pool.mock_calls, [])
      self.assertFalse(self.app.save.called)
  def test_170_pool_remove(self):
      # Removing a registered pool: the call returns None, app.remove_pool
      # is invoked once with the pool name, and the app is saved.
      self.app.pools = {
          name: unittest.mock.Mock()
          for name in ('file', 'lvm', 'test-pool')}
      self.app.remove_pool = unittest.mock.Mock()

      result = self.call_mgmt_func(
          b'admin.pool.Remove', b'dom0', b'test-pool')

      self.assertIsNone(result)
      expected_calls = [unittest.mock.call('test-pool')]
      self.assertEqual(self.app.remove_pool.mock_calls, expected_calls)
      self.assertTrue(self.app.save.called)
  def test_170_pool_remove_invalid_pool(self):
      # Asking to remove a pool name that is not in app.pools is expected
      # to fail with AssertionError, without calling app.remove_pool and
      # without saving the app.
      self.app.pools = {
          name: unittest.mock.Mock()
          for name in ('file', 'lvm', 'test-pool')}
      self.app.remove_pool = unittest.mock.Mock()

      with self.assertRaises(AssertionError):
          self.call_mgmt_func(
              b'admin.pool.Remove', b'dom0', b'no-such-pool')

      self.assertEqual(self.app.remove_pool.mock_calls, [])
      self.assertFalse(self.app.save.called)
  def test_180_label_list(self):
      # admin.label.List is expected to return one label name per line, in
      # the iteration order of app.labels.values(), and must not save.
      listing = self.call_mgmt_func(b'admin.label.List', b'dom0')
      expected = ''.join(
          '{}\n'.format(label.name) for label in self.app.labels.values())
      self.assertEqual(listing, expected)
      self.assertFalse(self.app.save.called)
  def test_190_label_get(self):
      # app.get_label is replaced by a mock whose result has
      # color == '0xff0000'; admin.label.Get is expected to return that
      # color, look the label up exactly once by name, and not save.
      self.app.get_label = unittest.mock.Mock(
          **{'return_value.color': '0xff0000'})

      color = self.call_mgmt_func(b'admin.label.Get', b'dom0', b'red')

      self.assertEqual(color, '0xff0000')
      expected_lookups = [unittest.mock.call('red')]
      self.assertEqual(self.app.get_label.mock_calls, expected_lookups)
      self.assertFalse(self.app.save.called)
  def test_195_label_index(self):
      # app.get_label is replaced by a mock whose result has index == 1;
      # admin.label.Index is expected to return it as the string '1', look
      # the label up exactly once by name, and not save.
      self.app.get_label = unittest.mock.Mock(
          **{'return_value.index': 1})

      index = self.call_mgmt_func(b'admin.label.Index', b'dom0', b'red')

      self.assertEqual(index, '1')
      expected_lookups = [unittest.mock.call('red')]
      self.assertEqual(self.app.get_label.mock_calls, expected_lookups)
      self.assertFalse(self.app.save.called)
  def test_200_label_create(self):
      # get_label raises KeyError (label not present yet) and the mocked
      # labels collection reports keys 1..8. Creating 'cyan' is expected to
      # read the keys, store a new Label under index 9, and save the app.
      self.app.get_label = unittest.mock.Mock(side_effect=KeyError)
      self.app.labels = unittest.mock.MagicMock(
          **{'keys.return_value': range(1, 9)})

      result = self.call_mgmt_func(
          b'admin.label.Create', b'dom0', b'cyan', b'0x00ffff')

      self.assertIsNone(result)
      self.assertEqual(
          self.app.get_label.mock_calls, [unittest.mock.call('cyan')])
      new_label = qubes.Label(9, '0x00ffff', 'cyan')
      expected_label_calls = [
          unittest.mock.call.keys(),
          unittest.mock.call.__getattr__('__setitem__')(9, new_label),
      ]
      self.assertEqual(self.app.labels.mock_calls, expected_label_calls)
      self.assertTrue(self.app.save.called)
  def test_200_label_create_invalid_color(self):
      # The color payload 'abcd' is expected to be rejected with
      # AssertionError after a single get_label('cyan') lookup, without any
      # call on the labels collection and without saving.
      self.app.get_label = unittest.mock.Mock(side_effect=KeyError)
      self.app.labels = unittest.mock.MagicMock(
          **{'keys.return_value': range(1, 9)})

      with self.assertRaises(AssertionError):
          self.call_mgmt_func(
              b'admin.label.Create', b'dom0', b'cyan', b'abcd')

      self.assertEqual(
          self.app.get_label.mock_calls, [unittest.mock.call('cyan')])
      self.assertEqual(self.app.labels.mock_calls, [])
      self.assertFalse(self.app.save.called)
  def test_200_label_create_invalid_name(self):
      # Each of the label names below is expected to be rejected with
      # AssertionError before get_label or the labels collection is
      # touched, and without saving.
      self.app.get_label = unittest.mock.Mock(side_effect=KeyError)
      self.app.labels = unittest.mock.MagicMock(
          **{'keys.return_value': range(1, 9)})

      for bad_name in (b'01', b'../xxx', b'strange-name!@#$'):
          with self.assertRaises(AssertionError):
              self.call_mgmt_func(
                  b'admin.label.Create', b'dom0', bad_name, b'0xff0000')

      self.assertEqual(self.app.get_label.mock_calls, [])
      self.assertEqual(self.app.labels.mock_calls, [])
      self.assertFalse(self.app.save.called)
  def test_200_label_create_already_exists(self):
      # Creating a label named 'red' is expected to raise QubesValueError
      # after a single get_label('red') lookup, without saving.
      original_get_label = self.app.get_label
      self.app.get_label = unittest.mock.Mock(wraps=original_get_label)

      with self.assertRaises(qubes.exc.QubesValueError):
          self.call_mgmt_func(
              b'admin.label.Create', b'dom0', b'red', b'abcd')

      self.assertEqual(
          self.app.get_label.mock_calls, [unittest.mock.call('red')])
      self.assertFalse(self.app.save.called)
  def test_210_label_remove(self):
      # Register a 'cyan' label under index 9 first, then wrap get_label
      # and the labels collection in mocks so the calls can be inspected.
      # Removing 'cyan' is expected to delete key 9 and save the app.
      self.app.labels[9] = qubes.Label(9, '0x00ffff', 'cyan')
      lookup_config = {'return_value.index': 9}
      self.app.get_label = unittest.mock.Mock(
          wraps=self.app.get_label, **lookup_config)
      self.app.labels = unittest.mock.MagicMock(wraps=self.app.labels)

      result = self.call_mgmt_func(
          b'admin.label.Remove', b'dom0', b'cyan')

      self.assertIsNone(result)
      self.assertEqual(
          self.app.get_label.mock_calls, [unittest.mock.call('cyan')])
      expected_label_calls = [unittest.mock.call.__delitem__(9)]
      self.assertEqual(self.app.labels.mock_calls, expected_label_calls)
      self.assertTrue(self.app.save.called)
  def test_210_label_remove_invalid_label(self):
      # Removing the label 'no-such-label' is expected to raise
      # QubesValueError and leave the app unsaved.
      missing_label = b'no-such-label'
      with self.assertRaises(qubes.exc.QubesValueError):
          self.call_mgmt_func(
              b'admin.label.Remove', b'dom0', missing_label)
      self.assertFalse(self.app.save.called)
  def test_210_label_remove_default_label(self):
      # With get_label reporting index 6 for 'blue', removal is expected to
      # fail with AssertionError, with no call recorded on the labels
      # collection and no save.
      self.app.labels = unittest.mock.MagicMock(wraps=self.app.labels)
      lookup_config = {'return_value.index': 6}
      self.app.get_label = unittest.mock.Mock(
          wraps=self.app.get_label, **lookup_config)

      with self.assertRaises(AssertionError):
          self.call_mgmt_func(b'admin.label.Remove', b'dom0', b'blue')

      self.assertEqual(self.app.labels.mock_calls, [])
      self.assertFalse(self.app.save.called)
  def test_210_label_remove_in_use(self):
      # With get_label reporting index 1 for 'red', removal is expected to
      # fail with AssertionError, with no call recorded on the labels
      # collection and no save.
      self.app.labels = unittest.mock.MagicMock(wraps=self.app.labels)
      lookup_config = {'return_value.index': 1}
      self.app.get_label = unittest.mock.Mock(
          wraps=self.app.get_label, **lookup_config)

      with self.assertRaises(AssertionError):
          self.call_mgmt_func(b'admin.label.Remove', b'dom0', b'red')

      self.assertEqual(self.app.labels.mock_calls, [])
      self.assertFalse(self.app.save.called)
  def test_220_start(self):
      # vm.start is swapped for a coroutine that forwards to a plain Mock,
      # so the test can check it was invoked once with no arguments.
      call_log = unittest.mock.Mock()

      def _forward(*args, **kwargs):
          return call_log(*args, **kwargs)

      self.vm.start = asyncio.coroutine(_forward)

      result = self.call_mgmt_func(b'admin.vm.Start', b'test-vm1')

      self.assertIsNone(result)
      call_log.assert_called_once_with()
  def test_230_shutdown(self):
      # vm.shutdown is swapped for a coroutine that forwards to a plain
      # Mock, so the test can check it was invoked once with no arguments.
      call_log = unittest.mock.Mock()

      def _forward(*args, **kwargs):
          return call_log(*args, **kwargs)

      self.vm.shutdown = asyncio.coroutine(_forward)

      result = self.call_mgmt_func(b'admin.vm.Shutdown', b'test-vm1')

      self.assertIsNone(result)
      call_log.assert_called_once_with()
  def test_240_pause(self):
      # vm.pause is swapped for a coroutine that forwards to a plain Mock,
      # so the test can check it was invoked once with no arguments.
      call_log = unittest.mock.Mock()

      def _forward(*args, **kwargs):
          return call_log(*args, **kwargs)

      self.vm.pause = asyncio.coroutine(_forward)

      result = self.call_mgmt_func(b'admin.vm.Pause', b'test-vm1')

      self.assertIsNone(result)
      call_log.assert_called_once_with()
  def test_250_unpause(self):
      # vm.unpause is swapped for a coroutine that forwards to a plain
      # Mock, so the test can check it was invoked once with no arguments.
      call_log = unittest.mock.Mock()

      def _forward(*args, **kwargs):
          return call_log(*args, **kwargs)

      self.vm.unpause = asyncio.coroutine(_forward)

      result = self.call_mgmt_func(b'admin.vm.Unpause', b'test-vm1')

      self.assertIsNone(result)
      call_log.assert_called_once_with()
  def test_260_kill(self):
      # vm.kill is swapped for a coroutine that forwards to a plain Mock,
      # so the test can check it was invoked once with no arguments.
      call_log = unittest.mock.Mock()

      def _forward(*args, **kwargs):
          return call_log(*args, **kwargs)

      self.vm.kill = asyncio.coroutine(_forward)

      result = self.call_mgmt_func(b'admin.vm.Kill', b'test-vm1')

      self.assertIsNone(result)
      call_log.assert_called_once_with()
  def test_270_events(self):
      # Run admin.Events on the event loop while a second task fires an
      # event on the test VM and then cancels the call. send_event is
      # expected to receive 'connection-established' for the app followed
      # by the VM event, and the permission event must have been fired.
      send_event = unittest.mock.Mock(spec=[])
      mgmt_obj = qubes.api.admin.QubesAdminAPI(
          self.app, b'dom0', b'admin.Events', b'dom0', b'',
          send_event=send_event)

      def _fire_then_cancel():
          self.vm.fire_event('test-event', arg1='abc')
          mgmt_obj.cancel()

      loop = asyncio.get_event_loop()
      # The execute task is scheduled before the task that fires the event.
      execute_task = asyncio.ensure_future(
          mgmt_obj.execute(untrusted_payload=b''))
      asyncio.ensure_future(asyncio.coroutine(_fire_then_cancel)())
      loop.run_until_complete(execute_task)

      self.assertIsNone(execute_task.result())
      self.assertEventFired(
          self.emitter, 'mgmt-permission:' + 'admin.Events')
      expected_events = [
          unittest.mock.call(self.app, 'connection-established'),
          unittest.mock.call(self.vm, 'test-event', arg1='abc'),
      ]
      self.assertEqual(send_event.mock_calls, expected_events)
  def test_271_events_add_vm(self):
      # Same setup as a plain admin.Events run, but a new VM is added after
      # the call has started. send_event is expected to also receive the
      # app-level 'domain-add' event and the event fired on the new VM.
      send_event = unittest.mock.Mock(spec=[])
      mgmt_obj = qubes.api.admin.QubesAdminAPI(
          self.app, b'dom0', b'admin.Events', b'dom0', b'',
          send_event=send_event)

      def _fire_add_vm_then_cancel():
          self.vm.fire_event('test-event', arg1='abc')
          # add VM _after_ starting admin.Events call
          new_vm = self.app.add_new_vm(
              'AppVM', label='red', name='test-vm2',
              template='test-template')
          new_vm.fire_event('test-event2', arg1='abc')
          mgmt_obj.cancel()
          return new_vm

      loop = asyncio.get_event_loop()
      # The execute task is scheduled before the task that fires events.
      execute_task = asyncio.ensure_future(
          mgmt_obj.execute(untrusted_payload=b''))
      event_task = asyncio.ensure_future(
          asyncio.coroutine(_fire_add_vm_then_cancel)())
      loop.run_until_complete(execute_task)

      vm2 = event_task.result()
      self.assertIsNone(execute_task.result())
      self.assertEventFired(
          self.emitter, 'mgmt-permission:' + 'admin.Events')
      expected_events = [
          unittest.mock.call(self.app, 'connection-established'),
          unittest.mock.call(self.vm, 'test-event', arg1='abc'),
          unittest.mock.call(self.app, 'domain-add', vm=vm2),
          unittest.mock.call(vm2, 'test-event2', arg1='abc'),
      ]
      self.assertEqual(send_event.mock_calls, expected_events)
  def test_280_feature_list(self):
      # With a single feature set on the VM, the listing is expected to be
      # that feature name followed by a newline; nothing is saved.
      self.vm.features['test-feature'] = 'some-value'
      listing = self.call_mgmt_func(
          b'admin.vm.feature.List', b'test-vm1')
      self.assertEqual(listing, 'test-feature\n')
      self.assertFalse(self.app.save.called)
  def test_290_feature_get(self):
      # Reading a feature that is set on the VM returns its value and does
      # not save the app.
      self.vm.features['test-feature'] = 'some-value'
      fetched = self.call_mgmt_func(
          b'admin.vm.feature.Get', b'test-vm1', b'test-feature')
      self.assertEqual(fetched, 'some-value')
      self.assertFalse(self.app.save.called)
  def test_291_feature_get_none(self):
      # Reading a feature that was never set is expected to raise
      # QubesFeatureNotFoundError and not save the app.
      expected_error = qubes.exc.QubesFeatureNotFoundError
      with self.assertRaises(expected_error):
          self.call_mgmt_func(
              b'admin.vm.feature.Get', b'test-vm1', b'test-feature')
      self.assertFalse(self.app.save.called)
  def test_300_feature_remove(self):
      # Removing a feature that is set on the VM returns None, drops the
      # feature from vm.features and saves the app.
      self.vm.features['test-feature'] = 'some-value'
      value = self.call_mgmt_func(
          b'admin.vm.feature.Remove', b'test-vm1', b'test-feature')
      # assertIsNone takes (expr, msg); the stray second argument that was
      # passed here before was only ever used as the failure message.
      self.assertIsNone(value)
      self.assertNotIn('test-feature', self.vm.features)
      self.assertTrue(self.app.save.called)
  def test_301_feature_remove_none(self):
      # Removing a feature that was never set is expected to raise
      # QubesFeatureNotFoundError and not save the app.
      expected_error = qubes.exc.QubesFeatureNotFoundError
      with self.assertRaises(expected_error):
          self.call_mgmt_func(
              b'admin.vm.feature.Remove', b'test-vm1', b'test-feature')
      self.assertFalse(self.app.save.called)
  def test_310_feature_checkwithtemplate(self):
      # The feature is set directly on the VM; CheckWithTemplate returns
      # its value and does not save the app.
      self.vm.features['test-feature'] = 'some-value'
      fetched = self.call_mgmt_func(
          b'admin.vm.feature.CheckWithTemplate',
          b'test-vm1', b'test-feature')
      self.assertEqual(fetched, 'some-value')
      self.assertFalse(self.app.save.called)
  def test_311_feature_checkwithtemplate_tpl(self):
      # The feature is set only on the template; CheckWithTemplate queried
      # on test-vm1 is still expected to return the value, without saving.
      self.template.features['test-feature'] = 'some-value'
      fetched = self.call_mgmt_func(
          b'admin.vm.feature.CheckWithTemplate',
          b'test-vm1', b'test-feature')
      self.assertEqual(fetched, 'some-value')
      self.assertFalse(self.app.save.called)
  def test_312_feature_checkwithtemplate_none(self):
      # With the feature set nowhere in this test, CheckWithTemplate is
      # expected to raise QubesFeatureNotFoundError and not save the app.
      expected_error = qubes.exc.QubesFeatureNotFoundError
      with self.assertRaises(expected_error):
          self.call_mgmt_func(
              b'admin.vm.feature.CheckWithTemplate',
              b'test-vm1', b'test-feature')
      self.assertFalse(self.app.save.called)
  def test_320_feature_set(self):
      # Setting a feature returns None, stores the decoded payload in
      # vm.features and saves the app.
      result = self.call_mgmt_func(
          b'admin.vm.feature.Set',
          b'test-vm1', b'test-feature', b'some-value')
      self.assertIsNone(result)
      stored = self.vm.features['test-feature']
      self.assertEqual(stored, 'some-value')
      self.assertTrue(self.app.save.called)
  def test_321_feature_set_empty(self):
      # An empty payload is a valid value: the feature is stored as the
      # empty string and the app is saved.
      result = self.call_mgmt_func(
          b'admin.vm.feature.Set', b'test-vm1', b'test-feature', b'')
      self.assertIsNone(result)
      stored = self.vm.features['test-feature']
      self.assertEqual(stored, '')
      self.assertTrue(self.app.save.called)
  def test_320_feature_set_invalid(self):
      # A payload containing the byte 0xff is expected to raise
      # UnicodeDecodeError; the feature must not be stored and the app must
      # not be saved.
      undecodable = b'\x02\x03\xffsome-value'
      with self.assertRaises(UnicodeDecodeError):
          self.call_mgmt_func(
              b'admin.vm.feature.Set',
              b'test-vm1', b'test-feature', undecodable)
      self.assertNotIn('test-feature', self.vm.features)
      self.assertFalse(self.app.save.called)
  @asyncio.coroutine
  def dummy_coro(self, *args, **kwargs):
      # No-op coroutine accepting any arguments; the tests in this file
      # assign it as the side_effect of patched storage methods.
      return None
  @unittest.mock.patch('qubes.storage.Storage.create')
  def test_330_vm_create_standalone(self, storage_mock):
      # Creating a StandaloneVM without a template argument: the VM must
      # appear in app.domains with the requested label, storage creation
      # must be requested once for it, its directory must exist under
      # 'appvms', and the app must be saved.
      storage_mock.side_effect = self.dummy_coro
      payload = b'name=test-vm2 label=red'

      self.call_mgmt_func(
          b'admin.vm.Create.StandaloneVM', b'dom0', b'', payload)

      self.assertIn('test-vm2', self.app.domains)
      vm = self.app.domains['test-vm2']
      self.assertIsInstance(vm, qubes.vm.standalonevm.StandaloneVM)
      self.assertEqual(vm.label, self.app.get_label('red'))
      expected_storage_calls = [
          unittest.mock.call(self.app.domains['test-vm2']).create()]
      self.assertEqual(storage_mock.mock_calls, expected_storage_calls)
      vm_dir = os.path.join(self.test_base_dir, 'appvms', 'test-vm2')
      self.assertTrue(os.path.exists(vm_dir))
      self.assertTrue(self.app.save.called)
  @unittest.mock.patch('qubes.storage.Storage.create')
  def test_331_vm_create_standalone_spurious_template(self, storage_mock):
      # Passing a template argument when creating a StandaloneVM is
      # expected to fail with AssertionError: no VM is registered, no
      # storage is created, no directory appears and nothing is saved.
      storage_mock.side_effect = self.dummy_coro
      with self.assertRaises(AssertionError):
          self.call_mgmt_func(
              b'admin.vm.Create.StandaloneVM',
              b'dom0', b'test-template', b'name=test-vm2 label=red')
      # The membership check is done once; it was duplicated verbatim
      # further down in the previous version of this test.
      self.assertNotIn('test-vm2', self.app.domains)
      self.assertEqual(storage_mock.mock_calls, [])
      vm_dir = os.path.join(self.test_base_dir, 'appvms', 'test-vm2')
      self.assertFalse(os.path.exists(vm_dir))
      self.assertFalse(self.app.save.called)
  @unittest.mock.patch('qubes.storage.Storage.create')
  def test_332_vm_create_app(self, storage_mock):
      # Creating an AppVM based on 'test-template': the VM must appear in
      # app.domains with the requested label and template, storage creation
      # must be requested once for it, its directory must exist under
      # 'appvms', and the app must be saved.
      storage_mock.side_effect = self.dummy_coro
      payload = b'name=test-vm2 label=red'

      self.call_mgmt_func(
          b'admin.vm.Create.AppVM', b'dom0', b'test-template', payload)

      self.assertIn('test-vm2', self.app.domains)
      vm = self.app.domains['test-vm2']
      self.assertEqual(vm.label, self.app.get_label('red'))
      self.assertEqual(vm.template, self.app.domains['test-template'])
      expected_storage_calls = [
          unittest.mock.call(self.app.domains['test-vm2']).create()]
      self.assertEqual(storage_mock.mock_calls, expected_storage_calls)
      vm_dir = os.path.join(self.test_base_dir, 'appvms', 'test-vm2')
      self.assertTrue(os.path.exists(vm_dir))
      self.assertTrue(self.app.save.called)
  @unittest.mock.patch('qubes.storage.Storage.create')
  def test_333_vm_create_app_default_template(self, storage_mock):
      # Creating an AppVM with an empty template argument: the new VM is
      # expected to use app.default_template, storage creation must be
      # requested once for it, and the app must be saved.
      storage_mock.side_effect = self.dummy_coro
      self.call_mgmt_func(
          b'admin.vm.Create.AppVM',
          b'dom0', b'', b'name=test-vm2 label=red')
      # Check membership before indexing app.domains, so a missing VM is
      # reported as an assertion failure rather than as a lookup error.
      self.assertIn('test-vm2', self.app.domains)
      self.assertEqual(
          storage_mock.mock_calls,
          [unittest.mock.call(self.app.domains['test-vm2']).create()])
      self.assertEqual(
          self.app.domains['test-vm2'].template,
          self.app.default_template)
      self.assertTrue(self.app.save.called)
  @unittest.mock.patch('qubes.storage.Storage.create')
  def test_334_vm_create_invalid_name(self, storage_mock):
      # The VM name 'test-###' is expected to be rejected with
      # QubesValueError; no such VM is registered and nothing is saved.
      storage_mock.side_effect = self.dummy_coro
      payload = b'name=test-###'
      with self.assertRaises(qubes.exc.QubesValueError):
          self.call_mgmt_func(
              b'admin.vm.Create.AppVM',
              b'dom0', b'test-template', payload)
      self.assertNotIn('test-###', self.app.domains)
      self.assertFalse(self.app.save.called)
  @unittest.mock.patch('qubes.storage.Storage.create')
  def test_335_vm_create_missing_name(self, storage_mock):
      # A payload without a name= option is expected to fail with
      # AssertionError and leave the app unsaved.
      storage_mock.side_effect = self.dummy_coro
      payload = b'label=red'
      with self.assertRaises(AssertionError):
          self.call_mgmt_func(
              b'admin.vm.Create.AppVM',
              b'dom0', b'test-template', payload)
      self.assertFalse(self.app.save.called)
  @unittest.mock.patch('qubes.storage.Storage.create')
  def test_336_vm_create_spurious_pool(self, storage_mock):
      # A pool= option sent to admin.vm.Create.AppVM is expected to fail
      # with AssertionError; no VM is registered and nothing is saved.
      storage_mock.side_effect = self.dummy_coro
      payload = b'name=test-vm2 label=red pool=default'
      with self.assertRaises(AssertionError):
          self.call_mgmt_func(
              b'admin.vm.Create.AppVM',
              b'dom0', b'test-template', payload)
      self.assertNotIn('test-vm2', self.app.domains)
      self.assertFalse(self.app.save.called)
  @unittest.mock.patch('qubes.storage.Storage.create')
  def test_337_vm_create_duplicate_name(self, storage_mock):
      # Creating a VM under the name 'test-vm1' is expected to raise
      # QubesException and leave the app unsaved.
      storage_mock.side_effect = self.dummy_coro
      payload = b'name=test-vm1 label=red'
      with self.assertRaises(qubes.exc.QubesException):
          self.call_mgmt_func(
              b'admin.vm.Create.AppVM',
              b'dom0', b'test-template', payload)
      self.assertFalse(self.app.save.called)
  @unittest.mock.patch('qubes.storage.Storage.create')
  def test_338_vm_create_name_twice(self, storage_mock):
      # A payload giving name= twice is expected to fail with
      # AssertionError; neither name is registered and nothing is saved.
      storage_mock.side_effect = self.dummy_coro
      payload = b'name=test-vm2 name=test-vm3 label=red'
      with self.assertRaises(AssertionError):
          self.call_mgmt_func(
              b'admin.vm.Create.AppVM',
              b'dom0', b'test-template', payload)
      for vm_name in ('test-vm2', 'test-vm3'):
          self.assertNotIn(vm_name, self.app.domains)
      self.assertFalse(self.app.save.called)
  @unittest.mock.patch('qubes.storage.Storage.create')
  def test_340_vm_create_in_pool_app(self, storage_mock):
      # Creating an AppVM with a blanket pool=test option: the test expects
      # 'private' and 'volatile' to land in pool 'test' while 'root' stays
      # in 'default' and 'kernel' in 'linux-kernel'.
      storage_mock.side_effect = self.dummy_coro
      payload = (b'name=test-vm2 label=red '
                 b'pool=test')

      self.call_mgmt_func(
          b'admin.vm.CreateInPool.AppVM',
          b'dom0', b'test-template', payload)

      self.assertIn('test-vm2', self.app.domains)
      vm = self.app.domains['test-vm2']
      self.assertEqual(vm.label, self.app.get_label('red'))
      self.assertEqual(vm.template, self.app.domains['test-template'])
      # pool= only affects volumes actually created for this VM, not the
      # ones taken over from a template or similar
      expected_pools = (
          ('root', 'default'),
          ('private', 'test'),
          ('volatile', 'test'),
          ('kernel', 'linux-kernel'),
      )
      for volume_name, pool_name in expected_pools:
          self.assertEqual(
              vm.volume_config[volume_name]['pool'], pool_name)
      expected_storage_calls = [
          unittest.mock.call(self.app.domains['test-vm2']).create()]
      self.assertEqual(storage_mock.mock_calls, expected_storage_calls)
      vm_dir = os.path.join(self.test_base_dir, 'appvms', 'test-vm2')
      self.assertTrue(os.path.exists(vm_dir))
      self.assertTrue(self.app.save.called)
  @unittest.mock.patch('qubes.storage.Storage.create')
  def test_341_vm_create_in_pool_private(self, storage_mock):
      # Creating an AppVM with pool:private=test: the test expects only
      # the 'private' volume in pool 'test'; 'root' and 'volatile' stay in
      # 'default' and 'kernel' in 'linux-kernel'.
      storage_mock.side_effect = self.dummy_coro
      payload = (b'name=test-vm2 label=red '
                 b'pool:private=test')

      self.call_mgmt_func(
          b'admin.vm.CreateInPool.AppVM',
          b'dom0', b'test-template', payload)

      self.assertIn('test-vm2', self.app.domains)
      vm = self.app.domains['test-vm2']
      self.assertEqual(vm.label, self.app.get_label('red'))
      self.assertEqual(vm.template, self.app.domains['test-template'])
      expected_pools = (
          ('root', 'default'),
          ('private', 'test'),
          ('volatile', 'default'),
          ('kernel', 'linux-kernel'),
      )
      for volume_name, pool_name in expected_pools:
          self.assertEqual(
              vm.volume_config[volume_name]['pool'], pool_name)
      expected_storage_calls = [
          unittest.mock.call(self.app.domains['test-vm2']).create()]
      self.assertEqual(storage_mock.mock_calls, expected_storage_calls)
      vm_dir = os.path.join(self.test_base_dir, 'appvms', 'test-vm2')
      self.assertTrue(os.path.exists(vm_dir))
      self.assertTrue(self.app.save.called)
  @unittest.mock.patch('qubes.storage.Storage.create')
  def test_342_vm_create_in_pool_invalid_pool(self, storage_mock):
      # A blanket pool= option naming 'no-such-pool' is expected to raise
      # QubesException and leave the app unsaved.
      storage_mock.side_effect = self.dummy_coro
      payload = (b'name=test-vm2 label=red '
                 b'pool=no-such-pool')
      with self.assertRaises(qubes.exc.QubesException):
          self.call_mgmt_func(
              b'admin.vm.CreateInPool.AppVM',
              b'dom0', b'test-template', payload)
      self.assertFalse(self.app.save.called)
  @unittest.mock.patch('qubes.storage.Storage.create')
  def test_343_vm_create_in_pool_invalid_pool2(self, storage_mock):
      # A per-volume pool:private= option naming 'no-such-pool' is expected
      # to raise QubesException; no VM is registered and nothing is saved.
      storage_mock.side_effect = self.dummy_coro
      payload = (b'name=test-vm2 label=red '
                 b'pool:private=no-such-pool')
      with self.assertRaises(qubes.exc.QubesException):
          self.call_mgmt_func(
              b'admin.vm.CreateInPool.AppVM',
              b'dom0', b'test-template', payload)
      self.assertNotIn('test-vm2', self.app.domains)
      self.assertFalse(self.app.save.called)
  @unittest.mock.patch('qubes.storage.Storage.create')
  def test_344_vm_create_in_pool_invalid_volume(self, storage_mock):
      # A pool: option for the volume name 'invalid' is expected to fail
      # with AssertionError; no VM is registered and nothing is saved.
      storage_mock.side_effect = self.dummy_coro
      payload = (b'name=test-vm2 label=red '
                 b'pool:invalid=test')
      with self.assertRaises(AssertionError):
          self.call_mgmt_func(
              b'admin.vm.CreateInPool.AppVM',
              b'dom0', b'test-template', payload)
      self.assertNotIn('test-vm2', self.app.domains)
      self.assertFalse(self.app.save.called)
  @unittest.mock.patch('qubes.storage.Storage.create')
  def test_345_vm_create_in_pool_app_root(self, storage_mock):
      # A custom pool for the 'root' volume of an AppVM should not be
      # allowed, because that volume belongs to the template. The call is
      # expected to raise QubesException without registering the VM or
      # saving the app.
      storage_mock.side_effect = self.dummy_coro
      payload = (b'name=test-vm2 label=red '
                 b'pool:root=test')
      with self.assertRaises(qubes.exc.QubesException):
          self.call_mgmt_func(
              b'admin.vm.CreateInPool.AppVM',
              b'dom0', b'test-template', payload)
      self.assertNotIn('test-vm2', self.app.domains)
      self.assertFalse(self.app.save.called)
  @unittest.mock.patch('qubes.storage.Storage.create')
  def test_346_vm_create_in_pool_duplicate_pool(self, storage_mock):
      # The payload carries both a blanket pool= option and a per-volume
      # pool:root= option. The call is expected to fail with
      # AssertionError without registering the VM or saving the app.
      storage_mock.side_effect = self.dummy_coro
      payload = (b'name=test-vm2 label=red '
                 b'pool=test pool:root=test')
      with self.assertRaises(AssertionError):
          self.call_mgmt_func(
              b'admin.vm.CreateInPool.AppVM',
              b'dom0', b'test-template', payload)
      self.assertNotIn('test-vm2', self.app.domains)
      self.assertFalse(self.app.save.called)
  @unittest.mock.patch('qubes.storage.Storage.clone')
  @unittest.mock.patch('qubes.storage.Storage.verify')
  def test_350_vm_clone(self, mock_verify, mock_clone):
      # Cloning test-vm1 into test-vm2: the clone must be registered with
      # label 'red' and template 'test-template', storage clone must be
      # requested once from the source VM, the clone's directory must exist
      # under 'appvms', and the app must be saved.
      for patched in (mock_clone, mock_verify):
          patched.side_effect = self.dummy_coro

      self.call_mgmt_func(
          b'admin.vm.Clone', b'test-vm1', b'', b'name=test-vm2')

      self.assertIn('test-vm2', self.app.domains)
      vm = self.app.domains['test-vm2']
      self.assertEqual(vm.label, self.app.get_label('red'))
      self.assertEqual(vm.template, self.app.domains['test-template'])
      expected_clone_calls = [
          unittest.mock.call(self.app.domains['test-vm2']).clone(
              self.app.domains['test-vm1'])]
      self.assertEqual(mock_clone.mock_calls, expected_clone_calls)
      vm_dir = os.path.join(self.test_base_dir, 'appvms', 'test-vm2')
      self.assertTrue(os.path.exists(vm_dir))
      self.assertTrue(self.app.save.called)
  @unittest.mock.patch('qubes.storage.Storage.clone')
  @unittest.mock.patch('qubes.storage.Storage.verify')
  def test_351_vm_clone_extra_params(self, mock_verify, mock_clone):
      # A clone payload carrying an extra label= option is expected to
      # raise QubesException: no VM is registered, storage clone is not
      # requested, no directory appears and nothing is saved.
      for patched in (mock_clone, mock_verify):
          patched.side_effect = self.dummy_coro
      payload = b'name=test-vm2 label=red'

      with self.assertRaises(qubes.exc.QubesException):
          self.call_mgmt_func(
              b'admin.vm.Clone', b'test-vm1', b'', payload)

      self.assertNotIn('test-vm2', self.app.domains)
      self.assertEqual(mock_clone.mock_calls, [])
      vm_dir = os.path.join(self.test_base_dir, 'appvms', 'test-vm2')
      self.assertFalse(os.path.exists(vm_dir))
      self.assertFalse(self.app.save.called)
  @unittest.mock.patch('qubes.storage.Storage.clone')
  @unittest.mock.patch('qubes.storage.Storage.verify')
  def test_352_vm_clone_duplicate_name(self, mock_verify, mock_clone):
      # Cloning test-vm1 onto its own name is expected to raise
      # QubesException and leave the app unsaved.
      for patched in (mock_clone, mock_verify):
          patched.side_effect = self.dummy_coro
      payload = b'name=test-vm1'

      with self.assertRaises(qubes.exc.QubesException):
          self.call_mgmt_func(
              b'admin.vm.Clone', b'test-vm1', b'', payload)

      self.assertFalse(self.app.save.called)
  @unittest.mock.patch('qubes.storage.Storage.clone')
  @unittest.mock.patch('qubes.storage.Storage.verify')
  def test_353_vm_clone_invalid_name(self, mock_verify, mock_clone):
      # The clone name 'test-vm2/..' is expected to be rejected with
      # QubesException: no VM is registered, storage clone is not
      # requested, no matching path appears and nothing is saved.
      for patched in (mock_clone, mock_verify):
          patched.side_effect = self.dummy_coro
      payload = b'name=test-vm2/..'

      with self.assertRaises(qubes.exc.QubesException):
          self.call_mgmt_func(
              b'admin.vm.Clone', b'test-vm1', b'', payload)

      self.assertNotIn('test-vm2/..', self.app.domains)
      self.assertEqual(mock_clone.mock_calls, [])
      bad_path = os.path.join(self.test_base_dir, 'appvms', 'test-vm2/..')
      self.assertFalse(os.path.exists(bad_path))
      self.assertFalse(self.app.save.called)
  def test_400_property_list(self):
      # The underlying function is already tested through
      # admin.vm.property.*; this mostly checks that the global variant
      # goes through call_mgmt_func and lists app.property_list() names,
      # one per line.
      listing = self.call_mgmt_func(b'admin.property.List', b'dom0')
      properties = self.app.property_list()
      expected = ''.join(
          '{}\n'.format(prop.__name__) for prop in properties)
      self.assertEqual(listing, expected)
  def test_410_property_get_str(self):
      # The underlying function is already tested through
      # admin.vm.property.*; here the global 'default_kernel' property is
      # expected to be reported as a non-default str with value 1.0.
      reported = self.call_mgmt_func(
          b'admin.property.Get', b'dom0', b'default_kernel')
      self.assertEqual(reported, 'default=False type=str 1.0')
  def test_420_propert_set_str(self):
      # The underlying function is already tested through
      # admin.vm.property.*; here qubes.property.__set__ is patched and is
      # expected to be called once with the app and the new value,
      # followed by exactly one save.
      with unittest.mock.patch('qubes.property.__set__') as setter:
          result = self.call_mgmt_func(
              b'admin.property.Set', b'dom0', b'default_kernel', b'1.0')
          self.assertIsNone(result)
          setter.assert_called_once_with(self.app, '1.0')
      self.app.save.assert_called_once_with()
  def test_440_property_help(self):
      # The underlying function is already tested through
      # admin.vm.property.*; here the help text of the global 'clockvm'
      # property is compared verbatim, and nothing may be saved.
      help_text = self.call_mgmt_func(
          b'admin.property.Help', b'dom0', b'clockvm')
      expected = 'Which VM to use as NTP proxy for updating AdminVM'
      self.assertEqual(help_text, expected)
      self.assertFalse(self.app.save.called)
  def test_450_property_reset(self):
      # The underlying function is already tested through
      # admin.vm.property.*; here qubes.property.__delete__ is patched and
      # is expected to be called with the app, followed by exactly one
      # save.
      with unittest.mock.patch('qubes.property.__delete__') as deleter:
          result = self.call_mgmt_func(
              b'admin.property.Reset', b'dom0', b'clockvm')
          deleter.assert_called_with(self.app)
      self.assertIsNone(result)
      self.app.save.assert_called_once_with()
  def test_990_vm_unexpected_payload(self):
      # Every method listed here takes no payload. Calling it with one is
      # expected to fail with AssertionError, with or without an argument,
      # and must not save the app. Each method appears once: entries that
      # were listed twice only re-ran identical subtests.
      methods_with_no_payload = [
          b'admin.vm.List',
          b'admin.vm.Remove',
          b'admin.vm.property.List',
          b'admin.vm.property.Get',
          b'admin.vm.property.Help',
          b'admin.vm.property.HelpRst',
          b'admin.vm.property.Reset',
          b'admin.vm.feature.List',
          b'admin.vm.feature.Get',
          b'admin.vm.feature.CheckWithTemplate',
          b'admin.vm.feature.Remove',
          b'admin.vm.tag.List',
          b'admin.vm.tag.Get',
          b'admin.vm.tag.Remove',
          b'admin.vm.tag.Set',
          b'admin.vm.firewall.Get',
          b'admin.vm.firewall.RemoveRule',
          b'admin.vm.firewall.Flush',
          b'admin.vm.device.pci.Attach',
          b'admin.vm.device.pci.Detach',
          b'admin.vm.device.pci.List',
          b'admin.vm.device.pci.Available',
          b'admin.vm.microphone.Attach',
          b'admin.vm.microphone.Detach',
          b'admin.vm.microphone.Status',
          b'admin.vm.volume.ListSnapshots',
          b'admin.vm.volume.List',
          b'admin.vm.volume.Info',
          b'admin.vm.Start',
          b'admin.vm.Shutdown',
          b'admin.vm.Pause',
          b'admin.vm.Unpause',
          b'admin.vm.Kill',
          b'admin.Events',
      ]
      # make sure also no methods on actual VM gets called: the real VM is
      # replaced in the domains collection by a mock with the same name
      # and qid
      # NOTE(review): Mock.called only reflects calls of vm_mock itself,
      # not of its child methods; vm_mock.method_calls may be the intended
      # check - confirm before tightening the assertion.
      vm_mock = unittest.mock.MagicMock()
      vm_mock.name = self.vm.name
      vm_mock.qid = self.vm.qid
      vm_mock.__lt__ = (lambda x, y: x.qid < y.qid)
      self.app.domains._dict[self.vm.qid] = vm_mock
      # should reject payload regardless of having argument or not
      variants = (('', b''), ('+arg', b'some-arg'))
      for method in methods_with_no_payload:
          method_name = method.decode('ascii')
          for suffix, arg in variants:
              with self.subTest(method_name + suffix):
                  with self.assertRaises(AssertionError):
                      self.call_mgmt_func(
                          method, b'test-vm1', arg, b'unexpected-payload')
                  self.assertFalse(vm_mock.called)
                  self.assertFalse(self.app.save.called)
  def test_991_vm_unexpected_argument(self):
      # Every method listed here takes no argument. Calling it with one is
      # expected to fail with AssertionError, with or without a payload,
      # and must not save the app. Each method appears once: the entry
      # that was listed twice only re-ran identical subtests.
      methods_with_no_argument = [
          b'admin.vm.List',
          b'admin.vm.Clone',
          b'admin.vm.Remove',
          b'admin.vm.property.List',
          b'admin.vm.feature.List',
          b'admin.vm.tag.List',
          b'admin.vm.firewall.List',
          b'admin.vm.firewall.Flush',
          b'admin.vm.device.pci.List',
          b'admin.vm.device.pci.Available',
          b'admin.vm.microphone.Attach',
          b'admin.vm.microphone.Detach',
          b'admin.vm.microphone.Status',
          b'admin.vm.volume.List',
          b'admin.vm.Start',
          b'admin.vm.Shutdown',
          b'admin.vm.Pause',
          b'admin.vm.Unpause',
          b'admin.vm.Kill',
          b'admin.Events',
      ]
      # make sure also no methods on actual VM gets called: the real VM is
      # replaced in the domains collection by a mock with the same name
      # and qid
      # NOTE(review): Mock.called only reflects calls of vm_mock itself,
      # not of its child methods; vm_mock.method_calls may be the intended
      # check - confirm before tightening the assertion.
      vm_mock = unittest.mock.MagicMock()
      vm_mock.name = self.vm.name
      vm_mock.qid = self.vm.qid
      vm_mock.__lt__ = (lambda x, y: x.qid < y.qid)
      self.app.domains._dict[self.vm.qid] = vm_mock
      # should reject argument regardless of having payload or not
      variants = (
          ('', b'some-arg', b''),
          ('+payload', b'unexpected-arg', b'some-payload'),
      )
      for method in methods_with_no_argument:
          method_name = method.decode('ascii')
          for suffix, arg, payload in variants:
              with self.subTest(method_name + suffix):
                  with self.assertRaises(AssertionError):
                      self.call_mgmt_func(
                          method, b'test-vm1', arg, payload)
                  self.assertFalse(vm_mock.called)
                  self.assertFalse(self.app.save.called)
  def test_992_dom0_unexpected_payload(self):
      """Check that payload-less dom0 methods reject a call carrying one.

      Every listed method is invoked on ``dom0`` with a non-empty payload,
      once without and once with an argument, and must raise
      ``AssertionError`` each time. After every call neither the stand-in
      VM mock nor ``self.app.save`` may have been called.
      """
      payload_free_methods = (
          b'admin.vmclass.List',
          b'admin.vm.List',
          b'admin.label.List',
          b'admin.label.Get',
          b'admin.label.Remove',
          b'admin.property.List',
          b'admin.property.Get',
          b'admin.property.Help',
          b'admin.property.HelpRst',
          b'admin.property.Reset',
          b'admin.pool.List',
          b'admin.pool.ListDrivers',
          b'admin.pool.Info',
          b'admin.pool.Remove',
          b'admin.backup.Execute',
          b'admin.Events',
      )

      def order_by_qid(left, right):
          return left.qid < right.qid

      # Put a mock in the VM's slot of the domains collection, so that no
      # method of the actual VM can get called.
      fake_vm = unittest.mock.MagicMock()
      fake_vm.name = self.vm.name
      fake_vm.qid = self.vm.qid
      fake_vm.__lt__ = order_by_qid
      self.app.domains._dict[self.vm.qid] = fake_vm

      # (subTest suffix, argument): the payload has to be rejected whether
      # or not an argument accompanies it.
      argument_variants = (
          ('', b''),
          ('+arg', b'some-arg'),
      )
      for rpc_name in payload_free_methods:
          label = rpc_name.decode('ascii')
          for suffix, argument in argument_variants:
              with self.subTest(label + suffix):
                  with self.assertRaises(AssertionError):
                      self.call_mgmt_func(
                          rpc_name, b'dom0', argument, b'unexpected-payload')
                  self.assertFalse(fake_vm.called)
                  self.assertFalse(self.app.save.called)
  def test_993_dom0_unexpected_argument(self):
      """Check that argument-less dom0 methods reject a call carrying one.

      Every listed method is invoked on ``dom0`` with a non-empty argument,
      once with an empty payload and once with a non-empty one, and must
      raise ``AssertionError`` each time. After every call neither the
      stand-in VM mock nor ``self.app.save`` may have been called.
      """
      argument_free_methods = (
          b'admin.vmclass.List',
          b'admin.vm.List',
          b'admin.label.List',
          b'admin.property.List',
          b'admin.pool.List',
          b'admin.pool.ListDrivers',
          b'admin.Events',
      )

      def order_by_qid(left, right):
          return left.qid < right.qid

      # Put a mock in the VM's slot of the domains collection, so that no
      # method of the actual VM can get called.
      fake_vm = unittest.mock.MagicMock()
      fake_vm.name = self.vm.name
      fake_vm.qid = self.vm.qid
      fake_vm.__lt__ = order_by_qid
      self.app.domains._dict[self.vm.qid] = fake_vm

      # (subTest suffix, argument, payload): the argument has to be
      # rejected whether or not a payload accompanies it.
      call_variants = (
          ('', b'some-arg', b''),
          ('+payload', b'unexpected-arg', b'some-payload'),
      )
      for rpc_name in argument_free_methods:
          label = rpc_name.decode('ascii')
          for suffix, argument, payload in call_variants:
              with self.subTest(label + suffix):
                  with self.assertRaises(AssertionError):
                      self.call_mgmt_func(
                          rpc_name, b'dom0', argument, payload)
                  self.assertFalse(fake_vm.called)
                  self.assertFalse(self.app.save.called)
  def test_994_dom0_only_calls(self):
      """Check that dom0-only methods are refused for a VM destination.

      Every listed method is invoked on ``test-vm1`` with all four
      combinations of empty/non-empty argument and payload, and must raise
      ``AssertionError`` each time. After every call neither the stand-in
      VM mock nor ``self.app.save`` may have been called.
      """
      # TODO set some better arguments, to make sure the call was rejected
      # because of invalid destination, not invalid arguments
      dom0_only_methods = (
          b'admin.vmclass.List',
          b'admin.vm.Create.AppVM',
          b'admin.vm.CreateInPool.AppVM',
          b'admin.vm.CreateTemplate',
          b'admin.label.List',
          b'admin.label.Create',
          b'admin.label.Get',
          b'admin.label.Remove',
          b'admin.property.List',
          b'admin.property.Get',
          b'admin.property.Set',
          b'admin.property.Help',
          b'admin.property.HelpRst',
          b'admin.property.Reset',
          b'admin.pool.List',
          b'admin.pool.ListDrivers',
          b'admin.pool.Info',
          b'admin.pool.Add',
          b'admin.pool.Remove',
          b'admin.pool.volume.List',
          b'admin.pool.volume.Info',
          b'admin.pool.volume.ListSnapshots',
          b'admin.pool.volume.Snapshot',
          b'admin.pool.volume.Revert',
          b'admin.pool.volume.Resize',
          b'admin.backup.Execute',
          b'admin.backup.Info',
          b'admin.backup.Restore',
      )

      def order_by_qid(left, right):
          return left.qid < right.qid

      # Put a mock in the VM's slot of the domains collection, so that no
      # method of the actual VM can get called.
      fake_vm = unittest.mock.MagicMock()
      fake_vm.name = self.vm.name
      fake_vm.qid = self.vm.qid
      fake_vm.__lt__ = order_by_qid
      self.app.domains._dict[self.vm.qid] = fake_vm

      # (subTest suffix, argument, payload): the call has to be rejected
      # whatever argument and payload it carries.
      call_variants = (
          ('', b'', b''),
          ('+arg', b'some-arg', b''),
          ('+payload', b'', b'payload'),
          ('+arg+payload', b'some-arg', b'some-payload'),
      )
      for rpc_name in dom0_only_methods:
          label = rpc_name.decode('ascii')
          for suffix, argument, payload in call_variants:
              with self.subTest(label + suffix):
                  with self.assertRaises(AssertionError):
                      self.call_mgmt_func(
                          rpc_name, b'test-vm1', argument, payload)
                  self.assertFalse(fake_vm.called)
                  self.assertFalse(self.app.save.called)
  @unittest.skip('undecided')
  def test_995_vm_only_calls(self):
      """Check that VM-only methods are refused for the dom0 destination.

      Every listed method is invoked on ``dom0`` with all four
      combinations of empty/non-empty argument and payload, and must raise
      ``AssertionError`` each time. After every call neither the stand-in
      VM mock nor ``self.app.save`` may have been called.

      The test is currently skipped (reason: 'undecided').
      """
      # XXX is it really a good idea to prevent those calls this early?
      # TODO set some better arguments, to make sure the call was rejected
      # because of invalid destination, not invalid arguments
      # Each method is listed exactly once so that every subTest id is
      # unique in the test report.
      methods_for_vm_only = [
          b'admin.vm.Clone',
          b'admin.vm.Remove',
          b'admin.vm.property.List',
          b'admin.vm.property.Get',
          b'admin.vm.property.Set',
          b'admin.vm.property.Help',
          b'admin.vm.property.HelpRst',
          b'admin.vm.property.Reset',
          b'admin.vm.feature.List',
          b'admin.vm.feature.Get',
          b'admin.vm.feature.Set',
          b'admin.vm.feature.CheckWithTemplate',
          b'admin.vm.feature.Remove',
          b'admin.vm.tag.List',
          b'admin.vm.tag.Get',
          b'admin.vm.tag.Remove',
          b'admin.vm.tag.Set',
          b'admin.vm.firewall.Get',
          b'admin.vm.firewall.RemoveRule',
          b'admin.vm.firewall.InsertRule',
          b'admin.vm.firewall.Flush',
          b'admin.vm.device.pci.Attach',
          b'admin.vm.device.pci.Detach',
          b'admin.vm.device.pci.List',
          b'admin.vm.device.pci.Available',
          b'admin.vm.microphone.Attach',
          b'admin.vm.microphone.Detach',
          b'admin.vm.microphone.Status',
          b'admin.vm.volume.ListSnapshots',
          b'admin.vm.volume.List',
          b'admin.vm.volume.Info',
          b'admin.vm.volume.Revert',
          b'admin.vm.volume.Resize',
          b'admin.vm.Start',
          b'admin.vm.Shutdown',
          b'admin.vm.Pause',
          b'admin.vm.Unpause',
          b'admin.vm.Kill',
      ]
      # (subTest suffix, argument, payload): the call must be rejected
      # regardless of having argument or payload
      variants = [
          ('', b'', b''),
          ('+arg', b'some-arg', b''),
          ('+payload', b'', b'payload'),
          ('+arg+payload', b'some-arg', b'some-payload'),
      ]

      # make sure also no methods on actual VM gets called
      # NOTE(review): ``Mock.called`` only reports direct calls of the mock
      # object itself; confirm whether ``mock_calls`` was intended instead.
      vm_mock = unittest.mock.MagicMock()
      vm_mock.name = self.vm.name
      vm_mock.qid = self.vm.qid
      # the replaced entry must still be orderable by qid
      vm_mock.__lt__ = (lambda x, y: x.qid < y.qid)
      self.app.domains._dict[self.vm.qid] = vm_mock

      for method in methods_for_vm_only:
          method_name = method.decode('ascii')
          for suffix, arg, payload in variants:
              with self.subTest(method_name + suffix):
                  with self.assertRaises(AssertionError):
                      self.call_mgmt_func(method, b'dom0', arg, payload)
                  self.assertFalse(vm_mock.called)
                  self.assertFalse(self.app.save.called)