diff --git a/Makefile b/Makefile index f58172b2..308ceb2a 100644 --- a/Makefile +++ b/Makefile @@ -90,6 +90,7 @@ ADMIN_API_METHODS_SIMPLE = \ admin.vm.volume.ListSnapshots \ admin.vm.volume.Resize \ admin.vm.volume.Revert \ + admin.vm.Stats \ $(null) ifeq ($(OS),Linux) diff --git a/qubes/api/__init__.py b/qubes/api/__init__.py index caebfa8e..5e3df07c 100644 --- a/qubes/api/__init__.py +++ b/qubes/api/__init__.py @@ -312,7 +312,8 @@ class QubesDaemonProtocol(asyncio.Protocol): # this is reached if from except: blocks; do not put it in finally:, # because this will prevent the good case from sending the reply - self.transport.abort() + if self.transport: + self.transport.abort() def send_header(self, *args): self.transport.write(self.header.pack(*args)) @@ -328,7 +329,7 @@ class QubesDaemonProtocol(asyncio.Protocol): self.send_header(0x31) if subject is not self.app: - self.transport.write(subject.name.encode('ascii')) + self.transport.write(str(subject).encode('ascii')) self.transport.write(b'\0') self.transport.write(event.encode('ascii') + b'\0') diff --git a/qubes/api/admin.py b/qubes/api/admin.py index 0fe31b62..0aaaf657 100644 --- a/qubes/api/admin.py +++ b/qubes/api/admin.py @@ -1269,3 +1269,77 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI): backup = yield from self._load_backup_profile(self.arg, skip_passphrase=True) return backup.get_backup_summary() + + def _send_stats_single(self, info_time, info, only_vm, filters, + id_to_name_map): + '''A single iteration of sending VM stats + + :param info_time: time of previous iteration + :param info: information retrieved in previous iteration + :param only_vm: send information only about this VM + :param filters: filters to apply on stats before sending + :param id_to_name_map: ID->VM name map, may be modified + :return: tuple(info_time, info) - new information (to be passed to + the next iteration) + ''' + + (info_time, info) = self.app.host.get_vm_stats(info_time, info, + only_vm=only_vm) + for vm_id, vm_info in info.items(): + if vm_id not in id_to_name_map: + try: + name = \ + self.app.vmm.libvirt_conn.lookupByID(vm_id).name() + except libvirt.libvirtError as err: + if err.get_error_code() == libvirt.VIR_ERR_NO_DOMAIN: + # stubdomain or so + name = None + else: + raise + id_to_name_map[vm_id] = name + else: + name = id_to_name_map[vm_id] + + # skip VMs with unknown name + if name is None: + continue + + if not list(qubes.api.apply_filters([name], filters)): + continue + + self.send_event(name, 'vm-stats', + memory_kb=int(vm_info['memory_kb']), + cpu_time=int(vm_info['cpu_time'] / 1000000), + cpu_usage=int(vm_info['cpu_usage'])) + + return info_time, info + + @qubes.api.method('admin.vm.Stats', no_payload=True, + scope='global', read=True) + @asyncio.coroutine + def vm_stats(self): + assert not self.arg + + # run until client connection is terminated + self.cancellable = True + + # cache event filters, to not call an event each time an event arrives + stats_filters = self.fire_event_for_permission() + + only_vm = None + if self.dest.name != 'dom0': + only_vm = self.dest + + self.send_event(self.app, 'connection-established') + + info_time = None + info = None + id_to_name_map = {0: 'dom0'} + try: + while True: + info_time, info = self._send_stats_single(info_time, info, + only_vm, stats_filters, id_to_name_map) + yield from asyncio.sleep(self.app.stats_interval) + except asyncio.CancelledError: + # valid method to terminate this loop + pass diff --git a/qubes/app.py b/qubes/app.py index f81861c7..8d4bdeb8 100644 --- a/qubes/app.py +++ b/qubes/app.py @@ -174,7 +174,7 @@ class VMMConnection(object): if 'xen.lowlevel.xs' in sys.modules: self._xs = xen.lowlevel.xs.xs() - if 'xen.lowlevel.cs' in sys.modules: + if 'xen.lowlevel.xc' in sys.modules: self._xc = xen.lowlevel.xc.xc() self._libvirt_conn = VirConnectWrapper( qubes.config.defaults['libvirt_uri']) @@ -216,7 +216,7 @@ class VMMConnection(object): 'xc object is available under Xen hypervisor only') self.init_vmm_connection() - return self._xs + return self._xc def register_event_handlers(self, app): '''Register libvirt event handlers, which will translate libvirt @@ -316,55 +316,70 @@ class QubesHost(object): return int(self._physinfo['free_memory']) - def measure_cpu_usage(self, previous_time=None, previous=None, - wait_time=1): + def get_vm_stats(self, previous_time=None, previous=None, only_vm=None): '''Measure cpu usage for all domains at once. + If previous measurements are given, CPU usage will be given in + percents of time. Otherwise only absolute value (seconds). + + Return a tuple of (measurements_time, measurements), + where measurements is a dictionary with key: domid, value: dict: + - cpu_time - absolute CPU usage (seconds since its startup) + - cpu_usage - CPU usage in % + - memory_kb - current memory assigned, in kb + This function requires Xen hypervisor. - .. versionchanged:: 3.0 - argument order to match return tuple + ..warning: + + This function may return info about implementation-specific VMs, + like stubdomains for HVM + + :param previous: previous measurement + :param previous_time: time of previous measurement + :param only_vm: get measurements only for this VM :raises NotImplementedError: when not under Xen ''' - if previous is None: - previous_time = time.time() - previous = {} - try: - info = self.app.vmm.xc.domain_getinfo(0, qubes.config.max_qid) - except AttributeError: - raise NotImplementedError( - 'This function requires Xen hypervisor') + if (previous_time is None) != (previous is None): + raise ValueError( + 'previous and previous_time must be given together (or none)') - for vm in info: - previous[vm['domid']] = {} - previous[vm['domid']]['cpu_time'] = ( - vm['cpu_time'] / max(vm['online_vcpus'], 1)) - previous[vm['domid']]['cpu_usage'] = 0 - time.sleep(wait_time) + if previous is None: + previous = {} current_time = time.time() current = {} try: - info = self.app.vmm.xc.domain_getinfo(0, qubes.config.max_qid) + if only_vm: + xid = only_vm.xid + if xid < 0: + raise qubes.exc.QubesVMNotRunningError(only_vm) + info = self.app.vmm.xc.domain_getinfo(xid, 1) + if info[0]['domid'] != xid: + raise qubes.exc.QubesVMNotRunningError(only_vm) + else: + info = self.app.vmm.xc.domain_getinfo(0, 1024) except AttributeError: raise NotImplementedError( 'This function requires Xen hypervisor') + # TODO: add stubdomain stats to actual VMs for vm in info: - current[vm['domid']] = {} - current[vm['domid']]['cpu_time'] = ( + domid = vm['domid'] + current[domid] = {} + current[domid]['memory_kb'] = vm['mem_kb'] + current[domid]['cpu_time'] = int( vm['cpu_time'] / max(vm['online_vcpus'], 1)) - if vm['domid'] in previous.keys(): - current[vm['domid']]['cpu_usage'] = ( - float(current[vm['domid']]['cpu_time'] - - previous[vm['domid']]['cpu_time']) / - 1000 ** 3 / (current_time - previous_time) * 100) - if current[vm['domid']]['cpu_usage'] < 0: + if domid in previous: + current[domid]['cpu_usage'] = int( + (current[domid]['cpu_time'] - previous[domid]['cpu_time']) + / 1000 ** 3 * 100 / (current_time - previous_time)) + if current[domid]['cpu_usage'] < 0: # VM has been rebooted - current[vm['domid']]['cpu_usage'] = 0 + current[domid]['cpu_usage'] = 0 else: - current[vm['domid']]['cpu_usage'] = 0 + current[domid]['cpu_usage'] = 0 return (current_time, current) @@ -698,6 +713,11 @@ class Qubes(qubes.PropertyHolder): setter=_setter_pool, doc='Default storage pool for kernel volumes') + stats_interval = qubes.property('stats_interval', + default=3, + type=int, + doc='Interval in seconds for VM stats reporting (memory, CPU usage)') + # TODO #1637 #892 check_updates_vm = qubes.property('check_updates_vm', type=bool, setter=qubes.property.bool, diff --git a/qubes/tests/api.py b/qubes/tests/api.py index b24ab80e..ba74a208 100644 --- a/qubes/tests/api.py +++ b/qubes/tests/api.py @@ -77,7 +77,11 @@ class TestMgmt(object): class Subject: name = 'subject' - self.send_event(Subject, 'event', payload=untrusted_payload.decode()) + + def __str__(self): + return 'subject' + + self.send_event(Subject(), 'event', payload=untrusted_payload.decode()) try: # give some time to close the other end yield from asyncio.sleep(0.1) diff --git a/qubes/tests/api_admin.py b/qubes/tests/api_admin.py index 74205d69..a8873623 100644 --- a/qubes/tests/api_admin.py +++ b/qubes/tests/api_admin.py @@ -28,6 +28,7 @@ import tempfile import unittest.mock import libvirt +import copy import qubes import qubes.devices @@ -1965,6 +1966,139 @@ class TC_00_VMs(AdminAPITestCase): self.vm.run_service_for_stdio.assert_called_with( 'qubes.BackupPassphrase+testprofile') + def test_630_vm_stats(self): + send_event = unittest.mock.Mock(spec=[]) + + stats1 = { + 0: { + 'cpu_time': 243951379111104 // 8, + 'cpu_usage': 0, + 'memory_kb': 3733212, + }, + 1: { + 'cpu_time': 2849496569205, + 'cpu_usage': 0, + 'memory_kb': 303916, + }, + } + stats2 = copy.deepcopy(stats1) + stats2[0]['cpu_time'] += 100000000 + stats2[0]['cpu_usage'] = 10 + stats2[1]['cpu_usage'] = 5 + self.app.host.get_vm_stats = unittest.mock.Mock() + self.app.host.get_vm_stats.side_effect = [ + (0, stats1), (1, stats2), + ] + self.app.stats_interval = 1 + mgmt_obj = qubes.api.admin.QubesAdminAPI( + self.app, b'dom0', b'admin.vm.Stats', + b'dom0', b'', send_event=send_event) + + def cancel_call(): + mgmt_obj.cancel() + + class MockVM(object): + def __init__(self, name): + self._name = name + + def name(self): + return self._name + + loop = asyncio.get_event_loop() + self.app.vmm.libvirt_conn.lookupByID.side_effect = lambda xid: { + 0: MockVM('Domain-0'), + 1: MockVM('test-template'), + 2: MockVM('test-vm1')}[xid] + execute_task = asyncio.ensure_future( + mgmt_obj.execute(untrusted_payload=b'')) + loop.call_later(1.1, cancel_call) + loop.run_until_complete(execute_task) + self.assertIsNone(execute_task.result()) + self.assertEventFired(self.emitter, + 'admin-permission:' + 'admin.vm.Stats') + self.assertEqual(self.app.host.get_vm_stats.mock_calls, [ + unittest.mock.call(None, None, only_vm=None), + unittest.mock.call(0, stats1, only_vm=None), + ]) + self.assertEqual(send_event.mock_calls, [ + unittest.mock.call(self.app, 'connection-established'), + unittest.mock.call('dom0', 'vm-stats', + cpu_time=stats1[0]['cpu_time'] // 1000000, + cpu_usage=stats1[0]['cpu_usage'], + memory_kb=stats1[0]['memory_kb']), + unittest.mock.call('test-template', 'vm-stats', + cpu_time=stats1[1]['cpu_time'] // 1000000, + cpu_usage=stats1[1]['cpu_usage'], + memory_kb=stats1[1]['memory_kb']), + unittest.mock.call('dom0', 'vm-stats', + cpu_time=stats2[0]['cpu_time'] // 1000000, + cpu_usage=stats2[0]['cpu_usage'], + memory_kb=stats2[0]['memory_kb']), + unittest.mock.call('test-template', 'vm-stats', + cpu_time=stats2[1]['cpu_time'] // 1000000, + cpu_usage=stats2[1]['cpu_usage'], + memory_kb=stats2[1]['memory_kb']), + ]) + + def test_631_vm_stats_single_vm(self): + send_event = unittest.mock.Mock(spec=[]) + + stats1 = { + 2: { + 'cpu_time': 2849496569205, + 'cpu_usage': 0, + 'memory_kb': 303916, + }, + } + stats2 = copy.deepcopy(stats1) + stats2[2]['cpu_usage'] = 5 + self.app.host.get_vm_stats = unittest.mock.Mock() + self.app.host.get_vm_stats.side_effect = [ + (0, stats1), (1, stats2), + ] + self.app.stats_interval = 1 + mgmt_obj = qubes.api.admin.QubesAdminAPI( + self.app, b'dom0', b'admin.vm.Stats', + b'test-vm1', b'', send_event=send_event) + + def cancel_call(): + mgmt_obj.cancel() + + class MockVM(object): + def __init__(self, name): + self._name = name + + def name(self): + return self._name + + loop = asyncio.get_event_loop() + self.app.vmm.libvirt_conn.lookupByID.side_effect = lambda xid: { + 0: MockVM('Domain-0'), + 1: MockVM('test-template'), + 2: MockVM('test-vm1')}[xid] + execute_task = asyncio.ensure_future( + mgmt_obj.execute(untrusted_payload=b'')) + loop.call_later(1.1, cancel_call) + loop.run_until_complete(execute_task) + self.assertIsNone(execute_task.result()) + self.assertEventFired(self.emitter, + 'admin-permission:' + 'admin.vm.Stats') + self.assertEqual(self.app.host.get_vm_stats.mock_calls, [ + unittest.mock.call(None, None, only_vm=self.vm), + unittest.mock.call(0, stats1, only_vm=self.vm), + ]) + self.assertEqual(send_event.mock_calls, [ + unittest.mock.call(self.app, 'connection-established'), + unittest.mock.call('test-vm1', 'vm-stats', + cpu_time=stats1[2]['cpu_time'] // 1000000, + cpu_usage=stats1[2]['cpu_usage'], + memory_kb=stats1[2]['memory_kb']), + unittest.mock.call('test-vm1', 'vm-stats', + cpu_time=stats2[2]['cpu_time'] // 1000000, + cpu_usage=stats2[2]['cpu_usage'], + memory_kb=stats2[2]['memory_kb']), + ]) + def test_990_vm_unexpected_payload(self): methods_with_no_payload = [ b'admin.vm.List', diff --git a/qubes/tests/app.py b/qubes/tests/app.py index f9c6987f..e0fa065f 100644 --- a/qubes/tests/app.py +++ b/qubes/tests/app.py @@ -22,7 +22,7 @@ # import os -import uuid +import unittest.mock as mock import lxml.etree @@ -35,6 +35,120 @@ import qubes.tests.init class TestApp(qubes.tests.TestEmitter): pass + +class TC_20_QubesHost(qubes.tests.QubesTestCase): + sample_xc_domain_getinfo = [ + {'paused': 0, 'cpu_time': 243951379111104, 'ssidref': 0, + 'hvm': 0, 'shutdown_reason': 255, 'dying': 0, + 'mem_kb': 3733212, 'domid': 0, 'max_vcpu_id': 7, + 'crashed': 0, 'running': 1, 'maxmem_kb': 3734236, + 'shutdown': 0, 'online_vcpus': 8, + 'handle': [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], + 'cpupool': 0, 'blocked': 0}, + {'paused': 0, 'cpu_time': 2849496569205, 'ssidref': 0, + 'hvm': 0, 'shutdown_reason': 255, 'dying': 0, + 'mem_kb': 303916, 'domid': 1, 'max_vcpu_id': 0, + 'crashed': 0, 'running': 0, 'maxmem_kb': 308224, + 'shutdown': 0, 'online_vcpus': 1, + 'handle': [116, 174, 229, 207, 17, 1, 79, 39, 191, 37, 41, + 186, 205, 158, 219, 8], + 'cpupool': 0, 'blocked': 1}, + {'paused': 0, 'cpu_time': 249658663079978, 'ssidref': 0, + 'hvm': 0, 'shutdown_reason': 255, 'dying': 0, + 'mem_kb': 3782668, 'domid': 11, 'max_vcpu_id': 7, + 'crashed': 0, 'running': 0, 'maxmem_kb': 3783692, + 'shutdown': 0, 'online_vcpus': 8, + 'handle': [169, 95, 55, 127, 140, 94, 79, 220, 186, 210, + 117, 5, 148, 11, 185, 206], + 'cpupool': 0, 'blocked': 1}] + + def setUp(self): + super(TC_20_QubesHost, self).setUp() + self.app = TestApp() + self.app.vmm = mock.Mock() + self.qubes_host = qubes.app.QubesHost(self.app) + + def test_000_get_vm_stats_single(self): + self.app.vmm.configure_mock(**{ + 'xc.domain_getinfo.return_value': self.sample_xc_domain_getinfo + }) + + info_time, info = self.qubes_host.get_vm_stats() + self.assertEqual(self.app.vmm.mock_calls, [ + ('xc.domain_getinfo', (0, 1024), {}), + ]) + self.assertIsNotNone(info_time) + expected_info = { + 0: { + 'cpu_time': 243951379111104//8, + 'cpu_usage': 0, + 'memory_kb': 3733212, + }, + 1: { + 'cpu_time': 2849496569205, + 'cpu_usage': 0, + 'memory_kb': 303916, + }, + 11: { + 'cpu_time': 249658663079978//8, + 'cpu_usage': 0, + 'memory_kb': 3782668, + }, + } + self.assertEqual(info, expected_info) + + def test_001_get_vm_stats_twice(self): + self.app.vmm.configure_mock(**{ + 'xc.domain_getinfo.return_value': self.sample_xc_domain_getinfo + }) + + prev_time, prev_info = self.qubes_host.get_vm_stats() + prev_time -= 1 + prev_info[0]['cpu_time'] -= 10**8 + prev_info[1]['cpu_time'] -= 10**9 + prev_info[11]['cpu_time'] -= 125 * 10**6 + info_time, info = self.qubes_host.get_vm_stats(prev_time, prev_info) + self.assertIsNotNone(info_time) + expected_info = { + 0: { + 'cpu_time': 243951379111104//8, + 'cpu_usage': 9, + 'memory_kb': 3733212, + }, + 1: { + 'cpu_time': 2849496569205, + 'cpu_usage': 99, + 'memory_kb': 303916, + }, + 11: { + 'cpu_time': 249658663079978//8, + 'cpu_usage': 12, + 'memory_kb': 3782668, + }, + } + self.assertEqual(info, expected_info) + self.assertEqual(self.app.vmm.mock_calls, [ + ('xc.domain_getinfo', (0, 1024), {}), + ('xc.domain_getinfo', (0, 1024), {}), + ]) + + def test_002_get_vm_stats_one_vm(self): + self.app.vmm.configure_mock(**{ + 'xc.domain_getinfo.return_value': [self.sample_xc_domain_getinfo[1]] + }) + + vm = mock.Mock + vm.xid = 1 + vm.name = 'somevm' + + info_time, info = self.qubes_host.get_vm_stats(only_vm=vm) + self.assertIsNotNone(info_time) + self.assertEqual(self.app.vmm.mock_calls, [ + ('xc.domain_getinfo', (1, 1), {}), + ]) + + + class TC_30_VMCollection(qubes.tests.QubesTestCase): def setUp(self): super().setUp()