ext: BlockDevices extension

Handle block devices exposed by VMs
This commit is contained in:
Marek Marczykowski-Górecki 2017-05-29 21:20:06 +02:00
parent 866d908b6d
commit 9bb5054e50
No known key found for this signature in database
GPG Key ID: 063938BA42CFA724
7 changed files with 814 additions and 0 deletions

272
qubes/ext/block.py Normal file
View File

@ -0,0 +1,272 @@
# -*- encoding: utf8 -*-
#
# The Qubes OS Project, http://www.qubes-os.org
#
# Copyright (C) 2017 Marek Marczykowski-Górecki
# <marmarek@invisiblethingslab.com>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, see <http://www.gnu.org/licenses/>.
''' Qubes block devices extensions '''
import re
import string
import lxml.etree
import qubes.devices
import qubes.ext
name_re = re.compile(r"^[a-z0-9-]{1,12}$")
device_re = re.compile(r"^[a-z0-9/-]{1,64}$")
# FIXME: any better idea of desc_re?
desc_re = re.compile(r"^.{1,255}$")
mode_re = re.compile(r"^[rw]$")
# all frontends, prefer xvdi
# TODO: get this from libvirt driver?
AVAILABLE_FRONTENDS = ['xvd'+c for c in
string.ascii_lowercase[8:]+string.ascii_lowercase[:8]]
SYSTEM_DISKS = ('xvda', 'xvdb', 'xvdc', 'xvdd')
class BlockDevice(qubes.devices.DeviceInfo):
def __init__(self, backend_domain, ident):
super(BlockDevice, self).__init__(backend_domain=backend_domain,
ident=ident)
self._description = None
self._mode = None
self._size = None
@property
def description(self):
'''Human readable device description'''
if self._description is None:
if not self.backend_domain.is_running():
return self.ident
safe_set = {ord(c) for c in
string.ascii_letters + string.digits + '()+,-.:=_/ '}
untrusted_desc = self.backend_domain.qdb.read(
'/qubes-block-devices/{}/desc'.format(self.ident))
desc = ''.join((chr(c) if c in safe_set else '_')
for c in untrusted_desc)
self._description = desc
return self._description
@property
def mode(self):
'''Device mode, either 'w' for read-write, or 'r' for read-only'''
if self._mode is None:
if not self.backend_domain.is_running():
return 'w'
untrusted_mode = self.backend_domain.qdb.read(
'/qubes-block-devices/{}/mode'.format(self.ident))
if untrusted_mode is None:
self._mode = 'w'
elif untrusted_mode not in (b'w', b'r'):
self.backend_domain.log.warning(
'Device {} has invalid mode'.format(self.ident))
self._mode = 'w'
else:
self._mode = untrusted_mode.decode()
return self._mode
@property
def size(self):
'''Device size in bytes'''
if self._size is None:
if not self.backend_domain.is_running():
return None
untrusted_size = self.backend_domain.qdb.read(
'/qubes-block-devices/{}/size'.format(self.ident))
if untrusted_size is None:
self._size = 0
elif not untrusted_size.isdigit():
self.backend_domain.log.warning(
'Device {} has invalid size'.format(self.ident))
self._size = 0
else:
self._size = int(untrusted_size)
return self._size
@property
def device_node(self):
'''Device node in backend domain'''
return '/dev/' + self.ident.replace('_', '/')
class BlockDeviceExtension(qubes.ext.Extension):
def device_get(self, vm, ident):
# pylint: disable=no-self-use
'''Read information about device from QubesDB
:param vm: backend VM object
:param ident: device identifier
:returns BlockDevice'''
untrusted_qubes_device_attrs = vm.qdb.list(
'/qubes-block-devices/{}/'.format(ident))
if not untrusted_qubes_device_attrs:
return None
return BlockDevice(vm, ident)
@qubes.ext.handler('device-list:block')
def on_device_list_block(self, vm, event):
# pylint: disable=unused-argument,no-self-use
safe_set = {ord(c) for c in
string.ascii_letters + string.digits}
if not vm.is_running():
return
untrusted_qubes_devices = vm.qdb.list('/qubes-block-devices/')
untrusted_idents = set(untrusted_path.split(b'/', 3)[2]
for untrusted_path in untrusted_qubes_devices)
for untrusted_ident in untrusted_idents:
if not all(c in safe_set for c in untrusted_ident):
msg = ("%s vm's device path name contains unsafe characters. "
"Skipping it.")
vm.log.warning(msg % vm.name)
continue
ident = untrusted_ident.decode('ascii', errors='strict')
device_info = self.device_get(vm, ident)
if device_info:
yield device_info
@qubes.ext.handler('device-get:block')
def on_device_get_block(self, vm, event, ident):
# pylint: disable=unused-argument,no-self-use
if not vm.is_running():
return
if not vm.app.vmm.offline_mode:
yield self.device_get(vm, ident)
@qubes.ext.handler('device-list-attached:block')
def on_device_list_attached(self, vm, event, **kwargs):
# pylint: disable=unused-argument,no-self-use
if not vm.is_running():
return
xml_desc = lxml.etree.fromstring(vm.libvirt_domain.XMLDesc())
for disk in xml_desc.findall('devices/disk'):
if disk.get('type') != 'block':
continue
dev_path_node = disk.find('source')
if dev_path_node is None:
continue
dev_path = dev_path_node.get('dev')
target_node = disk.find('target')
if target_node is not None:
frontend_dev = target_node.get('dev')
if not frontend_dev:
continue
if frontend_dev in SYSTEM_DISKS:
continue
else:
continue
backend_domain_node = disk.find('backenddomain')
if backend_domain_node is not None:
backend_domain = vm.app.domains[backend_domain_node.get('name')]
else:
backend_domain = vm.app.domains[0]
options = {}
read_only_node = disk.find('readonly')
if read_only_node is not None:
options['read-only'] = 'yes'
else:
options['read-only'] = 'no'
options['frontend-dev'] = frontend_dev
if dev_path.startswith('/dev/'):
ident = dev_path[len('/dev/'):]
else:
ident = dev_path
ident = ident.replace('/', '_')
yield (BlockDevice(backend_domain, ident), options)
def find_unused_frontend(self, vm):
# pylint: disable=no-self-use
'''Find unused block frontend device node for <target dev=.../>
parameter'''
assert vm.is_running()
xml = vm.libvirt_domain.XMLDesc()
parsed_xml = lxml.etree.fromstring(xml)
used = [target.get('dev', None) for target in
parsed_xml.xpath("//domain/devices/disk/target")]
for dev in AVAILABLE_FRONTENDS:
if dev not in used:
return dev
return None
@qubes.ext.handler('device-pre-attach:block')
def on_device_pre_attached_block(self, vm, event, device, options):
# pylint: disable=unused-argument
# validate options
for option, value in options.items():
if option == 'frontend-dev':
if not value.startswith('xvd') and not value.startswith('sd'):
raise qubes.exc.QubesValueError(
'Invalid frontend-dev option value: ' + value)
elif option == 'read-only':
if value not in ('yes', 'no'):
raise qubes.exc.QubesValueError(
'read-only option can only have '
'\'yes\' or \'no\' value')
else:
raise qubes.exc.QubesValueError(
'Unsupported option {}'.format(option))
if 'read-only' not in options:
options['read-only'] = 'yes' if device.mode == 'r' else 'no'
if options.get('read-only', 'no') == 'no' and device.mode == 'r':
raise qubes.exc.QubesValueError(
'This device can be attached only read-only')
if not vm.is_running():
return
if not device.backend_domain.is_running():
raise qubes.exc.QubesVMNotRunningError(device.backend_domain,
'Domain {} needs to be running to attach device from '
'it'.format(device.backend_domain.name))
if 'frontend-dev' not in options:
options['frontend-dev'] = self.find_unused_frontend(vm)
vm.libvirt_domain.attachDevice(
vm.app.env.get_template('libvirt/devices/block.xml').render(
device=device, vm=vm, options=options))
@qubes.ext.handler('device-pre-detach:block')
def on_device_pre_detached_block(self, vm, event, device):
# pylint: disable=unused-argument,no-self-use
if not vm.is_running():
return
# need to enumerate attached device to find frontend_dev option (at
# least)
for attached_device, options in self.on_device_list_attached(vm, event):
if attached_device == device:
vm.libvirt_domain.detachDevice(
vm.app.env.get_template('libvirt/devices/block.xml').render(
device=device, vm=vm, options=options))
break

