Browse Source

mgmt: vm.Create* and vm.Clone

Create methods are dynamically created based on available VM classes.

QubesOS/qubes-issues#2622
Marek Marczykowski-Górecki 7 years ago
parent
commit
53be79b3b7
4 changed files with 470 additions and 18 deletions
  1. 150 15
      qubes/mgmt.py
  2. 304 1
      qubes/tests/mgmt.py
  3. 9 1
      qubes/tests/storage.py
  4. 7 1
      setup.py

+ 150 - 15
qubes/mgmt.py

@@ -31,7 +31,7 @@ import pkg_resources
 import qubes.vm
 import qubes.vm.qubesvm
 import qubes.storage
-
+import qubes.utils
 
 class ProtocolError(AssertionError):
     '''Raised when something is wrong with data received'''
@@ -42,7 +42,7 @@ class PermissionDenied(Exception):
     pass
 
 
-def api(name, *, no_payload=False):
+def api(name, *, no_payload=False, endpoints=None):
     '''Decorator factory for methods intended to appear in API.
 
     The decorated method can be called from public API using a child of
@@ -63,20 +63,24 @@ def api(name, *, no_payload=False):
     If *no_payload* is true, then the method is called with no arguments.
     '''
 
-    # TODO regexp for vm/dev classess; supply regexp groups as untrusted_ kwargs
-
     def decorator(func):
         if no_payload:
             # the following assignment is needed for how closures work in Python
             _func = func
             @functools.wraps(_func)
-            def wrapper(self, untrusted_payload):
+            def wrapper(self, untrusted_payload, **kwargs):
                 if untrusted_payload != b'':
                     raise ProtocolError('unexpected payload')
-                return _func(self)
+                return _func(self, **kwargs)
             func = wrapper
 
-        func._rpcname = name  # pylint: disable=protected-access
+        # pylint: disable=protected-access
+        if endpoints is None:
+            func._rpcname = ((name, None),)
+        else:
+            func._rpcname = tuple(
+                (name.format(endpoint=endpoint), endpoint)
+                for endpoint in endpoints)
         return func
 
     return decorator
@@ -128,20 +132,20 @@ class AbstractQubesMgmt(object):
 
         untrusted_candidates = []
         for attr in dir(self):
-            untrusted_func = getattr(self, attr)
+            func = getattr(self, attr)
 
-            if not callable(untrusted_func):
+            if not callable(func):
                 continue
 
             try:
                 # pylint: disable=protected-access
-                if untrusted_func._rpcname != self.method:
-                    continue
+                for method, endpoint in func._rpcname:
+                    if method != self.method:
+                        continue
+                    untrusted_candidates.append((func, endpoint))
             except AttributeError:
                 continue
 
-            untrusted_candidates.append(untrusted_func)
-
         if not untrusted_candidates:
             raise ProtocolError('no such method: {!r}'.format(self.method))
 
