dom0_update.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394
  1. #
  2. # The Qubes OS Project, http://www.qubes-os.org
  3. #
  4. # Copyright (C) 2015 Marek Marczykowski-Górecki <marmarek@invisiblethingslab.com>
  5. #
  6. # This library is free software; you can redistribute it and/or
  7. # modify it under the terms of the GNU Lesser General Public
  8. # License as published by the Free Software Foundation; either
  9. # version 2.1 of the License, or (at your option) any later version.
  10. #
  11. # This library is distributed in the hope that it will be useful,
  12. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  13. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  14. # Lesser General Public License for more details.
  15. #
  16. # You should have received a copy of the GNU Lesser General Public
  17. # License along with this library; if not, see <https://www.gnu.org/licenses/>.
  18. # USA.
  19. #
  20. import asyncio
  21. import os
  22. import shutil
  23. import subprocess
  24. import sys
  25. import tempfile
  26. import unittest
  27. import qubes
  28. import qubes.tests
  29. VM_PREFIX = "test-"
  30. @unittest.skipUnless(os.path.exists('/usr/bin/rpmsign') and
  31. os.path.exists('/usr/bin/rpmbuild'),
  32. 'rpm-sign and/or rpm-build not installed')
  33. class TC_00_Dom0UpgradeMixin(object):
  34. """
  35. Tests for downloading dom0 updates using VMs based on different templates
  36. """
  37. pkg_name = 'qubes-test-pkg'
  38. dom0_update_common_opts = ['--disablerepo=*', '--enablerepo=test']
  39. update_flag_path = '/var/lib/qubes/updates/dom0-updates-available'
  40. @classmethod
  41. def generate_key(cls, keydir):
  42. gpg_opts = ['gpg', '--quiet', '--no-default-keyring',
  43. '--homedir', keydir]
  44. p = subprocess.Popen(gpg_opts + ['--gen-key', '--batch'],
  45. stdin=subprocess.PIPE,
  46. stderr=open(os.devnull, 'w'))
  47. p.stdin.write('''
  48. Key-Type: RSA
  49. Key-Length: 1024
  50. Key-Usage: sign
  51. Name-Real: Qubes test
  52. Expire-Date: 0
  53. %commit
  54. '''.format(keydir=keydir).encode())
  55. p.stdin.close()
  56. p.wait()
  57. subprocess.check_call(gpg_opts + ['-a', '--export',
  58. '--output', os.path.join(keydir, 'pubkey.asc')])
  59. p = subprocess.Popen(gpg_opts + ['--with-colons', '--list-keys'],
  60. stdout=subprocess.PIPE)
  61. for line in p.stdout.readlines():
  62. fields = line.decode().split(':')
  63. if fields[0] == 'pub':
  64. return fields[4][-8:].lower()
  65. raise RuntimeError
  66. @classmethod
  67. def setUpClass(cls):
  68. super(TC_00_Dom0UpgradeMixin, cls).setUpClass()
  69. cls.tmpdir = tempfile.mkdtemp()
  70. cls.keyid = cls.generate_key(cls.tmpdir)
  71. with open('/etc/yum.repos.d/test.repo', 'w') as repo_file:
  72. repo_file.write('''
  73. [test]
  74. name = Test
  75. baseurl = http://localhost:8080/
  76. enabled = 1
  77. ''')
  78. @classmethod
  79. def tearDownClass(cls):
  80. os.unlink('/etc/yum.repos.d/test.repo')
  81. shutil.rmtree(cls.tmpdir)
  82. def setUp(self):
  83. super(TC_00_Dom0UpgradeMixin, self).setUp()
  84. if self.template.startswith('whonix-'):
  85. # Whonix redirect all the traffic through tor, so repository
  86. # on http://localhost:8080/ is unavailable
  87. self.skipTest("Test not supported for this template")
  88. self.init_default_template(self.template)
  89. self.updatevm = self.app.add_new_vm(
  90. qubes.vm.appvm.AppVM,
  91. name=self.make_vm_name("updatevm"),
  92. label='red'
  93. )
  94. self.loop.run_until_complete(self.updatevm.create_on_disk())
  95. self.app.updatevm = self.updatevm
  96. self.app.save()
  97. subprocess.call(['rpm', '-e', self.pkg_name],
  98. stderr=subprocess.DEVNULL)
  99. subprocess.check_call(['rpm', '--import',
  100. os.path.join(self.tmpdir, 'pubkey.asc')])
  101. self.loop.run_until_complete(self.updatevm.start())
  102. self.repo_running = False
  103. self.repo_proc = None
  104. def tearDown(self):
  105. if self.repo_proc:
  106. self.repo_proc.terminate()
  107. self.loop.run_until_complete(self.repo_proc.wait())
  108. del self.repo_proc
  109. self.app.updatevm = None
  110. super(TC_00_Dom0UpgradeMixin, self).tearDown()
  111. subprocess.call(['rpm', '-e', self.pkg_name],
  112. stderr=subprocess.DEVNULL)
  113. subprocess.call(['rpm', '-e', 'gpg-pubkey-{}'.format(
  114. self.keyid)], stderr=subprocess.DEVNULL)
  115. for pkg in os.listdir(self.tmpdir):
  116. if pkg.endswith('.rpm'):
  117. os.unlink(pkg)
  118. def create_pkg(self, dir, name, version):
  119. spec_path = os.path.join(dir, name+'.spec')
  120. spec = open(spec_path, 'w')
  121. spec.write(
  122. '''
  123. Name: {name}
  124. Summary: Test Package
  125. Version: {version}
  126. Release: 1
  127. Vendor: Invisible Things Lab
  128. License: GPL
  129. Group: Qubes
  130. URL: http://www.qubes-os.org
  131. %description
  132. Test package
  133. %install
  134. %files
  135. '''.format(name=name, version=version)
  136. )
  137. spec.close()
  138. subprocess.check_call(
  139. ['rpmbuild', '--quiet', '-bb', '--define', '_rpmdir {}'.format(dir),
  140. spec_path])
  141. pkg_path = os.path.join(dir, 'x86_64',
  142. '{}-{}-1.x86_64.rpm'.format(name, version))
  143. subprocess.check_call(['chmod', 'go-rw', '/dev/tty'])
  144. subprocess.check_call(
  145. ['rpm', '--quiet', '--define=_gpg_path {}'.format(dir),
  146. '--define=_gpg_name {}'.format("Qubes test"),
  147. '--addsign', pkg_path],
  148. stdin=subprocess.DEVNULL,
  149. stdout=subprocess.DEVNULL,
  150. stderr=subprocess.STDOUT)
  151. subprocess.check_call(['chmod', 'go+rw', '/dev/tty'])
  152. return pkg_path
  153. def send_pkg(self, filename):
  154. with open(filename, 'rb') as f_pkg:
  155. self.loop.run_until_complete(self.updatevm.run_for_stdio(
  156. 'mkdir -p /tmp/repo; cat > /tmp/repo/{}'.format(
  157. os.path.basename(filename)),
  158. input=f_pkg.read()))
  159. try:
  160. self.loop.run_until_complete(
  161. self.updatevm.run_for_stdio('cd /tmp/repo; createrepo .'))
  162. except subprocess.CalledProcessError as e:
  163. if e.returncode == 127:
  164. self.skipTest('createrepo not installed in template {}'.format(
  165. self.template))
  166. else:
  167. self.skipTest('createrepo failed with code {}, '
  168. 'cannot perform the test'.format(e.returncode))
  169. self.start_repo()
  170. def start_repo(self):
  171. if self.repo_running:
  172. return
  173. self.repo_proc = self.loop.run_until_complete(self.updatevm.run(
  174. 'cd /tmp/repo && python -m SimpleHTTPServer 8080',
  175. stdout=subprocess.DEVNULL,
  176. stderr=subprocess.STDOUT))
  177. self.repo_running = True
  178. def test_000_update(self):
  179. """Dom0 update tests
  180. Check if package update is:
  181. - detected
  182. - installed
  183. - "updates pending" flag is cleared
  184. """
  185. filename = self.create_pkg(self.tmpdir, self.pkg_name, '1.0')
  186. subprocess.check_call(['rpm', '-i', filename])
  187. filename = self.create_pkg(self.tmpdir, self.pkg_name, '2.0')
  188. self.send_pkg(filename)
  189. open(self.update_flag_path, 'a').close()
  190. logpath = os.path.join(self.tmpdir, 'dom0-update-output.txt')
  191. with open(logpath, 'w') as f_log:
  192. proc = self.loop.run_until_complete(asyncio.create_subprocess_exec(
  193. 'qubes-dom0-update', '-y', *self.dom0_update_common_opts,
  194. stdout=f_log,
  195. stderr=subprocess.STDOUT))
  196. self.loop.run_until_complete(proc.wait())
  197. if proc.returncode:
  198. del proc
  199. with open(logpath) as f_log:
  200. self.fail("qubes-dom0-update failed: " + f_log.read())
  201. del proc
  202. retcode = subprocess.call(['rpm', '-q', '{}-1.0'.format(
  203. self.pkg_name)], stdout=subprocess.DEVNULL)
  204. self.assertEqual(retcode, 1, 'Package {}-1.0 still installed after '
  205. 'update'.format(self.pkg_name))
  206. retcode = subprocess.call(['rpm', '-q', '{}-2.0'.format(
  207. self.pkg_name)], stdout=subprocess.DEVNULL)
  208. self.assertEqual(retcode, 0, 'Package {}-2.0 not installed after '
  209. 'update'.format(self.pkg_name))
  210. self.assertFalse(os.path.exists(self.update_flag_path),
  211. "'updates pending' flag not cleared")
  212. def test_005_update_flag_clear(self):
  213. """Check if 'updates pending' flag is creared"""
  214. # create any pkg (but not install it) to initialize repo in the VM
  215. filename = self.create_pkg(self.tmpdir, self.pkg_name, '1.0')
  216. self.send_pkg(filename)
  217. open(self.update_flag_path, 'a').close()
  218. logpath = os.path.join(self.tmpdir, 'dom0-update-output.txt')
  219. with open(logpath, 'w') as f_log:
  220. proc = self.loop.run_until_complete(asyncio.create_subprocess_exec(
  221. 'qubes-dom0-update', '-y', *self.dom0_update_common_opts,
  222. stdout=f_log,
  223. stderr=subprocess.STDOUT))
  224. self.loop.run_until_complete(proc.wait())
  225. if proc.returncode:
  226. del proc
  227. with open(logpath) as f_log:
  228. self.fail("qubes-dom0-update failed: " + f_log.read())
  229. del proc
  230. with open(logpath) as f:
  231. dom0_update_output = f.read()
  232. self.assertFalse('Errno' in dom0_update_output or
  233. 'Couldn\'t' in dom0_update_output,
  234. "qubes-dom0-update reported an error: {}".
  235. format(dom0_update_output))
  236. self.assertFalse(os.path.exists(self.update_flag_path),
  237. "'updates pending' flag not cleared")
  238. def test_006_update_flag_clear(self):
  239. """Check if 'updates pending' flag is creared, using --clean"""
  240. # create any pkg (but not install it) to initialize repo in the VM
  241. filename = self.create_pkg(self.tmpdir, self.pkg_name, '1.0')
  242. self.send_pkg(filename)
  243. open(self.update_flag_path, 'a').close()
  244. # remove also repodata to test #1685
  245. if os.path.exists('/var/lib/qubes/updates/repodata'):
  246. shutil.rmtree('/var/lib/qubes/updates/repodata')
  247. logpath = os.path.join(self.tmpdir, 'dom0-update-output.txt')
  248. with open(logpath, 'w') as f_log:
  249. proc = self.loop.run_until_complete(asyncio.create_subprocess_exec(
  250. 'qubes-dom0-update', '-y', '--clean',
  251. *self.dom0_update_common_opts,
  252. stdout=f_log,
  253. stderr=subprocess.STDOUT))
  254. self.loop.run_until_complete(proc.wait())
  255. if proc.returncode:
  256. del proc
  257. with open(logpath) as f_log:
  258. self.fail("qubes-dom0-update failed: " + f_log.read())
  259. del proc
  260. with open(logpath) as f:
  261. dom0_update_output = f.read()
  262. self.assertFalse('Errno' in dom0_update_output or
  263. 'Couldn\'t' in dom0_update_output,
  264. "qubes-dom0-update reported an error: {}".
  265. format(dom0_update_output))
  266. self.assertFalse(os.path.exists(self.update_flag_path),
  267. "'updates pending' flag not cleared")
  268. def test_010_instal(self):
  269. filename = self.create_pkg(self.tmpdir, self.pkg_name, '1.0')
  270. self.send_pkg(filename)
  271. logpath = os.path.join(self.tmpdir, 'dom0-update-output.txt')
  272. with open(logpath, 'w') as f_log:
  273. proc = self.loop.run_until_complete(asyncio.create_subprocess_exec(
  274. 'qubes-dom0-update', '-y', *self.dom0_update_common_opts,
  275. self.pkg_name,
  276. stdout=f_log,
  277. stderr=subprocess.STDOUT))
  278. self.loop.run_until_complete(proc.wait())
  279. if proc.returncode:
  280. del proc
  281. with open(logpath) as f_log:
  282. self.fail("qubes-dom0-update failed: " + f_log.read())
  283. del proc
  284. retcode = subprocess.call(['rpm', '-q', '{}-1.0'.format(
  285. self.pkg_name)], stdout=open('/dev/null', 'w'))
  286. self.assertEqual(retcode, 0, 'Package {}-1.0 not installed'.format(
  287. self.pkg_name))
  288. def test_020_install_wrong_sign(self):
  289. subprocess.call(['rpm', '-e', 'gpg-pubkey-{}'.format(
  290. self.keyid)])
  291. filename = self.create_pkg(self.tmpdir, self.pkg_name, '1.0')
  292. self.send_pkg(filename)
  293. logpath = os.path.join(self.tmpdir, 'dom0-update-output.txt')
  294. with open(logpath, 'w') as f_log:
  295. proc = self.loop.run_until_complete(asyncio.create_subprocess_exec(
  296. 'qubes-dom0-update', '-y', *self.dom0_update_common_opts,
  297. self.pkg_name,
  298. stdout=f_log,
  299. stderr=subprocess.STDOUT))
  300. self.loop.run_until_complete(proc.wait())
  301. if not proc.returncode:
  302. del proc
  303. with open(logpath) as f_log:
  304. self.fail("qubes-dom0-update unexpectedly succeeded: " +
  305. f_log.read())
  306. del proc
  307. retcode = subprocess.call(['rpm', '-q', '{}-1.0'.format(
  308. self.pkg_name)], stdout=subprocess.DEVNULL)
  309. self.assertEqual(retcode, 1,
  310. 'Package {}-1.0 installed although '
  311. 'signature is invalid'.format(self.pkg_name))
  312. def test_030_install_unsigned(self):
  313. filename = self.create_pkg(self.tmpdir, self.pkg_name, '1.0')
  314. subprocess.check_call(['rpm', '--delsign', filename],
  315. stdout=subprocess.DEVNULL,
  316. stderr=subprocess.STDOUT)
  317. self.send_pkg(filename)
  318. logpath = os.path.join(self.tmpdir, 'dom0-update-output.txt')
  319. with open(logpath, 'w') as f_log:
  320. proc = self.loop.run_until_complete(asyncio.create_subprocess_exec(
  321. 'qubes-dom0-update', '-y', *self.dom0_update_common_opts,
  322. self.pkg_name,
  323. stdout=f_log,
  324. stderr=subprocess.STDOUT))
  325. self.loop.run_until_complete(proc.wait())
  326. if not proc.returncode:
  327. del proc
  328. with open(logpath) as f_log:
  329. self.fail("qubes-dom0-update unexpectedly succeeded: " +
  330. f_log.read())
  331. del proc
  332. retcode = subprocess.call(['rpm', '-q', '{}-1.0'.format(
  333. self.pkg_name)], stdout=subprocess.DEVNULL)
  334. self.assertEqual(retcode, 1,
  335. 'UNSIGNED package {}-1.0 installed'.format(self.pkg_name))
  336. def create_testcases_for_templates():
  337. return qubes.tests.create_testcases_for_templates('TC_00_Dom0Upgrade',
  338. TC_00_Dom0UpgradeMixin, qubes.tests.SystemTestCase,
  339. module=sys.modules[__name__])
  340. def load_tests(loader, tests, pattern):
  341. tests.addTests(loader.loadTestsFromNames(
  342. create_testcases_for_templates()))
  343. return tests
  344. qubes.tests.maybe_create_testcases_on_import(create_testcases_for_templates)