backup.py 34 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905
  1. #
  2. # The Qubes OS Project, http://www.qubes-os.org
  3. #
  4. # Copyright (C) 2013-2017 Marek Marczykowski-Górecki
  5. # <marmarek@invisiblethingslab.com>
  6. # Copyright (C) 2013 Olivier Médoc <o_medoc@yahoo.fr>
  7. #
  8. # This library is free software; you can redistribute it and/or
  9. # modify it under the terms of the GNU Lesser General Public
  10. # License as published by the Free Software Foundation; either
  11. # version 2.1 of the License, or (at your option) any later version.
  12. #
  13. # This library is distributed in the hope that it will be useful,
  14. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  15. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  16. # Lesser General Public License for more details.
  17. #
  18. # You should have received a copy of the GNU Lesser General Public
  19. # License along with this library; if not, see <https://www.gnu.org/licenses/>.
  20. #
  21. #
  22. import asyncio
  23. import datetime
  24. import fcntl
  25. import functools
  26. import grp
  27. import itertools
  28. import logging
  29. import os
  30. import pwd
  31. import re
  32. import shutil
  33. import stat
  34. import string
  35. import subprocess
  36. import tempfile
  37. import termios
  38. import time
  39. from .utils import size_to_human
  40. import qubes
  41. import qubes.storage
  42. import qubes.storage.file
  43. import qubes.vm.templatevm
  44. QUEUE_ERROR = "ERROR"
  45. QUEUE_FINISHED = "FINISHED"
  46. HEADER_FILENAME = 'backup-header'
  47. DEFAULT_CRYPTO_ALGORITHM = 'aes-256-cbc'
  48. # 'scrypt' is not exactly HMAC algorithm, but a tool we use to
  49. # integrity-protect the data
  50. DEFAULT_HMAC_ALGORITHM = 'scrypt'
  51. DEFAULT_COMPRESSION_FILTER = 'gzip'
  52. CURRENT_BACKUP_FORMAT_VERSION = '4'
  53. # Maximum size of error message get from process stderr (including VM process)
  54. MAX_STDERR_BYTES = 1024
  55. # header + qubes.xml max size
  56. HEADER_QUBES_XML_MAX_SIZE = 1024 * 1024
  57. # hmac file max size - regardless of backup format version!
  58. HMAC_MAX_SIZE = 4096
  59. BLKSIZE = 512
  60. _re_alphanum = re.compile(r'^[A-Za-z0-9-]*$')
  61. class BackupCanceledError(qubes.exc.QubesException):
  62. def __init__(self, msg, tmpdir=None):
  63. super(BackupCanceledError, self).__init__(msg)
  64. self.tmpdir = tmpdir
  65. class BackupHeader:
  66. '''Structure describing backup-header file included as the first file in
  67. backup archive
  68. '''
  69. # pylint: disable=too-few-public-methods
  70. header_keys = {
  71. 'version': 'version',
  72. 'encrypted': 'encrypted',
  73. 'compressed': 'compressed',
  74. 'compression-filter': 'compression_filter',
  75. 'crypto-algorithm': 'crypto_algorithm',
  76. 'hmac-algorithm': 'hmac_algorithm',
  77. 'backup-id': 'backup_id'
  78. }
  79. bool_options = ['encrypted', 'compressed']
  80. int_options = ['version']
  81. def __init__(self,
  82. version=None,
  83. encrypted=None,
  84. compressed=None,
  85. compression_filter=None,
  86. hmac_algorithm=None,
  87. crypto_algorithm=None,
  88. backup_id=None):
  89. # repeat the list to help code completion...
  90. self.version = version
  91. self.encrypted = encrypted
  92. self.compressed = compressed
  93. # Options introduced in backup format 3+, which always have a header,
  94. # so no need for fallback in function parameter
  95. self.compression_filter = compression_filter
  96. self.hmac_algorithm = hmac_algorithm
  97. self.crypto_algorithm = crypto_algorithm
  98. self.backup_id = backup_id
  99. def save(self, filename):
  100. with open(filename, "w") as f_header:
  101. # make sure 'version' is the first key
  102. f_header.write('version={}\n'.format(self.version))
  103. for key, attr in self.header_keys.items():
  104. if key == 'version':
  105. continue
  106. if getattr(self, attr) is None:
  107. continue
  108. f_header.write("{!s}={!s}\n".format(key, getattr(self, attr)))
  109. class SendWorker:
  110. # pylint: disable=too-few-public-methods
  111. def __init__(self, queue, base_dir, backup_stdout):
  112. super(SendWorker, self).__init__()
  113. self.queue = queue
  114. self.base_dir = base_dir
  115. self.backup_stdout = backup_stdout
  116. self.log = logging.getLogger('qubes.backup')
  117. @asyncio.coroutine
  118. def run(self):
  119. self.log.debug("Started sending thread")
  120. while True:
  121. filename = yield from self.queue.get()
  122. if filename in (QUEUE_FINISHED, QUEUE_ERROR):
  123. break
  124. self.log.debug("Sending file {}".format(filename))
  125. # This tar used for sending data out need to be as simple, as
  126. # simple, as featureless as possible. It will not be
  127. # verified before untaring.
  128. tar_final_cmd = ["tar", "-cO", "--posix",
  129. "-C", self.base_dir, filename]
  130. # pylint: disable=not-an-iterable
  131. final_proc = yield from asyncio.create_subprocess_exec(
  132. *tar_final_cmd,
  133. stdout=self.backup_stdout)
  134. retcode = yield from final_proc.wait()
  135. if retcode >= 2:
  136. # handle only exit code 2 (tar fatal error) or
  137. # greater (call failed?)
  138. raise qubes.exc.QubesException(
  139. "ERROR: Failed to write the backup, out of disk space? "
  140. "Check console output or ~/.xsession-errors for details.")
  141. # Delete the file as we don't need it anymore
  142. self.log.debug("Removing file {}".format(filename))
  143. os.remove(os.path.join(self.base_dir, filename))
  144. self.log.debug("Finished sending thread")
  145. @asyncio.coroutine
  146. def launch_proc_with_pty(args, stdin=None, stdout=None, stderr=None, echo=True):
  147. """Similar to pty.fork, but handle stdin/stdout according to parameters
  148. instead of connecting to the pty
  149. :return tuple (subprocess.Popen, pty_master)
  150. """
  151. def set_ctty(ctty_fd, master_fd):
  152. os.setsid()
  153. os.close(master_fd)
  154. fcntl.ioctl(ctty_fd, termios.TIOCSCTTY, 0)
  155. if not echo:
  156. termios_p = termios.tcgetattr(ctty_fd)
  157. # termios_p.c_lflags
  158. termios_p[3] &= ~termios.ECHO
  159. termios.tcsetattr(ctty_fd, termios.TCSANOW, termios_p)
  160. (pty_master, pty_slave) = os.openpty()
  161. # pylint: disable=not-an-iterable
  162. p = yield from asyncio.create_subprocess_exec(*args,
  163. stdin=stdin,
  164. stdout=stdout,
  165. stderr=stderr,
  166. preexec_fn=lambda: set_ctty(pty_slave, pty_master))
  167. os.close(pty_slave)
  168. return p, open(pty_master, 'wb+', buffering=0)
  169. @asyncio.coroutine
  170. def launch_scrypt(action, input_name, output_name, passphrase):
  171. '''
  172. Launch 'scrypt' process, pass passphrase to it and return
  173. subprocess.Popen object.
  174. :param action: 'enc' or 'dec'
  175. :param input_name: input path or '-' for stdin
  176. :param output_name: output path or '-' for stdout
  177. :param passphrase: passphrase
  178. :type passphrase: bytes
  179. :return: subprocess.Popen object
  180. '''
  181. command_line = ['scrypt', action, input_name, output_name]
  182. (p, pty) = yield from launch_proc_with_pty(command_line,
  183. stdin=subprocess.PIPE if input_name == '-' else None,
  184. stdout=subprocess.PIPE if output_name == '-' else None,
  185. stderr=subprocess.PIPE,
  186. echo=False)
  187. if action == 'enc':
  188. prompts = (b'Please enter passphrase: ', b'Please confirm passphrase: ')
  189. else:
  190. prompts = (b'Please enter passphrase: ',)
  191. for prompt in prompts:
  192. actual_prompt = yield from p.stderr.read(len(prompt))
  193. if actual_prompt != prompt:
  194. raise qubes.exc.QubesException(
  195. 'Unexpected prompt from scrypt: {}'.format(actual_prompt))
  196. pty.write(passphrase + b'\n')
  197. pty.flush()
  198. # save it here, so garbage collector would not close it (which would kill
  199. # the child)
  200. p.pty = pty
  201. return p
  202. class Backup:
  203. '''Backup operation manager. Usage:
  204. >>> app = qubes.Qubes()
  205. >>> # optional - you can use 'None' to use default list (based on
  206. >>> # vm.include_in_backups property)
  207. >>> vms = [app.domains[name] for name in ['my-vm1', 'my-vm2', 'my-vm3']]
  208. >>> exclude_vms = []
  209. >>> options = {
  210. >>> 'encrypted': True,
  211. >>> 'compressed': True,
  212. >>> 'passphrase': 'This is very weak backup passphrase',
  213. >>> 'target_vm': app.domains['sys-usb'],
  214. >>> 'target_dir': '/media/disk',
  215. >>> }
  216. >>> backup_op = Backup(app, vms, exclude_vms, **options)
  217. >>> print(backup_op.get_backup_summary())
  218. >>> asyncio.get_event_loop().run_until_complete(backup_op.backup_do())
  219. See attributes of this object for all available options.
  220. '''
  221. # pylint: disable=too-many-instance-attributes
  222. class FileToBackup:
  223. # pylint: disable=too-few-public-methods
  224. def __init__(self, file_path_or_func, subdir=None, name=None, size=None,
  225. cleanup_func=None):
  226. """Store a single file to backup
  227. :param file_path_or_func: path to the file or a function
  228. returning one; in case of function, it can be a coroutine;
  229. if a function is given, *name*, *subdir* and *size* needs to be
  230. given too
  231. :param subdir: directory in a backup archive to place file in
  232. :param name: name of the file in the backup archive
  233. :param size: size
  234. :param cleanup_func: function to call after processing the file;
  235. the function will get the file path as an argument
  236. """
  237. if callable(file_path_or_func):
  238. assert subdir is not None \
  239. and name is not None \
  240. and size is not None
  241. if size is None:
  242. size = qubes.storage.file.get_disk_usage(file_path_or_func)
  243. if subdir is None:
  244. abs_file_path = os.path.abspath(file_path_or_func)
  245. abs_base_dir = os.path.abspath(
  246. qubes.config.system_path["qubes_base_dir"]) + '/'
  247. abs_file_dir = os.path.dirname(abs_file_path) + '/'
  248. (nothing, directory, subdir) = \
  249. abs_file_dir.partition(abs_base_dir)
  250. assert nothing == ""
  251. assert directory == abs_base_dir
  252. else:
  253. if subdir and not subdir.endswith('/'):
  254. subdir += '/'
  255. if name is None:
  256. name = os.path.basename(file_path_or_func)
  257. #: real path to the file (or callable to get one)
  258. self.path = file_path_or_func
  259. #: size of the file
  260. self.size = size
  261. #: directory in backup archive where file should be placed
  262. self.subdir = subdir
  263. #: use this name in the archive (aka rename)
  264. self.name = name
  265. #: function to call after processing the file
  266. self.cleanup_func = cleanup_func
  267. class VMToBackup:
  268. # pylint: disable=too-few-public-methods
  269. def __init__(self, vm, files, subdir):
  270. self.vm = vm
  271. self.files = files
  272. self.subdir = subdir
  273. @property
  274. def size(self):
  275. return functools.reduce(lambda x, y: x + y.size, self.files, 0)
  276. def __init__(self, app, vms_list=None, exclude_list=None, **kwargs):
  277. """
  278. If vms = None, use default list based on vm.include_in_backups property;
  279. exclude_list is always applied
  280. """
  281. super(Backup, self).__init__()
  282. #: progress of the backup - bytes handled of the current VM
  283. self.chunk_size = 100 * 1024 * 1024
  284. self._current_vm_bytes = 0
  285. #: progress of the backup - bytes handled of finished VMs
  286. self._done_vms_bytes = 0
  287. #: total backup size (set by :py:meth:`get_files_to_backup`)
  288. self.total_backup_bytes = 0
  289. #: application object
  290. self.app = app
  291. #: directory for temporary files - set after creating the directory
  292. self.tmpdir = None
  293. # Backup settings - defaults
  294. #: should the backup be compressed?
  295. self.compressed = True
  296. #: what passphrase should be used to intergrity protect (and encrypt)
  297. #: the backup; required
  298. self.passphrase = None
  299. #: custom compression filter; a program which process stdin to stdout
  300. self.compression_filter = DEFAULT_COMPRESSION_FILTER
  301. #: VM to which backup should be sent (if any)
  302. self.target_vm = None
  303. #: directory to save backup in (either in dom0 or target VM,
  304. #: depending on :py:attr:`target_vm`
  305. self.target_dir = None
  306. #: callback for progress reporting. Will be called with one argument
  307. #: - progress in percents
  308. self.progress_callback = None
  309. self.last_progress_time = time.time()
  310. #: backup ID, needs to be unique (for a given user),
  311. #: not necessary unpredictable; automatically generated
  312. self.backup_id = datetime.datetime.now().strftime(
  313. '%Y%m%dT%H%M%S-' + str(os.getpid()))
  314. for key, value in kwargs.items():
  315. if hasattr(self, key):
  316. setattr(self, key, value)
  317. else:
  318. raise AttributeError(key)
  319. self.log = logging.getLogger('qubes.backup')
  320. if exclude_list is None:
  321. exclude_list = []
  322. if vms_list is None:
  323. vms_list = [vm for vm in app.domains if vm.include_in_backups]
  324. # Apply exclude list
  325. self.vms_for_backup = [vm for vm in vms_list
  326. if vm.name not in exclude_list]
  327. self._files_to_backup = self.get_files_to_backup()
  328. def __del__(self):
  329. if self.tmpdir and os.path.exists(self.tmpdir):
  330. shutil.rmtree(self.tmpdir)
  331. def get_files_to_backup(self):
  332. files_to_backup = {}
  333. for vm in self.vms_for_backup:
  334. if vm.qid == 0:
  335. # handle dom0 later
  336. continue
  337. subdir = 'vm%d/' % vm.qid
  338. vm_files = []
  339. for name, volume in vm.volumes.items():
  340. if not volume.save_on_stop:
  341. continue
  342. vm_files.append(self.FileToBackup(
  343. volume.export,
  344. subdir,
  345. name + '.img',
  346. volume.usage,
  347. cleanup_func=volume.export_end))
  348. vm_files.extend(self.FileToBackup(i, subdir)
  349. for i in vm.fire_event('backup-get-files'))
  350. firewall_conf = os.path.join(vm.dir_path, vm.firewall_conf)
  351. if os.path.exists(firewall_conf):
  352. vm_files.append(self.FileToBackup(firewall_conf, subdir))
  353. if not vm_files:
  354. # subdir/ is needed in the tar file, otherwise restore
  355. # of a (Disp)VM without any backed up files is going
  356. # to fail. Adding a zero-sized file here happens to be
  357. # more straightforward than adding an empty directory.
  358. empty = self.FileToBackup("/var/run/qubes/empty", subdir)
  359. assert empty.size == 0
  360. vm_files.append(empty)
  361. files_to_backup[vm.qid] = self.VMToBackup(vm, vm_files, subdir)
  362. # Dom0 user home
  363. if 0 in [vm.qid for vm in self.vms_for_backup]:
  364. local_user = grp.getgrnam('qubes').gr_mem[0]
  365. home_dir = pwd.getpwnam(local_user).pw_dir
  366. # Home dir should have only user-owned files, so fix it now
  367. # to prevent permissions problems - some root-owned files can
  368. # left after 'sudo bash' and similar commands
  369. subprocess.check_call(['sudo', 'chown', '-R', local_user, home_dir])
  370. home_to_backup = [
  371. self.FileToBackup(home_dir, 'dom0-home/')]
  372. vm_files = home_to_backup
  373. files_to_backup[0] = self.VMToBackup(self.app.domains[0],
  374. vm_files,
  375. os.path.join('dom0-home', os.path.basename(home_dir)))
  376. self.total_backup_bytes = functools.reduce(
  377. lambda x, y: x + y.size, files_to_backup.values(), 0)
  378. return files_to_backup
  379. def get_backup_summary(self):
  380. summary = ""
  381. fields_to_display = [
  382. {"name": "VM", "width": 16},
  383. {"name": "type", "width": 12},
  384. {"name": "size", "width": 12}
  385. ]
  386. # Display the header
  387. for field in fields_to_display:
  388. fmt = "{{0:-^{0}}}-+".format(field["width"] + 1)
  389. summary += fmt.format('-')
  390. summary += "\n"
  391. for field in fields_to_display:
  392. fmt = "{{0:>{0}}} |".format(field["width"] + 1)
  393. summary += fmt.format(field["name"])
  394. summary += "\n"
  395. for field in fields_to_display:
  396. fmt = "{{0:-^{0}}}-+".format(field["width"] + 1)
  397. summary += fmt.format('-')
  398. summary += "\n"
  399. files_to_backup = self._files_to_backup
  400. for qid, vm_info in files_to_backup.items():
  401. summary_line = ""
  402. fmt = "{{0:>{0}}} |".format(fields_to_display[0]["width"] + 1)
  403. summary_line += fmt.format(vm_info.vm.name)
  404. fmt = "{{0:>{0}}} |".format(fields_to_display[1]["width"] + 1)
  405. if qid == 0:
  406. summary_line += fmt.format("User home")
  407. elif isinstance(vm_info.vm, qubes.vm.templatevm.TemplateVM):
  408. summary_line += fmt.format("Template VM")
  409. else:
  410. summary_line += fmt.format("VM" + (" + Sys" if
  411. vm_info.vm.updateable else ""))
  412. vm_size = vm_info.size
  413. fmt = "{{0:>{0}}} |".format(fields_to_display[2]["width"] + 1)
  414. summary_line += fmt.format(size_to_human(vm_size))
  415. if qid != 0 and vm_info.vm.is_running():
  416. summary_line += " <-- The VM is running, backup will contain " \
  417. "its state from before its start!"
  418. summary += summary_line + "\n"
  419. for field in fields_to_display:
  420. fmt = "{{0:-^{0}}}-+".format(field["width"] + 1)
  421. summary += fmt.format('-')
  422. summary += "\n"
  423. fmt = "{{0:>{0}}} |".format(fields_to_display[0]["width"] + 1)
  424. summary += fmt.format("Total size:")
  425. fmt = "{{0:>{0}}} |".format(
  426. fields_to_display[1]["width"] + 1 + 2 + fields_to_display[2][
  427. "width"] + 1)
  428. summary += fmt.format(size_to_human(self.total_backup_bytes))
  429. summary += "\n"
  430. for field in fields_to_display:
  431. fmt = "{{0:-^{0}}}-+".format(field["width"] + 1)
  432. summary += fmt.format('-')
  433. summary += "\n"
  434. vms_not_for_backup = [vm.name for vm in self.app.domains
  435. if vm not in self.vms_for_backup]
  436. summary += "VMs not selected for backup:\n - " + "\n - ".join(
  437. sorted(vms_not_for_backup)) + "\n"
  438. return summary
  439. @asyncio.coroutine
  440. def _prepare_backup_header(self):
  441. header_file_path = os.path.join(self.tmpdir, HEADER_FILENAME)
  442. backup_header = BackupHeader(
  443. version=CURRENT_BACKUP_FORMAT_VERSION,
  444. hmac_algorithm=DEFAULT_HMAC_ALGORITHM,
  445. encrypted=True,
  446. compressed=self.compressed,
  447. compression_filter=self.compression_filter,
  448. backup_id=self.backup_id,
  449. )
  450. backup_header.save(header_file_path)
  451. # Start encrypt, scrypt will also handle integrity
  452. # protection
  453. scrypt_passphrase = '{filename}!'.format(
  454. filename=HEADER_FILENAME).encode() + self.passphrase
  455. scrypt = yield from launch_scrypt(
  456. 'enc', header_file_path, header_file_path + '.hmac',
  457. scrypt_passphrase)
  458. retcode = yield from scrypt.wait()
  459. if retcode:
  460. raise qubes.exc.QubesException(
  461. "Failed to compute hmac of header file: "
  462. + scrypt.stderr.read())
  463. return HEADER_FILENAME, HEADER_FILENAME + ".hmac"
  464. def _send_progress_update(self):
  465. if not self.total_backup_bytes:
  466. return
  467. if callable(self.progress_callback):
  468. if time.time() - self.last_progress_time >= 1: # avoid flooding
  469. progress = (
  470. 100 * (self._done_vms_bytes + self._current_vm_bytes) /
  471. self.total_backup_bytes)
  472. self.last_progress_time = time.time()
  473. # pylint: disable=not-callable
  474. self.progress_callback(progress)
  475. def _add_vm_progress(self, bytes_done):
  476. self._current_vm_bytes += bytes_done
  477. self._send_progress_update()
  478. @asyncio.coroutine
  479. def _split_and_send(self, input_stream, file_basename,
  480. output_queue):
  481. '''Split *input_stream* into parts of max *chunk_size* bytes and send
  482. to *output_queue*.
  483. :param input_stream: stream (asyncio reader stream) of data to split
  484. :param file_basename: basename (i.e. without part number and '.enc')
  485. of output files
  486. :param output_queue: asyncio.Queue instance to put produced files to
  487. - queue will get only filenames of written chunks
  488. '''
  489. # Wait for compressor (tar) process to finish or for any
  490. # error of other subprocesses
  491. i = 0
  492. run_error = "size_limit"
  493. scrypt = None
  494. while run_error == "size_limit":
  495. # Prepare a first chunk
  496. chunkfile = file_basename + ".%03d.enc" % i
  497. i += 1
  498. # Start encrypt, scrypt will also handle integrity
  499. # protection
  500. scrypt_passphrase = \
  501. '{backup_id}!{filename}!'.format(
  502. backup_id=self.backup_id,
  503. filename=os.path.relpath(chunkfile[:-4],
  504. self.tmpdir)).encode() + self.passphrase
  505. try:
  506. scrypt = yield from launch_scrypt(
  507. "enc", "-", chunkfile, scrypt_passphrase)
  508. run_error = yield from handle_streams(
  509. input_stream,
  510. scrypt.stdin,
  511. self.chunk_size,
  512. self._add_vm_progress
  513. )
  514. self.log.debug(
  515. "handle_streams returned: {}".format(run_error))
  516. except:
  517. scrypt.terminate()
  518. raise
  519. scrypt.stdin.close()
  520. yield from scrypt.wait()
  521. self.log.debug("scrypt return code: {}".format(
  522. scrypt.returncode))
  523. # Send the chunk to the backup target
  524. yield from output_queue.put(
  525. os.path.relpath(chunkfile, self.tmpdir))
  526. @asyncio.coroutine
  527. def _wrap_and_send_files(self, files_to_backup, output_queue):
  528. for vm_info in files_to_backup:
  529. for file_info in vm_info.files:
  530. self.log.debug("Backing up {}".format(file_info))
  531. backup_tempfile = os.path.join(
  532. self.tmpdir, file_info.subdir,
  533. file_info.name)
  534. self.log.debug("Using temporary location: {}".format(
  535. backup_tempfile))
  536. # Ensure the temporary directory exists
  537. if not os.path.isdir(os.path.dirname(backup_tempfile)):
  538. os.makedirs(os.path.dirname(backup_tempfile))
  539. # The first tar cmd can use any complex feature as we want.
  540. # Files will be verified before untaring this.
  541. # Prefix the path in archive with filename["subdir"] to have it
  542. # verified during untar
  543. path = file_info.path
  544. if callable(path):
  545. path = yield from qubes.utils.coro_maybe(path())
  546. tar_cmdline = (["tar", "-Pc", '--sparse',
  547. '-C', os.path.dirname(path)] +
  548. (['--dereference'] if
  549. file_info.subdir != "dom0-home/" else []) +
  550. ['--xform=s:^%s:%s\\0:' % (
  551. os.path.basename(path),
  552. file_info.subdir),
  553. os.path.basename(path)
  554. ])
  555. file_stat = os.stat(path)
  556. if stat.S_ISBLK(file_stat.st_mode) or \
  557. file_info.name != os.path.basename(path):
  558. # tar doesn't handle content of block device, use our
  559. # writer
  560. # also use our tar writer when renaming file
  561. assert not stat.S_ISDIR(file_stat.st_mode), \
  562. "Renaming directories not supported"
  563. tar_cmdline = ['python3', '-m', 'qubes.tarwriter',
  564. '--override-name=%s' % (
  565. os.path.join(file_info.subdir, os.path.basename(
  566. file_info.name))),
  567. path]
  568. if self.compressed:
  569. tar_cmdline.insert(-2,
  570. "--use-compress-program=%s" % self.compression_filter)
  571. self.log.debug(" ".join(tar_cmdline))
  572. # Pipe: tar-sparse | scrypt | tar | backup_target
  573. # TODO: log handle stderr
  574. # pylint: disable=not-an-iterable
  575. tar_sparse = yield from asyncio.create_subprocess_exec(
  576. *tar_cmdline, stdout=subprocess.PIPE)
  577. try:
  578. yield from self._split_and_send(
  579. tar_sparse.stdout,
  580. backup_tempfile,
  581. output_queue)
  582. except:
  583. try:
  584. tar_sparse.terminate()
  585. except ProcessLookupError:
  586. pass
  587. raise
  588. finally:
  589. if file_info.cleanup_func is not None:
  590. yield from qubes.utils.coro_maybe(
  591. file_info.cleanup_func(path))
  592. yield from tar_sparse.wait()
  593. if tar_sparse.returncode:
  594. raise qubes.exc.QubesException(
  595. 'Failed to archive {} file'.format(file_info.path))
  596. # This VM done, update progress
  597. self._done_vms_bytes += vm_info.size
  598. self._current_vm_bytes = 0
  599. self._send_progress_update()
  600. yield from output_queue.put(QUEUE_FINISHED)
  601. @staticmethod
  602. @asyncio.coroutine
  603. def _monitor_process(proc, error_message):
  604. try:
  605. yield from proc.wait()
  606. except:
  607. proc.terminate()
  608. raise
  609. if proc.returncode:
  610. if proc.stderr is not None:
  611. proc_stderr = (yield from proc.stderr.read())
  612. proc_stderr = proc_stderr.decode('ascii', errors='ignore')
  613. proc_stderr = ''.join(
  614. c for c in proc_stderr if c in string.printable and
  615. c not in '\r\n%{}')
  616. error_message += ': ' + proc_stderr
  617. raise qubes.exc.QubesException(error_message)
  618. @staticmethod
  619. @asyncio.coroutine
  620. def _cancel_on_error(future, previous_task):
  621. '''If further element of chain fail, cancel previous one to
  622. avoid deadlock.
  623. When earlier element of chain fail, it will be handled by
  624. :py:meth:`backup_do`.
  625. The chain is:
  626. :py:meth:`_wrap_and_send_files` -> :py:class:`SendWorker` -> vmproc
  627. '''
  628. try:
  629. yield from future
  630. except: # pylint: disable=bare-except
  631. previous_task.cancel()
  632. @asyncio.coroutine
  633. def backup_do(self):
  634. # pylint: disable=too-many-statements
  635. if self.passphrase is None:
  636. raise qubes.exc.QubesException("No passphrase set")
  637. if not isinstance(self.passphrase, bytes):
  638. self.passphrase = self.passphrase.encode('utf-8')
  639. qubes_xml = self.app.store
  640. self.tmpdir = tempfile.mkdtemp()
  641. shutil.copy(qubes_xml, os.path.join(self.tmpdir, 'qubes.xml'))
  642. qubes_xml = os.path.join(self.tmpdir, 'qubes.xml')
  643. backup_app = qubes.Qubes(qubes_xml, offline_mode=True)
  644. backup_app.events_enabled = False
  645. files_to_backup = self._files_to_backup
  646. # make sure backup_content isn't set initially
  647. for vm in backup_app.domains:
  648. vm.events_enabled = False
  649. vm.features['backup-content'] = False
  650. for qid, vm_info in files_to_backup.items():
  651. # VM is included in the backup
  652. backup_app.domains[qid].features['backup-content'] = True
  653. backup_app.domains[qid].features['backup-path'] = vm_info.subdir
  654. backup_app.domains[qid].features['backup-size'] = vm_info.size
  655. backup_app.save()
  656. del backup_app
  657. vmproc = None
  658. if self.target_vm is not None:
  659. # Prepare the backup target (Qubes service call)
  660. # If APPVM, STDOUT is a PIPE
  661. read_fd, write_fd = os.pipe()
  662. vmproc = yield from self.target_vm.run_service('qubes.Backup',
  663. stdin=read_fd,
  664. stderr=subprocess.PIPE,
  665. stdout=subprocess.DEVNULL)
  666. os.close(read_fd)
  667. os.write(write_fd, (self.target_dir.
  668. replace("\r", "").replace("\n", "") + "\n").encode())
  669. backup_stdout = write_fd
  670. else:
  671. # Prepare the backup target (local file)
  672. if os.path.isdir(self.target_dir):
  673. backup_target = self.target_dir + "/qubes-{0}". \
  674. format(time.strftime("%Y-%m-%dT%H%M%S"))
  675. else:
  676. backup_target = self.target_dir
  677. # Create the target directory
  678. if not os.path.exists(os.path.dirname(self.target_dir)):
  679. raise qubes.exc.QubesException(
  680. "ERROR: the backup directory for {0} does not exists".
  681. format(self.target_dir))
  682. # If not APPVM, STDOUT is a local file
  683. backup_stdout = open(backup_target, 'wb')
  684. # Tar with tape length does not deals well with stdout
  685. # (close stdout between two tapes)
  686. # For this reason, we will use named pipes instead
  687. self.log.debug("Working in {}".format(self.tmpdir))
  688. self.log.debug("Will backup: {}".format(files_to_backup))
  689. header_files = yield from self._prepare_backup_header()
  690. # Setup worker to send encrypted data chunks to the backup_target
  691. to_send = asyncio.Queue(10)
  692. send_proc = SendWorker(to_send, self.tmpdir, backup_stdout)
  693. send_task = asyncio.ensure_future(send_proc.run())
  694. vmproc_task = None
  695. if vmproc is not None:
  696. vmproc_task = asyncio.ensure_future(
  697. self._monitor_process(vmproc,
  698. 'Writing backup to VM {} failed'.format(
  699. self.target_vm.name)))
  700. asyncio.ensure_future(self._cancel_on_error(
  701. vmproc_task, send_task))
  702. for file_name in header_files:
  703. yield from to_send.put(file_name)
  704. qubes_xml_info = self.VMToBackup(
  705. None,
  706. [self.FileToBackup(qubes_xml, '')],
  707. ''
  708. )
  709. inner_archive_task = asyncio.ensure_future(
  710. self._wrap_and_send_files(
  711. itertools.chain([qubes_xml_info], files_to_backup.values()),
  712. to_send
  713. ))
  714. asyncio.ensure_future(
  715. self._cancel_on_error(send_task, inner_archive_task))
  716. try:
  717. try:
  718. yield from inner_archive_task
  719. except:
  720. yield from to_send.put(QUEUE_ERROR)
  721. # in fact we may be handling CancelledError, induced by
  722. # exception in send_task or vmproc_task (and propagated by
  723. # self._cancel_on_error call above); in such a case this
  724. # yield from will raise exception, covering CancelledError -
  725. # this is intended behaviour
  726. if vmproc_task:
  727. yield from vmproc_task
  728. yield from send_task
  729. raise
  730. yield from send_task
  731. finally:
  732. if isinstance(backup_stdout, int):
  733. os.close(backup_stdout)
  734. else:
  735. backup_stdout.close()
  736. try:
  737. if vmproc_task:
  738. yield from vmproc_task
  739. finally:
  740. shutil.rmtree(self.tmpdir)
  741. # Save date of last backup, only when backup succeeded
  742. for qid, vm_info in files_to_backup.items():
  743. if vm_info.vm:
  744. vm_info.vm.backup_timestamp = \
  745. int(datetime.datetime.now().strftime('%s'))
  746. self.app.save()
  747. @asyncio.coroutine
  748. def handle_streams(stream_in, stream_out, size_limit=None,
  749. progress_callback=None):
  750. '''
  751. Copy stream_in to all streams_out and monitor all mentioned processes.
  752. If any of them terminate with non-zero code, interrupt the process. Copy
  753. at most `size_limit` data (if given).
  754. :param stream_in: StreamReader object to read data from
  755. :param stream_out: StreamWriter object to write data to
  756. :param size_limit: int maximum data amount to process
  757. :param progress_callback: callable function to report progress, will be
  758. given copied data size (it should accumulate internally)
  759. :return: "size_limit" or None (no error)
  760. '''
  761. buffer_size = 409600
  762. bytes_copied = 0
  763. while True:
  764. if size_limit:
  765. to_copy = min(buffer_size, size_limit - bytes_copied)
  766. if to_copy <= 0:
  767. return "size_limit"
  768. else:
  769. to_copy = buffer_size
  770. buf = yield from stream_in.read(to_copy)
  771. if not buf:
  772. # done
  773. break
  774. if callable(progress_callback):
  775. progress_callback(len(buf))
  776. stream_out.write(buf)
  777. bytes_copied += len(buf)
  778. return None
  779. # vim:sw=4:et: