Merge branch 'stats'

* stats:
  tests: admin.vm.Stats
  api: fix handling interrupted calls
  api/admin: implement admin.vm.Stats
  app: refresh getting VM statistics, rename to QubesHost.get_vm_stats
  api: use str(subject) instead of explicit subject.name
This commit is contained in:
Marek Marczykowski-Górecki 2017-07-29 05:05:32 +02:00
commit c8c32d4956
No known key found for this signature in database
GPG Key ID: 063938BA42CFA724
7 changed files with 383 additions and 35 deletions

View File

@ -90,6 +90,7 @@ ADMIN_API_METHODS_SIMPLE = \
admin.vm.volume.ListSnapshots \
admin.vm.volume.Resize \
admin.vm.volume.Revert \
admin.vm.Stats \
$(null)
ifeq ($(OS),Linux)

View File

@ -312,7 +312,8 @@ class QubesDaemonProtocol(asyncio.Protocol):
# this is reached if from except: blocks; do not put it in finally:,
# because this will prevent the good case from sending the reply
self.transport.abort()
if self.transport:
self.transport.abort()
def send_header(self, *args):
self.transport.write(self.header.pack(*args))
@ -328,7 +329,7 @@ class QubesDaemonProtocol(asyncio.Protocol):
self.send_header(0x31)
if subject is not self.app:
self.transport.write(subject.name.encode('ascii'))
self.transport.write(str(subject).encode('ascii'))
self.transport.write(b'\0')
self.transport.write(event.encode('ascii') + b'\0')

View File

@ -1269,3 +1269,77 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI):
backup = yield from self._load_backup_profile(self.arg,
skip_passphrase=True)
return backup.get_backup_summary()
def _send_stats_single(self, info_time, info, only_vm, filters,
id_to_name_map):
'''A single iteration of sending VM stats
:param info_time: time of previous iteration
:param info: information retrieved in previous iteration
:param only_vm: send information only about this VM
:param filters: filters to apply on stats before sending
:param id_to_name_map: ID->VM name map, may be modified
:return: tuple(info_time, info) - new information (to be passed to
the next iteration)
'''
(info_time, info) = self.app.host.get_vm_stats(info_time, info,
only_vm=only_vm)
for vm_id, vm_info in info.items():
if vm_id not in id_to_name_map:
try:
name = \
self.app.vmm.libvirt_conn.lookupByID(vm_id).name()
except libvirt.libvirtError as err:
if err.get_error_code() == libvirt.VIR_ERR_NO_DOMAIN:
# stubdomain or so
name = None
else:
raise
id_to_name_map[vm_id] = name
else:
name = id_to_name_map[vm_id]
# skip VMs with unknown name
if name is None:
continue
if not list(qubes.api.apply_filters([name], filters)):
continue
self.send_event(name, 'vm-stats',
memory_kb=int(vm_info['memory_kb']),
cpu_time=int(vm_info['cpu_time'] / 1000000),
cpu_usage=int(vm_info['cpu_usage']))
return info_time, info
@qubes.api.method('admin.vm.Stats', no_payload=True,
scope='global', read=True)
@asyncio.coroutine
def vm_stats(self):
assert not self.arg
# run until client connection is terminated
self.cancellable = True
# cache event filters, to not call an event each time an event arrives
stats_filters = self.fire_event_for_permission()
only_vm = None
if self.dest.name != 'dom0':
only_vm = self.dest
self.send_event(self.app, 'connection-established')
info_time = None
info = None
id_to_name_map = {0: 'dom0'}
try:
while True:
info_time, info = self._send_stats_single(info_time, info,
only_vm, stats_filters, id_to_name_map)
yield from asyncio.sleep(self.app.stats_interval)
except asyncio.CancelledError:
# valid method to terminate this loop
pass

View File

