backup.py 33 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900
  1. #
  2. # The Qubes OS Project, http://www.qubes-os.org
  3. #
  4. # Copyright (C) 2013-2017 Marek Marczykowski-Górecki
  5. # <marmarek@invisiblethingslab.com>
  6. # Copyright (C) 2013 Olivier Médoc <o_medoc@yahoo.fr>
  7. #
  8. # This program is free software; you can redistribute it and/or
  9. # modify it under the terms of the GNU General Public License
  10. # as published by the Free Software Foundation; either version 2
  11. # of the License, or (at your option) any later version.
  12. #
  13. # This program is distributed in the hope that it will be useful,
  14. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  15. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  16. # GNU General Public License for more details.
  17. #
  18. # You should have received a copy of the GNU General Public License
  19. # along with this program. If not, see <http://www.gnu.org/licenses/>
  20. #
  21. #
  22. from __future__ import unicode_literals
  23. import itertools
  24. import logging
  25. import functools
  26. import termios
  27. import asyncio
  28. from qubes.utils import size_to_human
  29. import stat
  30. import os
  31. import fcntl
  32. import subprocess
  33. import re
  34. import shutil
  35. import tempfile
  36. import time
  37. import grp
  38. import pwd
  39. import datetime
  40. import qubes
  41. import qubes.core2migration
  42. import qubes.storage
  43. import qubes.storage.file
  44. import qubes.vm.templatevm
# Special items put on the queue consumed by SendWorker instead of a file
# name; either of them makes the worker stop
QUEUE_ERROR = "ERROR"
QUEUE_FINISHED = "FINISHED"

# Name of the backup header file
HEADER_FILENAME = 'backup-header'
DEFAULT_CRYPTO_ALGORITHM = 'aes-256-cbc'
# 'scrypt' is not exactly an HMAC algorithm, but a tool we use to
# integrity-protect the data
DEFAULT_HMAC_ALGORITHM = 'scrypt'
DEFAULT_COMPRESSION_FILTER = 'gzip'
CURRENT_BACKUP_FORMAT_VERSION = '4'
# Maximum size of an error message read from process stderr (including VM
# process)
MAX_STDERR_BYTES = 1024
# header + qubes.xml max size
HEADER_QUBES_XML_MAX_SIZE = 1024 * 1024
# hmac file max size - regardless of backup format version!
HMAC_MAX_SIZE = 4096

BLKSIZE = 512

