core-admin-client/qubesadmin/tests/app.py
Marek Marczykowski-Górecki bcd026d141
Implement VM clone as create + copy data+metadata
This way we don't need a separate admin.vm.Clone call, which is tricky to
handle properly with policy.
A VM may not have access to all the properties and other metadata, so
add ignore_errors argument, for best-effort approach (copy what is
possible). In any case, failure of cloning VM data fails the whole
operation.
When operation fails, VM is removed.

While at it, allow to specify alternative VM class - this allows
morphing one VM into another (for example AppVM -> StandaloneVM).

Adjust qvm-clone tool and tests accordingly.

QubesOS/qubes-issues#2622
2017-06-20 01:34:18 +02:00

790 lines
34 KiB
Python

# -*- encoding: utf8 -*-
#
# The Qubes OS Project, http://www.qubes-os.org
#
# Copyright (C) 2017 Marek Marczykowski-Górecki
# <marmarek@invisiblethingslab.com>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with this program; if not, see <http://www.gnu.org/licenses/>.
import os
import shutil
import socket
import subprocess
import unittest
import multiprocessing
try:
import unittest.mock as mock
except ImportError:
import mock
import tempfile
import qubesadmin.exc
import qubesadmin.tests
class TC_00_VMCollection(qubesadmin.tests.QubesTestCase):
    '''Tests for the ``app.domains`` collection: listing, lookup,
    membership, iteration and deletion.

    Each test registers the replies it expects in
    ``self.app.expected_calls`` and finishes with ``assertAllCalled()``.
    '''

    def _expect_vm_list(self):
        '''Register the canned ``admin.vm.List`` reply describing a single
        running AppVM called ``test-vm``.
        '''
        list_call = ('dom0', 'admin.vm.List', None, None)
        self.app.expected_calls[list_call] = \
            b'0\x00test-vm class=AppVM state=Running\n'

    def test_000_list(self):
        '''``keys()`` yields the names reported by ``admin.vm.List``.'''
        self._expect_vm_list()
        names = list(self.app.domains.keys())
        self.assertEqual(names, ['test-vm'])
        self.assertAllCalled()

    def test_001_getitem(self):
        '''Indexing returns a known VM and raises KeyError for unknown.'''
        self._expect_vm_list()
        try:
            found = self.app.domains['test-vm']
            self.assertEqual(found.name, 'test-vm')
        except KeyError:
            self.fail('VM not found in collection')
        self.assertAllCalled()

        with self.assertRaises(KeyError):
            self.app.domains['test-non-existent']
        self.assertAllCalled()

    def test_002_in(self):
        '''Membership test works for both present and absent names.'''
        self._expect_vm_list()
        domains = self.app.domains
        self.assertIn('test-vm', domains)
        self.assertAllCalled()

        self.assertNotIn('test-non-existent', domains)
        self.assertAllCalled()

    def test_003_iter(self):
        '''Iterating the collection yields VM objects.'''
        self._expect_vm_list()
        names = [domain.name for domain in self.app.domains]
        self.assertEqual(names, ['test-vm'])
        self.assertAllCalled()

    def test_004_delitem(self):
        '''``del domains[name]`` issues ``admin.vm.Remove`` to that VM.'''
        remove_call = ('test-vm', 'admin.vm.Remove', None, None)
        self.app.expected_calls[remove_call] = b'0\x00'
        del self.app.domains['test-vm']
        self.assertAllCalled()
