qubes/tools: qvm-ls

This commit is contained in:
Wojtek Porczyk 2015-01-23 18:37:40 +01:00
parent e83d21c671
commit 6a4820c381
14 changed files with 816 additions and 280 deletions

View File

@ -38,6 +38,7 @@ persistent=no
disable=
locally-disabled,
locally-enabled,
file-ignored,
duplicate-code,
star-args,
cyclic-import,

View File

@ -15,9 +15,11 @@ endif
mkdir \
$(DESTDIR)$(PYTHON_QUBESPATH)/vm \
$(DESTDIR)$(PYTHON_QUBESPATH)/storage \
$(DESTDIR)$(PYTHON_QUBESPATH)/tools \
$(DESTDIR)$(PYTHON_QUBESPATH)/ext \
$(DESTDIR)$(PYTHON_QUBESPATH)/tests \
$(DESTDIR)$(PYTHON_QUBESPATH)/tests/vm
$(DESTDIR)$(PYTHON_QUBESPATH)/tests/vm \
$(DESTDIR)$(PYTHON_QUBESPATH)/tests/tools
cp \
__init__.py* \
@ -49,6 +51,11 @@ endif
storage/xen.py* \
$(DESTDIR)$(PYTHON_QUBESPATH)/storage
cp \
tools/__init__.py* \
tools/qvm_ls.py* \
$(DESTDIR)$(PYTHON_QUBESPATH)/tools
cp ext/__init__.py* $(DESTDIR)$(PYTHON_QUBESPATH)/ext
cp \
@ -64,3 +71,8 @@ endif
tests/vm/adminvm.py* \
tests/vm/qubesvm.py* \
$(DESTDIR)$(PYTHON_QUBESPATH)/tests/vm
cp \
tests/tools/__init__.py* \
tests/tools/qvm_ls.py* \
$(DESTDIR)$(PYTHON_QUBESPATH)/tests/tools

View File

@ -547,6 +547,8 @@ class property(object): # pylint: disable=redefined-builtin,invalid-name
:param int load_stage: stage when property should be loaded (see \
:py:class:`Qubes` for description of stages)
:param int order: order of evaluation (bigger order values are later)
:param str ls_head: column head for :program:`qvm-ls`
:param int ls_width: column width in :program:`qvm-ls`
:param str doc: docstring; this should be one paragraph of plain RST, no \
sphinx-specific features
@ -579,7 +581,7 @@ class property(object): # pylint: disable=redefined-builtin,invalid-name
def __init__(self, name, setter=None, saver=None, type=None,
default=_NO_DEFAULT, load_stage=2, order=0, save_via_ref=False,
doc=None):
ls_head=None, ls_width=None, doc=None):
# pylint: disable=redefined-builtin
self.__name__ = name
self._setter = setter
@ -593,6 +595,10 @@ class property(object): # pylint: disable=redefined-builtin,invalid-name
self.__doc__ = doc
self._attr_name = '_qubesprop_' + name
if ls_head is not None or ls_width is not None:
self.ls_head = ls_head or self.__name__.replace('_', '-').upper()
self.ls_width = max(ls_width or 0, len(self.ls_head) + 1)
def __get__(self, instance, owner):
if instance is None:
@ -682,28 +688,6 @@ class property(object): # pylint: disable=redefined-builtin,invalid-name
return self.__name__ == other.__name__
def format_doc(self):
'''Return parsed documentation string, stripping RST markup.
'''
if not self.__doc__:
return ''
# pylint: disable=unused-variable
output, pub = docutils.core.publish_programmatically(
source_class=docutils.io.StringInput,
source=' '.join(self.__doc__.strip().split()),
source_path=None,
destination_class=docutils.io.NullOutput, destination=None,
destination_path=None,
reader=None, reader_name='standalone',
parser=None, parser_name='restructuredtext',
writer=None, writer_name='null',
settings=None, settings_spec=None, settings_overrides=None,
config_section=None, enable_exit_status=None)
return pub.writer.document.astext()
#
# exceptions
#

View File

@ -33,7 +33,9 @@ test_order = [
'qubes.tests.vm.init',
'qubes.tests.vm.qubesvm',
'qubes.tests.vm.adminvm',
'qubes.tests.init'
'qubes.tests.init',
'qubes.tests.tools',
]
sys.path.insert(1, '../../')
@ -96,7 +98,10 @@ class ANSITestResult(unittest.TestResult):
def getDescription(self, test): # pylint: disable=invalid-name
teststr = str(test).split('/')
for i in range(-2, 0):
fullname = teststr[i].split('_', 2)
try:
fullname = teststr[i].split('_', 2)
except IndexError:
continue
fullname[-1] = '{color[bold]}{}{color[normal]}'.format(
fullname[-1], color=self.color)
teststr[i] = '_'.join(fullname)
@ -241,8 +246,7 @@ def main():
suite = unittest.TestSuite()
loader = unittest.TestLoader()
for modname in test_order:
module = importlib.import_module(modname)
suite.addTests(loader.loadTestsFromModule(module))
suite.addTests(loader.loadTestsFromName(modname))
runner = unittest.TextTestRunner(stream=sys.stdout, verbosity=2)
runner.resultclass = ANSITestResult