@ -174,7 +174,7 @@ class VMMConnection(object):
if 'xen.lowlevel.xs' in sys.modules:
self._xs = xen.lowlevel.xs.xs()
if 'xen.lowlevel.cs' in sys.modules:
if 'xen.lowlevel.xc' in sys.modules:
self._xc = xen.lowlevel.xc.xc()
self._libvirt_conn = VirConnectWrapper(
qubes.config.defaults['libvirt_uri'])
@ -216,7 +216,7 @@ class VMMConnection(object):
'xc object is available under Xen hypervisor only')
self.init_vmm_connection()
return self._xs
return self._xc
def register_event_handlers(self, app):
'''Register libvirt event handlers, which will translate libvirt
@ -316,55 +316,70 @@ class QubesHost(object):
return int(self._physinfo['free_memory'])
def measure_cpu_usage(self, previous_time=None, previous=None,
wait_time=1):
def get_vm_stats(self, previous_time=None, previous=None, only_vm=None):
'''Measure cpu usage for all domains at once.
If previous measurements are given, CPU usage will be given in
percents of time. Otherwise only absolute value (seconds).
Return a tuple of (measurements_time, measurements),
where measurements is a dictionary with key: domid, value: dict:
- cpu_time - absolute CPU usage (seconds since its startup)
- cpu_usage - CPU usage in %
- memory_kb - current memory assigned, in kb
This function requires Xen hypervisor.
.. versionchanged:: 3.0
argument order to match return tuple
..warning:
This function may return info about implementation-specific VMs,
like stubdomains for HVM
:param previous: previous measurement
:param previous_time: time of previous measurement
:param only_vm: get measurements only for this VM
:raises NotImplementedError: when not under Xen
'''
if previous is None:
previous_time = time.time()
previous = {}
try:
info = self.app.vmm.xc.domain_getinfo(0, qubes.config.max_qid)
except AttributeError:
raise NotImplementedError(
'This function requires Xen hypervisor')
if (previous_time is None) != (previous is None):
raise ValueError(
'previous and previous_time must be given together (or none)')
for vm in info:
previous[vm['domid']] = {}
previous[vm['domid']]['cpu_time'] = (
vm['cpu_time'] / max(vm['online_vcpus'], 1))
previous[vm['domid']]['cpu_usage'] = 0
time.sleep(wait_time)
if previous is None:
previous = {}
current_time = time.time()
current = {}
try:
info = self.app.vmm.xc.domain_getinfo(0, qubes.config.max_qid)
if only_vm:
xid = only_vm.xid
if xid < 0:
raise qubes.exc.QubesVMNotRunningError(only_vm)
info = self.app.vmm.xc.domain_getinfo(xid, 1)
if info[0]['domid'] != xid:
raise qubes.exc.QubesVMNotRunningError(only_vm)
else:
info = self.app.vmm.xc.domain_getinfo(0, 1024)
except AttributeError:
raise NotImplementedError(
'This function requires Xen hypervisor')
# TODO: add stubdomain stats to actual VMs
for vm in info:
current[vm['domid']] = {}
current[vm['domid']]['cpu_time'] = (
domid = vm['domid']
current[domid] = {}
current[domid]['memory_kb'] = vm['mem_kb']
current[domid]['cpu_time'] = int(
vm['cpu_time'] / max(vm['online_vcpus'], 1))
if vm['domid'] in previous.keys():
current[vm['domid']]['cpu_usage'] = (
float(current[vm['domid']]['cpu_time'] -
previous[vm['domid']]['cpu_time']) /
1000 ** 3 / (current_time - previous_time) * 100)
if current[vm['domid']]['cpu_usage'] < 0:
if domid in previous:
current[domid]['cpu_usage'] = int(
(current[domid]['cpu_time'] - previous[domid]['cpu_time'])
/ 1000 ** 3 * 100 / (current_time - previous_time))
if current[domid]['cpu_usage'] < 0:
# VM has been rebooted
current[vm['domid']]['cpu_usage'] = 0
current[domid]['cpu_usage'] = 0
else:
current[vm['domid']]['cpu_usage'] = 0
current[domid]['cpu_usage'] = 0
return (current_time, current)
@ -698,6 +713,11 @@ class Qubes(qubes.PropertyHolder):
setter=_setter_pool,
doc='Default storage pool for kernel volumes')
stats_interval = qubes.property('stats_interval',
default=3,
type=int,
doc='Interval in seconds for VM stats reporting (memory, CPU usage)')
# TODO #1637 #892
check_updates_vm = qubes.property('check_updates_vm',
type=bool, setter=qubes.property.bool,

View File

@ -77,7 +77,11 @@ class TestMgmt(object):
class Subject:
name = 'subject'
self.send_event(Subject, 'event', payload=untrusted_payload.decode())
def __str__(self):
return 'subject'
self.send_event(Subject(), 'event', payload=untrusted_payload.decode())
try:
# give some time to close the other end
yield from asyncio.sleep(0.1)

View File

@ -28,6 +28,7 @@ import tempfile
import unittest.mock
import libvirt
import copy
import qubes
import qubes.devices
@ -1965,6 +1966,139 @@ class TC_00_VMs(AdminAPITestCase):
self.vm.run_service_for_stdio.assert_called_with(
'qubes.BackupPassphrase+testprofile')
def test_630_vm_stats(self):
send_event = unittest.mock.Mock(spec=[])
stats1 = {
0: {
'cpu_time': 243951379111104 // 8,
'cpu_usage': 0,
'memory_kb': 3733212,
},
1: {
'cpu_time': 2849496569205,
'cpu_usage': 0,
'memory_kb': 303916,
},
}
stats2 = copy.deepcopy(stats1)
stats2[0]['cpu_time'] += 100000000
stats2[0]['cpu_usage'] = 10
stats2[1]['cpu_usage'] = 5
self.app.host.get_vm_stats = unittest.mock.Mock()
self.app.host.get_vm_stats.side_effect = [
(0, stats1), (1, stats2),
]
self.app.stats_interval = 1
mgmt_obj = qubes.api.admin.QubesAdminAPI(
self.app, b'dom0', b'admin.vm.Stats',
b'dom0', b'', send_event=send_event)
def cancel_call():
mgmt_obj.cancel()
class MockVM(object):
def __init__(self, name):
self._name = name
def name(self):
return self._name
loop = asyncio.get_event_loop()
self.app.vmm.libvirt_conn.lookupByID.side_effect = lambda xid: {
0: MockVM('Domain-0'),
1: MockVM('test-template'),
2: MockVM('test-vm1')}[xid]
execute_task = asyncio.ensure_future(
mgmt_obj.execute(untrusted_payload=b''))
loop.call_later(1.1, cancel_call)
loop.run_until_complete(execute_task)
self.assertIsNone(execute_task.result())
self.assertEventFired(self.emitter,
'admin-permission:' + 'admin.vm.Stats')
self.assertEqual(self.app.host.get_vm_stats.mock_calls, [
unittest.mock.call(None, None, only_vm=None),
unittest.mock.call(0, stats1, only_vm=None),
])
self.assertEqual(send_event.mock_calls, [
unittest.mock.call(self.app, 'connection-established'),
unittest.mock.call('dom0', 'vm-stats',
cpu_time=stats1[0]['cpu_time'] // 1000000,
cpu_usage=stats1[0]['cpu_usage'],
memory_kb=stats1[0]['memory_kb']),
unittest.mock.call('test-template', 'vm-stats',
cpu_time=stats1[1]['cpu_time'] // 1000000,
cpu_usage=stats1[1]['cpu_usage'],
memory_kb=stats1[1]['memory_kb']),
unittest.mock.call('dom0', 'vm-stats',
cpu_time=stats2[0]['cpu_time'] // 1000000,
cpu_usage=stats2[0]['cpu_usage'],
memory_kb=stats2[0]['memory_kb']),
unittest.mock.call('test-template', 'vm-stats',
cpu_time=stats2[1]['cpu_time'] // 1000000,
cpu_usage=stats2[1]['cpu_usage'],
memory_kb=stats2[1]['memory_kb']),
])
def test_631_vm_stats_single_vm(self):
send_event = unittest.mock.Mock(spec=[])
stats1 = {
2: {
'cpu_time': 2849496569205,
'cpu_usage': 0,
'memory_kb': 303916,
},
}
stats2 = copy.deepcopy(stats1)
stats2[2]['cpu_usage'] = 5
self.app.host.get_vm_stats = unittest.mock.Mock()
self.app.host.get_vm_stats.side_effect = [
(0, stats1), (1, stats2),
]
self.app.stats_interval = 1
mgmt_obj = qubes.api.admin.QubesAdminAPI(
self.app, b'dom0', b'admin.vm.Stats',
b'test-vm1', b'', send_event=send_event)
def cancel_call():
mgmt_obj.cancel()
class MockVM(object):
def __init__(self, name):
self._name = name
def name(self):
return self._name
loop = asyncio.get_event_loop()
self.app.vmm.libvirt_conn.lookupByID.side_effect = lambda xid: {
0: MockVM('Domain-0'),
1: MockVM('test-template'),
2: MockVM('test-vm1')}[xid]
execute_task = asyncio.ensure_future(
mgmt_obj.execute(untrusted_payload=b''))
loop.call_later(1.1, cancel_call)
loop.run_until_complete(execute_task)
self.assertIsNone(execute_task.result())
self.assertEventFired(self.emitter,
'admin-permission:' + 'admin.vm.Stats')
self.assertEqual(self.app.host.get_vm_stats.mock_calls, [
unittest.mock.call(None, None, only_vm=self.vm),
unittest.mock.call(0, stats1, only_vm=self.vm),
])
self.assertEqual(send_event.mock_calls, [
unittest.mock.call(self.app, 'connection-established'),
unittest.mock.call('test-vm1', 'vm-stats',
cpu_time=stats1[2]['cpu_time'] // 1000000,
cpu_usage=stats1[2]['cpu_usage'],
memory_kb=stats1[2]['memory_kb']),
unittest.mock.call('test-vm1', 'vm-stats',
cpu_time=stats2[2]['cpu_time'] // 1000000,
cpu_usage=stats2[2]['cpu_usage'],
memory_kb=stats2[2]['memory_kb']),
])
def test_990_vm_unexpected_payload(self):
methods_with_no_payload = [
b'admin.vm.List',

View File

@ -22,7 +22,7 @@
#
import os
import uuid
import unittest.mock as mock
import lxml.etree
@ -35,6 +35,120 @@ import qubes.tests.init
class TestApp(qubes.tests.TestEmitter):
pass
class TC_20_QubesHost(qubes.tests.QubesTestCase):
sample_xc_domain_getinfo = [
{'paused': 0, 'cpu_time': 243951379111104, 'ssidref': 0,
'hvm': 0, 'shutdown_reason': 255, 'dying': 0,
'mem_kb': 3733212, 'domid': 0, 'max_vcpu_id': 7,
'crashed': 0, 'running': 1, 'maxmem_kb': 3734236,
'shutdown': 0, 'online_vcpus': 8,
'handle': [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
'cpupool': 0, 'blocked': 0},
{'paused': 0, 'cpu_time': 2849496569205, 'ssidref': 0,
'hvm': 0, 'shutdown_reason': 255, 'dying': 0,
'mem_kb': 303916, 'domid': 1, 'max_vcpu_id': 0,
'crashed': 0, 'running': 0, 'maxmem_kb': 308224,
'shutdown': 0, 'online_vcpus': 1,
'handle': [116, 174, 229, 207, 17, 1, 79, 39, 191, 37, 41,
186, 205, 158, 219, 8],
'cpupool': 0, 'blocked': 1},
{'paused': 0, 'cpu_time': 249658663079978, 'ssidref': 0,
'hvm': 0, 'shutdown_reason': 255, 'dying': 0,
'mem_kb': 3782668, 'domid': 11, 'max_vcpu_id': 7,
'crashed': 0, 'running': 0, 'maxmem_kb': 3783692,
'shutdown': 0, 'online_vcpus': 8,
'handle': [169, 95, 55, 127, 140, 94, 79, 220, 186, 210,
117, 5, 148, 11, 185, 206],
'cpupool': 0, 'blocked': 1}]
def setUp(self):
super(TC_20_QubesHost, self).setUp()
self.app = TestApp()
self.app.vmm = mock.Mock()
self.qubes_host = qubes.app.QubesHost(self.app)
def test_000_get_vm_stats_single(self):
self.app.vmm.configure_mock(**{
'xc.domain_getinfo.return_value': self.sample_xc_domain_getinfo
})
info_time, info = self.qubes_host.get_vm_stats()
self.assertEqual(self.app.vmm.mock_calls, [
('xc.domain_getinfo', (0, 1024), {}),
])
self.assertIsNotNone(info_time)
expected_info = {
0: {
'cpu_time': 243951379111104//8,
'cpu_usage': 0,
'memory_kb': 3733212,
},
1: {
'cpu_time': 2849496569205,
'cpu_usage': 0,
'memory_kb': 303916,
},
11: {
'cpu_time': 249658663079978//8,
'cpu_usage': 0,
'memory_kb': 3782668,
},
}
self.assertEqual(info, expected_info)
def test_001_get_vm_stats_twice(self):
self.app.vmm.configure_mock(**{
'xc.domain_getinfo.return_value': self.sample_xc_domain_getinfo
})
prev_time, prev_info = self.qubes_host.get_vm_stats()
prev_time -= 1
prev_info[0]['cpu_time'] -= 10**8
prev_info[1]['cpu_time'] -= 10**9
prev_info[11]['cpu_time'] -= 125 * 10**6
info_time, info = self.qubes_host.get_vm_stats(prev_time, prev_info)
self.assertIsNotNone(info_time)
expected_info = {
0: {
'cpu_time': 243951379111104//8,
'cpu_usage': 9,
'memory_kb': 3733212,
},
1: {
'cpu_time': 2849496569205,
'cpu_usage': 99,
'memory_kb': 303916,
},
11: {
'cpu_time': 249658663079978//8,
'cpu_usage': 12,
'memory_kb': 3782668,
},
}
self.assertEqual(info, expected_info)
self.assertEqual(self.app.vmm.mock_calls, [
('xc.domain_getinfo', (0, 1024), {}),
('xc.domain_getinfo', (0, 1024), {}),
])
def test_002_get_vm_stats_one_vm(self):
self.app.vmm.configure_mock(**{
'xc.domain_getinfo.return_value': [self.sample_xc_domain_getinfo[1]]
})
vm = mock.Mock
vm.xid = 1
vm.name = 'somevm'
info_time, info = self.qubes_host.get_vm_stats(only_vm=vm)
self.assertIsNotNone(info_time)
self.assertEqual(self.app.vmm.mock_calls, [
('xc.domain_getinfo', (1, 1), {}),
])
class TC_30_VMCollection(qubes.tests.QubesTestCase):
def setUp(self):
super().setUp()