# Characters accepted in backup header keys and values: ASCII letters, digits
# and '-'; note that an empty string matches too
_re_alphanum = re.compile(r'^[A-Za-z0-9-]*$')
  62. class BackupCanceledError(qubes.exc.QubesException):
  63. def __init__(self, msg, tmpdir=None):
  64. super(BackupCanceledError, self).__init__(msg)
  65. self.tmpdir = tmpdir
  66. class BackupHeader(object):
  67. '''Structure describing backup-header file included as the first file in
  68. backup archive
  69. '''
  70. header_keys = {
  71. 'version': 'version',
  72. 'encrypted': 'encrypted',
  73. 'compressed': 'compressed',
  74. 'compression-filter': 'compression_filter',
  75. 'crypto-algorithm': 'crypto_algorithm',
  76. 'hmac-algorithm': 'hmac_algorithm',
  77. 'backup-id': 'backup_id'
  78. }
  79. bool_options = ['encrypted', 'compressed']
  80. int_options = ['version']
  81. def __init__(self,
  82. header_data=None,
  83. version=None,
  84. encrypted=None,
  85. compressed=None,
  86. compression_filter=None,
  87. hmac_algorithm=None,
  88. crypto_algorithm=None,
  89. backup_id=None):
  90. # repeat the list to help code completion...
  91. self.version = version
  92. self.encrypted = encrypted
  93. self.compressed = compressed
  94. # Options introduced in backup format 3+, which always have a header,
  95. # so no need for fallback in function parameter
  96. self.compression_filter = compression_filter
  97. self.hmac_algorithm = hmac_algorithm
  98. self.crypto_algorithm = crypto_algorithm
  99. self.backup_id = backup_id
  100. if header_data is not None:
  101. self.load(header_data)
  102. def load(self, untrusted_header_text):
  103. """Parse backup header file.
  104. :param untrusted_header_text: header content
  105. :type untrusted_header_text: basestring
  106. .. warning::
  107. This function may be exposed to not yet verified header,
  108. so is security critical.
  109. """
  110. try:
  111. untrusted_header_text = untrusted_header_text.decode('ascii')
  112. except UnicodeDecodeError:
  113. raise qubes.exc.QubesException(
  114. "Non-ASCII characters in backup header")
  115. for untrusted_line in untrusted_header_text.splitlines():
  116. if untrusted_line.count('=') != 1:
  117. raise qubes.exc.QubesException("Invalid backup header")
  118. key, value = untrusted_line.strip().split('=', 1)
  119. if not _re_alphanum.match(key):
  120. raise qubes.exc.QubesException("Invalid backup header (key)")
  121. if key not in self.header_keys.keys():
  122. # Ignoring unknown option
  123. continue
  124. if not _re_alphanum.match(value):
  125. raise qubes.exc.QubesException("Invalid backup header (value)")
  126. if getattr(self, self.header_keys[key]) is not None:
  127. raise qubes.exc.QubesException(
  128. "Duplicated header line: {}".format(key))
  129. if key in self.bool_options:
  130. value = value.lower() in ["1", "true", "yes"]
  131. elif key in self.int_options:
  132. value = int(value)
  133. setattr(self, self.header_keys[key], value)
  134. self.validate()
  135. def validate(self):
  136. if self.version == 1:
  137. # header not really present
  138. pass
  139. elif self.version in [2, 3, 4]:
  140. expected_attrs = ['version', 'encrypted', 'compressed',
  141. 'hmac_algorithm']
  142. if self.encrypted:
  143. expected_attrs += ['crypto_algorithm']
  144. if self.version >= 3 and self.compressed:
  145. expected_attrs += ['compression_filter']
  146. if self.version >= 4:
  147. expected_attrs += ['backup_id']
  148. for key in expected_attrs:
  149. if getattr(self, key) is None:
  150. raise qubes.exc.QubesException(
  151. "Backup header lack '{}' info".format(key))
  152. else:
  153. raise qubes.exc.QubesException(
  154. "Unsupported backup version {}".format(self.version))
  155. def save(self, filename):
  156. with open(filename, "w") as f_header:
  157. # make sure 'version' is the first key
  158. f_header.write('version={}\n'.format(self.version))
  159. for key, attr in self.header_keys.items():
  160. if key == 'version':
  161. continue
  162. if getattr(self, attr) is None:
  163. continue
  164. f_header.write("{!s}={!s}\n".format(key, getattr(self, attr)))
  165. class SendWorker(object):
  166. # pylint: disable=too-few-public-methods
  167. def __init__(self, queue, base_dir, backup_stdout):
  168. super(SendWorker, self).__init__()
  169. self.queue = queue
  170. self.base_dir = base_dir
  171. self.backup_stdout = backup_stdout
  172. self.log = logging.getLogger('qubes.backup')
  173. @asyncio.coroutine
  174. def run(self):
  175. self.log.debug("Started sending thread")
  176. while True:
  177. filename = yield from self.queue.get()
  178. if filename in (QUEUE_FINISHED, QUEUE_ERROR):
  179. break
  180. self.log.debug("Sending file {}".format(filename))
  181. # This tar used for sending data out need to be as simple, as
  182. # simple, as featureless as possible. It will not be
  183. # verified before untaring.
  184. tar_final_cmd = ["tar", "-cO", "--posix",
  185. "-C", self.base_dir, filename]
  186. final_proc = yield from asyncio.create_subprocess_exec(
  187. *tar_final_cmd,
  188. stdout=self.backup_stdout)
  189. retcode = yield from final_proc.wait()
  190. if retcode >= 2:
  191. # handle only exit code 2 (tar fatal error) or
  192. # greater (call failed?)
  193. raise qubes.exc.QubesException(
  194. "ERROR: Failed to write the backup, out of disk space? "
  195. "Check console output or ~/.xsession-errors for details.")
  196. # Delete the file as we don't need it anymore
  197. self.log.debug("Removing file {}".format(filename))
  198. os.remove(os.path.join(self.base_dir, filename))
  199. self.log.debug("Finished sending thread")
  200. @asyncio.coroutine
  201. def launch_proc_with_pty(args, stdin=None, stdout=None, stderr=None, echo=True):
  202. """Similar to pty.fork, but handle stdin/stdout according to parameters
  203. instead of connecting to the pty
  204. :return tuple (subprocess.Popen, pty_master)
  205. """
  206. def set_ctty(ctty_fd, master_fd):
  207. os.setsid()
  208. os.close(master_fd)
  209. fcntl.ioctl(ctty_fd, termios.TIOCSCTTY, 0)
  210. if not echo:
  211. termios_p = termios.tcgetattr(ctty_fd)
  212. # termios_p.c_lflags
  213. termios_p[3] &= ~termios.ECHO
  214. termios.tcsetattr(ctty_fd, termios.TCSANOW, termios_p)
  215. (pty_master, pty_slave) = os.openpty()
  216. p = yield from asyncio.create_subprocess_exec(*args,
  217. stdin=stdin,
  218. stdout=stdout,
  219. stderr=stderr,
  220. preexec_fn=lambda: set_ctty(pty_slave, pty_master))
  221. os.close(pty_slave)
  222. return p, open(pty_master, 'wb+', buffering=0)
  223. @asyncio.coroutine
  224. def launch_scrypt(action, input_name, output_name, passphrase):
  225. '''
  226. Launch 'scrypt' process, pass passphrase to it and return
  227. subprocess.Popen object.
  228. :param action: 'enc' or 'dec'
  229. :param input_name: input path or '-' for stdin
  230. :param output_name: output path or '-' for stdout
  231. :param passphrase: passphrase
  232. :return: subprocess.Popen object
  233. '''
  234. command_line = ['scrypt', action, input_name, output_name]
  235. (p, pty) = yield from launch_proc_with_pty(command_line,
  236. stdin=subprocess.PIPE if input_name == '-' else None,
  237. stdout=subprocess.PIPE if output_name == '-' else None,
  238. stderr=subprocess.PIPE,
  239. echo=False)
  240. if action == 'enc':
  241. prompts = (b'Please enter passphrase: ', b'Please confirm passphrase: ')
  242. else:
  243. prompts = (b'Please enter passphrase: ',)
  244. for prompt in prompts:
  245. actual_prompt = yield from p.stderr.read(len(prompt))
  246. if actual_prompt != prompt:
  247. raise qubes.exc.QubesException(
  248. 'Unexpected prompt from scrypt: {}'.format(actual_prompt))
  249. pty.write(passphrase.encode('utf-8') + b'\n')
  250. pty.flush()
  251. # save it here, so garbage collector would not close it (which would kill
  252. # the child)
  253. p.pty = pty
  254. return p
  255. class Backup(object):
  256. '''Backup operation manager. Usage:
  257. >>> app = qubes.Qubes()
  258. >>> # optional - you can use 'None' to use default list (based on
  259. >>> # vm.include_in_backups property)
  260. >>> vms = [app.domains[name] for name in ['my-vm1', 'my-vm2', 'my-vm3']]
  261. >>> exclude_vms = []
  262. >>> options = {
  263. >>> 'encrypted': True,
  264. >>> 'compressed': True,
  265. >>> 'passphrase': 'This is very weak backup passphrase',
  266. >>> 'target_vm': app.domains['sys-usb'],
  267. >>> 'target_dir': '/media/disk',
  268. >>> }
  269. >>> backup_op = Backup(app, vms, exclude_vms, **options)
  270. >>> print(backup_op.get_backup_summary())
  271. >>> asyncio.get_event_loop().run_until_complete(backup_op.backup_do())
  272. See attributes of this object for all available options.
  273. '''
  274. # pylint: disable=too-many-instance-attributes
  275. class FileToBackup(object):
  276. # pylint: disable=too-few-public-methods
  277. def __init__(self, file_path, subdir=None, name=None):
  278. file_size = qubes.storage.file.get_disk_usage(file_path)
  279. if subdir is None:
  280. abs_file_path = os.path.abspath(file_path)
  281. abs_base_dir = os.path.abspath(
  282. qubes.config.system_path["qubes_base_dir"]) + '/'
  283. abs_file_dir = os.path.dirname(abs_file_path) + '/'
  284. (nothing, directory, subdir) = \
  285. abs_file_dir.partition(abs_base_dir)
  286. assert nothing == ""
  287. assert directory == abs_base_dir
  288. else:
  289. if subdir and not subdir.endswith('/'):
  290. subdir += '/'
  291. #: real path to the file
  292. self.path = file_path
  293. #: size of the file
  294. self.size = file_size
  295. #: directory in backup archive where file should be placed
  296. self.subdir = subdir
  297. #: use this name in the archive (aka rename)
  298. self.name = os.path.basename(file_path)
  299. if name is not None:
  300. self.name = name
  301. class VMToBackup(object):
  302. # pylint: disable=too-few-public-methods
  303. def __init__(self, vm, files, subdir):
  304. self.vm = vm
  305. self.files = files
  306. self.subdir = subdir
  307. @property
  308. def size(self):
  309. return functools.reduce(lambda x, y: x + y.size, self.files, 0)
  310. def __init__(self, app, vms_list=None, exclude_list=None, **kwargs):
  311. """
  312. If vms = None, include all (sensible) VMs;
  313. exclude_list is always applied
  314. """
  315. super(Backup, self).__init__()
  316. #: progress of the backup - bytes handled of the current VM
  317. self.chunk_size = 100 * 1024 * 1024
  318. self._current_vm_bytes = 0
  319. #: progress of the backup - bytes handled of finished VMs
  320. self._done_vms_bytes = 0
  321. #: total backup size (set by :py:meth:`get_files_to_backup`)
  322. self.total_backup_bytes = 0
  323. #: application object
  324. self.app = app
  325. #: directory for temporary files - set after creating the directory
  326. self.tmpdir = None
  327. # Backup settings - defaults
  328. #: should the backup be compressed?
  329. self.compressed = True
  330. #: what passphrase should be used to intergrity protect (and encrypt)
  331. #: the backup; required
  332. self.passphrase = None
  333. #: custom compression filter; a program which process stdin to stdout
  334. self.compression_filter = DEFAULT_COMPRESSION_FILTER
  335. #: VM to which backup should be sent (if any)
  336. self.target_vm = None
  337. #: directory to save backup in (either in dom0 or target VM,
  338. #: depending on :py:attr:`target_vm`
  339. self.target_dir = None
  340. #: callback for progress reporting. Will be called with one argument
  341. #: - progress in percents
  342. self.progress_callback = None
  343. #: backup ID, needs to be unique (for a given user),
  344. #: not necessary unpredictable; automatically generated
  345. self.backup_id = datetime.datetime.now().strftime(
  346. '%Y%m%dT%H%M%S-' + str(os.getpid()))
  347. for key, value in kwargs.items():
  348. if hasattr(self, key):
  349. setattr(self, key, value)
  350. else:
  351. raise AttributeError(key)
  352. self.log = logging.getLogger('qubes.backup')
  353. if exclude_list is None:
  354. exclude_list = []
  355. if vms_list is None:
  356. vms_list = [vm for vm in app.domains if vm.include_in_backups]
  357. # Apply exclude list
  358. self.vms_for_backup = [vm for vm in vms_list
  359. if vm.name not in exclude_list]
  360. self._files_to_backup = self.get_files_to_backup()
  361. def __del__(self):
  362. if self.tmpdir and os.path.exists(self.tmpdir):
  363. shutil.rmtree(self.tmpdir)
  364. def get_files_to_backup(self):
  365. files_to_backup = {}
  366. for vm in self.vms_for_backup:
  367. if vm.qid == 0:
  368. # handle dom0 later
  369. continue
  370. subdir = 'vm%d/' % vm.qid
  371. vm_files = []
  372. if vm.volumes['private'] is not None:
  373. path_to_private_img = vm.storage.export('private')
  374. vm_files.append(self.FileToBackup(path_to_private_img, subdir,
  375. 'private.img'))
  376. vm_files.append(self.FileToBackup(vm.icon_path, subdir))
  377. vm_files.extend(self.FileToBackup(i, subdir)
  378. for i in vm.fire_event('backup-get-files'))
  379. # TODO: drop after merging firewall.xml into qubes.xml
  380. firewall_conf = os.path.join(vm.dir_path, vm.firewall_conf)
  381. if os.path.exists(firewall_conf):
  382. vm_files.append(self.FileToBackup(firewall_conf, subdir))
  383. if vm.updateable:
  384. path_to_root_img = vm.storage.export('root')
  385. vm_files.append(self.FileToBackup(path_to_root_img, subdir,
  386. 'root.img'))
  387. files_to_backup[vm.qid] = self.VMToBackup(vm, vm_files, subdir)
  388. # Dom0 user home
  389. if 0 in [vm.qid for vm in self.vms_for_backup]:
  390. local_user = grp.getgrnam('qubes').gr_mem[0]
  391. home_dir = pwd.getpwnam(local_user).pw_dir
  392. # Home dir should have only user-owned files, so fix it now
  393. # to prevent permissions problems - some root-owned files can
  394. # left after 'sudo bash' and similar commands
  395. subprocess.check_call(['sudo', 'chown', '-R', local_user, home_dir])
  396. home_to_backup = [
  397. self.FileToBackup(home_dir, 'dom0-home/')]
  398. vm_files = home_to_backup
  399. files_to_backup[0] = self.VMToBackup(self.app.domains[0],
  400. vm_files,
  401. os.path.join('dom0-home', os.path.basename(home_dir)))
  402. self.total_backup_bytes = functools.reduce(
  403. lambda x, y: x + y.size, files_to_backup.values(), 0)
  404. return files_to_backup
  405. def get_backup_summary(self):
  406. summary = ""
  407. fields_to_display = [
  408. {"name": "VM", "width": 16},
  409. {"name": "type", "width": 12},
  410. {"name": "size", "width": 12}
  411. ]
  412. # Display the header
  413. for field in fields_to_display:
  414. fmt = "{{0:-^{0}}}-+".format(field["width"] + 1)
  415. summary += fmt.format('-')
  416. summary += "\n"
  417. for field in fields_to_display:
  418. fmt = "{{0:>{0}}} |".format(field["width"] + 1)
  419. summary += fmt.format(field["name"])
  420. summary += "\n"
  421. for field in fields_to_display:
  422. fmt = "{{0:-^{0}}}-+".format(field["width"] + 1)
  423. summary += fmt.format('-')
  424. summary += "\n"
  425. files_to_backup = self._files_to_backup
  426. for qid, vm_info in files_to_backup.items():
  427. summary_line = ""
  428. fmt = "{{0:>{0}}} |".format(fields_to_display[0]["width"] + 1)
  429. summary_line += fmt.format(vm_info['vm'].name)
  430. fmt = "{{0:>{0}}} |".format(fields_to_display[1]["width"] + 1)
  431. if qid == 0:
  432. summary_line += fmt.format("User home")
  433. elif isinstance(vm_info['vm'], qubes.vm.templatevm.TemplateVM):
  434. summary_line += fmt.format("Template VM")
  435. else:
  436. summary_line += fmt.format("VM" + (" + Sys" if
  437. vm_info['vm'].updateable else ""))
  438. vm_size = vm_info['size']
  439. fmt = "{{0:>{0}}} |".format(fields_to_display[2]["width"] + 1)
  440. summary_line += fmt.format(size_to_human(vm_size))
  441. if qid != 0 and vm_info['vm'].is_running():
  442. summary_line += " <-- The VM is running, please shut down it " \
  443. "before proceeding with the backup!"
  444. summary += summary_line + "\n"
  445. for field in fields_to_display:
  446. fmt = "{{0:-^{0}}}-+".format(field["width"] + 1)
  447. summary += fmt.format('-')
  448. summary += "\n"
  449. fmt = "{{0:>{0}}} |".format(fields_to_display[0]["width"] + 1)
  450. summary += fmt.format("Total size:")
  451. fmt = "{{0:>{0}}} |".format(
  452. fields_to_display[1]["width"] + 1 + 2 + fields_to_display[2][
  453. "width"] + 1)
  454. summary += fmt.format(size_to_human(self.total_backup_bytes))
  455. summary += "\n"
  456. for field in fields_to_display:
  457. fmt = "{{0:-^{0}}}-+".format(field["width"] + 1)
  458. summary += fmt.format('-')
  459. summary += "\n"
  460. vms_not_for_backup = [vm.name for vm in self.app.domains
  461. if vm not in self.vms_for_backup]
  462. summary += "VMs not selected for backup:\n - " + "\n - ".join(
  463. sorted(vms_not_for_backup))
  464. return summary
  465. @asyncio.coroutine
  466. def _prepare_backup_header(self):
  467. header_file_path = os.path.join(self.tmpdir, HEADER_FILENAME)
  468. backup_header = BackupHeader(
  469. version=CURRENT_BACKUP_FORMAT_VERSION,
  470. hmac_algorithm=DEFAULT_HMAC_ALGORITHM,
  471. encrypted=True,
  472. compressed=self.compressed,
  473. compression_filter=self.compression_filter,
  474. backup_id=self.backup_id,
  475. )
  476. backup_header.save(header_file_path)
  477. # Start encrypt, scrypt will also handle integrity
  478. # protection
  479. scrypt_passphrase = u'{filename}!{passphrase}'.format(
  480. filename=HEADER_FILENAME, passphrase=self.passphrase)
  481. scrypt = yield from launch_scrypt(
  482. 'enc', header_file_path, header_file_path + '.hmac',
  483. scrypt_passphrase)
  484. retcode = yield from scrypt.wait()
  485. if retcode:
  486. raise qubes.exc.QubesException(
  487. "Failed to compute hmac of header file: "
  488. + scrypt.stderr.read())
  489. return HEADER_FILENAME, HEADER_FILENAME + ".hmac"
  490. def _send_progress_update(self):
  491. if not self.total_backup_bytes:
  492. return
  493. if callable(self.progress_callback):
  494. progress = (
  495. 100 * (self._done_vms_bytes + self._current_vm_bytes) /
  496. self.total_backup_bytes)
  497. # pylint: disable=not-callable
  498. self.progress_callback(progress)
  499. def _add_vm_progress(self, bytes_done):
  500. self._current_vm_bytes += bytes_done
  501. self._send_progress_update()
  502. @asyncio.coroutine
  503. def _split_and_send(self, input_stream, file_basename,
  504. output_queue):
  505. '''Split *input_stream* into parts of max *chunk_size* bytes and send
  506. to *output_queue*.
  507. :param input_stream: stream (asyncio reader stream) of data to split
  508. :param file_basename: basename (i.e. without part number and '.enc')
  509. of output files
  510. :param output_queue: asyncio.Queue instance to put produced files to
  511. - queue will get only filenames of written chunks
  512. '''
  513. # Wait for compressor (tar) process to finish or for any
  514. # error of other subprocesses
  515. i = 0
  516. run_error = "size_limit"
  517. scrypt = None
  518. while run_error == "size_limit":
  519. # Prepare a first chunk
  520. chunkfile = file_basename + ".%03d.enc" % i
  521. i += 1
  522. # Start encrypt, scrypt will also handle integrity
  523. # protection
  524. scrypt_passphrase = \
  525. u'{backup_id}!{filename}!{passphrase}'.format(
  526. backup_id=self.backup_id,
  527. filename=os.path.relpath(chunkfile[:-4],
  528. self.tmpdir),
  529. passphrase=self.passphrase)
  530. try:
  531. scrypt = yield from launch_scrypt(
  532. "enc", "-", chunkfile, scrypt_passphrase)
  533. run_error = yield from handle_streams(
  534. input_stream,
  535. scrypt.stdin,
  536. self.chunk_size,
  537. self._add_vm_progress
  538. )
  539. self.log.debug(
  540. "handle_streams returned: {}".format(run_error))
  541. except:
  542. scrypt.terminate()
  543. raise
  544. scrypt.stdin.close()
  545. yield from scrypt.wait()
  546. self.log.debug("scrypt return code: {}".format(
  547. scrypt.returncode))
  548. # Send the chunk to the backup target
  549. yield from output_queue.put(
  550. os.path.relpath(chunkfile, self.tmpdir))
  551. @asyncio.coroutine
  552. def _wrap_and_send_files(self, files_to_backup, output_queue):
  553. for vm_info in files_to_backup:
  554. for file_info in vm_info.files:
  555. self.log.debug("Backing up {}".format(file_info))
  556. backup_tempfile = os.path.join(
  557. self.tmpdir, file_info.subdir,
  558. file_info.name)
  559. self.log.debug("Using temporary location: {}".format(
  560. backup_tempfile))
  561. # Ensure the temporary directory exists
  562. if not os.path.isdir(os.path.dirname(backup_tempfile)):
  563. os.makedirs(os.path.dirname(backup_tempfile))
  564. # The first tar cmd can use any complex feature as we want.
  565. # Files will be verified before untaring this.
  566. # Prefix the path in archive with filename["subdir"] to have it
  567. # verified during untar
  568. tar_cmdline = (["tar", "-Pc", '--sparse',
  569. '-C', os.path.dirname(file_info.path)] +
  570. (['--dereference'] if
  571. file_info.subdir != "dom0-home/" else []) +
  572. ['--xform=s:^%s:%s\\0:' % (
  573. os.path.basename(file_info.path),
  574. file_info.subdir),
  575. os.path.basename(file_info.path)
  576. ])
  577. file_stat = os.stat(file_info.path)
  578. if stat.S_ISBLK(file_stat.st_mode) or \
  579. file_info.name != os.path.basename(file_info.path):
  580. # tar doesn't handle content of block device, use our
  581. # writer
  582. # also use our tar writer when renaming file
  583. assert not stat.S_ISDIR(file_stat.st_mode), \
  584. "Renaming directories not supported"
  585. tar_cmdline = ['python3', '-m', 'qubes.tarwriter',
  586. '--override-name=%s' % (
  587. os.path.join(file_info.subdir, os.path.basename(
  588. file_info.name))),
  589. file_info.path]
  590. if self.compressed:
  591. tar_cmdline.insert(-2,
  592. "--use-compress-program=%s" % self.compression_filter)
  593. self.log.debug(" ".join(tar_cmdline))
  594. # Pipe: tar-sparse | scrypt | tar | backup_target
  595. # TODO: log handle stderr
  596. tar_sparse = yield from asyncio.create_subprocess_exec(
  597. *tar_cmdline, stdout=subprocess.PIPE)
  598. try:
  599. yield from self._split_and_send(
  600. tar_sparse.stdout,
  601. backup_tempfile,
  602. output_queue)
  603. except:
  604. try:
  605. tar_sparse.terminate()
  606. except ProcessLookupError:
  607. pass
  608. raise
  609. # This VM done, update progress
  610. self._done_vms_bytes += vm_info.size
  611. self._current_vm_bytes = 0
  612. self._send_progress_update()
  613. # Save date of last backup
  614. if vm_info.vm:
  615. vm_info.vm.backup_timestamp = datetime.datetime.now()
  616. yield from output_queue.put(QUEUE_FINISHED)
  617. @staticmethod
  618. @asyncio.coroutine
  619. def _monitor_process(proc, error_message):
  620. try:
  621. yield from proc.wait()
  622. except:
  623. proc.terminate()
  624. raise
  625. if proc.returncode:
  626. raise qubes.exc.QubesException(error_message)
  627. @staticmethod
  628. @asyncio.coroutine
  629. def _cancel_on_error(future, previous_task):
  630. '''If further element of chain fail, cancel previous one to
  631. avoid deadlock.
  632. When earlier element of chain fail, it will be handled by
  633. :py:meth:`backup_do`.
  634. The chain is:
  635. :py:meth:`_wrap_and_send_files` -> :py:class:`SendWorker` -> vmproc
  636. '''
  637. try:
  638. yield from future
  639. except: # pylint: disable=bare-except
  640. previous_task.cancel()
  641. @asyncio.coroutine
  642. def backup_do(self):
  643. # pylint: disable=too-many-statements
  644. if self.passphrase is None:
  645. raise qubes.exc.QubesException("No passphrase set")
  646. qubes_xml = self.app.store
  647. self.tmpdir = tempfile.mkdtemp()
  648. shutil.copy(qubes_xml, os.path.join(self.tmpdir, 'qubes.xml'))
  649. qubes_xml = os.path.join(self.tmpdir, 'qubes.xml')
  650. backup_app = qubes.Qubes(qubes_xml)
  651. backup_app.events_enabled = False
  652. files_to_backup = self._files_to_backup
  653. # make sure backup_content isn't set initially
  654. for vm in backup_app.domains:
  655. vm.events_enabled = False
  656. vm.features['backup-content'] = False
  657. for qid, vm_info in files_to_backup.items():
  658. if qid != 0 and vm_info.vm.is_running():
  659. raise qubes.exc.QubesVMNotHaltedError(vm_info.vm)
  660. # VM is included in the backup
  661. backup_app.domains[qid].features['backup-content'] = True
  662. backup_app.domains[qid].features['backup-path'] = vm_info.subdir
  663. backup_app.domains[qid].features['backup-size'] = vm_info.size
  664. backup_app.save()
  665. vmproc = None
  666. if self.target_vm is not None:
  667. # Prepare the backup target (Qubes service call)
  668. # If APPVM, STDOUT is a PIPE
  669. read_fd, write_fd = os.pipe()
  670. vmproc = yield from self.target_vm.run_service('qubes.Backup',
  671. stdin=read_fd, stderr=subprocess.PIPE)
  672. os.close(read_fd)
  673. os.write(write_fd, (self.target_dir.
  674. replace("\r", "").replace("\n", "") + "\n").encode())
  675. backup_stdout = write_fd
  676. else:
  677. # Prepare the backup target (local file)
  678. if os.path.isdir(self.target_dir):
  679. backup_target = self.target_dir + "/qubes-{0}". \
  680. format(time.strftime("%Y-%m-%dT%H%M%S"))
  681. else:
  682. backup_target = self.target_dir
  683. # Create the target directory
  684. if not os.path.exists(os.path.dirname(self.target_dir)):
  685. raise qubes.exc.QubesException(
  686. "ERROR: the backup directory for {0} does not exists".
  687. format(self.target_dir))
  688. # If not APPVM, STDOUT is a local file
  689. backup_stdout = open(backup_target, 'wb')
  690. # Tar with tape length does not deals well with stdout
  691. # (close stdout between two tapes)
  692. # For this reason, we will use named pipes instead
  693. self.log.debug("Working in {}".format(self.tmpdir))
  694. self.log.debug("Will backup: {}".format(files_to_backup))
  695. header_files = yield from self._prepare_backup_header()
  696. # Setup worker to send encrypted data chunks to the backup_target
  697. to_send = asyncio.Queue(10)
  698. send_proc = SendWorker(to_send, self.tmpdir, backup_stdout)
  699. send_task = asyncio.ensure_future(send_proc.run())
  700. vmproc_task = None
  701. if vmproc is not None:
  702. vmproc_task = asyncio.ensure_future(self._cancel_on_error(
  703. self._monitor_process(vmproc,
  704. 'Writing backup to VM {} failed'.format(
  705. self.target_vm.name)),
  706. send_task))
  707. for file_name in header_files:
  708. yield from to_send.put(file_name)
  709. qubes_xml_info = self.VMToBackup(
  710. None,
  711. [self.FileToBackup(qubes_xml, '')],
  712. ''
  713. )
  714. inner_archive_task = asyncio.ensure_future(
  715. self._wrap_and_send_files(
  716. itertools.chain([qubes_xml_info], files_to_backup.values()),
  717. to_send
  718. ))
  719. asyncio.ensure_future(
  720. self._cancel_on_error(send_task, inner_archive_task))
  721. try:
  722. try:
  723. yield from inner_archive_task
  724. except:
  725. yield from to_send.put(QUEUE_ERROR)
  726. # in fact we may be handling CancelledError, induced by
  727. # exception in send_task (and propagated by
  728. # self._cancel_on_error call above); in such a case this
  729. # yield from will raise exception, covering CancelledError -
  730. # this is intended behaviour
  731. yield from send_task
  732. raise
  733. yield from send_task
  734. finally:
  735. if isinstance(backup_stdout, int):
  736. os.close(backup_stdout)
  737. else:
  738. backup_stdout.close()
  739. if vmproc_task:
  740. yield from vmproc_task
  741. shutil.rmtree(self.tmpdir)
  742. self.app.save()
  743. @asyncio.coroutine
  744. def handle_streams(stream_in, stream_out, size_limit=None,
  745. progress_callback=None):
  746. '''
  747. Copy stream_in to all streams_out and monitor all mentioned processes.
  748. If any of them terminate with non-zero code, interrupt the process. Copy
  749. at most `size_limit` data (if given).
  750. :param stream_in: StreamReader object to read data from
  751. :param stream_out: StreamWriter object to write data to
  752. :param size_limit: int maximum data amount to process
  753. :param progress_callback: callable function to report progress, will be
  754. given copied data size (it should accumulate internally)
  755. :return: "size_limit" or None (no error)
  756. '''
  757. buffer_size = 409600
  758. bytes_copied = 0
  759. while True:
  760. if size_limit:
  761. to_copy = min(buffer_size, size_limit - bytes_copied)
  762. if to_copy <= 0:
  763. return "size_limit"
  764. else:
  765. to_copy = buffer_size
  766. buf = yield from stream_in.read(to_copy)
  767. if not buf:
  768. # done
  769. return None
  770. if callable(progress_callback):
  771. progress_callback(len(buf))
  772. stream_out.write(buf)
  773. bytes_copied += len(buf)
  774. # vim:sw=4:et: