core-admin/qubes/tests/devices_block.py
Marek Marczykowski-Górecki 6c7af109e5
ext/block: prefer connecting cdrom as xvdd
Only the first 4 disks can be emulated as IDE disks by QEMU. Specifically,
a CDROM must be one of those first 4 disks; otherwise it will be
ignored. This is especially important if one wants to boot the VM from
that CDROM.
Since xvdd normally is a kernel-related volume (boot image, modules) it
makes perfect sense to re-use it for CDROM. It is either set for kernel
volume (in which case, VM should boot from it and not the CDROM), or
(possibly bootable) CDROM.

This needs to be done in two places:
 - BlockExtension for dynamic attach
 - libvirt xen.xml - for before-boot attach

In theory the latter would be enough, but it would be quite confusing
that device will get different options depending on when it's attached
(in addition to whether the kernel is set - introduced here).

This also means that xvdd is not always a "system disk". Adjust the listing
of connected disks accordingly.
2019-11-19 14:03:21 +01:00

574 lines
23 KiB
Python

# -*- encoding: utf8 -*-
#
# The Qubes OS Project, http://www.qubes-os.org
#
# Copyright (C) 2017 Marek Marczykowski-Górecki
# <marmarek@invisiblethingslab.com>
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, see <https://www.gnu.org/licenses/>.
from unittest import mock
import jinja2
import qubes.tests
import qubes.ext.block
# libvirt <disk> element describing a read-only kernel modules image that
# occupies the 'xvdd' frontend.  test_048_attach_cdrom_xvdi substitutes it
# into domain_xml_template so that xvdd is already in use when a CDROM is
# attached.
modules_disk = '''
<disk type='block' device='disk'>
<driver name='phy'/>
<source dev='/var/lib/qubes/vm-kernels/4.4.55-11/modules.img'/>
<backingStore/>
<target dev='xvdd' bus='xen'/>
<readonly/>
</disk>
'''
# libvirt domain XML for a Xen domain named 'test-vm' with three disks
# already attached (xvda, xvdb, xvdc), one network interface and a console.
# The single '{}' placeholder inside <devices> is filled in by the tests via
# str.format() with zero or more additional <disk> elements; because of
# that, the template itself must not contain any other curly braces.
domain_xml_template = '''
<domain type='xen' id='9'>
<name>test-vm</name>
<uuid>00000000-0000-0000-0000-0000000000ae</uuid>
<memory unit='KiB'>4096000</memory>
<currentMemory unit='KiB'>409600</currentMemory>
<vcpu placement='static'>8</vcpu>
<os>
<type arch='x86_64' machine='xenpv'>linux</type>
<kernel>/var/lib/qubes/vm-kernels/4.4.55-11/vmlinuz</kernel>
<initrd>/var/lib/qubes/vm-kernels/4.4.55-11/initramfs</initrd>
<cmdline>root=/dev/mapper/dmroot ro nomodeset console=hvc0 rd_NO_PLYMOUTH rd.plymouth.enable=0 plymouth.enable=0 dyndbg=&quot;file drivers/xen/gntdev.c +p&quot; printk=8</cmdline>
</os>
<clock offset='utc' adjustment='reset'>
<timer name='tsc' mode='native'/>
</clock>
<on_poweroff>destroy</on_poweroff>
<on_reboot>destroy</on_reboot>
<on_crash>destroy</on_crash>
<devices>
<disk type='block' device='disk'>
<driver name='phy'/>
<source dev='/var/lib/qubes/vm-templates/fedora-25/root.img:/var/lib/qubes/vm-templates/fedora-25/root-cow.img'/>
<backingStore/>
<script path='block-snapshot'/>
<target dev='xvda' bus='xen'/>
<readonly/>
</disk>
<disk type='block' device='disk'>
<driver name='phy'/>
<source dev='/var/lib/qubes/appvms/test-vm/private.img'/>
<backingStore/>
<target dev='xvdb' bus='xen'/>
</disk>
<disk type='block' device='disk'>
<driver name='phy'/>
<source dev='/var/lib/qubes/appvms/test-vm/volatile.img'/>
<backingStore/>
<target dev='xvdc' bus='xen'/>
</disk>
{}
<interface type='ethernet'>
<mac address='00:16:3e:5e:6c:06'/>
<ip address='10.137.1.8' family='ipv4'/>
<script path='vif-route-qubes'/>
<backenddomain name='sys-firewall'/>
</interface>
<console type='pty' tty='/dev/pts/0'>
<source path='/dev/pts/0'/>
<target type='xen' port='0'/>
</console>
</devices>
</domain>
'''
class TestQubesDB:
    """In-memory stand-in for a QubesDB connection.

    Wraps a plain mapping of key -> value and offers the two lookups used
    by the tests in this file: ``read`` and ``list``.
    """

    def __init__(self, data):
        self._data = data

    def read(self, key):
        """Return the value stored under *key*, or ``None`` when absent."""
        return self._data.get(key)

    def list(self, prefix):
        """Return all stored keys beginning with *prefix*, in mapping order."""
        matching = []
        for name in self._data:
            if name.startswith(prefix):
                matching.append(name)
        return matching
