app.py 34 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798
  1. # -*- encoding: utf8 -*-
  2. #
  3. # The Qubes OS Project, http://www.qubes-os.org
  4. #
  5. # Copyright (C) 2017 Marek Marczykowski-Górecki
  6. # <marmarek@invisiblethingslab.com>
  7. #
  8. # This program is free software; you can redistribute it and/or modify
  9. # it under the terms of the GNU Lesser General Public License as published by
  10. # the Free Software Foundation; either version 2 of the License, or
  11. # (at your option) any later version.
  12. #
  13. # This program is distributed in the hope that it will be useful,
  14. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  15. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  16. # GNU Lesser General Public License for more details.
  17. #
  18. # You should have received a copy of the GNU Lesser General Public License along
  19. # with this program; if not, see <http://www.gnu.org/licenses/>.
  20. import os
  21. import shutil
  22. import socket
  23. import subprocess
  24. import unittest
  25. import multiprocessing
  26. try:
  27. import unittest.mock as mock
  28. except ImportError:
  29. import mock
  30. import tempfile
  31. import qubesadmin.exc
  32. import qubesadmin.tests
  33. class TC_00_VMCollection(qubesadmin.tests.QubesTestCase):
  34. def test_000_list(self):
  35. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
  36. b'0\x00test-vm class=AppVM state=Running\n'
  37. self.assertEqual(
  38. list(self.app.domains.keys()),
  39. ['test-vm'])
  40. self.assertAllCalled()
  41. def test_001_getitem(self):
  42. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
  43. b'0\x00test-vm class=AppVM state=Running\n'
  44. try:
  45. vm = self.app.domains['test-vm']
  46. self.assertEqual(vm.name, 'test-vm')
  47. except KeyError:
  48. self.fail('VM not found in collection')
  49. self.assertAllCalled()
  50. with self.assertRaises(KeyError):
  51. vm = self.app.domains['test-non-existent']
  52. self.assertAllCalled()
  53. def test_002_in(self):
  54. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
  55. b'0\x00test-vm class=AppVM state=Running\n'
  56. self.assertIn('test-vm', self.app.domains)
  57. self.assertAllCalled()
  58. self.assertNotIn('test-non-existent', self.app.domains)
  59. self.assertAllCalled()
  60. def test_003_iter(self):
  61. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
  62. b'0\x00test-vm class=AppVM state=Running\n'
  63. self.assertEqual([vm.name for vm in self.app.domains], ['test-vm'])
  64. self.assertAllCalled()
  65. def test_004_delitem(self):
  66. self.app.expected_calls[('test-vm', 'admin.vm.Remove', None, None)] = \
  67. b'0\x00'
  68. del self.app.domains['test-vm']
  69. self.assertAllCalled()
  70. class TC_10_QubesBase(qubesadmin.tests.QubesTestCase):
  71. def test_010_new_simple(self):
  72. self.app.expected_calls[('dom0', 'admin.vm.Create.AppVM', None,
  73. b'name=new-vm label=red')] = b'0\x00'
  74. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
  75. b'0\x00new-vm class=AppVM state=Running\n'
  76. vm = self.app.add_new_vm('AppVM', 'new-vm', 'red')
  77. self.assertEqual(vm.name, 'new-vm')
  78. self.assertEqual(vm.__class__.__name__, 'AppVM')
  79. self.assertAllCalled()
  80. def test_011_new_template(self):
  81. self.app.expected_calls[('dom0', 'admin.vm.Create.TemplateVM', None,
  82. b'name=new-template label=red')] = b'0\x00'
  83. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
  84. b'0\x00new-template class=TemplateVM state=Running\n'
  85. vm = self.app.add_new_vm('TemplateVM', 'new-template', 'red')
  86. self.assertEqual(vm.name, 'new-template')
  87. self.assertEqual(vm.__class__.__name__, 'TemplateVM')
  88. self.assertAllCalled()
  89. def test_012_new_template_based(self):
  90. self.app.expected_calls[('dom0', 'admin.vm.Create.AppVM',
  91. 'some-template', b'name=new-vm label=red')] = b'0\x00'
  92. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
  93. b'0\x00new-vm class=AppVM state=Running\n'
  94. vm = self.app.add_new_vm('AppVM', 'new-vm', 'red', 'some-template')
  95. self.assertEqual(vm.name, 'new-vm')
  96. self.assertEqual(vm.__class__.__name__, 'AppVM')
  97. self.assertAllCalled()
  98. def test_013_new_objects_params(self):
  99. self.app.expected_calls[('dom0', 'admin.vm.Create.AppVM',
  100. 'some-template', b'name=new-vm label=red')] = b'0\x00'
  101. self.app.expected_calls[('dom0', 'admin.label.List', None, None)] = \
  102. b'0\x00red\nblue\n'
  103. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
  104. b'0\x00new-vm class=AppVM state=Running\n' \
  105. b'some-template class=TemplateVM state=Running\n'
  106. vm = self.app.add_new_vm(self.app.get_vm_class('AppVM'), 'new-vm',
  107. self.app.get_label('red'), self.app.domains['some-template'])
  108. self.assertEqual(vm.name, 'new-vm')
  109. self.assertEqual(vm.__class__.__name__, 'AppVM')
  110. self.assertAllCalled()
  111. def test_014_new_pool(self):
  112. self.app.expected_calls[('dom0', 'admin.vm.CreateInPool.AppVM', None,
  113. b'name=new-vm label=red pool=some-pool')] = b'0\x00'
  114. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
  115. b'0\x00new-vm class=AppVM state=Running\n'
  116. vm = self.app.add_new_vm('AppVM', 'new-vm', 'red', pool='some-pool')
  117. self.assertEqual(vm.name, 'new-vm')
  118. self.assertEqual(vm.__class__.__name__, 'AppVM')
  119. self.assertAllCalled()
  120. def test_015_new_pools(self):
  121. self.app.expected_calls[('dom0', 'admin.vm.CreateInPool.AppVM', None,
  122. b'name=new-vm label=red pool:private=some-pool '
  123. b'pool:volatile=other-pool')] = b'0\x00'
  124. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
  125. b'0\x00new-vm class=AppVM state=Running\n'
  126. vm = self.app.add_new_vm('AppVM', 'new-vm', 'red',
  127. pools={'private': 'some-pool', 'volatile': 'other-pool'})
  128. self.assertEqual(vm.name, 'new-vm')
  129. self.assertEqual(vm.__class__.__name__, 'AppVM')
  130. self.assertAllCalled()
  131. def test_016_new_template_based_default(self):
  132. self.app.expected_calls[('dom0', 'admin.vm.Create.AppVM',
  133. None, b'name=new-vm label=red')] = b'0\x00'
  134. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
  135. b'0\x00new-vm class=AppVM state=Running\n'
  136. vm = self.app.add_new_vm('AppVM', 'new-vm', 'red',
  137. template=qubesadmin.DEFAULT)
  138. self.assertEqual(vm.name, 'new-vm')
  139. self.assertEqual(vm.__class__.__name__, 'AppVM')
  140. self.assertAllCalled()
  141. def test_020_get_label(self):
  142. self.app.expected_calls[('dom0', 'admin.label.List', None, None)] = \
  143. b'0\x00red\nblue\n'
  144. label = self.app.get_label('red')
  145. self.assertEqual(label.name, 'red')
  146. self.assertAllCalled()
  147. def clone_setup_common_calls(self, src, dst):
  148. # labels
  149. self.app.expected_calls[('dom0', 'admin.label.List', None, None)] = \
  150. b'0\x00red\ngreen\nblue\n'
  151. # have each property type with default=no, each special-cased,
  152. # and some with default=yes
  153. properties = {
  154. 'label': 'default=False type=label red',
  155. 'template': 'default=False type=vm test-template',
  156. 'memory': 'default=False type=int 400',
  157. 'kernel': 'default=False type=str 4.9.31',
  158. 'netvm': 'default=False type=vm test-net',
  159. 'virt_mode': 'default=False type=str hvm',
  160. 'default_user': 'default=True type=str user',
  161. }
  162. self.app.expected_calls[
  163. (src, 'admin.vm.property.List', None, None)] = \
  164. b'0\0qid\nname\n' + \
  165. b'\n'.join(prop.encode() for prop in properties.keys()) + \
  166. b'\n'
  167. for prop, value in properties.items():
  168. self.app.expected_calls[
  169. (src, 'admin.vm.property.Get', prop, None)] = \
  170. b'0\0' + value.encode()
  171. # special cases handled by admin.vm.Create call
  172. if prop in ('label', 'template'):
  173. continue
  174. # default properties should not be set
  175. if 'default=True' in value:
  176. continue
  177. self.app.expected_calls[
  178. (dst, 'admin.vm.property.Set', prop,
  179. value.split()[-1].encode())] = b'0\0'
  180. # tags
  181. self.app.expected_calls[
  182. (src, 'admin.vm.tag.List', None, None)] = \
  183. b'0\0tag1\ntag2\n'
  184. self.app.expected_calls[
  185. (dst, 'admin.vm.tag.Set', 'tag1', None)] = b'0\0'
  186. self.app.expected_calls[
  187. (dst, 'admin.vm.tag.Set', 'tag2', None)] = b'0\0'
  188. # features
  189. self.app.expected_calls[
  190. (src, 'admin.vm.feature.List', None, None)] = \
  191. b'0\0feat1\nfeat2\n'
  192. self.app.expected_calls[
  193. (src, 'admin.vm.feature.Get', 'feat1', None)] = \
  194. b'0\0feat1-value with spaces'
  195. self.app.expected_calls[
  196. (src, 'admin.vm.feature.Get', 'feat2', None)] = \
  197. b'0\x001'
  198. self.app.expected_calls[
  199. (dst, 'admin.vm.feature.Set', 'feat1',
  200. b'feat1-value with spaces')] = b'0\0'
  201. self.app.expected_calls[
  202. (dst, 'admin.vm.feature.Set', 'feat2', b'1')] = b'0\0'
  203. # firewall
  204. rules = (
  205. b'action=drop dst4=192.168.0.0/24\n'
  206. b'action=accept\n'
  207. )
  208. self.app.expected_calls[
  209. (src, 'admin.vm.firewall.Get', None, None)] = \
  210. b'0\x00' + rules
  211. self.app.expected_calls[
  212. (dst, 'admin.vm.firewall.Set', None, rules)] = \
  213. b'0\x00'
  214. # storage
  215. self.app.expected_calls[
  216. (dst, 'admin.vm.volume.List', None, None)] = \
  217. b'0\x00root\nprivate\nvolatile\nkernel\n'
  218. self.app.expected_calls[
  219. (dst, 'admin.vm.volume.Info', 'root', None)] = \
  220. b'0\x00pool=lvm\n' \
  221. b'vid=vm-test-vm/root\n' \
  222. b'size=10737418240\n' \
  223. b'usage=2147483648\n' \
  224. b'rw=False\n' \
  225. b'internal=True\n' \
  226. b'source=vm-test-template/root\n' \
  227. b'save_on_stop=False\n' \
  228. b'snap_on_start=True\n'
  229. self.app.expected_calls[
  230. (dst, 'admin.vm.volume.Info', 'private', None)] = \
  231. b'0\x00pool=lvm\n' \
  232. b'vid=vm-test-vm/private\n' \
  233. b'size=2147483648\n' \
  234. b'usage=214748364\n' \
  235. b'rw=True\n' \
  236. b'internal=True\n' \
  237. b'save_on_stop=True\n' \
  238. b'snap_on_start=False\n'
  239. self.app.expected_calls[
  240. (dst, 'admin.vm.volume.Info', 'volatile', None)] = \
  241. b'0\x00pool=lvm\n' \
  242. b'vid=vm-test-vm/volatile\n' \
  243. b'size=10737418240\n' \
  244. b'usage=0\n' \
  245. b'rw=True\n' \
  246. b'internal=True\n' \
  247. b'source=None\n' \
  248. b'save_on_stop=False\n' \
  249. b'snap_on_start=False\n'
  250. self.app.expected_calls[
  251. (dst, 'admin.vm.volume.Info', 'kernel', None)] = \
  252. b'0\x00pool=linux-kernel\n' \
  253. b'vid=\n' \
  254. b'size=0\n' \
  255. b'usage=0\n' \
  256. b'rw=False\n' \
  257. b'internal=True\n' \
  258. b'source=None\n' \
  259. b'save_on_stop=False\n' \
  260. b'snap_on_start=False\n'
  261. self.app.expected_calls[
  262. (src, 'admin.vm.volume.List', None, None)] = \
  263. b'0\x00root\nprivate\nvolatile\nkernel\n'
  264. self.app.expected_calls[
  265. (src, 'admin.vm.volume.CloneFrom', 'private', None)] = \
  266. b'0\x00token-private'
  267. self.app.expected_calls[
  268. (dst, 'admin.vm.volume.CloneTo', 'private', b'token-private')] = \
  269. b'0\x00'
  270. def test_030_clone(self):
  271. self.clone_setup_common_calls('test-vm', 'new-name')
  272. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
  273. b'0\x00new-name class=AppVM state=Halted\n' \
  274. b'test-vm class=AppVM state=Halted\n' \
  275. b'test-template class=TemplateVM state=Halted\n' \
  276. b'test-net class=AppVM state=Halted\n'
  277. self.app.expected_calls[('dom0', 'admin.vm.Create.AppVM',
  278. 'test-template', b'name=new-name label=red')] = b'0\x00'
  279. new_vm = self.app.clone_vm('test-vm', 'new-name')
  280. self.assertEqual(new_vm.name, 'new-name')
  281. self.assertAllCalled()
  282. def test_031_clone_object(self):
  283. self.clone_setup_common_calls('test-vm', 'new-name')
  284. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
  285. b'0\x00new-name class=AppVM state=Halted\n' \
  286. b'test-vm class=AppVM state=Halted\n' \
  287. b'test-template class=TemplateVM state=Halted\n' \
  288. b'test-net class=AppVM state=Halted\n'
  289. self.app.expected_calls[('dom0', 'admin.vm.Create.AppVM',
  290. 'test-template', b'name=new-name label=red')] = b'0\x00'
  291. new_vm = self.app.clone_vm(self.app.domains['test-vm'], 'new-name')
  292. self.assertEqual(new_vm.name, 'new-name')
  293. self.assertAllCalled()
  294. def test_032_clone_pool(self):
  295. self.clone_setup_common_calls('test-vm', 'new-name')
  296. self.app.expected_calls[('dom0', 'admin.vm.CreateInPool.AppVM',
  297. 'test-template',
  298. b'name=new-name label=red pool=some-pool')] = b'0\x00'
  299. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
  300. b'0\x00new-name class=AppVM state=Halted\n' \
  301. b'test-vm class=AppVM state=Halted\n' \
  302. b'test-template class=TemplateVM state=Halted\n' \
  303. b'test-net class=AppVM state=Halted\n'
  304. new_vm = self.app.clone_vm('test-vm', 'new-name', pool='some-pool')
  305. self.assertEqual(new_vm.name, 'new-name')
  306. self.assertAllCalled()
  307. def test_033_clone_pools(self):
  308. self.clone_setup_common_calls('test-vm', 'new-name')
  309. self.app.expected_calls[('dom0', 'admin.vm.CreateInPool.AppVM',
  310. 'test-template',
  311. b'name=new-name label=red pool:private=some-pool '
  312. b'pool:volatile=other-pool')] = b'0\x00'
  313. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
  314. b'0\x00new-name class=AppVM state=Halted\n' \
  315. b'test-vm class=AppVM state=Halted\n' \
  316. b'test-template class=TemplateVM state=Halted\n' \
  317. b'test-net class=AppVM state=Halted\n'
  318. new_vm = self.app.clone_vm('test-vm', 'new-name',
  319. pools={'private': 'some-pool', 'volatile': 'other-pool'})
  320. self.assertEqual(new_vm.name, 'new-name')
  321. self.assertAllCalled()
  322. def test_034_clone_class_change(self):
  323. self.clone_setup_common_calls('test-vm', 'new-name')
  324. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
  325. b'0\x00new-name class=StandaloneVM state=Halted\n' \
  326. b'test-vm class=AppVM state=Halted\n' \
  327. b'test-template class=TemplateVM state=Halted\n' \
  328. b'test-net class=AppVM state=Halted\n'
  329. self.app.expected_calls[('dom0', 'admin.vm.Create.StandaloneVM',
  330. 'test-template', b'name=new-name label=red')] = b'0\x00'
  331. self.app.expected_calls[
  332. ('new-name', 'admin.vm.volume.Info', 'root', None)] = \
  333. b'0\x00pool=lvm\n' \
  334. b'vid=vm-new-name/root\n' \
  335. b'size=10737418240\n' \
  336. b'usage=2147483648\n' \
  337. b'rw=True\n' \
  338. b'internal=True\n' \
  339. b'source=None\n' \
  340. b'save_on_stop=True\n' \
  341. b'snap_on_start=False\n'
  342. self.app.expected_calls[
  343. ('test-vm', 'admin.vm.volume.CloneFrom', 'root', None)] = \
  344. b'0\x00token-root'
  345. self.app.expected_calls[
  346. ('new-name', 'admin.vm.volume.CloneTo', 'root', b'token-root')] = \
  347. b'0\x00'
  348. new_vm = self.app.clone_vm('test-vm', 'new-name',
  349. new_cls='StandaloneVM')
  350. self.assertEqual(new_vm.name, 'new-name')
  351. self.assertEqual(new_vm.__class__.__name__, 'StandaloneVM')
  352. self.assertAllCalled()
  353. def test_035_clone_fail(self):
  354. self.app.expected_calls[('dom0', 'admin.label.List', None, None)] = \
  355. b'0\x00red\ngreen\nblue\n'
  356. self.app.expected_calls[
  357. ('test-vm', 'admin.vm.property.List', None, None)] = \
  358. b'0\0qid\nname\ntemplate\nlabel\nmemory\n'
  359. self.app.expected_calls[
  360. ('test-vm', 'admin.vm.property.Get', 'label', None)] = \
  361. b'0\0default=False type=label red'
  362. self.app.expected_calls[
  363. ('test-vm', 'admin.vm.property.Get', 'template', None)] = \
  364. b'0\0default=False type=vm test-template'
  365. self.app.expected_calls[
  366. ('test-vm', 'admin.vm.property.Get', 'memory', None)] = \
  367. b'0\0default=False type=int 400'
  368. self.app.expected_calls[
  369. ('new-name', 'admin.vm.property.Set', 'memory', b'400')] = \
  370. b'2\0QubesException\0\0something happened\0'
  371. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
  372. b'0\x00new-name class=AppVM state=Halted\n' \
  373. b'test-vm class=AppVM state=Halted\n' \
  374. b'test-template class=TemplateVM state=Halted\n' \
  375. b'test-net class=AppVM state=Halted\n'
  376. self.app.expected_calls[('dom0', 'admin.vm.Create.AppVM',
  377. 'test-template', b'name=new-name label=red')] = b'0\x00'
  378. self.app.expected_calls[('new-name', 'admin.vm.Remove', None, None)] = \
  379. b'0\x00'
  380. with self.assertRaises(qubesadmin.exc.QubesException):
  381. self.app.clone_vm('test-vm', 'new-name')
  382. self.assertAllCalled()
  383. def test_036_clone_ignore_errors_prop(self):
  384. self.clone_setup_common_calls('test-vm', 'new-name')
  385. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
  386. b'0\x00new-name class=AppVM state=Halted\n' \
  387. b'test-vm class=AppVM state=Halted\n' \
  388. b'test-template class=TemplateVM state=Halted\n' \
  389. b'test-net class=AppVM state=Halted\n'
  390. self.app.expected_calls[('dom0', 'admin.vm.Create.AppVM',
  391. 'test-template', b'name=new-name label=red')] = b'0\x00'
  392. self.app.expected_calls[
  393. ('new-name', 'admin.vm.property.Set', 'memory', b'400')] = \
  394. b'2\0QubesException\0\0something happened\0'
  395. new_vm = self.app.clone_vm('test-vm', 'new-name', ignore_errors=True)
  396. self.assertEqual(new_vm.name, 'new-name')
  397. self.assertAllCalled()
  398. def test_037_clone_ignore_errors_feature(self):
  399. self.clone_setup_common_calls('test-vm', 'new-name')
  400. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
  401. b'0\x00new-name class=AppVM state=Halted\n' \
  402. b'test-vm class=AppVM state=Halted\n' \
  403. b'test-template class=TemplateVM state=Halted\n' \
  404. b'test-net class=AppVM state=Halted\n'
  405. self.app.expected_calls[('dom0', 'admin.vm.Create.AppVM',
  406. 'test-template', b'name=new-name label=red')] = b'0\x00'
  407. self.app.expected_calls[
  408. ('new-name', 'admin.vm.feature.Set', 'feat2', b'1')] = \
  409. b'2\0QubesException\0\0something happened\0'
  410. new_vm = self.app.clone_vm('test-vm', 'new-name', ignore_errors=True)
  411. self.assertEqual(new_vm.name, 'new-name')
  412. self.assertAllCalled()
  413. def test_038_clone_ignore_errors_tag(self):
  414. self.clone_setup_common_calls('test-vm', 'new-name')
  415. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
  416. b'0\x00new-name class=AppVM state=Halted\n' \
  417. b'test-vm class=AppVM state=Halted\n' \
  418. b'test-template class=TemplateVM state=Halted\n' \
  419. b'test-net class=AppVM state=Halted\n'
  420. self.app.expected_calls[('dom0', 'admin.vm.Create.AppVM',
  421. 'test-template', b'name=new-name label=red')] = b'0\x00'
  422. self.app.expected_calls[
  423. ('new-name', 'admin.vm.tag.Set', 'tag1', None)] = \
  424. b'2\0QubesException\0\0something happened\0'
  425. new_vm = self.app.clone_vm('test-vm', 'new-name', ignore_errors=True)
  426. self.assertEqual(new_vm.name, 'new-name')
  427. self.assertAllCalled()
  428. def test_039_clone_ignore_errors_firewall(self):
  429. self.clone_setup_common_calls('test-vm', 'new-name')
  430. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
  431. b'0\x00new-name class=AppVM state=Halted\n' \
  432. b'test-vm class=AppVM state=Halted\n' \
  433. b'test-template class=TemplateVM state=Halted\n' \
  434. b'test-net class=AppVM state=Halted\n'
  435. self.app.expected_calls[('dom0', 'admin.vm.Create.AppVM',
  436. 'test-template', b'name=new-name label=red')] = b'0\x00'
  437. self.app.expected_calls[
  438. ('new-name', 'admin.vm.firewall.Set', None,
  439. b'action=drop dst4=192.168.0.0/24\naction=accept\n')] = \
  440. b'2\0QubesException\0\0something happened\0'
  441. new_vm = self.app.clone_vm('test-vm', 'new-name', ignore_errors=True)
  442. self.assertEqual(new_vm.name, 'new-name')
  443. self.assertAllCalled()
  444. def test_040_clone_ignore_errors_storage(self):
  445. self.clone_setup_common_calls('test-vm', 'new-name')
  446. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
  447. b'0\x00new-name class=AppVM state=Halted\n' \
  448. b'test-vm class=AppVM state=Halted\n' \
  449. b'test-template class=TemplateVM state=Halted\n' \
  450. b'test-net class=AppVM state=Halted\n'
  451. self.app.expected_calls[('dom0', 'admin.vm.Create.AppVM',
  452. 'test-template', b'name=new-name label=red')] = b'0\x00'
  453. self.app.expected_calls[
  454. ('new-name', 'admin.vm.volume.CloneTo', 'private',
  455. b'token-private')] = \
  456. b'2\0QubesException\0\0something happened\0'
  457. self.app.expected_calls[('new-name', 'admin.vm.Remove', None, None)] = \
  458. b'0\x00'
  459. del self.app.expected_calls[
  460. ('new-name', 'admin.vm.volume.Info', 'root', None)]
  461. del self.app.expected_calls[
  462. ('new-name', 'admin.vm.volume.Info', 'volatile', None)]
  463. with self.assertRaises(qubesadmin.exc.QubesException):
  464. self.app.clone_vm('test-vm', 'new-name', ignore_errors=True)
  465. self.assertAllCalled()
  466. def test_041_clone_fail_storage(self):
  467. self.clone_setup_common_calls('test-vm', 'new-name')
  468. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
  469. b'0\x00new-name class=AppVM state=Halted\n' \
  470. b'test-vm class=AppVM state=Halted\n' \
  471. b'test-template class=TemplateVM state=Halted\n' \
  472. b'test-net class=AppVM state=Halted\n'
  473. self.app.expected_calls[('dom0', 'admin.vm.Create.AppVM',
  474. 'test-template', b'name=new-name label=red')] = b'0\x00'
  475. self.app.expected_calls[
  476. ('new-name', 'admin.vm.volume.CloneTo', 'private',
  477. b'token-private')] = \
  478. b'2\0QubesException\0\0something happened\0'
  479. self.app.expected_calls[('new-name', 'admin.vm.Remove', None, None)] = \
  480. b'0\x00'
  481. del self.app.expected_calls[
  482. ('new-name', 'admin.vm.volume.Info', 'root', None)]
  483. del self.app.expected_calls[
  484. ('new-name', 'admin.vm.volume.Info', 'volatile', None)]
  485. with self.assertRaises(qubesadmin.exc.QubesException):
  486. self.app.clone_vm('test-vm', 'new-name')
  487. self.assertAllCalled()
  488. class TC_20_QubesLocal(unittest.TestCase):
  489. def setUp(self):
  490. super(TC_20_QubesLocal, self).setUp()
  491. self.socket_dir = tempfile.mkdtemp()
  492. self.orig_sock = qubesadmin.config.QUBESD_SOCKET
  493. qubesadmin.config.QUBESD_SOCKET = os.path.join(self.socket_dir, 'sock')
  494. self.proc = None
  495. self.app = qubesadmin.app.QubesLocal()
  496. def listen_and_send(self, send_data):
  497. '''Listen on socket and send data in response.
  498. :param bytes send_data: data to send
  499. '''
  500. self.socket_pipe, child_pipe = multiprocessing.Pipe()
  501. self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
  502. self.socket.bind(os.path.join(self.socket_dir, 'sock'))
  503. self.socket.listen(1)
  504. def worker(sock, pipe, send_data_):
  505. conn, addr = sock.accept()
  506. pipe.send(conn.makefile('rb').read())
  507. conn.sendall(send_data_)
  508. conn.close()
  509. self.proc = multiprocessing.Process(target=worker,
  510. args=(self.socket, child_pipe, send_data))
  511. self.proc.start()
  512. self.socket.close()
  513. def get_request(self):
  514. '''Get request sent to "qubesd" mock'''
  515. return self.socket_pipe.recv()
  516. def tearDown(self):
  517. qubesadmin.config.QUBESD_SOCKET = self.orig_sock
  518. if self.proc is not None:
  519. try:
  520. self.proc.terminate()
  521. except OSError:
  522. pass
  523. shutil.rmtree(self.socket_dir)
  524. super(TC_20_QubesLocal, self).tearDown()
  525. def test_000_qubesd_call(self):
  526. self.listen_and_send(b'0\0')
  527. self.app.qubesd_call('test-vm', 'some.method', 'arg1', b'payload')
  528. self.assertEqual(self.get_request(),
  529. b'dom0\0some.method\0test-vm\0arg1\0payload')
  530. def test_001_qubesd_call_none_arg(self):
  531. self.listen_and_send(b'0\0')
  532. self.app.qubesd_call('test-vm', 'some.method', None, b'payload')
  533. self.assertEqual(self.get_request(),
  534. b'dom0\0some.method\0test-vm\0\0payload')
  535. def test_002_qubesd_call_none_payload(self):
  536. self.listen_and_send(b'0\0')
  537. self.app.qubesd_call('test-vm', 'some.method', None, None)
  538. self.assertEqual(self.get_request(),
  539. b'dom0\0some.method\0test-vm\0\0')
  540. def test_003_qubesd_call_payload_stream(self):
  541. # this should really be in setUp()...
  542. tmpdir = tempfile.mkdtemp()
  543. self.addCleanup(shutil.rmtree, tmpdir)
  544. service_path = os.path.join(tmpdir, 'test.service')
  545. payload_input = os.path.join(tmpdir, 'payload-input')
  546. with open(service_path, 'w') as f:
  547. f.write('#!/bin/bash\n'
  548. 'env > {dir}/env\n'
  549. 'echo "$@" > {dir}/args\n'
  550. 'cat > {dir}/payload\n'
  551. 'echo -en \'0\\0return-value\'\n'.format(dir=tmpdir))
  552. os.chmod(service_path, 0o755)
  553. with open(payload_input, 'w+') as payload_file:
  554. payload_file.write('some payload\n')
  555. payload_file.seek(0)
  556. with mock.patch('qubesadmin.config.QREXEC_SERVICES_DIR',
  557. tmpdir):
  558. value = self.app.qubesd_call('test-vm', 'test.service',
  559. 'some-arg', payload_stream=payload_file)
  560. self.assertEqual(value, b'return-value')
  561. self.assertTrue(os.path.exists(tmpdir + '/env'))
  562. with open(tmpdir + '/env') as env:
  563. self.assertIn('QREXEC_REMOTE_DOMAIN=dom0\n', env)
  564. self.assertIn('QREXEC_REQUESTED_TARGET=test-vm\n', env)
  565. self.assertTrue(os.path.exists(tmpdir + '/args'))
  566. with open(tmpdir + '/args') as args:
  567. self.assertEqual(args.read(), 'some-arg\n')
  568. self.assertTrue(os.path.exists(tmpdir + '/payload'))
  569. with open(tmpdir + '/payload') as payload:
  570. self.assertEqual(payload.read(), 'some payload\n')
  571. def test_004_qubesd_call_payload_stream_proc(self):
  572. # this should really be in setUp()...
  573. tmpdir = tempfile.mkdtemp()
  574. self.addCleanup(shutil.rmtree, tmpdir)
  575. service_path = os.path.join(tmpdir, 'test.service')
  576. echo = subprocess.Popen(['echo', 'some payload'],
  577. stdout=subprocess.PIPE)
  578. with open(service_path, 'w') as f:
  579. f.write('#!/bin/bash\n'
  580. 'env > {dir}/env\n'
  581. 'echo "$@" > {dir}/args\n'
  582. 'cat > {dir}/payload\n'
  583. 'echo -en \'0\\0return-value\'\n'.format(dir=tmpdir))
  584. os.chmod(service_path, 0o755)
  585. with mock.patch('qubesadmin.config.QREXEC_SERVICES_DIR',
  586. tmpdir):
  587. value = self.app.qubesd_call('test-vm', 'test.service',
  588. 'some-arg', payload_stream=echo.stdout)
  589. echo.stdout.close()
  590. self.assertEqual(value, b'return-value')
  591. self.assertTrue(os.path.exists(tmpdir + '/env'))
  592. with open(tmpdir + '/env') as env:
  593. self.assertIn('QREXEC_REMOTE_DOMAIN=dom0\n', env)
  594. self.assertIn('QREXEC_REQUESTED_TARGET=test-vm\n', env)
  595. self.assertTrue(os.path.exists(tmpdir + '/args'))
  596. with open(tmpdir + '/args') as args:
  597. self.assertEqual(args.read(), 'some-arg\n')
  598. self.assertTrue(os.path.exists(tmpdir + '/payload'))
  599. with open(tmpdir + '/payload') as payload:
  600. self.assertEqual(payload.read(), 'some payload\n')
  601. def test_010_run_service(self):
  602. self.listen_and_send(b'0\0')
  603. with mock.patch('subprocess.Popen') as mock_proc:
  604. p = self.app.run_service('some-vm', 'service.name')
  605. mock_proc.assert_called_once_with([
  606. qubesadmin.config.QREXEC_CLIENT,
  607. '-d', 'some-vm', 'DEFAULT:QUBESRPC service.name dom0'],
  608. stdin=subprocess.PIPE, stdout=subprocess.PIPE,
  609. stderr=subprocess.PIPE)
  610. self.assertEqual(self.get_request(),
  611. b'dom0\0admin.vm.Start\0some-vm\0\0')
  612. def test_011_run_service_filter_esc(self):
  613. self.listen_and_send(b'0\0')
  614. with mock.patch('subprocess.Popen') as mock_proc:
  615. p = self.app.run_service('some-vm', 'service.name', filter_esc=True)
  616. mock_proc.assert_called_once_with([
  617. qubesadmin.config.QREXEC_CLIENT,
  618. '-d', 'some-vm', '-t', '-T',
  619. 'DEFAULT:QUBESRPC service.name dom0'],
  620. stdin=subprocess.PIPE, stdout=subprocess.PIPE,
  621. stderr=subprocess.PIPE)
  622. self.assertEqual(self.get_request(),
  623. b'dom0\0admin.vm.Start\0some-vm\0\0')
  624. def test_012_run_service_user(self):
  625. self.listen_and_send(b'0\0')
  626. with mock.patch('subprocess.Popen') as mock_proc:
  627. p = self.app.run_service('some-vm', 'service.name', user='user')
  628. mock_proc.assert_called_once_with([
  629. qubesadmin.config.QREXEC_CLIENT,
  630. '-d', 'some-vm',
  631. 'user:QUBESRPC service.name dom0'],
  632. stdin=subprocess.PIPE, stdout=subprocess.PIPE,
  633. stderr=subprocess.PIPE)
  634. self.assertEqual(self.get_request(),
  635. b'dom0\0admin.vm.Start\0some-vm\0\0')
  636. def test_013_run_service_default_target(self):
  637. with self.assertRaises(ValueError):
  638. self.app.run_service('', 'service.name')
  639. class TC_30_QubesRemote(unittest.TestCase):
  640. def setUp(self):
  641. super(TC_30_QubesRemote, self).setUp()
  642. self.proc_mock = mock.Mock()
  643. self.proc_mock.configure_mock(**{
  644. 'return_value.returncode': 0
  645. })
  646. self.proc_patch = mock.patch('subprocess.Popen', self.proc_mock)
  647. self.proc_patch.start()
  648. self.app = qubesadmin.app.QubesRemote()
  649. def set_proc_stdout(self, send_data):
  650. self.proc_mock.configure_mock(**{
  651. 'return_value.communicate.return_value': (send_data, None)
  652. })
  653. def tearDown(self):
  654. self.proc_patch.stop()
  655. super(TC_30_QubesRemote, self).tearDown()
  656. def test_000_qubesd_call(self):
  657. self.set_proc_stdout(b'0\0')
  658. self.app.qubesd_call('test-vm', 'some.method', 'arg1', b'payload')
  659. self.assertEqual(self.proc_mock.mock_calls, [
  660. mock.call([qubesadmin.config.QREXEC_CLIENT_VM, 'test-vm',
  661. 'some.method+arg1'],
  662. stdin=subprocess.PIPE, stdout=subprocess.PIPE,
  663. stderr=subprocess.PIPE),
  664. mock.call().communicate(b'payload')
  665. ])
  666. def test_001_qubesd_call_none_arg(self):
  667. self.set_proc_stdout(b'0\0')
  668. self.app.qubesd_call('test-vm', 'some.method', None, b'payload')
  669. self.assertEqual(self.proc_mock.mock_calls, [
  670. mock.call([qubesadmin.config.QREXEC_CLIENT_VM, 'test-vm',
  671. 'some.method'],
  672. stdin=subprocess.PIPE, stdout=subprocess.PIPE,
  673. stderr=subprocess.PIPE),
  674. mock.call().communicate(b'payload')
  675. ])
  676. def test_002_qubesd_call_none_payload(self):
  677. self.set_proc_stdout(b'0\0')
  678. self.app.qubesd_call('test-vm', 'some.method', None, None)
  679. self.assertEqual(self.proc_mock.mock_calls, [
  680. mock.call([qubesadmin.config.QREXEC_CLIENT_VM, 'test-vm',
  681. 'some.method'],
  682. stdin=subprocess.PIPE, stdout=subprocess.PIPE,
  683. stderr=subprocess.PIPE),
  684. mock.call().communicate(None)
  685. ])
  686. def test_003_qubesd_call_payload_stream(self):
  687. self.set_proc_stdout(b'0\0return-value')
  688. tmpdir = tempfile.mkdtemp()
  689. self.addCleanup(shutil.rmtree, tmpdir)
  690. payload_input = os.path.join(tmpdir, 'payload-input')
  691. with open(payload_input, 'w+') as payload_file:
  692. payload_file.write('some payload\n')
  693. payload_file.seek(0)
  694. value = self.app.qubesd_call('test-vm', 'some.method',
  695. 'some-arg', payload_stream=payload_file)
  696. self.assertEqual(self.proc_mock.mock_calls, [
  697. mock.call([qubesadmin.config.QREXEC_CLIENT_VM, 'test-vm',
  698. 'some.method+some-arg'],
  699. stdin=payload_file, stdout=subprocess.PIPE,
  700. stderr=subprocess.PIPE),
  701. mock.call().communicate(None)
  702. ])
  703. self.assertEqual(value, b'return-value')
  704. def test_010_run_service(self):
  705. self.app.run_service('some-vm', 'service.name')
  706. self.proc_mock.assert_called_once_with([
  707. qubesadmin.config.QREXEC_CLIENT_VM,
  708. 'some-vm', 'service.name'],
  709. stdin=subprocess.PIPE, stdout=subprocess.PIPE,
  710. stderr=subprocess.PIPE)
  711. def test_011_run_service_filter_esc(self):
  712. with self.assertRaises(NotImplementedError):
  713. p = self.app.run_service('some-vm', 'service.name', filter_esc=True)
  714. def test_012_run_service_user(self):
  715. with self.assertRaises(ValueError):
  716. p = self.app.run_service('some-vm', 'service.name', user='user')
  717. def test_013_run_service_default_target(self):
  718. self.app.run_service('', 'service.name')
  719. self.proc_mock.assert_called_once_with([
  720. qubesadmin.config.QREXEC_CLIENT_VM,
  721. '', 'service.name'],
  722. stdin=subprocess.PIPE, stdout=subprocess.PIPE,
  723. stderr=subprocess.PIPE)