123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587 |
# -*- encoding: utf-8 -*-
#
# The Qubes OS Project, http://www.qubes-os.org
#
# Copyright (C) 2017 Marek Marczykowski-Górecki
# <marmarek@invisiblethingslab.com>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation; either version 2.1 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with this program; if not, see <http://www.gnu.org/licenses/>.
- import tempfile
- import unittest.mock
- import qubesadmin.tests
- import qubesadmin.tests.tools
- import qubesadmin.tools.qvm_volume
class TC_00_qvm_volume(qubesadmin.tests.QubesTestCase):
    """Tests for ``qubesadmin.tools.qvm_volume.main``.

    Each test fills ``self.app.expected_calls`` with the calls it expects
    the tool to make. Keys are 4-tuples such as
    ``('testvm', 'admin.vm.volume.Resize', 'private', b'10737418240')`` and
    values are the raw reply bytes. The test then runs the tool with a list
    of command-line arguments, checks the exit code (and captured output
    where relevant), and finishes with ``self.assertAllCalled()``.

    NOTE(review): the expected-output literals below separate columns with
    a single space. If the tool pads its columns, whitespace may have been
    lost from these literals in transit -- confirm against real output.
    """

    # ------------------------------------------------------------------
    # Canned replies shared by several tests
    # ------------------------------------------------------------------
    _REPLY_OK = b'0\x00'
    _REPLY_TESTVM_LIST = b'0\x00testvm class=AppVM state=Running\n'
    _REPLY_VOLUME_LIST = b'0\x00root\nprivate\n'
    _REPLY_REVISIONS = b'0\x00200101010000\n200201010000\n200301010000\n'
    _REPLY_PRIVATE_INFO = (
        b'0\x00pool=lvm\n'
        b'vid=qubes_dom0/vm-testvm-private\n'
        b'size=2147483648\n'
        b'usage=10000000\n'
        b'rw=True\n'
        b'source=\n'
        b'save_on_stop=True\n'
        b'snap_on_start=False\n'
        b'revisions_to_keep=3\n'
        b'is_outdated=False\n'
    )
    _REPLY_ROOT_INFO = (
        b'0\x00pool=lvm\n'
        b'vid=qubes_dom0/vm-testvm-root\n'
        b'size=2147483648\n'
        b'usage=10000000\n'
        b'rw=True\n'
        b'source=qubes_dom0/vm-fedora-26-root\n'
        b'save_on_stop=False\n'
        b'snap_on_start=True\n'
        b'revisions_to_keep=0\n'
        b'is_outdated=False\n'
    )

    # Header row printed by ``ls``.
    _LIST_HEADER = 'POOL:VOLUME VMNAME VOLUME_NAME REVERT_POSSIBLE\n'
    # Complete ``ls`` output for the default vm1 + sys-net fixture.
    _LIST_ALL = (
        _LIST_HEADER +
        'other-pool:sys-net-private sys-net private Yes\n'
        'other-pool:vm1-private vm1 private Yes\n'
        'pool-file:sys-net-root sys-net root No\n'
        'pool-file:vm1-root vm1 root No\n'
    )

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    def _expect_testvm(self):
        """Expect the domain list (only ``testvm``) and its volume list."""
        self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
            self._REPLY_TESTVM_LIST
        self.app.expected_calls[
            ('testvm', 'admin.vm.volume.List', None, None)] = \
            self._REPLY_VOLUME_LIST

    def _expect_volume_call(self, method, volume, payload=None,
                            reply=b'0\x00'):
        """Expect *method* on ``testvm``'s *volume*; answer with *reply*."""
        self.app.expected_calls[('testvm', method, volume, payload)] = reply

    def _run_tool(self, args):
        """Run ``qvm_volume.main`` on *args* and return its exit code."""
        return qubesadmin.tools.qvm_volume.main(args, app=self.app)

    def _stdout_of_successful_run(self, args):
        """Run the tool, require exit code 0 and return captured stdout."""
        with qubesadmin.tests.tools.StdoutBuffer() as stdout:
            self.assertEqual(0, self._run_tool(args))
        return stdout.getvalue()

    def _stderr_of_failed_run(self, args):
        """Run the tool, require exit code 1 and return captured stderr."""
        with qubesadmin.tests.tools.StderrBuffer() as stderr:
            self.assertEqual(1, self._run_tool(args))
        return stderr.getvalue()

    def _import_from_tempfile(self, *options):
        """Import a temp file holding ``b'test-data'`` into testvm:private.

        *options* are extra command-line switches placed right after the
        ``import`` sub-command. The run must exit with code 0.
        """
        with tempfile.NamedTemporaryFile() as input_file:
            input_file.write(b'test-data')
            input_file.seek(0)
            input_file.flush()
            self.assertEqual(0, self._run_tool(
                ['import'] + list(options) +
                ['testvm:private', input_file.name]))

    # ------------------------------------------------------------------
    # Fixture used by the ``ls`` tests
    # ------------------------------------------------------------------
    def setup_expected_calls_for_list(self, vms=('vm1', 'sys-net')):
        """Register the calls expected by the ``ls`` tests.

        The ``admin.vm.List`` reply always names ``vm1`` and ``sys-net``,
        whatever *vms* contains. For every name in *vms* the ``root``
        volume is reported in ``pool-file`` with an empty snapshot list and
        the ``private`` volume in ``other-pool`` with one snapshot
        (``snap1``). Both volumes report ``size=10737418240``.

        :param vms: names of the domains whose volume calls are expected
        """
        self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = (
            b'0\x00vm1 class=AppVM state=Running\n'
            b'sys-net class=AppVM state=Running\n')
        for vm in vms:
            for vol in ('root', 'private'):
                if vol == 'root':
                    pool_line = b'pool=pool-file\n'
                    snapshots = b'0\x00'
                else:
                    pool_line = b'pool=other-pool\n'
                    snapshots = b'0\x00snap1\n'
                self.app.expected_calls[
                    (vm, 'admin.vm.volume.Info', vol, None)] = (
                        b'0\x00' + pool_line +
                        b'vid=' + vm.encode() + b'-' + vol.encode() + b'\n'
                        b'size=10737418240\n')
                self.app.expected_calls[
                    (vm, 'admin.vm.volume.ListSnapshots', vol, None)] = \
                    snapshots
            self.app.expected_calls[
                (vm, 'admin.vm.volume.List', None, None)] = \
                b'0\x00root\nprivate\n'

    # ------------------------------------------------------------------
    # ls
    # ------------------------------------------------------------------
    def test_000_list(self):
        # Plain ``ls``: volumes of both fixture domains are printed.
        self.setup_expected_calls_for_list()
        self.assertEqual(
            self._stdout_of_successful_run(['ls']), self._LIST_ALL)
        self.assertAllCalled()

    def test_001_list_domain(self):
        # ``ls vm1``: only vm1's volumes are queried and printed.
        self.setup_expected_calls_for_list(vms=('vm1',))
        self.assertEqual(
            self._stdout_of_successful_run(['ls', 'vm1']),
            self._LIST_HEADER +
            'other-pool:vm1-private vm1 private Yes\n'
            'pool-file:vm1-root vm1 root No\n'
        )
        self.assertAllCalled()

    def test_002_list_domain_pool(self):
        # ``ls -p pool-file vm1``: only the volume in that pool is printed.
        self.setup_expected_calls_for_list(vms=('vm1',))
        self.app.expected_calls[('dom0', 'admin.pool.List', None, None)] = \
            b'0\x00pool-file\nother-pool\n'
        # The private volume is in another pool, so no snapshot listing is
        # expected for it (assertAllCalled() would flag a stale entry).
        del self.app.expected_calls[
            ('vm1', 'admin.vm.volume.ListSnapshots', 'private', None)]
        self.assertEqual(
            self._stdout_of_successful_run(['ls', '-p', 'pool-file', 'vm1']),
            self._LIST_HEADER +
            'pool-file:vm1-root vm1 root No\n'
        )
        self.assertAllCalled()

    def test_003_list_pool(self):
        # ``ls -p pool-file``: root volumes of both domains are printed.
        self.setup_expected_calls_for_list()
        self.app.expected_calls[('dom0', 'admin.pool.List', None, None)] = \
            b'0\x00pool-file\nother-pool\n'
        # Private volumes are in another pool: no snapshot listing expected.
        for vm in ('vm1', 'sys-net'):
            del self.app.expected_calls[
                (vm, 'admin.vm.volume.ListSnapshots', 'private', None)]
        self.assertEqual(
            self._stdout_of_successful_run(['ls', '-p', 'pool-file']),
            self._LIST_HEADER +
            'pool-file:sys-net-root sys-net root No\n'
            'pool-file:vm1-root vm1 root No\n'
        )
        self.assertAllCalled()

    def test_004_list_multiple_domains(self):
        # ``ls vm1 vm2`` while a third domain (vm3) also exists; no volume
        # calls are registered for vm3.
        self.setup_expected_calls_for_list(vms=('vm1', 'vm2'))
        self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = (
            b'0\x00vm1 class=AppVM state=Running\n'
            b'vm2 class=AppVM state=Running\n'
            b'vm3 class=AppVM state=Running\n')
        self.assertEqual(
            self._stdout_of_successful_run(['ls', 'vm1', 'vm2']),
            self._LIST_HEADER +
            'other-pool:vm1-private vm1 private Yes\n'
            'other-pool:vm2-private vm2 private Yes\n'
            'pool-file:vm1-root vm1 root No\n'
            'pool-file:vm2-root vm2 root No\n'
        )
        self.assertAllCalled()

    def test_005_list_default_action(self):
        # No sub-command at all yields the same output as ``ls``.
        self.setup_expected_calls_for_list()
        self.assertEqual(
            self._stdout_of_successful_run([]), self._LIST_ALL)
        self.assertAllCalled()

    # ------------------------------------------------------------------
    # extend / resize
    # ------------------------------------------------------------------
    def test_010_extend(self):
        # Growing 2 GiB -> 10 GiB issues a Resize with the size in bytes.
        self._expect_testvm()
        self._expect_volume_call(
            'admin.vm.volume.Info', 'private',
            reply=self._REPLY_PRIVATE_INFO)
        self._expect_volume_call(
            'admin.vm.volume.Resize', 'private', b'10737418240')
        self.assertEqual(
            0, self._run_tool(['extend', 'testvm:private', '10GiB']))
        self.assertAllCalled()

    def test_011_extend_error(self):
        # An exception reply to Resize gives exit code 1 and its message
        # appears on stderr.
        self._expect_testvm()
        self._expect_volume_call(
            'admin.vm.volume.Info', 'private',
            reply=self._REPLY_PRIVATE_INFO)
        self._expect_volume_call(
            'admin.vm.volume.Resize', 'private', b'10737418240',
            reply=b'2\x00StoragePoolException\x00\x00'
                  b'Failed to resize volume: '
                  b'error: success\x00')
        self.assertIn(
            'error: success',
            self._stderr_of_failed_run(
                ['extend', 'testvm:private', '10GiB']))
        self.assertAllCalled()

    def test_012_extend_deny_shrink(self):
        # Shrinking without -f is refused; no Resize call is registered.
        self._expect_testvm()
        self._expect_volume_call(
            'admin.vm.volume.Info', 'private',
            reply=self._REPLY_PRIVATE_INFO)
        self.assertIn(
            'shrinking of private is disabled',
            self._stderr_of_failed_run(
                ['resize', 'testvm:private', '1GiB']))
        self.assertAllCalled()

    def test_013_resize_force_shrink(self):
        # With -f the Resize is sent; no Info call is registered here.
        self._expect_testvm()
        self._expect_volume_call(
            'admin.vm.volume.Resize', 'private', b'1073741824')
        self.assertEqual(
            0, self._run_tool(['resize', '-f', 'testvm:private', '1GiB']))
        self.assertAllCalled()

    # ------------------------------------------------------------------
    # revert
    # ------------------------------------------------------------------
    def test_020_revert(self):
        # Without an explicit revision, the last listed one is reverted to.
        self._expect_testvm()
        self._expect_volume_call(
            'admin.vm.volume.ListSnapshots', 'private',
            reply=self._REPLY_REVISIONS)
        self._expect_volume_call(
            'admin.vm.volume.Revert', 'private', b'200301010000')
        self.assertEqual(0, self._run_tool(['revert', 'testvm:private']))
        self.assertAllCalled()

    def test_021_revert_error(self):
        # An exception reply to Revert gives exit code 1 and its message
        # appears on stderr.
        self._expect_testvm()
        self._expect_volume_call(
            'admin.vm.volume.ListSnapshots', 'private',
            reply=self._REPLY_REVISIONS)
        self._expect_volume_call(
            'admin.vm.volume.Revert', 'private', b'200301010000',
            reply=b'2\x00StoragePoolException\x00\x00'
                  b'Failed to revert volume: '
                  b'some error\x00')
        self.assertIn(
            'some error',
            self._stderr_of_failed_run(['revert', 'testvm:private']))
        self.assertAllCalled()

    def test_022_revert_no_snapshots(self):
        # An empty snapshot list makes revert fail with "No snapshots".
        self._expect_testvm()
        self._expect_volume_call(
            'admin.vm.volume.ListSnapshots', 'private', reply=b'0\x00')
        self.assertIn(
            'No snapshots',
            self._stderr_of_failed_run(['revert', 'testvm:private']))
        self.assertAllCalled()

    def test_023_revert_specific(self):
        # An explicit revision is passed through; no ListSnapshots call is
        # registered.
        self._expect_testvm()
        self._expect_volume_call(
            'admin.vm.volume.Revert', 'private', b'20050101')
        self.assertEqual(
            0, self._run_tool(['revert', 'testvm:private', '20050101']))
        self.assertAllCalled()

    # ------------------------------------------------------------------
    # set
    # ------------------------------------------------------------------
    def test_030_set_revisions_to_keep(self):
        # ``set ... revisions_to_keep 3`` -> Set.revisions_to_keep, b'3'.
        self._expect_testvm()
        self._expect_volume_call(
            'admin.vm.volume.Set.revisions_to_keep', 'private', b'3')
        self.assertEqual(
            0, self._run_tool(
                ['set', 'testvm:private', 'revisions_to_keep', '3']))
        self.assertAllCalled()

    def test_031_set_rw(self):
        # ``set ... rw True`` -> Set.rw, b'True'.
        self._expect_testvm()
        self._expect_volume_call(
            'admin.vm.volume.Set.rw', 'private', b'True')
        self.assertEqual(
            0, self._run_tool(['set', 'testvm:private', 'rw', 'True']))
        self.assertAllCalled()

    def test_032_set_invalid(self):
        # An unknown property name must give a non-zero exit code; no
        # Set.* call is registered.
        self._expect_testvm()
        self.assertNotEqual(
            0, self._run_tool(['set', 'testvm:private', 'invalid', 'True']))
        self.assertAllCalled()

    # ------------------------------------------------------------------
    # info
    # ------------------------------------------------------------------
    def test_040_info(self):
        # Full ``info`` output for a volume that has three revisions.
        self._expect_testvm()
        self._expect_volume_call(
            'admin.vm.volume.Info', 'private',
            reply=self._REPLY_PRIVATE_INFO)
        self._expect_volume_call(
            'admin.vm.volume.ListSnapshots', 'private',
            reply=self._REPLY_REVISIONS)
        output = self._stdout_of_successful_run(['info', 'testvm:private'])
        # travis...
        # NOTE(review): presumably the padding after an empty ``source``
        # value differs between environments -- confirm.
        output = output.replace('\nsource\n', '\nsource \n')
        self.assertEqual(
            output,
            'pool lvm\n'
            'vid qubes_dom0/vm-testvm-private\n'
            'rw True\n'
            'source \n'
            'save_on_stop True\n'
            'snap_on_start False\n'
            'size 2147483648\n'
            'usage 10000000\n'
            'revisions_to_keep 3\n'
            'is_outdated False\n'
            'Available revisions (for revert):\n'
            ' 200101010000\n'
            ' 200201010000\n'
            ' 200301010000\n')
        self.assertAllCalled()

    def test_041_info_no_revisions(self):
        # Full ``info`` output when the snapshot list is empty.
        self._expect_testvm()
        self._expect_volume_call(
            'admin.vm.volume.Info', 'root', reply=self._REPLY_ROOT_INFO)
        self._expect_volume_call(
            'admin.vm.volume.ListSnapshots', 'root', reply=b'0\x00')
        self.assertEqual(
            self._stdout_of_successful_run(['info', 'testvm:root']),
            'pool lvm\n'
            'vid qubes_dom0/vm-testvm-root\n'
            'rw True\n'
            'source qubes_dom0/vm-fedora-26-root\n'
            'save_on_stop False\n'
            'snap_on_start True\n'
            'size 2147483648\n'
            'usage 10000000\n'
            'revisions_to_keep 0\n'
            'is_outdated False\n'
            'Available revisions (for revert): none\n')
        self.assertAllCalled()

    def test_042_info_single_prop(self):
        # ``info VOLUME usage`` prints just that value; no ListSnapshots
        # call is registered.
        self._expect_testvm()
        self._expect_volume_call(
            'admin.vm.volume.Info', 'root', reply=self._REPLY_ROOT_INFO)
        self.assertEqual(
            self._stdout_of_successful_run(['info', 'testvm:root', 'usage']),
            '10000000\n')
        self.assertAllCalled()

    def test_043_info_revisions_only(self):
        # ``info VOLUME revisions`` prints one revision per line; no Info
        # call is registered.
        self._expect_testvm()
        self._expect_volume_call(
            'admin.vm.volume.ListSnapshots', 'private',
            reply=self._REPLY_REVISIONS)
        self.assertEqual(
            self._stdout_of_successful_run(
                ['info', 'testvm:private', 'revisions']),
            '200101010000\n'
            '200201010000\n'
            '200301010000\n')
        self.assertAllCalled()

    # ------------------------------------------------------------------
    # import
    # ------------------------------------------------------------------
    def test_050_import_file(self):
        # Import from a file: payload is "<size>\n<data>" with the file's
        # own size (9 bytes).
        self._expect_testvm()
        self._expect_volume_call(
            'admin.vm.volume.ImportWithSize', 'private', b'9\ntest-data')
        self._import_from_tempfile()
        self.assertAllCalled()

    def test_051_import_stdin(self):
        # Import from "-": data is read from sys.stdin.buffer, which is
        # replaced here by a temp file positioned at offset 0.
        self._expect_testvm()
        self._expect_volume_call(
            'admin.vm.volume.ImportWithSize', 'private', b'9\ntest-data')
        with tempfile.NamedTemporaryFile() as input_file:
            input_file.write(b'test-data')
            input_file.seek(0)
            with unittest.mock.patch('sys.stdin') as mock_stdin:
                mock_stdin.buffer = input_file
                self.assertEqual(
                    0, self._run_tool(['import', 'testvm:private', '-']))
        self.assertAllCalled()

    def test_052_import_file_size(self):
        # ``--size=512`` overrides the size sent ahead of the data.
        self._expect_testvm()
        self._expect_volume_call(
            'admin.vm.volume.ImportWithSize', 'private', b'512\ntest-data')
        self._import_from_tempfile('--size=512')
        self.assertAllCalled()

    def test_053_import_file_noresize(self):
        # ``--no-resize`` uses plain Import with the data only.
        self._expect_testvm()
        self._expect_volume_call(
            'admin.vm.volume.Import', 'private', b'test-data')
        self._import_from_tempfile('--no-resize')
        self.assertAllCalled()

    def test_053_import_file_matching_size(self):
        # NOTE(review): shares the number 053 with the test above, and
        # currently registers and runs exactly what test_050_import_file
        # does -- confirm whether a distinct scenario was intended.
        self._expect_testvm()
        self._expect_volume_call(
            'admin.vm.volume.ImportWithSize', 'private', b'9\ntest-data')
        self._import_from_tempfile()
        self.assertAllCalled()
|