class TestApp:
    """Stub application object.

    Exposes a jinja2 environment as ``env`` and an initially empty
    ``domains`` mapping that individual tests populate.
    """

    def __init__(self):
        # Directories searched, in this order, for templates.
        template_dirs = [
            'templates',
            '/etc/qubes/templates',
            '/usr/share/qubes/templates',
        ]
        #: jinja2 environment for libvirt XML templates
        self.env = jinja2.Environment(
            loader=jinja2.FileSystemLoader(template_dirs),
            undefined=jinja2.StrictUndefined)
        self.domains = {}
class TestVM(object):
    """Lightweight fake of a VM, usable as frontend or backend domain.

    :param qdb: mapping wrapped in a :class:`TestQubesDB` and exposed as
        ``untrusted_qdb``
    :param domain_xml: when truthy, the value returned by
        ``libvirt_domain.XMLDesc()``
    :param running: value reported by ``is_running()``
    :param name: VM name; two ``TestVM`` objects are equal when their
        names are equal
    """

    def __init__(self, qdb, domain_xml=None, running=True, name='test-vm'):
        self.name = name
        self.untrusted_qdb = TestQubesDB(qdb)
        self.libvirt_domain = mock.Mock()
        self.is_running = lambda: running
        self.log = mock.Mock()
        self.app = TestApp()
        if domain_xml:
            self.libvirt_domain.configure_mock(**{
                'XMLDesc.return_value': domain_xml
            })

    def __eq__(self, other):
        """Compare by name; defer to the other operand for foreign types."""
        if isinstance(other, TestVM):
            return self.name == other.name
        # Returning NotImplemented (rather than falling through to None)
        # lets Python try the reflected comparison and then fall back to
        # identity, which yields a proper boolean result.
        return NotImplemented

    def __hash__(self):
        """Hash consistently with ``__eq__``.

        Defining ``__eq__`` alone would set ``__hash__`` to ``None`` and
        make instances unusable in sets or as dict keys.
        """
        return hash(self.name)
class TC_00_Block(qubes.tests.QubesTestCase):
def setUp(self):
    """Run the base-class setup, then create the extension under test."""
    super().setUp()
    extension = qubes.ext.block.BlockDeviceExtension()
    self.ext = extension
def test_000_device_get(self):
    """device_get() reports all fields published for device 'sda'."""
    backend = TestVM({
        '/qubes-block-devices/sda': b'',
        '/qubes-block-devices/sda/desc': b'Test device',
        '/qubes-block-devices/sda/size': b'1024000',
        '/qubes-block-devices/sda/mode': b'w',
    })
    info = self.ext.device_get(backend, 'sda')
    self.assertIsInstance(info, qubes.ext.block.BlockDevice)
    # Attributes are checked in the listed order; the first mismatch
    # fails the test.
    expected_attrs = (
        ('backend_domain', backend),
        ('ident', 'sda'),
        ('description', 'Test device'),
        ('_description', 'Test device'),
        ('size', 1024000),
        ('mode', 'w'),
        ('frontend_domain', None),
        ('device_node', '/dev/sda'),
    )
    for attr, expected in expected_attrs:
        self.assertEqual(getattr(info, attr), expected)