class TC_10_QubesBase(qubesadmin.tests.QubesTestCase):
    '''Tests for ``add_new_vm``, ``get_label`` and ``clone_vm`` of the app
    object.

    Every test registers canned replies in ``self.app.expected_calls``
    (keyed by a 4-tuple of destination, method name, argument and payload),
    runs the operation and ends with ``assertAllCalled()``.
    '''

    # Canned reply used to make a single call fail with QubesException
    _ERROR_REPLY = b'2\0QubesException\0\0something happened\0'

    # ------------------------------------------------------------------
    # private helpers
    # ------------------------------------------------------------------

    def _assert_new_vm(self, vm, name, cls_name):
        '''Check name and class of a freshly created VM object and verify
        that all registered calls were used.
        '''
        self.assertEqual(vm.name, name)
        self.assertEqual(vm.__class__.__name__, cls_name)
        self.assertAllCalled()

    def _expect_clone_vm_list(self, new_cls=b'AppVM'):
        '''Register the ``admin.vm.List`` reply used by the clone tests.

        :param bytes new_cls: class reported for the ``new-name`` VM
        '''
        self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = (
            b'0\x00new-name class=' + new_cls + b' state=Halted\n'
            b'test-vm class=AppVM state=Halted\n'
            b'test-template class=TemplateVM state=Halted\n'
            b'test-net class=AppVM state=Halted\n')

    def _expect_clone_create(self, method='admin.vm.Create.AppVM',
            payload=b'name=new-name label=red'):
        '''Register a successful VM creation call based on
        ``test-template``.
        '''
        self.app.expected_calls[
            ('dom0', method, 'test-template', payload)] = b'0\x00'

    def _prepare_clone(self):
        '''Register everything a plain ``test-vm`` -> ``new-name`` AppVM
        clone is expected to call.
        '''
        self.clone_setup_common_calls('test-vm', 'new-name')
        self._expect_clone_vm_list()
        self._expect_clone_create()

    def _assert_clone_ok(self, new_vm):
        '''Check the clone result name and that all calls were used.'''
        self.assertEqual(new_vm.name, 'new-name')
        self.assertAllCalled()

    def _check_clone_storage_failure(self, **clone_kwargs):
        '''Make cloning of the private volume fail and verify that
        ``clone_vm`` raises and the new VM gets removed.

        Extra keyword arguments are passed through to ``clone_vm``.
        '''
        self._prepare_clone()
        calls = self.app.expected_calls
        calls[('test-vm', 'admin.vm.volume.Clone', 'private',
            b'new-name')] = self._ERROR_REPLY
        calls[('new-name', 'admin.vm.Remove', None, None)] = b'0\x00'
        # these are registered by the common setup, but the tests expect
        # them not to be called once the private volume clone fails
        for volume in ('root', 'volatile'):
            del calls[('new-name', 'admin.vm.volume.Info', volume, None)]
        with self.assertRaises(qubesadmin.exc.QubesException):
            self.app.clone_vm('test-vm', 'new-name', **clone_kwargs)
        self.assertAllCalled()

    # ------------------------------------------------------------------
    # add_new_vm
    # ------------------------------------------------------------------

    def test_010_new_simple(self):
        calls = self.app.expected_calls
        calls[('dom0', 'admin.vm.Create.AppVM', None,
            b'name=new-vm label=red')] = b'0\x00'
        calls[('dom0', 'admin.vm.List', None, None)] = \
            b'0\x00new-vm class=AppVM state=Running\n'
        created = self.app.add_new_vm('AppVM', 'new-vm', 'red')
        self._assert_new_vm(created, 'new-vm', 'AppVM')

    def test_011_new_template(self):
        calls = self.app.expected_calls
        calls[('dom0', 'admin.vm.Create.TemplateVM', None,
            b'name=new-template label=red')] = b'0\x00'
        calls[('dom0', 'admin.vm.List', None, None)] = \
            b'0\x00new-template class=TemplateVM state=Running\n'
        created = self.app.add_new_vm('TemplateVM', 'new-template', 'red')
        self._assert_new_vm(created, 'new-template', 'TemplateVM')

    def test_012_new_template_based(self):
        calls = self.app.expected_calls
        calls[('dom0', 'admin.vm.Create.AppVM', 'some-template',
            b'name=new-vm label=red')] = b'0\x00'
        calls[('dom0', 'admin.vm.List', None, None)] = \
            b'0\x00new-vm class=AppVM state=Running\n'
        created = self.app.add_new_vm(
            'AppVM', 'new-vm', 'red', 'some-template')
        self._assert_new_vm(created, 'new-vm', 'AppVM')

    def test_013_new_objects_params(self):
        '''Same as test_012, but passing objects instead of names.'''
        calls = self.app.expected_calls
        calls[('dom0', 'admin.vm.Create.AppVM', 'some-template',
            b'name=new-vm label=red')] = b'0\x00'
        calls[('dom0', 'admin.label.List', None, None)] = \
            b'0\x00red\nblue\n'
        calls[('dom0', 'admin.vm.List', None, None)] = (
            b'0\x00new-vm class=AppVM state=Running\n'
            b'some-template class=TemplateVM state=Running\n')
        vm_cls = self.app.get_vm_class('AppVM')
        label = self.app.get_label('red')
        template = self.app.domains['some-template']
        created = self.app.add_new_vm(vm_cls, 'new-vm', label, template)
        self._assert_new_vm(created, 'new-vm', 'AppVM')

    def test_014_new_pool(self):
        calls = self.app.expected_calls
        calls[('dom0', 'admin.vm.CreateInPool.AppVM', None,
            b'name=new-vm label=red pool=some-pool')] = b'0\x00'
        calls[('dom0', 'admin.vm.List', None, None)] = \
            b'0\x00new-vm class=AppVM state=Running\n'
        created = self.app.add_new_vm(
            'AppVM', 'new-vm', 'red', pool='some-pool')
        self._assert_new_vm(created, 'new-vm', 'AppVM')

    def test_015_new_pools(self):
        calls = self.app.expected_calls
        calls[('dom0', 'admin.vm.CreateInPool.AppVM', None,
            b'name=new-vm label=red pool:private=some-pool '
            b'pool:volatile=other-pool')] = b'0\x00'
        calls[('dom0', 'admin.vm.List', None, None)] = \
            b'0\x00new-vm class=AppVM state=Running\n'
        created = self.app.add_new_vm('AppVM', 'new-vm', 'red',
            pools={'private': 'some-pool', 'volatile': 'other-pool'})
        self._assert_new_vm(created, 'new-vm', 'AppVM')

    # ------------------------------------------------------------------
    # get_label
    # ------------------------------------------------------------------

    def test_020_get_label(self):
        self.app.expected_calls[('dom0', 'admin.label.List', None, None)] = \
            b'0\x00red\nblue\n'
        self.assertEqual(self.app.get_label('red').name, 'red')
        self.assertAllCalled()

    # ------------------------------------------------------------------
    # clone_vm
    # ------------------------------------------------------------------

    def clone_setup_common_calls(self, src, dst):
        '''Register the calls shared by the clone tests: reading
        properties, tags, features, firewall and volumes of *src* and
        writing them to *dst*.

        The VM list and the VM creation call are not registered here.

        :param str src: name of the VM being cloned
        :param str dst: name of the VM being created
        '''
        calls = self.app.expected_calls
        ok = b'0\0'

        # labels
        calls[('dom0', 'admin.label.List', None, None)] = \
            b'0\x00red\ngreen\nblue\n'

        # have each property type with default=no, each special-cased,
        # and some with default=yes
        props = {
            'label': 'default=False type=label red',
            'template': 'default=False type=vm test-template',
            'memory': 'default=False type=int 400',
            'kernel': 'default=False type=str 4.9.31',
            'netvm': 'default=False type=vm test-net',
            'hvm': 'default=False type=bool True',
            'default_user': 'default=True type=str user',
        }
        encoded_names = [name.encode() for name in props.keys()]
        calls[(src, 'admin.vm.property.List', None, None)] = \
            b'0\0qid\nname\n' + b'\n'.join(encoded_names) + b'\n'
        for name, reply in props.items():
            calls[(src, 'admin.vm.property.Get', name, None)] = \
                b'0\0' + reply.encode()
            # label and template are handled by the admin.vm.Create call,
            # and properties at their default value should not be set
            handled_by_create = name in ('label', 'template')
            if not handled_by_create and 'default=True' not in reply:
                calls[(dst, 'admin.vm.property.Set', name,
                    reply.split()[-1].encode())] = ok

        # tags
        calls[(src, 'admin.vm.tag.List', None, None)] = b'0\0tag1\ntag2\n'
        for tag in ('tag1', 'tag2'):
            calls[(dst, 'admin.vm.tag.Set', tag, None)] = ok

        # features
        features = {
            'feat1': (b'0\0feat1-value with spaces',
                b'feat1-value with spaces'),
            'feat2': (b'0\x001', b'1'),
        }
        calls[(src, 'admin.vm.feature.List', None, None)] = \
            b'0\0feat1\nfeat2\n'
        for feature, (get_reply, set_payload) in features.items():
            calls[(src, 'admin.vm.feature.Get', feature, None)] = get_reply
            calls[(dst, 'admin.vm.feature.Set', feature, set_payload)] = ok

        # firewall
        rules = (
            b'action=drop dst4=192.168.0.0/24\n'
            b'action=accept\n'
        )
        calls[(src, 'admin.vm.firewall.GetPolicy', None, None)] = \
            b'0\x00accept'
        calls[(src, 'admin.vm.firewall.Get', None, None)] = b'0\x00' + rules
        calls[(dst, 'admin.vm.firewall.SetPolicy', None, b'accept')] = \
            b'0\x00'
        calls[(dst, 'admin.vm.firewall.Set', None, rules)] = b'0\x00'

        # storage
        volumes_reply = b'0\x00root\nprivate\nvolatile\nkernel\n'
        volume_info = {
            'root': (
                b'0\x00pool=lvm\n'
                b'vid=vm-test-vm/root\n'
                b'size=10737418240\n'
                b'usage=2147483648\n'
                b'rw=False\n'
                b'internal=True\n'
                b'source=vm-test-template/root\n'
                b'save_on_stop=False\n'
                b'snap_on_start=True\n'),
            'private': (
                b'0\x00pool=lvm\n'
                b'vid=vm-test-vm/private\n'
                b'size=2147483648\n'
                b'usage=214748364\n'
                b'rw=True\n'
                b'internal=True\n'
                b'save_on_stop=True\n'
                b'snap_on_start=False\n'),
            'volatile': (
                b'0\x00pool=lvm\n'
                b'vid=vm-test-vm/volatile\n'
                b'size=10737418240\n'
                b'usage=0\n'
                b'rw=True\n'
                b'internal=True\n'
                b'source=None\n'
                b'save_on_stop=False\n'
                b'snap_on_start=False\n'),
            'kernel': (
                b'0\x00pool=linux-kernel\n'
                b'vid=\n'
                b'size=0\n'
                b'usage=0\n'
                b'rw=False\n'
                b'internal=True\n'
                b'source=None\n'
                b'save_on_stop=False\n'
                b'snap_on_start=False\n'),
        }
        calls[(dst, 'admin.vm.volume.List', None, None)] = volumes_reply
        for volume, info in volume_info.items():
            calls[(dst, 'admin.vm.volume.Info', volume, None)] = info
        calls[(src, 'admin.vm.volume.List', None, None)] = volumes_reply
        calls[(src, 'admin.vm.volume.Clone', 'private', dst.encode())] = \
            b'0\x00'

    def test_030_clone(self):
        self._prepare_clone()
        self._assert_clone_ok(self.app.clone_vm('test-vm', 'new-name'))

    def test_031_clone_object(self):
        '''Source VM given as an object instead of a name.'''
        self._prepare_clone()
        source = self.app.domains['test-vm']
        self._assert_clone_ok(self.app.clone_vm(source, 'new-name'))

    def test_032_clone_pool(self):
        self.clone_setup_common_calls('test-vm', 'new-name')
        self._expect_clone_create('admin.vm.CreateInPool.AppVM',
            b'name=new-name label=red pool=some-pool')
        self._expect_clone_vm_list()
        cloned = self.app.clone_vm('test-vm', 'new-name', pool='some-pool')
        self._assert_clone_ok(cloned)

    def test_033_clone_pools(self):
        self.clone_setup_common_calls('test-vm', 'new-name')
        self._expect_clone_create('admin.vm.CreateInPool.AppVM',
            b'name=new-name label=red pool:private=some-pool '
            b'pool:volatile=other-pool')
        self._expect_clone_vm_list()
        cloned = self.app.clone_vm('test-vm', 'new-name',
            pools={'private': 'some-pool', 'volatile': 'other-pool'})
        self._assert_clone_ok(cloned)

    def test_034_clone_class_change(self):
        '''Clone an AppVM into a StandaloneVM; root volume is cloned too.'''
        self.clone_setup_common_calls('test-vm', 'new-name')
        self._expect_clone_vm_list(b'StandaloneVM')
        self._expect_clone_create('admin.vm.Create.StandaloneVM')
        calls = self.app.expected_calls
        # override the root volume info registered by the common setup
        calls[('new-name', 'admin.vm.volume.Info', 'root', None)] = (
            b'0\x00pool=lvm\n'
            b'vid=vm-new-name/root\n'
            b'size=10737418240\n'
            b'usage=2147483648\n'
            b'rw=True\n'
            b'internal=True\n'
            b'source=None\n'
            b'save_on_stop=True\n'
            b'snap_on_start=False\n')
        calls[('test-vm', 'admin.vm.volume.Clone', 'root', b'new-name')] = \
            b'0\x00'
        cloned = self.app.clone_vm('test-vm', 'new-name',
            new_cls='StandaloneVM')
        self.assertEqual(cloned.name, 'new-name')
        self.assertEqual(cloned.__class__.__name__, 'StandaloneVM')
        self.assertAllCalled()

    def test_035_clone_fail(self):
        '''Failure to set a property aborts the clone and removes the VM.'''
        calls = self.app.expected_calls
        calls[('dom0', 'admin.label.List', None, None)] = \
            b'0\x00red\ngreen\nblue\n'
        calls[('test-vm', 'admin.vm.property.List', None, None)] = \
            b'0\0qid\nname\ntemplate\nlabel\nmemory\n'
        calls[('test-vm', 'admin.vm.property.Get', 'label', None)] = \
            b'0\0default=False type=label red'
        calls[('test-vm', 'admin.vm.property.Get', 'template', None)] = \
            b'0\0default=False type=vm test-template'
        calls[('test-vm', 'admin.vm.property.Get', 'memory', None)] = \
            b'0\0default=False type=int 400'
        calls[('new-name', 'admin.vm.property.Set', 'memory', b'400')] = \
            self._ERROR_REPLY
        self._expect_clone_vm_list()
        self._expect_clone_create()
        calls[('new-name', 'admin.vm.Remove', None, None)] = b'0\x00'
        with self.assertRaises(qubesadmin.exc.QubesException):
            self.app.clone_vm('test-vm', 'new-name')
        self.assertAllCalled()

    def test_036_clone_ignore_errors_prop(self):
        self._prepare_clone()
        self.app.expected_calls[
            ('new-name', 'admin.vm.property.Set', 'memory', b'400')] = \
            self._ERROR_REPLY
        cloned = self.app.clone_vm('test-vm', 'new-name', ignore_errors=True)
        self._assert_clone_ok(cloned)

    def test_037_clone_ignore_errors_feature(self):
        self._prepare_clone()
        self.app.expected_calls[
            ('new-name', 'admin.vm.feature.Set', 'feat2', b'1')] = \
            self._ERROR_REPLY
        cloned = self.app.clone_vm('test-vm', 'new-name', ignore_errors=True)
        self._assert_clone_ok(cloned)

    def test_038_clone_ignore_errors_tag(self):
        self._prepare_clone()
        self.app.expected_calls[
            ('new-name', 'admin.vm.tag.Set', 'tag1', None)] = \
            self._ERROR_REPLY
        cloned = self.app.clone_vm('test-vm', 'new-name', ignore_errors=True)
        self._assert_clone_ok(cloned)

    def test_039_clone_ignore_errors_firewall(self):
        self._prepare_clone()
        calls = self.app.expected_calls
        calls[('new-name', 'admin.vm.firewall.SetPolicy', None,
            b'accept')] = self._ERROR_REPLY
        # with SetPolicy failing, the rules are expected to be neither
        # read from the source nor written to the clone
        del calls[('test-vm', 'admin.vm.firewall.Get', None, None)]
        del calls[('new-name', 'admin.vm.firewall.Set', None,
            b'action=drop dst4=192.168.0.0/24\naction=accept\n')]
        cloned = self.app.clone_vm('test-vm', 'new-name', ignore_errors=True)
        self._assert_clone_ok(cloned)

    def test_040_clone_ignore_errors_storage(self):
        '''Storage failure is fatal even with ``ignore_errors=True``.'''
        self._check_clone_storage_failure(ignore_errors=True)

    def test_041_clone_fail_storage(self):
        '''Storage failure aborts the clone and removes the new VM.'''
        self._check_clone_storage_failure()