View File

@ -0,0 +1,14 @@
# pylint: skip-file
import importlib
import qubes.plugins
import qubes.tests
__all__ = qubes.plugins.load(__file__)
def load_tests(loader, tests, pattern):
for name in __all__:
mod = importlib.import_module('.' + name, __name__)
tests.addTests(loader.loadTestsFromModule(mod))
return tests

View File

@ -0,0 +1,77 @@
#!/usr/bin/python2 -O
# vim: fileencoding=utf-8
# pylint: disable=protected-access,pointless-statement
#
# The Qubes OS Project, https://www.qubes-os.org/
#
# Copyright (C) 2015 Joanna Rutkowska <joanna@invisiblethingslab.com>
# Copyright (C) 2015 Wojtek Porczyk <woju@invisiblethingslab.com>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#
import qubes
import qubes.vm.adminvm
import qubes.tools.qvm_ls
import qubes.tests
import qubes.tests.vm.adminvm
class TC_00_Column(qubes.tests.QubesTestCase):
def test_000_collected(self):
self.assertIn('NAME', qubes.tools.qvm_ls.Column.columns)
def test_100_init(self):
try:
testcolumn = qubes.tools.qvm_ls.Column('TESTCOLUMN', width=50)
self.assertEqual(testcolumn.ls_head, 'TESTCOLUMN')
self.assertEqual(testcolumn.ls_width, 50)
finally:
try:
qubes.tools.qvm_ls.Column.columns['TESTCOLUMN']
except KeyError:
pass
def test_101_fix_width(self):
try:
testcolumn = qubes.tools.qvm_ls.Column('TESTCOLUMN', width=2)
self.assertGreater(testcolumn.ls_width, len('TESTCOLUMN'))
finally:
try:
qubes.tools.qvm_ls.Column.columns['TESTCOLUMN']
except KeyError:
pass
class TC_90_globals(qubes.tests.QubesTestCase):
# @qubes.tests.skipUnlessDom0
def test_100_simple_flag(self):
flag = qubes.tools.qvm_ls.simple_flag(1, 'T', 'qid')
# TODO after serious testing of QubesVM and Qubes app, this should be
# using normal components
app = qubes.tests.vm.adminvm.TestApp()
vm = qubes.vm.adminvm.AdminVM(app, None, qid=0, name='dom0')
self.assertFalse(flag(None, vm))
vm.qid = 1
self.assertTrue(flag(None, vm))
def test_900_formats_columns(self):
for fmt in qubes.tools.qvm_ls.formats:
for col in qubes.tools.qvm_ls.formats[fmt]:
self.assertIn(col.upper(), qubes.tools.qvm_ls.Column.columns)

1
qubes/tools/__init__.py Normal file
View File

@ -0,0 +1 @@
# pylint: skip-file

635
qubes/tools/qvm_ls.py Normal file
View File

