Merge remote-tracking branch 'qubesos/pr/108'

* qubesos/pr/108:
  last fixes (thanks, @marmarek!)
  ci/pylintrc: disable wrong-import-order
  qubespolicy: fix import order
  Fixes from marmarek's review
  qubes/tests: fix qrexec policy context
  Make pylint very ♥
  qubes.tests asyncio, part 2
  Fix miscellaneous warnings
  qubes/vm/qubesvm: don't use .run_service() for .run()
  qubes/ext/gui: remove is-fully-usable and is_guid_running
  qubes-rpc/admin.vm.volume.Import: fix dd flags
  qubes.tests asyncio
This commit is contained in:
Marek Marczykowski-Górecki 2017-06-05 17:40:01 +02:00
commit 0e073bf17e
No known key found for this signature in database
GPG Key ID: 063938BA42CFA724
21 changed files with 1264 additions and 1246 deletions

View File

@ -18,7 +18,8 @@ disable=
locally-enabled,
logging-format-interpolation,
missing-docstring,
star-args
star-args,
wrong-import-order
[REPORTS]

View File

@ -20,7 +20,7 @@ size=$(tail -c +3 "$tmpfile"|cut -d ' ' -f 1)
path=$(tail -c +3 "$tmpfile"|cut -d ' ' -f 2)
# now process stdin into this path
if dd bs=4k of="$path" count="$size" iflag=count_bytes \
if dd bs=4k of="$path" count="$size" iflag=count_bytes,fullblock \
conv=sparse,notrunc,nocreat,fdatasync; then
status="ok"
else

View File

@ -21,42 +21,16 @@
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#
import os
import qubes.config
import qubes.ext
class GUI(qubes.ext.Extension):
# pylint: disable=too-few-public-methods
# TODO put this somewhere...
@staticmethod
def send_gui_mode(vm):
vm.run_service('qubes.SetGuiMode',
input=('SEAMLESS'
if vm.features.get('gui-seamless', False)
else 'FULLSCREEN'))
@staticmethod
def is_guid_running(vm):
'''Check whether gui daemon for this domain is available.
Notice: this will be irrelevant here, after real splitting GUI/Admin.
:returns: :py:obj:`True` if guid is running, \
:py:obj:`False` otherwise.
:rtype: bool
'''
xid = vm.xid
if xid < 0:
return False
if not os.path.exists('/var/run/qubes/guid-running.{}'.format(xid)):
return False
return True
@qubes.ext.handler('domain-is-fully-usable')
def on_domain_is_fully_usable(self, vm, event):
# pylint: disable=unused-argument
if not self.is_guid_running(vm):
yield False

View File