class TC_20_QubesLocal(unittest.TestCase):
def setUp(self):
super(TC_20_QubesLocal, self).setUp()
self.socket_dir = tempfile.mkdtemp()
self.orig_sock = qubesadmin.config.QUBESD_SOCKET
qubesadmin.config.QUBESD_SOCKET = os.path.join(self.socket_dir, 'sock')
self.proc = None
self.app = qubesadmin.app.QubesLocal()
def listen_and_send(self, send_data):
'''Listen on socket and send data in response.
:param bytes send_data: data to send
'''
self.socket_pipe, child_pipe = multiprocessing.Pipe()
self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
self.socket.bind(os.path.join(self.socket_dir, 'sock'))
self.socket.listen(1)
def worker(sock, pipe, send_data_):
conn, addr = sock.accept()
pipe.send(conn.makefile('rb').read())
conn.sendall(send_data_)
conn.close()
self.proc = multiprocessing.Process(target=worker,
args=(self.socket, child_pipe, send_data))
self.proc.start()
self.socket.close()
def get_request(self):
'''Get request sent to "qubesd" mock'''
return self.socket_pipe.recv()
def tearDown(self):
qubesadmin.config.QUBESD_SOCKET = self.orig_sock
if self.proc is not None:
try:
self.proc.terminate()
except OSError:
pass
shutil.rmtree(self.socket_dir)
super(TC_20_QubesLocal, self).tearDown()
def test_000_qubesd_call(self):
self.listen_and_send(b'0\0')
self.app.qubesd_call('test-vm', 'some.method', 'arg1', b'payload')
self.assertEqual(self.get_request(),
b'dom0\0some.method\0test-vm\0arg1\0payload')
def test_001_qubesd_call_none_arg(self):
self.listen_and_send(b'0\0')
self.app.qubesd_call('test-vm', 'some.method', None, b'payload')
self.assertEqual(self.get_request(),
b'dom0\0some.method\0test-vm\0\0payload')
def test_002_qubesd_call_none_payload(self):
self.listen_and_send(b'0\0')
self.app.qubesd_call('test-vm', 'some.method', None, None)
self.assertEqual(self.get_request(),
b'dom0\0some.method\0test-vm\0\0')
def test_003_qubesd_call_payload_stream(self):
# this should really be in setUp()...
tmpdir = tempfile.mkdtemp()
self.addCleanup(shutil.rmtree, tmpdir)
service_path = os.path.join(tmpdir, 'test.service')
payload_input = os.path.join(tmpdir, 'payload-input')
with open(service_path, 'w') as f:
f.write('#!/bin/bash\n'
'env > {dir}/env\n'
'echo "$@" > {dir}/args\n'
'cat > {dir}/payload\n'
'echo -en \'0\\0return-value\'\n'.format(dir=tmpdir))
os.chmod(service_path, 0o755)
with open(payload_input, 'w+') as payload_file:
payload_file.write('some payload\n')
payload_file.seek(0)
with mock.patch('qubesadmin.config.QREXEC_SERVICES_DIR',
tmpdir):
value = self.app.qubesd_call('test-vm', 'test.service',
'some-arg', payload_stream=payload_file)
self.assertEqual(value, b'return-value')
self.assertTrue(os.path.exists(tmpdir + '/env'))
with open(tmpdir + '/env') as env:
self.assertIn('QREXEC_REMOTE_DOMAIN=dom0\n', env)
self.assertIn('QREXEC_REQUESTED_TARGET=test-vm\n', env)
self.assertTrue(os.path.exists(tmpdir + '/args'))
with open(tmpdir + '/args') as args:
self.assertEqual(args.read(), 'some-arg\n')
self.assertTrue(os.path.exists(tmpdir + '/payload'))
with open(tmpdir + '/payload') as payload:
self.assertEqual(payload.read(), 'some payload\n')
def test_004_qubesd_call_payload_stream_proc(self):
# this should really be in setUp()...
tmpdir = tempfile.mkdtemp()
self.addCleanup(shutil.rmtree, tmpdir)
service_path = os.path.join(tmpdir, 'test.service')
echo = subprocess.Popen(['echo', 'some payload'],
stdout=subprocess.PIPE)
with open(service_path, 'w') as f:
f.write('#!/bin/bash\n'
'env > {dir}/env\n'
'echo "$@" > {dir}/args\n'
'cat > {dir}/payload\n'
'echo -en \'0\\0return-value\'\n'.format(dir=tmpdir))
os.chmod(service_path, 0o755)
with mock.patch('qubesadmin.config.QREXEC_SERVICES_DIR',
tmpdir):
value = self.app.qubesd_call('test-vm', 'test.service',
'some-arg', payload_stream=echo.stdout)
echo.stdout.close()
self.assertEqual(value, b'return-value')
self.assertTrue(os.path.exists(tmpdir + '/env'))
with open(tmpdir + '/env') as env:
self.assertIn('QREXEC_REMOTE_DOMAIN=dom0\n', env)
self.assertIn('QREXEC_REQUESTED_TARGET=test-vm\n', env)
self.assertTrue(os.path.exists(tmpdir + '/args'))
with open(tmpdir + '/args') as args:
self.assertEqual(args.read(), 'some-arg\n')
self.assertTrue(os.path.exists(tmpdir + '/payload'))
with open(tmpdir + '/payload') as payload:
self.assertEqual(payload.read(), 'some payload\n')
def test_010_run_service(self):
self.listen_and_send(b'0\0')
with mock.patch('subprocess.Popen') as mock_proc:
p = self.app.run_service('some-vm', 'service.name')
mock_proc.assert_called_once_with([
qubesadmin.config.QREXEC_CLIENT,
'-d', 'some-vm', 'DEFAULT:QUBESRPC service.name dom0'],
stdin=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
self.assertEqual(self.get_request(),
b'dom0\0admin.vm.Start\0some-vm\0\0')
def test_011_run_service_filter_esc(self):
self.listen_and_send(b'0\0')
with mock.patch('subprocess.Popen') as mock_proc:
p = self.app.run_service('some-vm', 'service.name', filter_esc=True)
mock_proc.assert_called_once_with([
qubesadmin.config.QREXEC_CLIENT,
'-d', 'some-vm', '-t', '-T',
'DEFAULT:QUBESRPC service.name dom0'],
stdin=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
self.assertEqual(self.get_request(),
b'dom0\0admin.vm.Start\0some-vm\0\0')
def test_012_run_service_user(self):
self.listen_and_send(b'0\0')
with mock.patch('subprocess.Popen') as mock_proc:
p = self.app.run_service('some-vm', 'service.name', user='user')
mock_proc.assert_called_once_with([
qubesadmin.config.QREXEC_CLIENT,
'-d', 'some-vm',
'user:QUBESRPC service.name dom0'],
stdin=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
self.assertEqual(self.get_request(),
b'dom0\0admin.vm.Start\0some-vm\0\0')
def test_013_run_service_default_target(self):
with self.assertRaises(ValueError):
self.app.run_service('', 'service.name')
class TC_30_QubesRemote(unittest.TestCase):
    '''Tests for ``qubesadmin.app.QubesRemote``.

    ``subprocess.Popen`` is replaced with a mock for the duration of each
    test, so no process is actually started; tests inspect the calls
    recorded on that mock.
    '''

    def setUp(self):
        super(TC_30_QubesRemote, self).setUp()
        popen = mock.Mock()
        # the mocked process always exits with 0
        popen.return_value.returncode = 0
        self.proc_mock = popen
        self.proc_patch = mock.patch('subprocess.Popen', popen)
        self.proc_patch.start()
        self.app = qubesadmin.app.QubesRemote()

    def set_proc_stdout(self, send_data):
        '''Set what ``communicate()`` of the mocked process returns as
        stdout; stderr is ``None``.

        :param bytes send_data: data to return as process output
        '''
        communicate = self.proc_mock.return_value.communicate
        communicate.return_value = (send_data, None)

    def tearDown(self):
        self.proc_patch.stop()
        super(TC_30_QubesRemote, self).tearDown()

    def _assert_qubesd_call(self, service, stdin, payload):
        '''Verify the mock recorded exactly one process started for
        *service* on ``test-vm`` followed by ``communicate(payload)``.
        '''
        expected = [
            mock.call(
                [qubesadmin.config.QREXEC_CLIENT_VM, 'test-vm', service],
                stdin=stdin, stdout=subprocess.PIPE,
                stderr=subprocess.PIPE),
            mock.call().communicate(payload),
        ]
        self.assertEqual(self.proc_mock.mock_calls, expected)

    def _assert_service_started(self, target):
        '''Verify a single process was started for ``service.name`` on
        *target* with all three standard streams piped.
        '''
        self.proc_mock.assert_called_once_with(
            [qubesadmin.config.QREXEC_CLIENT_VM, target, 'service.name'],
            stdin=subprocess.PIPE, stdout=subprocess.PIPE,
            stderr=subprocess.PIPE)

    def test_000_qubesd_call(self):
        self.set_proc_stdout(b'0\0')
        self.app.qubesd_call('test-vm', 'some.method', 'arg1', b'payload')
        self._assert_qubesd_call(
            'some.method+arg1', subprocess.PIPE, b'payload')

    def test_001_qubesd_call_none_arg(self):
        self.set_proc_stdout(b'0\0')
        self.app.qubesd_call('test-vm', 'some.method', None, b'payload')
        self._assert_qubesd_call('some.method', subprocess.PIPE, b'payload')

    def test_002_qubesd_call_none_payload(self):
        self.set_proc_stdout(b'0\0')
        self.app.qubesd_call('test-vm', 'some.method', None, None)
        self._assert_qubesd_call('some.method', subprocess.PIPE, None)

    def test_003_qubesd_call_payload_stream(self):
        '''A payload stream is handed to the process as its stdin.'''
        self.set_proc_stdout(b'0\0return-value')
        tmpdir = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, tmpdir)
        payload_input = os.path.join(tmpdir, 'payload-input')
        with open(payload_input, 'w+') as payload_file:
            payload_file.write('some payload\n')
            payload_file.seek(0)

            value = self.app.qubesd_call('test-vm', 'some.method',
                'some-arg', payload_stream=payload_file)
            self._assert_qubesd_call(
                'some.method+some-arg', payload_file, None)
            self.assertEqual(value, b'return-value')

    def test_010_run_service(self):
        self.app.run_service('some-vm', 'service.name')
        self._assert_service_started('some-vm')

    def test_011_run_service_filter_esc(self):
        with self.assertRaises(NotImplementedError):
            self.app.run_service('some-vm', 'service.name', filter_esc=True)

    def test_012_run_service_user(self):
        with self.assertRaises(ValueError):
            self.app.run_service('some-vm', 'service.name', user='user')

    def test_013_run_service_default_target(self):
        self.app.run_service('', 'service.name')
        self._assert_service_started('')