@ -0,0 +1,635 @@
#!/usr/bin/python -O
# vim: fileencoding=utf-8
# pylint: disable=too-few-public-methods
#
# The Qubes OS Project, https://www.qubes-os.org/
#
# Copyright (C) 2015 Joanna Rutkowska <joanna@invisiblethingslab.com>
# Copyright (C) 2015 Wojtek Porczyk <woju@invisiblethingslab.com>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#
'''qvm-ls - List available domains'''
from __future__ import print_function
import __builtin__
import argparse
import collections
import os
import sys
import textwrap
import qubes
import qubes.config
import qubes.utils
#
# columns
#
class Column(object):
'''A column in qvm-ls output characterised by its head, a width and a way
to fetch a parameter describing the domain.
:param str head: Column head (usually uppercase).
:param int width: Column width.
:param str attr: Attribute, possibly complex (containing ``.``). This may \
also be a callable that gets as its only argument the domain.
:param str fmt: if specified, used as base for :py:meth:`str.format` for \
column's value
:param str doc: Description of column (will be visible in --help-columns).
'''
#: collection of all columns
columns = {}
def __init__(self, head, width=0, attr=None, fmt=None, doc=None):
self.ls_head = head
self.ls_width = max(width, len(self.ls_head) + 1)
self._fmt = fmt
self.__doc__ = doc if doc is None else qubes.utils.format_doc(doc)
# intentionally not always do set self._attr,
# to cause AttributeError in self.format()
if attr is not None:
self._attr = attr
self.__class__.columns[self.ls_head] = self
def cell(self, vm):
'''Format one cell.
.. note::
This is only for technical formatting (filling with space). If you
want to subclass the :py:class:`Column` class, you should override
:py:meth:`Column.format` method instead.
:param qubes.vm.qubesvm.QubesVM: Domain to get a value from.
:returns: string that is at least as wide as needed to fill table row.
:rtype: str
'''
value = self.format(vm) or '-'
return value.ljust(self.ls_width)
def format(self, vm):
'''Format one cell value.
Return value to put in a table cell.
:param qubes.vm.qubesvm.QubesVM: Domain to get a value from.
:returns: Value to put, or :py:obj:`None` if no value .
:rtype: str or None
'''
ret = None
try:
if isinstance(self._attr, basestring):
ret = vm
for attrseg in self._attr.split('.'):
ret = getattr(ret, attrseg)
elif isinstance(self._attr, collections.Callable):
ret = self._attr(vm)
except (AttributeError, ZeroDivisionError):
# division by 0 may be caused by arithmetic in callable attr
return None
if ret is None:
return None
if self._fmt is not None:
return self._fmt.format(ret)
# late import to avoid circular import
# pylint: disable=redefined-outer-name
import qubes.vm
if isinstance(ret, (qubes.vm.BaseVM, qubes.Label)):
return ret.name
return ret
def __repr__(self):
return '{}(head={!r}, width={!r})'.format(self.__class__.__name__,
self.ls_head, self.ls_width)
def __eq__(self, other):
return self.ls_head == other.ls_head
def __lt__(self, other):
return self.ls_head < other.ls_head
def column(width=0, head=None, fmt=None):
'''Mark function or plain property as valid column in :program:`qvm-ls`.
By default all instances of :py:class:`qubes.property` are valid.
:param int width: Column width
:param str head: Column head (default: take property's name)
'''
def decorator(obj):
# pylint: disable=missing-docstring
# we keep hints on fget, so the order of decorators does not matter
holder = obj.fget if isinstance(obj, __builtin__.property) else obj
try:
holder.ls_head = head or holder.__name__.replace('_', '-').upper()
except AttributeError:
raise TypeError('Cannot find default column name '
'for a strange object {!r}'.format(obj))
holder.ls_width = max(width, len(holder.ls_head) + 1)
holder.ls_fmt = fmt
return obj
return decorator
class PropertyColumn(Column):
'''Column that displays value from property (:py:class:`property` or
:py:class:`qubes.property`) of domain.
You shouldn't use this class directly, see :py:func:`column` decorator.
:param holder: Holder of magic attributes.
'''
def __init__(self, holder):
super(PropertyColumn, self).__init__(
head=holder.ls_head,
width=holder.ls_width,
doc=holder.__doc__)
self.holder = holder
def format(self, vm):
try:
value = getattr(vm, self.holder.__name__)
except AttributeError:
return None
if not hasattr(self.holder, 'ls_fmt') or self.holder.ls_fmt is None:
return value
return self.holder.ls_fmt.format(
getattr(vm, self.holder.__name__)).ljust(
self.ls_width)
def __repr__(self):
return '{}(head={!r}, width={!r} holder={!r})'.format(
self.__class__.__name__,
self.ls_head,
self.ls_width,
self.holder)
def process_class(cls):
'''Process class after definition to find all listable properties.
It is used in metaclass of the domain.
:param qubes.vm.BaseVMMeta cls: Class to round up.
'''
for prop in cls.__dict__.values():
holder = prop.fget if isinstance(prop, __builtin__.property) else prop
if not hasattr(holder, 'ls_head') or holder.ls_head is None:
continue
for col in Column.columns.values():
if not isinstance(col, PropertyColumn):
continue
if col.holder.__name__ != holder.__name__:
continue
if col.ls_head != holder.ls_head:
raise TypeError('Found column head mismatch in class {!r} '
'({!r} != {!r})'.format(cls.__name__,
holder.ls_head, col.ls_head))
if col.ls_width != holder.ls_width:
raise TypeError('Found column width mismatch in class {!r} '
'({!r} != {!r})'.format(cls.__name__,
holder.ls_width, col.ls_width))
PropertyColumn(holder)
def flag(field):
'''Mark method as flag field.
:param int field: Which field to fill (counted from 1)
'''
def decorator(obj):
# pylint: disable=missing-docstring
obj.field = field
return obj
return decorator
def simple_flag(field, letter, attr, doc=None):
'''Create simple, binary flag.
:param str attr: Attribute name to check. If result is true, flag is fired.
:param str letter: The letter to show.
'''
def helper(self, vm):
# pylint: disable=missing-docstring,unused-argument
try:
value = getattr(vm, attr)
except AttributeError:
value = False
if value:
return letter[0]
helper.__doc__ = doc
helper.field = field
return helper
class StatusColumn(Column):
'''Some fancy flags that describe general status of the domain.'''
# pylint: disable=no-self-use
def __init__(self):
super(StatusColumn, self).__init__(
head='STATUS',
width=len(self.get_flags()) + 1,
doc=self.__class__.__doc__)
@flag(1)
def type(self, vm):
'''Type of domain.
0 AdminVM (AKA Dom0)
aA AppVM
dD DisposableVM
sS StandaloneVM
tT TemplateVM
When it is HVM (optimised VM), the letter is capital.
'''
# late import because of circular dependency
# pylint: disable=redefined-outer-name
import qubes.vm
import qubes.vm.adminvm
import qubes.vm.appvm
import qubes.vm.dispvm
import qubes.vm.hvm
import qubes.vm.qubesvm
import qubes.vm.templatevm
if isinstance(vm, qubes.vm.adminvm.AdminVM):
return '0'
ret = None
# TODO right order, depending on inheritance
if isinstance(vm, qubes.vm.templatevm.TemplateVM):
ret = 't'
if isinstance(vm, qubes.vm.appvm.AppVM):
ret = 'a'
# if isinstance(vm, qubes.vm.standalonevm.StandaloneVM):
# ret = 's'
if isinstance(vm, qubes.vm.dispvm.DispVM):
ret = 'd'
if ret is not None:
if isinstance(vm, qubes.vm.hvm.HVM):
return ret.upper()
else:
return ret
@flag(2)
def power(self, vm):
'''Current power state.
r running
t transient
p paused
s suspended
h halting
d dying
c crashed
? unknown
'''
state = vm.get_power_state().lower()
if state == 'unknown':
return '?'
elif state in ('running', 'transient', 'paused', 'suspended',
'halting', 'dying', 'crashed'):
return state[0]
updateable = simple_flag(3, 'U', 'updateable',
doc='If the domain is updateable.')
provides_network = simple_flag(4, 'N', 'provides_network',
doc='If the domain provides network.')
installed_by_rpm = simple_flag(5, 'R', 'installed_by_rpm',
doc='If the domain is installed by RPM.')
internal = simple_flag(6, 'i', 'internal',
doc='If the domain is internal (not normally shown, no appmenus).')
debug = simple_flag(7, 'D', 'debug',
doc='If the domain is being debugged.')
autostart = simple_flag(8, 'A', 'autostart',
doc='If the domain is marked for autostart.')
# TODO (not sure if really):
# include in backups
# uses_custom_config
def _no_flag(self, vm):
'''Reserved for future use.'''
@classmethod
def get_flags(cls):
'''Get all flags as list.
Holes between flags are filled with :py:meth:`_no_flag`.
:rtype: list
'''
flags = {}
for mycls in cls.__mro__:
for attr in mycls.__dict__.values():
if not hasattr(attr, 'field'):
continue
if attr.field in flags:
continue
flags[attr.field] = attr
return [(flags[i] if i in flags else cls._no_flag)
for i in range(1, max(flags) + 1)]
def format(self, vm):
return ''.join((flag(self, vm) or '-') for flag in self.get_flags())
# todo maxmem
Column('GATEWAY', width=15,
attr='netvm.gateway',
doc='Network gateway.')
Column('MEMORY', width=5,
attr=(lambda vm: vm.get_mem()/1024 if vm.is_running() else None),
doc='Memory currently used by VM')
Column('DISK', width=5,
attr=(lambda vm: vm.get_disk_utilization()/1024/1024),
doc='Total disk utilisation.')
Column('PRIV-CURR', width=5,
attr=(lambda vm: vm.get_disk_utilization_private_img()/1024/1024),
fmt='{:.0f}',
doc='Disk utilisation by private image (/home, /usr/local).')
Column('PRIV-MAX', width=5,
attr=(lambda vm: vm.get_private_img_sz()/1024/1024),
fmt='{:.0f}',
doc='Maximum available space for private image.')
Column('PRIV-USED', width=5,
attr=(lambda vm: vm.get_disk_utilization_private_img() * 100
/ vm.get_private_img_sz()),
fmt='{:.0f}',
doc='Disk utilisation by private image as a percentage of available space.')
Column('ROOT-CURR', width=5,
attr=(lambda vm: vm.get_disk_utilization_private_img()/1024/1024),
fmt='{:.0f}',
doc='Disk utilisation by root image (/usr, /lib, /etc, ...).')
Column('ROOT-MAX', width=5,
attr=(lambda vm: vm.get_private_img_sz()/1024/1024),
fmt='{:.0f}',
doc='Maximum available space for root image.')
Column('ROOT-USED', width=5,
attr=(lambda vm: vm.get_disk_utilization_private_img() * 100
/ vm.get_private_img_sz()),
fmt='{:.0f}',
doc='Disk utilisation by root image as a percentage of available space.')
StatusColumn()
class Table(object):
'''Table that is displayed to the user.
:param qubes.Qubes app: Qubes application object.
:param list colnames: Names of the columns (need not to be uppercase).
'''
def __init__(self, app, colnames):
self.app = app
self.columns = tuple(Column.columns[col.upper()] for col in colnames)
def format_head(self):
'''Format table head (all column heads).'''
return ''.join('{head:{width}s}'.format(
head=col.ls_head, width=col.ls_width)
for col in self.columns[:-1]) + \
self.columns[-1].ls_head
def format_row(self, vm):
'''Format single table row (all columns for one domain).'''
return ''.join(col.cell(vm) for col in self.columns)
def write_table(self, stream=sys.stdout):
'''Write whole table to file-like object.
:param file stream: Stream to write the table to.
'''
stream.write(self.format_head() + '\n')
for vm in self.app.domains:
stream.write(self.format_row(vm) + '\n')
#: Available formats. Feel free to plug your own one.
formats = {
'simple': ('name', 'status', 'label', 'template', 'netvm'),
'network': ('name', 'status', 'netvm', 'ip', 'ipback', 'gateway'),
'full': ('name', 'status', 'label', 'qid', 'xid', 'uuid'),
# 'perf': ('name', 'status', 'cpu', 'memory'),
'disk': ('name', 'status', 'disk',
'priv-curr', 'priv-max', 'priv-used',
'root-curr', 'root-max', 'root-used'),
}
class _HelpColumnsAction(argparse.Action):
'''Action for argument parser that displays all columns and exits.'''
# pylint: disable=redefined-builtin
def __init__(self,
option_strings,
dest=argparse.SUPPRESS,
default=argparse.SUPPRESS,
help='list all available columns with short descriptions and exit'):
super(_HelpColumnsAction, self).__init__(
option_strings=option_strings,
dest=dest,
default=default,
nargs=0,
help=help)
def __call__(self, parser, namespace, values, option_string=None):
width = max(len(column.ls_head) for column in Column.columns.values())
wrapper = textwrap.TextWrapper(width=80,
initial_indent=' ', subsequent_indent=' ' * (width + 6))
text = 'Available columns:\n' + '\n'.join(
wrapper.fill('{head:{width}s} {doc}'.format(
head=column.ls_head,
doc=column.__doc__ or '',
width=width))
for column in sorted(Column.columns.values()))
parser.exit(message=text + '\n')
class _HelpFormatsAction(argparse.Action):
'''Action for argument parser that displays all formats and exits.'''
# pylint: disable=redefined-builtin
def __init__(self,
option_strings,
dest=argparse.SUPPRESS,
default=argparse.SUPPRESS,
help='list all available formats with their definitions and exit'):
super(_HelpFormatsAction, self).__init__(
option_strings=option_strings,
dest=dest,
default=default,
nargs=0,
help=help)
def __call__(self, parser, namespace, values, option_string=None):
width = max(len(fmt) for fmt in formats)
text = 'Available formats:\n' + ''.join(
' {fmt:{width}s} {columns}\n'.format(
fmt=fmt, columns=','.join(formats[fmt]).upper(), width=width)
for fmt in sorted(formats))
parser.exit(message=text)
def get_parser():
'''Create :py:class:`argparse.ArgumentParser` suitable for
:program:`qvm-ls`.
'''
# parser creation is delayed to get all the columns that are scattered
# thorough the modules
wrapper = textwrap.TextWrapper(width=80, break_on_hyphens=False,
initial_indent=' ', subsequent_indent=' ')
parser = argparse.ArgumentParser(
formatter_class=argparse.RawTextHelpFormatter,
description='List Qubes domains and their parametres.',
epilog='available formats (see --help-formats):\n{}\n\n'
'available columns (see --help-columns):\n{}'.format(
wrapper.fill(', '.join(sorted(formats.keys()))),
wrapper.fill(', '.join(sorted(sorted(Column.columns.keys()))))))
parser.add_argument('--help-columns', action=_HelpColumnsAction)
parser.add_argument('--help-formats', action=_HelpFormatsAction)
parser_formats = parser.add_mutually_exclusive_group()
parser_formats.add_argument('--format', '-o', metavar='FORMAT',
action='store', choices=formats.keys(),
help='preset format')
parser_formats.add_argument('--fields', '-O', metavar='FIELD,...',
action='store',
help='user specified format (see available columns below)')
# parser.add_argument('--conf', '-c',
# action='store', metavar='CFGFILE',
# help='Qubes config file')
parser.add_argument('--xml', metavar='XMLFILE',
action='store',
help='Qubes store file')
parser.set_defaults(
qubesxml=os.path.join(qubes.config.system_path['qubes_base_dir'],
qubes.config.system_path['qubes_store_filename']),
format='simple')
return parser
def main(args=None):
'''Main routine of :program:`qvm-ls`.
:param list args: Optional arguments to override those delivered from \
command line.
'''
parser = get_parser()
args = parser.parse_args(args)
app = qubes.Qubes(args.xml)
if args.fields:
columns = [col.strip() for col in args.fields.split(',')]
for col in columns:
if col.upper() not in Column.columns:
parser.error('no such column: {!r}'.format(col))
else:
columns = formats[args.format]
table = Table(app, columns)
table.write_table(sys.stdout)
return True
if __name__ == '__main__':
sys.exit(not main())

