__init__.py 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525
  1. #
  2. # The Qubes OS Project, https://www.qubes-os.org/
  3. #
  4. # Copyright (C) 2015 Joanna Rutkowska <joanna@invisiblethingslab.com>
  5. # Copyright (C) 2015 Wojtek Porczyk <woju@invisiblethingslab.com>
  6. #
  7. # This library is free software; you can redistribute it and/or
  8. # modify it under the terms of the GNU Lesser General Public
  9. # License as published by the Free Software Foundation; either
  10. # version 2.1 of the License, or (at your option) any later version.
  11. #
  12. # This library is distributed in the hope that it will be useful,
  13. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  14. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  15. # Lesser General Public License for more details.
  16. #
  17. # You should have received a copy of the GNU Lesser General Public
  18. # License along with this library; if not, see <https://www.gnu.org/licenses/>.
  19. #
  20. '''Qubes' command line tools
  21. '''
  22. import argparse
  23. import importlib
  24. import logging
  25. import os
  26. import subprocess
  27. import sys
  28. import textwrap
  29. import qubes.log
  30. #: constant returned when some action should be performed on all qubes
  31. VM_ALL = object()
  32. class QubesAction(argparse.Action):
  33. ''' Interface providing a convinience method to be called, after
  34. `namespace.app` is instantiated.
  35. '''
  36. # pylint: disable=too-few-public-methods
  37. def parse_qubes_app(self, parser, namespace):
  38. ''' This method is called by :py:class:`qubes.tools.QubesArgumentParser`
  39. after the `namespace.app` is instantiated. Oerwrite this method when
  40. extending :py:class:`qubes.tools.QubesAction` to initialized values
  41. based on the `namespace.app`
  42. '''
  43. raise NotImplementedError
  44. class PropertyAction(argparse.Action):
  45. '''Action for argument parser that stores a property.'''
  46. # pylint: disable=redefined-builtin,too-few-public-methods
  47. def __init__(self,
  48. option_strings,
  49. dest,
  50. metavar='NAME=VALUE',
  51. required=False,
  52. help='set property to a value'):
  53. super().__init__(option_strings, 'properties',
  54. metavar=metavar, default={}, help=help)
  55. def __call__(self, parser, namespace, values, option_string=None):
  56. try:
  57. prop, value = values.split('=', 1)
  58. except ValueError:
  59. parser.error('invalid property token: {!r}'.format(values))
  60. getattr(namespace, self.dest)[prop] = value
  61. class SinglePropertyAction(argparse.Action):
  62. '''Action for argument parser that stores a property.'''
  63. # pylint: disable=redefined-builtin,too-few-public-methods
  64. def __init__(self,
  65. option_strings,
  66. dest,
  67. metavar='VALUE',
  68. const=None,
  69. nargs=None,
  70. required=False,
  71. help=None):
  72. if help is None:
  73. help = 'set {!r} property to a value'.format(dest)
  74. if const is not None:
  75. help += ' {!r}'.format(const)
  76. if const is not None:
  77. nargs = 0
  78. super().__init__(option_strings, 'properties',
  79. metavar=metavar, help=help, default={}, const=const,
  80. nargs=nargs)
  81. self.name = dest
  82. def __call__(self, parser, namespace, values, option_string=None):
  83. getattr(namespace, self.dest)[self.name] = values \
  84. if self.const is None else self.const
  85. class HelpPropertiesAction(argparse.Action):
  86. '''Action for argument parser that displays all properties and exits.'''
  87. # pylint: disable=redefined-builtin,too-few-public-methods
  88. def __init__(self,
  89. option_strings,
  90. klass=None,
  91. dest=argparse.SUPPRESS,
  92. default=argparse.SUPPRESS,
  93. help='list all available properties with short descriptions'
  94. ' and exit'):
  95. super().__init__(
  96. option_strings=option_strings,
  97. dest=dest,
  98. default=default,
  99. nargs=0,
  100. help=help)
  101. # late import because of circular dependency
  102. import qubes # pylint: disable=redefined-outer-name
  103. self._klass = klass if klass is not None else qubes.Qubes
  104. def __call__(self, parser, namespace, values, option_string=None):
  105. # pylint: disable=redefined-outer-name
  106. properties = self._klass.property_list()
  107. width = max(len(prop.__name__) for prop in properties)
  108. wrapper = textwrap.TextWrapper(width=80,
  109. initial_indent=' ', subsequent_indent=' ' * (width + 6))
  110. text = 'Common properties:\n' + '\n'.join(
  111. wrapper.fill('{name:{width}s} {doc}'.format(
  112. name=prop.__name__,
  113. doc=qubes.utils.format_doc(prop.__doc__) if prop.__doc__ else'',
  114. width=width))
  115. for prop in sorted(properties))
  116. if self._klass is not qubes.Qubes:
  117. text += '\n\n' \
  118. 'There may be more properties in specific domain classes.\n'
  119. parser.exit(message=text)
  120. class VmNameAction(QubesAction):
  121. ''' Action for parsing one ore multiple domains from provided VMNAMEs '''
  122. # pylint: disable=too-few-public-methods,redefined-builtin
  123. def __init__(self, option_strings, nargs=1, dest='vmnames', help=None,
  124. **kwargs):
  125. if help is None:
  126. if nargs == argparse.OPTIONAL:
  127. help = 'at most one domain name'
  128. elif nargs == 1:
  129. help = 'a domain name'
  130. elif nargs == argparse.ZERO_OR_MORE:
  131. help = 'zero or more domain names'
  132. elif nargs == argparse.ONE_OR_MORE:
  133. help = 'one or more domain names'
  134. elif nargs > 1:
  135. help = '%s domain names' % nargs
  136. else:
  137. raise argparse.ArgumentError(
  138. nargs, "Passed unexpected value {!s} as {!s} nargs ".format(
  139. nargs, dest))
  140. super().__init__(option_strings, dest=dest, help=help,
  141. nargs=nargs, **kwargs)
  142. def __call__(self, parser, namespace, values, option_string=None):
  143. ''' Set ``namespace.vmname`` to ``values`` '''
  144. setattr(namespace, self.dest, values)
  145. def parse_qubes_app(self, parser, namespace):
  146. assert hasattr(namespace, 'app')
  147. setattr(namespace, 'domains', [])
  148. app = namespace.app
  149. if hasattr(namespace, 'all_domains') and namespace.all_domains:
  150. namespace.domains = [
  151. vm
  152. for vm in app.domains
  153. if vm.qid != 0 and vm.name not in namespace.exclude
  154. ]
  155. else:
  156. if hasattr(namespace, 'exclude') and namespace.exclude:
  157. parser.error('--exclude can only be used with --all')
  158. for vm_name in getattr(namespace, self.dest):
  159. try:
  160. namespace.domains += [app.domains[vm_name]]
  161. except KeyError:
  162. parser.error('no such domain: {!r}'.format(vm_name))
  163. class RunningVmNameAction(VmNameAction):
  164. ''' Action for argument parser that gets a running domain from VMNAME '''
  165. # pylint: disable=too-few-public-methods
  166. def __init__(self, option_strings, nargs=1, dest='vmnames', help=None,
  167. **kwargs):
  168. # pylint: disable=redefined-builtin
  169. if help is None:
  170. if nargs == argparse.OPTIONAL:
  171. help = 'at most one running domain'
  172. elif nargs == 1:
  173. help = 'running domain name'
  174. elif nargs == argparse.ZERO_OR_MORE:
  175. help = 'zero or more running domains'
  176. elif nargs == argparse.ONE_OR_MORE:
  177. help = 'one or more running domains'
  178. elif nargs > 1:
  179. help = '%s running domains' % nargs
  180. else:
  181. raise argparse.ArgumentError(
  182. nargs, "Passed unexpected value {!s} as {!s} nargs ".format(
  183. nargs, dest))
  184. super().__init__(
  185. option_strings, dest=dest, help=help, nargs=nargs, **kwargs)
  186. def parse_qubes_app(self, parser, namespace):
  187. super().parse_qubes_app(parser, namespace)
  188. for vm in namespace.domains:
  189. if not vm.is_running():
  190. parser.error_runtime("domain {!r} is not running".format(
  191. vm.name))
  192. class VolumeAction(QubesAction):
  193. ''' Action for argument parser that gets the
  194. :py:class:``qubes.storage.Volume`` from a POOL_NAME:VOLUME_ID string.
  195. '''
  196. # pylint: disable=too-few-public-methods
  197. def __init__(self, help='A pool & volume id combination',
  198. required=True, **kwargs):
  199. # pylint: disable=redefined-builtin
  200. super().__init__(help=help, required=required,
  201. **kwargs)
  202. def __call__(self, parser, namespace, values, option_string=None):
  203. ''' Set ``namespace.vmname`` to ``values`` '''
  204. setattr(namespace, self.dest, values)
  205. def parse_qubes_app(self, parser, namespace):
  206. ''' Acquire the :py:class:``qubes.storage.Volume`` object from
  207. ``namespace.app``.
  208. '''
  209. assert hasattr(namespace, 'app')
  210. app = namespace.app
  211. try:
  212. pool_name, vid = getattr(namespace, self.dest).split(':')
  213. try:
  214. pool = app.pools[pool_name]
  215. volume = [v for v in pool.volumes if v.vid == vid]
  216. assert len(volume) == 1, 'Duplicate vids in pool %s' % pool_name
  217. if not volume:
  218. parser.error_runtime(
  219. 'no volume with id {!r} pool: {!r}'.format(vid,
  220. pool_name))
  221. else:
  222. setattr(namespace, self.dest, volume[0])
  223. except KeyError:
  224. parser.error_runtime('no pool {!r}'.format(pool_name))
  225. except ValueError:
  226. parser.error('expected a pool & volume id combination like foo:bar')
  227. class PoolsAction(QubesAction):
  228. ''' Action for argument parser to gather multiple pools '''
  229. # pylint: disable=too-few-public-methods
  230. def __call__(self, parser, namespace, values, option_string=None):
  231. ''' Set ``namespace.vmname`` to ``values`` '''
  232. if hasattr(namespace, self.dest) and getattr(namespace, self.dest):
  233. names = getattr(namespace, self.dest)
  234. else:
  235. names = []
  236. names += [values]
  237. setattr(namespace, self.dest, names)
  238. def parse_qubes_app(self, parser, namespace):
  239. app = namespace.app
  240. pool_names = getattr(namespace, self.dest)
  241. if pool_names:
  242. try:
  243. pools = [app.get_pool(name) for name in pool_names]
  244. setattr(namespace, self.dest, pools)
  245. except qubes.exc.QubesException as e:
  246. parser.error(str(e))
  247. sys.exit(2)
  248. class QubesArgumentParser(argparse.ArgumentParser):
  249. '''Parser preconfigured for use in most of the Qubes command-line tools.
  250. :param bool want_app: instantiate :py:class:`qubes.Qubes` object
  251. :param bool want_app_no_instance: don't actually instantiate \
  252. :py:class:`qubes.Qubes` object, just add argument for custom xml file
  253. :param bool want_force_root: add ``--force-root`` option
  254. :param mixed vmname_nargs: The number of ``VMNAME`` arguments that should be
  255. consumed. Values include:
  256. - N (an integer) consumes N arguments (and produces a list)
  257. - '?' consumes zero or one arguments
  258. - '*' consumes zero or more arguments (and produces a list)
  259. - '+' consumes one or more arguments (and produces a list)
  260. *kwargs* are passed to :py:class:`argparser.ArgumentParser`.
  261. Currenty supported options:
  262. ``--force-root`` (optional)
  263. ``--qubesxml`` location of :file:`qubes.xml` (help is suppressed)
  264. ``--offline-mode`` do not talk to hypervisor (help is suppressed)
  265. ``--verbose`` and ``--quiet``
  266. '''
  267. def __init__(self, want_app=True, want_app_no_instance=False,
  268. want_force_root=False, vmname_nargs=None, **kwargs):
  269. super().__init__(**kwargs)
  270. self._want_app = want_app
  271. self._want_app_no_instance = want_app_no_instance
  272. self._want_force_root = want_force_root
  273. self._vmname_nargs = vmname_nargs
  274. if self._want_app:
  275. self.add_argument('--qubesxml', metavar='FILE', action='store',
  276. dest='app', help=argparse.SUPPRESS)
  277. self.add_argument('--offline-mode', action='store_true',
  278. default=None, dest='offline_mode', help=argparse.SUPPRESS)
  279. self.add_argument('--verbose', '-v', action='count',
  280. help='increase verbosity')
  281. self.add_argument('--quiet', '-q', action='count',
  282. help='decrease verbosity')
  283. if self._want_force_root:
  284. self.add_argument('--force-root', action='store_true',
  285. default=False, help='force to run as root')
  286. if self._vmname_nargs in [argparse.ZERO_OR_MORE, argparse.ONE_OR_MORE]:
  287. vm_name_group = VmNameGroup(self, self._vmname_nargs)
  288. self._mutually_exclusive_groups.append(vm_name_group)
  289. elif self._vmname_nargs is not None:
  290. self.add_argument('VMNAME', nargs=self._vmname_nargs,
  291. action=VmNameAction)
  292. self.set_defaults(verbose=1, quiet=0)
  293. def parse_args(self, args=None, namespace=None):
  294. namespace = super().parse_args(args, namespace)
  295. if self._want_app and not self._want_app_no_instance:
  296. self.set_qubes_verbosity(namespace)
  297. namespace.app = qubes.Qubes(namespace.app,
  298. offline_mode=namespace.offline_mode)
  299. if self._want_force_root:
  300. self.dont_run_as_root(namespace)
  301. for action in self._actions:
  302. # pylint: disable=protected-access
  303. if issubclass(action.__class__, QubesAction):
  304. action.parse_qubes_app(self, namespace)
  305. elif issubclass(action.__class__,
  306. argparse._SubParsersAction): # pylint: disable=no-member
  307. assert hasattr(namespace, 'command')
  308. command = namespace.command
  309. subparser = action._name_parser_map[command]
  310. for subaction in subparser._actions:
  311. if issubclass(subaction.__class__, QubesAction):
  312. subaction.parse_qubes_app(self, namespace)
  313. return namespace
  314. def error_runtime(self, message):
  315. '''Runtime error, without showing usage.
  316. :param str message: message to show
  317. '''
  318. self.exit(1, '{}: error: {}\n'.format(self.prog, message))
  319. def dont_run_as_root(self, namespace):
  320. '''Prevent running as root.
  321. :param argparse.Namespace args: if there is ``.force_root`` attribute \
  322. set to true, run anyway
  323. '''
  324. try:
  325. euid = os.geteuid()
  326. except AttributeError: # no geteuid(), probably NT
  327. return
  328. if euid == 0 and not namespace.force_root:
  329. self.error_runtime(
  330. 'refusing to run as root; add --force-root to override')
  331. @staticmethod
  332. def set_qubes_verbosity(namespace):
  333. '''Apply a verbosity setting.
  334. This is done by configuring global logging.
  335. :param argparse.Namespace args: args as parsed by parser
  336. '''
  337. verbose = namespace.verbose - namespace.quiet
  338. if verbose >= 2:
  339. qubes.log.enable_debug()
  340. elif verbose >= 1:
  341. qubes.log.enable()
  342. # pylint: disable=no-self-use
  343. def print_error(self, *args, **kwargs):
  344. ''' Print to ``sys.stderr``'''
  345. print(*args, file=sys.stderr, **kwargs)
  346. class AliasedSubParsersAction(argparse._SubParsersAction):
  347. # source https://gist.github.com/sampsyo/471779
  348. # pylint: disable=protected-access,too-few-public-methods
  349. class _AliasedPseudoAction(argparse.Action):
  350. # pylint: disable=redefined-builtin,arguments-differ
  351. def __init__(self, name, aliases, help):
  352. dest = name
  353. if aliases:
  354. dest += ' (%s)' % ','.join(aliases)
  355. super().__init__(option_strings=[], dest=dest, help=help)
  356. def __call__(self, parser, namespace, values, option_string=None):
  357. raise NotImplementedError
  358. def add_parser(self, name, **kwargs):
  359. if 'aliases' in kwargs:
  360. aliases = kwargs['aliases']
  361. del kwargs['aliases']
  362. else:
  363. aliases = []
  364. local_parser = super().add_parser(name, **kwargs)
  365. # Make the aliases work.
  366. for alias in aliases:
  367. self._name_parser_map[alias] = local_parser
  368. # Make the help text reflect them, first removing old help entry.
  369. if 'help' in kwargs:
  370. self._choices_actions.pop()
  371. pseudo_action = self._AliasedPseudoAction(name, aliases,
  372. kwargs.pop('help'))
  373. self._choices_actions.append(pseudo_action)
  374. return local_parser
  375. def get_parser_for_command(command):
  376. '''Get parser for given qvm-tool.
  377. :param str command: command name
  378. :rtype: argparse.ArgumentParser
  379. :raises ImportError: when command's module is not found
  380. :raises AttributeError: when parser was not found
  381. '''
  382. module = importlib.import_module(
  383. '.' + command.replace('-', '_'), 'qubes.tools')
  384. try:
  385. parser = module.parser
  386. except AttributeError:
  387. try:
  388. parser = module.get_parser()
  389. except AttributeError:
  390. raise AttributeError('cannot find parser in module')
  391. return parser
  392. # pylint: disable=protected-access
  393. class VmNameGroup(argparse._MutuallyExclusiveGroup):
  394. ''' Adds an a VMNAME, --all & --exclude parameters to a
  395. :py:class:``argparse.ArgumentParser```.
  396. '''
  397. def __init__(self, container, required, vm_action=VmNameAction, help=None):
  398. # pylint: disable=redefined-builtin
  399. super().__init__(container, required=required)
  400. if not help:
  401. help = 'perform the action on all qubes'
  402. self.add_argument('--all', action='store_true', dest='all_domains',
  403. help=help)
  404. container.add_argument('--exclude', action='append', default=[],
  405. help='exclude the qube from --all')
  406. # ⚠ the default parameter below is important! ⚠
  407. # See https://stackoverflow.com/questions/35044288 and
  408. # `argparse.ArgumentParser.parse_args()` implementation
  409. self.add_argument('VMNAME', action=vm_action, nargs='*', default=[])
  410. def print_table(table):
  411. ''' Uses the unix column command to print pretty table.
  412. :param str text: list of lists/sets
  413. '''
  414. unit_separator = chr(31)
  415. cmd = ['column', '-t', '-s', unit_separator]
  416. text_table = '\n'.join([unit_separator.join(row) for row in table])
  417. text_table += '\n'
  418. # for tests...
  419. if sys.stdout != sys.__stdout__:
  420. p = subprocess.Popen(cmd + ['-c', '80'], stdin=subprocess.PIPE,
  421. stdout=subprocess.PIPE)
  422. p.stdin.write(text_table.encode())
  423. (out, _) = p.communicate()
  424. sys.stdout.write(out.decode())
  425. else:
  426. p = subprocess.Popen(cmd, stdin=subprocess.PIPE)
  427. p.communicate(text_table.encode())