Merge branch 'master' into core3-devel

Remains to be fixed:
  88cb62fc
  d2640b51
  958c2926
This commit is contained in:
Wojtek Porczyk 2016-06-13 19:03:46 +02:00
commit 6a10daf7be
21 changed files with 1020 additions and 313 deletions

View File

@ -1,9 +1,16 @@
sudo: required
dist: trusty
language: python
python:
- '2.7'
install: pip install -r ci/requirements.txt
install:
- pip install -r ci/requirements.txt
- git clone https://github.com/QubesOS/qubes-builder ~/qubes-builder
# debootstrap in trusty is old...
before_script: sudo ln -s sid /usr/share/debootstrap/scripts/stretch
script:
- PYTHONPATH=test-packages pylint --rcfile=ci/pylintrc qubes
- ./run-tests --no-syslog
# vim: ts=2 sts=2 sw=2 et
- ~/qubes-builder/scripts/travis-build
env:
- DIST_DOM0=fc23 USE_QUBES_REPO_VERSION=3.2 USE_QUBES_REPO_TESTING=1

View File

@ -69,6 +69,11 @@ endif
mkdir -p $(DESTDIR)/usr/libexec/qubes
cp qubes-rpc-policy/qubes.FeaturesRequest.policy $(DESTDIR)/etc/qubes-rpc/policy/qubes.FeaturesRequest
cp qubes-rpc-policy/qubes.Filecopy.policy $(DESTDIR)/etc/qubes-rpc/policy/qubes.Filecopy
cp qubes-rpc-policy/qubes.OpenInVM.policy $(DESTDIR)/etc/qubes-rpc/policy/qubes.OpenInVM
cp qubes-rpc-policy/qubes.OpenURL.policy $(DESTDIR)/etc/qubes-rpc/policy/qubes.OpenURL
cp qubes-rpc-policy/qubes.VMShell.policy $(DESTDIR)/etc/qubes-rpc/policy/qubes.VMShell
cp qubes-rpc-policy/qubes.NotifyUpdates.policy $(DESTDIR)/etc/qubes-rpc/policy/qubes.NotifyUpdates
cp qubes-rpc-policy/qubes.NotifyTools.policy $(DESTDIR)/etc/qubes-rpc/policy/qubes.NotifyTools
cp qubes-rpc-policy/qubes.GetImageRGBA.policy $(DESTDIR)/etc/qubes-rpc/policy/qubes.GetImageRGBA
cp qubes-rpc-policy/qubes.GetRandomizedTime.policy $(DESTDIR)/etc/qubes-rpc/policy/qubes.GetRandomizedTime
cp qubes-rpc-policy/qubes.NotifyTools.policy $(DESTDIR)/etc/qubes-rpc/policy/qubes.NotifyTools

View File