View File

@ -28,6 +28,8 @@ import os
import re
import subprocess
import docutils
def get_timezone():
# fc18
@ -57,3 +59,24 @@ def get_timezone():
return tz_path.replace('/usr/share/zoneinfo/', '')
return None
def format_doc(docstring):
'''Return parsed documentation string, stripping RST markup.
'''
if not docstring:
return ''
# pylint: disable=unused-variable
output, pub = docutils.core.publish_programmatically(
source_class=docutils.io.StringInput,
source=' '.join(docstring.strip().split()),
source_path=None,
destination_class=docutils.io.NullOutput, destination=None,
destination_path=None,
reader=None, reader_name='standalone',
parser=None, parser_name='restructuredtext',
writer=None, writer_name='null',
settings=None, settings_spec=None, settings_overrides=None,
config_section=None, enable_exit_status=None)
return pub.writer.document.astext()

View File

@ -44,10 +44,15 @@ import qubes
import qubes.log
import qubes.events
import qubes.plugins
import qubes.tools.qvm_ls
class BaseVMMeta(qubes.plugins.Plugin, qubes.events.EmitterMeta):
'''Metaclass for :py:class:`.BaseVM`'''
def __init__(cls, name, bases, dict_):
super(BaseVMMeta, cls).__init__(name, bases, dict_)
qubes.tools.qvm_ls.process_class(cls)
class DeviceCollection(object):