@ -31,10 +31,12 @@
don't run the tests.
"""
import asyncio
import collections
import functools
import logging
import os
import pathlib
import shutil
import subprocess
import sys
@ -42,6 +44,7 @@ import tempfile
import time
import traceback
import unittest
import warnings
from distutils import spawn
import lxml.etree
@ -223,6 +226,64 @@ class _AssertNotRaisesContext(object):
self.exception = exc_value # store for later retrieval
class _QrexecPolicyContext(object):
'''Context manager for SystemTestsMixin.qrexec_policy'''
def __init__(self, service, source, destination, allow=True):
try:
source = source.name
except AttributeError:
pass
try:
destination = destination.name
except AttributeError:
pass
self._filename = pathlib.Path('/etc/qubes-rpc/policy') / service
self._rule = '{} {} {}\n'.format(source, destination,
'allow' if allow else 'deny')
self._did_create = False
self._handle = None
def load(self):
if self._handle is None:
try:
self._handle = self._filename.open('r+')
except FileNotFoundError:
self._handle = self._filename.open('w+')
self._did_create = True
self._handle.seek(0)
return self._handle.readlines()
def save(self, rules):
assert self._handle is not None
self._handle.truncate(0)
self._handle.seek(0)
self._handle.write(''.join(rules))
def close(self):
assert self._handle is not None
self._handle.close()
self._handle = None
def __enter__(self):
rules = self.load()
rules.insert(0, self._rule)
self.save(self._rule)
return self
def __exit__(self, exc_type, exc_value, tb):
if not self._did_create:
try:
rules = self.load()
rules.remove(self._rule)
self.save(rules)
finally:
self.close()
else:
self.close()
os.unlink(self._filename)
class substitute_entry_points(object):
'''Monkey-patch pkg_resources to substitute one group in iter_entry_points
@ -279,6 +340,8 @@ class QubesTestCase(unittest.TestCase):
self.addTypeEqualityFunc(qubes.devices.DeviceManager,
self.assertDevicesEqual)
self.loop = None
def __str__(self):
return '{}/{}/{}'.format(
@ -287,9 +350,20 @@ class QubesTestCase(unittest.TestCase):
self._testMethodName)
def setUp(self):
super().setUp()
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
def tearDown(self):
super(QubesTestCase, self).tearDown()
# The loop, when closing, throws a warning if there is
# some unfinished bussiness. Let's catch that.
with warnings.catch_warnings():
warnings.simplefilter('error')
self.loop.close()
# TODO: find better way in py3
try:
result = self._outcome.result
@ -624,7 +698,7 @@ class SystemTestsMixin(object):
try:
# XXX .is_running() may throw libvirtError if undefined
if vm.is_running():
vm.force_shutdown()
vm.kill()
except: # pylint: disable=bare-except
pass
@ -749,20 +823,7 @@ class SystemTestsMixin(object):
:return:
"""
def add_remove_rule(add=True):
with open('/etc/qubes-rpc/policy/{}'.format(service), 'r+') as policy:
policy_rules = policy.readlines()
rule = "{} {} {}\n".format(source, destination,
'allow' if allow else 'deny')
if add:
policy_rules.insert(0, rule)
else:
policy_rules.remove(rule)
policy.truncate(0)
policy.seek(0)
policy.write(''.join(policy_rules))
add_remove_rule(add=True)
self.addCleanup(add_remove_rule, add=False)
return _QrexecPolicyContext(service, source, destination, allow=allow)
def wait_for_window(self, title, timeout=30, show=True):
"""

View File

@ -37,6 +37,7 @@ class TestApp(qubes.tests.TestEmitter):
class TC_30_VMCollection(qubes.tests.QubesTestCase):
def setUp(self):
super().setUp()
self.app = TestApp()
self.vms = qubes.app.VMCollection(self.app)

View File

@ -75,6 +75,7 @@ class TestVM(qubes.tests.TestEmitter):
class TC_00_DeviceCollection(qubes.tests.QubesTestCase):
def setUp(self):
super().setUp()
self.app = TestApp()
self.emitter = TestVM(self.app, 'vm')
self.app.domains['vm'] = self.emitter
@ -152,6 +153,7 @@ class TC_00_DeviceCollection(qubes.tests.QubesTestCase):
class TC_01_DeviceManager(qubes.tests.QubesTestCase):
def setUp(self):
super().setUp()
self.app = TestApp()
self.emitter = TestVM(self.app, 'vm')
self.manager = qubes.devices.DeviceManager(self.emitter)

View File

@ -64,6 +64,7 @@ class TC_00_Label(qubes.tests.QubesTestCase):
class TC_10_property(qubes.tests.QubesTestCase):
def setUp(self):
super().setUp()
try:
class MyTestHolder(qubes.tests.TestEmitter, qubes.PropertyHolder):
testprop1 = qubes.property('testprop1')
@ -78,7 +79,7 @@ class TC_10_property(qubes.tests.QubesTestCase):
hash(self.holder.__class__.testprop1)
def test_002_eq(self):
self.assertEquals(qubes.property('testprop2'),
self.assertEqual(qubes.property('testprop2'),
qubes.property('testprop2'))
def test_010_set(self):
@ -115,7 +116,7 @@ class TC_10_property(qubes.tests.QubesTestCase):
def setter(self2, prop, value):
self.assertIs(self2, holder)
self.assertIs(prop, MyTestHolder.testprop1)
self.assertEquals(value, 'testvalue')
self.assertEqual(value, 'testvalue')
return 'settervalue'
class MyTestHolder(qubes.tests.TestEmitter, qubes.PropertyHolder):
@ -206,6 +207,7 @@ class TestHolder(qubes.tests.TestEmitter, qubes.PropertyHolder):
class TC_20_PropertyHolder(qubes.tests.QubesTestCase):
def setUp(self):
super().setUp()
xml = lxml.etree.XML('''
<qubes version="3">
<properties>
@ -234,9 +236,9 @@ class TC_20_PropertyHolder(qubes.tests.QubesTestCase):
self.assertEventFired(self.holder, 'property-loaded')
self.assertEventNotFired(self.holder, 'property-set:testprop1')
self.assertEquals(self.holder.testprop1, 'testvalue1')
self.assertEquals(self.holder.testprop2, 'testref2')
self.assertEquals(self.holder.testprop3, 'testdefault')
self.assertEqual(self.holder.testprop1, 'testvalue1')
self.assertEqual(self.holder.testprop2, 'testref2')
self.assertEqual(self.holder.testprop3, 'testdefault')
with self.assertRaises(AttributeError):
self.holder.testprop4
@ -314,6 +316,7 @@ class TestApp(qubes.tests.TestEmitter):
class TC_30_VMCollection(qubes.tests.QubesTestCase):
def setUp(self):
super().setUp()
self.app = TestApp()
self.vms = qubes.app.VMCollection(self.app)

View File

@ -257,7 +257,7 @@ class BackupTestsMixin(qubes.tests.SystemTestsMixin):
'internal'):
if not hasattr(vm, prop):
continue
self.assertEquals(
self.assertEqual(
getattr(vm, prop), getattr(restored_vm, prop),
"VM {} - property {} not properly restored".format(
vm.name, prop))
@ -267,11 +267,11 @@ class BackupTestsMixin(qubes.tests.SystemTestsMixin):
orig_value = getattr(vm, prop)
restored_value = getattr(restored_vm, prop)
if orig_value and restored_value:
self.assertEquals(orig_value.name, restored_value.name,
self.assertEqual(orig_value.name, restored_value.name,
"VM {} - property {} not properly restored".format(
vm.name, prop))
else:
self.assertEquals(orig_value, restored_value,
self.assertEqual(orig_value, restored_value,
"VM {} - property {} not properly restored".format(
vm.name, prop))
for dev_class in vm.devices.keys():
@ -500,14 +500,14 @@ class TC_10_BackupVMMixin(BackupTestsMixin):
def test_100_send_to_vm_file_with_spaces(self):
vms = self.create_backup_vms()
self.backupvm.start()
self.backupvm.run("mkdir '/var/tmp/backup directory'", wait=True)
self.loop.run_until_complete(self.backupvm.run_for_stdio(
"mkdir '/var/tmp/backup directory'"))
self.make_backup(vms, target_vm=self.backupvm,
compressed=True, encrypted=True,
target='/var/tmp/backup directory')
self.remove_vms(reversed(vms))
p = self.backupvm.run("ls /var/tmp/backup*/qubes-backup*",
passio_popen=True)
(backup_path, _) = p.communicate()
(backup_path, _) = self.loop.run_until_complete(
self.backupvm.run_for_stdio("ls /var/tmp/backup*/qubes-backup*"))
backup_path = backup_path.decode().strip()
self.restore_backup(source=backup_path,
appvm=self.backupvm)
@ -530,7 +530,7 @@ class TC_10_BackupVMMixin(BackupTestsMixin):
"""
vms = self.create_backup_vms()
self.backupvm.start()
retcode = self.backupvm.run(
self.loop.run_until_complete(self.backupvm.run_for_stdio(
# Debian 7 has too old losetup to handle loop-control device
"mknod /dev/loop0 b 7 0;"
"truncate -s 50M /home/user/backup.img && "
@ -538,9 +538,7 @@ class TC_10_BackupVMMixin(BackupTestsMixin):
"mkdir /home/user/backup && "
"mount /home/user/backup.img /home/user/backup -o loop &&"
"chmod 777 /home/user/backup",
user="root", wait=True)
if retcode != 0:
raise RuntimeError("Failed to prepare backup directory")
user="root"))
with self.assertRaises(qubes.exc.QubesException):
self.make_backup(vms, target_vm=self.backupvm,
compressed=False, encrypted=True,

View File

@ -24,6 +24,7 @@
from distutils import spawn
import asyncio
import os
import subprocess
import tempfile
@ -74,7 +75,7 @@ class TC_01_Properties(qubes.tests.SystemTestsMixin, qubes.tests.QubesTestCase):
self.vm = self.app.add_new_vm(qubes.vm.appvm.AppVM, name=self.vmname,
template=self.app.default_template,
label='red')
self.vm.create_on_disk()
self.loop.run_until_complete(self.vm.create_on_disk())
def save_and_reload_db(self):
super(TC_01_Properties, self).save_and_reload_db()
@ -125,7 +126,7 @@ class TC_01_Properties(qubes.tests.SystemTestsMixin, qubes.tests.QubesTestCase):
os.path.join(os.getenv("HOME"), ".local/share/applications",
self.vmname + "-firefox.desktop")))
self.vm.firewall.load()
self.assertEquals(pre_rename_firewall, self.vm.firewall.rules)
self.assertEqual(pre_rename_firewall, self.vm.firewall.rules)
with self.assertNotRaises((qubes.exc.QubesException, OSError)):
self.vm.firewall.save()
self.assertTrue(self.vm.autostart)
@ -152,13 +153,13 @@ class TC_01_Properties(qubes.tests.SystemTestsMixin, qubes.tests.QubesTestCase):
name=self.make_vm_name("vm"),
template=self.app.default_template,
label='red')
testvm1.create_on_disk()
self.loop.run_until_complete(testvm1.create_on_disk())
testvm2 = self.app.add_new_vm(testvm1.__class__,
name=self.make_vm_name("clone"),
template=testvm1.template,
label='red')
testvm2.clone_properties(testvm1)
testvm2.clone_disk_files(testvm1)
self.loop.run_until_complete(testvm2.clone_disk_files(testvm1))
self.assertTrue(testvm1.storage.verify())
self.assertIn('source', testvm1.volumes['root'].config)
self.assertNotEquals(testvm2, None)
@ -170,24 +171,24 @@ class TC_01_Properties(qubes.tests.SystemTestsMixin, qubes.tests.QubesTestCase):
testvm1 = self.app.domains[testvm1.qid]
testvm2 = self.app.domains[testvm2.qid]
self.assertEquals(testvm1.label, testvm2.label)
self.assertEquals(testvm1.netvm, testvm2.netvm)
self.assertEquals(testvm1.property_is_default('netvm'),
self.assertEqual(testvm1.label, testvm2.label)
self.assertEqual(testvm1.netvm, testvm2.netvm)
self.assertEqual(testvm1.property_is_default('netvm'),
testvm2.property_is_default('netvm'))
self.assertEquals(testvm1.kernel, testvm2.kernel)
self.assertEquals(testvm1.kernelopts, testvm2.kernelopts)
self.assertEquals(testvm1.property_is_default('kernel'),
self.assertEqual(testvm1.kernel, testvm2.kernel)
self.assertEqual(testvm1.kernelopts, testvm2.kernelopts)
self.assertEqual(testvm1.property_is_default('kernel'),
testvm2.property_is_default('kernel'))
self.assertEquals(testvm1.property_is_default('kernelopts'),
self.assertEqual(testvm1.property_is_default('kernelopts'),
testvm2.property_is_default('kernelopts'))
self.assertEquals(testvm1.memory, testvm2.memory)
self.assertEquals(testvm1.maxmem, testvm2.maxmem)
self.assertEquals(testvm1.devices, testvm2.devices)
self.assertEquals(testvm1.include_in_backups,
self.assertEqual(testvm1.memory, testvm2.memory)
self.assertEqual(testvm1.maxmem, testvm2.maxmem)
self.assertEqual(testvm1.devices, testvm2.devices)
self.assertEqual(testvm1.include_in_backups,
testvm2.include_in_backups)
self.assertEquals(testvm1.default_user, testvm2.default_user)
self.assertEquals(testvm1.features, testvm2.features)
self.assertEquals(testvm1.firewall.rules,
self.assertEqual(testvm1.default_user, testvm2.default_user)
self.assertEqual(testvm1.features, testvm2.features)
self.assertEqual(testvm1.firewall.rules,
testvm2.firewall.rules)
# now some non-default values
@ -206,31 +207,31 @@ class TC_01_Properties(qubes.tests.SystemTestsMixin, qubes.tests.QubesTestCase):
template=testvm1.template,
label='red',)
testvm3.clone_properties(testvm1)
testvm3.clone_disk_files(testvm1)
self.loop.run_until_complete(testvm3.clone_disk_files(testvm1))
# qubes.xml reload
self.save_and_reload_db()
testvm1 = self.app.domains[testvm1.qid]
testvm3 = self.app.domains[testvm3.qid]
self.assertEquals(testvm1.label, testvm3.label)
self.assertEquals(testvm1.netvm, testvm3.netvm)
self.assertEquals(testvm1.property_is_default('netvm'),
self.assertEqual(testvm1.label, testvm3.label)
self.assertEqual(testvm1.netvm, testvm3.netvm)
self.assertEqual(testvm1.property_is_default('netvm'),
testvm3.property_is_default('netvm'))
self.assertEquals(testvm1.kernel, testvm3.kernel)
self.assertEquals(testvm1.kernelopts, testvm3.kernelopts)
self.assertEquals(testvm1.property_is_default('kernel'),
self.assertEqual(testvm1.kernel, testvm3.kernel)
self.assertEqual(testvm1.kernelopts, testvm3.kernelopts)
self.assertEqual(testvm1.property_is_default('kernel'),
testvm3.property_is_default('kernel'))
self.assertEquals(testvm1.property_is_default('kernelopts'),
self.assertEqual(testvm1.property_is_default('kernelopts'),
testvm3.property_is_default('kernelopts'))
self.assertEquals(testvm1.memory, testvm3.memory)
self.assertEquals(testvm1.maxmem, testvm3.maxmem)
self.assertEquals(testvm1.devices, testvm3.devices)
self.assertEquals(testvm1.include_in_backups,
self.assertEqual(testvm1.memory, testvm3.memory)
self.assertEqual(testvm1.maxmem, testvm3.maxmem)
self.assertEqual(testvm1.devices, testvm3.devices)
self.assertEqual(testvm1.include_in_backups,
testvm3.include_in_backups)
self.assertEquals(testvm1.default_user, testvm3.default_user)
self.assertEquals(testvm1.features, testvm3.features)
self.assertEquals(testvm1.firewall.rules,
self.assertEqual(testvm1.default_user, testvm3.default_user)
self.assertEqual(testvm1.features, testvm3.features)
self.assertEqual(testvm1.firewall.rules,
testvm2.firewall.rules)
def test_020_name_conflict_app(self):
@ -239,21 +240,21 @@ class TC_01_Properties(qubes.tests.SystemTestsMixin, qubes.tests.QubesTestCase):
self.vm2 = self.app.add_new_vm(qubes.vm.appvm.AppVM,
name=self.vmname, template=self.app.default_template,
label='red')
self.vm2.create_on_disk()
self.loop.run_until_complete(self.vm2.create_on_disk())
def test_021_name_conflict_template(self):
# TODO decide what exception should be here
with self.assertRaises((qubes.exc.QubesException, ValueError)):
self.vm2 = self.app.add_new_vm(qubes.vm.templatevm.TemplateVM,
name=self.vmname, label='red')
self.vm2.create_on_disk()
self.loop.run_until_complete(self.vm2.create_on_disk())
def test_030_rename_conflict_app(self):
vm2name = self.make_vm_name('newname')
self.vm2 = self.app.add_new_vm(qubes.vm.appvm.AppVM,
name=vm2name, template=self.app.default_template, label='red')
self.vm2.create_on_disk()
self.loop.run_until_complete(self.vm2.create_on_disk())
with self.assertNotRaises(OSError):
with self.assertRaises(qubes.exc.QubesException):
@ -272,7 +273,7 @@ class TC_02_QvmPrefs(qubes.tests.SystemTestsMixin, qubes.tests.QubesTestCase):
qubes.vm.appvm.AppVM,
name=self.make_vm_name("vm"),
label='red')
self.testvm.create_on_disk()
self.loop.run_until_complete(self.testvm.create_on_disk())
self.save_and_reload_db()
def setup_hvm(self):
@ -281,7 +282,7 @@ class TC_02_QvmPrefs(qubes.tests.SystemTestsMixin, qubes.tests.QubesTestCase):
name=self.make_vm_name("hvm"),
label='red')
self.testvm.hvm = True
self.testvm.create_on_disk()
self.loop.run_until_complete(self.testvm.create_on_disk())
self.save_and_reload_db()
def pref_set(self, name, value, valid=True):
@ -293,7 +294,7 @@ class TC_02_QvmPrefs(qubes.tests.SystemTestsMixin, qubes.tests.QubesTestCase):
)
(stdout, stderr) = p.communicate()
if valid:
self.assertEquals(p.returncode, 0,
self.assertEqual(p.returncode, 0,
"qvm-prefs .. '{}' '{}' failed: {}{}".format(
name, value, stdout, stderr
))
@ -306,7 +307,7 @@ class TC_02_QvmPrefs(qubes.tests.SystemTestsMixin, qubes.tests.QubesTestCase):
p = subprocess.Popen(['qvm-prefs'] + self.sharedopts +
['--', self.testvm.name, name], stdout=subprocess.PIPE)
(stdout, _) = p.communicate()
self.assertEquals(p.returncode, 0)
self.assertEqual(p.returncode, 0)
return stdout.strip()
bool_test_values = [
@ -329,7 +330,7 @@ class TC_02_QvmPrefs(qubes.tests.SystemTestsMixin, qubes.tests.QubesTestCase):
for (value, expected, valid) in values:
self.pref_set(name, value, valid)
if valid:
self.assertEquals(self.pref_get(name), expected)
self.assertEqual(self.pref_get(name), expected)
@unittest.skip('test not converted to core3 API')
def test_006_template(self):
@ -385,7 +386,8 @@ class TC_03_QvmRevertTemplateChanges(qubes.tests.SystemTestsMixin,
label='red'
)
self.test_template.clone_properties(self.app.default_template)
self.test_template.clone_disk_files(self.app.default_template)
self.loop.run_until_complete(
self.test_template.clone_disk_files(self.app.default_template))
self.save_and_reload_db()
def setup_hvm_template(self):
@ -395,7 +397,7 @@ class TC_03_QvmRevertTemplateChanges(qubes.tests.SystemTestsMixin,
label='red',
hvm=True
)
self.test_template.create_on_disk()
self.loop.run_until_complete(self.test_template.create_on_disk())
self.save_and_reload_db()
def get_rootimg_checksum(self):
@ -406,7 +408,7 @@ class TC_03_QvmRevertTemplateChanges(qubes.tests.SystemTestsMixin,
def _do_test(self):
checksum_before = self.get_rootimg_checksum()
self.test_template.start()
self.loop.run_until_complete(self.test_template.start())
self.shutdown_and_wait(self.test_template)
checksum_changed = self.get_rootimg_checksum()
if checksum_before == checksum_changed:
@ -419,7 +421,7 @@ class TC_03_QvmRevertTemplateChanges(qubes.tests.SystemTestsMixin,
subprocess.check_call(revert_cmd)
checksum_after = self.get_rootimg_checksum()
self.assertEquals(checksum_before, checksum_after)
self.assertEqual(checksum_before, checksum_after)
@unittest.expectedFailure
def test_000_revert_pv(self):
@ -449,18 +451,19 @@ class TC_30_Gui_daemon(qubes.tests.SystemTestsMixin, qubes.tests.QubesTestCase):
def test_000_clipboard(self):
testvm1 = self.app.add_new_vm(qubes.vm.appvm.AppVM,
name=self.make_vm_name('vm1'), label='red')
testvm1.create_on_disk()
self.loop.run_until_complete(testvm1.create_on_disk())
testvm2 = self.app.add_new_vm(qubes.vm.appvm.AppVM,
name=self.make_vm_name('vm2'), label='red')
testvm2.create_on_disk()
self.loop.run_until_complete(testvm2.create_on_disk())
self.app.save()
testvm1.start()
testvm2.start()
self.loop.run_until_complete(asyncio.wait([
testvm1.start(),
testvm2.start()]))
window_title = 'user@{}'.format(testvm1.name)
testvm1.run('zenity --text-info --editable --title={}'.format(
window_title))
self.loop.run_until_complete(testvm1.run(
'zenity --text-info --editable --title={}'.format(window_title)))
self.wait_for_window(window_title)
time.sleep(0.5)
@ -481,37 +484,37 @@ class TC_30_Gui_daemon(qubes.tests.SystemTestsMixin, qubes.tests.QubesTestCase):
clipboard_content = \
open('/var/run/qubes/qubes-clipboard.bin', 'r').read().strip()
self.assertEquals(clipboard_content, test_string,
self.assertEqual(clipboard_content, test_string,
"Clipboard copy operation failed - content")
clipboard_source = \
open('/var/run/qubes/qubes-clipboard.bin.source',
'r').read().strip()
self.assertEquals(clipboard_source, testvm1.name,
self.assertEqual(clipboard_source, testvm1.name,
"Clipboard copy operation failed - owner")
# Then paste it to the other window
window_title = 'user@{}'.format(testvm2.name)
p = testvm2.run('zenity --entry --title={} > test.txt'.format(
window_title), passio_popen=True)
p = self.loop.run_until_complete(testvm2.run(
'zenity --entry --title={} > test.txt'.format(window_title)))
self.wait_for_window(window_title)
subprocess.check_call(['xdotool', 'key', '--delay', '100',
'ctrl+shift+v', 'ctrl+v', 'Return'])
p.wait()
self.loop.run_until_complete(p.wait())
# And compare the result
(test_output, _) = testvm2.run('cat test.txt',
passio_popen=True).communicate()
self.assertEquals(test_string, test_output.strip().decode('ascii'))
(test_output, _) = self.loop.run_until_complete(
testvm2.run_for_stdio('cat test.txt'))
self.assertEqual(test_string, test_output.strip().decode('ascii'))
clipboard_content = \
open('/var/run/qubes/qubes-clipboard.bin', 'r').read().strip()
self.assertEquals(clipboard_content, "",
self.assertEqual(clipboard_content, "",
"Clipboard not wiped after paste - content")
clipboard_source = \
open('/var/run/qubes/qubes-clipboard.bin.source', 'r').\
read().strip()
self.assertEquals(clipboard_source, "",
self.assertEqual(clipboard_source, "",
"Clipboard not wiped after paste - owner")
class TC_05_StandaloneVM(qubes.tests.SystemTestsMixin, qubes.tests.QubesTestCase):
@ -523,24 +526,26 @@ class TC_05_StandaloneVM(qubes.tests.SystemTestsMixin, qubes.tests.QubesTestCase
def test_000_create_start(self):
testvm1 = self.app.add_new_vm(qubes.vm.standalonevm.StandaloneVM,
name=self.make_vm_name('vm1'), label='red')
testvm1.clone_disk_files(self.app.default_template)
self.loop.run_until_complete(
testvm1.clone_disk_files(self.app.default_template))
self.app.save()
testvm1.start()
self.assertEquals(testvm1.get_power_state(), "Running")
self.loop.run_until_complete(testvm1.start())
self.assertEqual(testvm1.get_power_state(), "Running")
@unittest.expectedFailure
def test_100_resize_root_img(self):
testvm1 = self.app.add_new_vm(qubes.vm.standalonevm.StandaloneVM,
name=self.make_vm_name('vm1'), label='red')
testvm1.clone_disk_files(self.app.default_template)
self.loop.run_until_complete(
testvm1.clone_disk_files(self.app.default_template))
self.app.save()
testvm1.storage.resize(testvm1.volumes['root'], 20 * 1024 ** 3)
self.assertEquals(testvm1.volumes['root'].size, 20 * 1024 ** 3)
testvm1.start()
p = testvm1.run('df --output=size /|tail -n 1',
passio_popen=True)
self.loop.run_until_complete(
testvm1.storage.resize(testvm1.volumes['root'], 20 * 1024 ** 3))
self.assertEqual(testvm1.volumes['root'].size, 20 * 1024 ** 3)
self.loop.run_until_complete(testvm1.start())
# new_size in 1k-blocks
(new_size, _) = p.communicate()
(new_size, _) = self.loop.run_until_complete(
testvm1.run_for_stdio('df --output=size /|tail -n 1'))
# some safety margin for FS metadata
self.assertGreater(int(new_size.strip()), 19 * 1024 ** 2)

View File

@ -53,7 +53,7 @@ class TC_04_DispVM(qubes.tests.SystemTestsMixin,
p = self.testvm.run("qvm-run --dispvm bash", passio_popen=True)
(stdout, _) = p.communicate(input=b"echo test; qubesdb-read /name; "
b"echo ERROR\n")
self.assertEquals(p.returncode, 0)
self.assertEqual(p.returncode, 0)
lines = stdout.decode('ascii').splitlines()
self.assertEqual(lines[0], "test")
dispvm_name = lines[1]
@ -84,7 +84,7 @@ class TC_04_DispVM(qubes.tests.SystemTestsMixin,
time.sleep(1)
timeout -= 1
# includes check for None - timeout
self.assertEquals(p.returncode, 0)
self.assertEqual(p.returncode, 0)
lines = p.stdout.read().splitlines()
self.assertTrue(lines, 'No output received from DispVM')
dispvm_name = lines[0]

View File

@ -108,14 +108,14 @@ enabled = 1
name=self.make_vm_name("updatevm"),
label='red'
)
self.updatevm.create_on_disk()
self.loop.run_until_complete(self.updatevm.create_on_disk())
self.app.updatevm = self.updatevm
self.app.save()
subprocess.call(['sudo', 'rpm', '-e', self.pkg_name],
stderr=open(os.devnull, 'w'))
subprocess.check_call(['sudo', 'rpm', '--import',
os.path.join(self.tmpdir, 'pubkey.asc')])
self.updatevm.start()
self.loop.run_until_complete(self.updatevm.start())
self.repo_running = False
def tearDown(self):
@ -170,26 +170,28 @@ Test package
return pkg_path
def send_pkg(self, filename):
p = self.updatevm.run('mkdir -p /tmp/repo; cat > /tmp/repo/{}'.format(
os.path.basename(
filename)), passio_popen=True)
p.stdin.write(open(filename, 'rb').read())
p.stdin.close()
p.wait()
retcode = self.updatevm.run('cd /tmp/repo; createrepo .', wait=True)
if retcode == 127:
self.skipTest("createrepo not installed in template {}".format(
self.template))
elif retcode != 0:
self.skipTest("createrepo failed with code {}, cannot perform the "
"test".format(retcode))
self.loop.run_until_complete(self.updatevm.run_for_stdio(
'mkdir -p /tmp/repo; cat > /tmp/repo/{}'.format(
os.path.basename(filename)),
input=open(filename, 'rb').read()))
try:
self.loop.run_until_complete(
self.updatevm.run_for_stdio('cd /tmp/repo; createrepo .'))
except subprocess.CalledProcessError as e:
if e.returncode == 127:
self.skipTest('createrepo not installed in template {}'.format(
self.template))
else:
self.skipTest('createrepo failed with code {}, '
'cannot perform the test'.format(retcode))
self.start_repo()
def start_repo(self):
if not self.repo_running:
self.updatevm.run("cd /tmp/repo &&"
"python -m SimpleHTTPServer 8080")
self.repo_running = True
if self.repo_running:
return
self.loop.run_until_complete(self.updatevm.run(
'cd /tmp/repo && python -m SimpleHTTPServer 8080'))
self.repo_running = True
def test_000_update(self):
"""Dom0 update tests

View File

@ -22,6 +22,7 @@
from distutils import spawn
import asyncio
import multiprocessing
import os
import subprocess
@ -35,7 +36,6 @@ class NcVersion:
Trad = 1
Nmap = 2
# noinspection PyAttributeOutsideInit
class VmNetworkingMixin(qubes.tests.SystemTestsMixin):
test_ip = '192.168.123.45'
@ -49,10 +49,11 @@ class VmNetworkingMixin(qubes.tests.SystemTestsMixin):
template = None
def run_cmd(self, vm, cmd, user="root"):
p = vm.run(cmd, user=user, passio_popen=True, ignore_stderr=True)
p.stdin.close()
p.stdout.read().decode()
return p.wait()
try:
self.loop.run_until_complete(vm.run_for_stdio(cmd))
except subprocess.CalledProcessError as e:
return e.returncode
return 0
def setUp(self):
super(VmNetworkingMixin, self).setUp()
@ -81,11 +82,12 @@ class VmNetworkingMixin(qubes.tests.SystemTestsMixin):
self.fail("Command '%s' failed" % cmd)
if not self.testnetvm.is_running():
self.testnetvm.start()
self.loop.run_until_complete(self.testnetvm.start())
# Ensure that dnsmasq is installed:
p = self.testnetvm.run("dnsmasq --version", user="root",
passio_popen=True)
if p.wait() != 0:
try:
self.loop.run_until_complete(self.testnetvm.run_for_stdio(
'dnsmasq --version', user='root'))
except subprocess.CalledProcessError:
self.skipTest("dnsmasq not installed")
run_netvm_cmd("ip link add test0 type dummy")
@ -102,7 +104,7 @@ class VmNetworkingMixin(qubes.tests.SystemTestsMixin):
def test_000_simple_networking(self):
self.testvm1.start()
self.loop.run_until_complete(self.testvm1.start())
self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0)
self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0)
@ -113,11 +115,11 @@ class VmNetworkingMixin(qubes.tests.SystemTestsMixin):
label='red')
self.proxy.provides_network = True
self.proxy.netvm = self.testnetvm
self.proxy.create_on_disk()
self.loop.run_until_complete(self.proxy.create_on_disk())
self.testvm1.netvm = self.proxy
self.app.save()
self.testvm1.start()
self.loop.run_until_complete(self.testvm1.start())
self.assertTrue(self.proxy.is_running())
self.assertEqual(self.run_cmd(self.proxy, self.ping_ip), 0,
"Ping by IP from ProxyVM failed")
@ -137,13 +139,13 @@ class VmNetworkingMixin(qubes.tests.SystemTestsMixin):
name=self.make_vm_name('proxy'),
label='red')
self.proxy.provides_network = True
self.proxy.create_on_disk()
self.loop.run_until_complete(self.proxy.create_on_disk())
self.proxy.netvm = self.testnetvm
self.proxy.features['network-manager'] = True
self.testvm1.netvm = self.proxy
self.app.save()
self.testvm1.start()
self.loop.run_until_complete(self.testvm1.start())
self.assertTrue(self.proxy.is_running())
self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0,
"Ping by IP failed")
@ -182,7 +184,7 @@ class VmNetworkingMixin(qubes.tests.SystemTestsMixin):
name=self.make_vm_name('proxy'),
label='red')
self.proxy.provides_network = True
self.proxy.create_on_disk()
self.loop.run_until_complete(self.proxy.create_on_disk())
self.proxy.netvm = self.testnetvm
self.testvm1.netvm = self.proxy
self.app.save()
@ -196,89 +198,97 @@ class VmNetworkingMixin(qubes.tests.SystemTestsMixin):
self.testvm1.firewall.policy = 'drop'
self.testvm1.firewall.save()
self.testvm1.start()
self.loop.run_until_complete(self.testvm1.start())
self.assertTrue(self.proxy.is_running())
if nc_version == NcVersion.Nmap:
self.testnetvm.run("nc -l --send-only -e /bin/hostname -k 1234")
else:
self.testnetvm.run("while nc -l -e /bin/hostname -p 1234; do "
"true; done")
nc = self.loop.run_until_complete(self.testnetvm.run(
'nc -l --send-only -e /bin/hostname -k 1234'
if nc_version == NcVersion.Nmap
else 'while nc -l -e /bin/hostname -p 1234; do true; done'))
self.assertEqual(self.run_cmd(self.proxy, self.ping_ip), 0,
"Ping by IP from ProxyVM failed")
self.assertEqual(self.run_cmd(self.proxy, self.ping_name), 0,
"Ping by name from ProxyVM failed")
self.assertNotEqual(self.run_cmd(self.testvm1, self.ping_ip), 0,
"Ping by IP should be blocked")
if nc_version == NcVersion.Nmap:
nc_cmd = "nc -w 1 --recv-only {} 1234".format(self.test_ip)
else:
nc_cmd = "nc -w 1 {} 1234".format(self.test_ip)
self.assertNotEqual(self.run_cmd(self.testvm1, nc_cmd), 0,
"TCP connection should be blocked")
try:
self.assertEqual(self.run_cmd(self.proxy, self.ping_ip), 0,
"Ping by IP from ProxyVM failed")
self.assertEqual(self.run_cmd(self.proxy, self.ping_name), 0,
"Ping by name from ProxyVM failed")
self.assertNotEqual(self.run_cmd(self.testvm1, self.ping_ip), 0,
"Ping by IP should be blocked")
# block all except ICMP
if nc_version == NcVersion.Nmap:
nc_cmd = "nc -w 1 --recv-only {} 1234".format(self.test_ip)
else:
nc_cmd = "nc -w 1 {} 1234".format(self.test_ip)
self.assertNotEqual(self.run_cmd(self.testvm1, nc_cmd), 0,
"TCP connection should be blocked")
self.testvm1.firewall.rules = [(
qubes.firewall.Rule(None, action='accept', proto='icmp')
)]
self.testvm1.firewall.save()
# Ugly hack b/c there is no feedback when the rules are actually applied
time.sleep(3)
self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0,
"Ping by IP failed (should be allowed now)")
self.assertNotEqual(self.run_cmd(self.testvm1, self.ping_name), 0,
"Ping by name should be blocked")
# block all except ICMP
# all TCP still blocked
self.testvm1.firewall.rules = [(
qubes.firewall.Rule(None, action='accept', proto='icmp')
)]
self.testvm1.firewall.save()
# Ugly hack b/c there is no feedback when the rules are actually
# applied
time.sleep(3)
self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0,
"Ping by IP failed (should be allowed now)")
self.assertNotEqual(self.run_cmd(self.testvm1, self.ping_name), 0,
"Ping by name should be blocked")
self.testvm1.firewall.rules = [
qubes.firewall.Rule(None, action='accept', proto='icmp'),
qubes.firewall.Rule(None, action='accept', specialtarget='dns'),
]
self.testvm1.firewall.save()
# Ugly hack b/c there is no feedback when the rules are actually applied
time.sleep(3)
self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0,
"Ping by name failed (should be allowed now)")
self.assertNotEqual(self.run_cmd(self.testvm1, nc_cmd), 0,
"TCP connection should be blocked")
# all TCP still blocked
# block all except target
self.testvm1.firewall.rules = [
qubes.firewall.Rule(None, action='accept', proto='icmp'),
qubes.firewall.Rule(None, action='accept', specialtarget='dns'),
]
self.testvm1.firewall.save()
# Ugly hack b/c there is no feedback when the rules are actually
# applied
time.sleep(3)
self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0,
"Ping by name failed (should be allowed now)")
self.assertNotEqual(self.run_cmd(self.testvm1, nc_cmd), 0,
"TCP connection should be blocked")
self.testvm1.firewall.policy = 'drop'
self.testvm1.firewall.rules = [
qubes.firewall.Rule(None, action='accept', dsthost=self.test_ip,
proto='tcp', dstports=1234),
]
self.testvm1.firewall.save()
# block all except target
# Ugly hack b/c there is no feedback when the rules are actually applied
time.sleep(3)
self.assertEqual(self.run_cmd(self.testvm1, nc_cmd), 0,
"TCP connection failed (should be allowed now)")
self.testvm1.firewall.policy = 'drop'
self.testvm1.firewall.rules = [
qubes.firewall.Rule(None, action='accept', dsthost=self.test_ip,
proto='tcp', dstports=1234),
]
self.testvm1.firewall.save()
# allow all except target
# Ugly hack b/c there is no feedback when the rules are actually
# applied
time.sleep(3)
self.assertEqual(self.run_cmd(self.testvm1, nc_cmd), 0,
"TCP connection failed (should be allowed now)")
self.testvm1.firewall.policy = 'accept'
self.testvm1.firewall.rules = [
qubes.firewall.Rule(None, action='drop', dsthost=self.test_ip,
proto='tcp', dstports=1234),
]
self.testvm1.firewall.save()
# allow all except target
# Ugly hack b/c there is no feedback when the rules are actually applied
time.sleep(3)
self.assertNotEqual(self.run_cmd(self.testvm1, nc_cmd), 0,
"TCP connection should be blocked")
self.testvm1.firewall.policy = 'accept'
self.testvm1.firewall.rules = [
qubes.firewall.Rule(None, action='drop', dsthost=self.test_ip,
proto='tcp', dstports=1234),
]
self.testvm1.firewall.save()
# Ugly hack b/c there is no feedback when the rules are actually
# applied
time.sleep(3)
self.assertNotEqual(self.run_cmd(self.testvm1, nc_cmd), 0,
"TCP connection should be blocked")
finally:
nc.terminate()
self.loop.run_until_complete(nc.wait())
def test_040_inter_vm(self):
self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM,
name=self.make_vm_name('proxy'),
label='red')
self.proxy.create_on_disk()
self.loop.run_until_complete(self.proxy.create_on_disk())
self.proxy.provides_network = True
self.proxy.netvm = self.testnetvm
self.testvm1.netvm = self.proxy
@ -286,12 +296,13 @@ class VmNetworkingMixin(qubes.tests.SystemTestsMixin):
self.testvm2 = self.app.add_new_vm(qubes.vm.appvm.AppVM,
name=self.make_vm_name('vm3'),
label='red')
self.testvm2.create_on_disk()
self.loop.run_until_complete(self.testvm2.create_on_disk())
self.testvm2.netvm = self.proxy
self.app.save()
self.testvm1.start()
self.testvm2.start()
self.loop.run_until_complete(asyncio.wait([
self.testvm1.start(),
self.testvm2.start()]))
self.assertNotEqual(self.run_cmd(self.testvm1,
self.ping_cmd.format(target=self.testvm2.ip)), 0)
@ -312,14 +323,14 @@ class VmNetworkingMixin(qubes.tests.SystemTestsMixin):
def test_050_spoof_ip(self):
"""Test if VM IP spoofing is blocked"""
self.testvm1.start()
self.loop.run_until_complete(self.testvm1.start())
self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0)
self.testvm1.run("ip addr flush dev eth0", user="root", wait=True)
self.testvm1.run("ip addr add 10.137.1.128/24 dev eth0", user="root",
wait=True)
self.testvm1.run("ip route add default dev eth0", user="root",
wait=True)
self.loop.run_until_complete(self.testvm1.run_for_stdio('''
ip addr flush dev eth0 &&
ip addr add 10.137.1.128/24 dev eth0 &&
ip route add default dev eth0 &&
''', user='root'))
self.assertNotEqual(self.run_cmd(self.testvm1, self.ping_ip), 0,
"Spoofed ping should be blocked")
@ -329,7 +340,7 @@ class VmNetworkingMixin(qubes.tests.SystemTestsMixin):
cmd = "systemctl stop xendriverdomain"
if self.run_cmd(self.testnetvm, cmd) != 0:
self.fail("Command '%s' failed" % cmd)
self.testvm1.start()
self.loop.run_until_complete(self.testvm1.start())
cmd = "systemctl start xendriverdomain"
if self.run_cmd(self.testnetvm, cmd) != 0:
@ -343,24 +354,28 @@ class VmNetworkingMixin(qubes.tests.SystemTestsMixin):
self.testvm1.features['net/fake-gateway'] = '192.168.1.1'
self.testvm1.features['net/fake-netmask'] = '255.255.255.0'
self.app.save()
self.testvm1.start()
self.loop.run_until_complete(self.testvm1.start())
self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0)
self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0)
p = self.testvm1.run('ip addr show dev eth0', user='root',
passio_popen=True,
ignore_stderr=True)
p.stdin.close()
output = p.stdout.read().decode()
self.assertEqual(p.wait(), 0, 'ip addr show dev eth0 failed')
try:
(output, _) = self.loop.run_until_complete(
self.testvm1.run_for_stdio(
'ip addr show dev eth0', user='root'))
except subprocess.CalledProcessError:
self.fail('ip addr show dev eth0 failed')
output = output.decode()
self.assertIn('192.168.1.128', output)
self.assertNotIn(self.testvm1.ip, output)
p = self.testvm1.run('ip route show', user='root',
passio_popen=True,
ignore_stderr=True)
p.stdin.close()
output = p.stdout.read().decode()
self.assertEqual(p.wait(), 0, 'ip route show failed')
try:
(output, _) = self.loop.run_until_complete(
self.testvm1.run_for_stdio('ip route show', user='root'))
except subprocess.CalledProcessError:
self.fail('ip route show failed')
output = output.decode()
self.assertIn('192.168.1.1', output)
self.assertNotIn(self.testvm1.netvm.ip, output)
@ -368,15 +383,18 @@ class VmNetworkingMixin(qubes.tests.SystemTestsMixin):
'''Test hiding VM real IP'''
self.testvm1.features['net/fake-ip'] = '192.168.1.128'
self.app.save()
self.testvm1.start()
self.loop.run_until_complete(self.testvm1.start())
self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0)
self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0)
p = self.testvm1.run('ip addr show dev eth0', user='root',
passio_popen=True,
ignore_stderr=True)
p.stdin.close()
output = p.stdout.read().decode()
self.assertEqual(p.wait(), 0, 'ip addr show dev eth0 failed')
try:
(output, _) = self.loop.run_until_complete(
self.testvm1.run_for_stdio('ip addr show dev eth0',
user='root'))
except subprocess.CalledProcessError:
self.fail('ip addr show dev eth0 failed')
output = output.decode()
self.assertIn('192.168.1.128', output)
self.assertNotIn(self.testvm1.ip, output)
@ -390,7 +408,7 @@ class VmNetworkingMixin(qubes.tests.SystemTestsMixin):
name=self.make_vm_name('proxy'),
label='red')
self.proxy.provides_network = True
self.proxy.create_on_disk()
self.loop.run_until_complete(self.proxy.create_on_disk())
self.proxy.netvm = self.testnetvm
self.testvm1.netvm = self.proxy
self.app.save()
@ -408,36 +426,39 @@ class VmNetworkingMixin(qubes.tests.SystemTestsMixin):
qubes.firewall.Rule(None, action='accept', specialtarget='dns'),
]
self.testvm1.firewall.save()
self.testvm1.start()
self.loop.run_until_complete(self.testvm1.start())
self.assertTrue(self.proxy.is_running())
if nc_version == NcVersion.Nmap:
self.testnetvm.run("nc -l --send-only -e /bin/hostname -k 1234")
else:
self.testnetvm.run("while nc -l -e /bin/hostname -p 1234; do "
"true; done")
nc = self.loop.run_until_complete(self.testnetvm.run(
'nc -l --send-only -e /bin/hostname -k 1234'
if nc_version == NcVersion.Nmap
else 'while nc -l -e /bin/hostname -p 1234; do true; done'))
self.assertEqual(self.run_cmd(self.proxy, self.ping_ip), 0,
"Ping by IP from ProxyVM failed")
self.assertEqual(self.run_cmd(self.proxy, self.ping_name), 0,
"Ping by name from ProxyVM failed")
self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0,
"Ping by IP should be allowed")
self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0,
"Ping by name should be allowed")
if nc_version == NcVersion.Nmap:
nc_cmd = "nc -w 1 --recv-only {} 1234".format(self.test_ip)
else:
nc_cmd = "nc -w 1 {} 1234".format(self.test_ip)
self.assertNotEqual(self.run_cmd(self.testvm1, nc_cmd), 0,
"TCP connection should be blocked")
try:
self.assertEqual(self.run_cmd(self.proxy, self.ping_ip), 0,
"Ping by IP from ProxyVM failed")
self.assertEqual(self.run_cmd(self.proxy, self.ping_name), 0,
"Ping by name from ProxyVM failed")
self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0,
"Ping by IP should be allowed")
self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0,
"Ping by name should be allowed")
if nc_version == NcVersion.Nmap:
nc_cmd = "nc -w 1 --recv-only {} 1234".format(self.test_ip)
else:
nc_cmd = "nc -w 1 {} 1234".format(self.test_ip)
self.assertNotEqual(self.run_cmd(self.testvm1, nc_cmd), 0,
"TCP connection should be blocked")
finally:
nc.terminate()
self.loop.run_until_complete(nc.wait())
def test_203_fake_ip_inter_vm_allow(self):
'''Access VM with "fake IP" from other VM (when firewall allows)'''
self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM,
name=self.make_vm_name('proxy'),
label='red')
self.proxy.create_on_disk()
self.loop.run_until_complete(self.proxy.create_on_disk())
self.proxy.provides_network = True
self.proxy.netvm = self.testnetvm
self.testvm1.netvm = self.proxy
@ -448,31 +469,38 @@ class VmNetworkingMixin(qubes.tests.SystemTestsMixin):
self.testvm2 = self.app.add_new_vm(qubes.vm.appvm.AppVM,
name=self.make_vm_name('vm3'),
label='red')
self.testvm2.create_on_disk()
self.loop.run_until_complete(self.testvm2.create_on_disk())
self.testvm2.netvm = self.proxy
self.app.save()
self.testvm1.start()
self.testvm2.start()
self.loop.run_until_complete(self.testvm1.start())
self.loop.run_until_complete(self.testvm2.start())
cmd = 'iptables -I FORWARD -s {} -d {} -j ACCEPT'.format(
self.testvm2.ip, self.testvm1.ip)
retcode = self.proxy.run(cmd, user='root', wait=True)
self.assertEqual(retcode, 0, '{} failed with: {}'.format(cmd, retcode))
try:
self.loop.run_until_complete(self.proxy.run_for_stdio(
'iptables -I FORWARD -s {} -d {} -j ACCEPT'.format(
self.testvm2.ip, self.testvm1.ip), user='root'))
except subprocess.CalledProcessError as e:
self.fail('{} failed with: {}'.format(cmd, e.returncode))
cmd = 'iptables -I INPUT -s {} -j ACCEPT'.format(
self.testvm2.ip)
retcode = self.testvm1.run(cmd, user='root', wait=True)
self.assertEqual(retcode, 0, '{} failed with: {}'.format(cmd, retcode))
try:
self.loop.run_until_complete(self.proxy.run_for_stdio(
'iptables -I INPUT -s {} -j ACCEPT'.format(
self.testvm2.ip), user='root'))
except subprocess.CalledProcessError as e:
self.fail('{} failed with: {}'.format(cmd, e.returncode))
self.assertEqual(self.run_cmd(self.testvm2,
self.ping_cmd.format(target=self.testvm1.ip)), 0)
cmd = 'iptables -nvxL INPUT | grep {}'.format(self.testvm2.ip)
p = self.testvm1.run(cmd, user='root', passio_popen=True)
(stdout, _) = p.communicate()
self.assertEqual(p.returncode, 0,
'{} failed with {}'.format(cmd, p.returncode))
try:
(stdout, _) = self.loop.run_until_complete(
self.testvm1.run_for_stdio(
'iptables -nvxL INPUT | grep {}'.format(self.testvm2.ip),
user='root'))
except subprocess.CalledProcessError as e:
self.fail(
'{} failed with {}'.format(cmd, e.returncode))
self.assertNotEqual(stdout.decode().split()[0], '0',
'Packets didn\'t managed to the VM')
@ -481,7 +509,7 @@ class VmNetworkingMixin(qubes.tests.SystemTestsMixin):
self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM,
name=self.make_vm_name('proxy'),
label='red')
self.proxy.create_on_disk()
self.loop.run_until_complete(self.proxy.create_on_disk())
self.proxy.provides_network = True
self.proxy.netvm = self.testnetvm
self.proxy.features['net/fake-ip'] = '192.168.1.128'
@ -489,7 +517,7 @@ class VmNetworkingMixin(qubes.tests.SystemTestsMixin):
self.proxy.features['net/fake-netmask'] = '255.255.255.0'
self.testvm1.netvm = self.proxy
self.app.save()
self.testvm1.start()
self.loop.run_until_complete(self.testvm1.start())
self.assertEqual(self.run_cmd(self.proxy, self.ping_ip), 0)
self.assertEqual(self.run_cmd(self.proxy, self.ping_name), 0)
@ -497,39 +525,43 @@ class VmNetworkingMixin(qubes.tests.SystemTestsMixin):
self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0)
self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0)
p = self.proxy.run('ip addr show dev eth0', user='root',
passio_popen=True,
ignore_stderr=True)
p.stdin.close()
output = p.stdout.read().decode()
self.assertEqual(p.wait(), 0, 'ip addr show dev eth0 failed')
try:
(output, _) = self.loop.run_until_complete(
self.proxy.run_for_stdio(
'ip addr show dev eth0', user='root'))
except subprocess.CalledProcessError as e:
self.fail('ip addr show dev eth0 failed')
output = output.decode()
self.assertIn('192.168.1.128', output)
self.assertNotIn(self.testvm1.ip, output)
p = self.proxy.run('ip route show', user='root',
passio_popen=True,
ignore_stderr=True)
p.stdin.close()
output = p.stdout.read().decode()
self.assertEqual(p.wait(), 0, 'ip route show failed')
try:
(output, _) = self.loop.run_until_complete(
self.proxy.run_for_stdio(
'ip route show', user='root'))
except subprocess.CalledProcessError as e:
self.fail('ip route show failed')
output = output.decode()
self.assertIn('192.168.1.1', output)
self.assertNotIn(self.testvm1.netvm.ip, output)
p = self.testvm1.run('ip addr show dev eth0', user='root',
passio_popen=True,
ignore_stderr=True)
p.stdin.close()
output = p.stdout.read().decode()
self.assertEqual(p.wait(), 0, 'ip addr show dev eth0 failed')
try:
(output, _) = self.loop.run_until_complete(
self.testvm1.run_for_stdio(
'ip addr show dev eth0', user='root'))
except subprocess.CalledProcessError as e:
self.fail('ip addr show dev eth0 failed')
output = output.decode()
self.assertNotIn('192.168.1.128', output)
self.assertIn(self.testvm1.ip, output)
p = self.testvm1.run('ip route show', user='root',
passio_popen=True,
ignore_stderr=True)
p.stdin.close()
output = p.stdout.read().decode()
self.assertEqual(p.wait(), 0, 'ip route show failed')
try:
(output, _) = self.loop.run_until_complete(
self.testvm1.run_for_stdio(
'ip route show', user='root'))
except subprocess.CalledProcessError as e:
self.fail('ip route show failed')
output = output.decode()
self.assertIn('192.168.1.128', output)
self.assertNotIn(self.proxy.ip, output)
@ -537,7 +569,7 @@ class VmNetworkingMixin(qubes.tests.SystemTestsMixin):
'''Custom AppVM IP'''
self.testvm1.ip = '192.168.1.1'
self.app.save()
self.testvm1.start()
self.loop.run_until_complete(self.testvm1.start())
self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0)
self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0)
@ -546,14 +578,14 @@ class VmNetworkingMixin(qubes.tests.SystemTestsMixin):
self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM,
name=self.make_vm_name('proxy'),
label='red')
self.proxy.create_on_disk()
self.loop.run_until_complete(self.proxy.create_on_disk())
self.proxy.provides_network = True
self.proxy.netvm = self.testnetvm
self.proxy.ip = '192.168.1.1'
self.testvm1.netvm = self.proxy
self.app.save()
self.testvm1.start()
self.loop.run_until_complete(self.testvm1.start())
self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0)
self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0)
@ -566,7 +598,7 @@ class VmNetworkingMixin(qubes.tests.SystemTestsMixin):
name=self.make_vm_name('proxy'),
label='red')
self.proxy.provides_network = True
self.proxy.create_on_disk()
self.loop.run_until_complete(self.proxy.create_on_disk())
self.proxy.netvm = self.testnetvm
self.testvm1.netvm = self.proxy
self.app.save()
@ -584,29 +616,32 @@ class VmNetworkingMixin(qubes.tests.SystemTestsMixin):
qubes.firewall.Rule(None, action='accept', specialtarget='dns'),
]
self.testvm1.firewall.save()
self.testvm1.start()
self.loop.run_until_complete(self.testvm1.start())
self.assertTrue(self.proxy.is_running())
if nc_version == NcVersion.Nmap:
self.testnetvm.run("nc -l --send-only -e /bin/hostname -k 1234")
else:
self.testnetvm.run("while nc -l -e /bin/hostname -p 1234; do "
"true; done")
nc = self.loop.run_until_complete(self.testnetvm.run(
'nc -l --send-only -e /bin/hostname -k 1234'
if nc_version == NcVersion.Nmap
else 'while nc -l -e /bin/hostname -p 1234; do true; done'))
self.assertEqual(self.run_cmd(self.proxy, self.ping_ip), 0,
"Ping by IP from ProxyVM failed")
self.assertEqual(self.run_cmd(self.proxy, self.ping_name), 0,
"Ping by name from ProxyVM failed")
self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0,
"Ping by IP should be allowed")
self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0,
"Ping by name should be allowed")
if nc_version == NcVersion.Nmap:
nc_cmd = "nc -w 1 --recv-only {} 1234".format(self.test_ip)
else:
nc_cmd = "nc -w 1 {} 1234".format(self.test_ip)
self.assertNotEqual(self.run_cmd(self.testvm1, nc_cmd), 0,
"TCP connection should be blocked")
try:
self.assertEqual(self.run_cmd(self.proxy, self.ping_ip), 0,
"Ping by IP from ProxyVM failed")
self.assertEqual(self.run_cmd(self.proxy, self.ping_name), 0,
"Ping by name from ProxyVM failed")
self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0,
"Ping by IP should be allowed")
self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0,
"Ping by name should be allowed")
if nc_version == NcVersion.Nmap:
nc_cmd = "nc -w 1 --recv-only {} 1234".format(self.test_ip)
else:
nc_cmd = "nc -w 1 {} 1234".format(self.test_ip)
self.assertNotEqual(self.run_cmd(self.testvm1, nc_cmd), 0,
"TCP connection should be blocked")
finally:
nc.terminate()
self.loop.run_until_complete(nc.wait())
# noinspection PyAttributeOutsideInit
@ -686,10 +721,11 @@ class VmUpdatesMixin(qubes.tests.SystemTestsMixin):
)
def run_cmd(self, vm, cmd, user="root"):
p = vm.run(cmd, user=user, passio_popen=True, ignore_stderr=True)
p.stdin.close()
p.stdout.read().decode()
return p.wait()
try:
self.loop.run_until_complete(vm.run_for_stdio(cmd))
except subprocess.CalledProcessError as e:
return e.returncode
return 0
def setUp(self):
super(VmUpdatesMixin, self).setUp()
@ -724,116 +760,95 @@ class VmUpdatesMixin(qubes.tests.SystemTestsMixin):
qubes.vm.appvm.AppVM,
name=self.make_vm_name('vm1'),
label='red')
self.testvm1.create_on_disk()
self.loop.run_until_complete(self.testvm1.create_on_disk())
def test_000_simple_update(self):
self.save_and_reload_db()
# reload the VM to have all the properties properly set (especially
# default netvm)
self.testvm1 = self.app.domains[self.testvm1.qid]
self.testvm1.start()
p = self.testvm1.run(self.update_cmd, wait=True, user="root",
passio_popen=True, passio_stderr=True)
(stdout, stderr) = p.communicate()
self.assertIn(p.wait(), self.exit_code_ok,
"{}: {}\n{}".format(self.update_cmd, stdout, stderr)
)
self.loop.run_until_complete(self.testvm1.start())
p = self.loop.run_until_complete(
self.testvm1.run(self.update_cmd, user='root'))
(stdout, stderr) = self.loop.run_until_complete(p.communicate())
self.assertIn(p.returncode, self.exit_code_ok,
'{}: {}\n{}'.format(self.update_cmd, stdout, stderr))
def create_repo_apt(self):
pkg_file_name = "test-pkg_1.0-1_amd64.deb"
p = self.netvm_repo.run("mkdir /tmp/apt-repo && cd /tmp/apt-repo &&"
"base64 -d | zcat > {}".format(pkg_file_name),
passio_popen=True)
p.stdin.write(self.DEB_PACKAGE_GZIP_BASE64)
p.stdin.close()
if p.wait() != 0:
raise RuntimeError("Failed to write {}".format(pkg_file_name))
self.loop.run_until_complete(self.netvm_repo.run_for_stdio('''
mkdir /tmp/apt-repo \
&& cd /tmp/apt-repo \
&& base64 -d | zcat > {}
'''.format(pkg_file_name),
input=self.DEB_PACKAGE_GZIP_BASE64))
# do not assume dpkg-scanpackage installed
packages_path = "dists/test/main/binary-amd64/Packages"
p = self.netvm_repo.run(
"mkdir -p /tmp/apt-repo/dists/test/main/binary-amd64 && "
"cd /tmp/apt-repo && "
"cat > {packages} && "
"echo MD5sum: $(openssl md5 -r {pkg} | cut -f 1 -d ' ')"
" >> {packages} && "
"echo SHA1: $(openssl sha1 -r {pkg} | cut -f 1 -d ' ')"
" >> {packages} && "
"echo SHA256: $(openssl sha256 -r {pkg} | cut -f 1 -d ' ')"
" >> {packages} && "
"gzip < {packages} > {packages}.gz".format(pkg=pkg_file_name,
packages=packages_path),
passio_popen=True, passio_stderr=True)
p.stdin.write(
"Package: test-pkg\n"
"Version: 1.0-1\n"
"Architecture: amd64\n"
"Maintainer: unknown <user@host>\n"
"Installed-Size: 25\n"
"Filename: {pkg}\n"
"Size: 994\n"
"Section: unknown\n"
"Priority: optional\n"
"Description: Test package\n".format(pkg=pkg_file_name)
)
p.stdin.close()
if p.wait() != 0:
raise RuntimeError("Failed to write Packages file: {}".format(
p.stderr.read().decode()))
self.loop.run_until_complete(self.netvm_repo.run_for_stdio('''
mkdir -p /tmp/apt-repo/dists/test/main/binary-amd64 \
&& cd /tmp/apt-repo \
&& cat > {packages} \
&& echo MD5sum: $(openssl md5 -r {pkg} | cut -f 1 -d ' ') \
>> {packages} \
&& echo SHA1: $(openssl sha1 -r {pkg} | cut -f 1 -d ' ') \
>> {packages} \
&& echo SHA256: $(openssl sha256 -r {pkg} | cut -f 1 -d ' ') \
>> {packages} \
&& gzip < {packages} > {packages}.gz
'''.format(pkg=pkg_file_name, packages=packages_path),
input='''\
Package: test-pkg
Version: 1.0-1
Architecture: amd64
Maintainer: unknown <user@host>
Installed-Size: 25
Filename: {pkg}
Size: 994
Section: unknown
Priority: optional
Description: Test package'''.format(pkg=pkg_file_name).encode('utf-8')))
p = self.netvm_repo.run(
"mkdir -p /tmp/apt-repo/dists/test && "
"cd /tmp/apt-repo/dists/test && "
"cat > Release && "
"echo '' $(sha256sum {p} | cut -f 1 -d ' ') $(stat -c %s {p}) {p}"
" >> Release && "
"echo '' $(sha256sum {z} | cut -f 1 -d ' ') $(stat -c %s {z}) {z}"
" >> Release"
.format(p="main/binary-amd64/Packages",
z="main/binary-amd64/Packages.gz"),
passio_popen=True, passio_stderr=True
)
p.stdin.write(
"Label: Test repo\n"
"Suite: test\n"
"Codename: test\n"
"Date: Tue, 27 Oct 2015 03:22:09 UTC\n"
"Architectures: amd64\n"
"Components: main\n"
"SHA256:\n"
)
p.stdin.close()
if p.wait() != 0:
raise RuntimeError("Failed to write Release file: {}".format(
p.stderr.read().decode()))
self.loop.run_until_complete(self.netvm_repo.run_for_stdio('''
mkdir -p /tmp/apt-repo/dists/test \
&& cd /tmp/apt-repo/dists/test \
&& cat > Release \
&& echo '' $(sha256sum {p} | cut -f 1 -d ' ') $(stat -c %s {p}) {p}\
>> Release \
&& echo '' $(sha256sum {z} | cut -f 1 -d ' ') $(stat -c %s {z}) {z}\
>> Release
'''.format(p='main/binary-amd64/Packages',
z='main/binary-amd64/Packages.gz'),
input='''\
Label: Test repo
Suite: test
Codename: test
Date: Tue, 27 Oct 2015 03:22:09 UTC
Architectures: amd64
Components: main
SHA256:
'''))
def create_repo_yum(self):
pkg_file_name = "test-pkg-1.0-1.fc21.x86_64.rpm"
p = self.netvm_repo.run("mkdir /tmp/yum-repo && cd /tmp/yum-repo &&"
"base64 -d | zcat > {}".format(pkg_file_name),
passio_popen=True, passio_stderr=True)
p.stdin.write(self.RPM_PACKAGE_GZIP_BASE64)
p.stdin.close()
if p.wait() != 0:
raise RuntimeError("Failed to write {}: {}".format(pkg_file_name,
p.stderr.read().decode()))
self.loop.run_until_complete(self.netvm_repo.run_for_stdio('''
mkdir /tmp/yum-repo \
&& cd /tmp/yum-repo \
&& base64 -d | zcat > {}
'''.format(pkg_file_name), input=self.RPM_PACKAGE_GZIP_BASE64))
# createrepo is installed by default in Fedora template
p = self.netvm_repo.run("createrepo /tmp/yum-repo",
passio_popen=True,
passio_stderr=True)
if p.wait() != 0:
raise RuntimeError("Failed to create yum metadata: {}".format(
p.stderr.read().decode()))
self.loop.run_until_complete(self.netvm_repo.run_for_stdio(
'createrepo /tmp/yum-repo'))
def create_repo_and_serve(self):
if self.template.count("debian") or self.template.count("whonix"):
self.create_repo_apt()
self.netvm_repo.run("cd /tmp/apt-repo &&"
"python -m SimpleHTTPServer 8080")
self.loop.run_until_complete(self.netvm_repo.run(
'cd /tmp/apt-repo && python -m SimpleHTTPServer 8080'))
elif self.template.count("fedora"):
self.create_repo_yum()
self.netvm_repo.run("cd /tmp/yum-repo &&"
"python -m SimpleHTTPServer 8080")
self.loop.run_until_complete(self.netvm_repo.run(
'cd /tmp/yum-repo && python -m SimpleHTTPServer 8080'))
else:
# not reachable...
self.skipTest("Template {} not supported by this test".format(
@ -848,13 +863,13 @@ class VmUpdatesMixin(qubes.tests.SystemTestsMixin):
"""
if self.template.count("debian") or self.template.count("whonix"):
self.testvm1.run(
self.loop.run_until_complete(self.testvm1.run_for_stdio(
"rm -f /etc/apt/sources.list.d/* &&"
"echo 'deb [trusted=yes] http://localhost:8080 test main' "
"> /etc/apt/sources.list",
user="root")
user="root"))
elif self.template.count("fedora"):
self.testvm1.run(
self.loop.run_until_complete(self.testvm1.run_for_stdio(
"rm -f /etc/yum.repos.d/*.repo &&"
"echo '[test]' > /etc/yum.repos.d/test.repo &&"
"echo 'name=Test repo' >> /etc/yum.repos.d/test.repo &&"
@ -862,7 +877,7 @@ class VmUpdatesMixin(qubes.tests.SystemTestsMixin):
"echo 'baseurl=http://localhost:8080/'"
" >> /etc/yum.repos.d/test.repo",
user="root"
)
))
else:
# not reachable...
self.skipTest("Template {} not supported by this test".format(
@ -881,7 +896,7 @@ class VmUpdatesMixin(qubes.tests.SystemTestsMixin):
name=self.make_vm_name('net'),
label='red')
self.netvm_repo.provides_network = True
self.netvm_repo.create_on_disk()
self.loop.run_until_complete(self.netvm_repo.create_on_disk())
self.testvm1.netvm = self.netvm_repo
# NetVM should have qubes-updates-proxy enabled by default
#self.netvm_repo.features['qubes-updates-proxy'] = True
@ -890,38 +905,33 @@ class VmUpdatesMixin(qubes.tests.SystemTestsMixin):
self.app.save()
# Setup test repo
self.netvm_repo.start()
self.loop.run_until_complete(self.netvm_repo.start())
self.create_repo_and_serve()
# Configure local repo
self.testvm1.start()
self.loop.run_until_complete(self.testvm1.start())
self.configure_test_repo()
# update repository metadata
p = self.testvm1.run(self.update_cmd, wait=True, user="root",
passio_popen=True, passio_stderr=True)
(stdout, stderr) = p.communicate()
self.assertIn(p.wait(), self.exit_code_ok,
"{}: {}\n{}".format(self.update_cmd, stdout, stderr)
)
p = self.loop.run_until_complete(self.testvm1.run(
self.update_cmd, user='root'))
(stdout, stderr) = self.loop.run_until_complete(p.communicate())
self.assertIn(self.loop.run_until_complete(p.wait()), self.exit_code_ok,
'{}: {}\n{}'.format(self.update_cmd, stdout, stderr))
# install test package
p = self.testvm1.run(self.install_cmd.format('test-pkg'),
wait=True, user="root",
passio_popen=True, passio_stderr=True)
(stdout, stderr) = p.communicate()
self.assertIn(p.wait(), self.exit_code_ok,
"{}: {}\n{}".format(self.update_cmd, stdout, stderr)
)
p = self.loop.run_until_complete(self.testvm1.run(
self.install_cmd.format('test-pkg'), user='root'))
(stdout, stderr) = self.loop.run_until_complete(p.communicate())
self.assertIn(self.loop.run_until_complete(p.wait()), self.exit_code_ok,
'{}: {}\n{}'.format(self.update_cmd, stdout, stderr))
# verify if it was really installed
p = self.testvm1.run(self.install_test_cmd.format('test-pkg'),
wait=True, user="root",
passio_popen=True, passio_stderr=True)
(stdout, stderr) = p.communicate()
self.assertIn(p.wait(), self.exit_code_ok,
"{}: {}\n{}".format(self.update_cmd, stdout, stderr)
)
p = self.loop.run_until_complete(self.testvm1.run(
self.install_test_cmd.format('test-pkg'), user='root'))
(stdout, stderr) = self.loop.run_until_complete(p.communicate())
self.assertIn(self.loop.run_until_complete(p.wait()), self.exit_code_ok,
'{}: {}\n{}'.format(self.update_cmd, stdout, stderr))
def load_tests(loader, tests, pattern):
try:

View File

@ -19,8 +19,8 @@
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#
import asyncio
import os
import shutil
import qubes.storage.lvm
@ -51,6 +51,10 @@ class StorageTestMixin(qubes.tests.SystemTestsMixin):
def test_000_volatile(self):
'''Test if volatile volume is really volatile'''
return self.loop.run_until_complete(self._test_000_volatile())
@asyncio.coroutine
def _test_000_volatile(self):
size = 32*1024*1024
volume_config = {
'pool': self.pool.name,
@ -60,27 +64,29 @@ class StorageTestMixin(qubes.tests.SystemTestsMixin):
'rw': True,
}
testvol = self.vm1.storage.init_volume('testvol', volume_config)
self.vm1.storage.get_pool(testvol).create(testvol)
yield from self.vm1.storage.get_pool(testvol).create(testvol)
self.app.save()
self.vm1.start()
p = self.vm1.run(
yield from (self.vm1.start())
# volatile image not clean
yield from (self.vm1.run_for_stdio(
'head -c {} /dev/zero 2>&1 | diff -q /dev/xvde - 2>&1'.format(size),
user='root', passio_popen=True)
stdout, _ = p.communicate()
self.assertEqual(p.returncode, 0,
'volatile image not clean: {}'.format(stdout))
self.vm1.run('echo test123 > /dev/xvde', user='root', wait=True)
self.vm1.shutdown(wait=True)
self.vm1.start()
p = self.vm1.run(
user='root'))
# volatile image not volatile
yield from (
self.vm1.run_for_stdio('echo test123 > /dev/xvde', user='root'))
yield from (self.vm1.shutdown(wait=True))
yield from (self.vm1.start())
yield from (self.vm1.run_for_stdio(
'head -c {} /dev/zero 2>&1 | diff -q /dev/xvde - 2>&1'.format(size),
user='root', passio_popen=True)
stdout, _ = p.communicate()
self.assertEqual(p.returncode, 0,
'volatile image not volatile: {}'.format(stdout))
user='root'))
def test_001_non_volatile(self):
'''Test if non-volatile volume is really non-volatile'''
return self.loop.run_until_complete(self._test_001_non_volatile())
@asyncio.coroutine
def _test_001_non_volatile(self):
size = 32*1024*1024
volume_config = {
'pool': self.pool.name,
@ -89,28 +95,33 @@ class StorageTestMixin(qubes.tests.SystemTestsMixin):
'save_on_stop': True,
'rw': True,
}
testvol = self.vm1.storage.init_volume('testvol', volume_config)
self.vm1.storage.get_pool(testvol).create(testvol)
testvol = yield from self.vm1.storage.init_volume(
'testvol', volume_config)
yield from self.vm1.storage.get_pool(testvol).create(testvol)
self.app.save()
self.vm1.start()
p = self.vm1.run(
yield from self.vm1.start()
# non-volatile image not clean
yield from self.vm1.run_for_stdio(
'head -c {} /dev/zero 2>&1 | diff -q /dev/xvde - 2>&1'.format(size),
user='root', passio_popen=True)
stdout, _ = p.communicate()
self.assertEqual(p.returncode, 0,
'non-volatile image not clean: {}'.format(stdout))
self.vm1.run('echo test123 > /dev/xvde', user='root', wait=True)
self.vm1.shutdown(wait=True)
self.vm1.start()
p = self.vm1.run(
'head -c {} /dev/zero 2>&1 | diff -q /dev/xvde - 2>&1'.format(size),
user='root', passio_popen=True)
stdout, _ = p.communicate()
self.assertNotEqual(p.returncode, 0,
'non-volatile image volatile: {}'.format(stdout))
user='root')
yield from self.vm1.run_for_stdio('echo test123 > /dev/xvde',
user='root')
yield from self.vm1.shutdown(wait=True)
yield from self.vm1.start()
# non-volatile image volatile
with self.assertRaises(subprocess.CalledProcessError):
yield from self.vm1.run_for_stdio(
'head -c {} /dev/zero 2>&1 | diff -q /dev/xvde - 2>&1'.format(
size),
user='root')
def test_002_read_only(self):
'''Test read-only volume'''
self.loop.run_until_complete(self._test_002_read_only())
@asyncio.coroutine
def _test_002_read_only(self):
size = 32 * 1024 * 1024
volume_config = {
'pool': self.pool.name,
@ -120,29 +131,28 @@ class StorageTestMixin(qubes.tests.SystemTestsMixin):
'rw': False,
}
testvol = self.vm1.storage.init_volume('testvol', volume_config)
self.vm1.storage.get_pool(testvol).create(testvol)
yield from self.vm1.storage.get_pool(testvol).create(testvol)
self.app.save()
self.vm1.start()
p = self.vm1.run(
yield from self.vm1.start()
# non-volatile image not clean
yield from self.vm1.run_for_stdio(
'head -c {} /dev/zero 2>&1 | diff -q /dev/xvde - 2>&1'.format(size),
user='root', passio_popen=True)
stdout, _ = p.communicate()
self.assertEqual(p.returncode, 0,
'non-volatile image not clean: {}'.format(stdout))
p = self.vm1.run('echo test123 > /dev/xvde', user='root',
passio_popen=True)
p.wait()
self.assertNotEqual(p.returncode, 0,
'Write to read-only volume unexpectedly succeeded')
p = self.vm1.run(
user='root')
# Write to read-only volume unexpectedly succeeded
with self.assertRaises(subprocess.CalledProcessError):
yield from self.vm1.run_for_stdio('echo test123 > /dev/xvde',
user='root')
# read-only volume modified
yield from self.vm1.run_for_stdio(
'head -c {} /dev/zero 2>&1 | diff -q /dev/xvde - 2>&1'.format(size),
user='root', passio_popen=True)
stdout, _ = p.communicate()
self.assertEqual(p.returncode, 0,
'read-only volume modified: {}'.format(stdout))
user='root')
def test_003_snapshot(self):
'''Test snapshot volume data propagation'''
self.loop.run_until_complete(self._test_003_snapshot())
@asyncio.coroutine
def _test_003_snapshot(self):
size = 128 * 1024 * 1024
volume_config = {
'pool': self.pool.name,
@ -152,7 +162,7 @@ class StorageTestMixin(qubes.tests.SystemTestsMixin):
'rw': True,
}
testvol = self.vm1.storage.init_volume('testvol', volume_config)
self.vm1.storage.get_pool(testvol).create(testvol)
yield from self.vm1.storage.get_pool(testvol).create(testvol)
volume_config = {
'pool': self.pool.name,
'size': size,
@ -162,57 +172,55 @@ class StorageTestMixin(qubes.tests.SystemTestsMixin):
'rw': True,
}
testvol_snap = self.vm2.storage.init_volume('testvol', volume_config)
self.vm2.storage.get_pool(testvol_snap).create(testvol_snap)
yield from self.vm2.storage.get_pool(testvol_snap).create(testvol_snap)
self.app.save()
self.vm1.start()
self.vm2.start()
p = self.vm1.run(
yield from self.vm1.start()
yield from self.vm2.start()
# origin image not clean
yield from self.vm1.run_for_stdio(
'head -c {} /dev/zero 2>&1 | diff -q /dev/xvde - 2>&1'.format(size),
user='root', passio_popen=True)
stdout, _ = p.communicate()
self.assertEqual(p.returncode, 0,
'origin image not clean: {}'.format(stdout))
user='root')
p = self.vm2.run(
# snapshot image not clean
yield from self.vm2.run_for_stdio(
'head -c {} /dev/zero | diff -q /dev/xvde -'.format(size),
user='root', passio_popen=True)
stdout, _ = p.communicate()
self.assertEqual(p.returncode, 0,
'snapshot image not clean: {}'.format(stdout))
user='root')
self.vm1.run('echo test123 > /dev/xvde && sync', user='root', wait=True)
p.wait()
self.assertEqual(p.returncode, 0,
'Write to read-write volume failed')
p = self.vm2.run(
# Write to read-write volume failed
yield from self.vm1.run_for_stdio('echo test123 > /dev/xvde && sync',
user='root')
# origin changes propagated to snapshot too early
yield from self.vm2.run_for_stdio(
'head -c {} /dev/zero 2>&1 | diff -q /dev/xvde - 2>&1'.format(size),
user='root', passio_popen=True)
stdout, _ = p.communicate()
self.assertEqual(p.returncode, 0,
'origin changes propagated to snapshot too early: {}'.format(
stdout))
self.vm1.shutdown(wait=True)
user='root')
yield from self.vm1.shutdown(wait=True)
# after origin shutdown there should be still no change
p = self.vm2.run(
'head -c {} /dev/zero 2>&1 | diff -q /dev/xvde - 2>&1'.format(size),
user='root', passio_popen=True)
stdout, _ = p.communicate()
self.assertEqual(p.returncode, 0,
'origin changes propagated to snapshot too early2: {}'.format(
stdout))
self.vm2.shutdown(wait=True)
self.vm2.start()
# only after target VM restart changes should be visible
p = self.vm2.run(
# origin changes propagated to snapshot too early2
yield from self.vm2.run_for_stdio(
'head -c {} /dev/zero 2>&1 | diff -q /dev/xvde - 2>&1'.format(size),
user='root', passio_popen=True)
stdout, _ = p.communicate()
self.assertNotEqual(p.returncode, 0,
'origin changes not visible in snapshot: {}'.format(stdout))
user='root')
yield from self.vm2.shutdown(wait=True)
yield from self.vm2.start()
# only after target VM restart changes should be visible
# origin changes not visible in snapshot
with self.assertRaises(subprocess.CalledProcessError):
yield from self.vm2.run(
'head -c {} /dev/zero 2>&1 | diff -q /dev/xvde - 2>&1'.format(
size),
user='root')
def test_004_snapshot_non_persistent(self):
'''Test snapshot volume non-persistence'''
return self.loop.run_until_complete(
self._test_004_snapshot_non_persistent())
@asyncio.coroutine
def _test_004_snapshot_non_persistent(self):
size = 128 * 1024 * 1024
volume_config = {
'pool': self.pool.name,
@ -222,7 +230,7 @@ class StorageTestMixin(qubes.tests.SystemTestsMixin):
'rw': True,
}
testvol = self.vm1.storage.init_volume('testvol', volume_config)
self.vm1.storage.get_pool(testvol).create(testvol)
yield from self.vm1.storage.get_pool(testvol).create(testvol)
volume_config = {
'pool': self.pool.name,
'size': size,
@ -232,30 +240,25 @@ class StorageTestMixin(qubes.tests.SystemTestsMixin):
'rw': True,
}
testvol_snap = self.vm2.storage.init_volume('testvol', volume_config)
self.vm2.storage.get_pool(testvol_snap).create(testvol_snap)
yield from self.vm2.storage.get_pool(testvol_snap).create(testvol_snap)
self.app.save()
self.vm2.start()
yield from self.vm2.start()
p = self.vm2.run(
# snapshot image not clean
yield from self.vm2.run_for_stdio(
'head -c {} /dev/zero | diff -q /dev/xvde -'.format(size),
user='root', passio_popen=True)
stdout, _ = p.communicate()
self.assertEqual(p.returncode, 0,
'snapshot image not clean: {}'.format(stdout))
user='root')
self.vm2.run('echo test123 > /dev/xvde && sync', user='root', wait=True)
p.wait()
self.assertEqual(p.returncode, 0,
'Write to read-write snapshot volume failed')
self.vm2.shutdown(wait=True)
self.vm2.start()
p = self.vm2.run(
# Write to read-write snapshot volume failed
yield from self.vm2.run_for_stdio('echo test123 > /dev/xvde && sync',
user='root')
yield from self.vm2.shutdown(wait=True)
yield from self.vm2.start()
# changes on snapshot survived VM restart
yield from self.vm2.run_for_stdio(
'head -c {} /dev/zero 2>&1 | diff -q /dev/xvde - 2>&1'.format(size),
user='root', passio_popen=True)
stdout, _ = p.communicate()
self.assertEqual(p.returncode, 0,
'changes on snapshot survived VM restart: {}'.format(
stdout))
user='root')
class StorageFile(StorageTestMixin, qubes.tests.QubesTestCase):

File diff suppressed because it is too large Load Diff

View File

@ -82,7 +82,7 @@ class TC_00_FilePool(qubes.tests.QubesTestCase):
"""
result = self.app.get_pool("default").dir_path
expected = '/var/lib/qubes'
self.assertEquals(result, expected)
self.assertEqual(result, expected)
def test001_default_storage_class(self):
""" Check when using default pool the Storage is
@ -244,7 +244,7 @@ class TC_01_FileVolumes(qubes.tests.QubesTestCase):
volumes = vm.volumes
b_dev = volumes[dev_name].block_device()
self.assertEqual(b_dev.rw, rw)
self.assertEquals(b_dev.path, expected)
self.assertEqual(b_dev.path, expected)
class TC_03_FilePool(qubes.tests.QubesTestCase):
@ -319,12 +319,12 @@ class TC_03_FilePool(qubes.tests.QubesTestCase):
expected_vmdir = os.path.join(self.APPVMS_DIR, vm.name)
expected_private_path = os.path.join(expected_vmdir, 'private.img')
self.assertEquals(vm.volumes['private'].path, expected_private_path)
self.assertEqual(vm.volumes['private'].path, expected_private_path)
expected_volatile_path = os.path.join(expected_vmdir, 'volatile.img')
vm.storage.get_pool(vm.volumes['volatile'])\
.reset(vm.volumes['volatile'])
self.assertEqualsAndExists(vm.volumes['volatile'].path,
self.assertEqualAndExists(vm.volumes['volatile'].path,
expected_volatile_path)
def test_013_template_file_images(self):
@ -353,25 +353,25 @@ class TC_03_FilePool(qubes.tests.QubesTestCase):
expected_root_cow_path = os.path.join(expected_vmdir, 'root-cow.img')
expected_root_path = '%s:%s' % (expected_root_origin_path,
expected_root_cow_path)
self.assertEquals(vm.volumes['root'].block_device().path,
self.assertEqual(vm.volumes['root'].block_device().path,
expected_root_path)
self.assertExist(vm.volumes['root'].path)
expected_private_path = os.path.join(expected_vmdir, 'private.img')
self.assertEqualsAndExists(vm.volumes['private'].path,
self.assertEqualAndExists(vm.volumes['private'].path,
expected_private_path)
expected_rootcow_path = os.path.join(expected_vmdir, 'root-cow.img')
self.assertEqualsAndExists(vm.volumes['root'].path_cow,
self.assertEqualAndExists(vm.volumes['root'].path_cow,
expected_rootcow_path)
def assertEqualsAndExists(self, result_path, expected_path):
def assertEqualAndExists(self, result_path, expected_path):
""" Check if the ``result_path``, matches ``expected_path`` and exists.
See also: :meth:``assertExist``
"""
# :pylint: disable=invalid-name
self.assertEquals(result_path, expected_path)
self.assertEqual(result_path, expected_path)
self.assertExist(result_path)
def assertExist(self, path):

View File

@ -31,6 +31,7 @@ import qubes.tests
@qubes.tests.skipUnlessDom0
class TC_00_AdminVM(qubes.tests.QubesTestCase):
def setUp(self):
super().setUp()
try:
self.app = qubes.tests.vm.TestApp()
self.vm = qubes.vm.adminvm.AdminVM(self.app,

View File

@ -51,6 +51,7 @@ class TestVM(qubes.vm.BaseVM):
class TC_10_BaseVM(qubes.tests.QubesTestCase):
def setUp(self):
super().setUp()
self.xml = lxml.etree.XML('''
<qubes version="3"> <!-- xmlns="https://qubes-os.org/QubesXML/1" -->
<labels>

View File

@ -60,6 +60,7 @@ class TestVM(object):
class TC_00_setters(qubes.tests.QubesTestCase):
def setUp(self):
super().setUp()
self.vm = TestVM()
self.prop = TestProp()
@ -158,13 +159,13 @@ class QubesVMTestsMixin(object):
# single exception?
with self.assertNotRaises((ValueError, TypeError, KeyError)):
setattr(vm, prop_name, set_value)
self.assertEquals(getattr(vm, prop_name), expected_value)
self.assertEqual(getattr(vm, prop_name), expected_value)
if expected_xml_content is not None:
xml = vm.__xml__()
prop_xml = xml.xpath(
'./properties/property[@name=\'{}\']'.format(prop_name))
self.assertEquals(len(prop_xml), 1, "Property not found in XML")
self.assertEquals(prop_xml[0].text, expected_xml_content)
self.assertEqual(len(prop_xml), 1, "Property not found in XML")
self.assertEqual(prop_xml[0].text, expected_xml_content)
def assertPropertyInvalidValue(self, vm, prop_name, set_value):
orig_value_set = True
@ -178,7 +179,7 @@ class QubesVMTestsMixin(object):
with self.assertRaises((ValueError, TypeError, KeyError)):
setattr(vm, prop_name, set_value)
if orig_value_set:
self.assertEquals(getattr(vm, prop_name), orig_value)
self.assertEqual(getattr(vm, prop_name), orig_value)
else:
with self.assertRaises(AttributeError):
getattr(vm, prop_name)
@ -190,11 +191,11 @@ class QubesVMTestsMixin(object):
getattr(vm, prop_name)
else:
with self.assertNotRaises(AttributeError):
self.assertEquals(getattr(vm, prop_name), expected_default)
self.assertEqual(getattr(vm, prop_name), expected_default)
xml = vm.__xml__()
prop_xml = xml.xpath(
'./properties/property[@name=\'{}\']'.format(prop_name))
self.assertEquals(len(prop_xml), 0, "Property still found in XML")
self.assertEqual(len(prop_xml), 0, "Property still found in XML")
def _test_generic_bool_property(self, vm, prop_name, default=False):
self.assertPropertyDefaultValue(vm, prop_name, default)

View File

@ -97,9 +97,10 @@ class AdminVM(qubes.vm.qubesvm.QubesVM):
'''
# return psutil.virtual_memory().total/1024
for line in open('/proc/meminfo'):
if line.startswith('MemTotal:'):
return int(line.split(':')[1].strip().split()[0])
with open('/proc/meminfo') as file:
for line in file:
if line.startswith('MemTotal:'):
return int(line.split(':')[1].strip().split()[0])
raise NotImplementedError()
def get_mem_static_max(self):

View File

@ -683,9 +683,6 @@ class QubesVM(qubes.vm.mix.net.NetVMMixin, qubes.vm.BaseVM):
self._libvirt_domain = None
self._qdb_connection = None
#: this :py:class:`asyncio.Event` will fire when session is obtained
self.have_session = asyncio.Event()
if xml is None:
# we are creating new VM and attributes came through kwargs
assert hasattr(self, 'qid')
@ -923,8 +920,6 @@ class QubesVM(qubes.vm.mix.net.NetVMMixin, qubes.vm.BaseVM):
if qmemman_client:
qmemman_client.close()
asyncio.ensure_future(self._wait_for_session())
return self
@qubes.events.handler('domain-shutdown')
@ -1081,9 +1076,6 @@ class QubesVM(qubes.vm.mix.net.NetVMMixin, qubes.vm.BaseVM):
raise qubes.exc.QubesVMError(
self, 'Domain {!r}: qrexec not connected'.format(self.name))
if gui and not self.have_session.is_set():
raise qubes.exc.QubesVMError(self, 'don\'t have session yet')
self.fire_event_pre('domain-cmd-pre-run', start_guid=gui)
return (yield from asyncio.create_subprocess_exec(
@ -1119,7 +1111,7 @@ class QubesVM(qubes.vm.mix.net.NetVMMixin, qubes.vm.BaseVM):
raise qubes.exc.QubesVMError(self,
'service {!r} failed with retcode {!r}; '
'stdout={!r} stderr={!r}'.format(
args, p.returncode, *stdouterr))
args[0], p.returncode, *stdouterr))
return stdouterr
@ -1131,17 +1123,23 @@ class QubesVM(qubes.vm.mix.net.NetVMMixin, qubes.vm.BaseVM):
input = b''
return b''.join((command.rstrip('\n').encode('utf-8'), b'\n', input))
def run(self, command, input=None, **kwargs):
'''Run a shell command inside the domain using qubes.VMShell qrexec.
def run(self, command, user=None, **kwargs):
'''Run a shell command inside the domain using qrexec.
This method is a coroutine.
*kwargs* are passed verbatim to :py:meth:`run_service`.
''' # pylint: disable=redefined-builtin
return self.run_service('qubes.VMShell',
input=self._prepare_input_for_vmshell(command, input), **kwargs)
def run_for_stdio(self, command, input=None, **kwargs):
if user is None:
user = self.default_user
return asyncio.create_subprocess_exec(
qubes.config.system_path['qrexec_client_path'],
'-d', str(self.name),
'{}:{}'.format(user, command),
**kwargs)
@asyncio.coroutine
def run_for_stdio(self, *args, input=None, **kwargs):
'''Run a shell command inside the domain using qubes.VMShell qrexec.
This method is a coroutine.
@ -1149,8 +1147,20 @@ class QubesVM(qubes.vm.mix.net.NetVMMixin, qubes.vm.BaseVM):
*kwargs* are passed verbatim to :py:meth:`run_service_for_stdio`.
See disclaimer there.
''' # pylint: disable=redefined-builtin
return self.run_service_for_stdio('qubes.VMShell',
input=self._prepare_input_for_vmshell(command, input), **kwargs)
kwargs.setdefault('stdin', subprocess.PIPE)
kwargs.setdefault('stdout', subprocess.PIPE)
kwargs.setdefault('stderr', subprocess.PIPE)
p = yield from self.run(*args, **kwargs)
stdouterr = yield from p.communicate(input=input)
if p.returncode:
raise qubes.exc.QubesVMError(self,
'service {!r} failed with retcode {!r}; '
'stdout={!r} stderr={!r}'.format(
args[0], p.returncode, *stdouterr))
return stdouterr
def request_memory(self, mem_required=None):
# overhead of per-qube/per-vcpu Xen structures,
@ -1264,23 +1274,6 @@ class QubesVM(qubes.vm.mix.net.NetVMMixin, qubes.vm.BaseVM):
except subprocess.CalledProcessError:
raise qubes.exc.QubesException('Cannot execute qubesdb-daemon')
@asyncio.coroutine
def _wait_for_session(self):
'''Wait until machine finished boot sequence.
This is done by executing qubes RPC call that checks if dummy system
service (which is started late in standard runlevel) is active.
'''
self.log.info('Waiting for qubes-session')
yield from self.run_service_for_stdio('qubes.WaitForSession',
user='root', gui=False, input=self.default_user.encode())
self.log.info('qubes-session acquired')
self.have_session.set()
self.fire_event('domain-has-session')
@asyncio.coroutine
def create_on_disk(self, pool=None, pools=None):
'''Create files needed for VM.

View File

@ -20,13 +20,13 @@
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
''' Qrexec policy parser and evaluator '''
import enum
import itertools
import json
import os
import os.path
import socket
import subprocess
import enum
import itertools
# don't import 'qubes.config' please, it takes 0.3s
QREXEC_CLIENT = '/usr/lib/qubes/qrexec-client'