core-admin/tests/dom0_update.py
Marek Marczykowski-Górecki e87da9ec9d
tests: adjust dom0_update tests for dnf in VM
There is no support for 'copy_local' repository option, so setup test
repository over http.

Related to QubesOS/qubes-issues#1574
2016-06-02 02:51:18 +02:00

379 lines
14 KiB
Python

#!/usr/bin/python
# -*- coding: utf-8 -*-
#
# The Qubes OS Project, http://www.qubes-os.org
#
# Copyright (C) 2015 Marek Marczykowski-Górecki <marmarek@invisiblethingslab.com>
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301,
# USA.
#
import os
import shutil
import subprocess
import tempfile
import unittest
from qubes.qubes import QubesVmCollection
import qubes.qubes
VM_PREFIX = "test-"
@unittest.skipUnless(os.path.exists('/usr/bin/rpmsign') and
os.path.exists('/usr/bin/rpmbuild'),
'rpm-sign and/or rpm-build not installed')
class TC_00_Dom0UpgradeMixin(qubes.tests.SystemTestsMixin):
"""
Tests for downloading dom0 updates using VMs based on different templates
"""
pkg_name = 'qubes-test-pkg'
dom0_update_common_opts = ['--disablerepo=*', '--enablerepo=test']
update_flag_path = '/var/lib/qubes/updates/dom0-updates-available'
@classmethod
def generate_key(cls, keydir):
gpg_opts = ['gpg', '--quiet', '--no-default-keyring',
'--homedir', keydir]
p = subprocess.Popen(gpg_opts + ['--gen-key', '--batch'],
stdin=subprocess.PIPE,
stderr=open(os.devnull, 'w'))
p.stdin.write('''
Key-Type: RSA
Key-Length: 1024
Key-Usage: sign
Name-Real: Qubes test
Expire-Date: 0
%commit
'''.format(keydir=keydir))
p.stdin.close()
p.wait()
subprocess.check_call(gpg_opts + ['-a', '--export',
'--output', os.path.join(keydir, 'pubkey.asc')])
p = subprocess.Popen(gpg_opts + ['--with-colons', '--list-keys'],
stdout=subprocess.PIPE)
for line in p.stdout.readlines():
fields = line.split(':')
if fields[0] == 'pub':
return fields[4][-8:].lower()
raise RuntimeError
@classmethod
def setUpClass(cls):
super(TC_00_Dom0UpgradeMixin, cls).setUpClass()
cls.tmpdir = tempfile.mkdtemp()
cls.keyid = cls.generate_key(cls.tmpdir)
p = subprocess.Popen(['sudo', 'dd',
'status=none', 'of=/etc/yum.repos.d/test.repo'],
stdin=subprocess.PIPE)
p.stdin.write('''
[test]
name = Test
baseurl = http://localhost:8080/
enabled = 1
''')
p.stdin.close()
p.wait()
@classmethod
def tearDownClass(cls):
subprocess.check_call(['sudo', 'rm', '-f',
'/etc/yum.repos.d/test.repo'])
shutil.rmtree(cls.tmpdir)
def setUp(self):
super(TC_00_Dom0UpgradeMixin, self).setUp()
self.updatevm = self.qc.add_new_vm(
"QubesProxyVm",
name=self.make_vm_name("updatevm"),
template=self.qc.get_vm_by_name(self.template))
self.updatevm.create_on_disk(verbose=False)
self.saved_updatevm = self.qc.get_updatevm_vm()
self.qc.set_updatevm_vm(self.updatevm)
self.qc.save()
self.qc.unlock_db()
subprocess.call(['sudo', 'rpm', '-e', self.pkg_name],
stderr=open(os.devnull, 'w'))
subprocess.check_call(['sudo', 'rpm', '--import',
os.path.join(self.tmpdir, 'pubkey.asc')])
self.updatevm.start()
self.repo_running = False
def tearDown(self):
self.qc.lock_db_for_writing()
self.qc.load()
self.qc.set_updatevm_vm(self.qc[self.saved_updatevm.qid])
self.qc.save()
super(TC_00_Dom0UpgradeMixin, self).tearDown()
subprocess.call(['sudo', 'rpm', '-e', self.pkg_name], stderr=open(
os.devnull, 'w'))
subprocess.call(['sudo', 'rpm', '-e', 'gpg-pubkey-{}'.format(
self.keyid)], stderr=open(os.devnull, 'w'))
for pkg in os.listdir(self.tmpdir):
if pkg.endswith('.rpm'):
os.unlink(pkg)
def create_pkg(self, dir, name, version):
spec_path = os.path.join(dir, name+'.spec')
spec = open(spec_path, 'w')
spec.write(
'''
Name: {name}
Summary: Test Package
Version: {version}
Release: 1
Vendor: Invisible Things Lab
License: GPL
Group: Qubes
URL: http://www.qubes-os.org
%description
Test package
%install
%files
'''.format(name=name, version=version)
)
spec.close()
subprocess.check_call(
['rpmbuild', '--quiet', '-bb', '--define', '_rpmdir {}'.format(dir),
spec_path])
pkg_path = os.path.join(dir, 'x86_64',
'{}-{}-1.x86_64.rpm'.format(name, version))
subprocess.check_call(['sudo', 'chmod', 'go-rw', '/dev/tty'])
subprocess.check_call(
['rpm', '--quiet', '--define=_gpg_path {}'.format(dir),
'--define=_gpg_name {}'.format("Qubes test"),
'--addsign', pkg_path],
stdin=open(os.devnull),
stdout=open(os.devnull, 'w'),
stderr=subprocess.STDOUT)
subprocess.check_call(['sudo', 'chmod', 'go+rw', '/dev/tty'])
return pkg_path
def send_pkg(self, filename):
p = self.updatevm.run('mkdir -p /tmp/repo; cat > /tmp/repo/{}'.format(
os.path.basename(
filename)), passio_popen=True)
p.stdin.write(open(filename).read())
p.stdin.close()
p.wait()
retcode = self.updatevm.run('cd /tmp/repo; createrepo .', wait=True)
if retcode == 127:
self.skipTest("createrepo not installed in template {}".format(
self.template))
elif retcode != 0:
self.skipTest("createrepo failed with code {}, cannot perform the "
"test".format(retcode))
self.start_repo()
def start_repo(self):
if not self.repo_running:
self.updatevm.run("cd /tmp/repo &&"
"python -m SimpleHTTPServer 8080")
self.repo_running = True
def test_000_update(self):
"""Dom0 update tests
Check if package update is:
- detected
- installed
- "updates pending" flag is cleared
"""
filename = self.create_pkg(self.tmpdir, self.pkg_name, '1.0')
subprocess.check_call(['sudo', 'rpm', '-i', filename])
filename = self.create_pkg(self.tmpdir, self.pkg_name, '2.0')
self.send_pkg(filename)
open(self.update_flag_path, 'a').close()
logpath = os.path.join(self.tmpdir, 'dom0-update-output.txt')
try:
subprocess.check_call(['sudo', 'qubes-dom0-update', '-y'] +
self.dom0_update_common_opts,
stdout=open(logpath, 'w'),
stderr=subprocess.STDOUT)
except subprocess.CalledProcessError:
self.fail("qubes-dom0-update failed: " + open(
logpath).read())
retcode = subprocess.call(['rpm', '-q', '{}-1.0'.format(
self.pkg_name)], stdout=open(os.devnull, 'w'))
self.assertEqual(retcode, 1, 'Package {}-1.0 still installed after '
'update'.format(self.pkg_name))
retcode = subprocess.call(['rpm', '-q', '{}-2.0'.format(
self.pkg_name)], stdout=open(os.devnull, 'w'))
self.assertEqual(retcode, 0, 'Package {}-2.0 not installed after '
'update'.format(self.pkg_name))
self.assertFalse(os.path.exists(self.update_flag_path),
"'updates pending' flag not cleared")
def test_005_update_flag_clear(self):
"""Check if 'updates pending' flag is creared"""
# create any pkg (but not install it) to initialize repo in the VM
filename = self.create_pkg(self.tmpdir, self.pkg_name, '1.0')
self.send_pkg(filename)
open(self.update_flag_path, 'a').close()
logpath = os.path.join(self.tmpdir, 'dom0-update-output.txt')
try:
subprocess.check_call(['sudo', 'qubes-dom0-update', '-y'] +
self.dom0_update_common_opts,
stdout=open(logpath, 'w'),
stderr=subprocess.STDOUT)
except subprocess.CalledProcessError:
self.fail("qubes-dom0-update failed: " + open(
logpath).read())
with open(logpath) as f:
dom0_update_output = f.read()
self.assertFalse('Errno' in dom0_update_output or
'Couldn\'t' in dom0_update_output,
"qubes-dom0-update reported an error: {}".
format(dom0_update_output))
self.assertFalse(os.path.exists(self.update_flag_path),
"'updates pending' flag not cleared")
def test_006_update_flag_clear(self):
"""Check if 'updates pending' flag is creared, using --clean"""
# create any pkg (but not install it) to initialize repo in the VM
filename = self.create_pkg(self.tmpdir, self.pkg_name, '1.0')
self.send_pkg(filename)
open(self.update_flag_path, 'a').close()
# remove also repodata to test #1685
shutil.rmtree('/var/lib/qubes/updates/repodata')
logpath = os.path.join(self.tmpdir, 'dom0-update-output.txt')
try:
subprocess.check_call(['sudo', 'qubes-dom0-update', '-y',
'--clean'] +
self.dom0_update_common_opts,
stdout=open(logpath, 'w'),
stderr=subprocess.STDOUT)
except subprocess.CalledProcessError:
self.fail("qubes-dom0-update failed: " + open(
logpath).read())
with open(logpath) as f:
dom0_update_output = f.read()
self.assertFalse('Errno' in dom0_update_output or
'Couldn\'t' in dom0_update_output,
"qubes-dom0-update reported an error: {}".
format(dom0_update_output))
self.assertFalse(os.path.exists(self.update_flag_path),
"'updates pending' flag not cleared")
def test_010_instal(self):
filename = self.create_pkg(self.tmpdir, self.pkg_name, '1.0')
self.send_pkg(filename)
logpath = os.path.join(self.tmpdir, 'dom0-update-output.txt')
try:
subprocess.check_call(['sudo', 'qubes-dom0-update', '-y'] +
self.dom0_update_common_opts + [
self.pkg_name],
stdout=open(logpath, 'w'),
stderr=subprocess.STDOUT)
except subprocess.CalledProcessError:
self.fail("qubes-dom0-update failed: " + open(
logpath).read())
retcode = subprocess.call(['rpm', '-q', '{}-1.0'.format(
self.pkg_name)], stdout=open('/dev/null', 'w'))
self.assertEqual(retcode, 0, 'Package {}-1.0 not installed'.format(
self.pkg_name))
def test_020_install_wrong_sign(self):
subprocess.call(['sudo', 'rpm', '-e', 'gpg-pubkey-{}'.format(
self.keyid)])
filename = self.create_pkg(self.tmpdir, self.pkg_name, '1.0')
self.send_pkg(filename)
logpath = os.path.join(self.tmpdir, 'dom0-update-output.txt')
try:
subprocess.check_call(['sudo', 'qubes-dom0-update', '-y'] +
self.dom0_update_common_opts + [
self.pkg_name],
stdout=open(logpath, 'w'),
stderr=subprocess.STDOUT)
self.fail("qubes-dom0-update unexpectedly succeeded: " + open(
logpath).read())
except subprocess.CalledProcessError:
pass
retcode = subprocess.call(['rpm', '-q', '{}-1.0'.format(
self.pkg_name)], stdout=open('/dev/null', 'w'))
self.assertEqual(retcode, 1,
'Package {}-1.0 installed although '
'signature is invalid'.format(self.pkg_name))
def test_030_install_unsigned(self):
filename = self.create_pkg(self.tmpdir, self.pkg_name, '1.0')
subprocess.check_call(['rpm', '--delsign', filename],
stdout=open(os.devnull, 'w'),
stderr=subprocess.STDOUT)
self.send_pkg(filename)
logpath = os.path.join(self.tmpdir, 'dom0-update-output.txt')
try:
subprocess.check_call(['sudo', 'qubes-dom0-update', '-y'] +
self.dom0_update_common_opts +
[self.pkg_name],
stdout=open(logpath, 'w'),
stderr=subprocess.STDOUT
)
self.fail("qubes-dom0-update unexpectedly succeeded: " + open(
logpath).read())
except subprocess.CalledProcessError:
pass
retcode = subprocess.call(['rpm', '-q', '{}-1.0'.format(
self.pkg_name)], stdout=open('/dev/null', 'w'))
self.assertEqual(retcode, 1,
'UNSIGNED package {}-1.0 installed'.format(self.pkg_name))
def load_tests(loader, tests, pattern):
try:
qc = qubes.qubes.QubesVmCollection()
qc.lock_db_for_reading()
qc.load()
qc.unlock_db()
templates = [vm.name for vm in qc.values() if
isinstance(vm, qubes.qubes.QubesTemplateVm)]
except OSError:
templates = []
for template in templates:
tests.addTests(loader.loadTestsFromTestCase(
type(
'TC_00_Dom0Upgrade_' + template,
(TC_00_Dom0UpgradeMixin, qubes.tests.QubesTestCase),
{'template': template})))
return tests