backup.py 32 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878
  1. #
  2. # The Qubes OS Project, http://www.qubes-os.org
  3. #
  4. # Copyright (C) 2013-2017 Marek Marczykowski-Górecki
  5. # <marmarek@invisiblethingslab.com>
  6. # Copyright (C) 2013 Olivier Médoc <o_medoc@yahoo.fr>
  7. #
  8. # This library is free software; you can redistribute it and/or
  9. # modify it under the terms of the GNU Lesser General Public
  10. # License as published by the Free Software Foundation; either
  11. # version 2.1 of the License, or (at your option) any later version.
  12. #
  13. # This library is distributed in the hope that it will be useful,
  14. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  15. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  16. # Lesser General Public License for more details.
  17. #
  18. # You should have received a copy of the GNU Lesser General Public
  19. # License along with this library; if not, see <https://www.gnu.org/licenses/>.
  20. #
  21. #
  22. from __future__ import unicode_literals
  23. import asyncio
  24. import datetime
  25. import fcntl
  26. import functools
  27. import grp
  28. import itertools
  29. import logging
  30. import os
  31. import pwd
  32. import re
  33. import shutil
  34. import stat
  35. import string
  36. import subprocess
  37. import tempfile
  38. import termios
  39. import time
  40. from .utils import size_to_human
  41. import qubes
  42. import qubes.core2migration
  43. import qubes.storage
  44. import qubes.storage.file
  45. import qubes.vm.templatevm
# Markers put on the send queue instead of a file name; SendWorker.run()
# stops as soon as it receives either of them
QUEUE_ERROR = "ERROR"
QUEUE_FINISHED = "FINISHED"

# Name of the header file; it is the first file queued for the archive
HEADER_FILENAME = 'backup-header'
DEFAULT_CRYPTO_ALGORITHM = 'aes-256-cbc'
# 'scrypt' is not exactly an HMAC algorithm, but a tool we use to
# integrity-protect the data
DEFAULT_HMAC_ALGORITHM = 'scrypt'
DEFAULT_COMPRESSION_FILTER = 'gzip'
CURRENT_BACKUP_FORMAT_VERSION = '4'
# Maximum size of an error message read from process stderr (including VM
# process)
MAX_STDERR_BYTES = 1024
# header + qubes.xml max size
HEADER_QUBES_XML_MAX_SIZE = 1024 * 1024
# hmac file max size - regardless of backup format version!
HMAC_MAX_SIZE = 4096

BLKSIZE = 512