def test_001_device_get_other_node(self):
    """Ident 'mapper_dmroot' is reported with node '/dev/mapper/dmroot'."""
    backend = TestVM({
        '/qubes-block-devices/mapper_dmroot': b'',
        '/qubes-block-devices/mapper_dmroot/desc': b'Test device',
        '/qubes-block-devices/mapper_dmroot/size': b'1024000',
        '/qubes-block-devices/mapper_dmroot/mode': b'w',
    })
    info = self.ext.device_get(backend, 'mapper_dmroot')
    self.assertIsInstance(info, qubes.ext.block.BlockDevice)
    expected_attrs = (
        ('backend_domain', backend),
        ('ident', 'mapper_dmroot'),
        ('description', 'Test device'),
        ('_description', 'Test device'),
        ('size', 1024000),
        ('mode', 'w'),
        ('frontend_domain', None),
        ('device_node', '/dev/mapper/dmroot'),
    )
    for attr, expected in expected_attrs:
        self.assertEqual(getattr(info, attr), expected)
def test_002_device_get_invalid_desc(self):
    """'<', '>' and non-ASCII bytes in the description become '_'."""
    qdb_entries = {
        '/qubes-block-devices/sda': b'',
        '/qubes-block-devices/sda/desc': b'Test device<>za\xc4\x87abc',
        '/qubes-block-devices/sda/size': b'1024000',
        '/qubes-block-devices/sda/mode': b'w',
    }
    backend = TestVM(qdb_entries)
    info = self.ext.device_get(backend, 'sda')
    self.assertEqual(info.description, 'Test device__za__abc')
def test_003_device_get_invalid_size(self):
    """A non-numeric size is reported as 0 and logged as a warning."""
    qdb_entries = {
        '/qubes-block-devices/sda': b'',
        '/qubes-block-devices/sda/desc': b'Test device',
        '/qubes-block-devices/sda/size': b'1024000abc',
        '/qubes-block-devices/sda/mode': b'w',
    }
    backend = TestVM(qdb_entries)
    info = self.ext.device_get(backend, 'sda')
    self.assertEqual(info.size, 0)
    warning = backend.log.warning
    warning.assert_called_once_with('Device sda has invalid size')
def test_004_device_get_invalid_mode(self):
    """An unrecognised mode is reported as 'w' and logged as a warning."""
    qdb_entries = {
        '/qubes-block-devices/sda': b'',
        '/qubes-block-devices/sda/desc': b'Test device',
        '/qubes-block-devices/sda/size': b'1024000',
        '/qubes-block-devices/sda/mode': b'abc',
    }
    backend = TestVM(qdb_entries)
    info = self.ext.device_get(backend, 'sda')
    self.assertEqual(info.mode, 'w')
    warning = backend.log.warning
    warning.assert_called_once_with('Device sda has invalid mode')
def test_005_device_get_none(self):
    """device_get() returns None for an ident that is not published."""
    qdb_entries = {
        '/qubes-block-devices/sda': b'',
        '/qubes-block-devices/sda/desc': b'Test device',
        '/qubes-block-devices/sda/size': b'1024000',
        '/qubes-block-devices/sda/mode': b'w',
    }
    backend = TestVM(qdb_entries)
    # Only 'sda' is published; 'sdb' is looked up on purpose.
    missing = self.ext.device_get(backend, 'sdb')
    self.assertIsNone(missing)
def test_010_devices_list(self):
    """Listing yields one device per published ident, with its fields."""
    backend = TestVM({
        '/qubes-block-devices/sda': b'',
        '/qubes-block-devices/sda/desc': b'Test device',
        '/qubes-block-devices/sda/size': b'1024000',
        '/qubes-block-devices/sda/mode': b'w',
        '/qubes-block-devices/sdb': b'',
        '/qubes-block-devices/sdb/desc': b'Test device2',
        '/qubes-block-devices/sdb/size': b'2048000',
        '/qubes-block-devices/sdb/mode': b'r',
    })
    found = sorted(self.ext.on_device_list_block(backend, ''))
    self.assertEqual(len(found), 2)
    # (ident, description, size, mode) expected at each sorted position.
    expected_rows = (
        ('sda', 'Test device', 1024000, 'w'),
        ('sdb', 'Test device2', 2048000, 'r'),
    )
    for position, row in enumerate(expected_rows):
        ident, description, size, mode = row
        device = found[position]
        self.assertEqual(device.backend_domain, backend)
        self.assertEqual(device.ident, ident)
        self.assertEqual(device.description, description)
        self.assertEqual(device.size, size)
        self.assertEqual(device.mode, mode)
def test_011_devices_list_empty(self):
    """An empty QubesDB yields no block devices."""
    backend = TestVM({})
    found = sorted(self.ext.on_device_list_block(backend, ''))
    self.assertEqual(len(found), 0)
def test_012_devices_list_invalid_ident(self):
    """Idents with unsafe characters are skipped, one warning each."""
    backend = TestVM({
        '/qubes-block-devices/invalid ident': b'',
        '/qubes-block-devices/invalid+ident': b'',
        '/qubes-block-devices/invalid#': b'',
    })
    found = sorted(self.ext.on_device_list_block(backend, ''))
    self.assertEqual(len(found), 0)
    expected_warning = (
        "test-vm vm's device path name contains unsafe characters. "
        "Skipping it."
    )
    # One identical warning per rejected ident (three in total).
    expected_calls = [mock.call(expected_warning) for _ in range(3)]
    self.assertEqual(backend.log.warning.mock_calls, expected_calls)
def test_020_find_unused_frontend(self):
    """With only the template's own disks, 'xvdi' is the first free name."""
    xml = domain_xml_template.format('')
    frontend_vm = TestVM({}, domain_xml=xml)
    self.assertEqual(self.ext.find_unused_frontend(frontend_vm), 'xvdi')
def test_022_find_unused_frontend2(self):
    """When 'xvdi' is already attached, 'xvdj' is picked instead."""
    # Leading and trailing empty items reproduce the newline that opens
    # and closes the XML fragment.
    extra_disk = '\n'.join((
        '',
        '<disk type="block" device="disk">',
        '<driver name="phy" />',
        '<source dev="/dev/sda" />',
        '<target dev="xvdi" />',
        '<readonly />',
        '<backenddomain name="sys-usb" />',
        '</disk>',
        '',
    ))
    xml = domain_xml_template.format(extra_disk)
    frontend_vm = TestVM({}, domain_xml=xml)
    self.assertEqual(self.ext.find_unused_frontend(frontend_vm), 'xvdj')
def test_030_list_attached_empty(self):
    """The template's own xvda-xvdc disks are not listed as attached."""
    xml = domain_xml_template.format('')
    frontend_vm = TestVM({}, domain_xml=xml)
    attached = sorted(self.ext.on_device_list_attached(frontend_vm, ''))
    self.assertEqual(len(attached), 0)
def test_031_list_attached(self):
    """A read-only disk backed by sys-usb is listed with its options."""
    extra_disk = '\n'.join((
        '',
        '<disk type="block" device="disk">',
        '<driver name="phy" />',
        '<source dev="/dev/sda" />',
        '<target dev="xvdi" />',
        '<readonly />',
        '<backenddomain name="sys-usb" />',
        '</disk>',
        '',
    ))
    frontend_vm = TestVM({}, domain_xml=domain_xml_template.format(extra_disk))
    domains = frontend_vm.app.domains
    domains['test-vm'] = frontend_vm
    domains['sys-usb'] = TestVM({}, name='sys-usb')
    attached = sorted(self.ext.on_device_list_attached(frontend_vm, ''))
    self.assertEqual(len(attached), 1)
    entry = attached[0]
    device = entry[0]
    opts = entry[1]
    self.assertEqual(device.backend_domain, domains['sys-usb'])
    self.assertEqual(device.ident, 'sda')
    self.assertEqual(opts['frontend-dev'], 'xvdi')
    self.assertEqual(opts['read-only'], 'yes')
