__init__.py 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582
  1. # encoding=utf-8
  2. #
  3. # The Qubes OS Project, https://www.qubes-os.org/
  4. #
  5. # Copyright (C) 2015 Joanna Rutkowska <joanna@invisiblethingslab.com>
  6. # Copyright (C) 2015 Wojtek Porczyk <woju@invisiblethingslab.com>
  7. #
  8. # This program is free software; you can redistribute it and/or modify
  9. # it under the terms of the GNU Lesser General Public License as published by
  10. # the Free Software Foundation; either version 2.1 of the License, or
  11. # (at your option) any later version.
  12. #
  13. # This program is distributed in the hope that it will be useful,
  14. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  15. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  16. # GNU Lesser General Public License for more details.
  17. #
  18. # You should have received a copy of the GNU Lesser General Public License along
  19. # with this program; if not, see <http://www.gnu.org/licenses/>.
  20. '''Qubes' command line tools
  21. '''
  22. from __future__ import print_function
  23. import argparse
  24. import importlib
  25. import logging
  26. import os
  27. import subprocess
  28. import sys
  29. import textwrap
  30. import qubesadmin.log
  31. import qubesadmin.exc
  32. import qubesadmin.vm
  33. #: constant returned when some action should be performed on all qubes
  34. VM_ALL = object()
  35. class QubesAction(argparse.Action):
  36. ''' Interface providing a convinience method to be called, after
  37. `namespace.app` is instantiated.
  38. '''
  39. # pylint: disable=too-few-public-methods
  40. def parse_qubes_app(self, parser, namespace):
  41. ''' This method is called by :py:class:`qubes.tools.QubesArgumentParser`
  42. after the `namespace.app` is instantiated. Oerwrite this method when
  43. extending :py:class:`qubes.tools.QubesAction` to initialized values
  44. based on the `namespace.app`
  45. '''
  46. raise NotImplementedError
  47. class PropertyAction(argparse.Action):
  48. '''Action for argument parser that stores a property.'''
  49. # pylint: disable=redefined-builtin,too-few-public-methods
  50. def __init__(self,
  51. option_strings,
  52. dest,
  53. metavar='NAME=VALUE',
  54. required=False,
  55. help='set property to a value'):
  56. super(PropertyAction, self).__init__(option_strings, 'properties',
  57. metavar=metavar, default={}, help=help)
  58. def __call__(self, parser, namespace, values, option_string=None):
  59. try:
  60. prop, value = values.split('=', 1)
  61. except ValueError:
  62. parser.error('invalid property token: {!r}'.format(values))
  63. properties = getattr(namespace, self.dest)
  64. # copy it, to not modify _mutable_ self.default
  65. if not properties:
  66. properties = properties.copy()
  67. properties[prop] = value
  68. setattr(namespace, self.dest, properties)
  69. class SinglePropertyAction(argparse.Action):
  70. '''Action for argument parser that stores a property.'''
  71. # pylint: disable=redefined-builtin,too-few-public-methods
  72. def __init__(self,
  73. option_strings,
  74. dest,
  75. metavar='VALUE',
  76. const=None,
  77. nargs=None,
  78. required=False,
  79. help=None):
  80. if help is None:
  81. help = 'set {!r} property to a value'.format(dest)
  82. if const is not None:
  83. help += ' {!r}'.format(const)
  84. if const is not None:
  85. nargs = 0
  86. super(SinglePropertyAction, self).__init__(option_strings, 'properties',
  87. metavar=metavar, help=help, default={}, const=const,
  88. nargs=nargs)
  89. self.name = dest
  90. def __call__(self, parser, namespace, values, option_string=None):
  91. if values is self.default and self.default == {}:
  92. return
  93. properties = getattr(namespace, self.dest)
  94. # copy it, to not modify _mutable_ self.default
  95. if not properties:
  96. properties = properties.copy()
  97. properties[self.name] = values \
  98. if self.const is None else self.const
  99. setattr(namespace, self.dest, properties)
  100. class VmNameAction(QubesAction):
  101. ''' Action for parsing one ore multiple domains from provided VMNAMEs '''
  102. # pylint: disable=too-few-public-methods,redefined-builtin
  103. def __init__(self, option_strings, nargs=1, dest='vmnames', help=None,
  104. **kwargs):
  105. if help is None:
  106. if nargs == argparse.OPTIONAL:
  107. help = 'at most one domain name'
  108. elif nargs == 1:
  109. help = 'a domain name'
  110. elif nargs == argparse.ZERO_OR_MORE:
  111. help = 'zero or more domain names'
  112. elif nargs == argparse.ONE_OR_MORE:
  113. help = 'one or more domain names'
  114. elif nargs > 1:
  115. help = '%s domain names' % nargs
  116. else:
  117. raise argparse.ArgumentError(
  118. nargs, "Passed unexpected value {!s} as {!s} nargs ".format(
  119. nargs, dest))
  120. super(VmNameAction, self).__init__(option_strings, dest=dest, help=help,
  121. nargs=nargs, **kwargs)
  122. def __call__(self, parser, namespace, values, option_string=None):
  123. ''' Set ``namespace.vmname`` to ``values`` '''
  124. setattr(namespace, self.dest, values)
  125. def parse_qubes_app(self, parser, namespace):
  126. assert hasattr(namespace, 'app')
  127. setattr(namespace, 'domains', [])
  128. app = namespace.app
  129. if hasattr(namespace, 'all_domains') and namespace.all_domains:
  130. namespace.domains = [
  131. vm
  132. for vm in app.domains
  133. if not vm.klass == 'AdminVM' and
  134. vm.name not in namespace.exclude
  135. ]
  136. else:
  137. if hasattr(namespace, 'exclude') and namespace.exclude:
  138. parser.error('--exclude can only be used with --all')
  139. if self.nargs == argparse.OPTIONAL:
  140. vm_name = getattr(namespace, self.dest, None)
  141. if vm_name is not None:
  142. try:
  143. namespace.domains += [app.domains[vm_name]]
  144. except KeyError:
  145. parser.error('no such domain: {!r}'.format(vm_name))
  146. else:
  147. for vm_name in getattr(namespace, self.dest):
  148. try:
  149. namespace.domains += [app.domains[vm_name]]
  150. except KeyError:
  151. parser.error('no such domain: {!r}'.format(vm_name))
  152. class RunningVmNameAction(VmNameAction):
  153. ''' Action for argument parser that gets a running domain from VMNAME '''
  154. # pylint: disable=too-few-public-methods
  155. def __init__(self, option_strings, nargs=1, dest='vmnames', help=None,
  156. **kwargs):
  157. # pylint: disable=redefined-builtin
  158. if help is None:
  159. if nargs == argparse.OPTIONAL:
  160. help = 'at most one running domain'
  161. elif nargs == 1:
  162. help = 'running domain name'
  163. elif nargs == argparse.ZERO_OR_MORE:
  164. help = 'zero or more running domains'
  165. elif nargs == argparse.ONE_OR_MORE:
  166. help = 'one or more running domains'
  167. elif nargs > 1:
  168. help = '%s running domains' % nargs
  169. else:
  170. raise argparse.ArgumentError(
  171. nargs, "Passed unexpected value {!s} as {!s} nargs ".format(
  172. nargs, dest))
  173. super(RunningVmNameAction, self).__init__(
  174. option_strings, dest=dest, help=help, nargs=nargs, **kwargs)
  175. def parse_qubes_app(self, parser, namespace):
  176. super(RunningVmNameAction, self).parse_qubes_app(parser, namespace)
  177. for vm in namespace.domains:
  178. if not vm.is_running():
  179. parser.error_runtime("domain {!r} is not running".format(
  180. vm.name))
  181. class VolumeAction(QubesAction):
  182. ''' Action for argument parser that gets the
  183. :py:class:``qubes.storage.Volume`` from a POOL_NAME:VOLUME_ID string.
  184. '''
  185. # pylint: disable=too-few-public-methods
  186. def __init__(self, help='A pool & volume id combination',
  187. required=True, **kwargs):
  188. # pylint: disable=redefined-builtin
  189. super(VolumeAction, self).__init__(help=help, required=required,
  190. **kwargs)
  191. def __call__(self, parser, namespace, values, option_string=None):
  192. ''' Set ``namespace.vmname`` to ``values`` '''
  193. setattr(namespace, self.dest, values)
  194. def parse_qubes_app(self, parser, namespace):
  195. ''' Acquire the :py:class:``qubes.storage.Volume`` object from
  196. ``namespace.app``.
  197. '''
  198. assert hasattr(namespace, 'app')
  199. app = namespace.app
  200. try:
  201. pool_name, vid = getattr(namespace, self.dest).split(':')
  202. try:
  203. pool = app.pools[pool_name]
  204. volume = [v for v in pool.volumes if v.vid == vid]
  205. assert volume > 1, 'Duplicate vids in pool %s' % pool_name
  206. if not volume:
  207. parser.error_runtime(
  208. 'no volume with id {!r} pool: {!r}'.format(vid,
  209. pool_name))
  210. else:
  211. setattr(namespace, self.dest, volume[0])
  212. except KeyError:
  213. parser.error_runtime('no pool {!r}'.format(pool_name))
  214. except ValueError:
  215. parser.error('expected a pool & volume id combination like foo:bar')
  216. class VMVolumeAction(QubesAction):
  217. ''' Action for argument parser that gets the
  218. :py:class:``qubes.storage.Volume`` from a VM:VOLUME string.
  219. '''
  220. # pylint: disable=too-few-public-methods
  221. def __init__(self, help='A pool & volume id combination',
  222. required=True, **kwargs):
  223. # pylint: disable=redefined-builtin
  224. super(VMVolumeAction, self).__init__(help=help, required=required,
  225. **kwargs)
  226. def __call__(self, parser, namespace, values, option_string=None):
  227. ''' Set ``namespace.vmname`` to ``values`` '''
  228. setattr(namespace, self.dest, values)
  229. def parse_qubes_app(self, parser, namespace):
  230. ''' Acquire the :py:class:``qubes.storage.Volume`` object from
  231. ``namespace.app``.
  232. '''
  233. assert hasattr(namespace, 'app')
  234. app = namespace.app
  235. try:
  236. vm_name, vol_name = getattr(namespace, self.dest).split(':')
  237. try:
  238. vm = app.domains[vm_name]
  239. try:
  240. volume = vm.volumes[vol_name]
  241. setattr(namespace, self.dest, volume)
  242. except KeyError:
  243. parser.error_runtime('vm {!r} has no volume {!r}'.format(
  244. vm_name, vol_name))
  245. except KeyError:
  246. parser.error_runtime('no vm {!r}'.format(vm_name))
  247. except ValueError:
  248. parser.error('expected a vm & volume combination like foo:bar')
  249. class PoolsAction(QubesAction):
  250. ''' Action for argument parser to gather multiple pools '''
  251. # pylint: disable=too-few-public-methods
  252. def __call__(self, parser, namespace, values, option_string=None):
  253. ''' Set ``namespace.vmname`` to ``values`` '''
  254. if hasattr(namespace, self.dest) and getattr(namespace, self.dest):
  255. names = getattr(namespace, self.dest)
  256. else:
  257. names = []
  258. names += [values]
  259. setattr(namespace, self.dest, names)
  260. def parse_qubes_app(self, parser, namespace):
  261. app = namespace.app
  262. pool_names = getattr(namespace, self.dest)
  263. if pool_names:
  264. try:
  265. pools = [app.pools[name] for name in pool_names]
  266. setattr(namespace, self.dest, pools)
  267. except qubesadmin.exc.QubesException as e:
  268. parser.error(str(e))
  269. sys.exit(2)
  270. except KeyError:
  271. parser.error('No such pools: %s' % pool_names)
  272. sys.exit(2)
  273. class QubesArgumentParser(argparse.ArgumentParser):
  274. '''Parser preconfigured for use in most of the Qubes command-line tools.
  275. :param bool want_app: instantiate :py:class:`qubes.Qubes` object
  276. :param bool want_app_no_instance: don't actually instantiate \
  277. :py:class:`qubes.Qubes` object, just add argument for custom xml file
  278. :param mixed vmname_nargs: The number of ``VMNAME`` arguments that should be
  279. consumed. Values include:
  280. - N (an integer) consumes N arguments (and produces a list)
  281. - '?' consumes zero or one arguments
  282. - '*' consumes zero or more arguments (and produces a list)
  283. - '+' consumes one or more arguments (and produces a list)
  284. *kwargs* are passed to :py:class:`argparser.ArgumentParser`.
  285. Currenty supported options:
  286. ``--force-root`` (optional, ignored, help is suppressed)
  287. ``--offline-mode`` do not talk to hypervisor (help is suppressed)
  288. ``--verbose`` and ``--quiet``
  289. '''
  290. def __init__(self, want_app=True, want_app_no_instance=False,
  291. vmname_nargs=None, **kwargs):
  292. super(QubesArgumentParser, self).__init__(add_help=False, **kwargs)
  293. self._want_app = want_app
  294. self._want_app_no_instance = want_app_no_instance
  295. self._vmname_nargs = vmname_nargs
  296. if self._want_app:
  297. self.add_argument('--qubesxml', metavar='FILE', action='store',
  298. dest='app', help=argparse.SUPPRESS)
  299. self.add_argument('--offline-mode', action='store_true',
  300. default=None, dest='offline_mode', help=argparse.SUPPRESS)
  301. self.add_argument('--verbose', '-v', action='count',
  302. help='increase verbosity')
  303. self.add_argument('--quiet', '-q', action='count',
  304. help='decrease verbosity')
  305. self.add_argument('--force-root', action='store_true',
  306. default=False, help=argparse.SUPPRESS)
  307. self.add_argument('--help', '-h', action=SubParsersHelpAction,
  308. help='show this help message and exit')
  309. if self._vmname_nargs in [argparse.ZERO_OR_MORE, argparse.ONE_OR_MORE]:
  310. vm_name_group = VmNameGroup(self,
  311. required=(self._vmname_nargs
  312. not in [argparse.ZERO_OR_MORE, argparse.OPTIONAL]))
  313. self._mutually_exclusive_groups.append(vm_name_group)
  314. elif self._vmname_nargs is not None:
  315. self.add_argument('VMNAME', nargs=self._vmname_nargs,
  316. action=VmNameAction)
  317. self.set_defaults(verbose=1, quiet=0)
  318. def parse_args(self, *args, **kwargs): # pylint: disable=arguments-differ
  319. # hack for tests
  320. app = kwargs.pop('app', None)
  321. namespace = super(QubesArgumentParser, self).parse_args(*args, **kwargs)
  322. if self._want_app and not self._want_app_no_instance:
  323. self.set_qubes_verbosity(namespace)
  324. if app is not None:
  325. namespace.app = app
  326. else:
  327. namespace.app = qubesadmin.Qubes()
  328. for action in self._actions:
  329. # pylint: disable=protected-access
  330. if issubclass(action.__class__, QubesAction):
  331. action.parse_qubes_app(self, namespace)
  332. elif issubclass(action.__class__,
  333. argparse._SubParsersAction): # pylint: disable=no-member
  334. assert hasattr(namespace, 'command')
  335. command = namespace.command
  336. if command is None:
  337. continue
  338. subparser = action._name_parser_map[command]
  339. for subaction in subparser._actions:
  340. if issubclass(subaction.__class__, QubesAction):
  341. subaction.parse_qubes_app(self, namespace)
  342. return namespace
  343. def error_runtime(self, message):
  344. '''Runtime error, without showing usage.
  345. :param str message: message to show
  346. '''
  347. self.exit(1, '{}: error: {}\n'.format(self.prog, message))
  348. @staticmethod
  349. def get_loglevel_from_verbosity(namespace):
  350. ''' Return loglevel calculated from quiet and verbose arguments '''
  351. return (namespace.quiet - namespace.verbose) * 10 + logging.WARNING
  352. @staticmethod
  353. def set_qubes_verbosity(namespace):
  354. '''Apply a verbosity setting.
  355. This is done by configuring global logging.
  356. :param argparse.Namespace args: args as parsed by parser
  357. '''
  358. verbose = namespace.verbose - namespace.quiet
  359. if verbose >= 2:
  360. qubesadmin.log.enable_debug()
  361. elif verbose >= 1:
  362. qubesadmin.log.enable()
  363. # pylint: disable=no-self-use
  364. def print_error(self, *args, **kwargs):
  365. ''' Print to ``sys.stderr``'''
  366. print(*args, file=sys.stderr, **kwargs)
  367. class SubParsersHelpAction(argparse._HelpAction):
  368. ''' Print help for all options _and all subparsers_ '''
  369. # source https://stackoverflow.com/a/24122778
  370. # pylint: disable=protected-access,too-few-public-methods
  371. @staticmethod
  372. def _indent(indent, text):
  373. '''Indent *text* by *indent* spaces'''
  374. return '\n'.join((' ' * indent) + l for l in text.splitlines())
  375. def __call__(self, parser, namespace, values, option_string=None):
  376. parser.print_help()
  377. # retrieve subparsers from parser
  378. subparsers_actions = [
  379. action for action in parser._actions
  380. if isinstance(action, argparse._SubParsersAction)]
  381. # there will probably only be one subparser_action,
  382. # but better save than sorry
  383. for subparsers_action in subparsers_actions:
  384. # get all subparsers and print help
  385. for pseudo_action in subparsers_action._choices_actions:
  386. choice = pseudo_action.dest.split(' ', 1)[0]
  387. subparser = subparsers_action.choices[choice]
  388. print("\nCommand '{}':".format(choice))
  389. choice_help = subparser.format_usage()
  390. choice_help = self._indent(2, choice_help)
  391. print(choice_help)
  392. parser.exit()
  393. class AliasedSubParsersAction(argparse._SubParsersAction):
  394. '''SubParser with support for action aliases'''
  395. # source https://gist.github.com/sampsyo/471779
  396. # pylint: disable=protected-access,too-few-public-methods,missing-docstring
  397. class _AliasedPseudoAction(argparse.Action):
  398. # pylint: disable=redefined-builtin
  399. def __init__(self, name, aliases, help):
  400. dest = name
  401. if aliases:
  402. dest += ' (%s)' % ','.join(aliases)
  403. super(AliasedSubParsersAction._AliasedPseudoAction, self).\
  404. __init__(option_strings=[], dest=dest, help=help)
  405. def __call__(self, parser, namespace, values, option_string=None):
  406. pass
  407. def add_parser(self, name, **kwargs):
  408. if 'aliases' in kwargs:
  409. aliases = kwargs['aliases']
  410. del kwargs['aliases']
  411. else:
  412. aliases = []
  413. local_parser = super(AliasedSubParsersAction, self).add_parser(
  414. name, **kwargs)
  415. # Make the aliases work.
  416. for alias in aliases:
  417. self._name_parser_map[alias] = local_parser
  418. # Make the help text reflect them, first removing old help entry.
  419. if 'help' in kwargs:
  420. self._choices_actions.pop()
  421. pseudo_action = self._AliasedPseudoAction(name, aliases,
  422. kwargs.pop('help'))
  423. self._choices_actions.append(pseudo_action)
  424. return local_parser
  425. def get_parser_for_command(command):
  426. '''Get parser for given qvm-tool.
  427. :param str command: command name
  428. :rtype: argparse.ArgumentParser
  429. :raises ImportError: when command's module is not found
  430. :raises AttributeError: when parser was not found
  431. '''
  432. module = importlib.import_module(
  433. '.' + command.replace('-', '_'), 'qubesadmin.tools')
  434. try:
  435. parser = module.parser
  436. except AttributeError:
  437. try:
  438. parser = module.get_parser()
  439. except AttributeError:
  440. raise AttributeError('cannot find parser in module')
  441. return parser
  442. # pylint: disable=protected-access
  443. class VmNameGroup(argparse._MutuallyExclusiveGroup):
  444. ''' Adds an a VMNAME, --all & --exclude parameters to a
  445. :py:class:``argparse.ArgumentParser```.
  446. '''
  447. def __init__(self, container, required, vm_action=VmNameAction, help=None):
  448. # pylint: disable=redefined-builtin
  449. super(VmNameGroup, self).__init__(container, required=required)
  450. if not help:
  451. help = 'perform the action on all qubes'
  452. self.add_argument('--all', action='store_true', dest='all_domains',
  453. help=help)
  454. container.add_argument('--exclude', action='append', default=[],
  455. help='exclude the qube from --all')
  456. # ⚠ the default parameter below is important! ⚠
  457. # See https://stackoverflow.com/questions/35044288 and
  458. # `argparse.ArgumentParser.parse_args()` implementation
  459. self.add_argument('VMNAME', action=vm_action, nargs='*', default=[])
  460. def print_table(table, stream=None):
  461. ''' Uses the unix column command to print pretty table.
  462. :param str text: list of lists/sets
  463. '''
  464. unit_separator = chr(31)
  465. cmd = ['column', '-t', '-s', unit_separator]
  466. text_table = '\n'.join([unit_separator.join(row) for row in table])
  467. text_table += '\n'
  468. if stream is None:
  469. stream = sys.stdout
  470. # for tests...
  471. if stream != sys.__stdout__:
  472. p = subprocess.Popen(cmd + ['-c', '80'], stdin=subprocess.PIPE,
  473. stdout=subprocess.PIPE)
  474. p.stdin.write(text_table.encode())
  475. (out, _) = p.communicate()
  476. stream.write(str(out.decode()))
  477. else:
  478. p = subprocess.Popen(cmd, stdin=subprocess.PIPE)
  479. p.communicate(text_table.encode())