View File

@ -955,6 +955,7 @@ def load_tests(loader, tests, pattern): # pylint: disable=unused-argument
# unit tests # unit tests
'qubes.tests.events', 'qubes.tests.events',
'qubes.tests.devices', 'qubes.tests.devices',
'qubes.tests.devices_block',
'qubes.tests.firewall', 'qubes.tests.firewall',
'qubes.tests.init', 'qubes.tests.init',
'qubes.tests.vm.init', 'qubes.tests.vm.init',

View File

@ -0,0 +1,512 @@
# -*- encoding: utf8 -*-
#
# The Qubes OS Project, http://www.qubes-os.org
#
# Copyright (C) 2017 Marek Marczykowski-Górecki
# <marmarek@invisiblethingslab.com>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, see <http://www.gnu.org/licenses/>.
from unittest import mock
import jinja2
import qubes.tests
import qubes.ext.block
domain_xml_template = '''
<domain type='xen' id='9'>
<name>test-vm</name>
<uuid>00000000-0000-0000-0000-0000000000ae</uuid>
<memory unit='KiB'>4096000</memory>
<currentMemory unit='KiB'>409600</currentMemory>
<vcpu placement='static'>8</vcpu>
<os>
<type arch='x86_64' machine='xenpv'>linux</type>
<kernel>/var/lib/qubes/vm-kernels/4.4.55-11/vmlinuz</kernel>
<initrd>/var/lib/qubes/vm-kernels/4.4.55-11/initramfs</initrd>
<cmdline>root=/dev/mapper/dmroot ro nomodeset console=hvc0 rd_NO_PLYMOUTH rd.plymouth.enable=0 plymouth.enable=0 dyndbg=&quot;file drivers/xen/gntdev.c +p&quot; printk=8</cmdline>
</os>
<clock offset='utc' adjustment='reset'>
<timer name='tsc' mode='native'/>
</clock>
<on_poweroff>destroy</on_poweroff>
<on_reboot>destroy</on_reboot>
<on_crash>destroy</on_crash>
<devices>
<disk type='block' device='disk'>
<driver name='phy'/>
<source dev='/var/lib/qubes/vm-templates/fedora-25/root.img:/var/lib/qubes/vm-templates/fedora-25/root-cow.img'/>
<backingStore/>
<script path='block-snapshot'/>
<target dev='xvda' bus='xen'/>
<readonly/>
</disk>
<disk type='block' device='disk'>
<driver name='phy'/>
<source dev='/var/lib/qubes/appvms/test-vm/private.img'/>
<backingStore/>
<target dev='xvdb' bus='xen'/>
</disk>
<disk type='block' device='disk'>
<driver name='phy'/>
<source dev='/var/lib/qubes/appvms/test-vm/volatile.img'/>
<backingStore/>
<target dev='xvdc' bus='xen'/>
</disk>
<disk type='block' device='disk'>
<driver name='phy'/>
<source dev='/var/lib/qubes/vm-kernels/4.4.55-11/modules.img'/>
<backingStore/>
<target dev='xvdd' bus='xen'/>
<readonly/>
</disk>
{}
<interface type='ethernet'>
<mac address='00:16:3e:5e:6c:06'/>
<ip address='10.137.1.8' family='ipv4'/>
<script path='vif-route-qubes'/>
<backenddomain name='sys-firewall'/>
</interface>
<console type='pty' tty='/dev/pts/0'>
<source path='/dev/pts/0'/>
<target type='xen' port='0'/>
</console>
</devices>
</domain>
'''
class TestQubesDB(object):
def __init__(self, data):
self._data = data
def read(self, key):
if isinstance(key, str):
key = key.encode()
return self._data.get(key, None)
def list(self, prefix):
if isinstance(prefix, str):
prefix = prefix.encode()
return [key for key in self._data if key.startswith(prefix)]
class TestApp(object):
def __init__(self):
#: jinja2 environment for libvirt XML templates
self.env = jinja2.Environment(
loader=jinja2.FileSystemLoader([
'templates',
]),
undefined=jinja2.StrictUndefined)
self.domains = {}
class TestVM(object):
def __init__(self, qdb, domain_xml=None, running=True, name='test-vm'):
self.name = name
self.qdb = TestQubesDB(qdb)
self.libvirt_domain = mock.Mock()
self.is_running = lambda: running
self.log = mock.Mock()
self.app = TestApp()
if domain_xml:
self.libvirt_domain.configure_mock(**{
'XMLDesc.return_value': domain_xml
})
def __eq__(self, other):
if isinstance(other, TestVM):
return self.name == other.name
class TC_00_Block(qubes.tests.QubesTestCase):
def setUp(self):
super().setUp()
self.ext = qubes.ext.block.BlockDeviceExtension()
def test_000_device_get(self):
vm = TestVM({
b'/qubes-block-devices/sda': b'',
b'/qubes-block-devices/sda/desc': b'Test device',
b'/qubes-block-devices/sda/size': b'1024000',
b'/qubes-block-devices/sda/mode': b'w',
})
device_info = self.ext.device_get(vm, 'sda')
self.assertIsInstance(device_info, qubes.ext.block.BlockDevice)
self.assertEqual(device_info.backend_domain, vm)
self.assertEqual(device_info.ident, 'sda')
self.assertEqual(device_info.description, 'Test device')
self.assertEqual(device_info._description, 'Test device')
self.assertEqual(device_info.size, 1024000)
self.assertEqual(device_info.mode, 'w')
self.assertEqual(device_info.frontend_domain, None)
self.assertEqual(device_info.device_node, '/dev/sda')
def test_001_device_get_other_node(self):
vm = TestVM({
b'/qubes-block-devices/mapper_dmroot': b'',
b'/qubes-block-devices/mapper_dmroot/desc': b'Test device',
b'/qubes-block-devices/mapper_dmroot/size': b'1024000',
b'/qubes-block-devices/mapper_dmroot/mode': b'w',
})
device_info = self.ext.device_get(vm, 'mapper_dmroot')
self.assertIsInstance(device_info, qubes.ext.block.BlockDevice)
self.assertEqual(device_info.backend_domain, vm)
self.assertEqual(device_info.ident, 'mapper_dmroot')
self.assertEqual(device_info.description, 'Test device')
self.assertEqual(device_info._description, 'Test device')
self.assertEqual(device_info.size, 1024000)
self.assertEqual(device_info.mode, 'w')
self.assertEqual(device_info.frontend_domain, None)
self.assertEqual(device_info.device_node, '/dev/mapper/dmroot')
def test_002_device_get_invalid_desc(self):
vm = TestVM({
b'/qubes-block-devices/sda': b'',
b'/qubes-block-devices/sda/desc': b'Test device<>za\xc4\x87abc',
b'/qubes-block-devices/sda/size': b'1024000',
b'/qubes-block-devices/sda/mode': b'w',
})
device_info = self.ext.device_get(vm, 'sda')
self.assertEqual(device_info.description, 'Test device__za__abc')
def test_003_device_get_invalid_size(self):
vm = TestVM({
b'/qubes-block-devices/sda': b'',
b'/qubes-block-devices/sda/desc': b'Test device',
b'/qubes-block-devices/sda/size': b'1024000abc',
b'/qubes-block-devices/sda/mode': b'w',
})
device_info = self.ext.device_get(vm, 'sda')
self.assertEqual(device_info.size, 0)
vm.log.warning.assert_called_once_with('Device sda has invalid size')
def test_004_device_get_invalid_mode(self):
vm = TestVM({
b'/qubes-block-devices/sda': b'',
b'/qubes-block-devices/sda/desc': b'Test device',
b'/qubes-block-devices/sda/size': b'1024000',
b'/qubes-block-devices/sda/mode': b'abc',
})
device_info = self.ext.device_get(vm, 'sda')
self.assertEqual(device_info.mode, 'w')
vm.log.warning.assert_called_once_with('Device sda has invalid mode')
def test_005_device_get_none(self):
vm = TestVM({
b'/qubes-block-devices/sda': b'',
b'/qubes-block-devices/sda/desc': b'Test device',
b'/qubes-block-devices/sda/size': b'1024000',
b'/qubes-block-devices/sda/mode': b'w',
})
device_info = self.ext.device_get(vm, 'sdb')
self.assertIsNone(device_info)
def test_010_devices_list(self):
vm = TestVM({
b'/qubes-block-devices/sda': b'',
b'/qubes-block-devices/sda/desc': b'Test device',
b'/qubes-block-devices/sda/size': b'1024000',
b'/qubes-block-devices/sda/mode': b'w',
b'/qubes-block-devices/sdb': b'',
b'/qubes-block-devices/sdb/desc': b'Test device2',
b'/qubes-block-devices/sdb/size': b'2048000',
b'/qubes-block-devices/sdb/mode': b'r',
})
devices = sorted(list(self.ext.on_device_list_block(vm, '')))
self.assertEqual(len(devices), 2)
self.assertEqual(devices[0].backend_domain, vm)
self.assertEqual(devices[0].ident, 'sda')
self.assertEqual(devices[0].description, 'Test device')
self.assertEqual(devices[0].size, 1024000)
self.assertEqual(devices[0].mode, 'w')
self.assertEqual(devices[1].backend_domain, vm)
self.assertEqual(devices[1].ident, 'sdb')
self.assertEqual(devices[1].description, 'Test device2')
self.assertEqual(devices[1].size, 2048000)
self.assertEqual(devices[1].mode, 'r')
def test_011_devices_list_empty(self):
vm = TestVM({})
devices = sorted(list(self.ext.on_device_list_block(vm, '')))
self.assertEqual(len(devices), 0)
def test_012_devices_list_invalid_ident(self):
vm = TestVM({
b'/qubes-block-devices/invalid ident': b'',
b'/qubes-block-devices/invalid+ident': b'',
b'/qubes-block-devices/invalid#': b'',
})
devices = sorted(list(self.ext.on_device_list_block(vm, '')))
self.assertEqual(len(devices), 0)
msg = 'test-vm vm\'s device path name contains unsafe characters. '\
'Skipping it.'
self.assertEqual(vm.log.warning.mock_calls, [
mock.call(msg),
mock.call(msg),
mock.call(msg),
])
def test_020_find_unused_frontend(self):
vm = TestVM({}, domain_xml=domain_xml_template.format(''))
frontend = self.ext.find_unused_frontend(vm)
self.assertEqual(frontend, 'xvdi')
def test_022_find_unused_frontend2(self):
disk = '''
<disk type="block" device="disk">
<driver name="phy" />
<source dev="/dev/sda" />
<target dev="xvdi" />
<readonly />
<backenddomain name="sys-usb" />
</disk>
'''
vm = TestVM({}, domain_xml=domain_xml_template.format(disk))
frontend = self.ext.find_unused_frontend(vm)
self.assertEqual(frontend, 'xvdj')
def test_030_list_attached_empty(self):
vm = TestVM({}, domain_xml=domain_xml_template.format(''))
devices = sorted(list(self.ext.on_device_list_attached(vm, '')))
self.assertEqual(len(devices), 0)
def test_031_list_attached(self):
disk = '''
<disk type="block" device="disk">
<driver name="phy" />
<source dev="/dev/sda" />
<target dev="xvdi" />
<readonly />
<backenddomain name="sys-usb" />
</disk>
'''
vm = TestVM({}, domain_xml=domain_xml_template.format(disk))
vm.app.domains['test-vm'] = vm
vm.app.domains['sys-usb'] = TestVM({}, name='sys-usb')
devices = sorted(list(self.ext.on_device_list_attached(vm, '')))
self.assertEqual(len(devices), 1)
dev = devices[0][0]
options = devices[0][1]
self.assertEqual(dev.backend_domain, vm.app.domains['sys-usb'])
self.assertEqual(dev.ident, 'sda')
self.assertEqual(options['frontend-dev'], 'xvdi')
self.assertEqual(options['read-only'], 'yes')
def test_032_list_attached_dom0(self):
disk = '''
<disk type="block" device="disk">
<driver name="phy" />
<source dev="/dev/sda" />
<target dev="xvdi" />
</disk>
'''
vm = TestVM({}, domain_xml=domain_xml_template.format(disk))
vm.app.domains['test-vm'] = vm
vm.app.domains['sys-usb'] = TestVM({}, name='sys-usb')
vm.app.domains['dom0'] = TestVM({}, name='dom0')
vm.app.domains[0] = vm.app.domains['dom0']
devices = sorted(list(self.ext.on_device_list_attached(vm, '')))
self.assertEqual(len(devices), 1)
dev = devices[0][0]
options = devices[0][1]
self.assertEqual(dev.backend_domain, vm.app.domains['dom0'])
self.assertEqual(dev.ident, 'sda')
self.assertEqual(options['frontend-dev'], 'xvdi')
self.assertEqual(options['read-only'], 'no')
def test_040_attach(self):
back_vm = TestVM(name='sys-usb', qdb={
b'/qubes-block-devices/sda': b'',
b'/qubes-block-devices/sda/desc': b'Test device',
b'/qubes-block-devices/sda/size': b'1024000',
b'/qubes-block-devices/sda/mode': b'w',
})
vm = TestVM({}, domain_xml=domain_xml_template.format(''))
dev = qubes.ext.block.BlockDevice(back_vm, 'sda')
self.ext.on_device_pre_attached_block(vm, '', dev, {})
device_xml = (
'<disk type="block" device="disk">\n'
' <driver name="phy" />\n'
' <source dev="/dev/sda" />\n'
' <target dev="xvdi" />\n'
'\n'
' <backenddomain name="sys-usb" />\n'
'</disk>')
vm.libvirt_domain.attachDevice.assert_called_once_with(device_xml)
def test_041_attach_frontend(self):
back_vm = TestVM(name='sys-usb', qdb={
b'/qubes-block-devices/sda': b'',
b'/qubes-block-devices/sda/desc': b'Test device',
b'/qubes-block-devices/sda/size': b'1024000',
b'/qubes-block-devices/sda/mode': b'w',
})
vm = TestVM({}, domain_xml=domain_xml_template.format(''))
dev = qubes.ext.block.BlockDevice(back_vm, 'sda')
self.ext.on_device_pre_attached_block(vm, '', dev,
{'frontend-dev': 'xvdj'})
device_xml = (
'<disk type="block" device="disk">\n'
' <driver name="phy" />\n'
' <source dev="/dev/sda" />\n'
' <target dev="xvdj" />\n'
'\n'
' <backenddomain name="sys-usb" />\n'
'</disk>')
vm.libvirt_domain.attachDevice.assert_called_once_with(device_xml)
def test_042_attach_read_only(self):
back_vm = TestVM(name='sys-usb', qdb={
b'/qubes-block-devices/sda': b'',
b'/qubes-block-devices/sda/desc': b'Test device',
b'/qubes-block-devices/sda/size': b'1024000',
b'/qubes-block-devices/sda/mode': b'w',
})
vm = TestVM({}, domain_xml=domain_xml_template.format(''))
dev = qubes.ext.block.BlockDevice(back_vm, 'sda')
self.ext.on_device_pre_attached_block(vm, '', dev,
{'read-only': 'yes'})
device_xml = (
'<disk type="block" device="disk">\n'
' <driver name="phy" />\n'
' <source dev="/dev/sda" />\n'
' <target dev="xvdi" />\n'
' <readonly />\n\n'
' <backenddomain name="sys-usb" />\n'
'</disk>')
vm.libvirt_domain.attachDevice.assert_called_once_with(device_xml)
def test_043_attach_invalid_option(self):
back_vm = TestVM(name='sys-usb', qdb={
b'/qubes-block-devices/sda': b'',
b'/qubes-block-devices/sda/desc': b'Test device',
b'/qubes-block-devices/sda/size': b'1024000',
b'/qubes-block-devices/sda/mode': b'w',
})
vm = TestVM({}, domain_xml=domain_xml_template.format(''))
dev = qubes.ext.block.BlockDevice(back_vm, 'sda')
with self.assertRaises(qubes.exc.QubesValueError):
self.ext.on_device_pre_attached_block(vm, '', dev,
{'no-such-option': '123'})
self.assertFalse(vm.libvirt_domain.attachDevice.called)
def test_044_attach_invalid_option2(self):
back_vm = TestVM(name='sys-usb', qdb={
b'/qubes-block-devices/sda': b'',
b'/qubes-block-devices/sda/desc': b'Test device',
b'/qubes-block-devices/sda/size': b'1024000',
b'/qubes-block-devices/sda/mode': b'w',
})
vm = TestVM({}, domain_xml=domain_xml_template.format(''))
dev = qubes.ext.block.BlockDevice(back_vm, 'sda')
with self.assertRaises(qubes.exc.QubesValueError):
self.ext.on_device_pre_attached_block(vm, '', dev,
{'read-only': 'maybe'})
self.assertFalse(vm.libvirt_domain.attachDevice.called)
def test_045_attach_backend_not_running(self):
back_vm = TestVM(name='sys-usb', running=False, qdb={
b'/qubes-block-devices/sda': b'',
b'/qubes-block-devices/sda/desc': b'Test device',
b'/qubes-block-devices/sda/size': b'1024000',
b'/qubes-block-devices/sda/mode': b'w',
})
vm = TestVM({}, domain_xml=domain_xml_template.format(''))
dev = qubes.ext.block.BlockDevice(back_vm, 'sda')
with self.assertRaises(qubes.exc.QubesVMNotRunningError):
self.ext.on_device_pre_attached_block(vm, '', dev, {})
self.assertFalse(vm.libvirt_domain.attachDevice.called)
def test_046_attach_ro_dev_rw(self):
back_vm = TestVM(name='sys-usb', qdb={
b'/qubes-block-devices/sda': b'',
b'/qubes-block-devices/sda/desc': b'Test device',
b'/qubes-block-devices/sda/size': b'1024000',
b'/qubes-block-devices/sda/mode': b'r',
})
vm = TestVM({}, domain_xml=domain_xml_template.format(''))
dev = qubes.ext.block.BlockDevice(back_vm, 'sda')
with self.assertRaises(qubes.exc.QubesValueError):
self.ext.on_device_pre_attached_block(vm, '', dev,
{'read-only': 'no'})
self.assertFalse(vm.libvirt_domain.attachDevice.called)
def test_047_attach_read_only_auto(self):
back_vm = TestVM(name='sys-usb', qdb={
b'/qubes-block-devices/sda': b'',
b'/qubes-block-devices/sda/desc': b'Test device',
b'/qubes-block-devices/sda/size': b'1024000',
b'/qubes-block-devices/sda/mode': b'r',
})
vm = TestVM({}, domain_xml=domain_xml_template.format(''))
dev = qubes.ext.block.BlockDevice(back_vm, 'sda')
self.ext.on_device_pre_attached_block(vm, '', dev, {})
device_xml = (
'<disk type="block" device="disk">\n'
' <driver name="phy" />\n'
' <source dev="/dev/sda" />\n'
' <target dev="xvdi" />\n'
' <readonly />\n\n'
' <backenddomain name="sys-usb" />\n'
'</disk>')
vm.libvirt_domain.attachDevice.assert_called_once_with(device_xml)
def test_050_detach(self):
back_vm = TestVM(name='sys-usb', qdb={
b'/qubes-block-devices/sda': b'',
b'/qubes-block-devices/sda/desc': b'Test device',
b'/qubes-block-devices/sda/size': b'1024000',
b'/qubes-block-devices/sda/mode': b'r',
})
device_xml = (
'<disk type="block" device="disk">\n'
' <driver name="phy" />\n'
' <source dev="/dev/sda" />\n'
' <target dev="xvdi" />\n'
' <readonly />\n\n'
' <backenddomain name="sys-usb" />\n'
'</disk>')
vm = TestVM({}, domain_xml=domain_xml_template.format(device_xml))
vm.app.domains['test-vm'] = vm
vm.app.domains['sys-usb'] = TestVM({}, name='sys-usb')
dev = qubes.ext.block.BlockDevice(back_vm, 'sda')
self.ext.on_device_pre_detached_block(vm, '', dev)
vm.libvirt_domain.detachDevice.assert_called_once_with(device_xml)
def test_051_detach_not_attached(self):
back_vm = TestVM(name='sys-usb', qdb={
b'/qubes-block-devices/sda': b'',
b'/qubes-block-devices/sda/desc': b'Test device',
b'/qubes-block-devices/sda/size': b'1024000',
b'/qubes-block-devices/sda/mode': b'r',
})
device_xml = (
'<disk type="block" device="disk">\n'
' <driver name="phy" />\n'
' <source dev="/dev/sda" />\n'
' <target dev="xvdi" />\n'
' <readonly />\n\n'
' <backenddomain name="sys-usb" />\n'
'</disk>')
vm = TestVM({}, domain_xml=domain_xml_template.format(''))
vm.app.domains['test-vm'] = vm
vm.app.domains['sys-usb'] = TestVM({}, name='sys-usb')
dev = qubes.ext.block.BlockDevice(back_vm, 'sda')
self.ext.on_device_pre_detached_block(vm, '', dev)
self.assertFalse(vm.libvirt_domain.detachDevice.called)