def test_032_list_attached_dom0(self):
    """A disk without <backenddomain> is attributed to dom0, read-write."""
    extra_disk = '\n'.join((
        '',
        '<disk type="block" device="disk">',
        '<driver name="phy" />',
        '<source dev="/dev/sda" />',
        '<target dev="xvdi" />',
        '</disk>',
        '',
    ))
    frontend_vm = TestVM({}, domain_xml=domain_xml_template.format(extra_disk))
    domains = frontend_vm.app.domains
    domains['test-vm'] = frontend_vm
    domains['sys-usb'] = TestVM({}, name='sys-usb')
    domains['dom0'] = TestVM({}, name='dom0')
    # dom0 is registered under the integer key 0 as well as by name.
    domains[0] = domains['dom0']
    attached = sorted(self.ext.on_device_list_attached(frontend_vm, ''))
    self.assertEqual(len(attached), 1)
    entry = attached[0]
    device = entry[0]
    opts = entry[1]
    self.assertEqual(device.backend_domain, domains['dom0'])
    self.assertEqual(device.ident, 'sda')
    self.assertEqual(opts['frontend-dev'], 'xvdi')
    self.assertEqual(opts['read-only'], 'no')
def test_033_list_attached_cdrom(self):
    """A device='cdrom' disk is listed with option devtype='cdrom'."""
    extra_disk = '\n'.join((
        '',
        '<disk type="block" device="cdrom">',
        '<driver name="phy" />',
        '<source dev="/dev/sr0" />',
        '<target dev="xvdi" />',
        '<readonly />',
        '<backenddomain name="sys-usb" />',
        '</disk>',
        '',
    ))
    frontend_vm = TestVM({}, domain_xml=domain_xml_template.format(extra_disk))
    domains = frontend_vm.app.domains
    domains['test-vm'] = frontend_vm
    domains['sys-usb'] = TestVM({}, name='sys-usb')
    attached = sorted(self.ext.on_device_list_attached(frontend_vm, ''))
    self.assertEqual(len(attached), 1)
    entry = attached[0]
    device = entry[0]
    opts = entry[1]
    self.assertEqual(device.backend_domain, domains['sys-usb'])
    self.assertEqual(device.ident, 'sr0')
    self.assertEqual(opts['frontend-dev'], 'xvdi')
    self.assertEqual(opts['read-only'], 'yes')
    self.assertEqual(opts['devtype'], 'cdrom')
def test_040_attach(self):
    """Attaching with no options uses 'xvdi' and no <readonly /> element."""
    qdb_entries = {
        '/qubes-block-devices/sda': b'',
        '/qubes-block-devices/sda/desc': b'Test device',
        '/qubes-block-devices/sda/size': b'1024000',
        '/qubes-block-devices/sda/mode': b'w',
    }
    backend = TestVM(qdb_entries, name='sys-usb')
    frontend_vm = TestVM({}, domain_xml=domain_xml_template.format(''))
    device = qubes.ext.block.BlockDevice(backend, 'sda')
    # XML expected to be handed to libvirt; there is no trailing newline.
    expected_xml = '\n'.join((
        '<disk type="block" device="disk">',
        ' <driver name="phy" />',
        ' <source dev="/dev/sda" />',
        ' <target dev="xvdi" />',
        ' <backenddomain name="sys-usb" />',
        '</disk>',
    ))
    self.ext.on_device_pre_attached_block(frontend_vm, '', device, {})
    attach = frontend_vm.libvirt_domain.attachDevice
    attach.assert_called_once_with(expected_xml)
