From 0d9574d9fc274303b814db22dcad793ec95e6b5e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marek=20Marczykowski-G=C3=B3recki?= Date: Thu, 27 Jul 2017 22:15:02 +0200 Subject: [PATCH 1/5] api: use str(subject) instead of explicit subject.name This allows better flexibility, when subject is not necessary a VM object. --- qubes/api/__init__.py | 2 +- qubes/tests/api.py | 6 +++++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/qubes/api/__init__.py b/qubes/api/__init__.py index 38532913..f35974ec 100644 --- a/qubes/api/__init__.py +++ b/qubes/api/__init__.py @@ -323,7 +323,7 @@ class QubesDaemonProtocol(asyncio.Protocol): self.send_header(0x31) if subject is not self.app: - self.transport.write(subject.name.encode('ascii')) + self.transport.write(str(subject).encode('ascii')) self.transport.write(b'\0') self.transport.write(event.encode('ascii') + b'\0') diff --git a/qubes/tests/api.py b/qubes/tests/api.py index b24ab80e..ba74a208 100644 --- a/qubes/tests/api.py +++ b/qubes/tests/api.py @@ -77,7 +77,11 @@ class TestMgmt(object): class Subject: name = 'subject' - self.send_event(Subject, 'event', payload=untrusted_payload.decode()) + + def __str__(self): + return 'subject' + + self.send_event(Subject(), 'event', payload=untrusted_payload.decode()) try: # give some time to close the other end yield from asyncio.sleep(0.1) From e8b875f552339d03a1e54b28c7217668b1d0c466 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marek=20Marczykowski-G=C3=B3recki?= Date: Thu, 27 Jul 2017 22:16:03 +0200 Subject: [PATCH 2/5] app: refresh getting VM statistics, rename to QubesHost.get_vm_stats Get a VM statistics once. If previous measurements are provided, calculate difference too. This is backend part of upcoming admin.vm.Stats service. QubesOS/qubes-issues#853 --- qubes/app.py | 77 ++++++++++++++++++------------ qubes/tests/app.py | 116 ++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 161 insertions(+), 32 deletions(-) diff --git a/qubes/app.py b/qubes/app.py index fec7788f..61926a04 100644 --- a/qubes/app.py +++ b/qubes/app.py @@ -172,7 +172,7 @@ class VMMConnection(object): if 'xen.lowlevel.xs' in sys.modules: self._xs = xen.lowlevel.xs.xs() - if 'xen.lowlevel.cs' in sys.modules: + if 'xen.lowlevel.xc' in sys.modules: self._xc = xen.lowlevel.xc.xc() self._libvirt_conn = VirConnectWrapper( qubes.config.defaults['libvirt_uri']) @@ -214,7 +214,7 @@ class VMMConnection(object): 'xc object is available under Xen hypervisor only') self.init_vmm_connection() - return self._xs + return self._xc def register_event_handlers(self, app): '''Register libvirt event handlers, which will translate libvirt @@ -314,55 +314,70 @@ class QubesHost(object): return int(self._physinfo['free_memory']) - def measure_cpu_usage(self, previous_time=None, previous=None, - wait_time=1): + def get_vm_stats(self, previous_time=None, previous=None, only_vm=None): '''Measure cpu usage for all domains at once. + If previous measurements are given, CPU usage will be given in + percents of time. Otherwise only absolute value (seconds). + + Return a tuple of (measurements_time, measurements), + where measurements is a dictionary with key: domid, value: dict: + - cpu_time - absolute CPU usage (seconds since its startup) + - cpu_usage - CPU usage in % + - memory_kb - current memory assigned, in kb + This function requires Xen hypervisor. - .. versionchanged:: 3.0 - argument order to match return tuple + ..warning: + + This function may return info about implementation-specific VMs, + like stubdomains for HVM + + :param previous: previous measurement + :param previous_time: time of previous measurement + :param only_vm: get measurements only for this VM :raises NotImplementedError: when not under Xen ''' - if previous is None: - previous_time = time.time() - previous = {} - try: - info = self.app.vmm.xc.domain_getinfo(0, qubes.config.max_qid) - except AttributeError: - raise NotImplementedError( - 'This function requires Xen hypervisor') + if (previous_time is None) != (previous is None): + raise ValueError( + 'previous and previous_time must be given together (or none)') - for vm in info: - previous[vm['domid']] = {} - previous[vm['domid']]['cpu_time'] = ( - vm['cpu_time'] / max(vm['online_vcpus'], 1)) - previous[vm['domid']]['cpu_usage'] = 0 - time.sleep(wait_time) + if previous is None: + previous = {} current_time = time.time() current = {} try: - info = self.app.vmm.xc.domain_getinfo(0, qubes.config.max_qid) + if only_vm: + xid = only_vm.xid + if xid < 0: + raise qubes.exc.QubesVMNotRunningError(only_vm) + info = self.app.vmm.xc.domain_getinfo(xid, 1) + if info[0]['domid'] != xid: + raise qubes.exc.QubesVMNotRunningError(only_vm) + else: + info = self.app.vmm.xc.domain_getinfo(0, 1024) except AttributeError: raise NotImplementedError( 'This function requires Xen hypervisor') + # TODO: add stubdomain stats to actual VMs for vm in info: - current[vm['domid']] = {} - current[vm['domid']]['cpu_time'] = ( + domid = vm['domid'] + current[domid] = {} + current[domid]['memory_kb'] = vm['mem_kb'] + current[domid]['cpu_time'] = int( vm['cpu_time'] / max(vm['online_vcpus'], 1)) - if vm['domid'] in previous.keys(): - current[vm['domid']]['cpu_usage'] = ( - float(current[vm['domid']]['cpu_time'] - - previous[vm['domid']]['cpu_time']) / - 1000 ** 3 / (current_time - previous_time) * 100) - if current[vm['domid']]['cpu_usage'] < 0: + if domid in previous: + current[domid]['cpu_usage'] = int( + (current[domid]['cpu_time'] - previous[domid]['cpu_time']) + / 1000 ** 3 * 100 / (current_time - previous_time)) + if current[domid]['cpu_usage'] < 0: # VM has been rebooted - current[vm['domid']]['cpu_usage'] = 0 + current[domid]['cpu_usage'] = 0 else: - current[vm['domid']]['cpu_usage'] = 0 + current[domid]['cpu_usage'] = 0 return (current_time, current) diff --git a/qubes/tests/app.py b/qubes/tests/app.py index f9c6987f..e0fa065f 100644 --- a/qubes/tests/app.py +++ b/qubes/tests/app.py @@ -22,7 +22,7 @@ # import os -import uuid +import unittest.mock as mock import lxml.etree @@ -35,6 +35,120 @@ import qubes.tests.init class TestApp(qubes.tests.TestEmitter): pass + +class TC_20_QubesHost(qubes.tests.QubesTestCase): + sample_xc_domain_getinfo = [ + {'paused': 0, 'cpu_time': 243951379111104, 'ssidref': 0, + 'hvm': 0, 'shutdown_reason': 255, 'dying': 0, + 'mem_kb': 3733212, 'domid': 0, 'max_vcpu_id': 7, + 'crashed': 0, 'running': 1, 'maxmem_kb': 3734236, + 'shutdown': 0, 'online_vcpus': 8, + 'handle': [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], + 'cpupool': 0, 'blocked': 0}, + {'paused': 0, 'cpu_time': 2849496569205, 'ssidref': 0, + 'hvm': 0, 'shutdown_reason': 255, 'dying': 0, + 'mem_kb': 303916, 'domid': 1, 'max_vcpu_id': 0, + 'crashed': 0, 'running': 0, 'maxmem_kb': 308224, + 'shutdown': 0, 'online_vcpus': 1, + 'handle': [116, 174, 229, 207, 17, 1, 79, 39, 191, 37, 41, + 186, 205, 158, 219, 8], + 'cpupool': 0, 'blocked': 1}, + {'paused': 0, 'cpu_time': 249658663079978, 'ssidref': 0, + 'hvm': 0, 'shutdown_reason': 255, 'dying': 0, + 'mem_kb': 3782668, 'domid': 11, 'max_vcpu_id': 7, + 'crashed': 0, 'running': 0, 'maxmem_kb': 3783692, + 'shutdown': 0, 'online_vcpus': 8, + 'handle': [169, 95, 55, 127, 140, 94, 79, 220, 186, 210, + 117, 5, 148, 11, 185, 206], + 'cpupool': 0, 'blocked': 1}] + + def setUp(self): + super(TC_20_QubesHost, self).setUp() + self.app = TestApp() + self.app.vmm = mock.Mock() + self.qubes_host = qubes.app.QubesHost(self.app) + + def test_000_get_vm_stats_single(self): + self.app.vmm.configure_mock(**{ + 'xc.domain_getinfo.return_value': self.sample_xc_domain_getinfo + }) + + info_time, info = self.qubes_host.get_vm_stats() + self.assertEqual(self.app.vmm.mock_calls, [ + ('xc.domain_getinfo', (0, 1024), {}), + ]) + self.assertIsNotNone(info_time) + expected_info = { + 0: { + 'cpu_time': 243951379111104//8, + 'cpu_usage': 0, + 'memory_kb': 3733212, + }, + 1: { + 'cpu_time': 2849496569205, + 'cpu_usage': 0, + 'memory_kb': 303916, + }, + 11: { + 'cpu_time': 249658663079978//8, + 'cpu_usage': 0, + 'memory_kb': 3782668, + }, + } + self.assertEqual(info, expected_info) + + def test_001_get_vm_stats_twice(self): + self.app.vmm.configure_mock(**{ + 'xc.domain_getinfo.return_value': self.sample_xc_domain_getinfo + }) + + prev_time, prev_info = self.qubes_host.get_vm_stats() + prev_time -= 1 + prev_info[0]['cpu_time'] -= 10**8 + prev_info[1]['cpu_time'] -= 10**9 + prev_info[11]['cpu_time'] -= 125 * 10**6 + info_time, info = self.qubes_host.get_vm_stats(prev_time, prev_info) + self.assertIsNotNone(info_time) + expected_info = { + 0: { + 'cpu_time': 243951379111104//8, + 'cpu_usage': 9, + 'memory_kb': 3733212, + }, + 1: { + 'cpu_time': 2849496569205, + 'cpu_usage': 99, + 'memory_kb': 303916, + }, + 11: { + 'cpu_time': 249658663079978//8, + 'cpu_usage': 12, + 'memory_kb': 3782668, + }, + } + self.assertEqual(info, expected_info) + self.assertEqual(self.app.vmm.mock_calls, [ + ('xc.domain_getinfo', (0, 1024), {}), + ('xc.domain_getinfo', (0, 1024), {}), + ]) + + def test_002_get_vm_stats_one_vm(self): + self.app.vmm.configure_mock(**{ + 'xc.domain_getinfo.return_value': [self.sample_xc_domain_getinfo[1]] + }) + + vm = mock.Mock + vm.xid = 1 + vm.name = 'somevm' + + info_time, info = self.qubes_host.get_vm_stats(only_vm=vm) + self.assertIsNotNone(info_time) + self.assertEqual(self.app.vmm.mock_calls, [ + ('xc.domain_getinfo', (1, 1), {}), + ]) + + + class TC_30_VMCollection(qubes.tests.QubesTestCase): def setUp(self): super().setUp() From 147bca16484a74f9eac408a966b1b35cc7907905 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marek=20Marczykowski-G=C3=B3recki?= Date: Thu, 27 Jul 2017 22:20:12 +0200 Subject: [PATCH 3/5] api/admin: implement admin.vm.Stats QubesOS/qubes-issues#853 --- Makefile | 1 + qubes/api/admin.py | 74 ++++++++++++++++++++++++++++++++++++++++++++++ qubes/app.py | 5 ++++ 3 files changed, 80 insertions(+) diff --git a/Makefile b/Makefile index 24ee7b1e..a8ba83bd 100644 --- a/Makefile +++ b/Makefile @@ -90,6 +90,7 @@ ADMIN_API_METHODS_SIMPLE = \ admin.vm.volume.ListSnapshots \ admin.vm.volume.Resize \ admin.vm.volume.Revert \ + admin.vm.Stats \ $(null) ifeq ($(OS),Linux) diff --git a/qubes/api/admin.py b/qubes/api/admin.py index 1f2659c4..4372d418 100644 --- a/qubes/api/admin.py +++ b/qubes/api/admin.py @@ -1093,3 +1093,77 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI): self.fire_event_for_permission() self.dest.fire_event('firewall-changed') + + def _send_stats_single(self, info_time, info, only_vm, filters, + id_to_name_map): + '''A single iteration of sending VM stats + + :param info_time: time of previous iteration + :param info: information retrieved in previous iteration + :param only_vm: send information only about this VM + :param filters: filters to apply on stats before sending + :param id_to_name_map: ID->VM name map, may be modified + :return: tuple(info_time, info) - new information (to be passed to + the next iteration) + ''' + + (info_time, info) = self.app.host.get_vm_stats(info_time, info, + only_vm=only_vm) + for vm_id, vm_info in info.items(): + if vm_id not in id_to_name_map: + try: + name = \ + self.app.vmm.libvirt_conn.lookupByID(vm_id).name() + except libvirt.libvirtError as err: + if err.get_error_code() == libvirt.VIR_ERR_NO_DOMAIN: + # stubdomain or so + name = None + else: + raise + id_to_name_map[vm_id] = name + else: + name = id_to_name_map[vm_id] + + # skip VMs with unknown name + if name is None: + continue + + if not list(qubes.api.apply_filters([name], filters)): + continue + + self.send_event(name, 'vm-stats', + memory_kb=int(vm_info['memory_kb']), + cpu_time=int(vm_info['cpu_time'] / 1000000), + cpu_usage=int(vm_info['cpu_usage'])) + + return info_time, info + + @qubes.api.method('admin.vm.Stats', no_payload=True, + scope='global', read=True) + @asyncio.coroutine + def vm_stats(self): + assert not self.arg + + # run until client connection is terminated + self.cancellable = True + + # cache event filters, to not call an event each time an event arrives + stats_filters = self.fire_event_for_permission() + + only_vm = None + if self.dest.name != 'dom0': + only_vm = self.dest + + self.send_event(self.app, 'connection-established') + + info_time = None + info = None + id_to_name_map = {0: 'dom0'} + try: + while True: + info_time, info = self._send_stats_single(info_time, info, + only_vm, stats_filters, id_to_name_map) + yield from asyncio.sleep(self.app.stats_interval) + except asyncio.CancelledError: + # valid method to terminate this loop + pass diff --git a/qubes/app.py b/qubes/app.py index 61926a04..793903c8 100644 --- a/qubes/app.py +++ b/qubes/app.py @@ -706,6 +706,11 @@ class Qubes(qubes.PropertyHolder): default=lambda app: app.default_pool, doc='Default storage pool for kernel volumes') + stats_interval = qubes.property('stats_interval', + default=3, + type=int, + doc='Interval in seconds for VM stats reporting (memory, CPU usage)') + # TODO #1637 #892 check_updates_vm = qubes.property('check_updates_vm', type=bool, setter=qubes.property.bool, From 2f4b4d97e7e44cdf90b0824e72d4d758434a5dd8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marek=20Marczykowski-G=C3=B3recki?= Date: Fri, 28 Jul 2017 01:23:23 +0200 Subject: [PATCH 4/5] api: fix handling interrupted calls When an API call is interrupted, the relevant coroutine is cancelled - which means it may throw CancelledError. At the same time, cancelled call have related socket already closed (and transport set to None). But QubesDaemonProtocol.respond try to close the transport again, which fails. Fix handling this case. --- qubes/api/__init__.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/qubes/api/__init__.py b/qubes/api/__init__.py index f35974ec..4998b031 100644 --- a/qubes/api/__init__.py +++ b/qubes/api/__init__.py @@ -307,7 +307,8 @@ class QubesDaemonProtocol(asyncio.Protocol): # this is reached if from except: blocks; do not put it in finally:, # because this will prevent the good case from sending the reply - self.transport.abort() + if self.transport: + self.transport.abort() def send_header(self, *args): self.transport.write(self.header.pack(*args)) From 9bbefa0edbb86745633422410be333363bdecdf6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marek=20Marczykowski-G=C3=B3recki?= Date: Fri, 28 Jul 2017 02:47:47 +0200 Subject: [PATCH 5/5] tests: admin.vm.Stats QubesOS/qubes-issues#853 --- qubes/tests/api_admin.py | 134 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 134 insertions(+) diff --git a/qubes/tests/api_admin.py b/qubes/tests/api_admin.py index 41acc3eb..17dd1327 100644 --- a/qubes/tests/api_admin.py +++ b/qubes/tests/api_admin.py @@ -27,6 +27,7 @@ import shutil import unittest.mock import libvirt +import copy import qubes import qubes.devices @@ -1820,6 +1821,139 @@ class TC_00_VMs(AdminAPITestCase): self.assertFalse(self.vm.firewall.save.called) self.assertFalse(self.app.save.called) + def test_600_vm_stats(self): + send_event = unittest.mock.Mock(spec=[]) + + stats1 = { + 0: { + 'cpu_time': 243951379111104 // 8, + 'cpu_usage': 0, + 'memory_kb': 3733212, + }, + 1: { + 'cpu_time': 2849496569205, + 'cpu_usage': 0, + 'memory_kb': 303916, + }, + } + stats2 = copy.deepcopy(stats1) + stats2[0]['cpu_time'] += 100000000 + stats2[0]['cpu_usage'] = 10 + stats2[1]['cpu_usage'] = 5 + self.app.host.get_vm_stats = unittest.mock.Mock() + self.app.host.get_vm_stats.side_effect = [ + (0, stats1), (1, stats2), + ] + self.app.stats_interval = 1 + mgmt_obj = qubes.api.admin.QubesAdminAPI( + self.app, b'dom0', b'admin.vm.Stats', + b'dom0', b'', send_event=send_event) + + def cancel_call(): + mgmt_obj.cancel() + + class MockVM(object): + def __init__(self, name): + self._name = name + + def name(self): + return self._name + + loop = asyncio.get_event_loop() + self.app.vmm.libvirt_conn.lookupByID.side_effect = lambda xid: { + 0: MockVM('Domain-0'), + 1: MockVM('test-template'), + 2: MockVM('test-vm1')}[xid] + execute_task = asyncio.ensure_future( + mgmt_obj.execute(untrusted_payload=b'')) + loop.call_later(1.1, cancel_call) + loop.run_until_complete(execute_task) + self.assertIsNone(execute_task.result()) + self.assertEventFired(self.emitter, + 'admin-permission:' + 'admin.vm.Stats') + self.assertEqual(self.app.host.get_vm_stats.mock_calls, [ + unittest.mock.call(None, None, only_vm=None), + unittest.mock.call(0, stats1, only_vm=None), + ]) + self.assertEqual(send_event.mock_calls, [ + unittest.mock.call(self.app, 'connection-established'), + unittest.mock.call('dom0', 'vm-stats', + cpu_time=stats1[0]['cpu_time'] // 1000000, + cpu_usage=stats1[0]['cpu_usage'], + memory_kb=stats1[0]['memory_kb']), + unittest.mock.call('test-template', 'vm-stats', + cpu_time=stats1[1]['cpu_time'] // 1000000, + cpu_usage=stats1[1]['cpu_usage'], + memory_kb=stats1[1]['memory_kb']), + unittest.mock.call('dom0', 'vm-stats', + cpu_time=stats2[0]['cpu_time'] // 1000000, + cpu_usage=stats2[0]['cpu_usage'], + memory_kb=stats2[0]['memory_kb']), + unittest.mock.call('test-template', 'vm-stats', + cpu_time=stats2[1]['cpu_time'] // 1000000, + cpu_usage=stats2[1]['cpu_usage'], + memory_kb=stats2[1]['memory_kb']), + ]) + + def test_601_vm_stats_single_vm(self): + send_event = unittest.mock.Mock(spec=[]) + + stats1 = { + 2: { + 'cpu_time': 2849496569205, + 'cpu_usage': 0, + 'memory_kb': 303916, + }, + } + stats2 = copy.deepcopy(stats1) + stats2[2]['cpu_usage'] = 5 + self.app.host.get_vm_stats = unittest.mock.Mock() + self.app.host.get_vm_stats.side_effect = [ + (0, stats1), (1, stats2), + ] + self.app.stats_interval = 1 + mgmt_obj = qubes.api.admin.QubesAdminAPI( + self.app, b'dom0', b'admin.vm.Stats', + b'test-vm1', b'', send_event=send_event) + + def cancel_call(): + mgmt_obj.cancel() + + class MockVM(object): + def __init__(self, name): + self._name = name + + def name(self): + return self._name + + loop = asyncio.get_event_loop() + self.app.vmm.libvirt_conn.lookupByID.side_effect = lambda xid: { + 0: MockVM('Domain-0'), + 1: MockVM('test-template'), + 2: MockVM('test-vm1')}[xid] + execute_task = asyncio.ensure_future( + mgmt_obj.execute(untrusted_payload=b'')) + loop.call_later(1.1, cancel_call) + loop.run_until_complete(execute_task) + self.assertIsNone(execute_task.result()) + self.assertEventFired(self.emitter, + 'admin-permission:' + 'admin.vm.Stats') + self.assertEqual(self.app.host.get_vm_stats.mock_calls, [ + unittest.mock.call(None, None, only_vm=self.vm), + unittest.mock.call(0, stats1, only_vm=self.vm), + ]) + self.assertEqual(send_event.mock_calls, [ + unittest.mock.call(self.app, 'connection-established'), + unittest.mock.call('test-vm1', 'vm-stats', + cpu_time=stats1[2]['cpu_time'] // 1000000, + cpu_usage=stats1[2]['cpu_usage'], + memory_kb=stats1[2]['memory_kb']), + unittest.mock.call('test-vm1', 'vm-stats', + cpu_time=stats2[2]['cpu_time'] // 1000000, + cpu_usage=stats2[2]['cpu_usage'], + memory_kb=stats2[2]['memory_kb']), + ]) + def test_990_vm_unexpected_payload(self): methods_with_no_payload = [