451cc339c7
Be consistent with other parts of the Admin API. Especially ...Attach accept "standard" boolean values for persistent= option.
292 lines
11 KiB
Python
292 lines
11 KiB
Python
# -*- encoding: utf8 -*-
|
|
#
|
|
# The Qubes OS Project, http://www.qubes-os.org
|
|
#
|
|
# Copyright (C) 2017 Marek Marczykowski-Górecki
|
|
# <marmarek@invisiblethingslab.com>
|
|
#
|
|
# This program is free software; you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License as published by
|
|
# the Free Software Foundation; either version 2 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License along
|
|
# with this program; if not, see <http://www.gnu.org/licenses/>.
|
|
|
|
''' Qubes block devices extensions '''
|
|
import re
|
|
import string
|
|
import lxml.etree
|
|
|
|
import qubes.devices
|
|
import qubes.ext
|
|
|
|
name_re = re.compile(r"^[a-z0-9-]{1,12}$")
|
|
device_re = re.compile(r"^[a-z0-9/-]{1,64}$")
|
|
# FIXME: any better idea of desc_re?
|
|
desc_re = re.compile(r"^.{1,255}$")
|
|
mode_re = re.compile(r"^[rw]$")
|
|
|
|
# all frontends, prefer xvdi
|
|
# TODO: get this from libvirt driver?
|
|
AVAILABLE_FRONTENDS = ['xvd'+c for c in
|
|
string.ascii_lowercase[8:]+string.ascii_lowercase[:8]]
|
|
|
|
SYSTEM_DISKS = ('xvda', 'xvdb', 'xvdc', 'xvdd')
|
|
|
|
|
|
class BlockDevice(qubes.devices.DeviceInfo):
|
|
def __init__(self, backend_domain, ident):
|
|
super(BlockDevice, self).__init__(backend_domain=backend_domain,
|
|
ident=ident)
|
|
self._description = None
|
|
self._mode = None
|
|
self._size = None
|
|
|
|
@property
|
|
def description(self):
|
|
'''Human readable device description'''
|
|
if self._description is None:
|
|
if not self.backend_domain.is_running():
|
|
return self.ident
|
|
safe_set = {ord(c) for c in
|
|
string.ascii_letters + string.digits + '()+,-.:=_/ '}
|
|
untrusted_desc = self.backend_domain.untrusted_qdb.read(
|
|
'/qubes-block-devices/{}/desc'.format(self.ident))
|
|
if not untrusted_desc:
|
|
return ''
|
|
desc = ''.join((chr(c) if c in safe_set else '_')
|
|
for c in untrusted_desc)
|
|
self._description = desc
|
|
return self._description
|
|
|
|
@property
|
|
def mode(self):
|
|
'''Device mode, either 'w' for read-write, or 'r' for read-only'''
|
|
if self._mode is None:
|
|
if not self.backend_domain.is_running():
|
|
return 'w'
|
|
untrusted_mode = self.backend_domain.untrusted_qdb.read(
|
|
'/qubes-block-devices/{}/mode'.format(self.ident))
|
|
if untrusted_mode is None:
|
|
self._mode = 'w'
|
|
elif untrusted_mode not in (b'w', b'r'):
|
|
self.backend_domain.log.warning(
|
|
'Device {} has invalid mode'.format(self.ident))
|
|
self._mode = 'w'
|
|
else:
|
|
self._mode = untrusted_mode.decode()
|
|
return self._mode
|
|
|
|
@property
|
|
def size(self):
|
|
'''Device size in bytes'''
|
|
if self._size is None:
|
|
if not self.backend_domain.is_running():
|
|
return None
|
|
untrusted_size = self.backend_domain.untrusted_qdb.read(
|
|
'/qubes-block-devices/{}/size'.format(self.ident))
|
|
if untrusted_size is None:
|
|
self._size = 0
|
|
elif not untrusted_size.isdigit():
|
|
self.backend_domain.log.warning(
|
|
'Device {} has invalid size'.format(self.ident))
|
|
self._size = 0
|
|
else:
|
|
self._size = int(untrusted_size)
|
|
return self._size
|
|
|
|
@property
|
|
def device_node(self):
|
|
'''Device node in backend domain'''
|
|
return '/dev/' + self.ident.replace('_', '/')
|
|
|
|
|
|
class BlockDeviceExtension(qubes.ext.Extension):
|
|
@qubes.ext.handler('domain-init', 'domain-load')
|
|
def on_domain_init_load(self, vm, event):
|
|
'''Initialize watching for changes'''
|
|
# pylint: disable=unused-argument,no-self-use
|
|
vm.watch_qdb_path('/qubes-block-devices')
|
|
|
|
@qubes.ext.handler('domain-qdb-change:/qubes-block-devices')
|
|
def on_qdb_change(self, vm, event, path):
|
|
'''A change in QubesDB means a change in device list'''
|
|
# pylint: disable=unused-argument,no-self-use
|
|
vm.fire_event('device-list-change:block')
|
|
|
|
def device_get(self, vm, ident):
|
|
# pylint: disable=no-self-use
|
|
'''Read information about device from QubesDB
|
|
|
|
:param vm: backend VM object
|
|
:param ident: device identifier
|
|
:returns BlockDevice'''
|
|
|
|
untrusted_qubes_device_attrs = vm.untrusted_qdb.list(
|
|
'/qubes-block-devices/{}/'.format(ident))
|
|
if not untrusted_qubes_device_attrs:
|
|
return None
|
|
return BlockDevice(vm, ident)
|
|
|
|
@qubes.ext.handler('device-list:block')
|
|
def on_device_list_block(self, vm, event):
|
|
# pylint: disable=unused-argument,no-self-use
|
|
|
|
if not vm.is_running():
|
|
return
|
|
untrusted_qubes_devices = vm.untrusted_qdb.list('/qubes-block-devices/')
|
|
untrusted_idents = set(untrusted_path.split('/', 3)[2]
|
|
for untrusted_path in untrusted_qubes_devices)
|
|
for untrusted_ident in untrusted_idents:
|
|
if not name_re.match(untrusted_ident):
|
|
msg = ("%s vm's device path name contains unsafe characters. "
|
|
"Skipping it.")
|
|
vm.log.warning(msg % vm.name)
|
|
continue
|
|
|
|
ident = untrusted_ident
|
|
|
|
device_info = self.device_get(vm, ident)
|
|
if device_info:
|
|
yield device_info
|
|
|
|
@qubes.ext.handler('device-get:block')
|
|
def on_device_get_block(self, vm, event, ident):
|
|
# pylint: disable=unused-argument,no-self-use
|
|
if not vm.is_running():
|
|
return
|
|
if not vm.app.vmm.offline_mode:
|
|
device_info = self.device_get(vm, ident)
|
|
if device_info:
|
|
yield device_info
|
|
|
|
@qubes.ext.handler('device-list-attached:block')
|
|
def on_device_list_attached(self, vm, event, **kwargs):
|
|
# pylint: disable=unused-argument,no-self-use
|
|
if not vm.is_running():
|
|
return
|
|
|
|
xml_desc = lxml.etree.fromstring(vm.libvirt_domain.XMLDesc())
|
|
|
|
for disk in xml_desc.findall('devices/disk'):
|
|
if disk.get('type') != 'block':
|
|
continue
|
|
dev_path_node = disk.find('source')
|
|
if dev_path_node is None:
|
|
continue
|
|
dev_path = dev_path_node.get('dev')
|
|
|
|
target_node = disk.find('target')
|
|
if target_node is not None:
|
|
frontend_dev = target_node.get('dev')
|
|
if not frontend_dev:
|
|
continue
|
|
if frontend_dev in SYSTEM_DISKS:
|
|
continue
|
|
else:
|
|
continue
|
|
|
|
backend_domain_node = disk.find('backenddomain')
|
|
if backend_domain_node is not None:
|
|
backend_domain = vm.app.domains[backend_domain_node.get('name')]
|
|
else:
|
|
backend_domain = vm.app.domains[0]
|
|
|
|
options = {}
|
|
read_only_node = disk.find('readonly')
|
|
if read_only_node is not None:
|
|
options['read-only'] = 'yes'
|
|
else:
|
|
options['read-only'] = 'no'
|
|
options['frontend-dev'] = frontend_dev
|
|
if disk.get('device') != 'disk':
|
|
options['devtype'] = disk.get('device')
|
|
|
|
if dev_path.startswith('/dev/'):
|
|
ident = dev_path[len('/dev/'):]
|
|
else:
|
|
ident = dev_path
|
|
|
|
ident = ident.replace('/', '_')
|
|
|
|
yield (BlockDevice(backend_domain, ident), options)
|
|
|
|
def find_unused_frontend(self, vm):
|
|
# pylint: disable=no-self-use
|
|
'''Find unused block frontend device node for <target dev=.../>
|
|
parameter'''
|
|
assert vm.is_running()
|
|
|
|
xml = vm.libvirt_domain.XMLDesc()
|
|
parsed_xml = lxml.etree.fromstring(xml)
|
|
used = [target.get('dev', None) for target in
|
|
parsed_xml.xpath("//domain/devices/disk/target")]
|
|
for dev in AVAILABLE_FRONTENDS:
|
|
if dev not in used:
|
|
return dev
|
|
return None
|
|
|
|
@qubes.ext.handler('device-pre-attach:block')
|
|
def on_device_pre_attached_block(self, vm, event, device, options):
|
|
# pylint: disable=unused-argument
|
|
|
|
# validate options
|
|
for option, value in options.items():
|
|
if option == 'frontend-dev':
|
|
if not value.startswith('xvd') and not value.startswith('sd'):
|
|
raise qubes.exc.QubesValueError(
|
|
'Invalid frontend-dev option value: ' + value)
|
|
elif option == 'read-only':
|
|
options[option] = (
|
|
'yes' if qubes.property.bool(None, None, value) else 'no')
|
|
elif option == 'devtype':
|
|
if value not in ('disk', 'cdrom'):
|
|
raise qubes.exc.QubesValueError(
|
|
'devtype option can only have '
|
|
'\'disk\' or \'cdrom\' value')
|
|
else:
|
|
raise qubes.exc.QubesValueError(
|
|
'Unsupported option {}'.format(option))
|
|
|
|
if 'read-only' not in options:
|
|
options['read-only'] = 'yes' if device.mode == 'r' else 'no'
|
|
if options.get('read-only', 'no') == 'no' and device.mode == 'r':
|
|
raise qubes.exc.QubesValueError(
|
|
'This device can be attached only read-only')
|
|
|
|
if not vm.is_running():
|
|
return
|
|
|
|
if not device.backend_domain.is_running():
|
|
raise qubes.exc.QubesVMNotRunningError(device.backend_domain,
|
|
'Domain {} needs to be running to attach device from '
|
|
'it'.format(device.backend_domain.name))
|
|
|
|
if 'frontend-dev' not in options:
|
|
options['frontend-dev'] = self.find_unused_frontend(vm)
|
|
|
|
vm.libvirt_domain.attachDevice(
|
|
vm.app.env.get_template('libvirt/devices/block.xml').render(
|
|
device=device, vm=vm, options=options))
|
|
|
|
@qubes.ext.handler('device-pre-detach:block')
|
|
def on_device_pre_detached_block(self, vm, event, device):
|
|
# pylint: disable=unused-argument,no-self-use
|
|
if not vm.is_running():
|
|
return
|
|
|
|
# need to enumerate attached device to find frontend_dev option (at
|
|
# least)
|
|
for attached_device, options in self.on_device_list_attached(vm, event):
|
|
if attached_device == device:
|
|
vm.libvirt_domain.detachDevice(
|
|
vm.app.env.get_template('libvirt/devices/block.xml').render(
|
|
device=device, vm=vm, options=options))
|
|
break
|