|
@@ -19,6 +19,7 @@
|
|
|
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
|
|
#
|
|
|
import functools
|
|
|
+import shutil
|
|
|
import tempfile
|
|
|
import unittest
|
|
|
from distutils import spawn
|
|
@@ -29,6 +30,8 @@ import subprocess
|
|
|
|
|
|
import logging
|
|
|
|
|
|
+import time
|
|
|
+
|
|
|
try:
|
|
|
import unittest.mock as mock
|
|
|
except ImportError:
|
|
@@ -147,8 +150,8 @@ parsed_qubes_xml_r2 = {
|
|
|
'tags': set(),
|
|
|
'features': {},
|
|
|
'template': None,
|
|
|
- 'backup_path': None,
|
|
|
- 'included_in_backup': False,
|
|
|
+ 'backup_path': 'dom0-home/user',
|
|
|
+ 'included_in_backup': True,
|
|
|
},
|
|
|
'fedora-20-x64': {
|
|
|
'klass': 'TemplateVM',
|
|
@@ -426,8 +429,8 @@ parsed_qubes_xml_v4 = {
|
|
|
'tags': set(),
|
|
|
'features': {},
|
|
|
'template': None,
|
|
|
- 'backup_path': None,
|
|
|
- 'included_in_backup': False,
|
|
|
+ 'backup_path': 'dom0-home/user',
|
|
|
+ 'included_in_backup': True,
|
|
|
},
|
|
|
'fedora-25': {
|
|
|
'klass': 'TemplateVM',
|
|
@@ -967,6 +970,45 @@ class TC_10_BackupCompatibility(qubesadmin.tests.backup.BackupTestCase):
|
|
|
"vm-templates/test-template-clone")),
|
|
|
appmenus_list)
|
|
|
|
|
|
+ self.create_dom0_files()
|
|
|
+
|
|
|
+ dom0_dirs = ('Downloads', 'Pictures', 'Documents', '.config', '.local')
|
|
|
+ dom0_files = ('.bash_history', 'some-file.txt',
|
|
|
+ 'Pictures/another-file.png')
|
|
|
+
|
|
|
+ def create_dom0_files(self):
|
|
|
+ # dom0 files
|
|
|
+ os.mkdir(self.fullpath('dom0-home'))
|
|
|
+ os.mkdir(self.fullpath('dom0-home/user'))
|
|
|
+ for d in self.dom0_dirs:
|
|
|
+ os.mkdir(self.fullpath('dom0-home/user/' + d))
|
|
|
+ for f in self.dom0_files:
|
|
|
+ with open(self.fullpath('dom0-home/user/' + f), 'w') as ff:
|
|
|
+ ff.write('some content')
|
|
|
+
|
|
|
+ def assertDirectoryExists(self, path):
|
|
|
+ if not os.path.exists(path):
|
|
|
+ self.fail(path + ' missing')
|
|
|
+ if not os.path.isdir(path):
|
|
|
+ self.fail(path + ' is not a directory')
|
|
|
+
|
|
|
+ def assertFileExists(self, path):
|
|
|
+ if not os.path.exists(path):
|
|
|
+ self.fail(path + ' missing')
|
|
|
+ if not os.path.isfile(path):
|
|
|
+ self.fail(path + ' is not a file')
|
|
|
+
|
|
|
+ def assertDom0Restored(self, timestamp):
|
|
|
+ expected_dir = os.path.expanduser(
|
|
|
+ '~/home-restore-' + timestamp + '/dom0-home/user')
|
|
|
+ self.assertTrue(os.path.exists(expected_dir))
|
|
|
+ for d in self.dom0_dirs:
|
|
|
+ self.assertDirectoryExists(os.path.join(expected_dir, d))
|
|
|
+ for f in self.dom0_files:
|
|
|
+ self.assertFileExists(os.path.join(expected_dir, f))
|
|
|
+ # cleanup
|
|
|
+ shutil.rmtree(expected_dir)
|
|
|
+
|
|
|
def create_v4_files(self):
|
|
|
appmenus_list = [
|
|
|
"firefox", "gnome-terminal", "evince", "evolution",
|
|
@@ -1041,6 +1083,8 @@ class TC_10_BackupCompatibility(qubesadmin.tests.backup.BackupTestCase):
|
|
|
"vm-templates/test-fedora-25-clone")),
|
|
|
appmenus_list)
|
|
|
|
|
|
+ self.create_dom0_files()
|
|
|
+
|
|
|
def scrypt_encrypt(self, f_name, output_name=None, password='qubes',
|
|
|
basedir=None):
|
|
|
if basedir is None:
|
|
@@ -1094,6 +1138,7 @@ class TC_10_BackupCompatibility(qubesadmin.tests.backup.BackupTestCase):
|
|
|
data = encryptor.stdout
|
|
|
else:
|
|
|
data = tar.stdout
|
|
|
+ encryptor = None
|
|
|
|
|
|
stage1_dir = self.fullpath(os.path.join("stage1", subdir))
|
|
|
if not os.path.exists(stage1_dir):
|
|
@@ -1105,6 +1150,9 @@ class TC_10_BackupCompatibility(qubesadmin.tests.backup.BackupTestCase):
|
|
|
os.path.basename(f_name+"."))],
|
|
|
stdin=data)
|
|
|
data.close()
|
|
|
+ tar.wait()
|
|
|
+ if encryptor:
|
|
|
+ encryptor.wait()
|
|
|
|
|
|
for part in sorted(os.listdir(stage1_dir)):
|
|
|
if not re.match(
|
|
@@ -1142,6 +1190,7 @@ class TC_10_BackupCompatibility(qubesadmin.tests.backup.BackupTestCase):
|
|
|
os.path.basename(f_name+"."))],
|
|
|
stdin=data)
|
|
|
data.close()
|
|
|
+ tar.wait()
|
|
|
|
|
|
for part in sorted(os.listdir(stage1_dir)):
|
|
|
if not re.match(
|
|
@@ -1211,6 +1260,12 @@ class TC_10_BackupCompatibility(qubesadmin.tests.backup.BackupTestCase):
|
|
|
compressed=compressed,
|
|
|
encrypted=encrypted)
|
|
|
|
|
|
+ self.handle_v3_file(
|
|
|
+ 'dom0-home/user',
|
|
|
+ 'dom0-home/', output,
|
|
|
+ compressed=compressed,
|
|
|
+ encrypted=encrypted)
|
|
|
+
|
|
|
output.close()
|
|
|
|
|
|
def create_v4_backup(self, compressed=True):
|
|
@@ -1249,6 +1304,11 @@ class TC_10_BackupCompatibility(qubesadmin.tests.backup.BackupTestCase):
|
|
|
os.path.join(vm_dir, f_name),
|
|
|
subdir+'/', output, compressed=compressed)
|
|
|
|
|
|
+ self.handle_v4_file(
|
|
|
+ 'dom0-home/user',
|
|
|
+ 'dom0-home/', output,
|
|
|
+ compressed=compressed)
|
|
|
+
|
|
|
output.close()
|
|
|
|
|
|
def setup_expected_calls(self, parsed_qubes_xml, templates_map=None):
|
|
@@ -1260,6 +1320,9 @@ class TC_10_BackupCompatibility(qubesadmin.tests.backup.BackupTestCase):
|
|
|
if not vm['included_in_backup']:
|
|
|
continue
|
|
|
|
|
|
+ if name == 'dom0':
|
|
|
+ continue
|
|
|
+
|
|
|
if self.storage_pool:
|
|
|
self.app.expected_calls[
|
|
|
('dom0', 'admin.vm.CreateInPool.' + vm['klass'],
|
|
@@ -1426,6 +1489,7 @@ class TC_10_BackupCompatibility(qubesadmin.tests.backup.BackupTestCase):
|
|
|
firewall_data.encode())] = b'0\0'
|
|
|
qubesd_calls_queue = multiprocessing.Queue()
|
|
|
|
|
|
+ dummy_timestamp = time.strftime("test-%Y-%m-%d-%H%M%S")
|
|
|
patches = [
|
|
|
mock.patch('qubesadmin.storage.Volume',
|
|
|
functools.partial(MockVolume, qubesd_calls_queue)),
|
|
@@ -1435,6 +1499,9 @@ class TC_10_BackupCompatibility(qubesadmin.tests.backup.BackupTestCase):
|
|
|
mock.patch(
|
|
|
'qubesadmin.firewall.Firewall',
|
|
|
functools.partial(MockFirewall, qubesd_calls_queue)),
|
|
|
+ mock.patch(
|
|
|
+ 'time.strftime',
|
|
|
+ return_value=dummy_timestamp)
|
|
|
]
|
|
|
for patch in patches:
|
|
|
patch.start()
|
|
@@ -1455,6 +1522,8 @@ class TC_10_BackupCompatibility(qubesadmin.tests.backup.BackupTestCase):
|
|
|
|
|
|
self.assertAllCalled()
|
|
|
|
|
|
+ self.assertDom0Restored(dummy_timestamp)
|
|
|
+
|
|
|
def test_220_r2_encrypted(self):
|
|
|
self.create_v3_backup(True)
|
|
|
|
|
@@ -1488,6 +1557,7 @@ class TC_10_BackupCompatibility(qubesadmin.tests.backup.BackupTestCase):
|
|
|
|
|
|
qubesd_calls_queue = multiprocessing.Queue()
|
|
|
|
|
|
+ dummy_timestamp = time.strftime("test-%Y-%m-%d-%H%M%S")
|
|
|
patches = [
|
|
|
mock.patch('qubesadmin.storage.Volume',
|
|
|
functools.partial(MockVolume, qubesd_calls_queue)),
|
|
@@ -1497,6 +1567,9 @@ class TC_10_BackupCompatibility(qubesadmin.tests.backup.BackupTestCase):
|
|
|
mock.patch(
|
|
|
'qubesadmin.firewall.Firewall',
|
|
|
functools.partial(MockFirewall, qubesd_calls_queue)),
|
|
|
+ mock.patch(
|
|
|
+ 'time.strftime',
|
|
|
+ return_value=dummy_timestamp)
|
|
|
]
|
|
|
for patch in patches:
|
|
|
patch.start()
|
|
@@ -1517,6 +1590,8 @@ class TC_10_BackupCompatibility(qubesadmin.tests.backup.BackupTestCase):
|
|
|
|
|
|
self.assertAllCalled()
|
|
|
|
|
|
+ self.assertDom0Restored(dummy_timestamp)
|
|
|
+
|
|
|
def test_230_r2_uncompressed(self):
|
|
|
self.create_v3_backup(False, False)
|
|
|
self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = (
|
|
@@ -1549,6 +1624,7 @@ class TC_10_BackupCompatibility(qubesadmin.tests.backup.BackupTestCase):
|
|
|
|
|
|
qubesd_calls_queue = multiprocessing.Queue()
|
|
|
|
|
|
+ dummy_timestamp = time.strftime("test-%Y-%m-%d-%H%M%S")
|
|
|
patches = [
|
|
|
mock.patch('qubesadmin.storage.Volume',
|
|
|
functools.partial(MockVolume, qubesd_calls_queue)),
|
|
@@ -1558,6 +1634,9 @@ class TC_10_BackupCompatibility(qubesadmin.tests.backup.BackupTestCase):
|
|
|
mock.patch(
|
|
|
'qubesadmin.firewall.Firewall',
|
|
|
functools.partial(MockFirewall, qubesd_calls_queue)),
|
|
|
+ mock.patch(
|
|
|
+ 'time.strftime',
|
|
|
+ return_value=dummy_timestamp)
|
|
|
]
|
|
|
for patch in patches:
|
|
|
patch.start()
|
|
@@ -1578,6 +1657,8 @@ class TC_10_BackupCompatibility(qubesadmin.tests.backup.BackupTestCase):
|
|
|
|
|
|
self.assertAllCalled()
|
|
|
|
|
|
+ self.assertDom0Restored(dummy_timestamp)
|
|
|
+
|
|
|
@unittest.skipUnless(spawn.find_executable('scrypt'),
|
|
|
"scrypt not installed")
|
|
|
def test_230_r4(self):
|
|
@@ -1612,6 +1693,7 @@ class TC_10_BackupCompatibility(qubesadmin.tests.backup.BackupTestCase):
|
|
|
|
|
|
qubesd_calls_queue = multiprocessing.Queue()
|
|
|
|
|
|
+ dummy_timestamp = time.strftime("test-%Y-%m-%d-%H%M%S")
|
|
|
patches = [
|
|
|
mock.patch('qubesadmin.storage.Volume',
|
|
|
functools.partial(MockVolume, qubesd_calls_queue)),
|
|
@@ -1621,6 +1703,9 @@ class TC_10_BackupCompatibility(qubesadmin.tests.backup.BackupTestCase):
|
|
|
mock.patch(
|
|
|
'qubesadmin.firewall.Firewall',
|
|
|
functools.partial(MockFirewall, qubesd_calls_queue)),
|
|
|
+ mock.patch(
|
|
|
+ 'time.strftime',
|
|
|
+ return_value=dummy_timestamp)
|
|
|
]
|
|
|
for patch in patches:
|
|
|
patch.start()
|
|
@@ -1641,6 +1726,8 @@ class TC_10_BackupCompatibility(qubesadmin.tests.backup.BackupTestCase):
|
|
|
|
|
|
self.assertAllCalled()
|
|
|
|
|
|
+ self.assertDom0Restored(dummy_timestamp)
|
|
|
+
|
|
|
@unittest.skipUnless(spawn.find_executable('scrypt'),
|
|
|
"scrypt not installed")
|
|
|
def test_230_r4_compressed(self):
|
|
@@ -1676,6 +1763,7 @@ class TC_10_BackupCompatibility(qubesadmin.tests.backup.BackupTestCase):
|
|
|
|
|
|
qubesd_calls_queue = multiprocessing.Queue()
|
|
|
|
|
|
+ dummy_timestamp = time.strftime("test-%Y-%m-%d-%H%M%S")
|
|
|
patches = [
|
|
|
mock.patch('qubesadmin.storage.Volume',
|
|
|
functools.partial(MockVolume, qubesd_calls_queue)),
|
|
@@ -1685,6 +1773,9 @@ class TC_10_BackupCompatibility(qubesadmin.tests.backup.BackupTestCase):
|
|
|
mock.patch(
|
|
|
'qubesadmin.firewall.Firewall',
|
|
|
functools.partial(MockFirewall, qubesd_calls_queue)),
|
|
|
+ mock.patch(
|
|
|
+ 'time.strftime',
|
|
|
+ return_value=dummy_timestamp)
|
|
|
]
|
|
|
for patch in patches:
|
|
|
patch.start()
|
|
@@ -1705,6 +1796,8 @@ class TC_10_BackupCompatibility(qubesadmin.tests.backup.BackupTestCase):
|
|
|
|
|
|
self.assertAllCalled()
|
|
|
|
|
|
+ self.assertDom0Restored(dummy_timestamp)
|
|
|
+
|
|
|
|
|
|
class TC_11_BackupCompatibilityIntoLVM(TC_10_BackupCompatibility):
|
|
|
storage_pool = 'some-pool'
|