backup/restore: reject uncommon compression filters and improve header validation

Compression filter named in a backup header is executed in restore
environment (commonly dom0). While this field is properly authenticated,
there may be cases where backup archive comes from less
trusted source, like migrating from potentially compromised
system.

Modify backup header parsing code to add field specific validators.
Whitelist only know crypto, hmac and compression algorithms.

Based on a patch by Jean-Philippe Ouellet <jpo@vt.edu>
Reported-by: Jean-Philippe Ouellet <jpo@vt.edu>
This commit is contained in:
Marek Marczykowski-Górecki 2019-09-08 02:27:53 +02:00
parent 9158412a24
commit 10f15e6669
No known key found for this signature in database
GPG Key ID: 063938BA42CFA724

View File

@ -38,6 +38,8 @@ import tempfile
import termios
import time
import collections
import qubesadmin
import qubesadmin.vm
from qubesadmin.backup import BackupVM
@ -58,6 +60,11 @@ DEFAULT_CRYPTO_ALGORITHM = 'aes-256-cbc'
# integrity-protect the data
DEFAULT_HMAC_ALGORITHM = 'scrypt'
DEFAULT_COMPRESSION_FILTER = 'gzip'
KNOWN_COMPRESSION_FILTERS = ('gzip', 'bzip2', 'xz')
# lazy loaded
KNOWN_CRYPTO_ALGORITHMS = []
# lazy loaded
KNOWN_HMAC_ALGORITHMS = []
# Maximum size of error message get from process stderr (including VM process)
MAX_STDERR_BYTES = 1024
# header + qubes.xml max size
@ -78,22 +85,44 @@ class BackupCanceledError(QubesException):
super(BackupCanceledError, self).__init__(msg)
self.tmpdir = tmpdir
def init_supported_hmac_and_crypto():
if not KNOWN_HMAC_ALGORITHMS:
KNOWN_HMAC_ALGORITHMS.extend(get_supported_hmac_algo())
if not KNOWN_CRYPTO_ALGORITHMS:
KNOWN_CRYPTO_ALGORITHMS.extend(get_supported_crypto_algo())
class BackupHeader(object):
'''Structure describing backup-header file included as the first file in
backup archive
'''
header_keys = {
'version': 'version',
'encrypted': 'encrypted',
'compressed': 'compressed',
'compression-filter': 'compression_filter',
'crypto-algorithm': 'crypto_algorithm',
'hmac-algorithm': 'hmac_algorithm',
'backup-id': 'backup_id'
Header = collections.namedtuple('Header', ['field', 't', 'validator'])
known_headers = {
'version': Header(field='version', t=int,
validator=lambda x: x >= 1 and x <= 4),
'encrypted': Header(field='encrypted', t=bool,
validator=lambda x: True),
'compressed': Header(field='compressed', t=bool,
validator=lambda x: True),
'compression-filter': Header(
field='compression_filter',
t=str,
validator=lambda x: x in KNOWN_COMPRESSION_FILTERS),
'crypto-algorithm': Header(
field='crypto_algorithm',
t=str,
validator=lambda x: x.lower() in KNOWN_CRYPTO_ALGORITHMS),
'hmac-algorithm': Header(
field='hmac_algorithm',
t=str,
validator=lambda x: x.lower() in KNOWN_HMAC_ALGORITHMS),
'backup-id': Header(
field='backup_id',
t=str,
validator=lambda x: not x.startswith('-') and x != ''),
}
bool_options = ['encrypted', 'compressed']
int_options = ['version']
def __init__(self,
header_data=None,
@ -115,6 +144,8 @@ class BackupHeader(object):
self.crypto_algorithm = crypto_algorithm
self.backup_id = backup_id
init_supported_hmac_and_crypto()
if header_data is not None:
self.load(header_data)
@ -133,27 +164,36 @@ class BackupHeader(object):
except UnicodeDecodeError:
raise QubesException(
"Non-ASCII characters in backup header")
seen = set()
for untrusted_line in untrusted_header_text.splitlines():
if untrusted_line.count('=') != 1:
raise QubesException("Invalid backup header")
key, value = untrusted_line.strip().split('=', 1)
if not _re_alphanum.match(key):
raise QubesException("Invalid backup header ("
"key)")
if key not in self.header_keys.keys():
raise QubesException("Invalid backup header (key)")
if key not in self.known_headers:
# Ignoring unknown option
continue
header = self.known_headers[key]
if key in seen:
raise QubesException("Duplicated header line: {}".format(key))
seen.add(key)
if getattr(self, header.field, None) is not None:
# ignore options already set (potentially forced values)
continue
if not _re_alphanum.match(value):
raise QubesException("Invalid backup header ("
"value)")
if getattr(self, self.header_keys[key]) is not None:
raise QubesException(
"Duplicated header line: {}".format(key))
if key in self.bool_options:
raise QubesException("Invalid backup header (value)")
if header.t is bool:
value = value.lower() in ["1", "true", "yes"]
elif key in self.int_options:
elif header.t is int:
value = int(value)
setattr(self, self.header_keys[key], value)
elif header.t is str:
pass
else:
raise QubesException("Unrecognized header type")
if not header.validator(value):
raise QubesException("Invalid value for header: {}".format(key))
setattr(self, header.field, value)
self.validate()
@ -184,9 +224,10 @@ class BackupHeader(object):
with open(filename, "w") as f_header:
# make sure 'version' is the first key
f_header.write('version={}\n'.format(self.version))
for key, attr in self.header_keys.items():
for key, header in self.known_headers.items():
if key == 'version':
continue
attr = header.field
if getattr(self, attr) is None:
continue
f_header.write("{!s}={!s}\n".format(key, getattr(self, attr)))
@ -739,7 +780,35 @@ def get_supported_hmac_algo(hmac_algorithm=None):
algo = algo.decode('ascii')
if '=>' in algo:
continue
yield algo.strip()
yield algo.strip().lower()
finally:
proc.terminate()
proc.wait()
proc.stdout.close()
def get_supported_crypto_algo(crypto_algorithm=None):
'''Generate a list of supported hmac algorithms
:param crypto_algorithm: default algorithm, if given, it is placed as a
first element
'''
# Start with provided default
if crypto_algorithm:
yield crypto_algorithm
if crypto_algorithm != 'scrypt':
yield 'scrypt'
proc = subprocess.Popen(
'openssl list-cipher-algorithms || '
'openssl list -cipher-algorithms',
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL)
try:
for algo in proc.stdout.readlines():
algo = algo.decode('ascii')
if '=>' in algo:
continue
yield algo.strip().lower()
finally:
proc.terminate()
proc.wait()