@ -26,6 +26,7 @@ import datetime
import base64
import hashlib
import logging
import grp
import lxml.etree
import os
import os.path
@ -37,6 +38,7 @@ import time
import uuid
import xml.parsers.expat
import signal
import pwd
from qubes import qmemman
from qubes import qmemman_algo
import libvirt
@ -134,9 +136,10 @@ class QubesVm(object):
eval(value) if value.find("[") >= 0 else
eval("[" + value + "]") },
"pci_strictreset": {"default": True},
"pci_e820_host": {"default": True},
# Internal VM (not shown in qubes-manager, doesn't create appmenus entries
"internal": { "default": False, 'attr': '_internal' },
"vcpus": { "default": None },
"vcpus": { "default": 2 },
"uses_default_kernel": { "default": True, 'order': 30 },
"uses_default_kernelopts": { "default": True, 'order': 30 },
"kernel": {
@ -327,11 +330,6 @@ class QubesVm(object):
if self.maxmem > self.memory * 10:
self.maxmem = self.memory * 10
# By default allow use all VCPUs
if self.vcpus is None and not vmm.offline_mode:
qubes_host = QubesHost()
self.vcpus = qubes_host.no_cpus
# Always set if meminfo-writer should be active or not
if 'meminfo-writer' not in self.services:
self.services['meminfo-writer'] = not (len(self.pcidevs) > 0)
@ -1194,6 +1192,7 @@ class QubesVm(object):
# If dynamic memory management disabled, set maxmem=mem
args['maxmem'] = args['mem']
args['vcpus'] = str(self.vcpus)
args['features'] = ''
if self.netvm is not None:
args['ip'] = self.ip
args['mac'] = self.mac
@ -1218,6 +1217,8 @@ class QubesVm(object):
args['network_end'] = '-->'
args['no_network_begin'] = ''
args['no_network_end'] = ''
if len(self.pcidevs) and self.pci_e820_host:
args['features'] = '<xen><e820_host state=\'on\'/></xen>'
args.update(self.storage.get_config_params())
if hasattr(self, 'kernelopts'):
args['kernelopts'] = self.kernelopts
@ -1412,6 +1413,11 @@ class QubesVm(object):
e.get_error_code())
raise
if os.path.exists("/etc/systemd/system/multi-user.target.wants/qubes-vm@" + self.name + ".service"):
subprocess.call(["sudo", "systemctl", "-q", "disable","qubes-vm@" + self.name + ".service"])
if retcode != 0:
raise QubesException("Failed to delete autostart entry for VM")
self.storage.remove_from_disk()
def write_firewall_conf(self, conf):
@ -1752,7 +1758,15 @@ class QubesVm(object):
if verbose:
print >> sys.stderr, "--> Starting Qubes GUId..."
guid_cmd = [system_path["qubes_guid_path"],
guid_cmd = []
if os.getuid() == 0:
# try to always have guid running as normal user, otherwise
# clipboard file may be created as root and other permission
# problems
qubes_group = grp.getgrnam('qubes')
guid_cmd = ['runuser', '-u', qubes_group.gr_mem[0], '--']
guid_cmd += [system_path["qubes_guid_path"],
"-d", str(self.xid), "-N", self.name,
"-c", self.label.color,
"-i", self.label.icon_path,
@ -1763,6 +1777,33 @@ class QubesVm(object):
guid_cmd += ['-v', '-v']
elif not verbose:
guid_cmd += ['-q']
# Avoid using environment variables for checking the current session,
# because this script may be called with cleared env (like with sudo).
if subprocess.check_output(
['xprop', '-root', '-notype', 'KDE_SESSION_VERSION']) == \
'KDE_SESSION_VERSION = 5\n':
# native decoration plugins is used, so adjust window properties
# accordingly
guid_cmd += ['-T'] # prefix window titles with VM name
# get owner of X11 session
session_owner = None
for line in subprocess.check_output(['xhost']).splitlines():
if line == 'SI:localuser:root':
pass
elif line.startswith('SI:localuser:'):
session_owner = line.split(":")[2]
if session_owner is not None:
data_dir = os.path.expanduser(
'~{}/.local/share'.format(session_owner))
else:
# fallback to current user
data_dir = os.path.expanduser('~/.local/share')
guid_cmd += ['-p',
'_KDE_NET_WM_COLOR_SCHEME=s:{}'.format(
os.path.join(data_dir,
'qubes-kde', self.label.name + '.colors'))]
retcode = subprocess.call (guid_cmd)
if (retcode != 0) :
raise QubesException("Cannot start qubes-guid!")
@ -1791,13 +1832,21 @@ class QubesVm(object):
self.log.debug('start_qrexec_daemon()')
if verbose:
print >> sys.stderr, "--> Starting the qrexec daemon..."
qrexec = []
if os.getuid() == 0:
# try to always have qrexec running as normal user, otherwise
# many qrexec services would need to deal with root/user
# permission problems
qubes_group = grp.getgrnam('qubes')
qrexec = ['runuser', '-u', qubes_group.gr_mem[0], '--']
qrexec += ['env', 'QREXEC_STARTUP_TIMEOUT=' + str(self.qrexec_timeout),
system_path["qrexec_daemon_path"]]
qrexec_args = [str(self.xid), self.name, self.default_user]
if not verbose:
qrexec_args.insert(0, "-q")
qrexec_env = os.environ
qrexec_env['QREXEC_STARTUP_TIMEOUT'] = str(self.qrexec_timeout)
retcode = subprocess.call ([system_path["qrexec_daemon_path"]] +
qrexec_args, env=qrexec_env)
retcode = subprocess.call(qrexec + qrexec_args)
if (retcode != 0) :
raise OSError ("Cannot execute qrexec-daemon!")
@ -1826,10 +1875,19 @@ class QubesVm(object):
# force connection to a new daemon
self._qdb_connection = None
retcode = subprocess.call ([
qubesdb_cmd = []
if os.getuid() == 0:
# try to always have qubesdb running as normal user, otherwise
# killing it at VM restart (see above) will always fail
qubes_group = grp.getgrnam('qubes')
qubesdb_cmd = ['runuser', '-u', qubes_group.gr_mem[0], '--']
qubesdb_cmd += [
system_path["qubesdb_daemon_path"],
str(self.xid),
self.name])
self.name]
retcode = subprocess.call (qubesdb_cmd)
if retcode != 0:
raise OSError("ERROR: Cannot execute qubesdb-daemon!")

View File

@ -91,7 +91,6 @@ qubes_max_netid = 254
##########################################
def register_qubes_vm_class(vm_class):
QubesVmClasses[vm_class.__name__] = vm_class
# register class as local for this module - to make it easy to import from

View File

@ -25,6 +25,7 @@
from __future__ import absolute_import
import string
import errno
from lxml import etree
from lxml.etree import ElementTree, SubElement, Element
@ -423,263 +424,200 @@ def block_detach_all(vm):
usb_ver_re = re.compile(r"^(1|2)$")
usb_device_re = re.compile(r"^[0-9]+-[0-9]+(_[0-9]+)?$")
usb_port_re = re.compile(r"^$|^[0-9]+-[0-9]+(\.[0-9]+)?$")
usb_desc_re = re.compile(r"^[ -~]{1,255}$")
# should match valid VM name
usb_connected_to_re = re.compile(r"^[a-zA-Z][a-zA-Z0-9_.-]*$")
def usb_setup(backend_vm_xid, vm_xid, devid, usb_ver):
"""
Attach frontend to the backend.
backend_vm_xid - id of the backend domain
vm_xid - id of the frontend domain
devid - id of the pvusb controller
"""
num_ports = 8
trans = vmm.xs.transaction_start()
be_path = "/local/domain/%d/backend/vusb/%d/%d" % (backend_vm_xid, vm_xid, devid)
fe_path = "/local/domain/%d/device/vusb/%d" % (vm_xid, devid)
be_perm = [{'dom': backend_vm_xid}, {'dom': vm_xid, 'read': True} ]
fe_perm = [{'dom': vm_xid}, {'dom': backend_vm_xid, 'read': True} ]
# Create directories and set permissions
vmm.xs.write(trans, be_path, "")
vmm.xs.set_permissions(trans, be_path, be_perm)
vmm.xs.write(trans, fe_path, "")
vmm.xs.set_permissions(trans, fe_path, fe_perm)
# Write backend information into the location that frontend looks for
vmm.xs.write(trans, "%s/backend-id" % fe_path, str(backend_vm_xid))
vmm.xs.write(trans, "%s/backend" % fe_path, be_path)
# Write frontend information into the location that backend looks for
vmm.xs.write(trans, "%s/frontend-id" % be_path, str(vm_xid))
vmm.xs.write(trans, "%s/frontend" % be_path, fe_path)
# Write USB Spec version field.
vmm.xs.write(trans, "%s/usb-ver" % be_path, usb_ver)
# Write virtual root hub field.
vmm.xs.write(trans, "%s/num-ports" % be_path, str(num_ports))
for port in range(1, num_ports+1):
# Set all port to disconnected state
vmm.xs.write(trans, "%s/port/%d" % (be_path, port), "")
# Set state to XenbusStateInitialising
vmm.xs.write(trans, "%s/state" % fe_path, "1")
vmm.xs.write(trans, "%s/state" % be_path, "1")
vmm.xs.write(trans, "%s/online" % be_path, "1")
vmm.xs.transaction_end(trans)
def usb_decode_device_from_xs(xs_encoded_device):
def usb_decode_device_from_qdb(qdb_encoded_device):
""" recover actual device name (xenstore doesn't allow dot in key names, so it was translated to underscore) """
return xs_encoded_device.replace('_', '.')
return qdb_encoded_device.replace('_', '.')
def usb_encode_device_for_xs(device):
def usb_encode_device_for_qdb(device):
""" encode actual device name (xenstore doesn't allow dot in key names, so translated it into underscore) """
return device.replace('.', '_')
def usb_list():
def usb_list_vm(qvmc, vm):
if not vm.is_running():
return {}
try:
untrusted_devices = vm.qdb.multiread('/qubes-usb-devices/')
except Error:
vm.refresh()
return {}
def get_dev_item(dev, item):
return untrusted_devices.get(
'/qubes-usb-devices/%s/%s' % (dev, item),
None)
devices = {}
untrusted_devices_names = list(set(map(lambda x: x.split("/")[2],
untrusted_devices.keys())))
for untrusted_dev_name in untrusted_devices_names:
if usb_device_re.match(untrusted_dev_name):
dev_name = untrusted_dev_name
untrusted_device_desc = get_dev_item(dev_name, 'desc')
if not usb_desc_re.match(untrusted_device_desc):
print >> sys.stderr, "Invalid %s device desc in VM '%s'" % (
dev_name, vm.name)
continue
device_desc = untrusted_device_desc
untrusted_connected_to = get_dev_item(dev_name, 'connected-to')
if untrusted_connected_to:
if not usb_connected_to_re.match(untrusted_connected_to):
print >>sys.stderr, \
"Invalid %s device 'connected-to' in VM '%s'" % (
dev_name, vm.name)
continue
connected_to = qvmc.get_vm_by_name(untrusted_connected_to)
if connected_to is None:
print >>sys.stderr, \
"Device {} appears to be connected to {}, " \
"but such VM doesn't exist".format(
dev_name, untrusted_connected_to)
else:
connected_to = None
device = usb_decode_device_from_qdb(dev_name)
full_name = vm.name + ':' + device
devices[full_name] = {
'vm': vm,
'device': device,
'qdb_path': '/qubes-usb-devices/' + dev_name,
'name': full_name,
'desc': device_desc,
'connected-to': connected_to,
}
return devices
def usb_list(qvmc, vm=None):
"""
Returns a dictionary of USB devices (for PVUSB backends running in all VM).
The dictionary is keyed by 'name' (see below), each element is a dictionary itself:
vm = name of the backend domain
xid = xid of the backend domain
device = <frontend device number>-<frontend port number>
name = <name of backend domain>:<frontend device number>-<frontend port number>
vm = backend domain object
device = device ID
name = <backend-vm>:<device>
desc = description
"""
# FIXME: any better idea of desc_re?
desc_re = re.compile(r"^.{1,255}$")
if vm is not None:
if not vm.is_running():
return {}
else:
vm_list = [vm]
else:
vm_list = qvmc.values()
devices_list = {}
xs_trans = vmm.xs.transaction_start()
vm_list = vmm.xs.ls(xs_trans, '/local/domain')
for xid in vm_list:
vm_name = vmm.xs.read(xs_trans, '/local/domain/%s/name' % xid)
vm_devices = vmm.xs.ls(xs_trans, '/local/domain/%s/qubes-usb-devices' % xid)
if vm_devices is None:
continue
# when listing devices in xenstore we get encoded names
for xs_encoded_device in vm_devices:
# Sanitize device id
if not usb_device_re.match(xs_encoded_device):
print >> sys.stderr, "Invalid device id in backend VM '%s'" % vm_name
continue
device = usb_decode_device_from_xs(xs_encoded_device)
device_desc = vmm.xs.read(xs_trans, '/local/domain/%s/qubes-usb-devices/%s/desc' % (xid, xs_encoded_device))
if not desc_re.match(device_desc):
print >> sys.stderr, "Invalid %s device desc in VM '%s'" % (device, vm_name)
continue
visible_name = "%s:%s" % (vm_name, device)
# grab version
usb_ver = vmm.xs.read(xs_trans, '/local/domain/%s/qubes-usb-devices/%s/usb-ver' % (xid, xs_encoded_device))
if usb_ver is None or not usb_ver_re.match(usb_ver):
print >> sys.stderr, "Invalid %s device USB version in VM '%s'" % (device, vm_name)
continue
devices_list[visible_name] = {"name": visible_name, "xid":int(xid),
"vm": vm_name, "device":device,
"desc":device_desc,
"usb_ver":usb_ver}
vmm.xs.transaction_end(xs_trans)
for vm in vm_list:
devices_list.update(usb_list_vm(qvmc, vm))
return devices_list
def usb_check_attached(xs_trans, backend_vm, device):
"""
Checks if the given device in the given backend attached to any frontend.
Parameters:
backend_vm - xid of the backend domain
device - device name in the backend domain
Returns None or a dictionary:
vm - the name of the frontend domain
xid - xid of the frontend domain
frontend - frontend device number FIXME
devid - frontend port number FIXME
"""
# sample xs content: /local/domain/0/backend/vusb/4/0/port/1 = "7-5"
attached_dev = None
vms = vmm.xs.ls(xs_trans, '/local/domain/%d/backend/vusb' % backend_vm)
if vms is None:
return None
for vm in vms:
if not vm.isdigit():
print >> sys.stderr, "Invalid VM id"
continue
frontend_devs = vmm.xs.ls(xs_trans, '/local/domain/%d/backend/vusb/%s' % (backend_vm, vm))
if frontend_devs is None:
continue
for frontend_dev in frontend_devs:
if not frontend_dev.isdigit():
print >> sys.stderr, "Invalid frontend in VM %s" % vm
continue
ports = vmm.xs.ls(xs_trans, '/local/domain/%d/backend/vusb/%s/%s/port' % (backend_vm, vm, frontend_dev))
if ports is None:
continue
for port in ports:
# FIXME: refactor, see similar loop in usb_find_unused_frontend(), use usb_list() instead?
if not port.isdigit():
print >> sys.stderr, "Invalid port in VM %s frontend %s" % (vm, frontend)
continue
dev = vmm.xs.read(xs_trans, '/local/domain/%d/backend/vusb/%s/%s/port/%s' % (backend_vm, vm, frontend_dev, port))
if dev == "":
continue
# Sanitize device id
if not usb_port_re.match(dev):
print >> sys.stderr, "Invalid device id in backend VM %d @ %s/%s/port/%s" % \
(backend_vm, vm, frontend_dev, port)
continue
if dev == device:
frontend = "%s-%s" % (frontend_dev, port)
#TODO
vm_name = xl_ctx.domid_to_name(int(vm))
if vm_name is None:
# FIXME: should we wipe references to frontends running on nonexistent VMs?
continue
attached_dev = {"xid":int(vm), "frontend": frontend, "devid": device, "vm": vm_name}
break
return attached_dev
#def usb_check_frontend_busy(vm, front_dev, port):
# devport = frontend.split("-")
# if len(devport) != 2:
# raise QubesException("Malformed frontend syntax, must be in device-port format")
# # FIXME:
# # return vmm.xs.read('', '/local/domain/%d/device/vusb/%d/state' % (vm.xid, frontend)) == '4'
# return False
def usb_find_unused_frontend(xs_trans, backend_vm_xid, vm_xid, usb_ver):
"""
Find an unused frontend/port to link the given backend with the given frontend.
Creates new frontend if needed.
Returns frontend specification in <device>-<port> format.
"""
# This variable holds an index of last frontend scanned by the loop below.
# If nothing found, this value will be used to derive the index of a new frontend.
last_frontend_dev = -1
frontend_devs = vmm.xs.ls(xs_trans, "/local/domain/%d/device/vusb" % vm_xid)
if frontend_devs is not None:
for frontend_dev in frontend_devs:
if not frontend_dev.isdigit():
print >> sys.stderr, "Invalid frontend_dev in VM %d" % vm_xid
continue
frontend_dev = int(frontend_dev)
fe_path = "/local/domain/%d/device/vusb/%d" % (vm_xid, frontend_dev)
if vmm.xs.read(xs_trans, "%s/backend-id" % fe_path) == str(backend_vm_xid):
if vmm.xs.read(xs_trans, '/local/domain/%d/backend/vusb/%d/%d/usb-ver' % (backend_vm_xid, vm_xid, frontend_dev)) != usb_ver:
last_frontend_dev = frontend_dev
continue
# here: found an existing frontend already connected to right backend using an appropriate USB version
ports = vmm.xs.ls(xs_trans, '/local/domain/%d/backend/vusb/%d/%d/port' % (backend_vm_xid, vm_xid, frontend_dev))
if ports is None:
print >> sys.stderr, "No ports in VM %d frontend_dev %d?" % (vm_xid, frontend_dev)
last_frontend_dev = frontend_dev
continue
for port in ports:
# FIXME: refactor, see similar loop in usb_check_attached(), use usb_list() instead?
if not port.isdigit():
print >> sys.stderr, "Invalid port in VM %d frontend_dev %d" % (vm_xid, frontend_dev)
continue
port = int(port)
dev = vmm.xs.read(xs_trans, '/local/domain/%d/backend/vusb/%d/%s/port/%s' % (backend_vm_xid, vm_xid, frontend_dev, port))
# Sanitize device id
if not usb_port_re.match(dev):
print >> sys.stderr, "Invalid device id in backend VM %d @ %d/%d/port/%d" % \
(backend_vm_xid, vm_xid, frontend_dev, port)
continue
if dev == "":
return '%d-%d' % (frontend_dev, port)
last_frontend_dev = frontend_dev
# create a new frontend_dev and link it to the backend
frontend_dev = last_frontend_dev + 1
usb_setup(backend_vm_xid, vm_xid, frontend_dev, usb_ver)
return '%d-%d' % (frontend_dev, 1)
def usb_attach(vm, backend_vm, device, frontend=None, auto_detach=False, wait=True):
device_attach_check(vm, backend_vm, device, frontend)
xs_trans = vmm.xs.transaction_start()
xs_encoded_device = usb_encode_device_for_xs(device)
usb_ver = vmm.xs.read(xs_trans, '/local/domain/%s/qubes-usb-devices/%s/usb-ver' % (backend_vm.xid, xs_encoded_device))
if usb_ver is None or not usb_ver_re.match(usb_ver):
vmm.xs.transaction_end(xs_trans)
raise QubesException("Invalid %s device USB version in VM '%s'" % (device, backend_vm.name))
if frontend is None:
frontend = usb_find_unused_frontend(xs_trans, backend_vm.xid, vm.xid, usb_ver)
def usb_check_attached(qvmc, device):
"""Reread device attachment status"""
vm = device['vm']
untrusted_connected_to = vm.qdb.read(
'{}/connected-to'.format(device['qdb_path']))
if untrusted_connected_to:
if not usb_connected_to_re.match(untrusted_connected_to):
raise QubesException(
"Invalid %s device 'connected-to' in VM '%s'" % (
device['device'], vm.name))
connected_to = qvmc.get_vm_by_name(untrusted_connected_to)
if connected_to is None:
print >>sys.stderr, \
"Device {} appears to be connected to {}, " \
"but such VM doesn't exist".format(
device['device'], untrusted_connected_to)
else:
# Check if any device attached at this frontend
#if usb_check_frontend_busy(vm, frontend):
# raise QubesException("Frontend %s busy in VM %s, detach it first" % (frontend, vm.name))
vmm.xs.transaction_end(xs_trans)
raise NotImplementedError("Explicit USB frontend specification is not implemented yet")
connected_to = None
return connected_to
# Check if this device is attached to some domain
attached_vm = usb_check_attached(xs_trans, backend_vm.xid, device)
vmm.xs.transaction_end(xs_trans)
def usb_attach(qvmc, vm, device, auto_detach=False, wait=True):
if not vm.is_running():
raise QubesException("VM {} not running".format(vm.name))
if attached_vm:
if not device['vm'].is_running():
raise QubesException("VM {} not running".format(device['vm'].name))
connected_to = usb_check_attached(qvmc, device)
if connected_to:
if auto_detach:
usb_detach(backend_vm, attached_vm)
usb_detach(qvmc, device)
else:
raise QubesException("Device %s from %s already connected to VM %s as %s" % (device, backend_vm.name, attached_vm['vm'], attached_vm['frontend']))
raise QubesException("Device {} already connected, to {}".format(
device['name'], connected_to
))
# Run helper script
xl_cmd = [ '/usr/lib/qubes/xl-qvm-usb-attach.py', str(vm.xid), device, frontend, str(backend_vm.xid) ]
subprocess.check_call(xl_cmd)
# set qrexec policy to allow this device
policy_line = '{} {} allow\n'.format(vm.name, device['vm'].name)
policy_path = '/etc/qubes-rpc/policy/qubes.USB+{}'.format(device['device'])
policy_exists = os.path.exists(policy_path)
if not policy_exists:
try:
fd = os.open(policy_path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
with os.fdopen(fd, 'w') as f:
f.write(policy_line)
except OSError as e:
if e.errno == errno.EEXIST:
pass
else:
raise
else:
with open(policy_path, 'r+') as f:
policy = f.readlines()
policy.insert(0, policy_line)
f.truncate(0)
f.seek(0)
f.write(''.join(policy))
try:
# and actual attach
p = vm.run_service('qubes.USBAttach', passio_popen=True, user='root')
(stdout, stderr) = p.communicate(
'{} {}\n'.format(device['vm'].name, device['device']))
if p.returncode != 0:
# TODO: sanitize and include stdout
raise QubesException('Device attach failed')
finally:
# FIXME: there is a race condition here - some other process might
# modify the file in the meantime. This may result in unexpected
# denials, but will not allow too much
if not policy_exists:
os.unlink(policy_path)
else:
with open(policy_path, 'r+') as f:
policy = f.readlines()
policy.remove('{} {} allow\n'.format(vm.name, device['vm'].name))
f.truncate(0)
f.seek(0)
f.write(''.join(policy))
def usb_detach(backend_vm, attachment):
xl_cmd = [ '/usr/lib/qubes/xl-qvm-usb-detach.py', str(attachment['xid']), attachment['devid'], attachment['frontend'], str(backend_vm.xid) ]
subprocess.check_call(xl_cmd)
def usb_detach(qvmc, vm, device):
connected_to = usb_check_attached(qvmc, device)
# detect race conditions; there is still race here, but much smaller
if connected_to is None or connected_to.qid != vm.qid:
raise QubesException(
"Device {} not connected to VM {}".format(
device['name'], vm.name))
def usb_detach_all(vm):
raise NotImplementedError("Detaching all devices from a given VM is not implemented yet")
p = device['vm'].run_service('qubes.USBDetach', passio_popen=True,
user='root')
(stdout, stderr) = p.communicate(
'{}\n'.format(device['device']))
if p.returncode != 0:
# TODO: sanitize and include stdout
raise QubesException('Device detach failed')
def usb_detach_all(qvmc, vm):
for dev in usb_list(qvmc).values():
connected_to = dev['connected-to']
if connected_to is not None and connected_to.qid == vm.qid:
usb_detach(qvmc, connected_to, dev)
####### QubesWatch ######

View File

@ -67,6 +67,14 @@ pci_strictreset
cases it could make sense - for example when the VM to which it is assigned
is trusted one, or is running all the time.
pci_e820_host
Accepted values: ``True``, ``False``
Give VM with PCI devices a memory map (e820) of the host. This is
required for some devices to properly resolve conflicts in address space.
This option is enabled by default for VMs with PCI devices and have no
effect for VMs without devices.
label
Accepted values: ``red``, ``orange``, ``yellow``, ``green``, ``gray``,
``blue``, ``purple``, ``black``

View File

@ -3,5 +3,8 @@
## Please use a single # to start your custom comments
sys-whonix anon-whonix allow
whonix-gw anon-whonix allow
whonix-ws anon-whonix allow
$anyvm $dispvm allow
$anyvm $anyvm ask

View File

@ -0,0 +1,10 @@
## Note that policy parsing stops at the first match,
## so adding anything below "$anyvm $anyvm action" line will have no effect
## Please use a single # to start your custom comments
sys-whonix anon-whonix allow
whonix-gw anon-whonix allow
whonix-ws anon-whonix allow
$anyvm $dispvm allow
$anyvm $anyvm ask

View File

@ -38,8 +38,7 @@ class TC_00_Dom0UpgradeMixin(qubes.tests.SystemTestsMixin):
Tests for downloading dom0 updates using VMs based on different templates
"""
pkg_name = 'qubes-test-pkg'
dom0_update_common_opts = ['--disablerepo=*', '--enablerepo=test',
'--setopt=test.copy_local=1']
dom0_update_common_opts = ['--disablerepo=*', '--enablerepo=test']
update_flag_path = '/var/lib/qubes/updates/dom0-updates-available'
@classmethod
@ -84,7 +83,7 @@ Expire-Date: 0
p.stdin.write('''
[test]
name = Test
baseurl = file:///tmp/repo
baseurl = http://localhost:8080/
enabled = 1
''')
p.stdin.close()
@ -114,6 +113,7 @@ enabled = 1
subprocess.check_call(['sudo', 'rpm', '--import',
os.path.join(self.tmpdir, 'pubkey.asc')])
self.updatevm.start()
self.repo_running = False
def tearDown(self):
super(TC_00_Dom0UpgradeMixin, self).tearDown()
@ -180,6 +180,13 @@ Test package
elif retcode != 0:
self.skipTest("createrepo failed with code {}, cannot perform the "
"test".format(retcode))
self.start_repo()
def start_repo(self):
if not self.repo_running:
self.updatevm.run("cd /tmp/repo &&"
"python -m SimpleHTTPServer 8080")
self.repo_running = True
def test_000_update(self):
"""Dom0 update tests

View File

@ -351,7 +351,7 @@ def main():
for name in args.names:
suite.addTests(
[test for test in list_test_cases(alltests)
if (str(test)).startswith(name)])
if str(test).startswith(name)])
else:
suite.addTests(loader.loadTestsFromName('qubes.tests'))

228
qvm-tools/qubes-bug-report Executable file
View File

@ -0,0 +1,228 @@
#!/usr/bin/env python3
import subprocess
import argparse
import time
import sys
import os
from os.path import expanduser
#the term qube refers to a qubes vm
def is_program_installed_in_qube( program, qube_name ):
is_installed = True
try:
command = 'command -v ' + program
subprocess.check_call([ 'qvm-run', qube_name, '--pass-io', '--no-color-output', command ], stdout = open( os.devnull, 'w' ) )
except subprocess.CalledProcessError:
is_installed = False
return is_installed
#this function requires virsh
#domstate only works for Xen domU (guests)
def is_qube_running( qube_name ):
runs = False
out = subprocess.check_output([ "virsh", "-c", "xen:///", "domstate", qube_name ])
out = out.decode('utf-8').replace('\n', '')
if 'running' == out:
runs = True
return runs
def get_qube_packages( qube_name ):
content = "## Qubes Packages\n\n"
#a qube can have more than one package manager installed (only one is functional)
pkg_cmd = { 'dpkg' : 'dpkg -l qubes-*', 'pacman' : 'pacman -Qs qubes', 'rpm' : 'rpm -qa qubes-*' }
if is_qube_running( qube_name ):
for package_manager in pkg_cmd.keys():
if is_program_installed_in_qube( package_manager, qube_name ):
pkg_list_cmd = pkg_cmd[package_manager]
try:
out = subprocess.check_output([ 'qvm-run', qube_name, '--pass-io', '--no-color-output', pkg_list_cmd ], stderr = open( os.devnull, 'w' ) )
out = out.decode('utf-8')
content += create_heading( ( "Package Manager: " + package_manager ), 3 )
content += wrap_code( out )
except subprocess.CalledProcessError:
pass #do nothing
else:
content += "**No packages listed, because Qube " + qube_name + " was not running**\n\n"
return content
def get_dom0_packages():
content = create_heading( "Dom0 Packages", 2 )
out = subprocess.check_output([ "rpm", "-qa", "qubes-*" ])
out = out.decode('utf-8')
content += wrap_code( out )
return content
def wrap_code( text ):
code = "~~~\n" + text + "~~~\n\n"
return code
def create_heading( heading, level ):
heading = heading + "\n\n"
if 1 == level:
heading = "# " + heading
elif 2 == level:
heading = "## " + heading
else:
heading = "### " + heading
return heading
def get_log_file_content( qube_name ):
content = "## Log Files\n\n"
qubes_os_log = "/var/log/qubes/"
ext = ".log"
log_prefix = [ "guid", "pacat", "qubesdb", "qrexec" ]
#constructs for each log file prefix the full path and reads the log file
for prefix in log_prefix:
log_file = prefix + "." + qube_name + ext
content += create_heading( ( "Log File: " + log_file ), 3 )
content += wrap_code( get_log_file( qubes_os_log + log_file ) )
return content
def get_qube_prefs( qube_name ):
qube_prefs = subprocess.check_output([ "qvm-prefs", qube_name ])
qube_prefs = qube_prefs.decode('utf-8')
content = create_heading( "Qube Prefs", 2 )
content += wrap_code( qube_prefs )
return content
def report( qube_name ):
template = '''{title}
{content}
'''
title_text = create_heading( "Bug report: " + qube_name, 1 )
content_text = get_qube_prefs( qube_name )
content_text += get_dom0_packages()
content_text += get_log_file_content( qube_name )
content_text += get_qube_packages( qube_name )
report = template.format( title=title_text, content=content_text )
return report
def write_report( report_content, file_path ):
with open( file_path, 'w' ) as report_file:
report_file.write( report_content )
def send_report( dest_qube, file_path):
#if dest_qube is not running -> start dest_qube
if not is_qube_running( dest_qube ):
try:
subprocess.check_call([ "qvm-start", dest_qube ])
except subprocess.CalledProcessError:
print( "Error while starting: " + dest_qube, file = sys.stderr )
try:
subprocess.check_call([ "qvm-move-to-vm", dest_qube, file_path ])
except subprocess.calledProcessError:
print( "Moving file bug-report failed", file = sys.stderr )
def get_log_file( log_file ):
data = ""
#open and close the file
with open( log_file ) as log:
data = log.read()
return data
def qube_exist( qube_name ):
exists = True
try:
#calls: qvm-check --quiet vmanme
subprocess.check_call([ "qvm-check", "--quiet", qube_name ])
except subprocess.CalledProcessError:
exists = False
return exists
def get_report_file_path( qube_name ):
#exapanduser -> works corss platform
home_dir = expanduser("~")
date = time.strftime("%H%M%S")
file_path = home_dir + "/" + qube_name + "_bug-report_" + date + ".md"
return file_path
def main():
parser = argparse.ArgumentParser( description = 'Generates a bug report for a specific qube (Qubes VM)' )
parser.add_argument( 'vmname', metavar = '<vmanme>', type = str )
parser.add_argument( '-d', '--dest-vm', metavar = '<dest-vm>', dest = "destvm", type = str, default = 'dom0', help = "send the report to the destination VM" )
parser.add_argument( '-p', '--print-report', action = 'store_const', const = "print_report", required = False, help = "prints the report without writing it or sending it to a destination VM" )
args = parser.parse_args()
if qube_exist( args.vmname ):
if qube_exist( args.destvm ):
#get the report
report_content = report( args.vmname )
#if -p or --print-report is an argument print the report
if args.print_report:
print( report_content )
#write and send the report
else:
file_path = get_report_file_path( args.vmname )
write_report( report_content, file_path )
print( "Report written to: " + file_path )
if 'dom0' != args.destvm:
send_report( args.destvm, file_path )
print( "Report send to VM: " + args.destvm )
exit(0)
else:
print ( "Destination VM does not exist" )
exit(1)
else:
print( "VM does not exist" )
exit(1)
#calls the main function -> program start point
main()

View File

@ -103,7 +103,7 @@ XL_VTX=`cat $TEMP_DIR/xl-info |grep xen_caps | grep hvm`
XL_VTD=`cat $TEMP_DIR/xl-info |grep virt_caps |grep hvm_directio`
PCRS=`find /sys/devices/ -name pcrs`
FILENAME="Qubes-HCL-${BRAND//+([^[:alnum:]])/_}-${PRODUCT//+([^[:alnum:]])/_}-$DATE"
FILENAME="Qubes-HCL-${BRAND//[^[:alnum:]]/_}-${PRODUCT//[^[:alnum:]]/_}-$DATE"
if [[ $XL_VTX ]]
then

View File

@ -72,6 +72,10 @@ def main():
dest="compress_filter", default=False,
help="Compress the backup using specified filter "
"program (default: gzip)")
parser.add_option("--tmpdir", action="store", dest="tmpdir", default=None,
help="Custom temporary directory (if you have at least "
"1GB free RAM in dom0, use of /tmp is advised) ("
"default: /var/tmp)")
parser.add_option ("--debug", action="store_true", dest="debug",
default=False, help="Enable (a lot of) debug output")
@ -187,6 +191,8 @@ def main():
kwargs['hmac_algorithm'] = options.hmac_algorithm
if options.crypto_algorithm:
kwargs['crypto_algorithm'] = options.crypto_algorithm
if options.tmpdir:
kwargs['tmpdir'] = options.tmpdir
try:
backup_do(base_backup_dir, files_to_backup, passphrase,

124
qvm-tools/qvm-top Executable file
View File

@ -0,0 +1,124 @@
#!/usr/bin/python2
# -*- encoding: utf8 -*-
#
# The Qubes OS Project, http://www.qubes-os.org
#
# Copyright (C) 2010 Joanna Rutkowska <joanna@invisiblethingslab.com>
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#
#
from qubes.qubes import QubesVmCollection
from qubes.qubes import QubesHost
from qubes.qubes import QubesException
from optparse import OptionParser
import sys
def main():
usage = "usage: %prog [options]"
parser = OptionParser (usage)
parser.add_option("--list", dest="list_top",
action="store_true", default=False,
help="n m : One line summary of top n vms with more than m cpu_time %")
(options, args) = parser.parse_args ()
qvm_collection = QubesVmCollection()
qvm_collection.lock_db_for_reading()
qvm_collection.load()
qvm_collection.unlock_db()
fields_to_display = ["name", "cpu", "mem"]
cpu_usages = None
qhost = QubesHost()
(measure_time, cpu_usages) = qhost.measure_cpu_usage(qvm_collection)
vms_list = [vm for vm in qvm_collection.values() if vm.is_running()]
vms_list = sorted(vms_list, key= lambda vm: 1-cpu_usages[vm.get_xid()]['cpu_usage'])
no_vms = len (vms_list)
vms_to_display = vms_list
if options.list_top:
any_shown = False
ndisp = 3
cputh = 0
if len(args) > 0:
ndisp = int(args[0])
if len(args) > 1:
cputh = int(args[1])
for vm in vms_to_display[:ndisp]:
cpu = cpu_usages[vm.get_xid()]['cpu_usage']
if cpu > cputh:
any_shown = True
sys.stdout.write("%d %s, " % (cpu, vm.name))
if any_shown:
sys.stdout.write(" ... | ")
totalMem = 0
dom0mem = 0
for vm in vms_to_display:
if not vm.name == "dom0":
totalMem += vm.get_mem()
else:
dom0mem = vm.get_mem()
totalMem /= 1024.0 * 1024.0
dom0mem /= 1024.0 * 1024.0
sys.stdout.write("%.1f G + %.1f G" % (totalMem, dom0mem))
return
max_width = { 'name': 0, 'cpu': 0, 'mem': 0 }
data_to_display = []
for vm in vms_to_display:
data_row = {}
data_row['name'] = vm.name
max_width['name'] = max(max_width['name'], len(data_row['name']))
data_row['cpu'] = "%.1f" % (cpu_usages[vm.get_xid()]['cpu_usage'])
max_width['cpu'] = max(max_width['cpu'], len(data_row['cpu']))
data_row['mem'] = "%d" % (vm.get_mem() / (1024.0))
max_width['mem'] = max(max_width['mem'], len(data_row['mem']))
data_to_display.append(data_row)
# Display the header
s = ""
for f in fields_to_display:
fmt="{{0:-^{0}}}-+".format(max_width[f] + 1)
s += fmt.format('-')
print s
s = ""
for f in fields_to_display:
fmt="{{0:>{0}}} |".format(max_width[f] + 1)
s += fmt.format(f)
print s
s = ""
for f in fields_to_display:
fmt="{{0:-^{0}}}-+".format(max_width[f] + 1)
s += fmt.format('-')
print s
# ... and the actual data
for row in data_to_display:
s = ""
for f in fields_to_display:
fmt="{{0:>{0}}} |".format(max_width[f] + 1)
s += fmt.format(row[f])
print s
main()

View File

@ -92,10 +92,21 @@ def main():
touch_dvm_savefile = is_dvm_up_to_date(tvm, dvm_tmpl)
print >> sys.stderr, "Creating temporary VM..."
fstrim_vm = qvm_collection.add_new_vm("QubesAppVm",
trim_vmname = "trim-{}".format(tvm_name[:31 - len('trim-')])
fstrim_vm = qvm_collection.get_vm_by_name(trim_vmname)
if fstrim_vm is not None:
if not fstrim_vm.internal:
print >>sys.stderr, \
"ERROR: VM '{}' already exists and is not marked as internal. " \
"Remove it manually."
fstrim_vm.remove_from_disk()
qvm_collection.pop(fstrim_vm.qid)
fstrim_vm = qvm_collection.add_new_vm(
"QubesAppVm",
template=tvm,
name="{}-fstrim".format(tvm_name),
name=trim_vmname,
netvm=None,
internal=True,
)
if not fstrim_vm:
print >> sys.stderr, "ERROR: Failed to create new VM"

View File

@ -27,8 +27,6 @@ from optparse import OptionParser
import sys
import os
pvusb_enable_flagfile = '/var/lib/qubes/pvusb-enable.flag'
def main():
usage = "usage: %prog -l [options]\n"\
"usage: %prog -a [options] <vm-name> <device-vm-name>:<device>\n"\
@ -49,24 +47,6 @@ def main():
(options, args) = parser.parse_args ()
if not os.path.exists(pvusb_enable_flagfile):
print >> sys.stderr, ""
print >> sys.stderr, "******* WARNING *** WARNING *** WARNING *** WARNING *******"
print >> sys.stderr, "*** ***"
print >> sys.stderr, "*** PVUSB passthrough kernel support is still unstable. ***"
print >> sys.stderr, "*** It can CRASH your VMs. ***"
print >> sys.stderr, "*** ***"
print >> sys.stderr, "***********************************************************"
print >> sys.stderr, ""
print >> sys.stderr, "To use it, you need install kernel from \"unstable\" repository"
print >> sys.stderr, "If you still want to enable it, type capital YES"
print >> sys.stderr, ""
prompt = raw_input ("Do you want enable PV USB support? ")
if prompt == "YES":
open(pvusb_enable_flagfile, "w").close()
else:
exit(1)
if hasattr(os, "geteuid") and os.geteuid() == 0:
if not options.force_root:
print >> sys.stderr, "*** Running this tool as root is strongly discouraged, this will lead you in permissions problems."
@ -78,7 +58,6 @@ def main():
print >> sys.stderr, "Only one of -l -a -d is allowed!"
exit (1)
if options.do_attach or options.do_detach:
qvm_collection = QubesVmCollection()
qvm_collection.lock_db_for_reading()
qvm_collection.load()
@ -91,14 +70,17 @@ def main():
if vm is None:
parser.error ("Invalid VM name: %s" % args[0])
# FIXME: here we assume that device is always in form "domain:dev", which can be changed in the future
# FIXME: here we assume that device is always in form "domain:dev",
# which can be changed in the future
if args[1].find(":") < 0:
parser.error("Invalid device syntax: %s" % args[1])
dev_list = usb_list()
backend_vm = qvm_collection.get_vm_by_name(args[1].split(":")[0])
if backend_vm is None:
parser.error("No such VM: {}".format(args[1].split(":")[0]))
dev_list = usb_list(qvm_collection, vm=backend_vm)
if not args[1] in dev_list.keys():
parser.error("Invalid device name: %s" % args[1])
dev = dev_list[args[1]]
backend_vm = qvm_collection.get_vm_by_name(dev['vm'])
assert backend_vm is not None
kwargs = {}
@ -106,7 +88,7 @@ def main():
# kwargs['frontend'] = options.frontend
kwargs['auto_detach'] = options.auto_detach
try:
usb_attach(vm, backend_vm, dev['device'], **kwargs)
usb_attach(qvm_collection, vm, dev, **kwargs)
except QubesException as e:
print >> sys.stderr, "ERROR: %s" % str(e)
sys.exit(1)
@ -123,36 +105,37 @@ def main():
# kwargs['frontend'] = options.frontend
# usb_detach(vm, **kwargs)
#else:
usb_detach_all(vm)
usb_detach_all(qvm_collection, vm)
else:
# Maybe usbvm:device?
# FIXME: nasty copy-paste from attach code half a page above
# FIXME: here we assume that device is always in form "domain:dev", which can be changed in the future
# FIXME: here we assume that device is always in form "domain:dev",
# which can be changed in the future
if args[0].find(":") < 0:
parser.error("Invalid device syntax: %s" % args[0])
dev_list = usb_list()
backend_vm = qvm_collection.get_vm_by_name(args[0].split(":")[0])
if backend_vm is None:
parser.error("No such VM: {}".format(args[0].split(":")[0]))
dev_list = usb_list(qvm_collection, vm=backend_vm)
if not args[0] in dev_list.keys():
parser.error("Invalid device name: %s" % args[0])
dev = dev_list[args[0]]
backend_vm = qvm_collection.get_vm_by_name(dev['vm'])
assert backend_vm is not None
attached_to = usb_check_attached('', backend_vm.xid, dev['device'])
attached_to = usb_check_attached(qvm_collection, dev)
if attached_to is None:
print >> sys.stderr, "WARNING: Device not connected to any VM"
exit(0)
usb_detach(backend_vm, attached_to)
usb_detach(qvm_collection, attached_to, dev)
else:
if len(args) > 0:
parser.error("Too many parameters")
# do_list
for dev in usb_list().values():
attached_to = usb_check_attached('', dev['xid'], dev['device'])
for dev in usb_list(qvm_collection).values():
attached_to = dev['connected-to']
attached_to_str = ""
if attached_to:
attached_to_str = " (attached to %s:%s)" % (attached_to['vm'], attached_to['frontend'])
print "%s\t%s%s (USBv%s)" % (dev['name'], dev['desc'], attached_to_str, dev['usb_ver'])
attached_to_str = " (attached to %s)" % (attached_to.name)
print "%s\t%s%s" % (dev['name'], dev['desc'], attached_to_str)
exit (0)
main()

View File

@ -68,6 +68,7 @@ Requires(postun): systemd-units
Requires: python, pciutils, python-inotify, python-daemon
Requires: python-setuptools
Requires: qubes-core-dom0-linux >= 3.1.8
Requires: qubes-core-dom0-doc
Requires: qubes-db-dom0
Requires: python-lxml
# TODO: R: qubes-gui-dom0 >= 2.1.11
@ -76,7 +77,7 @@ Requires: libvirt-python
%if x%{?backend_vmm} == xxen
Requires: xen-runtime
Requires: xen-hvm
Requires: libvirt-daemon-xen >= 1.2.20-4
Requires: libvirt-daemon-xen >= 1.2.20-6
%endif
Requires: createrepo
Requires: gnome-packagekit
@ -351,6 +352,8 @@ fi
%attr(0664,root,qubes) %config(noreplace) /etc/qubes-rpc/policy/qubes.NotifyTools
%attr(0664,root,qubes) %config(noreplace) /etc/qubes-rpc/policy/qubes.NotifyUpdates
%attr(0664,root,qubes) %config(noreplace) /etc/qubes-rpc/policy/qubes.OpenInVM
%attr(0664,root,qubes) %config(noreplace) /etc/qubes-rpc/policy/qubes.OpenInVM
%attr(0664,root,qubes) %config(noreplace) /etc/qubes-rpc/policy/qubes.OpenURL
%attr(0664,root,qubes) %config(noreplace) /etc/qubes-rpc/policy/qubes.VMShell
/etc/qubes-rpc/qubes.FeaturesRequest
/etc/qubes-rpc/qubes.GetRandomizedTime

View File

@ -17,6 +17,8 @@ endif
cp backupcompatibility.py[co] $(DESTDIR)$(PYTHON_TESTSPATH)
cp basic.py $(DESTDIR)$(PYTHON_TESTSPATH)
cp basic.py[co] $(DESTDIR)$(PYTHON_TESTSPATH)
cp block.py $(DESTDIR)$(PYTHON_TESTSPATH)
cp block.py[co] $(DESTDIR)$(PYTHON_TESTSPATH)
cp dom0_update.py $(DESTDIR)$(PYTHON_TESTSPATH)
cp dom0_update.py[co] $(DESTDIR)$(PYTHON_TESTSPATH)
cp network.py $(DESTDIR)$(PYTHON_TESTSPATH)

304
tests/block.py Normal file
View File

@ -0,0 +1,304 @@
# vim: fileencoding=utf-8
#
# The Qubes OS Project, https://www.qubes-os.org/
#
# Copyright (C) 2016
# Marek Marczykowski-Górecki <marmarek@invisiblethingslab.com>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#
import os
import qubes.tests
import qubes.qubesutils
import subprocess
# the same class for both dom0 and VMs
class TC_00_List(qubes.tests.SystemTestsMixin, qubes.tests.QubesTestCase):
template = None
def setUp(self):
super(TC_00_List, self).setUp()
self.img_path = '/tmp/test.img'
self.mount_point = '/tmp/test-dir'
if self.template is not None:
self.vm = self.qc.add_new_vm(
"QubesAppVm",
name=self.make_vm_name("vm"),
template=self.qc.get_vm_by_name(self.template))
self.vm.create_on_disk(verbose=False)
self.vm.start()
else:
self.vm = self.qc[0]
def tearDown(self):
super(TC_00_List, self).tearDown()
if self.template is None:
if os.path.exists(self.mount_point):
subprocess.call(['sudo', 'umount', self.mount_point])
subprocess.call(['sudo', 'rmdir', self.mount_point])
subprocess.call(['sudo', 'dmsetup', 'remove', 'test-dm'])
if os.path.exists(self.img_path):
loopdev = subprocess.check_output(['losetup', '-j',
self.img_path])
for dev in loopdev.splitlines():
subprocess.call(
['sudo', 'losetup', '-d', dev.split(':')[0]])
subprocess.call(['sudo', 'rm', '-f', self.img_path])
def run_script(self, script, user="user"):
if self.template is None:
if user == "user":
subprocess.check_call(script, shell=True)
elif user == "root":
subprocess.check_call(['sudo', 'sh', '-c', script])
else:
retcode = self.vm.run(script, user=user, wait=True)
if retcode != 0:
raise subprocess.CalledProcessError
def test_000_list_loop(self):
if self.template is None:
self.skipTest('loop devices excluded in dom0')
self.run_script(
"set -e;"
"truncate -s 128M {path}; "
"losetup -f {path}; "
"udevadm settle".format(path=self.img_path), user="root")
dev_list = qubes.qubesutils.block_list_vm(self.vm)
found = False
for dev in dev_list.keys():
if dev_list[dev]['desc'] == self.img_path:
self.assertTrue(dev.startswith(self.vm.name + ':loop'))
self.assertEquals(dev_list[dev]['mode'], 'w')
self.assertEquals(dev_list[dev]['size'], 1024 * 1024 * 128)
self.assertEquals(
dev_list[dev]['device'], '/dev/' + dev.split(':')[1])
found = True
if not found:
self.fail("Device {} not found in {!r}".format(self.img_path, dev_list))
def test_001_list_loop_mounted(self):
if self.template is None:
self.skipTest('loop devices excluded in dom0')
self.run_script(
"set -e;"
"truncate -s 128M {path}; "
"mkfs.ext4 -q -F {path}; "
"mkdir -p {mntdir}; "
"mount {path} {mntdir} -o loop; "
"udevadm settle".format(
path=self.img_path,
mntdir=self.mount_point),
user="root")
dev_list = qubes.qubesutils.block_list_vm(self.vm)
for dev in dev_list.keys():
if dev_list[dev]['desc'] == self.img_path:
self.fail(
'Device {} ({}) should not be listed because is mounted'
.format(dev, self.img_path))
def test_010_list_dm(self):
self.run_script(
"set -e;"
"truncate -s 128M {path}; "
"loopdev=`losetup -f`; "
"losetup $loopdev {path}; "
"dmsetup create test-dm --table \"0 262144 linear $(cat "
"/sys/block/$(basename $loopdev)/dev) 0\";"
"udevadm settle".format(path=self.img_path), user="root")
dev_list = qubes.qubesutils.block_list_vm(self.vm)
found = False
for dev in dev_list.keys():
if dev.startswith(self.vm.name + ':loop'):
self.assertNotEquals(dev_list[dev]['desc'], self.img_path,
"Device {} ({}) should not be listed as it is used in "
"device-mapper".format(dev, self.img_path)
)
elif dev_list[dev]['desc'] == 'test-dm':
self.assertEquals(dev_list[dev]['mode'], 'w')
self.assertEquals(dev_list[dev]['size'], 1024 * 1024 * 128)
self.assertEquals(
dev_list[dev]['device'], '/dev/' + dev.split(':')[1])
found = True
if not found:
self.fail("Device {} not found in {!r}".format('test-dm', dev_list))
def test_011_list_dm_mounted(self):
self.run_script(
"set -e;"
"truncate -s 128M {path}; "
"loopdev=`losetup -f`; "
"losetup $loopdev {path}; "
"dmsetup create test-dm --table \"0 262144 linear $(cat "
"/sys/block/$(basename $loopdev)/dev) 0\";"
"mkfs.ext4 -q -F /dev/mapper/test-dm;"
"mkdir -p {mntdir};"
"mount /dev/mapper/test-dm {mntdir};"
"udevadm settle".format(
path=self.img_path,
mntdir=self.mount_point),
user="root")
dev_list = qubes.qubesutils.block_list_vm(self.vm)
for dev in dev_list.keys():
if dev.startswith(self.vm.name + ':loop'):
self.assertNotEquals(dev_list[dev]['desc'], self.img_path,
"Device {} ({}) should not be listed as it is used in "
"device-mapper".format(dev, self.img_path)
)
else:
self.assertNotEquals(dev_list[dev]['desc'], 'test-dm',
"Device {} ({}) should not be listed as it is "
"mounted".format(dev, 'test-dm')
)
def test_012_list_dm_delayed(self):
self.run_script(
"set -e;"
"truncate -s 128M {path}; "
"loopdev=`losetup -f`; "
"losetup $loopdev {path}; "
"udevadm settle; "
"dmsetup create test-dm --table \"0 262144 linear $(cat "
"/sys/block/$(basename $loopdev)/dev) 0\";"
"udevadm settle".format(path=self.img_path), user="root")
dev_list = qubes.qubesutils.block_list_vm(self.vm)
found = False
for dev in dev_list.keys():
if dev.startswith(self.vm.name + ':loop'):
self.assertNotEquals(dev_list[dev]['desc'], self.img_path,
"Device {} ({}) should not be listed as it is used in "
"device-mapper".format(dev, self.img_path)
)
elif dev_list[dev]['desc'] == 'test-dm':
self.assertEquals(dev_list[dev]['mode'], 'w')
self.assertEquals(dev_list[dev]['size'], 1024 * 1024 * 128)
self.assertEquals(
dev_list[dev]['device'], '/dev/' + dev.split(':')[1])
found = True
if not found:
self.fail("Device {} not found in {!r}".format('test-dm', dev_list))
def test_013_list_dm_removed(self):
self.run_script(
"set -e;"
"truncate -s 128M {path}; "
"loopdev=`losetup -f`; "
"losetup $loopdev {path}; "
"dmsetup create test-dm --table \"0 262144 linear $(cat "
"/sys/block/$(basename $loopdev)/dev) 0\";"
"udevadm settle;"
"dmsetup remove test-dm;"
"udevadm settle".format(path=self.img_path), user="root")
dev_list = qubes.qubesutils.block_list_vm(self.vm)
found = False
for dev in dev_list.keys():
if dev_list[dev]['desc'] == self.img_path:
self.assertTrue(dev.startswith(self.vm.name + ':loop'))
self.assertEquals(dev_list[dev]['mode'], 'w')
self.assertEquals(dev_list[dev]['size'], 1024 * 1024 * 128)
self.assertEquals(
dev_list[dev]['device'], '/dev/' + dev.split(':')[1])
found = True
if not found:
self.fail("Device {} not found in {!r}".format(self.img_path, dev_list))
def test_020_list_loop_partition(self):
if self.template is None:
self.skipTest('loop devices excluded in dom0')
self.run_script(
"set -e;"
"truncate -s 128M {path}; "
"echo ,,L | sfdisk {path};"
"loopdev=`losetup -f`; "
"losetup -P $loopdev {path}; "
"blockdev --rereadpt $loopdev; "
"udevadm settle".format(path=self.img_path), user="root")
dev_list = qubes.qubesutils.block_list_vm(self.vm)
found = False
for dev in dev_list.keys():
if dev_list[dev]['desc'] == self.img_path:
self.assertTrue(dev.startswith(self.vm.name + ':loop'))
self.assertEquals(dev_list[dev]['mode'], 'w')
self.assertEquals(dev_list[dev]['size'], 1024 * 1024 * 128)
self.assertEquals(
dev_list[dev]['device'], '/dev/' + dev.split(':')[1])
self.assertIn(dev + 'p1', dev_list)
found = True
if not found:
self.fail("Device {} not found in {!r}".format(self.img_path, dev_list))
def test_021_list_loop_partition_mounted(self):
if self.template is None:
self.skipTest('loop devices excluded in dom0')
self.run_script(
"set -e;"
"truncate -s 128M {path}; "
"echo ,,L | sfdisk {path};"
"loopdev=`losetup -f`; "
"losetup -P $loopdev {path}; "
"blockdev --rereadpt $loopdev; "
"mkfs.ext4 -q -F ${{loopdev}}p1; "
"mkdir -p {mntdir}; "
"mount ${{loopdev}}p1 {mntdir}; "
"udevadm settle".format(
path=self.img_path, mntdir=self.mount_point),
user="root")
dev_list = qubes.qubesutils.block_list_vm(self.vm)
for dev in dev_list.keys():
if dev_list[dev]['desc'] == self.img_path:
self.fail(
'Device {} ({}) should not be listed because its '
'partition is mounted'
.format(dev, self.img_path))
elif dev.startswith(self.vm.name + ':loop') and dev.endswith('p1'):
# FIXME: risky assumption that only tests create partitioned
# loop devices
self.fail(
'Device {} ({}) should not be listed because is mounted'
.format(dev, self.img_path))
def load_tests(loader, tests, pattern):
try:
qc = qubes.qubes.QubesVmCollection()
qc.lock_db_for_reading()
qc.load()
qc.unlock_db()
templates = [vm.name for vm in qc.values() if
isinstance(vm, qubes.qubes.QubesTemplateVm)]
except OSError:
templates = []
for template in templates:
tests.addTests(loader.loadTestsFromTestCase(
type(
'TC_00_List_' + template,
(TC_00_List, qubes.tests.QubesTestCase),
{'template': template})))
return tests

View File

@ -1441,7 +1441,9 @@ class TC_50_MimeHandlers(qubes.tests.SystemTestsMixin):
passio_popen=True)
vmpattern = "disp*"
else:
self.qrexec_policy('qubes.Filecopy', self.source_vm.name,
self.qrexec_policy('qubes.OpenInVM', self.source_vm.name,
self.target_vmname)
self.qrexec_policy('qubes.OpenURL', self.source_vm.name,
self.target_vmname)
p = self.source_vm.run("qvm-open-in-vm {} {}".format(
self.target_vmname, filename), passio_popen=True)
@ -1573,6 +1575,10 @@ class TC_50_MimeHandlers(qubes.tests.SystemTestsMixin):
self.open_file_and_check_viewer(filename, [],
["shotwell", "eog", "display"])
def test_010_url(self):
self.open_file_and_check_viewer("https://www.qubes-os.org/", [],
["Firefox", "Iceweasel"])
def test_100_txt_dispvm(self):
filename = "/home/user/test_file.txt"
self.prepare_txt(filename)
@ -1622,6 +1628,11 @@ class TC_50_MimeHandlers(qubes.tests.SystemTestsMixin):
["shotwell", "eog", "display"],
dispvm=True)
def test_110_url_dispvm(self):
self.open_file_and_check_viewer("https://www.qubes-os.org/", [],
["Firefox", "Iceweasel"],
dispvm=True)
def load_tests(loader, tests, pattern):
try:

View File

@ -1 +1 @@
3.2.0
3.2.3