api_admin.py 137 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761777177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825182618271828182918301831183218331834183518361837183818391840184118421843184418451846184718481849185018511852185318541855185618571858185918601861186218631864186518661867186818691870187118721873187418751876187718781879188018811882188318841885188618871888188918901891189218931894189518961897189818991900190119021903190419051906190719081909191019111912191319141915191619171918191919201921192219231924192519261927192819291930193119321933193419351936193719381939194019411942194319441945194619471948194919501951195219531954195519561957195819591960196119621963196419651966196719681969197019711972197319741975197619771978197919801981198219831984198519861987198819891990199119921993199419951996199719981999200020012002200320042005200620072008200920102011201220132014201520162017201820192020202120222023202420252026202720282029203020312032203320342035203620372038203920402041204220432044204520462047204820492050205120522053205420552056205720582059206020612062206320642065206620672068206920702071207220732074207520762077207820792080208120822083208420852086208720882089209020912092209320942095209620972098209921002101210221032104210521062107210821092110211121122113211421152116211721182119212021212122212321242125212621272128212921302131213221332134213521362137213821392140214121422143214421452146214721482149215021512152215321542155215621572158215921602161216221632164216521662167216821692170217121722173217421752176217721782179218021812182218321842185218621872188218921902191219221932194219521962197219821992200220122022203220422052206220722082209221022112212221322142215221622172218221922202221222222232224222522262227222822292230223122322233223422352236223722382239224022412242224322442245224622472248224922502251225222532254225522562257225822592260226122622263226422652266226722682269227022712272227322742275227622772278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277727782779278027812782278327842785278627872788278927902791279227932794279527962797279827992800280128022803280428052806280728082809281028112812281328142815281628172818281928202821282228232824282528262827282828292830283128322833283428352836283728382839284028412842284328442845284628472848284928502851285228532854285528562857285828592860286128622863286428652866286728682869287028712872287328742875287628772878287928802881288228832884288528862887288828892890289128922893289428952896289728982899290029012902290329042905290629072908290929102911291229132914291529162917291829192920292129222923292429252926292729282929293029312932293329342935293629372938293929402941294229432944294529462947294829492950295129522953295429552956295729582959296029612962296329642965296629672968296929702971297229732974297529762977297829792980298129822983298429852986298729882989299029912992299329942995299629972998299930003001300230033004300530063007300830093010301130123013301430153016301730183019302030213022302330243025302630273028302930303031303230333034303530363037303830393040304130423043304430453046304730483049305030513052305330543055305630573058305930603061306230633064306530663067306830693070307130723073307430753076307730783079308030813082308330843085308630873088308930903091309230933094309530963097309830993100310131023103310431053106310731083109311031113112311331143115311631173118311931203121312231233124312531263127312831293130313131323133313431353136313731383139314031413142314331443145314631473148314931503151315231533154315531563157315831593160316131623163316431653166316731683169317031713172317331743175317631773178317931803181318231833184318531863187318831893190
  1. # -*- encoding: utf-8 -*-
  2. #
  3. # The Qubes OS Project, http://www.qubes-os.org
  4. #
  5. # Copyright (C) 2017 Marek Marczykowski-Górecki
  6. # <marmarek@invisiblethingslab.com>
  7. #
  8. # This library is free software; you can redistribute it and/or
  9. # modify it under the terms of the GNU Lesser General Public
  10. # License as published by the Free Software Foundation; either
  11. # version 2.1 of the License, or (at your option) any later version.
  12. #
  13. # This library is distributed in the hope that it will be useful,
  14. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  15. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  16. # Lesser General Public License for more details.
  17. #
  18. # You should have received a copy of the GNU Lesser General Public
  19. # License along with this library; if not, see <https://www.gnu.org/licenses/>.
  20. ''' Tests for management calls endpoints '''
  21. import asyncio
  22. import operator
  23. import os
  24. import shutil
  25. import tempfile
  26. import unittest.mock
  27. import libvirt
  28. import copy
  29. import pathlib
  30. import qubes
  31. import qubes.devices
  32. import qubes.firewall
  33. import qubes.api.admin
  34. import qubes.api.internal
  35. import qubes.tests
  36. import qubes.storage
  37. # properties defined in API
  38. volume_properties = [
  39. 'pool', 'vid', 'size', 'usage', 'rw', 'source', 'path',
  40. 'save_on_stop', 'snap_on_start', 'revisions_to_keep']
  41. class AdminAPITestCase(qubes.tests.QubesTestCase):
  42. def setUp(self):
  43. super().setUp()
  44. self.test_base_dir = '/tmp/qubes-test-dir'
  45. self.base_dir_patch = unittest.mock.patch.dict(qubes.config.system_path,
  46. {'qubes_base_dir': self.test_base_dir})
  47. self.base_dir_patch2 = unittest.mock.patch(
  48. 'qubes.config.qubes_base_dir', self.test_base_dir)
  49. self.base_dir_patch3 = unittest.mock.patch.dict(
  50. qubes.config.defaults['pool_configs']['varlibqubes'],
  51. {'dir_path': self.test_base_dir})
  52. self.base_dir_patch.start()
  53. self.base_dir_patch2.start()
  54. self.base_dir_patch3.start()
  55. app = qubes.Qubes('/tmp/qubes-test.xml', load=False)
  56. app.vmm = unittest.mock.Mock(spec=qubes.app.VMMConnection)
  57. app.load_initial_values()
  58. self.loop.run_until_complete(app.setup_pools())
  59. app.default_kernel = '1.0'
  60. app.default_netvm = None
  61. self.template = app.add_new_vm('TemplateVM', label='black',
  62. name='test-template')
  63. app.default_template = 'test-template'
  64. with qubes.tests.substitute_entry_points('qubes.storage',
  65. 'qubes.tests.storage'):
  66. self.loop.run_until_complete(
  67. app.add_pool('test', driver='test'))
  68. app.default_pool = 'varlibqubes'
  69. app.save = unittest.mock.Mock()
  70. self.vm = app.add_new_vm('AppVM', label='red', name='test-vm1',
  71. template='test-template')
  72. self.app = app
  73. libvirt_attrs = {
  74. 'libvirt_conn.lookupByUUID.return_value.isActive.return_value':
  75. False,
  76. 'libvirt_conn.lookupByUUID.return_value.state.return_value':
  77. [libvirt.VIR_DOMAIN_SHUTOFF],
  78. }
  79. app.vmm.configure_mock(**libvirt_attrs)
  80. self.emitter = qubes.tests.TestEmitter()
  81. self.app.domains[0].fire_event = self.emitter.fire_event
  82. def tearDown(self):
  83. self.base_dir_patch3.stop()
  84. self.base_dir_patch2.stop()
  85. self.base_dir_patch.stop()
  86. if os.path.exists(self.test_base_dir):
  87. shutil.rmtree(self.test_base_dir)
  88. try:
  89. del self.netvm
  90. except AttributeError:
  91. pass
  92. del self.vm
  93. del self.template
  94. self.app.close()
  95. del self.app
  96. del self.emitter
  97. super(AdminAPITestCase, self).tearDown()
  98. def call_mgmt_func(self, method, dest, arg=b'', payload=b''):
  99. mgmt_obj = qubes.api.admin.QubesAdminAPI(self.app, b'dom0', method, dest, arg)
  100. loop = asyncio.get_event_loop()
  101. response = loop.run_until_complete(
  102. mgmt_obj.execute(untrusted_payload=payload))
  103. self.assertEventFired(self.emitter,
  104. 'admin-permission:' + method.decode('ascii'))
  105. return response
  106. def call_internal_mgmt_func(self, method, dest, arg=b'', payload=b''):
  107. mgmt_obj = qubes.api.internal.QubesInternalAPI(self.app, b'dom0', method, dest, arg)
  108. loop = asyncio.get_event_loop()
  109. response = loop.run_until_complete(
  110. mgmt_obj.execute(untrusted_payload=payload))
  111. return response
  112. class TC_00_VMs(AdminAPITestCase):
  113. def test_000_vm_list(self):
  114. value = self.call_mgmt_func(b'admin.vm.List', b'dom0')
  115. self.assertEqual(value,
  116. 'dom0 class=AdminVM state=Running\n'
  117. 'test-template class=TemplateVM state=Halted\n'
  118. 'test-vm1 class=AppVM state=Halted\n')
  119. def test_001_vm_list_single(self):
  120. value = self.call_mgmt_func(b'admin.vm.List', b'test-vm1')
  121. self.assertEqual(value,
  122. 'test-vm1 class=AppVM state=Halted\n')
  123. def test_002_vm_list_filter(self):
  124. with tempfile.TemporaryDirectory() as tmpdir:
  125. tmpdir = pathlib.Path(tmpdir)
  126. with unittest.mock.patch(
  127. 'qubes.ext.admin.AdminExtension._instance.policy_cache.path',
  128. pathlib.Path(tmpdir)):
  129. with (tmpdir / 'admin.policy').open('w') as f:
  130. f.write('admin.vm.List * @anyvm @adminvm allow\n')
  131. f.write('admin.vm.List * @anyvm test-vm1 allow')
  132. mgmt_obj = qubes.api.admin.QubesAdminAPI(self.app, b'test-vm1',
  133. b'admin.vm.List', b'dom0', b'')
  134. loop = asyncio.get_event_loop()
  135. value = loop.run_until_complete(
  136. mgmt_obj.execute(untrusted_payload=b''))
  137. self.assertEqual(value,
  138. 'dom0 class=AdminVM state=Running\n'
  139. 'test-vm1 class=AppVM state=Halted\n')
  140. def test_010_vm_property_list(self):
  141. # this test is kind of stupid, but at least check if appropriate
  142. # admin-permission event is fired
  143. value = self.call_mgmt_func(b'admin.vm.property.List', b'test-vm1')
  144. properties = self.app.domains['test-vm1'].property_list()
  145. self.assertEqual(value,
  146. ''.join('{}\n'.format(prop.__name__) for prop in properties))
  147. def test_020_vm_property_get_str(self):
  148. value = self.call_mgmt_func(b'admin.vm.property.Get', b'test-vm1',
  149. b'name')
  150. self.assertEqual(value, 'default=False type=str test-vm1')
  151. def test_021_vm_property_get_int(self):
  152. value = self.call_mgmt_func(b'admin.vm.property.Get', b'test-vm1',
  153. b'vcpus')
  154. self.assertEqual(value, 'default=True type=int 2')
  155. def test_022_vm_property_get_bool(self):
  156. value = self.call_mgmt_func(b'admin.vm.property.Get', b'test-vm1',
  157. b'provides_network')
  158. self.assertEqual(value, 'default=True type=bool False')
  159. def test_023_vm_property_get_label(self):
  160. value = self.call_mgmt_func(b'admin.vm.property.Get', b'test-vm1',
  161. b'label')
  162. self.assertEqual(value, 'default=False type=label red')
  163. def test_024_vm_property_get_vm(self):
  164. value = self.call_mgmt_func(b'admin.vm.property.Get', b'test-vm1',
  165. b'template')
  166. self.assertEqual(value, 'default=False type=vm test-template')
  167. def test_025_vm_property_get_vm_none(self):
  168. value = self.call_mgmt_func(b'admin.vm.property.Get', b'test-vm1',
  169. b'netvm')
  170. self.assertEqual(value, 'default=True type=vm ')
  171. def test_025_vm_property_get_default_vm_none(self):
  172. value = self.call_mgmt_func(
  173. b'admin.vm.property.GetDefault',
  174. b'test-vm1',
  175. b'template')
  176. self.assertEqual(value, None)
  177. def test_026_vm_property_get_default_bool(self):
  178. self.vm.provides_network = True
  179. value = self.call_mgmt_func(
  180. b'admin.vm.property.GetDefault',
  181. b'test-vm1',
  182. b'provides_network')
  183. self.assertEqual(value, 'type=bool False')
  184. def test_027_vm_property_get_all(self):
  185. # any string property, test \n encoding
  186. self.vm.kernelopts = 'opt1\nopt2\nopt3\\opt4'
  187. with unittest.mock.patch.object(self.vm, 'property_list') as list_mock:
  188. list_mock.return_value = [
  189. self.vm.property_get_def('name'),
  190. self.vm.property_get_def('default_user'),
  191. self.vm.property_get_def('netvm'),
  192. self.vm.property_get_def('klass'),
  193. self.vm.property_get_def('debug'),
  194. self.vm.property_get_def('label'),
  195. self.vm.property_get_def('kernelopts'),
  196. self.vm.property_get_def('qrexec_timeout'),
  197. self.vm.property_get_def('qid'),
  198. self.vm.property_get_def('updateable'),
  199. ]
  200. value = self.call_mgmt_func(b'admin.vm.property.GetAll', b'test-vm1')
  201. self.maxDiff = None
  202. expected = '''debug default=True type=bool False
  203. default_user default=True type=str user
  204. klass default=True type=str AppVM
  205. label default=False type=label red
  206. name default=False type=str test-vm1
  207. qid default=False type=int 2
  208. qrexec_timeout default=True type=int 60
  209. updateable default=True type=bool False
  210. kernelopts default=False type=str opt1\\nopt2\\nopt3\\\\opt4
  211. netvm default=True type=vm \n'''
  212. self.assertEqual(value, expected)
  213. def test_030_vm_property_set_vm(self):
  214. netvm = self.app.add_new_vm('AppVM', label='red', name='test-net',
  215. template='test-template', provides_network=True)
  216. with unittest.mock.patch('qubes.vm.VMProperty.__set__') as mock:
  217. value = self.call_mgmt_func(b'admin.vm.property.Set', b'test-vm1',
  218. b'netvm', b'test-net')
  219. self.assertIsNone(value)
  220. mock.assert_called_once_with(self.vm, 'test-net')
  221. self.app.save.assert_called_once_with()
  222. def test_031_vm_property_set_vm_none(self):
  223. netvm = self.app.add_new_vm('AppVM', label='red', name='test-net',
  224. template='test-template', provides_network=True)
  225. with unittest.mock.patch('qubes.vm.VMProperty.__set__') as mock:
  226. value = self.call_mgmt_func(b'admin.vm.property.Set', b'test-vm1',
  227. b'netvm', b'')
  228. self.assertIsNone(value)
  229. mock.assert_called_once_with(self.vm, '')
  230. self.app.save.assert_called_once_with()
  231. def test_032_vm_property_set_vm_invalid1(self):
  232. with unittest.mock.patch('qubes.vm.VMProperty.__set__') as mock:
  233. with self.assertRaises(qubes.exc.QubesValueError):
  234. self.call_mgmt_func(b'admin.vm.property.Set', b'test-vm1',
  235. b'netvm', b'forbidden-chars/../!')
  236. self.assertFalse(mock.called)
  237. self.assertFalse(self.app.save.called)
  238. def test_033_vm_property_set_vm_invalid2(self):
  239. with unittest.mock.patch('qubes.vm.VMProperty.__set__') as mock:
  240. with self.assertRaises(qubes.exc.QubesValueError):
  241. self.call_mgmt_func(b'admin.vm.property.Set', b'test-vm1',
  242. b'netvm', b'\x80\x90\xa0')
  243. self.assertFalse(mock.called)
  244. self.assertFalse(self.app.save.called)
  245. def test_034_vm_propert_set_bool_true(self):
  246. with unittest.mock.patch('qubes.property.__set__') as mock:
  247. value = self.call_mgmt_func(b'admin.vm.property.Set', b'test-vm1',
  248. b'autostart', b'True')
  249. self.assertIsNone(value)
  250. mock.assert_called_once_with(self.vm, True)
  251. self.app.save.assert_called_once_with()
  252. def test_035_vm_propert_set_bool_false(self):
  253. with unittest.mock.patch('qubes.property.__set__') as mock:
  254. value = self.call_mgmt_func(b'admin.vm.property.Set', b'test-vm1',
  255. b'autostart', b'False')
  256. self.assertIsNone(value)
  257. mock.assert_called_once_with(self.vm, False)
  258. self.app.save.assert_called_once_with()
  259. def test_036_vm_propert_set_bool_invalid1(self):
  260. with unittest.mock.patch('qubes.property.__set__') as mock:
  261. with self.assertRaises(qubes.exc.QubesValueError):
  262. self.call_mgmt_func(b'admin.vm.property.Set', b'test-vm1',
  263. b'autostart', b'some string')
  264. self.assertFalse(mock.called)
  265. self.assertFalse(self.app.save.called)
  266. def test_037_vm_propert_set_bool_invalid2(self):
  267. with unittest.mock.patch('qubes.property.__set__') as mock:
  268. with self.assertRaises(qubes.exc.QubesValueError):
  269. self.call_mgmt_func(b'admin.vm.property.Set', b'test-vm1',
  270. b'autostart', b'\x80\x90@#$%^&*(')
  271. self.assertFalse(mock.called)
  272. self.assertFalse(self.app.save.called)
  273. def test_038_vm_propert_set_str(self):
  274. with unittest.mock.patch('qubes.property.__set__') as mock:
  275. value = self.call_mgmt_func(b'admin.vm.property.Set', b'test-vm1',
  276. b'kernel', b'1.0')
  277. self.assertIsNone(value)
  278. mock.assert_called_once_with(self.vm, '1.0')
  279. self.app.save.assert_called_once_with()
  280. def test_039_vm_propert_set_str_invalid1(self):
  281. with unittest.mock.patch('qubes.property.__set__') as mock:
  282. with self.assertRaises(qubes.exc.QubesValueError):
  283. self.call_mgmt_func(b'admin.vm.property.Set', b'test-vm1',
  284. b'kernel', b'some, non-ASCII: \x80\xd2')
  285. self.assertFalse(mock.called)
  286. self.assertFalse(self.app.save.called)
  287. def test_040_vm_propert_set_int(self):
  288. with unittest.mock.patch('qubes.property.__set__') as mock:
  289. value = self.call_mgmt_func(b'admin.vm.property.Set', b'test-vm1',
  290. b'maxmem', b'1024000')
  291. self.assertIsNone(value)
  292. mock.assert_called_once_with(self.vm, 1024000)
  293. self.app.save.assert_called_once_with()
  294. def test_041_vm_propert_set_int_invalid1(self):
  295. with unittest.mock.patch('qubes.property.__set__') as mock:
  296. with self.assertRaises(qubes.exc.QubesValueError):
  297. self.call_mgmt_func(b'admin.vm.property.Set', b'test-vm1',
  298. b'maxmem', b'fourty two')
  299. self.assertFalse(mock.called)
  300. self.assertFalse(self.app.save.called)
  301. def test_042_vm_propert_set_label(self):
  302. with unittest.mock.patch('qubes.property.__set__') as mock:
  303. value = self.call_mgmt_func(b'admin.vm.property.Set', b'test-vm1',
  304. b'label', b'green')
  305. self.assertIsNone(value)
  306. mock.assert_called_once_with(self.vm, 'green')
  307. self.app.save.assert_called_once_with()
  308. def test_043_vm_propert_set_label_invalid1(self):
  309. with unittest.mock.patch('qubes.property.__set__') as mock:
  310. with self.assertRaises(qubes.exc.QubesValueError):
  311. self.call_mgmt_func(b'admin.vm.property.Set', b'test-vm1',
  312. b'maxmem', b'some, non-ASCII: \x80\xd2')
  313. self.assertFalse(mock.called)
  314. self.assertFalse(self.app.save.called)
  315. @unittest.skip('label existence not checked before actual setter yet')
  316. def test_044_vm_propert_set_label_invalid2(self):
  317. with unittest.mock.patch('qubes.property.__set__') as mock:
  318. with self.assertRaises(qubes.exc.QubesValueError):
  319. self.call_mgmt_func(b'admin.vm.property.Set', b'test-vm1',
  320. b'maxmem', b'non-existing-color')
  321. self.assertFalse(mock.called)
  322. self.assertFalse(self.app.save.called)
  323. def test_050_vm_property_help(self):
  324. value = self.call_mgmt_func(b'admin.vm.property.Help', b'test-vm1',
  325. b'label')
  326. self.assertEqual(value,
  327. 'Colourful label assigned to VM. This is where the colour of the '
  328. 'padlock is set.')
  329. self.assertFalse(self.app.save.called)
  330. def test_052_vm_property_help_invalid_property(self):
  331. with self.assertRaises(qubes.exc.QubesNoSuchPropertyError):
  332. self.call_mgmt_func(b'admin.vm.property.Help', b'test-vm1',
  333. b'no-such-property')
  334. self.assertFalse(self.app.save.called)
  335. def test_060_vm_property_reset(self):
  336. with unittest.mock.patch('qubes.property.__delete__') as mock:
  337. value = self.call_mgmt_func(b'admin.vm.property.Reset', b'test-vm1',
  338. b'default_user')
  339. mock.assert_called_with(self.vm)
  340. self.assertIsNone(value)
  341. self.app.save.assert_called_once_with()
  342. def test_062_vm_property_reset_invalid_property(self):
  343. with unittest.mock.patch('qubes.property.__delete__') as mock:
  344. with self.assertRaises(qubes.exc.QubesNoSuchPropertyError):
  345. self.call_mgmt_func(b'admin.vm.property.Help', b'test-vm1',
  346. b'no-such-property')
  347. self.assertFalse(mock.called)
  348. self.assertFalse(self.app.save.called)
  349. def test_070_vm_volume_list(self):
  350. self.vm.volumes = unittest.mock.Mock()
  351. volumes_conf = {
  352. 'keys.return_value': ['root', 'private', 'volatile', 'kernel']
  353. }
  354. self.vm.volumes.configure_mock(**volumes_conf)
  355. value = self.call_mgmt_func(b'admin.vm.volume.List', b'test-vm1')
  356. self.assertEqual(value, 'root\nprivate\nvolatile\nkernel\n')
  357. # check if _only_ keys were accessed
  358. self.assertEqual(self.vm.volumes.mock_calls,
  359. [unittest.mock.call.keys()])
  360. def test_080_vm_volume_info(self):
  361. self.vm.volumes = unittest.mock.MagicMock()
  362. volumes_conf = {
  363. 'keys.return_value': ['root', 'private', 'volatile', 'kernel']
  364. }
  365. for prop in volume_properties:
  366. volumes_conf[
  367. '__getitem__.return_value.{}'.format(prop)] = prop + '-value'
  368. volumes_conf[
  369. '__getitem__.return_value.is_outdated.return_value'] = False
  370. self.vm.volumes.configure_mock(**volumes_conf)
  371. value = self.call_mgmt_func(b'admin.vm.volume.Info', b'test-vm1',
  372. b'private')
  373. self.assertEqual(value,
  374. ''.join('{p}={p}-value\n'.format(p=p) for p in volume_properties) +
  375. 'is_outdated=False\n')
  376. self.assertEqual(self.vm.volumes.mock_calls,
  377. [unittest.mock.call.keys(),
  378. unittest.mock.call.__getattr__('__getitem__')('private'),
  379. unittest.mock.call.__getattr__('__getitem__')().is_outdated()])
  380. def test_081_vm_volume_info_unsupported_is_outdated(self):
  381. self.vm.volumes = unittest.mock.MagicMock()
  382. volumes_conf = {
  383. 'keys.return_value': ['root', 'private', 'volatile', 'kernel']
  384. }
  385. for prop in volume_properties:
  386. volumes_conf[
  387. '__getitem__.return_value.{}'.format(prop)] = prop + '-value'
  388. volumes_conf[
  389. '__getitem__.return_value.is_outdated.side_effect'] = \
  390. NotImplementedError
  391. self.vm.volumes.configure_mock(**volumes_conf)
  392. value = self.call_mgmt_func(b'admin.vm.volume.Info', b'test-vm1',
  393. b'private')
  394. self.assertEqual(value,
  395. ''.join('{p}={p}-value\n'.format(p=p) for p in volume_properties))
  396. self.assertEqual(self.vm.volumes.mock_calls,
  397. [unittest.mock.call.keys(),
  398. unittest.mock.call.__getattr__('__getitem__')('private'),
  399. unittest.mock.call.__getattr__('__getitem__')().is_outdated()])
  400. def test_080_vm_volume_info_invalid_volume(self):
  401. self.vm.volumes = unittest.mock.MagicMock()
  402. volumes_conf = {
  403. 'keys.return_value': ['root', 'private', 'volatile', 'kernel']
  404. }
  405. self.vm.volumes.configure_mock(**volumes_conf)
  406. with self.assertRaises(qubes.api.PermissionDenied):
  407. self.call_mgmt_func(b'admin.vm.volume.Info', b'test-vm1',
  408. b'no-such-volume')
  409. self.assertEqual(self.vm.volumes.mock_calls,
  410. [unittest.mock.call.keys()])
  411. def test_090_vm_volume_listsnapshots(self):
  412. self.vm.volumes = unittest.mock.MagicMock()
  413. volumes_conf = {
  414. 'keys.return_value': ['root', 'private', 'volatile', 'kernel'],
  415. '__getitem__.return_value.revisions':
  416. {'rev2': '2018-02-22T22:22:22', 'rev1': '2018-01-11T11:11:11'},
  417. }
  418. self.vm.volumes.configure_mock(**volumes_conf)
  419. value = self.call_mgmt_func(b'admin.vm.volume.ListSnapshots',
  420. b'test-vm1', b'private')
  421. self.assertEqual(value,
  422. 'rev1\nrev2\n')
  423. self.assertEqual(self.vm.volumes.mock_calls,
  424. [unittest.mock.call.keys(),
  425. unittest.mock.call.__getattr__('__getitem__')('private')])
  426. def test_090_vm_volume_listsnapshots_invalid_volume(self):
  427. self.vm.volumes = unittest.mock.MagicMock()
  428. volumes_conf = {
  429. 'keys.return_value': ['root', 'private', 'volatile', 'kernel']
  430. }
  431. self.vm.volumes.configure_mock(**volumes_conf)
  432. with self.assertRaises(qubes.api.PermissionDenied):
  433. self.call_mgmt_func(b'admin.vm.volume.ListSnapshots', b'test-vm1',
  434. b'no-such-volume')
  435. self.assertEqual(self.vm.volumes.mock_calls,
  436. [unittest.mock.call.keys()])
  437. @unittest.skip('method not implemented yet')
  438. def test_100_vm_volume_snapshot(self):
  439. pass
  440. @unittest.skip('method not implemented yet')
  441. def test_100_vm_volume_snapshot_invalid_volume(self):
  442. self.vm.volumes = unittest.mock.MagicMock()
  443. volumes_conf = {
  444. 'keys.return_value': ['root', 'private', 'volatile', 'kernel'],
  445. '__getitem__.return_value.revisions':
  446. {'rev2': '2018-02-22T22:22:22', 'rev1': '2018-01-11T11:11:11'},
  447. }
  448. self.vm.volumes.configure_mock(**volumes_conf)
  449. with self.assertRaises(qubes.api.PermissionDenied):
  450. self.call_mgmt_func(b'admin.vm.volume.Snapshots',
  451. b'test-vm1', b'no-such-volume')
  452. self.assertEqual(self.vm.volumes.mock_calls,
  453. [unittest.mock.call.keys()])
  454. @unittest.skip('method not implemented yet')
  455. def test_100_vm_volume_snapshot_invalid_revision(self):
  456. self.vm.volumes = unittest.mock.MagicMock()
  457. volumes_conf = {
  458. 'keys.return_value': ['root', 'private', 'volatile', 'kernel']
  459. }
  460. self.vm.volumes.configure_mock(**volumes_conf)
  461. with self.assertRaises(qubes.api.PermissionDenied):
  462. self.call_mgmt_func(b'admin.vm.volume.Snapshots',
  463. b'test-vm1', b'private', b'no-such-rev')
  464. self.assertEqual(self.vm.volumes.mock_calls,
  465. [unittest.mock.call.keys(),
  466. unittest.mock.call.__getattr__('__getitem__')('private')])
  467. def test_110_vm_volume_revert(self):
  468. self.vm.volumes = unittest.mock.MagicMock()
  469. volumes_conf = {
  470. 'keys.return_value': ['root', 'private', 'volatile', 'kernel'],
  471. '__getitem__.return_value.revisions':
  472. {'rev2': '2018-02-22T22:22:22', 'rev1': '2018-01-11T11:11:11'},
  473. }
  474. self.vm.volumes.configure_mock(**volumes_conf)
  475. del self.vm.volumes['private'].revert('rev1')._is_coroutine
  476. self.vm.storage = unittest.mock.Mock()
  477. value = self.call_mgmt_func(b'admin.vm.volume.Revert',
  478. b'test-vm1', b'private', b'rev1')
  479. self.assertIsNone(value)
  480. self.assertEqual(self.vm.volumes.mock_calls, [
  481. ('__getitem__', ('private', ), {}),
  482. ('__getitem__().revert', ('rev1', ), {}),
  483. ('keys', (), {}),
  484. ('__getitem__', ('private', ), {}),
  485. ('__getitem__().__hash__', (), {}),
  486. ('__getitem__().revert', ('rev1', ), {}),
  487. ])
  488. self.assertEqual(self.vm.storage.mock_calls, [])
  489. def test_110_vm_volume_revert_invalid_rev(self):
  490. self.vm.volumes = unittest.mock.MagicMock()
  491. volumes_conf = {
  492. 'keys.return_value': ['root', 'private', 'volatile', 'kernel'],
  493. '__getitem__.return_value.revisions':
  494. {'rev2': '2018-02-22T22:22:22', 'rev1': '2018-01-11T11:11:11'},
  495. }
  496. self.vm.volumes.configure_mock(**volumes_conf)
  497. self.vm.storage = unittest.mock.Mock()
  498. with self.assertRaises(qubes.api.PermissionDenied):
  499. self.call_mgmt_func(b'admin.vm.volume.Revert',
  500. b'test-vm1', b'private', b'no-such-rev')
  501. self.assertEqual(self.vm.volumes.mock_calls,
  502. [unittest.mock.call.keys(),
  503. unittest.mock.call.__getattr__('__getitem__')('private')])
  504. self.assertFalse(self.vm.storage.called)
  505. def test_120_vm_volume_resize(self):
  506. self.vm.volumes = unittest.mock.MagicMock()
  507. volumes_conf = {
  508. 'keys.return_value': ['root', 'private', 'volatile', 'kernel'],
  509. }
  510. self.vm.volumes.configure_mock(**volumes_conf)
  511. self.vm.storage = unittest.mock.Mock()
  512. self.vm.storage.resize.side_effect = self.dummy_coro
  513. value = self.call_mgmt_func(b'admin.vm.volume.Resize',
  514. b'test-vm1', b'private', b'1024000000')
  515. self.assertIsNone(value)
  516. self.assertEqual(self.vm.volumes.mock_calls,
  517. [unittest.mock.call.keys()])
  518. self.assertEqual(self.vm.storage.mock_calls,
  519. [unittest.mock.call.resize('private', 1024000000)])
  520. def test_120_vm_volume_resize_invalid_size1(self):
  521. self.vm.volumes = unittest.mock.MagicMock()
  522. volumes_conf = {
  523. 'keys.return_value': ['root', 'private', 'volatile', 'kernel'],
  524. }
  525. self.vm.volumes.configure_mock(**volumes_conf)
  526. self.vm.storage = unittest.mock.Mock()
  527. self.vm.storage.resize.side_effect = self.dummy_coro
  528. with self.assertRaises(qubes.api.PermissionDenied):
  529. self.call_mgmt_func(b'admin.vm.volume.Resize',
  530. b'test-vm1', b'private', b'no-int-size')
  531. self.assertEqual(self.vm.volumes.mock_calls,
  532. [unittest.mock.call.keys()])
  533. self.assertFalse(self.vm.storage.called)
  534. def test_120_vm_volume_resize_invalid_size2(self):
  535. self.vm.volumes = unittest.mock.MagicMock()
  536. volumes_conf = {
  537. 'keys.return_value': ['root', 'private', 'volatile', 'kernel'],
  538. }
  539. self.vm.volumes.configure_mock(**volumes_conf)
  540. self.vm.storage = unittest.mock.Mock()
  541. self.vm.storage.resize.side_effect = self.dummy_coro
  542. with self.assertRaises(qubes.api.PermissionDenied):
  543. self.call_mgmt_func(b'admin.vm.volume.Resize',
  544. b'test-vm1', b'private', b'-1')
  545. self.assertEqual(self.vm.volumes.mock_calls,
  546. [unittest.mock.call.keys()])
  547. self.assertFalse(self.vm.storage.called)
  548. def test_130_pool_list(self):
  549. self.app.pools = ['file', 'lvm']
  550. value = self.call_mgmt_func(b'admin.pool.List', b'dom0')
  551. self.assertEqual(value, 'file\nlvm\n')
  552. self.assertFalse(self.app.save.called)
  553. @unittest.mock.patch('qubes.storage.pool_drivers')
  554. @unittest.mock.patch('qubes.storage.driver_parameters')
  555. def test_140_pool_listdrivers(self, mock_parameters, mock_drivers):
  556. self.app.pools = ['file', 'lvm']
  557. mock_drivers.return_value = ['driver1', 'driver2']
  558. mock_parameters.side_effect = \
  559. lambda driver: {
  560. 'driver1': ['param1', 'param2'],
  561. 'driver2': ['param3', 'param4']
  562. }[driver]
  563. value = self.call_mgmt_func(b'admin.pool.ListDrivers', b'dom0')
  564. self.assertEqual(value,
  565. 'driver1 param1 param2\ndriver2 param3 param4\n')
  566. self.assertEqual(mock_drivers.mock_calls, [unittest.mock.call()])
  567. self.assertEqual(mock_parameters.mock_calls,
  568. [unittest.mock.call('driver1'), unittest.mock.call('driver2')])
  569. self.assertFalse(self.app.save.called)
  570. def test_150_pool_info(self):
  571. self.app.pools = {
  572. 'pool1': unittest.mock.Mock(config={
  573. 'param1': 'value1', 'param2': 'value2'},
  574. usage=102400,
  575. size=204800)
  576. }
  577. self.app.pools['pool1'].included_in.return_value = None
  578. value = self.call_mgmt_func(b'admin.pool.Info', b'dom0', b'pool1')
  579. self.assertEqual(value,
  580. 'param1=value1\nparam2=value2\nsize=204800\nusage=102400\n')
  581. self.assertFalse(self.app.save.called)
  582. def test_151_pool_info_unsupported_size(self):
  583. self.app.pools = {
  584. 'pool1': unittest.mock.Mock(config={
  585. 'param1': 'value1', 'param2': 'value2'},
  586. size=None, usage=None, usage_details={}),
  587. }
  588. self.app.pools['pool1'].included_in.return_value = None
  589. value = self.call_mgmt_func(b'admin.pool.Info', b'dom0', b'pool1')
  590. self.assertEqual(value,
  591. 'param1=value1\nparam2=value2\n')
  592. self.assertFalse(self.app.save.called)
  593. def test_152_pool_info_included_in(self):
  594. self.app.pools = {
  595. 'pool1': unittest.mock.MagicMock(config={
  596. 'param1': 'value1',
  597. 'param2': 'value2'},
  598. usage=102400,
  599. size=204800)
  600. }
  601. self.app.pools['pool1'].included_in.return_value = \
  602. self.app.pools['pool1']
  603. self.app.pools['pool1'].__str__.return_value = 'pool1'
  604. value = self.call_mgmt_func(b'admin.pool.Info', b'dom0', b'pool1')
  605. self.assertEqual(value,
  606. 'param1=value1\nparam2=value2\nsize=204800\nusage=102400'
  607. '\nincluded_in=pool1\n')
  608. self.assertFalse(self.app.save.called)
  609. def test_153_pool_usage(self):
  610. self.app.pools = {
  611. 'pool1': unittest.mock.Mock(config={
  612. 'param1': 'value1', 'param2': 'value2'},
  613. usage_details={
  614. 'data_usage': 102400,
  615. 'data_size': 204800,
  616. 'metadata_size': 1024,
  617. 'metadata_usage': 50})
  618. }
  619. self.app.pools['pool1'].included_in.return_value = None
  620. value = self.call_mgmt_func(b'admin.pool.UsageDetails', b'dom0', b'pool1')
  621. self.assertEqual(value,
  622. 'data_size=204800\ndata_usage=102400\nmetadata_size=1024\nmetadata_usage=50\n')
  623. self.assertFalse(self.app.save.called)
  624. @unittest.mock.patch('qubes.storage.pool_drivers')
  625. @unittest.mock.patch('qubes.storage.driver_parameters')
  626. def test_160_pool_add(self, mock_parameters, mock_drivers):
  627. self.app.pools = {
  628. 'file': unittest.mock.Mock(),
  629. 'lvm': unittest.mock.Mock()
  630. }
  631. mock_drivers.return_value = ['driver1', 'driver2']
  632. mock_parameters.side_effect = \
  633. lambda driver: {
  634. 'driver1': ['param1', 'param2'],
  635. 'driver2': ['param3', 'param4']
  636. }[driver]
  637. add_pool_mock, self.app.add_pool = self.coroutine_mock()
  638. value = self.call_mgmt_func(b'admin.pool.Add', b'dom0', b'driver1',
  639. b'name=test-pool\nparam1=some-value\n')
  640. self.assertIsNone(value)
  641. self.assertEqual(mock_drivers.mock_calls, [unittest.mock.call()])
  642. self.assertEqual(mock_parameters.mock_calls,
  643. [unittest.mock.call('driver1')])
  644. self.assertEqual(add_pool_mock.mock_calls,
  645. [unittest.mock.call(name='test-pool', driver='driver1',
  646. param1='some-value')])
  647. self.assertTrue(self.app.save.called)
  648. @unittest.mock.patch('qubes.storage.pool_drivers')
  649. @unittest.mock.patch('qubes.storage.driver_parameters')
  650. def test_160_pool_add_invalid_driver(self, mock_parameters, mock_drivers):
  651. self.app.pools = {
  652. 'file': unittest.mock.Mock(),
  653. 'lvm': unittest.mock.Mock()
  654. }
  655. mock_drivers.return_value = ['driver1', 'driver2']
  656. mock_parameters.side_effect = \
  657. lambda driver: {
  658. 'driver1': ['param1', 'param2'],
  659. 'driver2': ['param3', 'param4']
  660. }[driver]
  661. add_pool_mock, self.app.add_pool = self.coroutine_mock()
  662. with self.assertRaises(qubes.api.PermissionDenied):
  663. self.call_mgmt_func(b'admin.pool.Add', b'dom0',
  664. b'no-such-driver', b'name=test-pool\nparam1=some-value\n')
  665. self.assertEqual(mock_drivers.mock_calls, [unittest.mock.call()])
  666. self.assertEqual(mock_parameters.mock_calls, [])
  667. self.assertEqual(add_pool_mock.mock_calls, [])
  668. self.assertFalse(self.app.save.called)
  669. @unittest.mock.patch('qubes.storage.pool_drivers')
  670. @unittest.mock.patch('qubes.storage.driver_parameters')
  671. def test_160_pool_add_invalid_param(self, mock_parameters, mock_drivers):
  672. self.app.pools = {
  673. 'file': unittest.mock.Mock(),
  674. 'lvm': unittest.mock.Mock()
  675. }
  676. mock_drivers.return_value = ['driver1', 'driver2']
  677. mock_parameters.side_effect = \
  678. lambda driver: {
  679. 'driver1': ['param1', 'param2'],
  680. 'driver2': ['param3', 'param4']
  681. }[driver]
  682. add_pool_mock, self.app.add_pool = self.coroutine_mock()
  683. with self.assertRaises(qubes.exc.QubesException):
  684. self.call_mgmt_func(b'admin.pool.Add', b'dom0',
  685. b'driver1', b'name=test-pool\nparam3=some-value\n')
  686. self.assertEqual(mock_drivers.mock_calls, [unittest.mock.call()])
  687. self.assertEqual(mock_parameters.mock_calls,
  688. [unittest.mock.call('driver1')])
  689. self.assertEqual(add_pool_mock.mock_calls, [])
  690. self.assertFalse(self.app.save.called)
  691. @unittest.mock.patch('qubes.storage.pool_drivers')
  692. @unittest.mock.patch('qubes.storage.driver_parameters')
  693. def test_160_pool_add_missing_name(self, mock_parameters, mock_drivers):
  694. self.app.pools = {
  695. 'file': unittest.mock.Mock(),
  696. 'lvm': unittest.mock.Mock()
  697. }
  698. mock_drivers.return_value = ['driver1', 'driver2']
  699. mock_parameters.side_effect = \
  700. lambda driver: {
  701. 'driver1': ['param1', 'param2'],
  702. 'driver2': ['param3', 'param4']
  703. }[driver]
  704. add_pool_mock, self.app.add_pool = self.coroutine_mock()
  705. with self.assertRaises(qubes.api.PermissionDenied):
  706. self.call_mgmt_func(b'admin.pool.Add', b'dom0',
  707. b'driver1', b'param1=value\nparam2=some-value\n')
  708. self.assertEqual(mock_drivers.mock_calls, [unittest.mock.call()])
  709. self.assertEqual(mock_parameters.mock_calls, [])
  710. self.assertEqual(add_pool_mock.mock_calls, [])
  711. self.assertFalse(self.app.save.called)
  712. @unittest.mock.patch('qubes.storage.pool_drivers')
  713. @unittest.mock.patch('qubes.storage.driver_parameters')
  714. def test_160_pool_add_existing_pool(self, mock_parameters, mock_drivers):
  715. self.app.pools = {
  716. 'file': unittest.mock.Mock(),
  717. 'lvm': unittest.mock.Mock()
  718. }
  719. mock_drivers.return_value = ['driver1', 'driver2']
  720. mock_parameters.side_effect = \
  721. lambda driver: {
  722. 'driver1': ['param1', 'param2'],
  723. 'driver2': ['param3', 'param4']
  724. }[driver]
  725. add_pool_mock, self.app.add_pool = self.coroutine_mock()
  726. with self.assertRaises(qubes.api.PermissionDenied):
  727. self.call_mgmt_func(b'admin.pool.Add', b'dom0',
  728. b'driver1', b'name=file\nparam1=value\nparam2=some-value\n')
  729. self.assertEqual(mock_drivers.mock_calls, [unittest.mock.call()])
  730. self.assertEqual(mock_parameters.mock_calls, [])
  731. self.assertEqual(add_pool_mock.mock_calls, [])
  732. self.assertFalse(self.app.save.called)
  733. @unittest.mock.patch('qubes.storage.pool_drivers')
  734. @unittest.mock.patch('qubes.storage.driver_parameters')
  735. def test_160_pool_add_invalid_config_format(self, mock_parameters,
  736. mock_drivers):
  737. self.app.pools = {
  738. 'file': unittest.mock.Mock(),
  739. 'lvm': unittest.mock.Mock()
  740. }
  741. mock_drivers.return_value = ['driver1', 'driver2']
  742. mock_parameters.side_effect = \
  743. lambda driver: {
  744. 'driver1': ['param1', 'param2'],
  745. 'driver2': ['param3', 'param4']
  746. }[driver]
  747. add_pool_mock, self.app.add_pool = self.coroutine_mock()
  748. with self.assertRaises(qubes.api.PermissionDenied):
  749. self.call_mgmt_func(b'admin.pool.Add', b'dom0',
  750. b'driver1', b'name=test-pool\nparam 1=value\n_param2\n')
  751. self.assertEqual(mock_drivers.mock_calls, [unittest.mock.call()])
  752. self.assertEqual(mock_parameters.mock_calls, [])
  753. self.assertEqual(add_pool_mock.mock_calls, [])
  754. self.assertFalse(self.app.save.called)
  755. def test_170_pool_remove(self):
  756. self.app.pools = {
  757. 'file': unittest.mock.Mock(),
  758. 'lvm': unittest.mock.Mock(),
  759. 'test-pool': unittest.mock.Mock(),
  760. }
  761. remove_pool_mock, self.app.remove_pool = self.coroutine_mock()
  762. value = self.call_mgmt_func(b'admin.pool.Remove', b'dom0', b'test-pool')
  763. self.assertIsNone(value)
  764. self.assertEqual(remove_pool_mock.mock_calls,
  765. [unittest.mock.call('test-pool')])
  766. self.assertTrue(self.app.save.called)
  767. def test_170_pool_remove_invalid_pool(self):
  768. self.app.pools = {
  769. 'file': unittest.mock.Mock(),
  770. 'lvm': unittest.mock.Mock(),
  771. 'test-pool': unittest.mock.Mock(),
  772. }
  773. remove_pool_mock, self.app.remove_pool = self.coroutine_mock()
  774. with self.assertRaises(qubes.api.PermissionDenied):
  775. self.call_mgmt_func(b'admin.pool.Remove', b'dom0',
  776. b'no-such-pool')
  777. self.assertEqual(remove_pool_mock.mock_calls, [])
  778. self.assertFalse(self.app.save.called)
  779. def test_180_label_list(self):
  780. value = self.call_mgmt_func(b'admin.label.List', b'dom0')
  781. self.assertEqual(value,
  782. ''.join('{}\n'.format(l.name) for l in self.app.labels.values()))
  783. self.assertFalse(self.app.save.called)
  784. def test_190_label_get(self):
  785. self.app.get_label = unittest.mock.Mock()
  786. self.app.get_label.configure_mock(**{'return_value.color': '0xff0000'})
  787. value = self.call_mgmt_func(b'admin.label.Get', b'dom0', b'red')
  788. self.assertEqual(value, '0xff0000')
  789. self.assertEqual(self.app.get_label.mock_calls,
  790. [unittest.mock.call('red')])
  791. self.assertFalse(self.app.save.called)
  792. def test_195_label_index(self):
  793. self.app.get_label = unittest.mock.Mock()
  794. self.app.get_label.configure_mock(**{'return_value.index': 1})
  795. value = self.call_mgmt_func(b'admin.label.Index', b'dom0', b'red')
  796. self.assertEqual(value, '1')
  797. self.assertEqual(self.app.get_label.mock_calls,
  798. [unittest.mock.call('red')])
  799. self.assertFalse(self.app.save.called)
  800. def test_200_label_create(self):
  801. self.app.get_label = unittest.mock.Mock()
  802. self.app.get_label.side_effect=KeyError
  803. self.app.labels = unittest.mock.MagicMock()
  804. labels_config = {
  805. 'keys.return_value': range(1, 9),
  806. }
  807. self.app.labels.configure_mock(**labels_config)
  808. value = self.call_mgmt_func(b'admin.label.Create', b'dom0', b'cyan',
  809. b'0x00ffff')
  810. self.assertIsNone(value)
  811. self.assertEqual(self.app.get_label.mock_calls,
  812. [unittest.mock.call('cyan')])
  813. self.assertEqual(self.app.labels.mock_calls,
  814. [unittest.mock.call.keys(),
  815. unittest.mock.call.__getattr__('__setitem__')(9,
  816. qubes.Label(9, '0x00ffff', 'cyan'))])
  817. self.assertTrue(self.app.save.called)
  818. def test_200_label_create_invalid_color(self):
  819. self.app.get_label = unittest.mock.Mock()
  820. self.app.get_label.side_effect=KeyError
  821. self.app.labels = unittest.mock.MagicMock()
  822. labels_config = {
  823. 'keys.return_value': range(1, 9),
  824. }
  825. self.app.labels.configure_mock(**labels_config)
  826. with self.assertRaises(qubes.api.PermissionDenied):
  827. self.call_mgmt_func(b'admin.label.Create', b'dom0', b'cyan',
  828. b'abcd')
  829. self.assertEqual(self.app.get_label.mock_calls,
  830. [unittest.mock.call('cyan')])
  831. self.assertEqual(self.app.labels.mock_calls, [])
  832. self.assertFalse(self.app.save.called)
  833. def test_200_label_create_invalid_name(self):
  834. self.app.get_label = unittest.mock.Mock()
  835. self.app.get_label.side_effect=KeyError
  836. self.app.labels = unittest.mock.MagicMock()
  837. labels_config = {
  838. 'keys.return_value': range(1, 9),
  839. }
  840. self.app.labels.configure_mock(**labels_config)
  841. with self.assertRaises(qubes.api.PermissionDenied):
  842. self.call_mgmt_func(b'admin.label.Create', b'dom0', b'01',
  843. b'0xff0000')
  844. with self.assertRaises(qubes.api.PermissionDenied):
  845. self.call_mgmt_func(b'admin.label.Create', b'dom0', b'../xxx',
  846. b'0xff0000')
  847. with self.assertRaises(qubes.api.PermissionDenied):
  848. self.call_mgmt_func(b'admin.label.Create', b'dom0',
  849. b'strange-name!@#$',
  850. b'0xff0000')
  851. self.assertEqual(self.app.get_label.mock_calls, [])
  852. self.assertEqual(self.app.labels.mock_calls, [])
  853. self.assertFalse(self.app.save.called)
  854. def test_200_label_create_already_exists(self):
  855. self.app.get_label = unittest.mock.Mock(wraps=self.app.get_label)
  856. with self.assertRaises(qubes.exc.QubesValueError):
  857. self.call_mgmt_func(b'admin.label.Create', b'dom0', b'red',
  858. b'abcd')
  859. self.assertEqual(self.app.get_label.mock_calls,
  860. [unittest.mock.call('red')])
  861. self.assertFalse(self.app.save.called)
  862. def test_210_label_remove(self):
  863. label = qubes.Label(9, '0x00ffff', 'cyan')
  864. self.app.labels[9] = label
  865. self.app.get_label = unittest.mock.Mock(wraps=self.app.get_label,
  866. **{'return_value.index': 9})
  867. self.app.labels = unittest.mock.MagicMock(wraps=self.app.labels)
  868. value = self.call_mgmt_func(b'admin.label.Remove', b'dom0', b'cyan')
  869. self.assertIsNone(value)
  870. self.assertEqual(self.app.get_label.mock_calls,
  871. [unittest.mock.call('cyan')])
  872. self.assertEqual(self.app.labels.mock_calls,
  873. [unittest.mock.call.__delitem__(9)])
  874. self.assertTrue(self.app.save.called)
  875. def test_210_label_remove_invalid_label(self):
  876. with self.assertRaises(qubes.exc.QubesValueError):
  877. self.call_mgmt_func(b'admin.label.Remove', b'dom0',
  878. b'no-such-label')
  879. self.assertFalse(self.app.save.called)
  880. def test_210_label_remove_default_label(self):
  881. self.app.labels = unittest.mock.MagicMock(wraps=self.app.labels)
  882. self.app.get_label = unittest.mock.Mock(wraps=self.app.get_label,
  883. **{'return_value.index': 6})
  884. with self.assertRaises(qubes.api.PermissionDenied):
  885. self.call_mgmt_func(b'admin.label.Remove', b'dom0',
  886. b'blue')
  887. self.assertEqual(self.app.labels.mock_calls, [])
  888. self.assertFalse(self.app.save.called)
  889. def test_210_label_remove_in_use(self):
  890. self.app.labels = unittest.mock.MagicMock(wraps=self.app.labels)
  891. self.app.get_label = unittest.mock.Mock(wraps=self.app.get_label,
  892. **{'return_value.index': 1})
  893. with self.assertRaises(qubes.api.PermissionDenied):
  894. self.call_mgmt_func(b'admin.label.Remove', b'dom0',
  895. b'red')
  896. self.assertEqual(self.app.labels.mock_calls, [])
  897. self.assertFalse(self.app.save.called)
  898. def test_220_start(self):
  899. func_mock = unittest.mock.Mock()
  900. @asyncio.coroutine
  901. def coroutine_mock(*args, **kwargs):
  902. return func_mock(*args, **kwargs)
  903. self.vm.start = coroutine_mock
  904. value = self.call_mgmt_func(b'admin.vm.Start', b'test-vm1')
  905. self.assertIsNone(value)
  906. func_mock.assert_called_once_with()
  907. def test_230_shutdown(self):
  908. func_mock = unittest.mock.Mock()
  909. @asyncio.coroutine
  910. def coroutine_mock(*args, **kwargs):
  911. return func_mock(*args, **kwargs)
  912. self.vm.shutdown = coroutine_mock
  913. value = self.call_mgmt_func(b'admin.vm.Shutdown', b'test-vm1')
  914. self.assertIsNone(value)
  915. func_mock.assert_called_once_with(force=False, wait=False)
  916. def test_231_shutdown_force(self):
  917. func_mock = unittest.mock.Mock()
  918. @asyncio.coroutine
  919. def coroutine_mock(*args, **kwargs):
  920. return func_mock(*args, **kwargs)
  921. self.vm.shutdown = coroutine_mock
  922. value = self.call_mgmt_func(b'admin.vm.Shutdown', b'test-vm1', b'force')
  923. self.assertIsNone(value)
  924. func_mock.assert_called_once_with(force=True, wait=False)
  925. def test_232_shutdown_wait(self):
  926. func_mock = unittest.mock.Mock()
  927. @asyncio.coroutine
  928. def coroutine_mock(*args, **kwargs):
  929. return func_mock(*args, **kwargs)
  930. self.vm.shutdown = coroutine_mock
  931. value = self.call_mgmt_func(b'admin.vm.Shutdown', b'test-vm1', b'wait')
  932. self.assertIsNone(value)
  933. func_mock.assert_called_once_with(force=False, wait=True)
  934. def test_233_shutdown_wait_force(self):
  935. func_mock = unittest.mock.Mock()
  936. @asyncio.coroutine
  937. def coroutine_mock(*args, **kwargs):
  938. return func_mock(*args, **kwargs)
  939. self.vm.shutdown = coroutine_mock
  940. value = self.call_mgmt_func(b'admin.vm.Shutdown', b'test-vm1', b'wait+force')
  941. self.assertIsNone(value)
  942. func_mock.assert_called_once_with(force=True, wait=True)
  943. def test_234_shutdown_force_wait(self):
  944. func_mock = unittest.mock.Mock()
  945. @asyncio.coroutine
  946. def coroutine_mock(*args, **kwargs):
  947. return func_mock(*args, **kwargs)
  948. self.vm.shutdown = coroutine_mock
  949. value = self.call_mgmt_func(b'admin.vm.Shutdown', b'test-vm1', b'force+wait')
  950. self.assertIsNone(value)
  951. func_mock.assert_called_once_with(force=True, wait=True)
  952. def test_234_shutdown_force_wait_invalid(self):
  953. func_mock = unittest.mock.Mock()
  954. @asyncio.coroutine
  955. def coroutine_mock(*args, **kwargs):
  956. return func_mock(*args, **kwargs)
  957. self.vm.shutdown = coroutine_mock
  958. with self.assertRaises(qubes.api.PermissionDenied):
  959. self.call_mgmt_func(b'admin.vm.Shutdown', b'test-vm1', b'forcewait')
  960. func_mock.assert_not_called()
  961. def test_240_pause(self):
  962. func_mock = unittest.mock.Mock()
  963. @asyncio.coroutine
  964. def coroutine_mock(*args, **kwargs):
  965. return func_mock(*args, **kwargs)
  966. self.vm.pause = coroutine_mock
  967. value = self.call_mgmt_func(b'admin.vm.Pause', b'test-vm1')
  968. self.assertIsNone(value)
  969. func_mock.assert_called_once_with()
  970. def test_250_unpause(self):
  971. func_mock = unittest.mock.Mock()
  972. @asyncio.coroutine
  973. def coroutine_mock(*args, **kwargs):
  974. return func_mock(*args, **kwargs)
  975. self.vm.unpause = coroutine_mock
  976. value = self.call_mgmt_func(b'admin.vm.Unpause', b'test-vm1')
  977. self.assertIsNone(value)
  978. func_mock.assert_called_once_with()
  979. def test_260_kill(self):
  980. func_mock = unittest.mock.Mock()
  981. @asyncio.coroutine
  982. def coroutine_mock(*args, **kwargs):
  983. return func_mock(*args, **kwargs)
  984. self.vm.kill = coroutine_mock
  985. value = self.call_mgmt_func(b'admin.vm.Kill', b'test-vm1')
  986. self.assertIsNone(value)
  987. func_mock.assert_called_once_with()
  988. def test_270_events(self):
  989. send_event = unittest.mock.Mock(spec=[])
  990. mgmt_obj = qubes.api.admin.QubesAdminAPI(self.app, b'dom0', b'admin.Events',
  991. b'dom0', b'', send_event=send_event)
  992. @asyncio.coroutine
  993. def fire_event():
  994. self.vm.fire_event('test-event', arg1='abc')
  995. mgmt_obj.cancel()
  996. loop = asyncio.get_event_loop()
  997. execute_task = asyncio.ensure_future(
  998. mgmt_obj.execute(untrusted_payload=b''))
  999. asyncio.ensure_future(fire_event())
  1000. loop.run_until_complete(execute_task)
  1001. self.assertIsNone(execute_task.result())
  1002. self.assertEventFired(self.emitter,
  1003. 'admin-permission:' + 'admin.Events')
  1004. self.assertEqual(send_event.mock_calls,
  1005. [
  1006. unittest.mock.call(self.app, 'connection-established'),
  1007. unittest.mock.call(self.vm, 'test-event', arg1='abc')
  1008. ])
  1009. def test_271_events_add_vm(self):
  1010. send_event = unittest.mock.Mock(spec=[])
  1011. mgmt_obj = qubes.api.admin.QubesAdminAPI(self.app, b'dom0', b'admin.Events',
  1012. b'dom0', b'', send_event=send_event)
  1013. @asyncio.coroutine
  1014. def fire_event():
  1015. self.vm.fire_event('test-event', arg1='abc')
  1016. # add VM _after_ starting admin.Events call
  1017. vm = self.app.add_new_vm('AppVM', label='red', name='test-vm2',
  1018. template='test-template')
  1019. vm.fire_event('test-event2', arg1='abc')
  1020. mgmt_obj.cancel()
  1021. return vm
  1022. loop = asyncio.get_event_loop()
  1023. execute_task = asyncio.ensure_future(
  1024. mgmt_obj.execute(untrusted_payload=b''))
  1025. event_task = asyncio.ensure_future(fire_event())
  1026. loop.run_until_complete(execute_task)
  1027. vm2 = event_task.result()
  1028. self.assertIsNone(execute_task.result())
  1029. self.assertEventFired(self.emitter,
  1030. 'admin-permission:' + 'admin.Events')
  1031. self.assertEqual(send_event.mock_calls,
  1032. [
  1033. unittest.mock.call(self.app, 'connection-established'),
  1034. unittest.mock.call(self.vm, 'test-event', arg1='abc'),
  1035. unittest.mock.call(self.app, 'domain-add', vm=vm2),
  1036. unittest.mock.call(vm2, 'test-event2', arg1='abc'),
  1037. ])
  1038. def test_272_events_filter(self):
  1039. with tempfile.TemporaryDirectory() as tmpdir:
  1040. tmpdir = pathlib.Path(tmpdir)
  1041. with unittest.mock.patch(
  1042. 'qubes.ext.admin.AdminExtension._instance.policy_cache.path',
  1043. pathlib.Path(tmpdir)):
  1044. with (tmpdir / 'admin.policy').open('w') as f:
  1045. f.write('admin.Events * @anyvm @adminvm allow\n')
  1046. f.write('admin.Events * @anyvm test-vm1 allow')
  1047. send_event = unittest.mock.Mock(spec=[])
  1048. mgmt_obj = qubes.api.admin.QubesAdminAPI(self.app, b'test-vm1',
  1049. b'admin.Events',
  1050. b'dom0', b'', send_event=send_event)
  1051. @asyncio.coroutine
  1052. def fire_event():
  1053. # add VM _after_ starting admin.Events call
  1054. vm = self.app.add_new_vm('AppVM', label='red',
  1055. name='test-vm2',
  1056. template='test-template')
  1057. vm.fire_event('test-event2', arg1='abc')
  1058. self.vm.fire_event('test-event', arg1='abc')
  1059. mgmt_obj.cancel()
  1060. return vm
  1061. loop = asyncio.get_event_loop()
  1062. execute_task = asyncio.ensure_future(
  1063. mgmt_obj.execute(untrusted_payload=b''))
  1064. event_task = asyncio.ensure_future(fire_event())
  1065. loop.run_until_complete(execute_task)
  1066. vm2 = event_task.result()
  1067. self.assertIsNone(execute_task.result())
  1068. self.assertEqual(send_event.mock_calls,
  1069. [
  1070. unittest.mock.call(self.app, 'connection-established'),
  1071. unittest.mock.call(self.vm, 'test-event', arg1='abc'),
  1072. ])
  1073. def test_280_feature_list(self):
  1074. self.vm.features['test-feature'] = 'some-value'
  1075. value = self.call_mgmt_func(b'admin.vm.feature.List', b'test-vm1')
  1076. self.assertEqual(value, 'test-feature\n')
  1077. self.assertFalse(self.app.save.called)
  1078. def test_290_feature_get(self):
  1079. self.vm.features['test-feature'] = 'some-value'
  1080. value = self.call_mgmt_func(b'admin.vm.feature.Get', b'test-vm1',
  1081. b'test-feature')
  1082. self.assertEqual(value, 'some-value')
  1083. self.assertFalse(self.app.save.called)
  1084. def test_291_feature_get_none(self):
  1085. with self.assertRaises(qubes.exc.QubesFeatureNotFoundError):
  1086. self.call_mgmt_func(b'admin.vm.feature.Get',
  1087. b'test-vm1', b'test-feature')
  1088. self.assertFalse(self.app.save.called)
  1089. def test_300_feature_remove(self):
  1090. self.vm.features['test-feature'] = 'some-value'
  1091. value = self.call_mgmt_func(b'admin.vm.feature.Remove', b'test-vm1',
  1092. b'test-feature')
  1093. self.assertIsNone(value, None)
  1094. self.assertNotIn('test-feature', self.vm.features)
  1095. self.assertTrue(self.app.save.called)
  1096. def test_301_feature_remove_none(self):
  1097. with self.assertRaises(qubes.exc.QubesFeatureNotFoundError):
  1098. self.call_mgmt_func(b'admin.vm.feature.Remove',
  1099. b'test-vm1', b'test-feature')
  1100. self.assertFalse(self.app.save.called)
  1101. def test_310_feature_checkwithtemplate(self):
  1102. self.vm.features['test-feature'] = 'some-value'
  1103. value = self.call_mgmt_func(b'admin.vm.feature.CheckWithTemplate',
  1104. b'test-vm1', b'test-feature')
  1105. self.assertEqual(value, 'some-value')
  1106. self.assertFalse(self.app.save.called)
  1107. def test_311_feature_checkwithtemplate_tpl(self):
  1108. self.template.features['test-feature'] = 'some-value'
  1109. value = self.call_mgmt_func(b'admin.vm.feature.CheckWithTemplate',
  1110. b'test-vm1', b'test-feature')
  1111. self.assertEqual(value, 'some-value')
  1112. self.assertFalse(self.app.save.called)
  1113. def test_312_feature_checkwithtemplate_none(self):
  1114. with self.assertRaises(qubes.exc.QubesFeatureNotFoundError):
  1115. self.call_mgmt_func(b'admin.vm.feature.CheckWithTemplate',
  1116. b'test-vm1', b'test-feature')
  1117. self.assertFalse(self.app.save.called)
  1118. def test_315_feature_checkwithnetvm(self):
  1119. self.vm.features['test-feature'] = 'some-value'
  1120. value = self.call_mgmt_func(b'admin.vm.feature.CheckWithNetvm',
  1121. b'test-vm1', b'test-feature')
  1122. self.assertEqual(value, 'some-value')
  1123. self.assertFalse(self.app.save.called)
  1124. def test_316_feature_checkwithnetvm_netvm(self):
  1125. self.netvm = self.app.add_new_vm('AppVM', label='red',
  1126. name='test-netvm1',
  1127. template='test-template',
  1128. provides_network=True)
  1129. self.vm.netvm = self.netvm
  1130. self.netvm.features['test-feature'] = 'some-value'
  1131. value = self.call_mgmt_func(b'admin.vm.feature.CheckWithNetvm',
  1132. b'test-vm1', b'test-feature')
  1133. self.assertEqual(value, 'some-value')
  1134. self.assertFalse(self.app.save.called)
  1135. def test_317_feature_checkwithnetvm_none(self):
  1136. with self.assertRaises(qubes.exc.QubesFeatureNotFoundError):
  1137. self.call_mgmt_func(b'admin.vm.feature.CheckWithNetvm',
  1138. b'test-vm1', b'test-feature')
  1139. self.assertFalse(self.app.save.called)
  1140. def test_318_feature_checkwithadminvm(self):
  1141. self.app.domains['dom0'].features['test-feature'] = 'some-value'
  1142. value = self.call_mgmt_func(b'admin.vm.feature.CheckWithAdminVM',
  1143. b'test-vm1', b'test-feature')
  1144. self.assertEqual(value, 'some-value')
  1145. self.assertFalse(self.app.save.called)
  1146. def test_319_feature_checkwithtpladminvm(self):
  1147. self.app.domains['dom0'].features['test-feature'] = 'some-value'
  1148. value = self.call_mgmt_func(
  1149. b'admin.vm.feature.CheckWithTemplateAndAdminVM',
  1150. b'test-vm1', b'test-feature')
  1151. self.assertEqual(value, 'some-value')
  1152. self.template.features['test-feature'] = 'some-value2'
  1153. value = self.call_mgmt_func(
  1154. b'admin.vm.feature.CheckWithTemplateAndAdminVM',
  1155. b'test-vm1', b'test-feature')
  1156. self.assertEqual(value, 'some-value2')
  1157. self.assertFalse(self.app.save.called)
  1158. def test_320_feature_set(self):
  1159. value = self.call_mgmt_func(b'admin.vm.feature.Set',
  1160. b'test-vm1', b'test-feature', b'some-value')
  1161. self.assertIsNone(value)
  1162. self.assertEqual(self.vm.features['test-feature'], 'some-value')
  1163. self.assertTrue(self.app.save.called)
  1164. def test_321_feature_set_empty(self):
  1165. value = self.call_mgmt_func(b'admin.vm.feature.Set',
  1166. b'test-vm1', b'test-feature', b'')
  1167. self.assertIsNone(value)
  1168. self.assertEqual(self.vm.features['test-feature'], '')
  1169. self.assertTrue(self.app.save.called)
  1170. def test_320_feature_set_invalid(self):
  1171. with self.assertRaises(UnicodeDecodeError):
  1172. self.call_mgmt_func(b'admin.vm.feature.Set',
  1173. b'test-vm1', b'test-feature', b'\x02\x03\xffsome-value')
  1174. self.assertNotIn('test-feature', self.vm.features)
  1175. self.assertFalse(self.app.save.called)
  1176. @asyncio.coroutine
  1177. def dummy_coro(self, *args, **kwargs):
  1178. pass
  1179. def coroutine_mock(self):
  1180. func_mock = unittest.mock.Mock()
  1181. @asyncio.coroutine
  1182. def coroutine_mock(*args, **kwargs):
  1183. return func_mock(*args, **kwargs)
  1184. return func_mock, coroutine_mock
  1185. @unittest.mock.patch('qubes.storage.Storage.create')
  1186. def test_330_vm_create_standalone(self, storage_mock):
  1187. storage_mock.side_effect = self.dummy_coro
  1188. self.call_mgmt_func(b'admin.vm.Create.StandaloneVM',
  1189. b'dom0', b'', b'name=test-vm2 label=red')
  1190. self.assertIn('test-vm2', self.app.domains)
  1191. vm = self.app.domains['test-vm2']
  1192. self.assertIsInstance(vm, qubes.vm.standalonevm.StandaloneVM)
  1193. self.assertEqual(vm.label, self.app.get_label('red'))
  1194. self.assertEqual(storage_mock.mock_calls,
  1195. [unittest.mock.call(self.app.domains['test-vm2']).create()])
  1196. self.assertTrue(os.path.exists(os.path.join(
  1197. self.test_base_dir, 'appvms', 'test-vm2')))
  1198. self.assertTrue(self.app.save.called)
  1199. @unittest.mock.patch('qubes.storage.Storage.create')
  1200. def test_331_vm_create_standalone_spurious_template(self, storage_mock):
  1201. storage_mock.side_effect = self.dummy_coro
  1202. with self.assertRaises(qubes.exc.QubesValueError):
  1203. self.call_mgmt_func(b'admin.vm.Create.StandaloneVM',
  1204. b'dom0', b'test-template', b'name=test-vm2 label=red')
  1205. self.assertNotIn('test-vm2', self.app.domains)
  1206. self.assertEqual(storage_mock.mock_calls, [])
  1207. self.assertFalse(os.path.exists(os.path.join(
  1208. self.test_base_dir, 'appvms', 'test-vm2')))
  1209. self.assertNotIn('test-vm2', self.app.domains)
  1210. self.assertFalse(self.app.save.called)
  1211. @unittest.mock.patch('qubes.storage.Storage.create')
  1212. def test_332_vm_create_app(self, storage_mock):
  1213. storage_mock.side_effect = self.dummy_coro
  1214. self.call_mgmt_func(b'admin.vm.Create.AppVM',
  1215. b'dom0', b'test-template', b'name=test-vm2 label=red')
  1216. self.assertIn('test-vm2', self.app.domains)
  1217. vm = self.app.domains['test-vm2']
  1218. self.assertEqual(vm.label, self.app.get_label('red'))
  1219. self.assertEqual(vm.template, self.app.domains['test-template'])
  1220. self.assertEqual(storage_mock.mock_calls,
  1221. [unittest.mock.call(self.app.domains['test-vm2']).create()])
  1222. self.assertTrue(os.path.exists(os.path.join(
  1223. self.test_base_dir, 'appvms', 'test-vm2')))
  1224. self.assertTrue(self.app.save.called)
  1225. @unittest.mock.patch('qubes.storage.Storage.create')
  1226. def test_333_vm_create_app_default_template(self, storage_mock):
  1227. storage_mock.side_effect = self.dummy_coro
  1228. self.call_mgmt_func(b'admin.vm.Create.AppVM',
  1229. b'dom0', b'', b'name=test-vm2 label=red')
  1230. self.assertEqual(storage_mock.mock_calls,
  1231. [unittest.mock.call(self.app.domains['test-vm2']).create()])
  1232. self.assertIn('test-vm2', self.app.domains)
  1233. self.assertEqual(self.app.domains['test-vm2'].template,
  1234. self.app.default_template)
  1235. self.assertTrue(self.app.save.called)
  1236. @unittest.mock.patch('qubes.storage.Storage.create')
  1237. def test_334_vm_create_invalid_name(self, storage_mock):
  1238. storage_mock.side_effect = self.dummy_coro
  1239. with self.assertRaises(qubes.exc.QubesValueError):
  1240. self.call_mgmt_func(b'admin.vm.Create.AppVM',
  1241. b'dom0', b'test-template', b'name=test-###')
  1242. self.assertNotIn('test-###', self.app.domains)
  1243. self.assertFalse(self.app.save.called)
  1244. @unittest.mock.patch('qubes.storage.Storage.create')
  1245. def test_335_vm_create_missing_name(self, storage_mock):
  1246. storage_mock.side_effect = self.dummy_coro
  1247. with self.assertRaises(qubes.api.ProtocolError):
  1248. self.call_mgmt_func(b'admin.vm.Create.AppVM',
  1249. b'dom0', b'test-template', b'label=red')
  1250. self.assertFalse(self.app.save.called)
  1251. @unittest.mock.patch('qubes.storage.Storage.create')
  1252. def test_336_vm_create_spurious_pool(self, storage_mock):
  1253. storage_mock.side_effect = self.dummy_coro
  1254. with self.assertRaises(qubes.api.ProtocolError):
  1255. self.call_mgmt_func(b'admin.vm.Create.AppVM',
  1256. b'dom0', b'test-template',
  1257. b'name=test-vm2 label=red pool=default')
  1258. self.assertNotIn('test-vm2', self.app.domains)
  1259. self.assertFalse(self.app.save.called)
  1260. @unittest.mock.patch('qubes.storage.Storage.create')
  1261. def test_337_vm_create_duplicate_name(self, storage_mock):
  1262. storage_mock.side_effect = self.dummy_coro
  1263. with self.assertRaises(qubes.exc.QubesException):
  1264. self.call_mgmt_func(b'admin.vm.Create.AppVM',
  1265. b'dom0', b'test-template',
  1266. b'name=test-vm1 label=red')
  1267. self.assertFalse(self.app.save.called)
  1268. @unittest.mock.patch('qubes.storage.Storage.create')
  1269. def test_338_vm_create_name_twice(self, storage_mock):
  1270. storage_mock.side_effect = self.dummy_coro
  1271. with self.assertRaises(qubes.api.ProtocolError):
  1272. self.call_mgmt_func(b'admin.vm.Create.AppVM',
  1273. b'dom0', b'test-template',
  1274. b'name=test-vm2 name=test-vm3 label=red')
  1275. self.assertNotIn('test-vm2', self.app.domains)
  1276. self.assertNotIn('test-vm3', self.app.domains)
  1277. self.assertFalse(self.app.save.called)
  1278. @unittest.mock.patch('qubes.storage.Storage.create')
  1279. def test_340_vm_create_in_pool_app(self, storage_mock):
  1280. storage_mock.side_effect = self.dummy_coro
  1281. self.call_mgmt_func(b'admin.vm.CreateInPool.AppVM',
  1282. b'dom0', b'test-template', b'name=test-vm2 label=red '
  1283. b'pool=test')
  1284. self.assertIn('test-vm2', self.app.domains)
  1285. vm = self.app.domains['test-vm2']
  1286. self.assertEqual(vm.label, self.app.get_label('red'))
  1287. self.assertEqual(vm.template, self.app.domains['test-template'])
  1288. # setting pool= affect only volumes actually created for this VM,
  1289. # not used from a template or so
  1290. self.assertEqual(vm.volume_config['root']['pool'],
  1291. self.template.volumes['root'].pool)
  1292. self.assertEqual(vm.volume_config['private']['pool'], 'test')
  1293. self.assertEqual(vm.volume_config['volatile']['pool'], 'test')
  1294. self.assertEqual(vm.volume_config['kernel']['pool'], 'linux-kernel')
  1295. self.assertEqual(storage_mock.mock_calls,
  1296. [unittest.mock.call(self.app.domains['test-vm2']).create()])
  1297. self.assertTrue(os.path.exists(os.path.join(
  1298. self.test_base_dir, 'appvms', 'test-vm2')))
  1299. self.assertTrue(self.app.save.called)
  1300. @unittest.mock.patch('qubes.storage.Storage.create')
  1301. def test_341_vm_create_in_pool_private(self, storage_mock):
  1302. storage_mock.side_effect = self.dummy_coro
  1303. self.call_mgmt_func(b'admin.vm.CreateInPool.AppVM',
  1304. b'dom0', b'test-template', b'name=test-vm2 label=red '
  1305. b'pool:private=test')
  1306. self.assertIn('test-vm2', self.app.domains)
  1307. vm = self.app.domains['test-vm2']
  1308. self.assertEqual(vm.label, self.app.get_label('red'))
  1309. self.assertEqual(vm.template, self.app.domains['test-template'])
  1310. self.assertEqual(vm.volume_config['root']['pool'],
  1311. self.template.volumes['root'].pool)
  1312. self.assertEqual(vm.volume_config['private']['pool'], 'test')
  1313. self.assertEqual(vm.volume_config['volatile']['pool'],
  1314. self.app.default_pool_volatile)
  1315. self.assertEqual(vm.volume_config['kernel']['pool'], 'linux-kernel')
  1316. self.assertEqual(storage_mock.mock_calls,
  1317. [unittest.mock.call(self.app.domains['test-vm2']).create()])
  1318. self.assertTrue(os.path.exists(os.path.join(
  1319. self.test_base_dir, 'appvms', 'test-vm2')))
  1320. self.assertTrue(self.app.save.called)
  1321. @unittest.mock.patch('qubes.storage.Storage.create')
  1322. def test_342_vm_create_in_pool_invalid_pool(self, storage_mock):
  1323. storage_mock.side_effect = self.dummy_coro
  1324. with self.assertRaises(qubes.exc.QubesException):
  1325. self.call_mgmt_func(b'admin.vm.CreateInPool.AppVM',
  1326. b'dom0', b'test-template', b'name=test-vm2 label=red '
  1327. b'pool=no-such-pool')
  1328. self.assertFalse(self.app.save.called)
  1329. @unittest.mock.patch('qubes.storage.Storage.create')
  1330. def test_343_vm_create_in_pool_invalid_pool2(self, storage_mock):
  1331. storage_mock.side_effect = self.dummy_coro
  1332. with self.assertRaises(qubes.exc.QubesException):
  1333. self.call_mgmt_func(b'admin.vm.CreateInPool.AppVM',
  1334. b'dom0', b'test-template', b'name=test-vm2 label=red '
  1335. b'pool:private=no-such-pool')
  1336. self.assertNotIn('test-vm2', self.app.domains)
  1337. self.assertFalse(self.app.save.called)
  1338. @unittest.mock.patch('qubes.storage.Storage.create')
  1339. def test_344_vm_create_in_pool_invalid_volume(self, storage_mock):
  1340. storage_mock.side_effect = self.dummy_coro
  1341. with self.assertRaises(qubes.api.PermissionDenied):
  1342. self.call_mgmt_func(b'admin.vm.CreateInPool.AppVM',
  1343. b'dom0', b'test-template', b'name=test-vm2 label=red '
  1344. b'pool:invalid=test')
  1345. self.assertNotIn('test-vm2', self.app.domains)
  1346. self.assertFalse(self.app.save.called)
  1347. @unittest.mock.patch('qubes.storage.Storage.create')
  1348. def test_345_vm_create_in_pool_app_root(self, storage_mock):
  1349. # setting custom pool for 'root' volume of AppVM should not be
  1350. # allowed - this volume belongs to the template
  1351. storage_mock.side_effect = self.dummy_coro
  1352. with self.assertRaises(qubes.exc.QubesException):
  1353. self.call_mgmt_func(b'admin.vm.CreateInPool.AppVM',
  1354. b'dom0', b'test-template', b'name=test-vm2 label=red '
  1355. b'pool:root=test')
  1356. self.assertNotIn('test-vm2', self.app.domains)
  1357. self.assertFalse(self.app.save.called)
  1358. @unittest.mock.patch('qubes.storage.Storage.create')
  1359. def test_346_vm_create_in_pool_duplicate_pool(self, storage_mock):
  1360. # setting custom pool for 'root' volume of AppVM should not be
  1361. # allowed - this volume belongs to the template
  1362. storage_mock.side_effect = self.dummy_coro
  1363. with self.assertRaises(qubes.api.ProtocolError):
  1364. self.call_mgmt_func(b'admin.vm.CreateInPool.AppVM',
  1365. b'dom0', b'test-template', b'name=test-vm2 label=red '
  1366. b'pool=test pool:root=test')
  1367. self.assertNotIn('test-vm2', self.app.domains)
  1368. self.assertFalse(self.app.save.called)
  1369. def test_400_property_list(self):
  1370. # actual function tested for admin.vm.property.* already
  1371. # this test is kind of stupid, but at least check if appropriate
  1372. # admin-permission event is fired
  1373. value = self.call_mgmt_func(b'admin.property.List', b'dom0')
  1374. properties = self.app.property_list()
  1375. self.assertEqual(value,
  1376. ''.join('{}\n'.format(prop.__name__) for prop in properties))
  1377. def test_410_property_get_str(self):
  1378. # actual function tested for admin.vm.property.* already
  1379. value = self.call_mgmt_func(b'admin.property.Get', b'dom0',
  1380. b'default_kernel')
  1381. self.assertEqual(value, 'default=False type=str 1.0')
  1382. def test_420_propert_set_str(self):
  1383. # actual function tested for admin.vm.property.* already
  1384. with unittest.mock.patch('qubes.property.__set__') as mock:
  1385. value = self.call_mgmt_func(b'admin.property.Set', b'dom0',
  1386. b'default_kernel', b'1.0')
  1387. self.assertIsNone(value)
  1388. mock.assert_called_once_with(self.app, '1.0')
  1389. self.app.save.assert_called_once_with()
  1390. def test_440_property_help(self):
  1391. # actual function tested for admin.vm.property.* already
  1392. value = self.call_mgmt_func(b'admin.property.Help', b'dom0',
  1393. b'clockvm')
  1394. self.assertEqual(value,
  1395. 'Which VM to use as NTP proxy for updating AdminVM')
  1396. self.assertFalse(self.app.save.called)
  1397. def test_450_property_reset(self):
  1398. # actual function tested for admin.vm.property.* already
  1399. with unittest.mock.patch('qubes.property.__delete__') as mock:
  1400. value = self.call_mgmt_func(b'admin.property.Reset', b'dom0',
  1401. b'clockvm')
  1402. mock.assert_called_with(self.app)
  1403. self.assertIsNone(value)
  1404. self.app.save.assert_called_once_with()
  1405. def device_list_testclass(self, vm, event):
  1406. if vm is not self.vm:
  1407. return
  1408. dev = qubes.devices.DeviceInfo(self.vm, '1234')
  1409. dev.description = 'Some device'
  1410. dev.extra_prop = 'xx'
  1411. yield dev
  1412. dev = qubes.devices.DeviceInfo(self.vm, '4321')
  1413. dev.description = 'Some other device'
  1414. yield dev
  1415. def test_460_vm_device_available(self):
  1416. self.vm.add_handler('device-list:testclass', self.device_list_testclass)
  1417. value = self.call_mgmt_func(b'admin.vm.device.testclass.Available',
  1418. b'test-vm1')
  1419. self.assertEqual(value,
  1420. '1234 extra_prop=xx description=Some '
  1421. 'device\n'
  1422. '4321 description=Some other device\n')
  1423. self.assertFalse(self.app.save.called)
  1424. def test_461_vm_device_available_specific(self):
  1425. self.vm.add_handler('device-list:testclass', self.device_list_testclass)
  1426. value = self.call_mgmt_func(b'admin.vm.device.testclass.Available',
  1427. b'test-vm1', b'4321')
  1428. self.assertEqual(value,
  1429. '4321 description=Some other device\n')
  1430. self.assertFalse(self.app.save.called)
  1431. def test_462_vm_device_available_invalid(self):
  1432. self.vm.add_handler('device-list:testclass', self.device_list_testclass)
  1433. value = self.call_mgmt_func(b'admin.vm.device.testclass.Available',
  1434. b'test-vm1', b'no-such-device')
  1435. self.assertEqual(value, '')
  1436. self.assertFalse(self.app.save.called)
  1437. def test_470_vm_device_list_persistent(self):
  1438. assignment = qubes.devices.DeviceAssignment(self.vm, '1234',
  1439. persistent=True)
  1440. self.loop.run_until_complete(
  1441. self.vm.devices['testclass'].attach(assignment))
  1442. value = self.call_mgmt_func(b'admin.vm.device.testclass.List',
  1443. b'test-vm1')
  1444. self.assertEqual(value,
  1445. 'test-vm1+1234 persistent=yes\n')
  1446. self.assertFalse(self.app.save.called)
  1447. def test_471_vm_device_list_persistent_options(self):
  1448. assignment = qubes.devices.DeviceAssignment(self.vm, '1234',
  1449. persistent=True, options={'opt1': 'value'})
  1450. self.loop.run_until_complete(
  1451. self.vm.devices['testclass'].attach(assignment))
  1452. assignment = qubes.devices.DeviceAssignment(self.vm, '4321',
  1453. persistent=True)
  1454. self.loop.run_until_complete(
  1455. self.vm.devices['testclass'].attach(assignment))
  1456. value = self.call_mgmt_func(b'admin.vm.device.testclass.List',
  1457. b'test-vm1')
  1458. self.assertEqual(value,
  1459. 'test-vm1+1234 opt1=value persistent=yes\n'
  1460. 'test-vm1+4321 persistent=yes\n')
  1461. self.assertFalse(self.app.save.called)
  1462. def device_list_attached_testclass(self, vm, event, **kwargs):
  1463. if vm is not self.vm:
  1464. return
  1465. dev = qubes.devices.DeviceInfo(self.vm, '1234')
  1466. yield (dev, {'attach_opt': 'value'})
  1467. def test_472_vm_device_list_temporary(self):
  1468. self.vm.add_handler('device-list-attached:testclass',
  1469. self.device_list_attached_testclass)
  1470. value = self.call_mgmt_func(b'admin.vm.device.testclass.List',
  1471. b'test-vm1')
  1472. self.assertEqual(value,
  1473. 'test-vm1+1234 attach_opt=value persistent=no\n')
  1474. self.assertFalse(self.app.save.called)
  1475. def test_473_vm_device_list_mixed(self):
  1476. self.vm.add_handler('device-list-attached:testclass',
  1477. self.device_list_attached_testclass)
  1478. assignment = qubes.devices.DeviceAssignment(self.vm, '4321',
  1479. persistent=True)
  1480. self.loop.run_until_complete(
  1481. self.vm.devices['testclass'].attach(assignment))
  1482. value = self.call_mgmt_func(b'admin.vm.device.testclass.List',
  1483. b'test-vm1')
  1484. self.assertEqual(value,
  1485. 'test-vm1+1234 attach_opt=value persistent=no\n'
  1486. 'test-vm1+4321 persistent=yes\n')
  1487. self.assertFalse(self.app.save.called)
  1488. def test_474_vm_device_list_specific(self):
  1489. self.vm.add_handler('device-list-attached:testclass',
  1490. self.device_list_attached_testclass)
  1491. assignment = qubes.devices.DeviceAssignment(self.vm, '4321',
  1492. persistent=True)
  1493. self.loop.run_until_complete(
  1494. self.vm.devices['testclass'].attach(assignment))
  1495. value = self.call_mgmt_func(b'admin.vm.device.testclass.List',
  1496. b'test-vm1', b'test-vm1+1234')
  1497. self.assertEqual(value,
  1498. 'test-vm1+1234 attach_opt=value persistent=no\n')
  1499. self.assertFalse(self.app.save.called)
  1500. def test_480_vm_device_attach(self):
  1501. self.vm.add_handler('device-list:testclass', self.device_list_testclass)
  1502. mock_attach = unittest.mock.Mock()
  1503. mock_attach.return_value = None
  1504. del mock_attach._is_coroutine
  1505. self.vm.add_handler('device-attach:testclass', mock_attach)
  1506. with unittest.mock.patch.object(qubes.vm.qubesvm.QubesVM,
  1507. 'is_halted', lambda _: False):
  1508. value = self.call_mgmt_func(b'admin.vm.device.testclass.Attach',
  1509. b'test-vm1', b'test-vm1+1234')
  1510. self.assertIsNone(value)
  1511. mock_attach.assert_called_once_with(self.vm, 'device-attach:testclass',
  1512. device=self.vm.devices['testclass']['1234'],
  1513. options={})
  1514. self.assertEqual(len(self.vm.devices['testclass'].persistent()), 0)
  1515. self.app.save.assert_called_once_with()
  1516. def test_481_vm_device_attach(self):
  1517. self.vm.add_handler('device-list:testclass', self.device_list_testclass)
  1518. mock_attach = unittest.mock.Mock()
  1519. mock_attach.return_value = None
  1520. del mock_attach._is_coroutine
  1521. self.vm.add_handler('device-attach:testclass', mock_attach)
  1522. with unittest.mock.patch.object(qubes.vm.qubesvm.QubesVM,
  1523. 'is_halted', lambda _: False):
  1524. value = self.call_mgmt_func(b'admin.vm.device.testclass.Attach',
  1525. b'test-vm1', b'test-vm1+1234', b'persistent=no')
  1526. self.assertIsNone(value)
  1527. mock_attach.assert_called_once_with(self.vm, 'device-attach:testclass',
  1528. device=self.vm.devices['testclass']['1234'],
  1529. options={})
  1530. self.assertEqual(len(self.vm.devices['testclass'].persistent()), 0)
  1531. self.app.save.assert_called_once_with()
  1532. def test_482_vm_device_attach_not_running(self):
  1533. self.vm.add_handler('device-list:testclass', self.device_list_testclass)
  1534. mock_attach = unittest.mock.Mock()
  1535. del mock_attach._is_coroutine
  1536. self.vm.add_handler('device-attach:testclass', mock_attach)
  1537. with self.assertRaises(qubes.exc.QubesVMNotRunningError):
  1538. self.call_mgmt_func(b'admin.vm.device.testclass.Attach',
  1539. b'test-vm1', b'test-vm1+1234')
  1540. self.assertFalse(mock_attach.called)
  1541. self.assertEqual(len(self.vm.devices['testclass'].persistent()), 0)
  1542. self.assertFalse(self.app.save.called)
  1543. def test_483_vm_device_attach_persistent(self):
  1544. self.vm.add_handler('device-list:testclass', self.device_list_testclass)
  1545. mock_attach = unittest.mock.Mock()
  1546. mock_attach.return_value = None
  1547. del mock_attach._is_coroutine
  1548. self.vm.add_handler('device-attach:testclass', mock_attach)
  1549. with unittest.mock.patch.object(qubes.vm.qubesvm.QubesVM,
  1550. 'is_halted', lambda _: False):
  1551. value = self.call_mgmt_func(b'admin.vm.device.testclass.Attach',
  1552. b'test-vm1', b'test-vm1+1234', b'persistent=yes')
  1553. self.assertIsNone(value)
  1554. dev = self.vm.devices['testclass']['1234']
  1555. mock_attach.assert_called_once_with(self.vm, 'device-attach:testclass',
  1556. device=dev,
  1557. options={})
  1558. self.assertIn(dev, self.vm.devices['testclass'].persistent())
  1559. self.app.save.assert_called_once_with()
  1560. def test_484_vm_device_attach_persistent_not_running(self):
  1561. self.vm.add_handler('device-list:testclass', self.device_list_testclass)
  1562. mock_attach = unittest.mock.Mock()
  1563. mock_attach.return_value = None
  1564. del mock_attach._is_coroutine
  1565. self.vm.add_handler('device-attach:testclass', mock_attach)
  1566. value = self.call_mgmt_func(b'admin.vm.device.testclass.Attach',
  1567. b'test-vm1', b'test-vm1+1234', b'persistent=yes')
  1568. self.assertIsNone(value)
  1569. dev = self.vm.devices['testclass']['1234']
  1570. mock_attach.assert_called_once_with(self.vm, 'device-attach:testclass',
  1571. device=dev,
  1572. options={})
  1573. self.assertIn(dev, self.vm.devices['testclass'].persistent())
  1574. self.app.save.assert_called_once_with()
  1575. def test_485_vm_device_attach_options(self):
  1576. self.vm.add_handler('device-list:testclass', self.device_list_testclass)
  1577. mock_attach = unittest.mock.Mock()
  1578. mock_attach.return_value = None
  1579. del mock_attach._is_coroutine
  1580. self.vm.add_handler('device-attach:testclass', mock_attach)
  1581. with unittest.mock.patch.object(qubes.vm.qubesvm.QubesVM,
  1582. 'is_halted', lambda _: False):
  1583. value = self.call_mgmt_func(b'admin.vm.device.testclass.Attach',
  1584. b'test-vm1', b'test-vm1+1234', b'option1=value2')
  1585. self.assertIsNone(value)
  1586. dev = self.vm.devices['testclass']['1234']
  1587. mock_attach.assert_called_once_with(self.vm, 'device-attach:testclass',
  1588. device=dev,
  1589. options={'option1': 'value2'})
  1590. self.app.save.assert_called_once_with()
  1591. def test_490_vm_device_detach(self):
  1592. self.vm.add_handler('device-list:testclass', self.device_list_testclass)
  1593. self.vm.add_handler('device-list-attached:testclass',
  1594. self.device_list_attached_testclass)
  1595. mock_detach = unittest.mock.Mock()
  1596. mock_detach.return_value = None
  1597. del mock_detach._is_coroutine
  1598. self.vm.add_handler('device-detach:testclass', mock_detach)
  1599. with unittest.mock.patch.object(qubes.vm.qubesvm.QubesVM,
  1600. 'is_halted', lambda _: False):
  1601. value = self.call_mgmt_func(b'admin.vm.device.testclass.Detach',
  1602. b'test-vm1', b'test-vm1+1234')
  1603. self.assertIsNone(value)
  1604. mock_detach.assert_called_once_with(self.vm, 'device-detach:testclass',
  1605. device=self.vm.devices['testclass']['1234'])
  1606. self.app.save.assert_called_once_with()
  1607. def test_491_vm_device_detach_not_attached(self):
  1608. mock_detach = unittest.mock.Mock()
  1609. mock_detach.return_value = None
  1610. del mock_detach._is_coroutine
  1611. self.vm.add_handler('device-detach:testclass', mock_detach)
  1612. with unittest.mock.patch.object(qubes.vm.qubesvm.QubesVM,
  1613. 'is_halted', lambda _: False):
  1614. with self.assertRaises(qubes.devices.DeviceNotAttached):
  1615. self.call_mgmt_func(b'admin.vm.device.testclass.Detach',
  1616. b'test-vm1', b'test-vm1+1234')
  1617. self.assertFalse(mock_detach.called)
  1618. self.assertFalse(self.app.save.called)
  1619. @unittest.mock.patch('qubes.storage.Storage.remove')
  1620. @unittest.mock.patch('shutil.rmtree')
  1621. def test_500_vm_remove(self, mock_rmtree, mock_remove):
  1622. mock_remove.side_effect = self.dummy_coro
  1623. value = self.call_mgmt_func(b'admin.vm.Remove', b'test-vm1')
  1624. self.assertIsNone(value)
  1625. mock_rmtree.assert_called_once_with(
  1626. '/tmp/qubes-test-dir/appvms/test-vm1')
  1627. mock_remove.assert_called_once_with()
  1628. self.app.save.assert_called_once_with()
  1629. @unittest.mock.patch('qubes.storage.Storage.remove')
  1630. @unittest.mock.patch('shutil.rmtree')
  1631. def test_501_vm_remove_running(self, mock_rmtree, mock_remove):
  1632. mock_remove.side_effect = self.dummy_coro
  1633. with unittest.mock.patch.object(
  1634. self.vm, 'get_power_state', lambda: 'Running'):
  1635. with self.assertRaises(qubes.exc.QubesVMNotHaltedError):
  1636. self.call_mgmt_func(b'admin.vm.Remove', b'test-vm1')
  1637. self.assertFalse(mock_rmtree.called)
  1638. self.assertFalse(mock_remove.called)
  1639. self.assertFalse(self.app.save.called)
  1640. @unittest.mock.patch('qubes.storage.Storage.remove')
  1641. @unittest.mock.patch('shutil.rmtree')
  1642. def test_502_vm_remove_attached(self, mock_rmtree, mock_remove):
  1643. self.setup_for_clone()
  1644. assignment = qubes.devices.DeviceAssignment(
  1645. self.vm, '1234', persistent=True)
  1646. self.loop.run_until_complete(
  1647. self.vm2.devices['testclass'].attach(assignment))
  1648. mock_remove.side_effect = self.dummy_coro
  1649. with self.assertRaises(qubes.exc.QubesVMInUseError):
  1650. self.call_mgmt_func(b'admin.vm.Remove', b'test-vm1')
  1651. self.assertFalse(mock_rmtree.called)
  1652. self.assertFalse(mock_remove.called)
  1653. self.assertFalse(self.app.save.called)
  1654. # Import tests
  1655. # (internal methods, normally called from qubes-rpc script)
  1656. def test_510_vm_volume_import(self):
  1657. value = self.call_internal_mgmt_func(
  1658. b'internal.vm.volume.ImportBegin', b'test-vm1', b'private')
  1659. self.assertEqual(value, '{} {}'.format(
  1660. 2*2**30, '/tmp/qubes-test-dir/appvms/test-vm1/private-import.img'))
  1661. self.assertFalse(self.app.save.called)
  1662. def test_511_vm_volume_import_running(self):
  1663. with unittest.mock.patch.object(
  1664. self.vm, 'get_power_state', lambda: 'Running'):
  1665. with self.assertRaises(qubes.exc.QubesVMNotHaltedError):
  1666. self.call_internal_mgmt_func(
  1667. b'internal.vm.volume.ImportBegin', b'test-vm1', b'private')
  1668. def test_512_vm_volume_import_with_size(self):
  1669. new_size = 4 * 2**30
  1670. file_name = '/tmp/qubes-test-dir/appvms/test-vm1/private-import.img'
  1671. value = self.call_internal_mgmt_func(
  1672. b'internal.vm.volume.ImportBegin', b'test-vm1',
  1673. b'private', payload=str(new_size).encode())
  1674. self.assertEqual(value, '{} {}'.format(
  1675. new_size, file_name))
  1676. self.assertFalse(self.app.save.called)
  1677. self.assertEqual(os.stat(file_name).st_size, new_size)
  1678. def test_515_vm_volume_import_fire_event(self):
  1679. self.call_internal_mgmt_func(
  1680. b'internal.vm.volume.ImportBegin', b'test-vm1', b'private')
  1681. self.assertEventFired(
  1682. self.emitter, 'admin-permission:admin.vm.volume.Import')
  1683. def test_516_vm_volume_import_fire_event_with_size(self):
  1684. self.call_internal_mgmt_func(
  1685. b'internal.vm.volume.ImportBegin', b'test-vm1', b'private',
  1686. b'123')
  1687. self.assertEventFired(
  1688. self.emitter, 'admin-permission:admin.vm.volume.ImportWithSize')
  1689. def test_510_vm_volume_import_end_success(self):
  1690. import_data_end_mock, self.vm.storage.import_data_end = \
  1691. self.coroutine_mock()
  1692. self.call_internal_mgmt_func(
  1693. b'internal.vm.volume.ImportEnd', b'test-vm1', b'private',
  1694. payload=b'ok')
  1695. self.assertEqual(import_data_end_mock.mock_calls, [
  1696. unittest.mock.call('private', success=True)
  1697. ])
  1698. def test_510_vm_volume_import_end_failure(self):
  1699. import_data_end_mock, self.vm.storage.import_data_end = \
  1700. self.coroutine_mock()
  1701. with self.assertRaisesRegexp(
  1702. qubes.exc.QubesException, 'error message'):
  1703. self.call_internal_mgmt_func(
  1704. b'internal.vm.volume.ImportEnd', b'test-vm1', b'private',
  1705. payload=b'fail\nerror message')
  1706. self.assertEqual(import_data_end_mock.mock_calls, [
  1707. unittest.mock.call('private', success=False)
  1708. ])
  1709. def setup_for_clone(self):
  1710. self.pool = unittest.mock.MagicMock()
  1711. self.app.pools['test'] = self.pool
  1712. self.vm2 = self.app.add_new_vm('AppVM', label='red',
  1713. name='test-vm2',
  1714. template='test-template', kernel='')
  1715. self.pool.configure_mock(**{
  1716. 'volumes': qubes.storage.VolumesCollection(self.pool),
  1717. 'init_volume.return_value.pool': self.pool,
  1718. '__str__.return_value': 'test',
  1719. 'get_volume.side_effect': (lambda vid:
  1720. self.vm.volumes['private']
  1721. if vid is self.vm.volumes['private'].vid
  1722. else self.vm2.volumes['private']
  1723. ),
  1724. })
  1725. self.loop.run_until_complete(
  1726. self.vm.create_on_disk(pool='test'))
  1727. self.loop.run_until_complete(
  1728. self.vm2.create_on_disk(pool='test'))
  1729. # the call replaces self.vm.volumes[...] with result of import
  1730. # operation - make sure it stays as the same object
  1731. self.vm.volumes['private'].import_volume.return_value = \
  1732. self.vm.volumes['private']
  1733. self.vm2.volumes['private'].import_volume.return_value = \
  1734. self.vm2.volumes['private']
  1735. self.addCleanup(self.cleanup_for_clone)
  1736. def cleanup_for_clone(self):
  1737. del self.vm2
  1738. del self.pool
  1739. def test_520_vm_volume_clone(self):
  1740. self.setup_for_clone()
  1741. token = self.call_mgmt_func(b'admin.vm.volume.CloneFrom',
  1742. b'test-vm1', b'private', b'')
  1743. # token
  1744. self.assertEqual(len(token), 32)
  1745. self.assertFalse(self.app.save.called)
  1746. value = self.call_mgmt_func(b'admin.vm.volume.CloneTo',
  1747. b'test-vm2', b'private', token.encode())
  1748. self.assertIsNone(value)
  1749. self.vm2.volumes['private'].import_volume.assert_called_once_with(
  1750. self.vm.volumes['private']
  1751. )
  1752. self.vm2.volumes['private'].import_volume.assert_called_once_with(
  1753. self.vm2.volumes['private']
  1754. )
  1755. self.app.save.assert_called_once_with()
  1756. def test_521_vm_volume_clone_invalid_volume(self):
  1757. self.setup_for_clone()
  1758. with self.assertRaises(qubes.api.PermissionDenied):
  1759. self.call_mgmt_func(b'admin.vm.volume.CloneFrom',
  1760. b'test-vm1', b'private123', b'')
  1761. self.assertNotIn('init_volume().import_volume',
  1762. map(operator.itemgetter(0), self.pool.mock_calls))
  1763. self.assertFalse(self.app.save.called)
  1764. def test_522_vm_volume_clone_invalid_volume2(self):
  1765. self.setup_for_clone()
  1766. token = self.call_mgmt_func(b'admin.vm.volume.CloneFrom',
  1767. b'test-vm1', b'private', b'')
  1768. with self.assertRaises(qubes.api.PermissionDenied):
  1769. self.call_mgmt_func(b'admin.vm.volume.CloneTo',
  1770. b'test-vm1', b'private123', token.encode())
  1771. self.assertNotIn('init_volume().import_volume',
  1772. map(operator.itemgetter(0), self.pool.mock_calls))
  1773. self.assertFalse(self.app.save.called)
  1774. def test_523_vm_volume_clone_removed_volume(self):
  1775. self.setup_for_clone()
  1776. token = self.call_mgmt_func(b'admin.vm.volume.CloneFrom',
  1777. b'test-vm1', b'private', b'')
  1778. def get_volume(vid):
  1779. if vid == self.vm.volumes['private']:
  1780. raise KeyError(vid)
  1781. else:
  1782. return unittest.mock.DEFAULT
  1783. self.pool.get_volume.side_effect = get_volume
  1784. with self.assertRaises(qubes.api.PermissionDenied):
  1785. self.call_mgmt_func(b'admin.vm.volume.CloneTo',
  1786. b'test-vm1', b'private', token.encode())
  1787. self.assertNotIn('init_volume().import_volume',
  1788. map(operator.itemgetter(0), self.pool.mock_calls))
  1789. self.assertFalse(self.app.save.called)
  1790. def test_524_vm_volume_clone_invlid_token(self):
  1791. self.setup_for_clone()
  1792. with self.assertRaises(qubes.api.PermissionDenied):
  1793. self.call_mgmt_func(b'admin.vm.volume.CloneTo',
  1794. b'test-vm1', b'private', b'no-such-token')
  1795. self.assertNotIn('init_volume().import_volume',
  1796. map(operator.itemgetter(0), self.pool.mock_calls))
  1797. self.assertFalse(self.app.save.called)
  1798. def test_530_tag_list(self):
  1799. self.vm.tags.add('tag1')
  1800. self.vm.tags.add('tag2')
  1801. value = self.call_mgmt_func(b'admin.vm.tag.List', b'test-vm1')
  1802. self.assertEqual(value, 'audiovm-dom0\nguivm-dom0\ntag1\ntag2\n')
  1803. self.assertFalse(self.app.save.called)
  1804. def test_540_tag_get(self):
  1805. self.vm.tags.add('tag1')
  1806. value = self.call_mgmt_func(b'admin.vm.tag.Get', b'test-vm1',
  1807. b'tag1')
  1808. self.assertEqual(value, '1')
  1809. self.assertFalse(self.app.save.called)
  1810. def test_541_tag_get_absent(self):
  1811. value = self.call_mgmt_func(b'admin.vm.tag.Get', b'test-vm1', b'tag1')
  1812. self.assertEqual(value, '0')
  1813. self.assertFalse(self.app.save.called)
  1814. def test_550_tag_remove(self):
  1815. self.vm.tags.add('tag1')
  1816. value = self.call_mgmt_func(b'admin.vm.tag.Remove', b'test-vm1',
  1817. b'tag1')
  1818. self.assertIsNone(value, None)
  1819. self.assertNotIn('tag1', self.vm.tags)
  1820. self.assertTrue(self.app.save.called)
  1821. def test_551_tag_remove_absent(self):
  1822. with self.assertRaises(qubes.exc.QubesTagNotFoundError):
  1823. self.call_mgmt_func(b'admin.vm.tag.Remove',
  1824. b'test-vm1', b'tag1')
  1825. self.assertFalse(self.app.save.called)
  1826. def test_560_tag_set(self):
  1827. value = self.call_mgmt_func(b'admin.vm.tag.Set',
  1828. b'test-vm1', b'tag1')
  1829. self.assertIsNone(value)
  1830. self.assertIn('tag1', self.vm.tags)
  1831. self.assertTrue(self.app.save.called)
  1832. def test_561_tag_set_invalid(self):
  1833. with self.assertRaises(ValueError):
  1834. self.call_mgmt_func(b'admin.vm.tag.Set',
  1835. b'test-vm1', b'+.some-tag')
  1836. self.assertNotIn('+.some-tag', self.vm.tags)
  1837. self.assertFalse(self.app.save.called)
  1838. def test_570_firewall_get(self):
  1839. self.vm.firewall.save = unittest.mock.Mock()
  1840. value = self.call_mgmt_func(b'admin.vm.firewall.Get',
  1841. b'test-vm1', b'')
  1842. self.assertEqual(value, 'action=accept\n')
  1843. self.assertFalse(self.vm.firewall.save.called)
  1844. self.assertFalse(self.app.save.called)
  1845. def test_571_firewall_get_non_default(self):
  1846. self.vm.firewall.save = unittest.mock.Mock()
  1847. self.vm.firewall.rules = [
  1848. qubes.firewall.Rule(action='accept', proto='tcp',
  1849. dstports='1-1024'),
  1850. qubes.firewall.Rule(action='drop', proto='icmp',
  1851. comment='No ICMP'),
  1852. # should not output expired rule
  1853. qubes.firewall.Rule(action='drop', proto='udp',
  1854. expire='1499450306'),
  1855. qubes.firewall.Rule(action='drop', proto='udp',
  1856. expire='2099450306'),
  1857. qubes.firewall.Rule(action='accept'),
  1858. ]
  1859. value = self.call_mgmt_func(b'admin.vm.firewall.Get',
  1860. b'test-vm1', b'')
  1861. self.assertEqual(value,
  1862. 'action=accept proto=tcp dstports=1-1024\n'
  1863. 'action=drop proto=icmp comment=No ICMP\n'
  1864. 'action=drop expire=2099450306 proto=udp\n'
  1865. 'action=accept\n')
  1866. self.assertFalse(self.vm.firewall.save.called)
  1867. self.assertFalse(self.app.save.called)
  1868. def test_580_firewall_set_simple(self):
  1869. self.vm.firewall.save = unittest.mock.Mock()
  1870. value = self.call_mgmt_func(b'admin.vm.firewall.Set',
  1871. b'test-vm1', b'', b'action=accept\n')
  1872. self.assertEqual(self.vm.firewall.rules,
  1873. ['action=accept'])
  1874. self.assertTrue(self.vm.firewall.save.called)
  1875. self.assertFalse(self.app.save.called)
  1876. def test_581_firewall_set_multi(self):
  1877. self.vm.firewall.save = unittest.mock.Mock()
  1878. rules = [
  1879. qubes.firewall.Rule(action='accept', proto='tcp',
  1880. dstports='1-1024'),
  1881. qubes.firewall.Rule(action='drop', proto='icmp',
  1882. comment='No ICMP'),
  1883. qubes.firewall.Rule(action='drop', proto='udp',
  1884. expire='1499450306'),
  1885. qubes.firewall.Rule(action='accept'),
  1886. ]
  1887. rules_txt = (
  1888. 'action=accept proto=tcp dstports=1-1024\n'
  1889. 'action=drop proto=icmp comment=No ICMP\n'
  1890. 'action=drop expire=1499450306 proto=udp\n'
  1891. 'action=accept\n')
  1892. value = self.call_mgmt_func(b'admin.vm.firewall.Set',
  1893. b'test-vm1', b'', rules_txt.encode())
  1894. self.assertEqual(self.vm.firewall.rules, rules)
  1895. self.assertTrue(self.vm.firewall.save.called)
  1896. self.assertFalse(self.app.save.called)
  1897. def test_582_firewall_set_invalid(self):
  1898. self.vm.firewall.save = unittest.mock.Mock()
  1899. rules_txt = (
  1900. 'action=accept protoxyz=tcp dst4=127.0.0.1\n'
  1901. 'action=drop\n')
  1902. with self.assertRaises(ValueError):
  1903. self.call_mgmt_func(b'admin.vm.firewall.Set',
  1904. b'test-vm1', b'', rules_txt.encode())
  1905. self.assertEqual(self.vm.firewall.rules,
  1906. [qubes.firewall.Rule(action='accept')])
  1907. self.assertFalse(self.vm.firewall.save.called)
  1908. self.assertFalse(self.app.save.called)
  1909. def test_583_firewall_set_invalid(self):
  1910. self.vm.firewall.save = unittest.mock.Mock()
  1911. rules_txt = (
  1912. 'proto=tcp dstports=1-1024\n'
  1913. 'action=drop\n')
  1914. with self.assertRaises(ValueError):
  1915. self.call_mgmt_func(b'admin.vm.firewall.Set',
  1916. b'test-vm1', b'', rules_txt.encode())
  1917. self.assertEqual(self.vm.firewall.rules,
  1918. [qubes.firewall.Rule(action='accept')])
  1919. self.assertFalse(self.vm.firewall.save.called)
  1920. self.assertFalse(self.app.save.called)
  1921. def test_584_firewall_set_invalid(self):
  1922. self.vm.firewall.save = unittest.mock.Mock()
  1923. rules_txt = (
  1924. 'action=accept proto=tcp dstports=1-1024 '
  1925. 'action=drop\n')
  1926. with self.assertRaises(ValueError):
  1927. self.call_mgmt_func(b'admin.vm.firewall.Set',
  1928. b'test-vm1', b'', rules_txt.encode())
  1929. self.assertEqual(self.vm.firewall.rules,
  1930. [qubes.firewall.Rule(action='accept')])
  1931. self.assertFalse(self.vm.firewall.save.called)
  1932. self.assertFalse(self.app.save.called)
  1933. def test_585_firewall_set_invalid(self):
  1934. self.vm.firewall.save = unittest.mock.Mock()
  1935. rules_txt = (
  1936. 'action=accept dstports=1-1024 comment=ążźł\n'
  1937. 'action=drop\n')
  1938. with self.assertRaises(UnicodeDecodeError):
  1939. self.call_mgmt_func(b'admin.vm.firewall.Set',
  1940. b'test-vm1', b'', rules_txt.encode())
  1941. self.assertEqual(self.vm.firewall.rules,
  1942. [qubes.firewall.Rule(action='accept')])
  1943. self.assertFalse(self.vm.firewall.save.called)
  1944. self.assertFalse(self.app.save.called)
  1945. def test_590_firewall_reload(self):
  1946. self.vm.firewall.save = unittest.mock.Mock()
  1947. self.app.domains['test-vm1'].fire_event = self.emitter.fire_event
  1948. value = self.call_mgmt_func(b'admin.vm.firewall.Reload',
  1949. b'test-vm1', b'')
  1950. self.assertIsNone(value)
  1951. self.assertEventFired(self.emitter, 'firewall-changed')
  1952. self.assertFalse(self.vm.firewall.save.called)
  1953. self.assertFalse(self.app.save.called)
  1954. def test_600_backup_info(self):
  1955. backup_profile = (
  1956. 'include:\n'
  1957. ' - test-vm1\n'
  1958. 'destination_vm: test-vm1\n'
  1959. 'destination_path: /var/tmp\n'
  1960. 'passphrase_text: test\n'
  1961. )
  1962. expected_info = (
  1963. '------------------+--------------+--------------+\n'
  1964. ' VM | type | size |\n'
  1965. '------------------+--------------+--------------+\n'
  1966. ' test-vm1 | VM | 0 |\n'
  1967. '------------------+--------------+--------------+\n'
  1968. ' Total size: | 0 |\n'
  1969. '------------------+--------------+--------------+\n'
  1970. 'VMs not selected for backup:\n'
  1971. ' - dom0\n'
  1972. ' - test-template\n'
  1973. )
  1974. with tempfile.TemporaryDirectory() as profile_dir:
  1975. with open(os.path.join(profile_dir, 'testprofile.conf'), 'w') as \
  1976. profile_file:
  1977. profile_file.write(backup_profile)
  1978. with unittest.mock.patch('qubes.config.backup_profile_dir',
  1979. profile_dir):
  1980. result = self.call_mgmt_func(b'admin.backup.Info', b'dom0',
  1981. b'testprofile')
  1982. self.assertEqual(result, expected_info)
  1983. def test_601_backup_info_profile_missing_destination_path(self):
  1984. backup_profile = (
  1985. 'include:\n'
  1986. ' - test-vm1\n'
  1987. 'destination_vm: test-vm1\n'
  1988. 'passphrase_text: test\n'
  1989. )
  1990. with tempfile.TemporaryDirectory() as profile_dir:
  1991. with open(os.path.join(profile_dir, 'testprofile.conf'), 'w') as \
  1992. profile_file:
  1993. profile_file.write(backup_profile)
  1994. with unittest.mock.patch('qubes.config.backup_profile_dir',
  1995. profile_dir):
  1996. with self.assertRaises(qubes.exc.QubesException):
  1997. self.call_mgmt_func(b'admin.backup.Info', b'dom0',
  1998. b'testprofile')
  1999. def test_602_backup_info_profile_missing_destination_vm(self):
  2000. backup_profile = (
  2001. 'include:\n'
  2002. ' - test-vm1\n'
  2003. 'destination_path: /home/user\n'
  2004. 'passphrase_text: test\n'
  2005. )
  2006. with tempfile.TemporaryDirectory() as profile_dir:
  2007. with open(os.path.join(profile_dir, 'testprofile.conf'), 'w') as \
  2008. profile_file:
  2009. profile_file.write(backup_profile)
  2010. with unittest.mock.patch('qubes.config.backup_profile_dir',
  2011. profile_dir):
  2012. with self.assertRaises(qubes.exc.QubesException):
  2013. self.call_mgmt_func(b'admin.backup.Info', b'dom0',
  2014. b'testprofile')
  2015. def test_610_backup_cancel_not_running(self):
  2016. with self.assertRaises(qubes.exc.QubesException):
  2017. self.call_mgmt_func(b'admin.backup.Cancel', b'dom0',
  2018. b'testprofile')
  2019. def test_611_backup_already_running(self):
  2020. if not hasattr(self.app, 'api_admin_running_backups'):
  2021. self.app.api_admin_running_backups = {}
  2022. self.app.api_admin_running_backups['testprofile'] = 'test'
  2023. self.addCleanup(self.app.api_admin_running_backups.pop, 'testprofile')
  2024. backup_profile = (
  2025. 'include:\n'
  2026. ' - test-vm1\n'
  2027. 'destination_vm: test-vm1\n'
  2028. 'destination_path: /home/user\n'
  2029. 'passphrase_text: test\n'
  2030. )
  2031. with tempfile.TemporaryDirectory() as profile_dir:
  2032. with open(os.path.join(profile_dir, 'testprofile.conf'), 'w') as \
  2033. profile_file:
  2034. profile_file.write(backup_profile)
  2035. with unittest.mock.patch('qubes.config.backup_profile_dir',
  2036. profile_dir):
  2037. with self.assertRaises(qubes.exc.BackupAlreadyRunningError):
  2038. self.call_mgmt_func(b'admin.backup.Execute', b'dom0',
  2039. b'testprofile')
  2040. @unittest.mock.patch('qubes.backup.Backup')
  2041. def test_620_backup_execute(self, mock_backup):
  2042. backup_profile = (
  2043. 'include:\n'
  2044. ' - test-vm1\n'
  2045. 'destination_vm: test-vm1\n'
  2046. 'destination_path: /home/user\n'
  2047. 'passphrase_text: test\n'
  2048. )
  2049. mock_backup.return_value.backup_do.side_effect = self.dummy_coro
  2050. with tempfile.TemporaryDirectory() as profile_dir:
  2051. with open(os.path.join(profile_dir, 'testprofile.conf'), 'w') as \
  2052. profile_file:
  2053. profile_file.write(backup_profile)
  2054. with unittest.mock.patch('qubes.config.backup_profile_dir',
  2055. profile_dir):
  2056. result = self.call_mgmt_func(b'admin.backup.Execute', b'dom0',
  2057. b'testprofile')
  2058. self.assertIsNone(result)
  2059. mock_backup.assert_called_once_with(
  2060. self.app,
  2061. {self.vm},
  2062. set(),
  2063. target_vm=self.vm,
  2064. target_dir='/home/user',
  2065. compressed=True,
  2066. passphrase='test')
  2067. mock_backup.return_value.backup_do.assert_called_once_with()
  2068. @unittest.mock.patch('qubes.backup.Backup')
  2069. def test_621_backup_execute_passphrase_service(self, mock_backup):
  2070. backup_profile = (
  2071. 'include:\n'
  2072. ' - test-vm1\n'
  2073. 'destination_vm: test-vm1\n'
  2074. 'destination_path: /home/user\n'
  2075. 'passphrase_vm: test-vm1\n'
  2076. )
  2077. @asyncio.coroutine
  2078. def service_passphrase(*args, **kwargs):
  2079. return (b'pass-from-vm', None)
  2080. mock_backup.return_value.backup_do.side_effect = self.dummy_coro
  2081. self.vm.run_service_for_stdio = unittest.mock.Mock(
  2082. side_effect=service_passphrase)
  2083. with tempfile.TemporaryDirectory() as profile_dir:
  2084. with open(os.path.join(profile_dir, 'testprofile.conf'), 'w') as \
  2085. profile_file:
  2086. profile_file.write(backup_profile)
  2087. with unittest.mock.patch('qubes.config.backup_profile_dir',
  2088. profile_dir):
  2089. result = self.call_mgmt_func(b'admin.backup.Execute', b'dom0',
  2090. b'testprofile')
  2091. self.assertIsNone(result)
  2092. mock_backup.assert_called_once_with(
  2093. self.app,
  2094. {self.vm},
  2095. set(),
  2096. target_vm=self.vm,
  2097. target_dir='/home/user',
  2098. compressed=True,
  2099. passphrase=b'pass-from-vm')
  2100. mock_backup.return_value.backup_do.assert_called_once_with()
  2101. self.vm.run_service_for_stdio.assert_called_with(
  2102. 'qubes.BackupPassphrase+testprofile')
  2103. def test_630_vm_stats(self):
  2104. send_event = unittest.mock.Mock(spec=[])
  2105. stats1 = {
  2106. 0: {
  2107. 'cpu_time': 243951379111104 // 8,
  2108. 'cpu_usage': 0,
  2109. 'cpu_usage_raw': 0,
  2110. 'memory_kb': 3733212,
  2111. },
  2112. 1: {
  2113. 'cpu_time': 2849496569205,
  2114. 'cpu_usage': 0,
  2115. 'cpu_usage_raw': 0,
  2116. 'memory_kb': 303916,
  2117. },
  2118. }
  2119. stats2 = copy.deepcopy(stats1)
  2120. stats2[0]['cpu_time'] += 100000000
  2121. stats2[0]['cpu_usage'] = 10
  2122. stats2[0]['cpu_usage_raw'] = 10
  2123. stats2[1]['cpu_usage'] = 5
  2124. stats2[1]['cpu_usage_raw'] = 5
  2125. self.app.host.get_vm_stats = unittest.mock.Mock()
  2126. self.app.host.get_vm_stats.side_effect = [
  2127. (0, stats1), (1, stats2),
  2128. ]
  2129. self.app.stats_interval = 1
  2130. mgmt_obj = qubes.api.admin.QubesAdminAPI(
  2131. self.app, b'dom0', b'admin.vm.Stats',
  2132. b'dom0', b'', send_event=send_event)
  2133. def cancel_call():
  2134. mgmt_obj.cancel()
  2135. class MockVM(object):
  2136. def __init__(self, name):
  2137. self._name = name
  2138. def name(self):
  2139. return self._name
  2140. loop = asyncio.get_event_loop()
  2141. self.app.vmm.libvirt_conn.lookupByID.side_effect = lambda xid: {
  2142. 0: MockVM('Domain-0'),
  2143. 1: MockVM('test-template'),
  2144. 2: MockVM('test-vm1')}[xid]
  2145. execute_task = asyncio.ensure_future(
  2146. mgmt_obj.execute(untrusted_payload=b''))
  2147. loop.call_later(1.1, cancel_call)
  2148. loop.run_until_complete(execute_task)
  2149. self.assertIsNone(execute_task.result())
  2150. self.assertEventFired(self.emitter,
  2151. 'admin-permission:' + 'admin.vm.Stats')
  2152. self.assertEqual(self.app.host.get_vm_stats.mock_calls, [
  2153. unittest.mock.call(None, None, only_vm=None),
  2154. unittest.mock.call(0, stats1, only_vm=None),
  2155. ])
  2156. self.assertEqual(send_event.mock_calls, [
  2157. unittest.mock.call(self.app, 'connection-established'),
  2158. unittest.mock.call('dom0', 'vm-stats',
  2159. cpu_time=stats1[0]['cpu_time'] // 1000000,
  2160. cpu_usage=stats1[0]['cpu_usage'],
  2161. cpu_usage_raw=stats1[0]['cpu_usage_raw'],
  2162. memory_kb=stats1[0]['memory_kb']),
  2163. unittest.mock.call('test-template', 'vm-stats',
  2164. cpu_time=stats1[1]['cpu_time'] // 1000000,
  2165. cpu_usage=stats1[1]['cpu_usage'],
  2166. cpu_usage_raw=stats1[1]['cpu_usage_raw'],
  2167. memory_kb=stats1[1]['memory_kb']),
  2168. unittest.mock.call('dom0', 'vm-stats',
  2169. cpu_time=stats2[0]['cpu_time'] // 1000000,
  2170. cpu_usage=stats2[0]['cpu_usage'],
  2171. cpu_usage_raw=stats2[0]['cpu_usage_raw'],
  2172. memory_kb=stats2[0]['memory_kb']),
  2173. unittest.mock.call('test-template', 'vm-stats',
  2174. cpu_time=stats2[1]['cpu_time'] // 1000000,
  2175. cpu_usage=stats2[1]['cpu_usage'],
  2176. cpu_usage_raw=stats2[1]['cpu_usage_raw'],
  2177. memory_kb=stats2[1]['memory_kb']),
  2178. ])
  2179. def test_631_vm_stats_single_vm(self):
  2180. send_event = unittest.mock.Mock(spec=[])
  2181. stats1 = {
  2182. 2: {
  2183. 'cpu_time': 2849496569205,
  2184. 'cpu_usage': 0,
  2185. 'cpu_usage_raw': 0,
  2186. 'memory_kb': 303916,
  2187. },
  2188. }
  2189. stats2 = copy.deepcopy(stats1)
  2190. stats2[2]['cpu_usage'] = 5
  2191. stats2[2]['cpu_usage_raw'] = 5
  2192. self.app.host.get_vm_stats = unittest.mock.Mock()
  2193. self.app.host.get_vm_stats.side_effect = [
  2194. (0, stats1), (1, stats2),
  2195. ]
  2196. self.app.stats_interval = 1
  2197. mgmt_obj = qubes.api.admin.QubesAdminAPI(
  2198. self.app, b'dom0', b'admin.vm.Stats',
  2199. b'test-vm1', b'', send_event=send_event)
  2200. def cancel_call():
  2201. mgmt_obj.cancel()
  2202. class MockVM(object):
  2203. def __init__(self, name):
  2204. self._name = name
  2205. def name(self):
  2206. return self._name
  2207. loop = asyncio.get_event_loop()
  2208. self.app.vmm.libvirt_conn.lookupByID.side_effect = lambda xid: {
  2209. 0: MockVM('Domain-0'),
  2210. 1: MockVM('test-template'),
  2211. 2: MockVM('test-vm1')}[xid]
  2212. execute_task = asyncio.ensure_future(
  2213. mgmt_obj.execute(untrusted_payload=b''))
  2214. loop.call_later(1.1, cancel_call)
  2215. loop.run_until_complete(execute_task)
  2216. self.assertIsNone(execute_task.result())
  2217. self.assertEventFired(self.emitter,
  2218. 'admin-permission:' + 'admin.vm.Stats')
  2219. self.assertEqual(self.app.host.get_vm_stats.mock_calls, [
  2220. unittest.mock.call(None, None, only_vm=self.vm),
  2221. unittest.mock.call(0, stats1, only_vm=self.vm),
  2222. ])
  2223. self.assertEqual(send_event.mock_calls, [
  2224. unittest.mock.call(self.app, 'connection-established'),
  2225. unittest.mock.call('test-vm1', 'vm-stats',
  2226. cpu_time=stats1[2]['cpu_time'] // 1000000,
  2227. cpu_usage=stats1[2]['cpu_usage'],
  2228. cpu_usage_raw=stats1[2]['cpu_usage_raw'],
  2229. memory_kb=stats1[2]['memory_kb']),
  2230. unittest.mock.call('test-vm1', 'vm-stats',
  2231. cpu_time=stats2[2]['cpu_time'] // 1000000,
  2232. cpu_usage=stats2[2]['cpu_usage'],
  2233. cpu_usage_raw=stats2[2]['cpu_usage_raw'],
  2234. memory_kb=stats2[2]['memory_kb']),
  2235. ])
  2236. @unittest.mock.patch('qubes.storage.Storage.create')
  2237. def test_640_vm_create_disposable(self, mock_storage):
  2238. mock_storage.side_effect = self.dummy_coro
  2239. self.vm.template_for_dispvms = True
  2240. retval = self.call_mgmt_func(b'admin.vm.CreateDisposable',
  2241. b'test-vm1')
  2242. self.assertTrue(retval.startswith('disp'))
  2243. self.assertIn(retval, self.app.domains)
  2244. dispvm = self.app.domains[retval]
  2245. self.assertEqual(dispvm.template, self.vm)
  2246. mock_storage.assert_called_once_with()
  2247. self.assertTrue(self.app.save.called)
  2248. @unittest.mock.patch('qubes.storage.Storage.create')
  2249. def test_641_vm_create_disposable_default(self, mock_storage):
  2250. mock_storage.side_effect = self.dummy_coro
  2251. self.vm.template_for_dispvms = True
  2252. self.app.default_dispvm = self.vm
  2253. retval = self.call_mgmt_func(b'admin.vm.CreateDisposable',
  2254. b'dom0')
  2255. self.assertTrue(retval.startswith('disp'))
  2256. mock_storage.assert_called_once_with()
  2257. self.assertTrue(self.app.save.called)
  2258. @unittest.mock.patch('qubes.storage.Storage.create')
  2259. def test_642_vm_create_disposable_not_allowed(self, storage_mock):
  2260. storage_mock.side_effect = self.dummy_coro
  2261. with self.assertRaises(qubes.exc.QubesException):
  2262. self.call_mgmt_func(b'admin.vm.CreateDisposable',
  2263. b'test-vm1')
  2264. self.assertFalse(self.app.save.called)
  2265. def test_650_vm_device_set_persistent_true(self):
  2266. self.vm.add_handler('device-list:testclass',
  2267. self.device_list_testclass)
  2268. self.vm.add_handler('device-list-attached:testclass',
  2269. self.device_list_attached_testclass)
  2270. with unittest.mock.patch.object(qubes.vm.qubesvm.QubesVM,
  2271. 'is_halted', lambda _: False):
  2272. value = self.call_mgmt_func(
  2273. b'admin.vm.device.testclass.Set.persistent',
  2274. b'test-vm1', b'test-vm1+1234', b'True')
  2275. self.assertIsNone(value)
  2276. dev = qubes.devices.DeviceInfo(self.vm, '1234')
  2277. self.assertIn(dev, self.vm.devices['testclass'].persistent())
  2278. self.app.save.assert_called_once_with()
  2279. def test_651_vm_device_set_persistent_false_unchanged(self):
  2280. self.vm.add_handler('device-list:testclass',
  2281. self.device_list_testclass)
  2282. self.vm.add_handler('device-list-attached:testclass',
  2283. self.device_list_attached_testclass)
  2284. with unittest.mock.patch.object(qubes.vm.qubesvm.QubesVM,
  2285. 'is_halted', lambda _: False):
  2286. value = self.call_mgmt_func(
  2287. b'admin.vm.device.testclass.Set.persistent',
  2288. b'test-vm1', b'test-vm1+1234', b'False')
  2289. self.assertIsNone(value)
  2290. dev = qubes.devices.DeviceInfo(self.vm, '1234')
  2291. self.assertNotIn(dev, self.vm.devices['testclass'].persistent())
  2292. self.app.save.assert_called_once_with()
  2293. def test_652_vm_device_set_persistent_false(self):
  2294. self.vm.add_handler('device-list:testclass',
  2295. self.device_list_testclass)
  2296. assignment = qubes.devices.DeviceAssignment(self.vm, '1234', {},
  2297. True)
  2298. self.loop.run_until_complete(
  2299. self.vm.devices['testclass'].attach(assignment))
  2300. self.vm.add_handler('device-list-attached:testclass',
  2301. self.device_list_attached_testclass)
  2302. dev = qubes.devices.DeviceInfo(self.vm, '1234')
  2303. self.assertIn(dev, self.vm.devices['testclass'].persistent())
  2304. with unittest.mock.patch.object(qubes.vm.qubesvm.QubesVM,
  2305. 'is_halted', lambda _: False):
  2306. value = self.call_mgmt_func(
  2307. b'admin.vm.device.testclass.Set.persistent',
  2308. b'test-vm1', b'test-vm1+1234', b'False')
  2309. self.assertIsNone(value)
  2310. self.assertNotIn(dev, self.vm.devices['testclass'].persistent())
  2311. self.assertIn(dev, self.vm.devices['testclass'].attached())
  2312. self.app.save.assert_called_once_with()
  2313. def test_653_vm_device_set_persistent_true_unchanged(self):
  2314. self.vm.add_handler('device-list:testclass',
  2315. self.device_list_testclass)
  2316. assignment = qubes.devices.DeviceAssignment(self.vm, '1234', {},
  2317. True)
  2318. self.loop.run_until_complete(
  2319. self.vm.devices['testclass'].attach(assignment))
  2320. self.vm.add_handler('device-list-attached:testclass',
  2321. self.device_list_attached_testclass)
  2322. with unittest.mock.patch.object(qubes.vm.qubesvm.QubesVM,
  2323. 'is_halted', lambda _: False):
  2324. value = self.call_mgmt_func(
  2325. b'admin.vm.device.testclass.Set.persistent',
  2326. b'test-vm1', b'test-vm1+1234', b'True')
  2327. self.assertIsNone(value)
  2328. dev = qubes.devices.DeviceInfo(self.vm, '1234')
  2329. self.assertIn(dev, self.vm.devices['testclass'].persistent())
  2330. self.assertIn(dev, self.vm.devices['testclass'].attached())
  2331. self.app.save.assert_called_once_with()
  2332. def test_654_vm_device_set_persistent_not_attached(self):
  2333. self.vm.add_handler('device-list:testclass',
  2334. self.device_list_testclass)
  2335. with unittest.mock.patch.object(qubes.vm.qubesvm.QubesVM,
  2336. 'is_halted', lambda _: False):
  2337. with self.assertRaises(qubes.api.PermissionDenied):
  2338. self.call_mgmt_func(
  2339. b'admin.vm.device.testclass.Set.persistent',
  2340. b'test-vm1', b'test-vm1+1234', b'True')
  2341. dev = qubes.devices.DeviceInfo(self.vm, '1234')
  2342. self.assertNotIn(dev, self.vm.devices['testclass'].persistent())
  2343. self.assertFalse(self.app.save.called)
  2344. def test_655_vm_device_set_persistent_invalid_value(self):
  2345. self.vm.add_handler('device-list:testclass',
  2346. self.device_list_testclass)
  2347. with unittest.mock.patch.object(qubes.vm.qubesvm.QubesVM,
  2348. 'is_halted', lambda _: False):
  2349. with self.assertRaises(qubes.api.PermissionDenied):
  2350. self.call_mgmt_func(
  2351. b'admin.vm.device.testclass.Set.persistent',
  2352. b'test-vm1', b'test-vm1+1234', b'maybe')
  2353. dev = qubes.devices.DeviceInfo(self.vm, '1234')
  2354. self.assertNotIn(dev, self.vm.devices['testclass'].persistent())
  2355. self.assertFalse(self.app.save.called)
  2356. def test_660_pool_set_revisions_to_keep(self):
  2357. self.app.pools['test-pool'] = unittest.mock.Mock()
  2358. value = self.call_mgmt_func(b'admin.pool.Set.revisions_to_keep',
  2359. b'dom0', b'test-pool', b'2')
  2360. self.assertIsNone(value)
  2361. self.assertEqual(self.app.pools['test-pool'].mock_calls, [])
  2362. self.assertEqual(self.app.pools['test-pool'].revisions_to_keep, 2)
  2363. self.app.save.assert_called_once_with()
  2364. def test_661_pool_set_revisions_to_keep_negative(self):
  2365. self.app.pools['test-pool'] = unittest.mock.Mock()
  2366. with self.assertRaises(qubes.api.PermissionDenied):
  2367. self.call_mgmt_func(b'admin.pool.Set.revisions_to_keep',
  2368. b'dom0', b'test-pool', b'-2')
  2369. self.assertEqual(self.app.pools['test-pool'].mock_calls, [])
  2370. self.assertFalse(self.app.save.called)
  2371. def test_662_pool_set_revisions_to_keep_not_a_number(self):
  2372. self.app.pools['test-pool'] = unittest.mock.Mock()
  2373. with self.assertRaises(qubes.api.ProtocolError):
  2374. self.call_mgmt_func(b'admin.pool.Set.revisions_to_keep',
  2375. b'dom0', b'test-pool', b'abc')
  2376. self.assertEqual(self.app.pools['test-pool'].mock_calls, [])
  2377. self.assertFalse(self.app.save.called)
  2378. def test_670_vm_volume_set_revisions_to_keep(self):
  2379. self.vm.volumes = unittest.mock.MagicMock()
  2380. volumes_conf = {
  2381. 'keys.return_value': ['root', 'private', 'volatile', 'kernel'],
  2382. }
  2383. self.vm.volumes.configure_mock(**volumes_conf)
  2384. self.vm.storage = unittest.mock.Mock()
  2385. value = self.call_mgmt_func(b'admin.vm.volume.Set.revisions_to_keep',
  2386. b'test-vm1', b'private', b'2')
  2387. self.assertIsNone(value)
  2388. self.assertEqual(self.vm.volumes.mock_calls,
  2389. [unittest.mock.call.keys(),
  2390. ('__getitem__', ('private',), {})])
  2391. self.assertEqual(self.vm.volumes['private'].revisions_to_keep, 2)
  2392. self.app.save.assert_called_once_with()
  2393. def test_671_vm_volume_set_revisions_to_keep_negative(self):
  2394. self.vm.volumes = unittest.mock.MagicMock()
  2395. volumes_conf = {
  2396. 'keys.return_value': ['root', 'private', 'volatile', 'kernel'],
  2397. }
  2398. self.vm.volumes.configure_mock(**volumes_conf)
  2399. self.vm.storage = unittest.mock.Mock()
  2400. with self.assertRaises(qubes.api.PermissionDenied):
  2401. self.call_mgmt_func(b'admin.vm.volume.Set.revisions_to_keep',
  2402. b'test-vm1', b'private', b'-2')
  2403. def test_672_vm_volume_set_revisions_to_keep_not_a_number(self):
  2404. self.vm.volumes = unittest.mock.MagicMock()
  2405. volumes_conf = {
  2406. 'keys.return_value': ['root', 'private', 'volatile', 'kernel'],
  2407. }
  2408. self.vm.volumes.configure_mock(**volumes_conf)
  2409. self.vm.storage = unittest.mock.Mock()
  2410. with self.assertRaises(qubes.api.ProtocolError):
  2411. self.call_mgmt_func(b'admin.vm.volume.Set.revisions_to_keep',
  2412. b'test-vm1', b'private', b'abc')
  2413. def test_680_vm_volume_set_rw(self):
  2414. self.vm.volumes = unittest.mock.MagicMock()
  2415. volumes_conf = {
  2416. 'keys.return_value': ['root', 'private', 'volatile', 'kernel'],
  2417. }
  2418. self.vm.volumes.configure_mock(**volumes_conf)
  2419. self.vm.storage = unittest.mock.Mock()
  2420. value = self.call_mgmt_func(b'admin.vm.volume.Set.rw',
  2421. b'test-vm1', b'private', b'True')
  2422. self.assertIsNone(value)
  2423. self.assertEqual(self.vm.volumes.mock_calls,
  2424. [unittest.mock.call.keys(),
  2425. ('__getitem__', ('private',), {})])
  2426. self.assertEqual(self.vm.volumes['private'].rw, True)
  2427. self.app.save.assert_called_once_with()
  2428. def test_681_vm_volume_set_rw_invalid(self):
  2429. self.vm.volumes = unittest.mock.MagicMock()
  2430. volumes_conf = {
  2431. 'keys.return_value': ['root', 'private', 'volatile', 'kernel'],
  2432. }
  2433. self.vm.volumes.configure_mock(**volumes_conf)
  2434. self.vm.storage = unittest.mock.Mock()
  2435. with self.assertRaises(qubes.api.ProtocolError):
  2436. self.call_mgmt_func(b'admin.vm.volume.Set.revisions_to_keep',
  2437. b'test-vm1', b'private', b'abc')
  2438. self.assertFalse(self.app.save.called)
  2439. def test_690_vm_console(self):
  2440. self.vm._libvirt_domain = unittest.mock.Mock()
  2441. xml_desc = (
  2442. '<domain type=\'xen\' id=\'42\'>\n'
  2443. '<name>test-vm1</name>\n'
  2444. '<devices>\n'
  2445. '<console type=\'pty\' tty=\'/dev/pts/42\'>\n'
  2446. '<source path=\'/dev/pts/42\'/>\n'
  2447. '<target type=\'xen\' port=\'0\'/>\n'
  2448. '</console>\n'
  2449. '</devices>\n'
  2450. '</domain>\n'
  2451. )
  2452. self.vm._libvirt_domain.configure_mock(
  2453. **{'XMLDesc.return_value': xml_desc,
  2454. 'isActive.return_value': True}
  2455. )
  2456. self.app.vmm.configure_mock(offline_mode=False)
  2457. value = self.call_mgmt_func(b'admin.vm.Console', b'test-vm1')
  2458. self.assertEqual(value, '/dev/pts/42')
  2459. def test_691_vm_console_not_running(self):
  2460. self.vm._libvirt_domain = unittest.mock.Mock()
  2461. xml_desc = (
  2462. '<domain type=\'xen\' id=\'42\'>\n'
  2463. '<name>test-vm1</name>\n'
  2464. '<devices>\n'
  2465. '<console type=\'pty\' tty=\'/dev/pts/42\'>\n'
  2466. '<source path=\'/dev/pts/42\'/>\n'
  2467. '<target type=\'xen\' port=\'0\'/>\n'
  2468. '</console>\n'
  2469. '</devices>\n'
  2470. '</domain>\n'
  2471. )
  2472. self.vm._libvirt_domain.configure_mock(
  2473. **{'XMLDesc.return_value': xml_desc,
  2474. 'isActive.return_value': False}
  2475. )
  2476. with self.assertRaises(qubes.exc.QubesVMNotRunningError):
  2477. self.call_mgmt_func(b'admin.vm.Console', b'test-vm1')
  2478. def test_700_pool_volume_list(self):
  2479. self.app.pools = {
  2480. 'pool1': unittest.mock.Mock(config={
  2481. 'param1': 'value1', 'param2': 'value2'},
  2482. usage=102400,
  2483. size=204800,
  2484. volumes={'vol1': unittest.mock.Mock(),
  2485. 'vol2': unittest.mock.Mock()})
  2486. }
  2487. value = self.call_mgmt_func(b'admin.pool.volume.List', b'dom0', b'pool1')
  2488. self.assertEqual(value, 'vol1\nvol2\n')
  2489. def test_710_vm_volume_clear(self):
  2490. with tempfile.TemporaryDirectory() as tmpdir:
  2491. tmpfile = os.path.join(tmpdir, 'testfile')
  2492. @asyncio.coroutine
  2493. def coroutine_mock(*args, **kwargs):
  2494. return tmpfile
  2495. self.vm.volumes = unittest.mock.MagicMock()
  2496. volumes_conf = {
  2497. 'keys.return_value': ['root', 'private', 'volatile', 'kernel'],
  2498. '__getitem__.return_value.size': 0xdeadbeef
  2499. }
  2500. self.vm.volumes.configure_mock(**volumes_conf)
  2501. self.vm.storage = unittest.mock.Mock()
  2502. storage_conf = {
  2503. 'import_data.side_effect': coroutine_mock,
  2504. 'import_data_end.side_effect': self.dummy_coro
  2505. }
  2506. self.vm.storage.configure_mock(**storage_conf)
  2507. self.app.domains['test-vm1'].fire_event = self.emitter.fire_event
  2508. value = self.call_mgmt_func(b'admin.vm.volume.Clear',
  2509. b'test-vm1', b'private')
  2510. self.assertIsNone(value)
  2511. self.assertTrue(os.path.exists(tmpfile))
  2512. self.assertEqual(self.vm.volumes.mock_calls, [
  2513. unittest.mock.call.keys(),
  2514. unittest.mock.call.__getattr__('__getitem__')('private')])
  2515. self.assertEqual(self.vm.storage.mock_calls, [
  2516. unittest.mock.call.import_data('private', 0xdeadbeef),
  2517. unittest.mock.call.import_data_end('private', True)])
  2518. self.assertEventFired(
  2519. self.emitter, 'admin-permission:admin.vm.volume.Clear')
  2520. self.assertEventFired(
  2521. self.emitter, 'domain-volume-import-begin')
  2522. self.assertEventFired(
  2523. self.emitter, 'domain-volume-import-end')
  2524. def test_800_current_state_default(self):
  2525. value = self.call_mgmt_func(b'admin.vm.CurrentState', b'test-vm1')
  2526. self.assertEqual(
  2527. value, 'mem=0 mem_static_max=0 cputime=0 power_state=Halted')
  2528. def test_801_current_state_changed(self):
  2529. self.vm.get_mem = lambda: 512
  2530. self.vm.get_mem_static_max = lambda: 1024
  2531. self.vm.get_cputime = lambda: 100
  2532. self.vm.get_power_state = lambda: 'Running'
  2533. value = self.call_mgmt_func(b'admin.vm.CurrentState', b'test-vm1')
  2534. self.assertEqual(
  2535. value, 'mem=512 mem_static_max=1024 cputime=100 power_state=Running')
  2536. def test_990_vm_unexpected_payload(self):
  2537. methods_with_no_payload = [
  2538. b'admin.vm.List',
  2539. b'admin.vm.Remove',
  2540. b'admin.vm.property.List',
  2541. b'admin.vm.property.Get',
  2542. b'admin.vm.property.Help',
  2543. #b'admin.vm.property.HelpRst',
  2544. b'admin.vm.property.Reset',
  2545. b'admin.vm.feature.List',
  2546. b'admin.vm.feature.Get',
  2547. b'admin.vm.feature.CheckWithTemplate',
  2548. b'admin.vm.feature.Remove',
  2549. b'admin.vm.tag.List',
  2550. b'admin.vm.tag.Get',
  2551. b'admin.vm.tag.Remove',
  2552. b'admin.vm.tag.Set',
  2553. b'admin.vm.firewall.Get',
  2554. b'admin.vm.firewall.Reload',
  2555. b'admin.vm.device.pci.Detach',
  2556. b'admin.vm.device.pci.List',
  2557. b'admin.vm.device.pci.Available',
  2558. b'admin.vm.volume.ListSnapshots',
  2559. b'admin.vm.volume.List',
  2560. b'admin.vm.volume.Info',
  2561. b'admin.vm.Start',
  2562. b'admin.vm.Shutdown',
  2563. b'admin.vm.Pause',
  2564. b'admin.vm.Unpause',
  2565. b'admin.vm.Kill',
  2566. b'admin.vm.Console',
  2567. b'admin.Events',
  2568. b'admin.vm.feature.List',
  2569. b'admin.vm.feature.Get',
  2570. b'admin.vm.feature.Remove',
  2571. b'admin.vm.feature.CheckWithTemplate',
  2572. ]
  2573. # make sure also no methods on actual VM gets called
  2574. vm_mock = unittest.mock.MagicMock()
  2575. vm_mock.name = self.vm.name
  2576. vm_mock.qid = self.vm.qid
  2577. vm_mock.__lt__ = (lambda x, y: x.qid < y.qid)
  2578. self.app.domains._dict[self.vm.qid] = vm_mock
  2579. for method in methods_with_no_payload:
  2580. # should reject payload regardless of having argument or not
  2581. with self.subTest(method.decode('ascii')):
  2582. with self.assertRaises(qubes.api.ProtocolError):
  2583. self.call_mgmt_func(method, b'test-vm1', b'',
  2584. b'unexpected-payload')
  2585. self.assertFalse(vm_mock.called)
  2586. self.assertFalse(self.app.save.called)
  2587. with self.subTest(method.decode('ascii') + '+arg'):
  2588. with self.assertRaises(qubes.api.ProtocolError):
  2589. self.call_mgmt_func(method, b'test-vm1', b'some-arg',
  2590. b'unexpected-payload')
  2591. self.assertFalse(vm_mock.called)
  2592. self.assertFalse(self.app.save.called)
  2593. def test_991_vm_unexpected_argument(self):
  2594. methods_with_no_argument = [
  2595. b'admin.vm.List',
  2596. b'admin.vm.Remove',
  2597. b'admin.vm.property.List',
  2598. b'admin.vm.feature.List',
  2599. b'admin.vm.tag.List',
  2600. b'admin.vm.firewall.Get',
  2601. b'admin.vm.firewall.Set',
  2602. b'admin.vm.firewall.Reload',
  2603. b'admin.vm.volume.List',
  2604. b'admin.vm.Start',
  2605. b'admin.vm.Pause',
  2606. b'admin.vm.Unpause',
  2607. b'admin.vm.Kill',
  2608. b'admin.vm.Console',
  2609. b'admin.Events',
  2610. b'admin.vm.feature.List',
  2611. ]
  2612. # make sure also no methods on actual VM gets called
  2613. vm_mock = unittest.mock.MagicMock()
  2614. vm_mock.name = self.vm.name
  2615. vm_mock.qid = self.vm.qid
  2616. vm_mock.__lt__ = (lambda x, y: x.qid < y.qid)
  2617. self.app.domains._dict[self.vm.qid] = vm_mock
  2618. exceptions = (qubes.api.PermissionDenied, qubes.api.ProtocolError)
  2619. for method in methods_with_no_argument:
  2620. # should reject argument regardless of having payload or not
  2621. with self.subTest(method.decode('ascii')):
  2622. with self.assertRaises(qubes.api.PermissionDenied):
  2623. self.call_mgmt_func(method, b'test-vm1', b'some-arg',
  2624. b'')
  2625. self.assertFalse(vm_mock.called)
  2626. self.assertFalse(self.app.save.called)
  2627. with self.subTest(method.decode('ascii') + '+payload'):
  2628. with self.assertRaises(exceptions):
  2629. self.call_mgmt_func(method, b'test-vm1', b'unexpected-arg',
  2630. b'some-payload')
  2631. self.assertFalse(vm_mock.called)
  2632. self.assertFalse(self.app.save.called)
  2633. def test_992_dom0_unexpected_payload(self):
  2634. methods_with_no_payload = [
  2635. b'admin.deviceclass.List',
  2636. b'admin.vmclass.List',
  2637. b'admin.vm.List',
  2638. b'admin.pool.volume.List',
  2639. b'admin.label.List',
  2640. b'admin.label.Get',
  2641. b'admin.label.Remove',
  2642. b'admin.property.List',
  2643. b'admin.property.Get',
  2644. b'admin.property.Help',
  2645. #b'admin.property.HelpRst',
  2646. b'admin.property.Reset',
  2647. b'admin.pool.List',
  2648. b'admin.pool.ListDrivers',
  2649. b'admin.pool.Info',
  2650. b'admin.pool.Remove',
  2651. b'admin.backup.Execute',
  2652. b'admin.Events',
  2653. ]
  2654. # make sure also no methods on actual VM gets called
  2655. vm_mock = unittest.mock.MagicMock()
  2656. vm_mock.name = self.vm.name
  2657. vm_mock.qid = self.vm.qid
  2658. vm_mock.__lt__ = (lambda x, y: x.qid < y.qid)
  2659. self.app.domains._dict[self.vm.qid] = vm_mock
  2660. for method in methods_with_no_payload:
  2661. # should reject payload regardless of having argument or not
  2662. with self.subTest(method.decode('ascii')):
  2663. with self.assertRaises(qubes.api.ProtocolError):
  2664. self.call_mgmt_func(method, b'dom0', b'',
  2665. b'unexpected-payload')
  2666. self.assertFalse(vm_mock.called)
  2667. self.assertFalse(self.app.save.called)
  2668. with self.subTest(method.decode('ascii') + '+arg'):
  2669. with self.assertRaises(qubes.api.ProtocolError):
  2670. self.call_mgmt_func(method, b'dom0', b'some-arg',
  2671. b'unexpected-payload')
  2672. self.assertFalse(vm_mock.called)
  2673. self.assertFalse(self.app.save.called)
  2674. def test_993_dom0_unexpected_argument(self):
  2675. methods_with_no_argument = [
  2676. b'admin.deviceclass.List',
  2677. b'admin.vmclass.List',
  2678. b'admin.vm.List',
  2679. b'admin.label.List',
  2680. b'admin.property.List',
  2681. b'admin.pool.List',
  2682. b'admin.pool.ListDrivers',
  2683. b'admin.Events',
  2684. ]
  2685. # make sure also no methods on actual VM gets called
  2686. vm_mock = unittest.mock.MagicMock()
  2687. vm_mock.name = self.vm.name
  2688. vm_mock.qid = self.vm.qid
  2689. vm_mock.__lt__ = (lambda x, y: x.qid < y.qid)
  2690. self.app.domains._dict[self.vm.qid] = vm_mock
  2691. exceptions = (qubes.api.PermissionDenied, qubes.api.ProtocolError)
  2692. for method in methods_with_no_argument:
  2693. # should reject argument regardless of having payload or not
  2694. with self.subTest(method.decode('ascii')):
  2695. with self.assertRaises(qubes.api.PermissionDenied):
  2696. self.call_mgmt_func(method, b'dom0', b'some-arg',
  2697. b'')
  2698. self.assertFalse(vm_mock.called)
  2699. self.assertFalse(self.app.save.called)
  2700. with self.subTest(method.decode('ascii') + '+payload'):
  2701. with self.assertRaises(exceptions):
  2702. self.call_mgmt_func(method, b'dom0', b'unexpected-arg',
  2703. b'some-payload')
  2704. self.assertFalse(vm_mock.called)
  2705. self.assertFalse(self.app.save.called)
  2706. def test_994_dom0_only_calls(self):
  2707. # TODO set some better arguments, to make sure the call was rejected
  2708. # because of invalid destination, not invalid arguments
  2709. methods_for_dom0_only = [
  2710. b'admin.deviceclass.List',
  2711. b'admin.vmclass.List',
  2712. b'admin.vm.Create.AppVM',
  2713. b'admin.vm.CreateInPool.AppVM',
  2714. b'admin.label.List',
  2715. b'admin.label.Create',
  2716. b'admin.label.Get',
  2717. b'admin.label.Remove',
  2718. b'admin.pool.volume.List',
  2719. b'admin.property.List',
  2720. b'admin.property.Get',
  2721. b'admin.property.Set',
  2722. b'admin.property.Help',
  2723. #b'admin.property.HelpRst',
  2724. b'admin.property.Reset',
  2725. b'admin.pool.List',
  2726. b'admin.pool.ListDrivers',
  2727. b'admin.pool.Info',
  2728. b'admin.pool.Add',
  2729. b'admin.pool.Remove',
  2730. #b'admin.pool.volume.List',
  2731. #b'admin.pool.volume.Info',
  2732. #b'admin.pool.volume.ListSnapshots',
  2733. #b'admin.pool.volume.Snapshot',
  2734. #b'admin.pool.volume.Revert',
  2735. #b'admin.pool.volume.Resize',
  2736. b'admin.backup.Execute',
  2737. b'admin.backup.Info',
  2738. ]
  2739. # make sure also no methods on actual VM gets called
  2740. vm_mock = unittest.mock.MagicMock()
  2741. vm_mock.name = self.vm.name
  2742. vm_mock.qid = self.vm.qid
  2743. vm_mock.__lt__ = (lambda x, y: x.qid < y.qid)
  2744. self.app.domains._dict[self.vm.qid] = vm_mock
  2745. exceptions = (qubes.api.PermissionDenied, qubes.api.ProtocolError)
  2746. for method in methods_for_dom0_only:
  2747. # should reject call regardless of having payload or not
  2748. with self.subTest(method.decode('ascii')):
  2749. with self.assertRaises(exceptions):
  2750. self.call_mgmt_func(method, b'test-vm1', b'',
  2751. b'')
  2752. self.assertFalse(vm_mock.called)
  2753. self.assertFalse(self.app.save.called)
  2754. with self.subTest(method.decode('ascii') + '+arg'):
  2755. with self.assertRaises(exceptions):
  2756. self.call_mgmt_func(method, b'test-vm1', b'some-arg',
  2757. b'')
  2758. self.assertFalse(vm_mock.called)
  2759. self.assertFalse(self.app.save.called)
  2760. with self.subTest(method.decode('ascii') + '+payload'):
  2761. with self.assertRaises(exceptions):
  2762. self.call_mgmt_func(method, b'test-vm1', b'',
  2763. b'payload')
  2764. self.assertFalse(vm_mock.called)
  2765. self.assertFalse(self.app.save.called)
  2766. with self.subTest(method.decode('ascii') + '+arg+payload'):
  2767. with self.assertRaises(exceptions):
  2768. self.call_mgmt_func(method, b'test-vm1', b'some-arg',
  2769. b'some-payload')
  2770. self.assertFalse(vm_mock.called)
  2771. self.assertFalse(self.app.save.called)
  2772. @unittest.skip('undecided')
  2773. def test_995_vm_only_calls(self):
  2774. # XXX is it really a good idea to prevent those calls this early?
  2775. # TODO set some better arguments, to make sure the call was rejected
  2776. # because of invalid destination, not invalid arguments
  2777. methods_for_vm_only = [
  2778. b'admin.vm.Clone',
  2779. b'admin.vm.Remove',
  2780. b'admin.vm.property.List',
  2781. b'admin.vm.property.Get',
  2782. b'admin.vm.property.Set',
  2783. b'admin.vm.property.Help',
  2784. b'admin.vm.property.HelpRst',
  2785. b'admin.vm.property.Reset',
  2786. b'admin.vm.feature.List',
  2787. b'admin.vm.feature.Get',
  2788. b'admin.vm.feature.Set',
  2789. b'admin.vm.feature.CheckWithTemplate',
  2790. b'admin.vm.feature.Remove',
  2791. b'admin.vm.tag.List',
  2792. b'admin.vm.tag.Get',
  2793. b'admin.vm.tag.Remove',
  2794. b'admin.vm.tag.Set',
  2795. b'admin.vm.firewall.Get',
  2796. b'admin.vm.firewall.Set',
  2797. b'admin.vm.firewall.Reload',
  2798. b'admin.vm.device.pci.Attach',
  2799. b'admin.vm.device.pci.Detach',
  2800. b'admin.vm.device.pci.List',
  2801. b'admin.vm.device.pci.Available',
  2802. b'admin.vm.microphone.Attach',
  2803. b'admin.vm.microphone.Detach',
  2804. b'admin.vm.microphone.Status',
  2805. b'admin.vm.volume.ListSnapshots',
  2806. b'admin.vm.volume.List',
  2807. b'admin.vm.volume.Info',
  2808. b'admin.vm.volume.Revert',
  2809. b'admin.vm.volume.Resize',
  2810. b'admin.vm.volume.Clear',
  2811. b'admin.vm.Start',
  2812. b'admin.vm.Shutdown',
  2813. b'admin.vm.Pause',
  2814. b'admin.vm.Unpause',
  2815. b'admin.vm.Kill',
  2816. b'admin.vm.feature.List',
  2817. b'admin.vm.feature.Get',
  2818. b'admin.vm.feature.Set',
  2819. b'admin.vm.feature.Remove',
  2820. b'admin.vm.feature.CheckWithTemplate',
  2821. ]
  2822. # make sure also no methods on actual VM gets called
  2823. vm_mock = unittest.mock.MagicMock()
  2824. vm_mock.name = self.vm.name
  2825. vm_mock.qid = self.vm.qid
  2826. vm_mock.__lt__ = (lambda x, y: x.qid < y.qid)
  2827. self.app.domains._dict[self.vm.qid] = vm_mock
  2828. exceptions = (qubes.api.PermissionDenied, qubes.api.ProtocolError)
  2829. for method in methods_for_vm_only:
  2830. # should reject payload regardless of having argument or not
  2831. # should reject call regardless of having payload or not
  2832. with self.subTest(method.decode('ascii')):
  2833. with self.assertRaises(exceptions):
  2834. self.call_mgmt_func(method, b'dom0', b'',
  2835. b'')
  2836. self.assertFalse(vm_mock.called)
  2837. self.assertFalse(self.app.save.called)
  2838. with self.subTest(method.decode('ascii') + '+arg'):
  2839. with self.assertRaises(exceptions):
  2840. self.call_mgmt_func(method, b'dom0', b'some-arg',
  2841. b'')
  2842. self.assertFalse(vm_mock.called)
  2843. self.assertFalse(self.app.save.called)
  2844. with self.subTest(method.decode('ascii') + '+payload'):
  2845. with self.assertRaises(exceptions):
  2846. self.call_mgmt_func(method, b'dom0', b'',
  2847. b'payload')
  2848. self.assertFalse(vm_mock.called)
  2849. self.assertFalse(self.app.save.called)
  2850. with self.subTest(method.decode('ascii') + '+arg+payload'):
  2851. with self.assertRaises(exceptions):
  2852. self.call_mgmt_func(method, b'dom0', b'some-arg',
  2853. b'some-payload')
  2854. self.assertFalse(vm_mock.called)
  2855. self.assertFalse(self.app.save.called)