def test_041_attach_frontend(self):
    """An explicit 'frontend-dev' option is used as the target device."""
    qdb_entries = {
        '/qubes-block-devices/sda': b'',
        '/qubes-block-devices/sda/desc': b'Test device',
        '/qubes-block-devices/sda/size': b'1024000',
        '/qubes-block-devices/sda/mode': b'w',
    }
    backend = TestVM(qdb_entries, name='sys-usb')
    frontend_vm = TestVM({}, domain_xml=domain_xml_template.format(''))
    device = qubes.ext.block.BlockDevice(backend, 'sda')
    expected_xml = '\n'.join((
        '<disk type="block" device="disk">',
        ' <driver name="phy" />',
        ' <source dev="/dev/sda" />',
        ' <target dev="xvdj" />',
        ' <backenddomain name="sys-usb" />',
        '</disk>',
    ))
    attach_options = {'frontend-dev': 'xvdj'}
    self.ext.on_device_pre_attached_block(
        frontend_vm, '', device, attach_options)
    attach = frontend_vm.libvirt_domain.attachDevice
    attach.assert_called_once_with(expected_xml)
def test_042_attach_read_only(self):
    """Option read-only='yes' adds a <readonly /> element."""
    qdb_entries = {
        '/qubes-block-devices/sda': b'',
        '/qubes-block-devices/sda/desc': b'Test device',
        '/qubes-block-devices/sda/size': b'1024000',
        '/qubes-block-devices/sda/mode': b'w',
    }
    backend = TestVM(qdb_entries, name='sys-usb')
    frontend_vm = TestVM({}, domain_xml=domain_xml_template.format(''))
    device = qubes.ext.block.BlockDevice(backend, 'sda')
    expected_xml = '\n'.join((
        '<disk type="block" device="disk">',
        ' <driver name="phy" />',
        ' <source dev="/dev/sda" />',
        ' <target dev="xvdi" />',
        ' <readonly />',
        ' <backenddomain name="sys-usb" />',
        '</disk>',
    ))
    attach_options = {'read-only': 'yes'}
    self.ext.on_device_pre_attached_block(
        frontend_vm, '', device, attach_options)
    attach = frontend_vm.libvirt_domain.attachDevice
    attach.assert_called_once_with(expected_xml)
def test_043_attach_invalid_option(self):
    """An unknown option raises QubesValueError; nothing is attached."""
    qdb_entries = {
        '/qubes-block-devices/sda': b'',
        '/qubes-block-devices/sda/desc': b'Test device',
        '/qubes-block-devices/sda/size': b'1024000',
        '/qubes-block-devices/sda/mode': b'w',
    }
    backend = TestVM(qdb_entries, name='sys-usb')
    frontend_vm = TestVM({}, domain_xml=domain_xml_template.format(''))
    device = qubes.ext.block.BlockDevice(backend, 'sda')
    bad_options = {'no-such-option': '123'}
    with self.assertRaises(qubes.exc.QubesValueError):
        self.ext.on_device_pre_attached_block(
            frontend_vm, '', device, bad_options)
    # Checked after the failed call, outside the assertRaises block.
    self.assertFalse(frontend_vm.libvirt_domain.attachDevice.called)
def test_044_attach_invalid_option2(self):
    """read-only='maybe' raises QubesValueError; nothing is attached."""
    qdb_entries = {
        '/qubes-block-devices/sda': b'',
        '/qubes-block-devices/sda/desc': b'Test device',
        '/qubes-block-devices/sda/size': b'1024000',
        '/qubes-block-devices/sda/mode': b'w',
    }
    backend = TestVM(qdb_entries, name='sys-usb')
    frontend_vm = TestVM({}, domain_xml=domain_xml_template.format(''))
    device = qubes.ext.block.BlockDevice(backend, 'sda')
    bad_options = {'read-only': 'maybe'}
    with self.assertRaises(qubes.exc.QubesValueError):
        self.ext.on_device_pre_attached_block(
            frontend_vm, '', device, bad_options)
    self.assertFalse(frontend_vm.libvirt_domain.attachDevice.called)