# Matches strings made only of ASCII letters, digits and '-' (or empty)
_re_alphanum = re.compile(r'^[A-Za-z0-9-]*$')
  63. class BackupCanceledError(qubes.exc.QubesException):
  64. def __init__(self, msg, tmpdir=None):
  65. super(BackupCanceledError, self).__init__(msg)
  66. self.tmpdir = tmpdir
  67. class BackupHeader:
  68. '''Structure describing backup-header file included as the first file in
  69. backup archive
  70. '''
  71. # pylint: disable=too-few-public-methods
  72. header_keys = {
  73. 'version': 'version',
  74. 'encrypted': 'encrypted',
  75. 'compressed': 'compressed',
  76. 'compression-filter': 'compression_filter',
  77. 'crypto-algorithm': 'crypto_algorithm',
  78. 'hmac-algorithm': 'hmac_algorithm',
  79. 'backup-id': 'backup_id'
  80. }
  81. bool_options = ['encrypted', 'compressed']
  82. int_options = ['version']
  83. def __init__(self,
  84. version=None,
  85. encrypted=None,
  86. compressed=None,
  87. compression_filter=None,
  88. hmac_algorithm=None,
  89. crypto_algorithm=None,
  90. backup_id=None):
  91. # repeat the list to help code completion...
  92. self.version = version
  93. self.encrypted = encrypted
  94. self.compressed = compressed
  95. # Options introduced in backup format 3+, which always have a header,
  96. # so no need for fallback in function parameter
  97. self.compression_filter = compression_filter
  98. self.hmac_algorithm = hmac_algorithm
  99. self.crypto_algorithm = crypto_algorithm
  100. self.backup_id = backup_id
  101. def save(self, filename):
  102. with open(filename, "w") as f_header:
  103. # make sure 'version' is the first key
  104. f_header.write('version={}\n'.format(self.version))
  105. for key, attr in self.header_keys.items():
  106. if key == 'version':
  107. continue
  108. if getattr(self, attr) is None:
  109. continue
  110. f_header.write("{!s}={!s}\n".format(key, getattr(self, attr)))
  111. class SendWorker:
  112. # pylint: disable=too-few-public-methods
  113. def __init__(self, queue, base_dir, backup_stdout):
  114. super(SendWorker, self).__init__()
  115. self.queue = queue
  116. self.base_dir = base_dir
  117. self.backup_stdout = backup_stdout
  118. self.log = logging.getLogger('qubes.backup')
  119. @asyncio.coroutine
  120. def run(self):
  121. self.log.debug("Started sending thread")
  122. while True:
  123. filename = yield from self.queue.get()
  124. if filename in (QUEUE_FINISHED, QUEUE_ERROR):
  125. break
  126. self.log.debug("Sending file {}".format(filename))
  127. # This tar used for sending data out need to be as simple, as
  128. # simple, as featureless as possible. It will not be
  129. # verified before untaring.
  130. tar_final_cmd = ["tar", "-cO", "--posix",
  131. "-C", self.base_dir, filename]
  132. # pylint: disable=not-an-iterable
  133. final_proc = yield from asyncio.create_subprocess_exec(
  134. *tar_final_cmd,
  135. stdout=self.backup_stdout)
  136. retcode = yield from final_proc.wait()
  137. if retcode >= 2:
  138. # handle only exit code 2 (tar fatal error) or
  139. # greater (call failed?)
  140. raise qubes.exc.QubesException(
  141. "ERROR: Failed to write the backup, out of disk space? "
  142. "Check console output or ~/.xsession-errors for details.")
  143. # Delete the file as we don't need it anymore
  144. self.log.debug("Removing file {}".format(filename))
  145. os.remove(os.path.join(self.base_dir, filename))
  146. self.log.debug("Finished sending thread")
  147. @asyncio.coroutine
  148. def launch_proc_with_pty(args, stdin=None, stdout=None, stderr=None, echo=True):
  149. """Similar to pty.fork, but handle stdin/stdout according to parameters
  150. instead of connecting to the pty
  151. :return tuple (subprocess.Popen, pty_master)
  152. """
  153. def set_ctty(ctty_fd, master_fd):
  154. os.setsid()
  155. os.close(master_fd)
  156. fcntl.ioctl(ctty_fd, termios.TIOCSCTTY, 0)
  157. if not echo:
  158. termios_p = termios.tcgetattr(ctty_fd)
  159. # termios_p.c_lflags
  160. termios_p[3] &= ~termios.ECHO
  161. termios.tcsetattr(ctty_fd, termios.TCSANOW, termios_p)
  162. (pty_master, pty_slave) = os.openpty()
  163. # pylint: disable=not-an-iterable
  164. p = yield from asyncio.create_subprocess_exec(*args,
  165. stdin=stdin,
  166. stdout=stdout,
  167. stderr=stderr,
  168. preexec_fn=lambda: set_ctty(pty_slave, pty_master))
  169. os.close(pty_slave)
  170. return p, open(pty_master, 'wb+', buffering=0)
  171. @asyncio.coroutine
  172. def launch_scrypt(action, input_name, output_name, passphrase):
  173. '''
  174. Launch 'scrypt' process, pass passphrase to it and return
  175. subprocess.Popen object.
  176. :param action: 'enc' or 'dec'
  177. :param input_name: input path or '-' for stdin
  178. :param output_name: output path or '-' for stdout
  179. :param passphrase: passphrase
  180. :type passphrase: bytes
  181. :return: subprocess.Popen object
  182. '''
  183. command_line = ['scrypt', action, input_name, output_name]
  184. (p, pty) = yield from launch_proc_with_pty(command_line,
  185. stdin=subprocess.PIPE if input_name == '-' else None,
  186. stdout=subprocess.PIPE if output_name == '-' else None,
  187. stderr=subprocess.PIPE,
  188. echo=False)
  189. if action == 'enc':
  190. prompts = (b'Please enter passphrase: ', b'Please confirm passphrase: ')
  191. else:
  192. prompts = (b'Please enter passphrase: ',)
  193. for prompt in prompts:
  194. actual_prompt = yield from p.stderr.read(len(prompt))
  195. if actual_prompt != prompt:
  196. raise qubes.exc.QubesException(
  197. 'Unexpected prompt from scrypt: {}'.format(actual_prompt))
  198. pty.write(passphrase + b'\n')
  199. pty.flush()
  200. # save it here, so garbage collector would not close it (which would kill
  201. # the child)
  202. p.pty = pty
  203. return p
