123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531 |
- # -*- encoding: utf8 -*-
- #
- # The Qubes OS Project, http://www.qubes-os.org
- #
- # Copyright (C) 2017 Marek Marczykowski-Górecki
- # <marmarek@invisiblethingslab.com>
- #
- # This library is free software; you can redistribute it and/or
- # modify it under the terms of the GNU Lesser General Public
- # License as published by the Free Software Foundation; either
- # version 2.1 of the License, or (at your option) any later version.
- #
- # This library is distributed in the hope that it will be useful,
- # but WITHOUT ANY WARRANTY; without even the implied warranty of
- # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- # Lesser General Public License for more details.
- #
- # You should have received a copy of the GNU Lesser General Public
- # License along with this library; if not, see <https://www.gnu.org/licenses/>.
- from unittest import mock
- import jinja2
- import qubes.tests
- import qubes.ext.block
- domain_xml_template = '''
- <domain type='xen' id='9'>
- <name>test-vm</name>
- <uuid>00000000-0000-0000-0000-0000000000ae</uuid>
- <memory unit='KiB'>4096000</memory>
- <currentMemory unit='KiB'>409600</currentMemory>
- <vcpu placement='static'>8</vcpu>
- <os>
- <type arch='x86_64' machine='xenpv'>linux</type>
- <kernel>/var/lib/qubes/vm-kernels/4.4.55-11/vmlinuz</kernel>
- <initrd>/var/lib/qubes/vm-kernels/4.4.55-11/initramfs</initrd>
- <cmdline>root=/dev/mapper/dmroot ro nomodeset console=hvc0 rd_NO_PLYMOUTH rd.plymouth.enable=0 plymouth.enable=0 dyndbg="file drivers/xen/gntdev.c +p" printk=8</cmdline>
- </os>
- <clock offset='utc' adjustment='reset'>
- <timer name='tsc' mode='native'/>
- </clock>
- <on_poweroff>destroy</on_poweroff>
- <on_reboot>destroy</on_reboot>
- <on_crash>destroy</on_crash>
- <devices>
- <disk type='block' device='disk'>
- <driver name='phy'/>
- <source dev='/var/lib/qubes/vm-templates/fedora-25/root.img:/var/lib/qubes/vm-templates/fedora-25/root-cow.img'/>
- <backingStore/>
- <script path='block-snapshot'/>
- <target dev='xvda' bus='xen'/>
- <readonly/>
- </disk>
- <disk type='block' device='disk'>
- <driver name='phy'/>
- <source dev='/var/lib/qubes/appvms/test-vm/private.img'/>
- <backingStore/>
- <target dev='xvdb' bus='xen'/>
- </disk>
- <disk type='block' device='disk'>
- <driver name='phy'/>
- <source dev='/var/lib/qubes/appvms/test-vm/volatile.img'/>
- <backingStore/>
- <target dev='xvdc' bus='xen'/>
- </disk>
- <disk type='block' device='disk'>
- <driver name='phy'/>
- <source dev='/var/lib/qubes/vm-kernels/4.4.55-11/modules.img'/>
- <backingStore/>
- <target dev='xvdd' bus='xen'/>
- <readonly/>
- </disk>
- {}
- <interface type='ethernet'>
- <mac address='00:16:3e:5e:6c:06'/>
- <ip address='10.137.1.8' family='ipv4'/>
- <script path='vif-route-qubes'/>
- <backenddomain name='sys-firewall'/>
- </interface>
- <console type='pty' tty='/dev/pts/0'>
- <source path='/dev/pts/0'/>
- <target type='xen' port='0'/>
- </console>
- </devices>
- </domain>
- '''
- class TestQubesDB(object):
- def __init__(self, data):
- self._data = data
- def read(self, key):
- return self._data.get(key, None)
- def list(self, prefix):
- return [key for key in self._data if key.startswith(prefix)]
- class TestApp(object):
- def __init__(self):
- #: jinja2 environment for libvirt XML templates
- self.env = jinja2.Environment(
- loader=jinja2.FileSystemLoader([
- 'templates',
- '/etc/qubes/templates',
- '/usr/share/qubes/templates',
- ]),
- undefined=jinja2.StrictUndefined)
- self.domains = {}
- class TestVM(object):
- def __init__(self, qdb, domain_xml=None, running=True, name='test-vm'):
- self.name = name
- self.untrusted_qdb = TestQubesDB(qdb)
- self.libvirt_domain = mock.Mock()
- self.is_running = lambda: running
- self.log = mock.Mock()
- self.app = TestApp()
- if domain_xml:
- self.libvirt_domain.configure_mock(**{
- 'XMLDesc.return_value': domain_xml
- })
- def __eq__(self, other):
- if isinstance(other, TestVM):
- return self.name == other.name
- class TC_00_Block(qubes.tests.QubesTestCase):
- def setUp(self):
- super().setUp()
- self.ext = qubes.ext.block.BlockDeviceExtension()
- def test_000_device_get(self):
- vm = TestVM({
- '/qubes-block-devices/sda': b'',
- '/qubes-block-devices/sda/desc': b'Test device',
- '/qubes-block-devices/sda/size': b'1024000',
- '/qubes-block-devices/sda/mode': b'w',
- })
- device_info = self.ext.device_get(vm, 'sda')
- self.assertIsInstance(device_info, qubes.ext.block.BlockDevice)
- self.assertEqual(device_info.backend_domain, vm)
- self.assertEqual(device_info.ident, 'sda')
- self.assertEqual(device_info.description, 'Test device')
- self.assertEqual(device_info._description, 'Test device')
- self.assertEqual(device_info.size, 1024000)
- self.assertEqual(device_info.mode, 'w')
- self.assertEqual(device_info.frontend_domain, None)
- self.assertEqual(device_info.device_node, '/dev/sda')
- def test_001_device_get_other_node(self):
- vm = TestVM({
- '/qubes-block-devices/mapper_dmroot': b'',
- '/qubes-block-devices/mapper_dmroot/desc': b'Test device',
- '/qubes-block-devices/mapper_dmroot/size': b'1024000',
- '/qubes-block-devices/mapper_dmroot/mode': b'w',
- })
- device_info = self.ext.device_get(vm, 'mapper_dmroot')
- self.assertIsInstance(device_info, qubes.ext.block.BlockDevice)
- self.assertEqual(device_info.backend_domain, vm)
- self.assertEqual(device_info.ident, 'mapper_dmroot')
- self.assertEqual(device_info.description, 'Test device')
- self.assertEqual(device_info._description, 'Test device')
- self.assertEqual(device_info.size, 1024000)
- self.assertEqual(device_info.mode, 'w')
- self.assertEqual(device_info.frontend_domain, None)
- self.assertEqual(device_info.device_node, '/dev/mapper/dmroot')
- def test_002_device_get_invalid_desc(self):
- vm = TestVM({
- '/qubes-block-devices/sda': b'',
- '/qubes-block-devices/sda/desc': b'Test device<>za\xc4\x87abc',
- '/qubes-block-devices/sda/size': b'1024000',
- '/qubes-block-devices/sda/mode': b'w',
- })
- device_info = self.ext.device_get(vm, 'sda')
- self.assertEqual(device_info.description, 'Test device__za__abc')
- def test_003_device_get_invalid_size(self):
- vm = TestVM({
- '/qubes-block-devices/sda': b'',
- '/qubes-block-devices/sda/desc': b'Test device',
- '/qubes-block-devices/sda/size': b'1024000abc',
- '/qubes-block-devices/sda/mode': b'w',
- })
- device_info = self.ext.device_get(vm, 'sda')
- self.assertEqual(device_info.size, 0)
- vm.log.warning.assert_called_once_with('Device sda has invalid size')
- def test_004_device_get_invalid_mode(self):
- vm = TestVM({
- '/qubes-block-devices/sda': b'',
- '/qubes-block-devices/sda/desc': b'Test device',
- '/qubes-block-devices/sda/size': b'1024000',
- '/qubes-block-devices/sda/mode': b'abc',
- })
- device_info = self.ext.device_get(vm, 'sda')
- self.assertEqual(device_info.mode, 'w')
- vm.log.warning.assert_called_once_with('Device sda has invalid mode')
- def test_005_device_get_none(self):
- vm = TestVM({
- '/qubes-block-devices/sda': b'',
- '/qubes-block-devices/sda/desc': b'Test device',
- '/qubes-block-devices/sda/size': b'1024000',
- '/qubes-block-devices/sda/mode': b'w',
- })
- device_info = self.ext.device_get(vm, 'sdb')
- self.assertIsNone(device_info)
- def test_010_devices_list(self):
- vm = TestVM({
- '/qubes-block-devices/sda': b'',
- '/qubes-block-devices/sda/desc': b'Test device',
- '/qubes-block-devices/sda/size': b'1024000',
- '/qubes-block-devices/sda/mode': b'w',
- '/qubes-block-devices/sdb': b'',
- '/qubes-block-devices/sdb/desc': b'Test device2',
- '/qubes-block-devices/sdb/size': b'2048000',
- '/qubes-block-devices/sdb/mode': b'r',
- })
- devices = sorted(list(self.ext.on_device_list_block(vm, '')))
- self.assertEqual(len(devices), 2)
- self.assertEqual(devices[0].backend_domain, vm)
- self.assertEqual(devices[0].ident, 'sda')
- self.assertEqual(devices[0].description, 'Test device')
- self.assertEqual(devices[0].size, 1024000)
- self.assertEqual(devices[0].mode, 'w')
- self.assertEqual(devices[1].backend_domain, vm)
- self.assertEqual(devices[1].ident, 'sdb')
- self.assertEqual(devices[1].description, 'Test device2')
- self.assertEqual(devices[1].size, 2048000)
- self.assertEqual(devices[1].mode, 'r')
- def test_011_devices_list_empty(self):
- vm = TestVM({})
- devices = sorted(list(self.ext.on_device_list_block(vm, '')))
- self.assertEqual(len(devices), 0)
- def test_012_devices_list_invalid_ident(self):
- vm = TestVM({
- '/qubes-block-devices/invalid ident': b'',
- '/qubes-block-devices/invalid+ident': b'',
- '/qubes-block-devices/invalid#': b'',
- })
- devices = sorted(list(self.ext.on_device_list_block(vm, '')))
- self.assertEqual(len(devices), 0)
- msg = 'test-vm vm\'s device path name contains unsafe characters. '\
- 'Skipping it.'
- self.assertEqual(vm.log.warning.mock_calls, [
- mock.call(msg),
- mock.call(msg),
- mock.call(msg),
- ])
- def test_020_find_unused_frontend(self):
- vm = TestVM({}, domain_xml=domain_xml_template.format(''))
- frontend = self.ext.find_unused_frontend(vm)
- self.assertEqual(frontend, 'xvdi')
- def test_022_find_unused_frontend2(self):
- disk = '''
- <disk type="block" device="disk">
- <driver name="phy" />
- <source dev="/dev/sda" />
- <target dev="xvdi" />
- <readonly />
- <backenddomain name="sys-usb" />
- </disk>
- '''
- vm = TestVM({}, domain_xml=domain_xml_template.format(disk))
- frontend = self.ext.find_unused_frontend(vm)
- self.assertEqual(frontend, 'xvdj')
- def test_030_list_attached_empty(self):
- vm = TestVM({}, domain_xml=domain_xml_template.format(''))
- devices = sorted(list(self.ext.on_device_list_attached(vm, '')))
- self.assertEqual(len(devices), 0)
- def test_031_list_attached(self):
- disk = '''
- <disk type="block" device="disk">
- <driver name="phy" />
- <source dev="/dev/sda" />
- <target dev="xvdi" />
- <readonly />
- <backenddomain name="sys-usb" />
- </disk>
- '''
- vm = TestVM({}, domain_xml=domain_xml_template.format(disk))
- vm.app.domains['test-vm'] = vm
- vm.app.domains['sys-usb'] = TestVM({}, name='sys-usb')
- devices = sorted(list(self.ext.on_device_list_attached(vm, '')))
- self.assertEqual(len(devices), 1)
- dev = devices[0][0]
- options = devices[0][1]
- self.assertEqual(dev.backend_domain, vm.app.domains['sys-usb'])
- self.assertEqual(dev.ident, 'sda')
- self.assertEqual(options['frontend-dev'], 'xvdi')
- self.assertEqual(options['read-only'], 'yes')
- def test_032_list_attached_dom0(self):
- disk = '''
- <disk type="block" device="disk">
- <driver name="phy" />
- <source dev="/dev/sda" />
- <target dev="xvdi" />
- </disk>
- '''
- vm = TestVM({}, domain_xml=domain_xml_template.format(disk))
- vm.app.domains['test-vm'] = vm
- vm.app.domains['sys-usb'] = TestVM({}, name='sys-usb')
- vm.app.domains['dom0'] = TestVM({}, name='dom0')
- vm.app.domains[0] = vm.app.domains['dom0']
- devices = sorted(list(self.ext.on_device_list_attached(vm, '')))
- self.assertEqual(len(devices), 1)
- dev = devices[0][0]
- options = devices[0][1]
- self.assertEqual(dev.backend_domain, vm.app.domains['dom0'])
- self.assertEqual(dev.ident, 'sda')
- self.assertEqual(options['frontend-dev'], 'xvdi')
- self.assertEqual(options['read-only'], 'no')
- def test_033_list_attached_cdrom(self):
- disk = '''
- <disk type="block" device="cdrom">
- <driver name="phy" />
- <source dev="/dev/sr0" />
- <target dev="xvdi" />
- <readonly />
- <backenddomain name="sys-usb" />
- </disk>
- '''
- vm = TestVM({}, domain_xml=domain_xml_template.format(disk))
- vm.app.domains['test-vm'] = vm
- vm.app.domains['sys-usb'] = TestVM({}, name='sys-usb')
- devices = sorted(list(self.ext.on_device_list_attached(vm, '')))
- self.assertEqual(len(devices), 1)
- dev = devices[0][0]
- options = devices[0][1]
- self.assertEqual(dev.backend_domain, vm.app.domains['sys-usb'])
- self.assertEqual(dev.ident, 'sr0')
- self.assertEqual(options['frontend-dev'], 'xvdi')
- self.assertEqual(options['read-only'], 'yes')
- self.assertEqual(options['devtype'], 'cdrom')
- def test_040_attach(self):
- back_vm = TestVM(name='sys-usb', qdb={
- '/qubes-block-devices/sda': b'',
- '/qubes-block-devices/sda/desc': b'Test device',
- '/qubes-block-devices/sda/size': b'1024000',
- '/qubes-block-devices/sda/mode': b'w',
- })
- vm = TestVM({}, domain_xml=domain_xml_template.format(''))
- dev = qubes.ext.block.BlockDevice(back_vm, 'sda')
- self.ext.on_device_pre_attached_block(vm, '', dev, {})
- device_xml = (
- '<disk type="block" device="disk">\n'
- ' <driver name="phy" />\n'
- ' <source dev="/dev/sda" />\n'
- ' <target dev="xvdi" />\n'
- ' <backenddomain name="sys-usb" />\n'
- '</disk>')
- vm.libvirt_domain.attachDevice.assert_called_once_with(device_xml)
- def test_041_attach_frontend(self):
- back_vm = TestVM(name='sys-usb', qdb={
- '/qubes-block-devices/sda': b'',
- '/qubes-block-devices/sda/desc': b'Test device',
- '/qubes-block-devices/sda/size': b'1024000',
- '/qubes-block-devices/sda/mode': b'w',
- })
- vm = TestVM({}, domain_xml=domain_xml_template.format(''))
- dev = qubes.ext.block.BlockDevice(back_vm, 'sda')
- self.ext.on_device_pre_attached_block(vm, '', dev,
- {'frontend-dev': 'xvdj'})
- device_xml = (
- '<disk type="block" device="disk">\n'
- ' <driver name="phy" />\n'
- ' <source dev="/dev/sda" />\n'
- ' <target dev="xvdj" />\n'
- ' <backenddomain name="sys-usb" />\n'
- '</disk>')
- vm.libvirt_domain.attachDevice.assert_called_once_with(device_xml)
- def test_042_attach_read_only(self):
- back_vm = TestVM(name='sys-usb', qdb={
- '/qubes-block-devices/sda': b'',
- '/qubes-block-devices/sda/desc': b'Test device',
- '/qubes-block-devices/sda/size': b'1024000',
- '/qubes-block-devices/sda/mode': b'w',
- })
- vm = TestVM({}, domain_xml=domain_xml_template.format(''))
- dev = qubes.ext.block.BlockDevice(back_vm, 'sda')
- self.ext.on_device_pre_attached_block(vm, '', dev,
- {'read-only': 'yes'})
- device_xml = (
- '<disk type="block" device="disk">\n'
- ' <driver name="phy" />\n'
- ' <source dev="/dev/sda" />\n'
- ' <target dev="xvdi" />\n'
- ' <readonly />\n'
- ' <backenddomain name="sys-usb" />\n'
- '</disk>')
- vm.libvirt_domain.attachDevice.assert_called_once_with(device_xml)
- def test_043_attach_invalid_option(self):
- back_vm = TestVM(name='sys-usb', qdb={
- '/qubes-block-devices/sda': b'',
- '/qubes-block-devices/sda/desc': b'Test device',
- '/qubes-block-devices/sda/size': b'1024000',
- '/qubes-block-devices/sda/mode': b'w',
- })
- vm = TestVM({}, domain_xml=domain_xml_template.format(''))
- dev = qubes.ext.block.BlockDevice(back_vm, 'sda')
- with self.assertRaises(qubes.exc.QubesValueError):
- self.ext.on_device_pre_attached_block(vm, '', dev,
- {'no-such-option': '123'})
- self.assertFalse(vm.libvirt_domain.attachDevice.called)
- def test_044_attach_invalid_option2(self):
- back_vm = TestVM(name='sys-usb', qdb={
- '/qubes-block-devices/sda': b'',
- '/qubes-block-devices/sda/desc': b'Test device',
- '/qubes-block-devices/sda/size': b'1024000',
- '/qubes-block-devices/sda/mode': b'w',
- })
- vm = TestVM({}, domain_xml=domain_xml_template.format(''))
- dev = qubes.ext.block.BlockDevice(back_vm, 'sda')
- with self.assertRaises(qubes.exc.QubesValueError):
- self.ext.on_device_pre_attached_block(vm, '', dev,
- {'read-only': 'maybe'})
- self.assertFalse(vm.libvirt_domain.attachDevice.called)
- def test_045_attach_backend_not_running(self):
- back_vm = TestVM(name='sys-usb', running=False, qdb={
- '/qubes-block-devices/sda': b'',
- '/qubes-block-devices/sda/desc': b'Test device',
- '/qubes-block-devices/sda/size': b'1024000',
- '/qubes-block-devices/sda/mode': b'w',
- })
- vm = TestVM({}, domain_xml=domain_xml_template.format(''))
- dev = qubes.ext.block.BlockDevice(back_vm, 'sda')
- with self.assertRaises(qubes.exc.QubesVMNotRunningError):
- self.ext.on_device_pre_attached_block(vm, '', dev, {})
- self.assertFalse(vm.libvirt_domain.attachDevice.called)
- def test_046_attach_ro_dev_rw(self):
- back_vm = TestVM(name='sys-usb', qdb={
- '/qubes-block-devices/sda': b'',
- '/qubes-block-devices/sda/desc': b'Test device',
- '/qubes-block-devices/sda/size': b'1024000',
- '/qubes-block-devices/sda/mode': b'r',
- })
- vm = TestVM({}, domain_xml=domain_xml_template.format(''))
- dev = qubes.ext.block.BlockDevice(back_vm, 'sda')
- with self.assertRaises(qubes.exc.QubesValueError):
- self.ext.on_device_pre_attached_block(vm, '', dev,
- {'read-only': 'no'})
- self.assertFalse(vm.libvirt_domain.attachDevice.called)
- def test_047_attach_read_only_auto(self):
- back_vm = TestVM(name='sys-usb', qdb={
- '/qubes-block-devices/sda': b'',
- '/qubes-block-devices/sda/desc': b'Test device',
- '/qubes-block-devices/sda/size': b'1024000',
- '/qubes-block-devices/sda/mode': b'r',
- })
- vm = TestVM({}, domain_xml=domain_xml_template.format(''))
- dev = qubes.ext.block.BlockDevice(back_vm, 'sda')
- self.ext.on_device_pre_attached_block(vm, '', dev, {})
- device_xml = (
- '<disk type="block" device="disk">\n'
- ' <driver name="phy" />\n'
- ' <source dev="/dev/sda" />\n'
- ' <target dev="xvdi" />\n'
- ' <readonly />\n'
- ' <backenddomain name="sys-usb" />\n'
- '</disk>')
- vm.libvirt_domain.attachDevice.assert_called_once_with(device_xml)
- def test_050_detach(self):
- back_vm = TestVM(name='sys-usb', qdb={
- '/qubes-block-devices/sda': b'',
- '/qubes-block-devices/sda/desc': b'Test device',
- '/qubes-block-devices/sda/size': b'1024000',
- '/qubes-block-devices/sda/mode': b'r',
- })
- device_xml = (
- '<disk type="block" device="disk">\n'
- ' <driver name="phy" />\n'
- ' <source dev="/dev/sda" />\n'
- ' <target dev="xvdi" />\n'
- ' <readonly />\n'
- ' <backenddomain name="sys-usb" />\n'
- '</disk>')
- vm = TestVM({}, domain_xml=domain_xml_template.format(device_xml))
- vm.app.domains['test-vm'] = vm
- vm.app.domains['sys-usb'] = TestVM({}, name='sys-usb')
- dev = qubes.ext.block.BlockDevice(back_vm, 'sda')
- self.ext.on_device_pre_detached_block(vm, '', dev)
- vm.libvirt_domain.detachDevice.assert_called_once_with(device_xml)
- def test_051_detach_not_attached(self):
- back_vm = TestVM(name='sys-usb', qdb={
- '/qubes-block-devices/sda': b'',
- '/qubes-block-devices/sda/desc': b'Test device',
- '/qubes-block-devices/sda/size': b'1024000',
- '/qubes-block-devices/sda/mode': b'r',
- })
- device_xml = (
- '<disk type="block" device="disk">\n'
- ' <driver name="phy" />\n'
- ' <source dev="/dev/sda" />\n'
- ' <target dev="xvdi" />\n'
- ' <readonly />\n\n'
- ' <backenddomain name="sys-usb" />\n'
- '</disk>')
- vm = TestVM({}, domain_xml=domain_xml_template.format(''))
- vm.app.domains['test-vm'] = vm
- vm.app.domains['sys-usb'] = TestVM({}, name='sys-usb')
- dev = qubes.ext.block.BlockDevice(back_vm, 'sda')
- self.ext.on_device_pre_detached_block(vm, '', dev)
- self.assertFalse(vm.libvirt_domain.detachDevice.called)
|