123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958 |
- # -*- encoding: utf-8 -*-
- #
- # The Qubes OS Project, http://www.qubes-os.org
- #
- # Copyright (C) 2017 Marek Marczykowski-Górecki
- # <marmarek@invisiblethingslab.com>
- #
- # This program is free software; you can redistribute it and/or modify
- # it under the terms of the GNU Lesser General Public License as published by
- # the Free Software Foundation; either version 2 of the License, or
- # (at your option) any later version.
- #
- # This program is distributed in the hope that it will be useful,
- # but WITHOUT ANY WARRANTY; without even the implied warranty of
- # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- # GNU Lesser General Public License for more details.
- #
- # You should have received a copy of the GNU Lesser General Public License along
- # with this program; if not, see <http://www.gnu.org/licenses/>.
- import os
- import shutil
- import socket
- import subprocess
- import unittest
- import multiprocessing
- try:
- import unittest.mock as mock
- from unittest.mock import call
- except ImportError:
- import mock
- from mock import call
- import tempfile
- import qubesadmin.exc
- import qubesadmin.tests
- class TC_00_VMCollection(qubesadmin.tests.QubesTestCase):
- def test_000_list(self):
- self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
- b'0\x00test-vm class=AppVM state=Running\n'
- self.assertEqual(
- list(self.app.domains.keys()),
- ['test-vm'])
- self.assertAllCalled()
- def test_001_getitem(self):
- self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
- b'0\x00test-vm class=AppVM state=Running\n'
- try:
- vm = self.app.domains['test-vm']
- self.assertEqual(vm.name, 'test-vm')
- except KeyError:
- self.fail('VM not found in collection')
- self.assertAllCalled()
- with self.assertRaises(KeyError):
- vm = self.app.domains['test-non-existent']
- self.assertAllCalled()
- def test_002_in(self):
- self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
- b'0\x00test-vm class=AppVM state=Running\n'
- self.assertIn('test-vm', self.app.domains)
- self.assertAllCalled()
- self.assertNotIn('test-non-existent', self.app.domains)
- self.assertAllCalled()
- def test_003_iter(self):
- self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
- b'0\x00test-vm class=AppVM state=Running\n'
- self.assertEqual([vm.name for vm in self.app.domains], ['test-vm'])
- self.assertAllCalled()
- def test_004_delitem(self):
- self.app.expected_calls[('test-vm', 'admin.vm.Remove', None, None)] = \
- b'0\x00'
- del self.app.domains['test-vm']
- self.assertAllCalled()
- def test_005_keys(self):
- self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
- b'0\x00test-vm class=AppVM state=Running\n' \
- b'test-vm2 class=AppVM state=Running\n'
- self.assertEqual(set(self.app.domains.keys()),
- set(['test-vm', 'test-vm2']))
- self.assertAllCalled()
- def test_006_values(self):
- self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
- b'0\x00test-vm class=AppVM state=Running\n' \
- b'test-vm2 class=AppVM state=Running\n'
- values = self.app.domains.values()
- for obj in values:
- self.assertIsInstance(obj, qubesadmin.vm.QubesVM)
- self.assertEqual(set([vm.name for vm in values]),
- set(['test-vm', 'test-vm2']))
- self.assertAllCalled()
- def test_007_getitem_blind_mode(self):
- self.app.blind_mode = True
- try:
- vm = self.app.domains['test-vm']
- self.assertEqual(vm.name, 'test-vm')
- except KeyError:
- self.fail('VM not found in collection')
- self.assertAllCalled()
- with self.assertNotRaises(KeyError):
- vm = self.app.domains['test-non-existent']
- self.assertAllCalled()
- def test_008_in_blind_mode(self):
- self.app.blind_mode = True
- self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
- b'0\x00test-vm class=AppVM state=Running\n'
- self.assertIn('test-vm', self.app.domains)
- self.assertAllCalled()
- self.assertNotIn('test-non-existent', self.app.domains)
- self.assertAllCalled()
- def test_009_getitem_cache_class(self):
- self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
- b'0\x00test-vm class=AppVM state=Running\n'
- try:
- vm = self.app.domains['test-vm']
- self.assertEqual(vm.name, 'test-vm')
- self.assertEqual(vm.klass, 'AppVM')
- except KeyError:
- self.fail('VM not found in collection')
- self.assertAllCalled()
- class TC_10_QubesBase(qubesadmin.tests.QubesTestCase):
- def setUp(self):
- super(TC_10_QubesBase, self).setUp()
- self.check_output_patch = mock.patch(
- 'subprocess.check_output')
- self.check_output_mock = self.check_output_patch.start()
- def tearDown(self):
- self.check_output_patch.stop()
- super(TC_10_QubesBase, self).tearDown()
- def test_010_new_simple(self):
- self.app.expected_calls[('dom0', 'admin.vm.Create.AppVM', None,
- b'name=new-vm label=red')] = b'0\x00'
- self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
- b'0\x00new-vm class=AppVM state=Running\n'
- vm = self.app.add_new_vm('AppVM', 'new-vm', 'red')
- self.assertEqual(vm.name, 'new-vm')
- self.assertEqual(vm.klass, 'AppVM')
- self.assertAllCalled()
- def test_011_new_template(self):
- self.app.expected_calls[('dom0', 'admin.vm.Create.TemplateVM', None,
- b'name=new-template label=red')] = b'0\x00'
- self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
- b'0\x00new-template class=TemplateVM state=Running\n'
- vm = self.app.add_new_vm('TemplateVM', 'new-template', 'red')
- self.assertEqual(vm.name, 'new-template')
- self.assertEqual(vm.klass, 'TemplateVM')
- self.assertAllCalled()
- def test_012_new_template_based(self):
- self.app.expected_calls[('dom0', 'admin.vm.Create.AppVM',
- 'some-template', b'name=new-vm label=red')] = b'0\x00'
- self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
- b'0\x00new-vm class=AppVM state=Running\n'
- vm = self.app.add_new_vm('AppVM', 'new-vm', 'red', 'some-template')
- self.assertEqual(vm.name, 'new-vm')
- self.assertEqual(vm.klass, 'AppVM')
- self.assertAllCalled()
- def test_013_new_objects_params(self):
- self.app.expected_calls[('dom0', 'admin.vm.Create.AppVM',
- 'some-template', b'name=new-vm label=red')] = b'0\x00'
- self.app.expected_calls[('dom0', 'admin.label.List', None, None)] = \
- b'0\x00red\nblue\n'
- self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
- b'0\x00new-vm class=AppVM state=Running\n' \
- b'some-template class=TemplateVM state=Running\n'
- vm = self.app.add_new_vm(self.app.get_vm_class('AppVM'), 'new-vm',
- self.app.get_label('red'), self.app.domains['some-template'])
- self.assertEqual(vm.name, 'new-vm')
- self.assertEqual(vm.klass, 'AppVM')
- self.assertAllCalled()
- def test_014_new_pool(self):
- self.app.expected_calls[('dom0', 'admin.vm.CreateInPool.AppVM', None,
- b'name=new-vm label=red pool=some-pool')] = b'0\x00'
- self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
- b'0\x00new-vm class=AppVM state=Running\n'
- vm = self.app.add_new_vm('AppVM', 'new-vm', 'red', pool='some-pool')
- self.assertEqual(vm.name, 'new-vm')
- self.assertEqual(vm.klass, 'AppVM')
- self.assertAllCalled()
- def test_015_new_pools(self):
- self.app.expected_calls[('dom0', 'admin.vm.CreateInPool.AppVM', None,
- b'name=new-vm label=red pool:private=some-pool '
- b'pool:volatile=other-pool')] = b'0\x00'
- self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
- b'0\x00new-vm class=AppVM state=Running\n'
- vm = self.app.add_new_vm('AppVM', 'new-vm', 'red',
- pools={'private': 'some-pool', 'volatile': 'other-pool'})
- self.assertEqual(vm.name, 'new-vm')
- self.assertEqual(vm.klass, 'AppVM')
- self.assertAllCalled()
- def test_016_new_template_based_default(self):
- self.app.expected_calls[('dom0', 'admin.vm.Create.AppVM',
- None, b'name=new-vm label=red')] = b'0\x00'
- self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
- b'0\x00new-vm class=AppVM state=Running\n'
- vm = self.app.add_new_vm('AppVM', 'new-vm', 'red',
- template=qubesadmin.DEFAULT)
- self.assertEqual(vm.name, 'new-vm')
- self.assertEqual(vm.klass, 'AppVM')
- self.assertAllCalled()
- def test_020_get_label(self):
- self.app.expected_calls[('dom0', 'admin.label.List', None, None)] = \
- b'0\x00red\nblue\n'
- label = self.app.get_label('red')
- self.assertEqual(label.name, 'red')
- self.assertAllCalled()
- def clone_setup_common_calls(self, src, dst):
- # have each property type with default=no, each special-cased,
- # and some with default=yes
- properties = {
- 'label': 'default=False type=label red',
- 'template': 'default=False type=vm test-template',
- 'memory': 'default=False type=int 400',
- 'kernel': 'default=False type=str 4.9.31',
- 'netvm': 'default=False type=vm test-net',
- 'virt_mode': 'default=False type=str hvm',
- 'default_user': 'default=True type=str user',
- }
- self.app.expected_calls[
- (src, 'admin.vm.property.List', None, None)] = \
- b'0\0qid\nname\n' + \
- b'\n'.join(prop.encode() for prop in properties.keys()) + \
- b'\n'
- for prop, value in properties.items():
- self.app.expected_calls[
- (src, 'admin.vm.property.Get', prop, None)] = \
- b'0\0' + value.encode()
- # special cases handled by admin.vm.Create call
- if prop in ('label', 'template'):
- continue
- # default properties should not be set
- if 'default=True' in value:
- continue
- self.app.expected_calls[
- (dst, 'admin.vm.property.Set', prop,
- value.split()[-1].encode())] = b'0\0'
- # tags
- self.app.expected_calls[
- (src, 'admin.vm.tag.List', None, None)] = \
- b'0\0tag1\ntag2\n'
- self.app.expected_calls[
- (dst, 'admin.vm.tag.Set', 'tag1', None)] = b'0\0'
- self.app.expected_calls[
- (dst, 'admin.vm.tag.Set', 'tag2', None)] = b'0\0'
- # features
- self.app.expected_calls[
- (src, 'admin.vm.feature.List', None, None)] = \
- b'0\0feat1\nfeat2\n'
- self.app.expected_calls[
- (src, 'admin.vm.feature.Get', 'feat1', None)] = \
- b'0\0feat1-value with spaces'
- self.app.expected_calls[
- (src, 'admin.vm.feature.Get', 'feat2', None)] = \
- b'0\x001'
- self.app.expected_calls[
- (dst, 'admin.vm.feature.Set', 'feat1',
- b'feat1-value with spaces')] = b'0\0'
- self.app.expected_calls[
- (dst, 'admin.vm.feature.Set', 'feat2', b'1')] = b'0\0'
- # firewall
- rules = (
- b'action=drop dst4=192.168.0.0/24\n'
- b'action=accept\n'
- )
- self.app.expected_calls[
- (src, 'admin.vm.firewall.Get', None, None)] = \
- b'0\x00' + rules
- self.app.expected_calls[
- (dst, 'admin.vm.firewall.Set', None, rules)] = \
- b'0\x00'
- # storage
- for vm in (src, dst):
- self.app.expected_calls[
- (vm, 'admin.vm.volume.Info', 'root', None)] = \
- b'0\x00pool=lvm\n' \
- b'vid=vm-' + vm.encode() + b'/root\n' \
- b'size=10737418240\n' \
- b'usage=2147483648\n' \
- b'rw=False\n' \
- b'internal=True\n' \
- b'source=vm-test-template/root\n' \
- b'save_on_stop=False\n' \
- b'snap_on_start=True\n'
- self.app.expected_calls[
- (vm, 'admin.vm.volume.Info', 'private', None)] = \
- b'0\x00pool=lvm\n' \
- b'vid=vm-' + vm.encode() + b'/private\n' \
- b'size=2147483648\n' \
- b'usage=214748364\n' \
- b'rw=True\n' \
- b'internal=True\n' \
- b'save_on_stop=True\n' \
- b'snap_on_start=False\n'
- self.app.expected_calls[
- (vm, 'admin.vm.volume.Info', 'volatile', None)] = \
- b'0\x00pool=lvm\n' \
- b'vid=vm-' + vm.encode() + b'/volatile\n' \
- b'size=10737418240\n' \
- b'usage=0\n' \
- b'rw=True\n' \
- b'internal=True\n' \
- b'source=None\n' \
- b'save_on_stop=False\n' \
- b'snap_on_start=False\n'
- self.app.expected_calls[
- (vm, 'admin.vm.volume.Info', 'kernel', None)] = \
- b'0\x00pool=linux-kernel\n' \
- b'vid=\n' \
- b'size=0\n' \
- b'usage=0\n' \
- b'rw=False\n' \
- b'internal=True\n' \
- b'source=None\n' \
- b'save_on_stop=False\n' \
- b'snap_on_start=False\n'
- self.app.expected_calls[
- (vm, 'admin.vm.volume.List', None, None)] = \
- b'0\x00root\nprivate\nvolatile\nkernel\n'
- self.app.expected_calls[
- (src, 'admin.vm.volume.CloneFrom', 'private', None)] = \
- b'0\x00token-private'
- self.app.expected_calls[
- (dst, 'admin.vm.volume.CloneTo', 'private', b'token-private')] = \
- b'0\x00'
- self.app.expected_calls[
- ('dom0', 'admin.property.Get', 'default_pool_private', None)] = \
- b'0\0default=True type=str lvm'
- def test_030_clone(self):
- self.clone_setup_common_calls('test-vm', 'new-name')
- self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
- b'0\x00new-name class=AppVM state=Halted\n' \
- b'test-vm class=AppVM state=Halted\n' \
- b'test-template class=TemplateVM state=Halted\n' \
- b'test-net class=AppVM state=Halted\n'
- self.app.expected_calls[('dom0', 'admin.vm.Create.AppVM',
- 'test-template', b'name=new-name label=red')] = b'0\x00'
- new_vm = self.app.clone_vm('test-vm', 'new-name')
- self.assertEqual(new_vm.name, 'new-name')
- self.check_output_mock.assert_called_once_with(
- ['qvm-appmenus', '--init', '--update',
- '--source', 'test-vm', 'new-name'],
- stderr=subprocess.STDOUT
- )
- self.assertAllCalled()
- def test_031_clone_object(self):
- self.clone_setup_common_calls('test-vm', 'new-name')
- self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
- b'0\x00new-name class=AppVM state=Halted\n' \
- b'test-vm class=AppVM state=Halted\n' \
- b'test-template class=TemplateVM state=Halted\n' \
- b'test-net class=AppVM state=Halted\n'
- self.app.expected_calls[('dom0', 'admin.vm.Create.AppVM',
- 'test-template', b'name=new-name label=red')] = b'0\x00'
- new_vm = self.app.clone_vm(self.app.domains['test-vm'], 'new-name')
- self.assertEqual(new_vm.name, 'new-name')
- self.assertAllCalled()
- def test_032_clone_pool(self):
- self.clone_setup_common_calls('test-vm', 'new-name')
- for volume in ('root', 'private', 'volatile', 'kernel'):
- del self.app.expected_calls[
- 'test-vm', 'admin.vm.volume.Info', volume, None]
- del self.app.expected_calls[
- 'dom0', 'admin.property.Get', 'default_pool_private', None]
- self.app.expected_calls[('dom0', 'admin.vm.CreateInPool.AppVM',
- 'test-template',
- b'name=new-name label=red pool=some-pool')] = b'0\x00'
- self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
- b'0\x00new-name class=AppVM state=Halted\n' \
- b'test-vm class=AppVM state=Halted\n' \
- b'test-template class=TemplateVM state=Halted\n' \
- b'test-net class=AppVM state=Halted\n'
- new_vm = self.app.clone_vm('test-vm', 'new-name', pool='some-pool')
- self.assertEqual(new_vm.name, 'new-name')
- self.assertAllCalled()
- def test_033_clone_pools(self):
- self.clone_setup_common_calls('test-vm', 'new-name')
- for volume in ('root', 'private', 'volatile', 'kernel'):
- del self.app.expected_calls[
- 'test-vm', 'admin.vm.volume.Info', volume, None]
- del self.app.expected_calls[
- 'dom0', 'admin.property.Get', 'default_pool_private', None]
- self.app.expected_calls[('dom0', 'admin.vm.CreateInPool.AppVM',
- 'test-template',
- b'name=new-name label=red pool:private=some-pool '
- b'pool:volatile=other-pool')] = b'0\x00'
- self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
- b'0\x00new-name class=AppVM state=Halted\n' \
- b'test-vm class=AppVM state=Halted\n' \
- b'test-template class=TemplateVM state=Halted\n' \
- b'test-net class=AppVM state=Halted\n'
- new_vm = self.app.clone_vm('test-vm', 'new-name',
- pools={'private': 'some-pool', 'volatile': 'other-pool'})
- self.assertEqual(new_vm.name, 'new-name')
- self.assertAllCalled()
- def test_034_clone_class_change(self):
- self.clone_setup_common_calls('test-vm', 'new-name')
- self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
- b'0\x00new-name class=StandaloneVM state=Halted\n' \
- b'test-vm class=AppVM state=Halted\n' \
- b'test-template class=TemplateVM state=Halted\n' \
- b'test-net class=AppVM state=Halted\n'
- self.app.expected_calls[('dom0', 'admin.vm.Create.StandaloneVM',
- 'test-template', b'name=new-name label=red')] = b'0\x00'
- self.app.expected_calls[
- ('new-name', 'admin.vm.volume.Info', 'root', None)] = \
- b'0\x00pool=lvm\n' \
- b'vid=vm-new-name/root\n' \
- b'size=10737418240\n' \
- b'usage=2147483648\n' \
- b'rw=True\n' \
- b'internal=True\n' \
- b'source=None\n' \
- b'save_on_stop=True\n' \
- b'snap_on_start=False\n'
- self.app.expected_calls[
- ('test-vm', 'admin.vm.volume.CloneFrom', 'root', None)] = \
- b'0\x00token-root'
- self.app.expected_calls[
- ('new-name', 'admin.vm.volume.CloneTo', 'root', b'token-root')] = \
- b'0\x00'
- new_vm = self.app.clone_vm('test-vm', 'new-name',
- new_cls='StandaloneVM')
- self.assertEqual(new_vm.name, 'new-name')
- self.assertEqual(new_vm.klass, 'StandaloneVM')
- self.assertAllCalled()
- def test_035_clone_fail(self):
- self.app.expected_calls[
- ('test-vm', 'admin.vm.property.List', None, None)] = \
- b'0\0qid\nname\ntemplate\nlabel\nmemory\n'
- # simplify it a little, shouldn't get this far anyway
- self.app.expected_calls[
- ('test-vm', 'admin.vm.volume.List', None, None)] = \
- b'0\x00'
- self.app.expected_calls[
- ('test-vm', 'admin.vm.property.Get', 'label', None)] = \
- b'0\0default=False type=label red'
- self.app.expected_calls[
- ('test-vm', 'admin.vm.property.Get', 'template', None)] = \
- b'0\0default=False type=vm test-template'
- self.app.expected_calls[
- ('test-vm', 'admin.vm.property.Get', 'memory', None)] = \
- b'0\0default=False type=int 400'
- self.app.expected_calls[
- ('new-name', 'admin.vm.property.Set', 'memory', b'400')] = \
- b'2\0QubesException\0\0something happened\0'
- self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
- b'0\x00new-name class=AppVM state=Halted\n' \
- b'test-vm class=AppVM state=Halted\n' \
- b'test-template class=TemplateVM state=Halted\n' \
- b'test-net class=AppVM state=Halted\n'
- self.app.expected_calls[('dom0', 'admin.vm.Create.AppVM',
- 'test-template', b'name=new-name label=red')] = b'0\x00'
- self.app.expected_calls[('new-name', 'admin.vm.Remove', None, None)] = \
- b'0\x00'
- with self.assertRaises(qubesadmin.exc.QubesException):
- self.app.clone_vm('test-vm', 'new-name')
- self.assertAllCalled()
- def test_036_clone_ignore_errors_prop(self):
- self.clone_setup_common_calls('test-vm', 'new-name')
- self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
- b'0\x00new-name class=AppVM state=Halted\n' \
- b'test-vm class=AppVM state=Halted\n' \
- b'test-template class=TemplateVM state=Halted\n' \
- b'test-net class=AppVM state=Halted\n'
- self.app.expected_calls[('dom0', 'admin.vm.Create.AppVM',
- 'test-template', b'name=new-name label=red')] = b'0\x00'
- self.app.expected_calls[
- ('new-name', 'admin.vm.property.Set', 'memory', b'400')] = \
- b'2\0QubesException\0\0something happened\0'
- new_vm = self.app.clone_vm('test-vm', 'new-name', ignore_errors=True)
- self.assertEqual(new_vm.name, 'new-name')
- self.assertAllCalled()
- def test_037_clone_ignore_errors_feature(self):
- self.clone_setup_common_calls('test-vm', 'new-name')
- self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
- b'0\x00new-name class=AppVM state=Halted\n' \
- b'test-vm class=AppVM state=Halted\n' \
- b'test-template class=TemplateVM state=Halted\n' \
- b'test-net class=AppVM state=Halted\n'
- self.app.expected_calls[('dom0', 'admin.vm.Create.AppVM',
- 'test-template', b'name=new-name label=red')] = b'0\x00'
- self.app.expected_calls[
- ('new-name', 'admin.vm.feature.Set', 'feat2', b'1')] = \
- b'2\0QubesException\0\0something happened\0'
- new_vm = self.app.clone_vm('test-vm', 'new-name', ignore_errors=True)
- self.assertEqual(new_vm.name, 'new-name')
- self.assertAllCalled()
- def test_038_clone_ignore_errors_tag(self):
- self.clone_setup_common_calls('test-vm', 'new-name')
- self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
- b'0\x00new-name class=AppVM state=Halted\n' \
- b'test-vm class=AppVM state=Halted\n' \
- b'test-template class=TemplateVM state=Halted\n' \
- b'test-net class=AppVM state=Halted\n'
- self.app.expected_calls[('dom0', 'admin.vm.Create.AppVM',
- 'test-template', b'name=new-name label=red')] = b'0\x00'
- self.app.expected_calls[
- ('new-name', 'admin.vm.tag.Set', 'tag1', None)] = \
- b'2\0QubesException\0\0something happened\0'
- new_vm = self.app.clone_vm('test-vm', 'new-name', ignore_errors=True)
- self.assertEqual(new_vm.name, 'new-name')
- self.assertAllCalled()
- def test_039_clone_ignore_errors_firewall(self):
- self.clone_setup_common_calls('test-vm', 'new-name')
- self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
- b'0\x00new-name class=AppVM state=Halted\n' \
- b'test-vm class=AppVM state=Halted\n' \
- b'test-template class=TemplateVM state=Halted\n' \
- b'test-net class=AppVM state=Halted\n'
- self.app.expected_calls[('dom0', 'admin.vm.Create.AppVM',
- 'test-template', b'name=new-name label=red')] = b'0\x00'
- self.app.expected_calls[
- ('new-name', 'admin.vm.firewall.Set', None,
- b'action=drop dst4=192.168.0.0/24\naction=accept\n')] = \
- b'2\0QubesException\0\0something happened\0'
- new_vm = self.app.clone_vm('test-vm', 'new-name', ignore_errors=True)
- self.assertEqual(new_vm.name, 'new-name')
- self.assertAllCalled()
- def test_040_clone_ignore_errors_storage(self):
- self.clone_setup_common_calls('test-vm', 'new-name')
- self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
- b'0\x00new-name class=AppVM state=Halted\n' \
- b'test-vm class=AppVM state=Halted\n' \
- b'test-template class=TemplateVM state=Halted\n' \
- b'test-net class=AppVM state=Halted\n'
- self.app.expected_calls[('dom0', 'admin.vm.Create.AppVM',
- 'test-template', b'name=new-name label=red')] = b'0\x00'
- self.app.expected_calls[
- ('new-name', 'admin.vm.volume.CloneTo', 'private',
- b'token-private')] = \
- b'2\0QubesException\0\0something happened\0'
- self.app.expected_calls[('new-name', 'admin.vm.Remove', None, None)] = \
- b'0\x00'
- del self.app.expected_calls[
- ('new-name', 'admin.vm.volume.Info', 'root', None)]
- del self.app.expected_calls[
- ('new-name', 'admin.vm.volume.Info', 'volatile', None)]
- with self.assertRaises(qubesadmin.exc.QubesException):
- self.app.clone_vm('test-vm', 'new-name', ignore_errors=True)
- self.assertAllCalled()
- def test_041_clone_fail_storage(self):
- self.clone_setup_common_calls('test-vm', 'new-name')
- self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
- b'0\x00new-name class=AppVM state=Halted\n' \
- b'test-vm class=AppVM state=Halted\n' \
- b'test-template class=TemplateVM state=Halted\n' \
- b'test-net class=AppVM state=Halted\n'
- self.app.expected_calls[('dom0', 'admin.vm.Create.AppVM',
- 'test-template', b'name=new-name label=red')] = b'0\x00'
- self.app.expected_calls[
- ('new-name', 'admin.vm.volume.CloneTo', 'private',
- b'token-private')] = \
- b'2\0QubesException\0\0something happened\0'
- self.app.expected_calls[('new-name', 'admin.vm.Remove', None, None)] = \
- b'0\x00'
- del self.app.expected_calls[
- ('new-name', 'admin.vm.volume.Info', 'root', None)]
- del self.app.expected_calls[
- ('new-name', 'admin.vm.volume.Info', 'volatile', None)]
- with self.assertRaises(qubesadmin.exc.QubesException):
- self.app.clone_vm('test-vm', 'new-name')
- self.assertAllCalled()
- def test_042_clone_nondefault_pool(self):
- self.clone_setup_common_calls('test-vm', 'new-name')
- self.app.expected_calls[
- ('test-vm', 'admin.vm.volume.Info', 'private', None)] = \
- b'0\x00pool=another\n' \
- b'vid=vm-test-vm/private\n' \
- b'size=2147483648\n' \
- b'usage=214748364\n' \
- b'rw=True\n' \
- b'internal=True\n' \
- b'save_on_stop=True\n' \
- b'snap_on_start=False\n'
- self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
- b'0\x00new-name class=AppVM state=Halted\n' \
- b'test-vm class=AppVM state=Halted\n' \
- b'test-template class=TemplateVM state=Halted\n' \
- b'test-net class=AppVM state=Halted\n'
- self.app.expected_calls[('dom0', 'admin.vm.CreateInPool.AppVM',
- 'test-template', b'name=new-name label=red pool:private=another')]\
- = b'0\x00'
- new_vm = self.app.clone_vm('test-vm', 'new-name')
- self.assertEqual(new_vm.name, 'new-name')
- self.check_output_mock.assert_called_once_with(
- ['qvm-appmenus', '--init', '--update',
- '--source', 'test-vm', 'new-name'],
- stderr=subprocess.STDOUT
- )
- self.assertAllCalled()
- class TC_20_QubesLocal(unittest.TestCase):
- def setUp(self):
- super(TC_20_QubesLocal, self).setUp()
- self.socket_dir = tempfile.mkdtemp()
- self.orig_sock = qubesadmin.config.QUBESD_SOCKET
- qubesadmin.config.QUBESD_SOCKET = os.path.join(self.socket_dir, 'sock')
- self.proc = None
- self.app = qubesadmin.app.QubesLocal()
- self.tmpdir = tempfile.mkdtemp()
- self.addCleanup(shutil.rmtree, self.tmpdir)
- def listen_and_send(self, send_data):
- '''Listen on socket and send data in response.
- :param bytes send_data: data to send
- '''
- self.socket_pipe, child_pipe = multiprocessing.Pipe()
- self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
- self.socket.bind(os.path.join(self.socket_dir, 'sock'))
- self.socket.listen(1)
- def worker(sock, pipe, send_data_):
- conn, addr = sock.accept()
- pipe.send(conn.makefile('rb').read())
- conn.sendall(send_data_)
- conn.close()
- self.proc = multiprocessing.Process(target=worker,
- args=(self.socket, child_pipe, send_data))
- self.proc.start()
- self.socket.close()
- def get_request(self):
- '''Get request sent to "qubesd" mock'''
- return self.socket_pipe.recv()
- def tearDown(self):
- qubesadmin.config.QUBESD_SOCKET = self.orig_sock
- if self.proc is not None:
- try:
- self.proc.terminate()
- except OSError:
- pass
- shutil.rmtree(self.socket_dir)
- super(TC_20_QubesLocal, self).tearDown()
- def test_000_qubesd_call(self):
- self.listen_and_send(b'0\0')
- self.app.qubesd_call('test-vm', 'some.method', 'arg1', b'payload')
- self.assertEqual(self.get_request(),
- b'dom0\0some.method\0test-vm\0arg1\0payload')
- def test_001_qubesd_call_none_arg(self):
- self.listen_and_send(b'0\0')
- self.app.qubesd_call('test-vm', 'some.method', None, b'payload')
- self.assertEqual(self.get_request(),
- b'dom0\0some.method\0test-vm\0\0payload')
- def test_002_qubesd_call_none_payload(self):
- self.listen_and_send(b'0\0')
- self.app.qubesd_call('test-vm', 'some.method', None, None)
- self.assertEqual(self.get_request(),
- b'dom0\0some.method\0test-vm\0\0')
- def test_003_qubesd_call_payload_stream(self):
- payload_input = os.path.join(self.tmpdir, 'payload-input')
- with open(payload_input, 'w+b') as payload_file:
- payload_file.write(b'some payload\n')
- payload_file.seek(0)
- self._call_test_service_with_payload_stream(
- payload_file, expected=b'some payload\n')
- def test_004_qubesd_call_payload_stream_proc(self):
- service_path = os.path.join(self.tmpdir, 'test.service')
- echo = subprocess.Popen(['echo', 'some payload'],
- stdout=subprocess.PIPE)
- self._call_test_service_with_payload_stream(
- echo.stdout, expected=b'some payload\n')
- def test_005_qubesd_call_payload_stream_with_prefix(self):
- payload_input = os.path.join(self.tmpdir, 'payload-input')
- with open(payload_input, 'w+b') as payload_file:
- payload_file.write(b'some payload\n')
- payload_file.seek(0)
- self._call_test_service_with_payload_stream(
- payload_file, payload=b'first line\n',
- expected=b'first line\nsome payload\n')
- def _call_test_service_with_payload_stream(
- self, payload_stream, payload=None, expected=b''):
- service_path = os.path.join(self.tmpdir, 'test.service')
- with open(service_path, 'w') as f:
- f.write('#!/bin/bash\n'
- 'env > {dir}/env\n'
- 'echo "$@" > {dir}/args\n'
- 'cat > {dir}/payload\n'
- 'echo -en \'0\\0return-value\'\n'.format(dir=self.tmpdir))
- os.chmod(service_path, 0o755)
- with mock.patch('qubesadmin.config.QREXEC_SERVICES_DIR',
- self.tmpdir):
- value = self.app.qubesd_call(
- 'test-vm', 'test.service',
- 'some-arg', payload=payload, payload_stream=payload_stream)
- self.assertEqual(value, b'return-value')
- self.assertTrue(os.path.exists(self.tmpdir + '/env'))
- with open(self.tmpdir + '/env') as env:
- self.assertIn('QREXEC_REMOTE_DOMAIN=dom0\n', env)
- self.assertIn('QREXEC_REQUESTED_TARGET=test-vm\n', env)
- self.assertTrue(os.path.exists(self.tmpdir + '/args'))
- with open(self.tmpdir + '/args') as args:
- self.assertEqual(args.read(), 'some-arg\n')
- self.assertTrue(os.path.exists(self.tmpdir + '/payload'))
- with open(self.tmpdir + '/payload', 'rb') as payload_f:
- self.assertEqual(payload_f.read(), expected)
- @mock.patch('os.isatty', lambda fd: fd == 2)
- def test_010_run_service(self):
- self.listen_and_send(b'0\0')
- with mock.patch('subprocess.Popen') as mock_proc:
- p = self.app.run_service('some-vm', 'service.name')
- mock_proc.assert_called_once_with([
- qubesadmin.config.QREXEC_CLIENT,
- '-d', 'some-vm', '-T', 'DEFAULT:QUBESRPC service.name dom0'],
- stdin=subprocess.PIPE, stdout=subprocess.PIPE,
- stderr=subprocess.PIPE)
- self.assertEqual(self.get_request(),
- b'dom0\0admin.vm.Start\0some-vm\0\0')
- def test_011_run_service_filter_esc(self):
- self.listen_and_send(b'0\0')
- with mock.patch('subprocess.Popen') as mock_proc:
- p = self.app.run_service('some-vm', 'service.name', filter_esc=True)
- mock_proc.assert_called_once_with([
- qubesadmin.config.QREXEC_CLIENT,
- '-d', 'some-vm', '-t', '-T',
- 'DEFAULT:QUBESRPC service.name dom0'],
- stdin=subprocess.PIPE, stdout=subprocess.PIPE,
- stderr=subprocess.PIPE)
- self.assertEqual(self.get_request(),
- b'dom0\0admin.vm.Start\0some-vm\0\0')
- @mock.patch('os.isatty', lambda fd: fd == 2)
- def test_012_run_service_user(self):
- self.listen_and_send(b'0\0')
- with mock.patch('subprocess.Popen') as mock_proc:
- p = self.app.run_service('some-vm', 'service.name', user='user')
- mock_proc.assert_called_once_with([
- qubesadmin.config.QREXEC_CLIENT,
- '-d', 'some-vm', '-T',
- 'user:QUBESRPC service.name dom0'],
- stdin=subprocess.PIPE, stdout=subprocess.PIPE,
- stderr=subprocess.PIPE)
- self.assertEqual(self.get_request(),
- b'dom0\0admin.vm.Start\0some-vm\0\0')
- def test_013_run_service_default_target(self):
- with self.assertRaises(ValueError):
- self.app.run_service('', 'service.name')
- class TC_30_QubesRemote(unittest.TestCase):
- def setUp(self):
- super(TC_30_QubesRemote, self).setUp()
- self.proc_mock = mock.Mock()
- self.proc_mock.configure_mock(**{
- 'return_value.returncode': 0
- })
- self.proc_patch = mock.patch('subprocess.Popen', self.proc_mock)
- self.proc_patch.start()
- self.app = qubesadmin.app.QubesRemote()
- def set_proc_stdout(self, send_data):
- self.proc_mock.configure_mock(**{
- 'return_value.communicate.return_value': (send_data, None)
- })
- def tearDown(self):
- self.proc_patch.stop()
- super(TC_30_QubesRemote, self).tearDown()
- def test_000_qubesd_call(self):
- self.set_proc_stdout(b'0\0')
- self.app.qubesd_call('test-vm', 'some.method', 'arg1', b'payload')
- self.assertEqual(self.proc_mock.mock_calls, [
- mock.call([qubesadmin.config.QREXEC_CLIENT_VM, 'test-vm',
- 'some.method+arg1'],
- stdin=subprocess.PIPE, stdout=subprocess.PIPE,
- stderr=subprocess.PIPE),
- mock.call().communicate(b'payload')
- ])
- def test_001_qubesd_call_none_arg(self):
- self.set_proc_stdout(b'0\0')
- self.app.qubesd_call('test-vm', 'some.method', None, b'payload')
- self.assertEqual(self.proc_mock.mock_calls, [
- mock.call([qubesadmin.config.QREXEC_CLIENT_VM, 'test-vm',
- 'some.method'],
- stdin=subprocess.PIPE, stdout=subprocess.PIPE,
- stderr=subprocess.PIPE),
- mock.call().communicate(b'payload')
- ])
- def test_002_qubesd_call_none_payload(self):
- self.set_proc_stdout(b'0\0')
- self.app.qubesd_call('test-vm', 'some.method', None, None)
- self.assertEqual(self.proc_mock.mock_calls, [
- mock.call([qubesadmin.config.QREXEC_CLIENT_VM, 'test-vm',
- 'some.method'],
- stdin=subprocess.PIPE, stdout=subprocess.PIPE,
- stderr=subprocess.PIPE),
- mock.call().communicate(None)
- ])
- def test_003_qubesd_call_payload_stream(self):
- self.set_proc_stdout(b'0\0return-value')
- tmpdir = tempfile.mkdtemp()
- self.addCleanup(shutil.rmtree, tmpdir)
- payload_input = os.path.join(tmpdir, 'payload-input')
- with open(payload_input, 'w+b') as payload_file:
- payload_file.write(b'some payload\n')
- payload_file.seek(0)
- value = self.app.qubesd_call('test-vm', 'some.method',
- 'some-arg', payload_stream=payload_file)
- self.assertListEqual(self.proc_mock.mock_calls, [
- mock.call(
- [qubesadmin.config.QREXEC_CLIENT_VM, 'test-vm',
- 'some.method+some-arg'],
- stdin=payload_file,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE),
- mock.call().communicate()
- ])
- self.assertEqual(value, b'return-value')
- def test_004_qubesd_call_payload_stream_with_prefix(self):
- self.set_proc_stdout(b'0\0return-value')
- tmpdir = tempfile.mkdtemp()
- self.addCleanup(shutil.rmtree, tmpdir)
- payload_input = os.path.join(tmpdir, 'payload-input')
- with open(payload_input, 'w+b') as payload_file:
- payload_file.write(b'some payload\n')
- payload_file.seek(0)
- value = self.app.qubesd_call('test-vm', 'some.method',
- 'some-arg', payload=b'first line\n', payload_stream=payload_file)
- self.assertListEqual(self.proc_mock.mock_calls, [
- mock.call([qubesadmin.config.QREXEC_CLIENT_VM, 'test-vm',
- 'some.method+some-arg'],
- stdin=subprocess.PIPE, stdout=subprocess.PIPE,
- stderr=subprocess.PIPE),
- mock.call().stdin.write(b'first line\n'),
- mock.call().stdin.write(b'some payload\n'),
- mock.call().communicate()
- ])
- self.assertEqual(value, b'return-value')
- @mock.patch('os.isatty', lambda fd: fd == 2)
- def test_010_run_service(self):
- self.app.run_service('some-vm', 'service.name')
- self.proc_mock.assert_called_once_with([
- qubesadmin.config.QREXEC_CLIENT_VM,
- '-T', 'some-vm', 'service.name'],
- stdin=subprocess.PIPE, stdout=subprocess.PIPE,
- stderr=subprocess.PIPE)
- @mock.patch('os.isatty', lambda fd: fd == 2)
- def test_011_run_service_filter_esc(self):
- self.app.run_service('some-vm', 'service.name', filter_esc=True)
- self.proc_mock.assert_called_once_with([
- qubesadmin.config.QREXEC_CLIENT_VM,
- '-t', '-T', 'some-vm', 'service.name'],
- stdin=subprocess.PIPE, stdout=subprocess.PIPE,
- stderr=subprocess.PIPE)
- @mock.patch('os.isatty', lambda fd: fd == 2)
- def test_012_run_service_user(self):
- with self.assertRaises(ValueError):
- p = self.app.run_service('some-vm', 'service.name', user='user')
- @mock.patch('os.isatty', lambda fd: fd == 2)
- def test_013_run_service_default_target(self):
- self.app.run_service('', 'service.name')
- self.proc_mock.assert_called_once_with([
- qubesadmin.config.QREXEC_CLIENT_VM,
- '-T', '', 'service.name'],
- stdin=subprocess.PIPE, stdout=subprocess.PIPE,
- stderr=subprocess.PIPE)
- @mock.patch('os.isatty', lambda fd: fd == 2)
- def test_014_run_service_no_autostart1(self):
- self.set_proc_stdout( b'0\x00power_state=Running')
- self.app.run_service('some-vm', 'service.name', autostart=False)
- self.proc_mock.assert_has_calls([
- call([qubesadmin.config.QREXEC_CLIENT_VM,
- 'some-vm', 'admin.vm.CurrentState'],
- stdin=subprocess.PIPE, stdout=subprocess.PIPE,
- stderr=subprocess.PIPE),
- call().communicate(None),
- call([qubesadmin.config.QREXEC_CLIENT_VM,
- '-T', 'some-vm', 'service.name'],
- stdin=subprocess.PIPE, stdout=subprocess.PIPE,
- stderr=subprocess.PIPE),
- ])
- @mock.patch('os.isatty', lambda fd: fd == 2)
- def test_015_run_service_no_autostart2(self):
- self.set_proc_stdout( b'0\x00power_state=Halted')
- with self.assertRaises(qubesadmin.exc.QubesVMNotRunningError):
- self.app.run_service('some-vm', 'service.name', autostart=False)
- self.proc_mock.assert_called_once_with([
- qubesadmin.config.QREXEC_CLIENT_VM,
- 'some-vm', 'admin.vm.CurrentState'],
- stdin=subprocess.PIPE, stdout=subprocess.PIPE,
- stderr=subprocess.PIPE)
|