View File

@ -8,6 +8,7 @@ class AppVM(qubes.vm.qubesvm.QubesVM):
template = qubes.VMProperty('template', load_stage=4,
vmclass=qubes.vm.templatevm.TemplateVM,
ls_width=31,
doc='Template, on which this AppVM is based.')
def __init__(self, D):

View File

@ -44,6 +44,7 @@ import qubes.config
import qubes.storage
import qubes.utils
import qubes.vm
import qubes.tools.qvm_ls
qmemman_present = False
try:
@ -115,13 +116,15 @@ class QubesVM(qubes.vm.BaseVM):
label = qubes.property('label',
setter=(lambda self, prop, value: self.app.labels[
int(value.rsplit('-', 1)[1])]),
doc='Colourful label assigned to VM. This is where you set the colour '
'of the padlock.')
ls_width=14,
doc='''Colourful label assigned to VM. This is where the colour of the
padlock is set.''')
# XXX swallowed uses_default_netvm
netvm = qubes.VMProperty('netvm', load_stage=4, allow_none=True,
default=(lambda self: self.app.default_fw_netvm if self.provides_network
else self.app.default_netvm),
ls_width=31,
doc='''VM that provides network connection to this domain. When
`None`, machine is disconnected. When absent, domain uses default
NetVM.''')
@ -132,13 +135,16 @@ class QubesVM(qubes.vm.BaseVM):
qid = qubes.property('qid', type=int,
setter=_setter_qid,
ls_width=3,
doc='''Internal, persistent identificator of particular domain. Note
this is different from Xen domid.''')
name = qubes.property('name', type=str,
ls_width=31,
doc='User-specified name of the domain.')
uuid = qubes.property('uuid', type=uuid.UUID, default=None,
ls_width=36,
doc='UUID from libvirt.')
# TODO meaningful default
@ -176,6 +182,7 @@ class QubesVM(qubes.vm.BaseVM):
# XXX what is that
vcpus = qubes.property('vcpus', default=None,
ls_width=2,
doc='FIXME')
# XXX swallowed uses_default_kernel
@ -183,6 +190,7 @@ class QubesVM(qubes.vm.BaseVM):
kernel = qubes.property('kernel', type=str,
setter=_setter_kernel,
default=(lambda self: self.app.default_kernel),
ls_width=12,
doc='Kernel used by this domain.')
# XXX swallowed uses_default_kernelopts
@ -191,10 +199,12 @@ class QubesVM(qubes.vm.BaseVM):
default=(lambda self: qubes.config.defaults['kernelopts_pcidevs'] \
if len(self.devices['pci']) > 0 \
else qubes.config.defaults['kernelopts']),
ls_width=30,
doc='Kernel command line passed to domain.')
mac = qubes.property('mac', type=str,
default=(lambda self: '00:16:3E:5E:6C:{:02X}'.format(self.qid)),
ls_width=17,
doc='MAC address of the NIC emulated inside VM')
debug = qubes.property('debug', type=bool, default=False,
@ -206,6 +216,7 @@ class QubesVM(qubes.vm.BaseVM):
# only plain property?
default_user = qubes.property('default_user', type=str,
default=(lambda self: self.template.default_user),
ls_width=12,
doc='FIXME')
# @property
@ -216,6 +227,7 @@ class QubesVM(qubes.vm.BaseVM):
# return self._default_user
qrexec_timeout = qubes.property('qrexec_timeout', type=int, default=60,
ls_width=3,
doc='''Time in seconds after which qrexec connection attempt is deemed
failed. Operating system inside VM should be able to boot in this
time.''')
@ -262,6 +274,7 @@ class QubesVM(qubes.vm.BaseVM):
# VMM-related
@qubes.tools.qvm_ls.column(width=3)
@property
def xid(self):
'''Xen ID.
@ -381,6 +394,7 @@ class QubesVM(qubes.vm.BaseVM):
# network-related
@qubes.tools.qvm_ls.column(width=15)
@property
def ip(self):
'''IP address of this domain.'''
@ -389,6 +403,7 @@ class QubesVM(qubes.vm.BaseVM):
else:
return None
@qubes.tools.qvm_ls.column(width=15)
@property
def netmask(self):
'''Netmask for this domain's IP address.'''
@ -397,6 +412,7 @@ class QubesVM(qubes.vm.BaseVM):
else:
return None
@qubes.tools.qvm_ls.column(head='IPBACK', width=15)
@property
def gateway(self):
'''Gateway for other domains that use this domain as netvm.'''
@ -405,6 +421,7 @@ class QubesVM(qubes.vm.BaseVM):
# This is gateway IP for _other_ VMs, so make sense only in NetVMs
return None
@qubes.tools.qvm_ls.column(width=15)
@property
def secondary_dns(self):
'''Secondary DNS server set up for this domain.'''
@ -413,6 +430,7 @@ class QubesVM(qubes.vm.BaseVM):
else:
return None
@qubes.tools.qvm_ls.column(width=7)
@property
def vif(self):
'''Name of the network interface backend in netvm that is connected to

View File

@ -1,254 +1,7 @@
#!/usr/bin/python2
# -*- encoding: utf8 -*-
#
# The Qubes OS Project, http://www.qubes-os.org
#
# Copyright (C) 2010 Joanna Rutkowska <joanna@invisiblethingslab.com>
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#
#
# vim: fileencoding=utf-8
from qubes.qubes import QubesVmCollection
from qubes.qubes import QubesHost
from qubes.qubes import QubesException
from optparse import OptionParser
import sys
import qubes.tools.qvm_ls
fields = {
"qid": {"func": "vm.qid"},
"name": {"func": "('=>' if qvm_collection.get_default_template() is not None\
and vm.qid == qvm_collection.get_default_template().qid else '')\
+ ('[' if vm.is_template() else '')\
+ ('<' if vm.is_disposablevm() else '')\
+ ('{' if vm.is_netvm() else '')\
+ vm.name \
+ (']' if vm.is_template() else '')\
+ ('>' if vm.is_disposablevm() else '')\
+ ('}' if vm.is_netvm() else '')"},
"type": {"func": "'HVM' if vm.type == 'HVM' else \
('Tpl' if vm.is_template() else \
('' if vm.type in ['AppVM', 'DisposableVM'] else \
vm.type.replace('VM','')))"},
"updbl" : {"func": "'Yes' if vm.updateable else ''"},
"template": {"func": "'n/a' if vm.is_template() else\
('None' if vm.template is None else\
vm.template.name)"},
"netvm": {"func": "'n/a' if vm.is_netvm() and not vm.is_proxyvm() else\
('*' if vm.uses_default_netvm else '') +\
qvm_collection[vm.netvm.qid].name\
if vm.netvm is not None else '-'"},
"ip" : {"func": "vm.ip"},
"ip back" : {"func": "vm.gateway if vm.is_netvm() else 'n/a'"},
"gateway/DNS" : {"func": "vm.netvm.gateway if vm.netvm else 'n/a'"},
"xid" : {"func" : "vm.get_xid() if vm.is_running() else '-'"},
"mem" : {"func" : "(str(vm.get_mem()/1024) + ' MB') if vm.is_running() else '-'"},
"cpu" : {"func" : "round (cpu_usages[vm.get_xid()]['cpu_usage'], 1) if vm.is_running() else '-'"},
"disk": {"func" : "str(vm.get_disk_utilization()/(1024*1024)) + ' MB'"},
"state": {"func" : "vm.get_power_state()"},
"priv-curr": {"func" : "str(vm.get_disk_utilization_private_img()/(1024*1024)) + ' MB'"},
"priv-max": {"func" : "str(vm.get_private_img_sz()/(1024*1024)) + ' MB'"},
"priv-util": {"func" : "str(vm.get_disk_utilization_private_img()*100/vm.get_private_img_sz()) + '%' if vm.get_private_img_sz() != 0 else '-'"},
"root-curr": {"func" : "str(vm.get_disk_utilization_root_img()/(1024*1024)) + ' MB'"},
"root-max": {"func" : "str(vm.get_root_img_sz()/(1024*1024)) + ' MB'"},
"root-util": {"func" : "str(vm.get_disk_utilization_root_img()*100/vm.get_root_img_sz()) + '%' if vm.get_root_img_sz() != 0 else '-'"},
"label" : {"func" : "vm.label.name"},
"kernel" : {"func" : "('*' if vm.uses_default_kernel else '') + str(vm.kernel) if hasattr(vm, 'kernel') else 'n/a'"},
"kernelopts" : {"func" : "('*' if vm.uses_default_kernelopts else '') + str(vm.kernelopts) if hasattr(vm, 'kernelopts') else 'n/a'"},
"on" : {"func" : "'*' if vm.is_running() else ''"},
"last backup" : {"func": "str(vm.backup_timestamp.date()) if "
"vm.backup_timestamp else '-'"},
}
def main():
usage = "usage: %prog [options] <vm-name>"
parser = OptionParser (usage)
parser.add_option ("-n", "--network", dest="network",
action="store_true", default=False,
help="Show network addresses assigned to VMs")
parser.add_option ("-c", "--cpu", dest="cpu",
action="store_true", default=False,
help="Show CPU load")
parser.add_option ("-m", "--mem", dest="mem",
action="store_true", default=False,
help="Show memory usage")
parser.add_option ("-d", "--disk", dest="disk",
action="store_true", default=False,
help="Show VM disk utilization statistics")
parser.add_option ("-k", "--kernel", dest="kernel",
action="store_true", default=False,
help="Show VM kernel options")
parser.add_option ("-i", "--ids", dest="ids",
action="store_true", default=False,
help="Show Qubes and Xen id#s")
parser.add_option("-b", "--last-backup", dest="backup",
action="store_true", default=False,
help="Show date of last VM backup")
parser.add_option("--raw-list", dest="raw_list",
action="store_true", default=False,
help="List only VM names one per line")
(options, args) = parser.parse_args ()
qvm_collection = QubesVmCollection()
qvm_collection.lock_db_for_reading()
qvm_collection.load()
qvm_collection.unlock_db()
if options.raw_list:
for vm in qvm_collection.values():
print vm.name
return
fields_to_display = ["name", "on", "state", "updbl", "type", "template", "netvm", "label" ]
cpu_usages = None
if (options.ids):
fields_to_display += ["qid", "xid"]
if (options.cpu):
qhost = QubesHost()
(measure_time, cpu_usages) = qhost.measure_cpu_usage()
fields_to_display += ["cpu"]
if (options.mem):
fields_to_display += ["mem"]
if options.backup:
fields_to_display += ["last backup"]
if (options.network):
if 'template' in fields_to_display:
fields_to_display.remove ("template")
fields_to_display += ["ip", "ip back", "gateway/DNS"]
if (options.disk):
if 'template' in fields_to_display:
fields_to_display.remove ("template")
if 'netvm' in fields_to_display:
fields_to_display.remove ("netvm")
fields_to_display += ["priv-curr", "priv-max", "root-curr", "root-max", "disk" ]
if (options.kernel):
fields_to_display += ["kernel", "kernelopts" ]
vms_list = [vm for vm in qvm_collection.values()]
if len(args) > 0:
vms_list = [vm for vm in vms_list if vm.name in args]
no_vms = len (vms_list)
vms_to_display = []
# Frist, the NetVMs...
for netvm in vms_list:
if netvm.is_netvm():
vms_to_display.append (netvm)
# Now, the AppVMs without template (or with template not included in the list)...
for appvm in vms_list:
if appvm.is_appvm() and not appvm.is_template() and \
(appvm.template is None or appvm.template not in vms_list):
vms_to_display.append (appvm)
# Now, the template, and all its AppVMs...
for tvm in vms_list:
if tvm.is_template():
vms_to_display.append (tvm)
for vm in vms_list:
if (vm.is_appvm() or vm.is_disposablevm()) and \
vm.template and vm.template.qid == tvm.qid:
vms_to_display.append(vm)
assert len(vms_to_display) == no_vms
# First calculate the maximum width of each field we want to display
# also collect data to display
for f in fields_to_display:
fields[f]["max_width"] = len(f)
data_to_display = []
for vm in vms_to_display:
data_row = {}
for f in fields_to_display:
if vm.qid == 0 and (f.startswith('priv-') or f.startswith('root-') or f == 'disk'):
data_row[f] = 'n/a'
else:
data_row[f] = str(eval(fields[f]["func"]))
l = len(data_row[f])
if l > fields[f]["max_width"]:
fields[f]["max_width"] = l
data_to_display.append(data_row)
try:
vm.verify_files()
except QubesException as err:
print >> sys.stderr, "WARNING: VM '{0}' has corrupted files!".format(vm.name)
# XXX: For what?
total_width = 0;
for f in fields_to_display:
total_width += fields[f]["max_width"]
# Display the header
s = ""
for f in fields_to_display:
fmt="{{0:-^{0}}}-+".format(fields[f]["max_width"] + 1)
s += fmt.format('-')
print s
s = ""
for f in fields_to_display:
fmt="{{0:>{0}}} |".format(fields[f]["max_width"] + 1)
s += fmt.format(f)
print s
s = ""
for f in fields_to_display:
fmt="{{0:-^{0}}}-+".format(fields[f]["max_width"] + 1)
s += fmt.format('-')
print s
# ... and the actual data
for row in data_to_display:
s = ""
for f in fields_to_display:
fmt="{{0:>{0}}} |".format(fields[f]["max_width"] + 1)
s += fmt.format(row[f])
print s
main()
sys.exit(not qubes.tools.qvm_ls.main())

View File

@ -214,6 +214,10 @@ fi
%{python_sitearch}/qubes/storage/__init__.py*
%{python_sitearch}/qubes/storage/xen.py*
%dir %{python_sitearch}/qubes/tools
%{python_sitearch}/qubes/tools/__init__.py*
%{python_sitearch}/qubes/tools/qvm_ls.py*
%dir %{python_sitearch}/qubes/ext
%{python_sitearch}/qubes/ext/__init__.py*
@ -230,6 +234,10 @@ fi
%{python_sitearch}/qubes/tests/vm/adminvm.py*
%{python_sitearch}/qubes/tests/vm/qubesvm.py*
%dir %{python_sitearch}/qubes/tests/tools
%{python_sitearch}/qubes/tests/tools/__init__.py*
%{python_sitearch}/qubes/tests/tools/qvm_ls.py*
# qmemman
%{python_sitearch}/qubes/qmemman.py*
%{python_sitearch}/qubes/qmemman_algo.py*