def test_045_attach_backend_not_running(self):
    """A stopped backend raises QubesVMNotRunningError; nothing is attached."""
    qdb_entries = {
        '/qubes-block-devices/sda': b'',
        '/qubes-block-devices/sda/desc': b'Test device',
        '/qubes-block-devices/sda/size': b'1024000',
        '/qubes-block-devices/sda/mode': b'w',
    }
    backend = TestVM(qdb_entries, running=False, name='sys-usb')
    frontend_vm = TestVM({}, domain_xml=domain_xml_template.format(''))
    device = qubes.ext.block.BlockDevice(backend, 'sda')
    with self.assertRaises(qubes.exc.QubesVMNotRunningError):
        self.ext.on_device_pre_attached_block(frontend_vm, '', device, {})
    self.assertFalse(frontend_vm.libvirt_domain.attachDevice.called)
def test_046_attach_ro_dev_rw(self):
    """read-only='no' on a mode 'r' device raises QubesValueError."""
    qdb_entries = {
        '/qubes-block-devices/sda': b'',
        '/qubes-block-devices/sda/desc': b'Test device',
        '/qubes-block-devices/sda/size': b'1024000',
        '/qubes-block-devices/sda/mode': b'r',
    }
    backend = TestVM(qdb_entries, name='sys-usb')
    frontend_vm = TestVM({}, domain_xml=domain_xml_template.format(''))
    device = qubes.ext.block.BlockDevice(backend, 'sda')
    rw_request = {'read-only': 'no'}
    with self.assertRaises(qubes.exc.QubesValueError):
        self.ext.on_device_pre_attached_block(
            frontend_vm, '', device, rw_request)
    self.assertFalse(frontend_vm.libvirt_domain.attachDevice.called)
def test_047_attach_read_only_auto(self):
    """A mode 'r' device gets <readonly /> even with no options given."""
    qdb_entries = {
        '/qubes-block-devices/sda': b'',
        '/qubes-block-devices/sda/desc': b'Test device',
        '/qubes-block-devices/sda/size': b'1024000',
        '/qubes-block-devices/sda/mode': b'r',
    }
    backend = TestVM(qdb_entries, name='sys-usb')
    frontend_vm = TestVM({}, domain_xml=domain_xml_template.format(''))
    device = qubes.ext.block.BlockDevice(backend, 'sda')
    expected_xml = '\n'.join((
        '<disk type="block" device="disk">',
        ' <driver name="phy" />',
        ' <source dev="/dev/sda" />',
        ' <target dev="xvdi" />',
        ' <readonly />',
        ' <backenddomain name="sys-usb" />',
        '</disk>',
    ))
    self.ext.on_device_pre_attached_block(frontend_vm, '', device, {})
    attach = frontend_vm.libvirt_domain.attachDevice
    attach.assert_called_once_with(expected_xml)
def test_048_attach_cdrom_xvdi(self):
    """With xvdd taken by modules_disk, a CDROM is attached as 'xvdi'."""
    qdb_entries = {
        '/qubes-block-devices/sda': b'',
        '/qubes-block-devices/sda/desc': b'Test device',
        '/qubes-block-devices/sda/size': b'1024000',
        '/qubes-block-devices/sda/mode': b'r',
    }
    backend = TestVM(qdb_entries, name='sys-usb')
    # modules_disk puts a disk on xvdd in the frontend's domain XML.
    xml = domain_xml_template.format(modules_disk)
    frontend_vm = TestVM({}, domain_xml=xml)
    device = qubes.ext.block.BlockDevice(backend, 'sda')
    expected_xml = '\n'.join((
        '<disk type="block" device="cdrom">',
        ' <driver name="phy" />',
        ' <source dev="/dev/sda" />',
        ' <target dev="xvdi" />',
        ' <readonly />',
        ' <backenddomain name="sys-usb" />',
        '</disk>',
    ))
    cdrom_options = {'devtype': 'cdrom'}
    self.ext.on_device_pre_attached_block(
        frontend_vm, '', device, cdrom_options)
    attach = frontend_vm.libvirt_domain.attachDevice
    attach.assert_called_once_with(expected_xml)
def test_048_attach_cdrom_xvdd(self):
    """With xvdd free, a CDROM is attached as 'xvdd'."""
    qdb_entries = {
        '/qubes-block-devices/sda': b'',
        '/qubes-block-devices/sda/desc': b'Test device',
        '/qubes-block-devices/sda/size': b'1024000',
        '/qubes-block-devices/sda/mode': b'r',
    }
    backend = TestVM(qdb_entries, name='sys-usb')
    # No extra disks: only xvda-xvdc are present in the domain XML.
    xml = domain_xml_template.format('')
    frontend_vm = TestVM({}, domain_xml=xml)
    device = qubes.ext.block.BlockDevice(backend, 'sda')
    expected_xml = '\n'.join((
        '<disk type="block" device="cdrom">',
        ' <driver name="phy" />',
        ' <source dev="/dev/sda" />',
        ' <target dev="xvdd" />',
        ' <readonly />',
        ' <backenddomain name="sys-usb" />',
        '</disk>',
    ))
    cdrom_options = {'devtype': 'cdrom'}
    self.ext.on_device_pre_attached_block(
        frontend_vm, '', device, cdrom_options)
    attach = frontend_vm.libvirt_domain.attachDevice
    attach.assert_called_once_with(expected_xml)
def test_050_detach(self):
    """Detaching passes the attached disk's XML to libvirt detachDevice."""
    qdb_entries = {
        '/qubes-block-devices/sda': b'',
        '/qubes-block-devices/sda/desc': b'Test device',
        '/qubes-block-devices/sda/size': b'1024000',
        '/qubes-block-devices/sda/mode': b'r',
    }
    backend = TestVM(qdb_entries, name='sys-usb')
    # The same string is embedded in the domain XML and expected back in
    # the detachDevice call.
    disk_xml = '\n'.join((
        '<disk type="block" device="disk">',
        ' <driver name="phy" />',
        ' <source dev="/dev/sda" />',
        ' <target dev="xvdi" />',
        ' <readonly />',
        ' <backenddomain name="sys-usb" />',
        '</disk>',
    ))
    frontend_vm = TestVM({}, domain_xml=domain_xml_template.format(disk_xml))
    domains = frontend_vm.app.domains
    domains['test-vm'] = frontend_vm
    domains['sys-usb'] = TestVM({}, name='sys-usb')
    device = qubes.ext.block.BlockDevice(backend, 'sda')
    self.ext.on_device_pre_detached_block(frontend_vm, '', device)
    detach = frontend_vm.libvirt_domain.detachDevice
    detach.assert_called_once_with(disk_xml)
def test_051_detach_not_attached(self):
    """Detaching a device that is not attached does not call libvirt.

    The frontend's domain XML contains only the template's own disks, so
    the pre-detach handler is expected to return without calling
    ``detachDevice``.
    """
    back_vm = TestVM(name='sys-usb', qdb={
        '/qubes-block-devices/sda': b'',
        '/qubes-block-devices/sda/desc': b'Test device',
        '/qubes-block-devices/sda/size': b'1024000',
        '/qubes-block-devices/sda/mode': b'r',
    })
    # No disk XML is built here: the previous ``device_xml`` local was
    # never used (nothing is attached, nothing is compared against it).
    vm = TestVM({}, domain_xml=domain_xml_template.format(''))
    vm.app.domains['test-vm'] = vm
    vm.app.domains['sys-usb'] = TestVM({}, name='sys-usb')
    dev = qubes.ext.block.BlockDevice(back_vm, 'sda')
    self.ext.on_device_pre_detached_block(vm, '', dev)
    self.assertFalse(vm.libvirt_domain.detachDevice.called)