class Backup:
    '''Backup operation manager. Usage:

    >>> app = qubes.Qubes()
    >>> # optional - you can use 'None' to use default list (based on
    >>> # vm.include_in_backups property)
    >>> vms = [app.domains[name] for name in ['my-vm1', 'my-vm2', 'my-vm3']]
    >>> exclude_vms = []
    >>> # only names of existing attributes are accepted as options, any
    >>> # other key makes the constructor raise AttributeError; the backup
    >>> # header is always written with encrypted=True
    >>> options = {
    >>>     'compressed': True,
    >>>     'passphrase': 'This is very weak backup passphrase',
    >>>     'target_vm': app.domains['sys-usb'],
    >>>     'target_dir': '/media/disk',
    >>> }
    >>> backup_op = Backup(app, vms, exclude_vms, **options)
    >>> print(backup_op.get_backup_summary())
    >>> asyncio.get_event_loop().run_until_complete(backup_op.backup_do())

    See attributes of this object for all available options.
    '''
    # pylint: disable=too-many-instance-attributes
  224. class FileToBackup:
  225. # pylint: disable=too-few-public-methods
  226. def __init__(self, file_path, subdir=None, name=None, size=None):
  227. if size is None:
  228. size = qubes.storage.file.get_disk_usage(file_path)
  229. if subdir is None:
  230. abs_file_path = os.path.abspath(file_path)
  231. abs_base_dir = os.path.abspath(
  232. qubes.config.system_path["qubes_base_dir"]) + '/'
  233. abs_file_dir = os.path.dirname(abs_file_path) + '/'
  234. (nothing, directory, subdir) = \
  235. abs_file_dir.partition(abs_base_dir)
  236. assert nothing == ""
  237. assert directory == abs_base_dir
  238. else:
  239. if subdir and not subdir.endswith('/'):
  240. subdir += '/'
  241. #: real path to the file
  242. self.path = file_path
  243. #: size of the file
  244. self.size = size
  245. #: directory in backup archive where file should be placed
  246. self.subdir = subdir
  247. #: use this name in the archive (aka rename)
  248. self.name = os.path.basename(file_path)
  249. if name is not None:
  250. self.name = name
  251. class VMToBackup:
  252. # pylint: disable=too-few-public-methods
  253. def __init__(self, vm, files, subdir):
  254. self.vm = vm
  255. self.files = files
  256. self.subdir = subdir
  257. @property
  258. def size(self):
  259. return functools.reduce(lambda x, y: x + y.size, self.files, 0)
  260. def __init__(self, app, vms_list=None, exclude_list=None, **kwargs):
  261. """
  262. If vms = None, include all (sensible) VMs;
  263. exclude_list is always applied
  264. """
  265. super(Backup, self).__init__()
  266. #: progress of the backup - bytes handled of the current VM
  267. self.chunk_size = 100 * 1024 * 1024
  268. self._current_vm_bytes = 0
  269. #: progress of the backup - bytes handled of finished VMs
  270. self._done_vms_bytes = 0
  271. #: total backup size (set by :py:meth:`get_files_to_backup`)
  272. self.total_backup_bytes = 0
  273. #: application object
  274. self.app = app
  275. #: directory for temporary files - set after creating the directory
  276. self.tmpdir = None
  277. # Backup settings - defaults
  278. #: should the backup be compressed?
  279. self.compressed = True
  280. #: what passphrase should be used to intergrity protect (and encrypt)
  281. #: the backup; required
  282. self.passphrase = None
  283. #: custom compression filter; a program which process stdin to stdout
  284. self.compression_filter = DEFAULT_COMPRESSION_FILTER
  285. #: VM to which backup should be sent (if any)
  286. self.target_vm = None
  287. #: directory to save backup in (either in dom0 or target VM,
  288. #: depending on :py:attr:`target_vm`
  289. self.target_dir = None
  290. #: callback for progress reporting. Will be called with one argument
  291. #: - progress in percents
  292. self.progress_callback = None
  293. self.last_progress_time = time.time()
  294. #: backup ID, needs to be unique (for a given user),
  295. #: not necessary unpredictable; automatically generated
  296. self.backup_id = datetime.datetime.now().strftime(
  297. '%Y%m%dT%H%M%S-' + str(os.getpid()))
  298. for key, value in kwargs.items():
  299. if hasattr(self, key):
  300. setattr(self, key, value)
  301. else:
  302. raise AttributeError(key)
  303. self.log = logging.getLogger('qubes.backup')
  304. if exclude_list is None:
  305. exclude_list = []
  306. if vms_list is None:
  307. vms_list = [vm for vm in app.domains if vm.include_in_backups]
  308. # Apply exclude list
  309. self.vms_for_backup = [vm for vm in vms_list
  310. if vm.name not in exclude_list]
  311. self._files_to_backup = self.get_files_to_backup()
  312. def __del__(self):
  313. if self.tmpdir and os.path.exists(self.tmpdir):
  314. shutil.rmtree(self.tmpdir)
  315. def get_files_to_backup(self):
  316. files_to_backup = {}
  317. for vm in self.vms_for_backup:
  318. if vm.qid == 0:
  319. # handle dom0 later
  320. continue
  321. subdir = 'vm%d/' % vm.qid
  322. vm_files = []
  323. for name, volume in vm.volumes.items():
  324. if not volume.save_on_stop:
  325. continue
  326. vm_files.append(self.FileToBackup(
  327. volume.export(),
  328. subdir,
  329. name + '.img',
  330. volume.usage))
  331. vm_files.extend(self.FileToBackup(i, subdir)
  332. for i in vm.fire_event('backup-get-files'))
  333. firewall_conf = os.path.join(vm.dir_path, vm.firewall_conf)
  334. if os.path.exists(firewall_conf):
  335. vm_files.append(self.FileToBackup(firewall_conf, subdir))
  336. if not vm_files:
  337. # subdir/ is needed in the tar file, otherwise restore
  338. # of a (Disp)VM without any backed up files is going
  339. # to fail. Adding a zero-sized file here happens to be
  340. # more straightforward than adding an empty directory.
  341. empty = self.FileToBackup("/var/run/qubes/empty", subdir)
  342. assert empty.size == 0
  343. vm_files.append(empty)
  344. files_to_backup[vm.qid] = self.VMToBackup(vm, vm_files, subdir)
  345. # Dom0 user home
  346. if 0 in [vm.qid for vm in self.vms_for_backup]:
  347. local_user = grp.getgrnam('qubes').gr_mem[0]
  348. home_dir = pwd.getpwnam(local_user).pw_dir
  349. # Home dir should have only user-owned files, so fix it now
  350. # to prevent permissions problems - some root-owned files can
  351. # left after 'sudo bash' and similar commands
  352. subprocess.check_call(['sudo', 'chown', '-R', local_user, home_dir])
  353. home_to_backup = [
  354. self.FileToBackup(home_dir, 'dom0-home/')]
  355. vm_files = home_to_backup
  356. files_to_backup[0] = self.VMToBackup(self.app.domains[0],
  357. vm_files,
  358. os.path.join('dom0-home', os.path.basename(home_dir)))
  359. self.total_backup_bytes = functools.reduce(
  360. lambda x, y: x + y.size, files_to_backup.values(), 0)
  361. return files_to_backup
  362. def get_backup_summary(self):
  363. summary = ""
  364. fields_to_display = [
  365. {"name": "VM", "width": 16},
  366. {"name": "type", "width": 12},
  367. {"name": "size", "width": 12}
  368. ]
  369. # Display the header
  370. for field in fields_to_display:
  371. fmt = "{{0:-^{0}}}-+".format(field["width"] + 1)
  372. summary += fmt.format('-')
  373. summary += "\n"
  374. for field in fields_to_display:
  375. fmt = "{{0:>{0}}} |".format(field["width"] + 1)
  376. summary += fmt.format(field["name"])
  377. summary += "\n"
  378. for field in fields_to_display:
  379. fmt = "{{0:-^{0}}}-+".format(field["width"] + 1)
  380. summary += fmt.format('-')
  381. summary += "\n"
  382. files_to_backup = self._files_to_backup
  383. for qid, vm_info in files_to_backup.items():
  384. summary_line = ""
  385. fmt = "{{0:>{0}}} |".format(fields_to_display[0]["width"] + 1)
  386. summary_line += fmt.format(vm_info.vm.name)
  387. fmt = "{{0:>{0}}} |".format(fields_to_display[1]["width"] + 1)
  388. if qid == 0:
  389. summary_line += fmt.format("User home")
  390. elif isinstance(vm_info.vm, qubes.vm.templatevm.TemplateVM):
  391. summary_line += fmt.format("Template VM")
  392. else:
  393. summary_line += fmt.format("VM" + (" + Sys" if
  394. vm_info.vm.updateable else ""))
  395. vm_size = vm_info.size
  396. fmt = "{{0:>{0}}} |".format(fields_to_display[2]["width"] + 1)
  397. summary_line += fmt.format(size_to_human(vm_size))
  398. if qid != 0 and vm_info.vm.is_running():
  399. summary_line += " <-- The VM is running, backup will contain " \
  400. "its state from before its start!"
  401. summary += summary_line + "\n"
  402. for field in fields_to_display:
  403. fmt = "{{0:-^{0}}}-+".format(field["width"] + 1)
  404. summary += fmt.format('-')
  405. summary += "\n"
  406. fmt = "{{0:>{0}}} |".format(fields_to_display[0]["width"] + 1)
  407. summary += fmt.format("Total size:")
  408. fmt = "{{0:>{0}}} |".format(
  409. fields_to_display[1]["width"] + 1 + 2 + fields_to_display[2][
  410. "width"] + 1)
  411. summary += fmt.format(size_to_human(self.total_backup_bytes))
  412. summary += "\n"
  413. for field in fields_to_display:
  414. fmt = "{{0:-^{0}}}-+".format(field["width"] + 1)
  415. summary += fmt.format('-')
  416. summary += "\n"
  417. vms_not_for_backup = [vm.name for vm in self.app.domains
  418. if vm not in self.vms_for_backup]
  419. summary += "VMs not selected for backup:\n - " + "\n - ".join(
  420. sorted(vms_not_for_backup)) + "\n"
  421. return summary
  422. @asyncio.coroutine
  423. def _prepare_backup_header(self):
  424. header_file_path = os.path.join(self.tmpdir, HEADER_FILENAME)
  425. backup_header = BackupHeader(
  426. version=CURRENT_BACKUP_FORMAT_VERSION,
  427. hmac_algorithm=DEFAULT_HMAC_ALGORITHM,
  428. encrypted=True,
  429. compressed=self.compressed,
  430. compression_filter=self.compression_filter,
  431. backup_id=self.backup_id,
  432. )
  433. backup_header.save(header_file_path)
  434. # Start encrypt, scrypt will also handle integrity
  435. # protection
  436. scrypt_passphrase = '{filename}!'.format(
  437. filename=HEADER_FILENAME).encode() + self.passphrase
  438. scrypt = yield from launch_scrypt(
  439. 'enc', header_file_path, header_file_path + '.hmac',
  440. scrypt_passphrase)
  441. retcode = yield from scrypt.wait()
  442. if retcode:
  443. raise qubes.exc.QubesException(
  444. "Failed to compute hmac of header file: "
  445. + scrypt.stderr.read())
  446. return HEADER_FILENAME, HEADER_FILENAME + ".hmac"
  447. def _send_progress_update(self):
  448. if not self.total_backup_bytes:
  449. return
  450. if callable(self.progress_callback):
  451. if time.time() - self.last_progress_time >= 1: # avoid flooding
  452. progress = (
  453. 100 * (self._done_vms_bytes + self._current_vm_bytes) /
  454. self.total_backup_bytes)
  455. self.last_progress_time = time.time()
  456. # pylint: disable=not-callable
  457. self.progress_callback(progress)
  458. def _add_vm_progress(self, bytes_done):
  459. self._current_vm_bytes += bytes_done
  460. self._send_progress_update()
  461. @asyncio.coroutine
  462. def _split_and_send(self, input_stream, file_basename,
  463. output_queue):
  464. '''Split *input_stream* into parts of max *chunk_size* bytes and send
  465. to *output_queue*.
  466. :param input_stream: stream (asyncio reader stream) of data to split
  467. :param file_basename: basename (i.e. without part number and '.enc')
  468. of output files
  469. :param output_queue: asyncio.Queue instance to put produced files to
  470. - queue will get only filenames of written chunks
  471. '''
  472. # Wait for compressor (tar) process to finish or for any
  473. # error of other subprocesses
  474. i = 0
  475. run_error = "size_limit"
  476. scrypt = None
  477. while run_error == "size_limit":
  478. # Prepare a first chunk
  479. chunkfile = file_basename + ".%03d.enc" % i
  480. i += 1
  481. # Start encrypt, scrypt will also handle integrity
  482. # protection
  483. scrypt_passphrase = \
  484. '{backup_id}!{filename}!'.format(
  485. backup_id=self.backup_id,
  486. filename=os.path.relpath(chunkfile[:-4],
  487. self.tmpdir)).encode() + self.passphrase
  488. try:
  489. scrypt = yield from launch_scrypt(
  490. "enc", "-", chunkfile, scrypt_passphrase)
  491. run_error = yield from handle_streams(
  492. input_stream,
  493. scrypt.stdin,
  494. self.chunk_size,
  495. self._add_vm_progress
  496. )
  497. self.log.debug(
  498. "handle_streams returned: {}".format(run_error))
  499. except:
  500. scrypt.terminate()
  501. raise
  502. scrypt.stdin.close()
  503. yield from scrypt.wait()
  504. self.log.debug("scrypt return code: {}".format(
  505. scrypt.returncode))
  506. # Send the chunk to the backup target
  507. yield from output_queue.put(
  508. os.path.relpath(chunkfile, self.tmpdir))
  509. @asyncio.coroutine
  510. def _wrap_and_send_files(self, files_to_backup, output_queue):
  511. for vm_info in files_to_backup:
  512. for file_info in vm_info.files:
  513. self.log.debug("Backing up {}".format(file_info))
  514. backup_tempfile = os.path.join(
  515. self.tmpdir, file_info.subdir,
  516. file_info.name)
  517. self.log.debug("Using temporary location: {}".format(
  518. backup_tempfile))
  519. # Ensure the temporary directory exists
  520. if not os.path.isdir(os.path.dirname(backup_tempfile)):
  521. os.makedirs(os.path.dirname(backup_tempfile))
  522. # The first tar cmd can use any complex feature as we want.
  523. # Files will be verified before untaring this.
  524. # Prefix the path in archive with filename["subdir"] to have it
  525. # verified during untar
  526. tar_cmdline = (["tar", "-Pc", '--sparse',
  527. '-C', os.path.dirname(file_info.path)] +
  528. (['--dereference'] if
  529. file_info.subdir != "dom0-home/" else []) +
  530. ['--xform=s:^%s:%s\\0:' % (
  531. os.path.basename(file_info.path),
  532. file_info.subdir),
  533. os.path.basename(file_info.path)
  534. ])
  535. file_stat = os.stat(file_info.path)
  536. if stat.S_ISBLK(file_stat.st_mode) or \
  537. file_info.name != os.path.basename(file_info.path):
  538. # tar doesn't handle content of block device, use our
  539. # writer
  540. # also use our tar writer when renaming file
  541. assert not stat.S_ISDIR(file_stat.st_mode), \
  542. "Renaming directories not supported"
  543. tar_cmdline = ['python3', '-m', 'qubes.tarwriter',
  544. '--override-name=%s' % (
  545. os.path.join(file_info.subdir, os.path.basename(
  546. file_info.name))),
  547. file_info.path]
  548. if self.compressed:
  549. tar_cmdline.insert(-2,
  550. "--use-compress-program=%s" % self.compression_filter)
  551. self.log.debug(" ".join(tar_cmdline))
  552. # Pipe: tar-sparse | scrypt | tar | backup_target
  553. # TODO: log handle stderr
  554. # pylint: disable=not-an-iterable
  555. tar_sparse = yield from asyncio.create_subprocess_exec(
  556. *tar_cmdline, stdout=subprocess.PIPE)
  557. try:
  558. yield from self._split_and_send(
  559. tar_sparse.stdout,
  560. backup_tempfile,
  561. output_queue)
  562. except:
  563. try:
  564. tar_sparse.terminate()
  565. except ProcessLookupError:
  566. pass
  567. raise
  568. yield from tar_sparse.wait()
  569. if tar_sparse.returncode:
  570. raise qubes.exc.QubesException(
  571. 'Failed to archive {} file'.format(file_info.path))
  572. # This VM done, update progress
  573. self._done_vms_bytes += vm_info.size
  574. self._current_vm_bytes = 0
  575. self._send_progress_update()
  576. yield from output_queue.put(QUEUE_FINISHED)
  577. @staticmethod
  578. @asyncio.coroutine
  579. def _monitor_process(proc, error_message):
  580. try:
  581. yield from proc.wait()
  582. except:
  583. proc.terminate()
  584. raise
  585. if proc.returncode:
  586. if proc.stderr is not None:
  587. proc_stderr = (yield from proc.stderr.read())
  588. proc_stderr = proc_stderr.decode('ascii', errors='ignore')
  589. proc_stderr = ''.join(
  590. c for c in proc_stderr if c in string.printable and
  591. c not in '\r\n%{}')
  592. error_message += ': ' + proc_stderr
  593. raise qubes.exc.QubesException(error_message)
  594. @staticmethod
  595. @asyncio.coroutine
  596. def _cancel_on_error(future, previous_task):
  597. '''If further element of chain fail, cancel previous one to
  598. avoid deadlock.
  599. When earlier element of chain fail, it will be handled by
  600. :py:meth:`backup_do`.
  601. The chain is:
  602. :py:meth:`_wrap_and_send_files` -> :py:class:`SendWorker` -> vmproc
  603. '''
  604. try:
  605. yield from future
  606. except: # pylint: disable=bare-except
  607. previous_task.cancel()
    @asyncio.coroutine
    def backup_do(self):
        '''Perform the backup.

        Steps, in order: copy ``qubes.xml`` into a new temporary directory
        and record there which VMs are included; open the backup target
        (``qubes.Backup`` service in :py:attr:`target_vm`, or a local file);
        queue the header files; archive ``qubes.xml`` and all the collected
        files through :py:meth:`_wrap_and_send_files`, while
        :py:class:`SendWorker` sends the produced chunks out; finally set
        ``backup_timestamp`` of the backed up VMs and save the application
        object.

        The temporary directory is removed when the sending phase ends,
        whether it succeeded or not.

        :raises qubes.exc.QubesException: when no passphrase is set or the
            directory for a local backup file does not exist; exceptions of
            the worker tasks are propagated as well
        '''
        # pylint: disable=too-many-statements
        if self.passphrase is None:
            raise qubes.exc.QubesException("No passphrase set")
        # the passphrase is concatenated with bytes later on
        if not isinstance(self.passphrase, bytes):
            self.passphrase = self.passphrase.encode('utf-8')
        qubes_xml = self.app.store
        self.tmpdir = tempfile.mkdtemp()
        # work on a private copy of qubes.xml, the backup-specific features
        # set below are saved to that copy only
        shutil.copy(qubes_xml, os.path.join(self.tmpdir, 'qubes.xml'))
        qubes_xml = os.path.join(self.tmpdir, 'qubes.xml')
        backup_app = qubes.Qubes(qubes_xml, offline_mode=True)
        backup_app.events_enabled = False

        files_to_backup = self._files_to_backup
        # make sure backup_content isn't set initially
        for vm in backup_app.domains:
            vm.events_enabled = False
            vm.features['backup-content'] = False

        for qid, vm_info in files_to_backup.items():
            # VM is included in the backup
            backup_app.domains[qid].features['backup-content'] = True
            backup_app.domains[qid].features['backup-path'] = vm_info.subdir
            backup_app.domains[qid].features['backup-size'] = vm_info.size
        backup_app.save()
        del backup_app

        vmproc = None
        if self.target_vm is not None:
            # Prepare the backup target (Qubes service call)
            # If APPVM, STDOUT is a PIPE
            read_fd, write_fd = os.pipe()
            vmproc = yield from self.target_vm.run_service('qubes.Backup',
                stdin=read_fd,
                stderr=subprocess.PIPE,
                stdout=subprocess.DEVNULL)
            # the service holds the read end now
            os.close(read_fd)
            # the first line sent to the service is the target directory
            # (with any line breaks stripped from it)
            os.write(write_fd, (self.target_dir.
                replace("\r", "").replace("\n", "") + "\n").encode())
            backup_stdout = write_fd
        else:
            # Prepare the backup target (local file)
            if os.path.isdir(self.target_dir):
                backup_target = self.target_dir + "/qubes-{0}". \
                    format(time.strftime("%Y-%m-%dT%H%M%S"))
            else:
                backup_target = self.target_dir

                # The directory of the target file has to exist already
                # NOTE(review): indentation was lost in this copy of the
                # file; placing this check inside the 'else' branch is a
                # reconstruction - confirm against the repository
                if not os.path.exists(os.path.dirname(self.target_dir)):
                    raise qubes.exc.QubesException(
                        "ERROR: the backup directory for {0} does not exists".
                        format(self.target_dir))

            # If not APPVM, STDOUT is a local file
            backup_stdout = open(backup_target, 'wb')

        # Tar with tape length does not deal well with stdout
        # (close stdout between two tapes)
        # For this reason, we will use named pipes instead
        self.log.debug("Working in {}".format(self.tmpdir))

        self.log.debug("Will backup: {}".format(files_to_backup))

        header_files = yield from self._prepare_backup_header()

        # Setup worker to send encrypted data chunks to the backup_target
        to_send = asyncio.Queue(10)
        send_proc = SendWorker(to_send, self.tmpdir, backup_stdout)
        send_task = asyncio.ensure_future(send_proc.run())

        vmproc_task = None
        if vmproc is not None:
            vmproc_task = asyncio.ensure_future(
                self._monitor_process(vmproc,
                    'Writing backup to VM {} failed'.format(
                        self.target_vm.name)))
            # failure of the receiving VM process cancels the sender
            asyncio.ensure_future(self._cancel_on_error(
                vmproc_task, send_task))

        # the header files go first
        for file_name in header_files:
            yield from to_send.put(file_name)

        # qubes.xml is archived like a VM with a single file and no VM
        # object, placed at the top level of the archive
        qubes_xml_info = self.VMToBackup(
            None,
            [self.FileToBackup(qubes_xml, '')],
            ''
        )
        inner_archive_task = asyncio.ensure_future(
            self._wrap_and_send_files(
                itertools.chain([qubes_xml_info], files_to_backup.values()),
                to_send
            ))
        # failure of the sender cancels the archiving
        asyncio.ensure_future(
            self._cancel_on_error(send_task, inner_archive_task))

        try:
            try:
                yield from inner_archive_task
            except:
                yield from to_send.put(QUEUE_ERROR)
                # in fact we may be handling CancelledError, induced by
                # exception in send_task or vmproc_task (and propagated by
                # self._cancel_on_error call above); in such a case this
                # yield from will raise exception, covering CancelledError -
                # this is intended behaviour
                if vmproc_task:
                    yield from vmproc_task
                yield from send_task
                raise

            yield from send_task

        finally:
            # backup_stdout is either a raw file descriptor (pipe to the
            # VM) or a file object (local file)
            if isinstance(backup_stdout, int):
                os.close(backup_stdout)
            else:
                backup_stdout.close()
            try:
                # closing the pipe above lets the VM process finish
                if vmproc_task:
                    yield from vmproc_task
            finally:
                shutil.rmtree(self.tmpdir)

        # Save date of last backup, only when backup succeeded
        for qid, vm_info in files_to_backup.items():
            if vm_info.vm:
                # NOTE(review): '%s' (seconds since the epoch) is a
                # platform-specific strftime extension - confirm it is
                # available on every supported system
                vm_info.vm.backup_timestamp = \
                    int(datetime.datetime.now().strftime('%s'))

        self.app.save()
  723. @asyncio.coroutine
  724. def handle_streams(stream_in, stream_out, size_limit=None,
  725. progress_callback=None):
  726. '''
  727. Copy stream_in to all streams_out and monitor all mentioned processes.
  728. If any of them terminate with non-zero code, interrupt the process. Copy
  729. at most `size_limit` data (if given).
  730. :param stream_in: StreamReader object to read data from
  731. :param stream_out: StreamWriter object to write data to
  732. :param size_limit: int maximum data amount to process
  733. :param progress_callback: callable function to report progress, will be
  734. given copied data size (it should accumulate internally)
  735. :return: "size_limit" or None (no error)
  736. '''
  737. buffer_size = 409600
  738. bytes_copied = 0
  739. while True:
  740. if size_limit:
  741. to_copy = min(buffer_size, size_limit - bytes_copied)
  742. if to_copy <= 0:
  743. return "size_limit"
  744. else:
  745. to_copy = buffer_size
  746. buf = yield from stream_in.read(to_copy)
  747. if not buf:
  748. # done
  749. break
  750. if callable(progress_callback):
  751. progress_callback(len(buf))
  752. stream_out.write(buf)
  753. bytes_copied += len(buf)
  754. return None
  755. # vim:sw=4:et: