qvm-usb: modify for USBIP-over-qrexec implementation

QubesOS/qubes-issues#531
This commit is contained in:
Marek Marczykowski-Górecki 2016-05-26 01:38:08 +02:00
parent 3afc7b7d50
commit d67636308f
No known key found for this signature in database
GPG Key ID: 063938BA42CFA724
2 changed files with 189 additions and 260 deletions

View File

@ -25,6 +25,7 @@
from __future__ import absolute_import from __future__ import absolute_import
import string import string
import errno
from lxml import etree from lxml import etree
from lxml.etree import ElementTree, SubElement, Element from lxml.etree import ElementTree, SubElement, Element
@ -463,260 +464,184 @@ def block_detach_all(vm):
usb_ver_re = re.compile(r"^(1|2)$") usb_ver_re = re.compile(r"^(1|2)$")
usb_device_re = re.compile(r"^[0-9]+-[0-9]+(_[0-9]+)?$") usb_device_re = re.compile(r"^[0-9]+-[0-9]+(_[0-9]+)?$")
usb_port_re = re.compile(r"^$|^[0-9]+-[0-9]+(\.[0-9]+)?$") usb_port_re = re.compile(r"^$|^[0-9]+-[0-9]+(\.[0-9]+)?$")
usb_desc_re = re.compile(r"^[ -~]{1,255}$")
# should match valid VM name
usb_connected_to_re = re.compile(r"^[a-zA-Z][a-zA-Z0-9_.-]*$")
def usb_setup(backend_vm_xid, vm_xid, devid, usb_ver): def usb_decode_device_from_qdb(qdb_encoded_device):
"""
Attach frontend to the backend.
backend_vm_xid - id of the backend domain
vm_xid - id of the frontend domain
devid - id of the pvusb controller
"""
num_ports = 8
trans = vmm.xs.transaction_start()
be_path = "/local/domain/%d/backend/vusb/%d/%d" % (backend_vm_xid, vm_xid, devid)
fe_path = "/local/domain/%d/device/vusb/%d" % (vm_xid, devid)
be_perm = [{'dom': backend_vm_xid}, {'dom': vm_xid, 'read': True} ]
fe_perm = [{'dom': vm_xid}, {'dom': backend_vm_xid, 'read': True} ]
# Create directories and set permissions
vmm.xs.write(trans, be_path, "")
vmm.xs.set_permissions(trans, be_path, be_perm)
vmm.xs.write(trans, fe_path, "")
vmm.xs.set_permissions(trans, fe_path, fe_perm)
# Write backend information into the location that frontend looks for
vmm.xs.write(trans, "%s/backend-id" % fe_path, str(backend_vm_xid))
vmm.xs.write(trans, "%s/backend" % fe_path, be_path)
# Write frontend information into the location that backend looks for
vmm.xs.write(trans, "%s/frontend-id" % be_path, str(vm_xid))
vmm.xs.write(trans, "%s/frontend" % be_path, fe_path)
# Write USB Spec version field.
vmm.xs.write(trans, "%s/usb-ver" % be_path, usb_ver)
# Write virtual root hub field.
vmm.xs.write(trans, "%s/num-ports" % be_path, str(num_ports))
for port in range(1, num_ports+1):
# Set all port to disconnected state
vmm.xs.write(trans, "%s/port/%d" % (be_path, port), "")
# Set state to XenbusStateInitialising
vmm.xs.write(trans, "%s/state" % fe_path, "1")
vmm.xs.write(trans, "%s/state" % be_path, "1")
vmm.xs.write(trans, "%s/online" % be_path, "1")
vmm.xs.transaction_end(trans)
def usb_decode_device_from_xs(xs_encoded_device):
""" recover actual device name (xenstore doesn't allow dot in key names, so it was translated to underscore) """ """ recover actual device name (xenstore doesn't allow dot in key names, so it was translated to underscore) """
return xs_encoded_device.replace('_', '.') return qdb_encoded_device.replace('_', '.')
def usb_encode_device_for_xs(device): def usb_encode_device_for_qdb(device):
""" encode actual device name (xenstore doesn't allow dot in key names, so translated it into underscore) """ """ encode actual device name (xenstore doesn't allow dot in key names, so translated it into underscore) """
return device.replace('.', '_') return device.replace('.', '_')
def usb_list(): def usb_list_vm(vm):
if not vm.is_running():
return {}
try:
untrusted_devices = vm.qdb.multiread('/qubes-usb-devices/')
except Error:
vm.refresh()
return {}
def get_dev_item(dev, item):
return untrusted_devices.get(
'/qubes-usb-devices/%s/%s' % (dev, item),
None)
devices = {}
untrusted_devices_names = list(set(map(lambda x: x.split("/")[2],
untrusted_devices.keys())))
for untrusted_dev_name in untrusted_devices_names:
if usb_device_re.match(untrusted_dev_name):
dev_name = untrusted_dev_name
untrusted_device_desc = get_dev_item(dev_name, 'desc')
if not usb_desc_re.match(untrusted_device_desc):
print >> sys.stderr, "Invalid %s device desc in VM '%s'" % (
dev_name, vm.name)
continue
device_desc = untrusted_device_desc
untrusted_connected_to = get_dev_item(dev_name, 'connected-to')
if untrusted_connected_to:
if not usb_connected_to_re.match(untrusted_connected_to):
print >>sys.stderr, \
"Invalid %s device 'connected-to' in VM '%s'" % (
dev_name, vm.name)
continue
connected_to = untrusted_connected_to
else:
connected_to = None
device = usb_decode_device_from_qdb(dev_name)
full_name = vm.name + ':' + device
devices[full_name] = {
'vm': vm,
'device': device,
'qdb_path': '/qubes-usb-devices/' + dev_name,
'name': full_name,
'desc': device_desc,
'connected-to': connected_to,
}
return devices
def usb_list(qvmc=None, vm=None):
""" """
Returns a dictionary of USB devices (for PVUSB backends running in all VM). Returns a dictionary of USB devices (for PVUSB backends running in all VM).
The dictionary is keyed by 'name' (see below), each element is a dictionary itself: The dictionary is keyed by 'name' (see below), each element is a dictionary itself:
vm = name of the backend domain vm = backend domain object
xid = xid of the backend domain device = device ID
device = <frontend device number>-<frontend port number> name = <backend-vm>:<device>
name = <name of backend domain>:<frontend device number>-<frontend port number>
desc = description desc = description
""" """
# FIXME: any better idea of desc_re? if vm is not None:
desc_re = re.compile(r"^.{1,255}$") if not vm.is_running():
return {}
else:
vm_list = [vm]
else:
if qvmc is None:
raise QubesException("You must pass either qvm or vm argument")
vm_list = qvmc.values()
devices_list = {} devices_list = {}
for vm in vm_list:
xs_trans = vmm.xs.transaction_start() devices_list.update(usb_list_vm(vm))
vm_list = vmm.xs.ls(xs_trans, '/local/domain')
for xid in vm_list:
vm_name = vmm.xs.read(xs_trans, '/local/domain/%s/name' % xid)
vm_devices = vmm.xs.ls(xs_trans, '/local/domain/%s/qubes-usb-devices' % xid)
if vm_devices is None:
continue
# when listing devices in xenstore we get encoded names
for xs_encoded_device in vm_devices:
# Sanitize device id
if not usb_device_re.match(xs_encoded_device):
print >> sys.stderr, "Invalid device id in backend VM '%s'" % vm_name
continue
device = usb_decode_device_from_xs(xs_encoded_device)
device_desc = vmm.xs.read(xs_trans, '/local/domain/%s/qubes-usb-devices/%s/desc' % (xid, xs_encoded_device))
if not desc_re.match(device_desc):
print >> sys.stderr, "Invalid %s device desc in VM '%s'" % (device, vm_name)
continue
visible_name = "%s:%s" % (vm_name, device)
# grab version
usb_ver = vmm.xs.read(xs_trans, '/local/domain/%s/qubes-usb-devices/%s/usb-ver' % (xid, xs_encoded_device))
if usb_ver is None or not usb_ver_re.match(usb_ver):
print >> sys.stderr, "Invalid %s device USB version in VM '%s'" % (device, vm_name)
continue
devices_list[visible_name] = {"name": visible_name, "xid":int(xid),
"vm": vm_name, "device":device,
"desc":device_desc,
"usb_ver":usb_ver}
vmm.xs.transaction_end(xs_trans)
return devices_list return devices_list
def usb_check_attached(xs_trans, backend_vm, device): def usb_check_attached(device):
""" """Reread device attachment status"""
Checks if the given device in the given backend attached to any frontend. vm = device['vm']
Parameters: untrusted_connected_to = vm.qdb.read(
backend_vm - xid of the backend domain '{}/connected-to'.format(device['qdb_path']))
device - device name in the backend domain if untrusted_connected_to:
Returns None or a dictionary: if not usb_connected_to_re.match(untrusted_connected_to):
vm - the name of the frontend domain raise QubesException(
xid - xid of the frontend domain "Invalid %s device 'connected-to' in VM '%s'" % (
frontend - frontend device number FIXME device['device'], vm.name))
devid - frontend port number FIXME connected_to = untrusted_connected_to
"""
# sample xs content: /local/domain/0/backend/vusb/4/0/port/1 = "7-5"
attached_dev = None
vms = vmm.xs.ls(xs_trans, '/local/domain/%d/backend/vusb' % backend_vm)
if vms is None:
return None
for vm in vms:
if not vm.isdigit():
print >> sys.stderr, "Invalid VM id"
continue
frontend_devs = vmm.xs.ls(xs_trans, '/local/domain/%d/backend/vusb/%s' % (backend_vm, vm))
if frontend_devs is None:
continue
for frontend_dev in frontend_devs:
if not frontend_dev.isdigit():
print >> sys.stderr, "Invalid frontend in VM %s" % vm
continue
ports = vmm.xs.ls(xs_trans, '/local/domain/%d/backend/vusb/%s/%s/port' % (backend_vm, vm, frontend_dev))
if ports is None:
continue
for port in ports:
# FIXME: refactor, see similar loop in usb_find_unused_frontend(), use usb_list() instead?
if not port.isdigit():
print >> sys.stderr, "Invalid port in VM %s frontend %s" % (vm, frontend)
continue
dev = vmm.xs.read(xs_trans, '/local/domain/%d/backend/vusb/%s/%s/port/%s' % (backend_vm, vm, frontend_dev, port))
if dev == "":
continue
# Sanitize device id
if not usb_port_re.match(dev):
print >> sys.stderr, "Invalid device id in backend VM %d @ %s/%s/port/%s" % \
(backend_vm, vm, frontend_dev, port)
continue
if dev == device:
frontend = "%s-%s" % (frontend_dev, port)
#TODO
vm_name = xl_ctx.domid_to_name(int(vm))
if vm_name is None:
# FIXME: should we wipe references to frontends running on nonexistent VMs?
continue
attached_dev = {"xid":int(vm), "frontend": frontend, "devid": device, "vm": vm_name}
break
return attached_dev
#def usb_check_frontend_busy(vm, front_dev, port):
# devport = frontend.split("-")
# if len(devport) != 2:
# raise QubesException("Malformed frontend syntax, must be in device-port format")
# # FIXME:
# # return vmm.xs.read('', '/local/domain/%d/device/vusb/%d/state' % (vm.xid, frontend)) == '4'
# return False
def usb_find_unused_frontend(xs_trans, backend_vm_xid, vm_xid, usb_ver):
"""
Find an unused frontend/port to link the given backend with the given frontend.
Creates new frontend if needed.
Returns frontend specification in <device>-<port> format.
"""
# This variable holds an index of last frontend scanned by the loop below.
# If nothing found, this value will be used to derive the index of a new frontend.
last_frontend_dev = -1
frontend_devs = vmm.xs.ls(xs_trans, "/local/domain/%d/device/vusb" % vm_xid)
if frontend_devs is not None:
for frontend_dev in frontend_devs:
if not frontend_dev.isdigit():
print >> sys.stderr, "Invalid frontend_dev in VM %d" % vm_xid
continue
frontend_dev = int(frontend_dev)
fe_path = "/local/domain/%d/device/vusb/%d" % (vm_xid, frontend_dev)
if vmm.xs.read(xs_trans, "%s/backend-id" % fe_path) == str(backend_vm_xid):
if vmm.xs.read(xs_trans, '/local/domain/%d/backend/vusb/%d/%d/usb-ver' % (backend_vm_xid, vm_xid, frontend_dev)) != usb_ver:
last_frontend_dev = frontend_dev
continue
# here: found an existing frontend already connected to right backend using an appropriate USB version
ports = vmm.xs.ls(xs_trans, '/local/domain/%d/backend/vusb/%d/%d/port' % (backend_vm_xid, vm_xid, frontend_dev))
if ports is None:
print >> sys.stderr, "No ports in VM %d frontend_dev %d?" % (vm_xid, frontend_dev)
last_frontend_dev = frontend_dev
continue
for port in ports:
# FIXME: refactor, see similar loop in usb_check_attached(), use usb_list() instead?
if not port.isdigit():
print >> sys.stderr, "Invalid port in VM %d frontend_dev %d" % (vm_xid, frontend_dev)
continue
port = int(port)
dev = vmm.xs.read(xs_trans, '/local/domain/%d/backend/vusb/%d/%s/port/%s' % (backend_vm_xid, vm_xid, frontend_dev, port))
# Sanitize device id
if not usb_port_re.match(dev):
print >> sys.stderr, "Invalid device id in backend VM %d @ %d/%d/port/%d" % \
(backend_vm_xid, vm_xid, frontend_dev, port)
continue
if dev == "":
return '%d-%d' % (frontend_dev, port)
last_frontend_dev = frontend_dev
# create a new frontend_dev and link it to the backend
frontend_dev = last_frontend_dev + 1
usb_setup(backend_vm_xid, vm_xid, frontend_dev, usb_ver)
return '%d-%d' % (frontend_dev, 1)
def usb_attach(vm, backend_vm, device, frontend=None, auto_detach=False, wait=True):
device_attach_check(vm, backend_vm, device, frontend)
xs_trans = vmm.xs.transaction_start()
xs_encoded_device = usb_encode_device_for_xs(device)
usb_ver = vmm.xs.read(xs_trans, '/local/domain/%s/qubes-usb-devices/%s/usb-ver' % (backend_vm.xid, xs_encoded_device))
if usb_ver is None or not usb_ver_re.match(usb_ver):
vmm.xs.transaction_end(xs_trans)
raise QubesException("Invalid %s device USB version in VM '%s'" % (device, backend_vm.name))
if frontend is None:
frontend = usb_find_unused_frontend(xs_trans, backend_vm.xid, vm.xid, usb_ver)
else: else:
# Check if any device attached at this frontend connected_to = None
#if usb_check_frontend_busy(vm, frontend): return connected_to
# raise QubesException("Frontend %s busy in VM %s, detach it first" % (frontend, vm.name))
vmm.xs.transaction_end(xs_trans)
raise NotImplementedError("Explicit USB frontend specification is not implemented yet")
# Check if this device is attached to some domain def usb_attach(vm, device, auto_detach=False, wait=True):
attached_vm = usb_check_attached(xs_trans, backend_vm.xid, device) if not vm.is_running():
vmm.xs.transaction_end(xs_trans) raise QubesException("VM {} not running".format(vm.name))
if attached_vm: if not device['vm'].is_running():
raise QubesException("VM {} not running".format(device['vm'].name))
connected_to = usb_check_attached(device)
if connected_to:
if auto_detach: if auto_detach:
usb_detach(backend_vm, attached_vm) usb_detach(device)
else: else:
raise QubesException("Device %s from %s already connected to VM %s as %s" % (device, backend_vm.name, attached_vm['vm'], attached_vm['frontend'])) raise QubesException("Device {} already connected, to {}".format(
device['name'], connected_to
))
# Run helper script # set qrexec policy to allow this device
xl_cmd = [ '/usr/lib/qubes/xl-qvm-usb-attach.py', str(vm.xid), device, frontend, str(backend_vm.xid) ] policy_line = '{} {} allow\n'.format(vm.name, device['vm'].name)
subprocess.check_call(xl_cmd) policy_path = '/etc/qubes-rpc/policy/qubes.USB+{}'.format(device['device'])
policy_exists = os.path.exists(policy_path)
if not policy_exists:
try:
fd = os.open(policy_path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
with os.fdopen(fd, 'w') as f:
f.write(policy_line)
except OSError as e:
if e.errno == errno.EEXIST:
pass
else:
raise
else:
with open(policy_path, 'r+') as f:
policy = f.readlines()
policy.insert(0, policy_line)
f.truncate(0)
f.seek(0)
f.write(''.join(policy))
try:
# and actual attach
p = vm.run_service('qubes.USBAttach', passio_popen=True, user='root')
(stdout, stderr) = p.communicate(
'{} {}\n'.format(device['vm'].name, device['device']))
if p.returncode != 0:
# TODO: sanitize and include stdout
raise QubesException('Device attach failed')
finally:
# FIXME: there is a race condition here - some other process might
# modify the file in the meantime. This may result in unexpected
# denials, but will not allow too much
if not policy_exists:
os.unlink(policy_path)
else:
with open(policy_path, 'r+') as f:
policy = f.readlines()
policy.remove('{} {} allow\n'.format(vm.name, device['vm'].name))
f.truncate(0)
f.seek(0)
f.write(''.join(policy))
def usb_detach(backend_vm, attachment): def usb_detach(vm, device):
xl_cmd = [ '/usr/lib/qubes/xl-qvm-usb-detach.py', str(attachment['xid']), attachment['devid'], attachment['frontend'], str(backend_vm.xid) ] connected_to = usb_check_attached(device)
subprocess.check_call(xl_cmd) # detect race conditions; there is still race here, but much smaller
if vm.name != connected_to:
raise QubesException(
"Device {} not connected to VM {}".format(device['name'], vm.name))
p = vm.run_service('qubes.USBDetach', passio_popen=True, user='root')
(stdout, stderr) = p.communicate(
'{} {}\n'.format(device['vm'].name, device['device']))
if p.returncode != 0:
# TODO: sanitize and include stdout
raise QubesException('Device detach failed')
def usb_detach_all(vm): def usb_detach_all(vm):
raise NotImplementedError("Detaching all devices from a given VM is not implemented yet") raise NotImplementedError("Detaching all devices from a given VM is not implemented yet")

View File

@ -78,11 +78,10 @@ def main():
print >> sys.stderr, "Only one of -l -a -d is allowed!" print >> sys.stderr, "Only one of -l -a -d is allowed!"
exit (1) exit (1)
if options.do_attach or options.do_detach: qvm_collection = QubesVmCollection()
qvm_collection = QubesVmCollection() qvm_collection.lock_db_for_reading()
qvm_collection.lock_db_for_reading() qvm_collection.load()
qvm_collection.load() qvm_collection.unlock_db()
qvm_collection.unlock_db()
if options.do_attach: if options.do_attach:
if (len (args) != 2): if (len (args) != 2):
@ -91,14 +90,17 @@ def main():
if vm is None: if vm is None:
parser.error ("Invalid VM name: %s" % args[0]) parser.error ("Invalid VM name: %s" % args[0])
# FIXME: here we assume that device is always in form "domain:dev", which can be changed in the future # FIXME: here we assume that device is always in form "domain:dev",
# which can be changed in the future
if args[1].find(":") < 0: if args[1].find(":") < 0:
parser.error ("Invalid device syntax: %s" % args[1]) parser.error("Invalid device syntax: %s" % args[1])
dev_list = usb_list() backend_vm = qvm_collection.get_vm_by_name(args[1].split(":")[0])
if backend_vm is None:
parser.error("No such VM: {}".format(args[1].split(":")[0]))
dev_list = usb_list(vm=backend_vm)
if not args[1] in dev_list.keys(): if not args[1] in dev_list.keys():
parser.error ("Invalid device name: %s" % args[1]) parser.error("Invalid device name: %s" % args[1])
dev = dev_list[args[1]] dev = dev_list[args[1]]
backend_vm = qvm_collection.get_vm_by_name(dev['vm'])
assert backend_vm is not None assert backend_vm is not None
kwargs = {} kwargs = {}
@ -106,14 +108,14 @@ def main():
# kwargs['frontend'] = options.frontend # kwargs['frontend'] = options.frontend
kwargs['auto_detach'] = options.auto_detach kwargs['auto_detach'] = options.auto_detach
try: try:
usb_attach(vm, backend_vm, dev['device'], **kwargs) usb_attach(vm, dev, **kwargs)
except QubesException as e: except QubesException as e:
print >> sys.stderr, "ERROR: %s" % str(e) print >> sys.stderr, "ERROR: %s" % str(e)
sys.exit(1) sys.exit(1)
elif options.do_detach: elif options.do_detach:
if (len (args) < 1): if (len (args) < 1):
parser.error ("You must provide device or vm name!") parser.error ("You must provide device or vm name!")
if len(args) > 1: if len(args) > 1:
parser.error ("Too many parameters") parser.error ("Too many parameters")
# Check if provided name is VM # Check if provided name is VM
vm = qvm_collection.get_vm_by_name(args[0]) vm = qvm_collection.get_vm_by_name(args[0])
@ -127,32 +129,34 @@ def main():
else: else:
# Maybe usbvm:device? # Maybe usbvm:device?
# FIXME: nasty copy-paste from attach code half a page above # FIXME: nasty copy-paste from attach code half a page above
# FIXME: here we assume that device is always in form "domain:dev", which can be changed in the future # FIXME: here we assume that device is always in form "domain:dev",
# which can be changed in the future
if args[0].find(":") < 0: if args[0].find(":") < 0:
parser.error ("Invalid device syntax: %s" % args[0]) parser.error("Invalid device syntax: %s" % args[0])
dev_list = usb_list() backend_vm = qvm_collection.get_vm_by_name(args[0].split(":")[0])
if backend_vm is None:
parser.error("No such VM: {}".format(args[0].split(":")[0]))
dev_list = usb_list(vm=backend_vm)
if not args[0] in dev_list.keys(): if not args[0] in dev_list.keys():
parser.error ("Invalid device name: %s" % args[0]) parser.error("Invalid device name: %s" % args[0])
dev = dev_list[args[0]] dev = dev_list[args[0]]
backend_vm = qvm_collection.get_vm_by_name(dev['vm']) attached_to = usb_check_attached(dev)
assert backend_vm is not None
attached_to = usb_check_attached('', backend_vm.xid, dev['device'])
if attached_to is None: if attached_to is None:
print >> sys.stderr, "WARNING: Device not connected to any VM" print >> sys.stderr, "WARNING: Device not connected to any VM"
exit(0) exit(0)
usb_detach(backend_vm, attached_to) vm = qvm_collection.get_vm_by_name(attached_to)
usb_detach(vm, dev)
else: else:
if len(args) > 0: if len(args) > 0:
parser.error ("Too many parameters") parser.error("Too many parameters")
# do_list # do_list
for dev in usb_list().values(): for dev in usb_list(qvmc=qvm_collection).values():
attached_to = usb_check_attached('', dev['xid'], dev['device']) attached_to = usb_check_attached(dev)
attached_to_str = "" attached_to_str = ""
if attached_to: if attached_to:
attached_to_str = " (attached to %s:%s)" % (attached_to['vm'], attached_to['frontend']) attached_to_str = " (attached to %s)" % (attached_to)
print "%s\t%s%s (USBv%s)" % (dev['name'], dev['desc'], attached_to_str, dev['usb_ver']) print "%s\t%s%s" % (dev['name'], dev['desc'], attached_to_str)
exit (0) exit (0)
main() main()