|
@@ -78,6 +78,7 @@ class BackupHeader(object):
|
|
|
'''Structure describing backup-header file included as the first file in
|
|
|
backup archive
|
|
|
'''
|
|
|
+ # pylint: disable=too-few-public-methods
|
|
|
header_keys = {
|
|
|
'version': 'version',
|
|
|
'encrypted': 'encrypted',
|
|
@@ -91,7 +92,6 @@ class BackupHeader(object):
|
|
|
int_options = ['version']
|
|
|
|
|
|
def __init__(self,
|
|
|
- header_data=None,
|
|
|
version=None,
|
|
|
encrypted=None,
|
|
|
compressed=None,
|
|
@@ -110,67 +110,6 @@ class BackupHeader(object):
|
|
|
self.crypto_algorithm = crypto_algorithm
|
|
|
self.backup_id = backup_id
|
|
|
|
|
|
- if header_data is not None:
|
|
|
- self.load(header_data)
|
|
|
-
|
|
|
- def load(self, untrusted_header_text):
|
|
|
- """Parse backup header file.
|
|
|
-
|
|
|
- :param untrusted_header_text: header content
|
|
|
- :type untrusted_header_text: basestring
|
|
|
-
|
|
|
- .. warning::
|
|
|
- This function may be exposed to not yet verified header,
|
|
|
- so is security critical.
|
|
|
- """
|
|
|
- try:
|
|
|
- untrusted_header_text = untrusted_header_text.decode('ascii')
|
|
|
- except UnicodeDecodeError:
|
|
|
- raise qubes.exc.QubesException(
|
|
|
- "Non-ASCII characters in backup header")
|
|
|
- for untrusted_line in untrusted_header_text.splitlines():
|
|
|
- if untrusted_line.count('=') != 1:
|
|
|
- raise qubes.exc.QubesException("Invalid backup header")
|
|
|
- key, value = untrusted_line.strip().split('=', 1)
|
|
|
- if not _re_alphanum.match(key):
|
|
|
- raise qubes.exc.QubesException("Invalid backup header (key)")
|
|
|
- if key not in self.header_keys.keys():
|
|
|
- # Ignoring unknown option
|
|
|
- continue
|
|
|
- if not _re_alphanum.match(value):
|
|
|
- raise qubes.exc.QubesException("Invalid backup header (value)")
|
|
|
- if getattr(self, self.header_keys[key]) is not None:
|
|
|
- raise qubes.exc.QubesException(
|
|
|
- "Duplicated header line: {}".format(key))
|
|
|
- if key in self.bool_options:
|
|
|
- value = value.lower() in ["1", "true", "yes"]
|
|
|
- elif key in self.int_options:
|
|
|
- value = int(value)
|
|
|
- setattr(self, self.header_keys[key], value)
|
|
|
-
|
|
|
- self.validate()
|
|
|
-
|
|
|
- def validate(self):
|
|
|
- if self.version == 1:
|
|
|
- # header not really present
|
|
|
- pass
|
|
|
- elif self.version in [2, 3, 4]:
|
|
|
- expected_attrs = ['version', 'encrypted', 'compressed',
|
|
|
- 'hmac_algorithm']
|
|
|
- if self.encrypted:
|
|
|
- expected_attrs += ['crypto_algorithm']
|
|
|
- if self.version >= 3 and self.compressed:
|
|
|
- expected_attrs += ['compression_filter']
|
|
|
- if self.version >= 4:
|
|
|
- expected_attrs += ['backup_id']
|
|
|
- for key in expected_attrs:
|
|
|
- if getattr(self, key) is None:
|
|
|
- raise qubes.exc.QubesException(
|
|
|
- "Backup header lack '{}' info".format(key))
|
|
|
- else:
|
|
|
- raise qubes.exc.QubesException(
|
|
|
- "Unsupported backup version {}".format(self.version))
|
|
|
-
|
|
|
def save(self, filename):
|
|
|
with open(filename, "w") as f_header:
|
|
|
# make sure 'version' is the first key
|