@@ -158,8 +162,12 @@ class AbstractQubesMgmt(object):
 
         This method is a coroutine.
         '''
-        self._running_handler = asyncio.ensure_future(self._handler(
-            untrusted_payload=untrusted_payload))
+        handler, endpoint = self._handler
+        kwargs = {}
+        if endpoint is not None:
+            kwargs['endpoint'] = endpoint
+        self._running_handler = asyncio.ensure_future(handler(
+            untrusted_payload=untrusted_payload, **kwargs))
         return self._running_handler
 
     def cancel(self):
@@ -700,3 +708,130 @@ class QubesMgmt(AbstractQubesMgmt):
         self.fire_event_for_permission(value=value)
         self.dest.features[self.arg] = value
         self.app.save()
+
+    @api('mgmt.vm.Create.{endpoint}', endpoints=(ep.name
+            for ep in pkg_resources.iter_entry_points(qubes.vm.VM_ENTRY_POINT)))
+    @asyncio.coroutine
+    def vm_create(self, endpoint, untrusted_payload=None):
+        return self._vm_create(endpoint, allow_pool=False,
+            untrusted_payload=untrusted_payload)
+
+    @api('mgmt.vm.CreateInPool.{endpoint}', endpoints=(ep.name
+            for ep in pkg_resources.iter_entry_points(qubes.vm.VM_ENTRY_POINT)))
+    @asyncio.coroutine
+    def vm_create_in_pool(self, endpoint, untrusted_payload=None):
+        return self._vm_create(endpoint, allow_pool=True,
+            untrusted_payload=untrusted_payload)
+
+    def _vm_create(self, vm_type, allow_pool=False, untrusted_payload=None):
+        assert self.dest.name == 'dom0'
+
+        kwargs = {}
+        pool = None
+        pools = {}
+
+        # this will raise exception if none is found
+        vm_class = qubes.utils.get_entry_point_one(qubes.vm.VM_ENTRY_POINT,
+            vm_type)
+
+        # if argument is given, it needs to be a valid template, and only
+        # when given VM class do need a template
+        if hasattr(vm_class, 'template'):
+            assert self.arg in self.app.domains
+            kwargs['template'] = self.app.domains[self.arg]
+        else:
+            assert not self.arg
+
+        for untrusted_param in untrusted_payload.decode('ascii',
+                errors='strict').split(' '):
+            untrusted_key, untrusted_value = untrusted_param.split('=', 1)
+            if untrusted_key in kwargs:
+                raise ProtocolError('duplicated parameters')
+
+            if untrusted_key == 'name':
+                qubes.vm.validate_name(None, None, untrusted_value)
+                kwargs['name'] = untrusted_value
+
+            elif untrusted_key == 'label':
+                # don't confuse label name with label index
+                assert not untrusted_value.isdigit()
+                allowed_chars = string.ascii_letters + string.digits + '-_.'
+                assert all(c in allowed_chars for c in untrusted_value)
+                try:
+                    kwargs['label'] = self.app.get_label(untrusted_value)
+                except KeyError:
+                    raise qubes.exc.QubesValueError
+
+            elif untrusted_key == 'pool' and allow_pool:
+                if pool is not None:
+                    raise ProtocolError('duplicated pool parameter')
+                pool = self.app.get_pool(untrusted_value)
+            elif untrusted_key.startswith('pool:') and allow_pool:
+                untrusted_volume = untrusted_key.split(':', 1)[1]
+                # kind of ugly, but actual list of volumes is available only
+                # after creating a VM
+                assert untrusted_volume in ['root', 'private', 'volatile',
+                    'kernel']
+                volume = untrusted_volume
+                if volume in pools:
+                    raise ProtocolError(
+                        'duplicated pool:{} parameter'.format(volume))
+                pools[volume] = self.app.get_pool(untrusted_value)
+
+            else:
+                raise ProtocolError('Invalid param name')
+        del untrusted_payload
+
+        if 'name' not in kwargs or 'label' not in kwargs:
+            raise ProtocolError('Missing name or label')
+
+        if pool and pools:
+            raise ProtocolError(
+                'Only one of \'pool=\' and \'pool:volume=\' can be used')
+
+        if kwargs['name'] in self.app.domains:
+            raise qubes.exc.QubesValueError(
+                'VM {} already exists'.format(kwargs['name']))
+
+        self.fire_event_for_permission(pool=pool, pools=pools, **kwargs)
+
+        vm = self.app.add_new_vm(vm_class, **kwargs)
+
+        try:
+            yield from vm.create_on_disk(pool=pool, pools=pools)
+        except:
+            del self.app.domains[vm]
+            raise
+        self.app.save()
+
+    @api('mgmt.vm.Clone')
+    @asyncio.coroutine
+    def vm_clone(self, untrusted_payload):
+        assert not self.arg
+
+        assert untrusted_payload.startswith(b'name=')
+        untrusted_name = untrusted_payload[5:].decode('ascii')
+        qubes.vm.validate_name(None, None, untrusted_name)
+        new_name = untrusted_name
+
+        del untrusted_payload
+
+        if new_name in self.app.domains:
+            raise qubes.exc.QubesValueError('Already exists')
+
+        self.fire_event_for_permission(new_name=new_name)
+
+        src_vm = self.dest
+
+        dst_vm = self.app.add_new_vm(src_vm.__class__, name=new_name)
+        try:
+            dst_vm.clone_properties(src_vm)
+            # TODO: tags
+            # TODO: features
+            # TODO: firewall
+            # TODO: persistent devices
+            yield from dst_vm.clone_disk_files(src_vm)
+        except:
+            del self.app.domains[dst_vm]
+            raise
+        self.app.save()

+ 304 - 1
qubes/tests/mgmt.py

@@ -21,6 +21,8 @@
 ''' Tests for management calls endpoints '''
 
 import asyncio
+import os
+import shutil
 
 import libvirt
 import unittest.mock
@@ -46,6 +48,9 @@ class MgmtTestCase(qubes.tests.QubesTestCase):
         self.template = app.add_new_vm('TemplateVM', label='black',
             name='test-template')
         app.default_template = 'test-template'
+        with qubes.tests.substitute_entry_points('qubes.storage',
+                'qubes.tests.storage'):
+            app.add_pool('test', driver='test')
         app.save = unittest.mock.Mock()
         self.vm = app.add_new_vm('AppVM', label='red', name='test-vm1',
             template='test-template')
@@ -62,6 +67,17 @@ class MgmtTestCase(qubes.tests.QubesTestCase):
         self.app.domains[0].fire_event = self.emitter.fire_event
         self.app.domains[0].fire_event_pre = self.emitter.fire_event_pre
 
+        self.test_base_dir = '/tmp/qubes-test-dir'
+        self.base_dir_patch = unittest.mock.patch.dict(qubes.config.system_path,
+            {'qubes_base_dir': self.test_base_dir})
+        self.base_dir_patch.start()
+
+    def tearDown(self):
+        self.base_dir_patch.stop()
+        if os.path.exists(self.test_base_dir):
+            shutil.rmtree(self.test_base_dir)
+        super(MgmtTestCase, self).tearDown()
+
     def call_mgmt_func(self, method, dest, arg=b'', payload=b''):
         mgmt_obj = qubes.mgmt.QubesMgmt(self.app, b'dom0', method, dest, arg)
 
@@ -848,7 +864,7 @@ class TC_00_VMs(MgmtTestCase):
         func_mock.assert_called_once_with()
 
     def test_270_events(self):
-        send_event = unittest.mock.Mock()
+        send_event = unittest.mock.Mock(spec=[])
         mgmt_obj = qubes.mgmt.QubesMgmt(self.app, b'dom0', b'mgmt.Events',
             b'dom0', b'', send_event=send_event)
 
@@ -945,6 +961,293 @@ class TC_00_VMs(MgmtTestCase):
         self.assertNotIn('test-feature', self.vm.features)
         self.assertFalse(self.app.save.called)
 
+    @asyncio.coroutine
+    def dummy_coro(self, *args, **kwargs):
+        pass
+
+    @unittest.mock.patch('qubes.storage.Storage.create')
+    def test_330_vm_create_standalone(self, storage_mock):
+        storage_mock.side_effect = self.dummy_coro
+        self.call_mgmt_func(b'mgmt.vm.Create.StandaloneVM',
+            b'dom0', b'', b'name=test-vm2 label=red')
+
+        self.assertIn('test-vm2', self.app.domains)
+        vm = self.app.domains['test-vm2']
+        self.assertIsInstance(vm, qubes.vm.standalonevm.StandaloneVM)
+        self.assertEqual(vm.label, self.app.get_label('red'))
+        self.assertEqual(storage_mock.mock_calls,
+            [unittest.mock.call(self.app.domains['test-vm2']).create()])
+        self.assertTrue(os.path.exists(os.path.join(
+            self.test_base_dir, 'appvms', 'test-vm2')))
+
+        self.assertTrue(self.app.save.called)
+
+    @unittest.mock.patch('qubes.storage.Storage.create')
+    def test_331_vm_create_standalone_spurious_template(self, storage_mock):
+        storage_mock.side_effect = self.dummy_coro
+        with self.assertRaises(AssertionError):
+            self.call_mgmt_func(b'mgmt.vm.Create.StandaloneVM',
+                b'dom0', b'test-template', b'name=test-vm2 label=red')
+
+        self.assertNotIn('test-vm2', self.app.domains)
+        self.assertEqual(storage_mock.mock_calls, [])
+        self.assertFalse(os.path.exists(os.path.join(
+            self.test_base_dir, 'appvms', 'test-vm2')))
+
+        self.assertNotIn('test-vm2', self.app.domains)
+        self.assertFalse(self.app.save.called)
+
+    @unittest.mock.patch('qubes.storage.Storage.create')
+    def test_332_vm_create_app(self, storage_mock):
+        storage_mock.side_effect = self.dummy_coro
+        self.call_mgmt_func(b'mgmt.vm.Create.AppVM',
+            b'dom0', b'test-template', b'name=test-vm2 label=red')
+
+        self.assertIn('test-vm2', self.app.domains)
+        vm = self.app.domains['test-vm2']
+        self.assertEqual(vm.label, self.app.get_label('red'))
+        self.assertEqual(vm.template, self.app.domains['test-template'])
+        self.assertEqual(storage_mock.mock_calls,
+            [unittest.mock.call(self.app.domains['test-vm2']).create()])
+        self.assertTrue(os.path.exists(os.path.join(
+            self.test_base_dir, 'appvms', 'test-vm2')))
+
+        self.assertTrue(self.app.save.called)
+
+    @unittest.mock.patch('qubes.storage.Storage.create')
+    def test_333_vm_create_app_missing_template(self, storage_mock):
+        storage_mock.side_effect = self.dummy_coro
+        with self.assertRaises(AssertionError):
+            self.call_mgmt_func(b'mgmt.vm.Create.AppVM',
+                b'dom0', b'', b'name=test-vm2 label=red')
+
+        self.assertNotIn('test-vm2', self.app.domains)
+        self.assertEqual(storage_mock.mock_calls, [])
+        self.assertFalse(os.path.exists(os.path.join(
+            self.test_base_dir, 'appvms', 'test-vm2')))
+
+        self.assertNotIn('test-vm2', self.app.domains)
+        self.assertFalse(self.app.save.called)
+
+    @unittest.mock.patch('qubes.storage.Storage.create')
+    def test_334_vm_create_invalid_name(self, storage_mock):
+        storage_mock.side_effect = self.dummy_coro
+        with self.assertRaises(qubes.exc.QubesValueError):
+            self.call_mgmt_func(b'mgmt.vm.Create.AppVM',
+                b'dom0', b'test-template', b'name=test-###')
+
+        self.assertNotIn('test-###', self.app.domains)
+        self.assertFalse(self.app.save.called)
+
+    @unittest.mock.patch('qubes.storage.Storage.create')
+    def test_335_vm_create_missing_name(self, storage_mock):
+        storage_mock.side_effect = self.dummy_coro
+        with self.assertRaises(AssertionError):
+            self.call_mgmt_func(b'mgmt.vm.Create.AppVM',
+                b'dom0', b'test-template', b'label=red')
+
+        self.assertFalse(self.app.save.called)
+
+    @unittest.mock.patch('qubes.storage.Storage.create')
+    def test_336_vm_create_spurious_pool(self, storage_mock):
+        storage_mock.side_effect = self.dummy_coro
+        with self.assertRaises(AssertionError):
+            self.call_mgmt_func(b'mgmt.vm.Create.AppVM',
+                b'dom0', b'test-template',
+                b'name=test-vm2 label=red pool=default')
+
+        self.assertNotIn('test-vm2', self.app.domains)
+        self.assertFalse(self.app.save.called)
+
+    @unittest.mock.patch('qubes.storage.Storage.create')
+    def test_337_vm_create_duplicate_name(self, storage_mock):
+        storage_mock.side_effect = self.dummy_coro
+        with self.assertRaises(qubes.exc.QubesException):
+            self.call_mgmt_func(b'mgmt.vm.Create.AppVM',
+                b'dom0', b'test-template',
+                b'name=test-vm1 label=red')
+
+        self.assertFalse(self.app.save.called)
+
+    @unittest.mock.patch('qubes.storage.Storage.create')
+    def test_338_vm_create_name_twice(self, storage_mock):
+        storage_mock.side_effect = self.dummy_coro
+        with self.assertRaises(AssertionError):
+            self.call_mgmt_func(b'mgmt.vm.Create.AppVM',
+                b'dom0', b'test-template',
+                b'name=test-vm2 name=test-vm3 label=red')
+
+        self.assertNotIn('test-vm2', self.app.domains)
+        self.assertNotIn('test-vm3', self.app.domains)
+        self.assertFalse(self.app.save.called)
+
+    @unittest.mock.patch('qubes.storage.Storage.create')
+    def test_340_vm_create_in_pool_app(self, storage_mock):
+        storage_mock.side_effect = self.dummy_coro
+        self.call_mgmt_func(b'mgmt.vm.CreateInPool.AppVM',
+            b'dom0', b'test-template', b'name=test-vm2 label=red '
+                                 b'pool=test')
+
+        self.assertIn('test-vm2', self.app.domains)
+        vm = self.app.domains['test-vm2']
+        self.assertEqual(vm.label, self.app.get_label('red'))
+        self.assertEqual(vm.template, self.app.domains['test-template'])
+        # setting pool= affect only volumes actually created for this VM,
+        # not used from a template or so
+        self.assertEqual(vm.volume_config['root']['pool'], 'default')
+        self.assertEqual(vm.volume_config['private']['pool'], 'test')
+        self.assertEqual(vm.volume_config['volatile']['pool'], 'test')
+        self.assertEqual(vm.volume_config['kernel']['pool'], 'linux-kernel')
+        self.assertEqual(storage_mock.mock_calls,
+            [unittest.mock.call(self.app.domains['test-vm2']).create()])
+        self.assertTrue(os.path.exists(os.path.join(
+            self.test_base_dir, 'appvms', 'test-vm2')))
+
+        self.assertTrue(self.app.save.called)
+
+    @unittest.mock.patch('qubes.storage.Storage.create')
+    def test_341_vm_create_in_pool_private(self, storage_mock):
+        storage_mock.side_effect = self.dummy_coro
+        self.call_mgmt_func(b'mgmt.vm.CreateInPool.AppVM',
+            b'dom0', b'test-template', b'name=test-vm2 label=red '
+                                 b'pool:private=test')
+
+        self.assertIn('test-vm2', self.app.domains)
+        vm = self.app.domains['test-vm2']
+        self.assertEqual(vm.label, self.app.get_label('red'))
+        self.assertEqual(vm.template, self.app.domains['test-template'])
+        self.assertEqual(vm.volume_config['root']['pool'], 'default')
+        self.assertEqual(vm.volume_config['private']['pool'], 'test')
+        self.assertEqual(vm.volume_config['volatile']['pool'], 'default')
+        self.assertEqual(vm.volume_config['kernel']['pool'], 'linux-kernel')
+        self.assertEqual(storage_mock.mock_calls,
+            [unittest.mock.call(self.app.domains['test-vm2']).create()])
+        self.assertTrue(os.path.exists(os.path.join(
+            self.test_base_dir, 'appvms', 'test-vm2')))
+
+        self.assertTrue(self.app.save.called)
+
+    @unittest.mock.patch('qubes.storage.Storage.create')
+    def test_342_vm_create_in_pool_invalid_pool(self, storage_mock):
+        storage_mock.side_effect = self.dummy_coro
+        with self.assertRaises(qubes.exc.QubesException):
+            self.call_mgmt_func(b'mgmt.vm.CreateInPool.AppVM',
+                b'dom0', b'test-template', b'name=test-vm2 label=red '
+                                     b'pool=no-such-pool')
+
+        self.assertFalse(self.app.save.called)
+
+    @unittest.mock.patch('qubes.storage.Storage.create')
+    def test_343_vm_create_in_pool_invalid_pool2(self, storage_mock):
+        storage_mock.side_effect = self.dummy_coro
+        with self.assertRaises(qubes.exc.QubesException):
+            self.call_mgmt_func(b'mgmt.vm.CreateInPool.AppVM',
+                b'dom0', b'test-template', b'name=test-vm2 label=red '
+                                     b'pool:private=no-such-pool')
+
+        self.assertNotIn('test-vm2', self.app.domains)
+        self.assertFalse(self.app.save.called)
+
+    @unittest.mock.patch('qubes.storage.Storage.create')
+    def test_344_vm_create_in_pool_invalid_volume(self, storage_mock):
+        storage_mock.side_effect = self.dummy_coro
+        with self.assertRaises(AssertionError):
+            self.call_mgmt_func(b'mgmt.vm.CreateInPool.AppVM',
+                b'dom0', b'test-template', b'name=test-vm2 label=red '
+                                     b'pool:invalid=test')
+
+        self.assertNotIn('test-vm2', self.app.domains)
+        self.assertFalse(self.app.save.called)
+
+    @unittest.mock.patch('qubes.storage.Storage.create')
+    def test_345_vm_create_in_pool_app_root(self, storage_mock):
+        # setting custom pool for 'root' volume of AppVM should not be
+        # allowed - this volume belongs to the template
+        storage_mock.side_effect = self.dummy_coro
+        with self.assertRaises(qubes.exc.QubesException):
+            self.call_mgmt_func(b'mgmt.vm.CreateInPool.AppVM',
+                b'dom0', b'test-template', b'name=test-vm2 label=red '
+                                     b'pool:root=test')
+
+        self.assertNotIn('test-vm2', self.app.domains)
+        self.assertFalse(self.app.save.called)
+
+    @unittest.mock.patch('qubes.storage.Storage.create')
+    def test_346_vm_create_in_pool_duplicate_pool(self, storage_mock):
+        # setting custom pool for 'root' volume of AppVM should not be
+        # allowed - this volume belongs to the template
+        storage_mock.side_effect = self.dummy_coro
+        with self.assertRaises(AssertionError):
+            self.call_mgmt_func(b'mgmt.vm.CreateInPool.AppVM',
+                b'dom0', b'test-template', b'name=test-vm2 label=red '
+                b'pool=test pool:root=test')
+
+        self.assertNotIn('test-vm2', self.app.domains)
+        self.assertFalse(self.app.save.called)
+
+    @unittest.mock.patch('qubes.storage.Storage.clone')
+    @unittest.mock.patch('qubes.storage.Storage.verify')
+    def test_350_vm_clone(self, mock_verify, mock_clone):
+        mock_clone.side_effect = self.dummy_coro
+        mock_verify.side_effect = self.dummy_coro
+        self.call_mgmt_func(b'mgmt.vm.Clone',
+            b'test-vm1', b'', b'name=test-vm2')
+
+        self.assertIn('test-vm2', self.app.domains)
+        vm = self.app.domains['test-vm2']
+        self.assertEqual(vm.label, self.app.get_label('red'))
+        self.assertEqual(vm.template, self.app.domains['test-template'])
+        self.assertEqual(mock_clone.mock_calls,
+            [unittest.mock.call(self.app.domains['test-vm2']).clone(
+                self.app.domains['test-vm1'])])
+        self.assertTrue(os.path.exists(os.path.join(
+            self.test_base_dir, 'appvms', 'test-vm2')))
+
+        self.assertTrue(self.app.save.called)
+
+    @unittest.mock.patch('qubes.storage.Storage.clone')
+    @unittest.mock.patch('qubes.storage.Storage.verify')
+    def test_351_vm_clone_extra_params(self, mock_verify, mock_clone):
+        mock_clone.side_effect = self.dummy_coro
+        mock_verify.side_effect = self.dummy_coro
+        with self.assertRaises(qubes.exc.QubesException):
+            self.call_mgmt_func(b'mgmt.vm.Clone',
+                b'test-vm1', b'', b'name=test-vm2 label=red')
+
+        self.assertNotIn('test-vm2', self.app.domains)
+        self.assertEqual(mock_clone.mock_calls, [])
+        self.assertFalse(os.path.exists(os.path.join(
+            self.test_base_dir, 'appvms', 'test-vm2')))
+
+        self.assertFalse(self.app.save.called)
+
+    @unittest.mock.patch('qubes.storage.Storage.clone')
+    @unittest.mock.patch('qubes.storage.Storage.verify')
+    def test_352_vm_clone_duplicate_name(self, mock_verify, mock_clone):
+        mock_clone.side_effect = self.dummy_coro
+        mock_verify.side_effect = self.dummy_coro
+        with self.assertRaises(qubes.exc.QubesException):
+            self.call_mgmt_func(b'mgmt.vm.Clone',
+                b'test-vm1', b'', b'name=test-vm1')
+
+        self.assertFalse(self.app.save.called)
+
+    @unittest.mock.patch('qubes.storage.Storage.clone')
+    @unittest.mock.patch('qubes.storage.Storage.verify')
+    def test_353_vm_clone_invalid_name(self, mock_verify, mock_clone):
+        mock_clone.side_effect = self.dummy_coro
+        mock_verify.side_effect = self.dummy_coro
+        with self.assertRaises(qubes.exc.QubesException):
+            self.call_mgmt_func(b'mgmt.vm.Clone',
+                b'test-vm1', b'', b'name=test-vm2/..')
+
+        self.assertNotIn('test-vm2/..', self.app.domains)
+        self.assertEqual(mock_clone.mock_calls, [])
+        self.assertFalse(os.path.exists(os.path.join(
+            self.test_base_dir, 'appvms', 'test-vm2/..')))
+
+        self.assertFalse(self.app.save.called)
+
     def test_990_vm_unexpected_payload(self):
         methods_with_no_payload = [
             b'mgmt.vm.List',

+ 9 - 1
qubes/tests/storage.py

@@ -17,7 +17,7 @@
 # with this program; if not, write to the Free Software Foundation, Inc.,
 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
 #
-
+import unittest.mock
 import qubes.log
 from qubes.exc import QubesException
 from qubes.storage import pool_drivers
@@ -27,6 +27,14 @@ from qubes.tests import QubesTestCase
 # :pylint: disable=invalid-name
 
 
+class TestPool(unittest.mock.Mock):
+    def __init__(self, *args, **kwargs):
+        super(TestPool, self).__init__(*args, spec=qubes.storage.Pool, **kwargs)
+
+    def __str__(self):
+        return 'test'
+
+
 class TestVM(object):
     def __init__(self, test, template=None):
         self.app = test.app

+ 7 - 1
setup.py

@@ -56,5 +56,11 @@ if __name__ == '__main__':
                 'file = qubes.storage.file:FilePool',
                 'linux-kernel = qubes.storage.kernels:LinuxKernel',
                 'lvm_thin = qubes.storage.lvm:ThinPool',
-            ]
+            ],
+            'qubes.tests.storage': [
+                'test = qubes.tests.storage:TestPool',
+                'file = qubes.storage.file:FilePool',
+                'linux-kernel = qubes.storage.kernels:LinuxKernel',
+                'lvm_thin = qubes.storage.lvm:ThinPool',
+            ],
         })