View File

@ -285,6 +285,7 @@ fi
%dir %{python3_sitelib}/qubes/ext/__pycache__ %dir %{python3_sitelib}/qubes/ext/__pycache__
%{python3_sitelib}/qubes/ext/__pycache__/* %{python3_sitelib}/qubes/ext/__pycache__/*
%{python3_sitelib}/qubes/ext/__init__.py %{python3_sitelib}/qubes/ext/__init__.py
%{python3_sitelib}/qubes/ext/block.py
%{python3_sitelib}/qubes/ext/gui.py %{python3_sitelib}/qubes/ext/gui.py
%{python3_sitelib}/qubes/ext/pci.py %{python3_sitelib}/qubes/ext/pci.py
%{python3_sitelib}/qubes/ext/qubesmanager.py %{python3_sitelib}/qubes/ext/qubesmanager.py
@ -300,6 +301,7 @@ fi
%{python3_sitelib}/qubes/tests/api_admin.py %{python3_sitelib}/qubes/tests/api_admin.py
%{python3_sitelib}/qubes/tests/app.py %{python3_sitelib}/qubes/tests/app.py
%{python3_sitelib}/qubes/tests/devices.py %{python3_sitelib}/qubes/tests/devices.py
%{python3_sitelib}/qubes/tests/devices_block.py
%{python3_sitelib}/qubes/tests/events.py %{python3_sitelib}/qubes/tests/events.py
%{python3_sitelib}/qubes/tests/firewall.py %{python3_sitelib}/qubes/tests/firewall.py
%{python3_sitelib}/qubes/tests/init.py %{python3_sitelib}/qubes/tests/init.py
@ -397,6 +399,7 @@ fi
%attr(2770,root,qubes) %dir /var/lib/qubes/dvmdata %attr(2770,root,qubes) %dir /var/lib/qubes/dvmdata
%attr(2770,root,qubes) %dir /var/lib/qubes/vm-kernels %attr(2770,root,qubes) %dir /var/lib/qubes/vm-kernels
/usr/share/qubes/templates/libvirt/xen.xml /usr/share/qubes/templates/libvirt/xen.xml
/usr/share/qubes/templates/libvirt/devices/block.xml
/usr/share/qubes/templates/libvirt/devices/pci.xml /usr/share/qubes/templates/libvirt/devices/pci.xml
/usr/share/qubes/templates/libvirt/devices/net.xml /usr/share/qubes/templates/libvirt/devices/net.xml
/usr/lib/tmpfiles.d/qubes.conf /usr/lib/tmpfiles.d/qubes.conf

View File

@ -47,9 +47,11 @@ if __name__ == '__main__':
'qubes.ext.gui = qubes.ext.gui:GUI', 'qubes.ext.gui = qubes.ext.gui:GUI',
'qubes.ext.r3compatibility = qubes.ext.r3compatibility:R3Compatibility', 'qubes.ext.r3compatibility = qubes.ext.r3compatibility:R3Compatibility',
'qubes.ext.pci = qubes.ext.pci:PCIDeviceExtension', 'qubes.ext.pci = qubes.ext.pci:PCIDeviceExtension',
'qubes.ext.block = qubes.ext.block:BlockDeviceExtension',
], ],
'qubes.devices': [ 'qubes.devices': [
'pci = qubes.ext.pci:PCIDevice', 'pci = qubes.ext.pci:PCIDevice',
'block = qubes.ext.block:BlockDevice',
'testclass = qubes.tests.devices:TestDevice', 'testclass = qubes.tests.devices:TestDevice',
], ],
'qubes.storage': [ 'qubes.storage': [

View File

@ -0,0 +1,16 @@
<disk type="block" device="{{ options.get('devtype', 'disk') }}">
<driver name="phy" />
<source dev="{{ device.device_node }}" />
{%- if 'frontend-dev' in options %}
<target dev="{{ options.get('frontend-dev') }}" />
{%- else %}
<target dev="xvd{{dd[i]}}" />
{% set i = i + 1 %}
{%- endif %}
{%- if options.get('read-only', 'no') == 'yes' %}
<readonly />
{%- endif %}
<backenddomain name="{{ device.backend_domain.name }}" />
</disk>

View File

@ -102,6 +102,14 @@
</disk> </disk>
{% endfor %} {% endfor %}
{# start external devices from xvdi #}
{% set i = 4 %}
{% for assignment in vm.devices.block.assignments(True) %}
{% set device = assignment.device %}
{% set options = assignment.options %}
{% include 'libvirt/devices/block.xml' %}
{% endfor %}
{% if vm.netvm %} {% if vm.netvm %}
{% include 'libvirt/devices/net.xml' with context %} {% include 'libvirt/devices/net.xml' with context %}
{% endif %} {% endif %}