diff --git a/qubes/tests/api_admin.py b/qubes/tests/api_admin.py index 41acc3eb..17dd1327 100644 --- a/qubes/tests/api_admin.py +++ b/qubes/tests/api_admin.py @@ -27,6 +27,7 @@ import shutil import unittest.mock import libvirt +import copy import qubes import qubes.devices @@ -1820,6 +1821,139 @@ class TC_00_VMs(AdminAPITestCase): self.assertFalse(self.vm.firewall.save.called) self.assertFalse(self.app.save.called) + def test_600_vm_stats(self): + send_event = unittest.mock.Mock(spec=[]) + + stats1 = { + 0: { + 'cpu_time': 243951379111104 // 8, + 'cpu_usage': 0, + 'memory_kb': 3733212, + }, + 1: { + 'cpu_time': 2849496569205, + 'cpu_usage': 0, + 'memory_kb': 303916, + }, + } + stats2 = copy.deepcopy(stats1) + stats2[0]['cpu_time'] += 100000000 + stats2[0]['cpu_usage'] = 10 + stats2[1]['cpu_usage'] = 5 + self.app.host.get_vm_stats = unittest.mock.Mock() + self.app.host.get_vm_stats.side_effect = [ + (0, stats1), (1, stats2), + ] + self.app.stats_interval = 1 + mgmt_obj = qubes.api.admin.QubesAdminAPI( + self.app, b'dom0', b'admin.vm.Stats', + b'dom0', b'', send_event=send_event) + + def cancel_call(): + mgmt_obj.cancel() + + class MockVM(object): + def __init__(self, name): + self._name = name + + def name(self): + return self._name + + loop = asyncio.get_event_loop() + self.app.vmm.libvirt_conn.lookupByID.side_effect = lambda xid: { + 0: MockVM('Domain-0'), + 1: MockVM('test-template'), + 2: MockVM('test-vm1')}[xid] + execute_task = asyncio.ensure_future( + mgmt_obj.execute(untrusted_payload=b'')) + loop.call_later(1.1, cancel_call) + loop.run_until_complete(execute_task) + self.assertIsNone(execute_task.result()) + self.assertEventFired(self.emitter, + 'admin-permission:' + 'admin.vm.Stats') + self.assertEqual(self.app.host.get_vm_stats.mock_calls, [ + unittest.mock.call(None, None, only_vm=None), + unittest.mock.call(0, stats1, only_vm=None), + ]) + self.assertEqual(send_event.mock_calls, [ + unittest.mock.call(self.app, 'connection-established'), + unittest.mock.call('dom0', 'vm-stats', + cpu_time=stats1[0]['cpu_time'] // 1000000, + cpu_usage=stats1[0]['cpu_usage'], + memory_kb=stats1[0]['memory_kb']), + unittest.mock.call('test-template', 'vm-stats', + cpu_time=stats1[1]['cpu_time'] // 1000000, + cpu_usage=stats1[1]['cpu_usage'], + memory_kb=stats1[1]['memory_kb']), + unittest.mock.call('dom0', 'vm-stats', + cpu_time=stats2[0]['cpu_time'] // 1000000, + cpu_usage=stats2[0]['cpu_usage'], + memory_kb=stats2[0]['memory_kb']), + unittest.mock.call('test-template', 'vm-stats', + cpu_time=stats2[1]['cpu_time'] // 1000000, + cpu_usage=stats2[1]['cpu_usage'], + memory_kb=stats2[1]['memory_kb']), + ]) + + def test_601_vm_stats_single_vm(self): + send_event = unittest.mock.Mock(spec=[]) + + stats1 = { + 2: { + 'cpu_time': 2849496569205, + 'cpu_usage': 0, + 'memory_kb': 303916, + }, + } + stats2 = copy.deepcopy(stats1) + stats2[2]['cpu_usage'] = 5 + self.app.host.get_vm_stats = unittest.mock.Mock() + self.app.host.get_vm_stats.side_effect = [ + (0, stats1), (1, stats2), + ] + self.app.stats_interval = 1 + mgmt_obj = qubes.api.admin.QubesAdminAPI( + self.app, b'dom0', b'admin.vm.Stats', + b'test-vm1', b'', send_event=send_event) + + def cancel_call(): + mgmt_obj.cancel() + + class MockVM(object): + def __init__(self, name): + self._name = name + + def name(self): + return self._name + + loop = asyncio.get_event_loop() + self.app.vmm.libvirt_conn.lookupByID.side_effect = lambda xid: { + 0: MockVM('Domain-0'), + 1: MockVM('test-template'), + 2: MockVM('test-vm1')}[xid] + execute_task = asyncio.ensure_future( + mgmt_obj.execute(untrusted_payload=b'')) + loop.call_later(1.1, cancel_call) + loop.run_until_complete(execute_task) + self.assertIsNone(execute_task.result()) + self.assertEventFired(self.emitter, + 'admin-permission:' + 'admin.vm.Stats') + self.assertEqual(self.app.host.get_vm_stats.mock_calls, [ + unittest.mock.call(None, None, only_vm=self.vm), + unittest.mock.call(0, stats1, only_vm=self.vm), + ]) + self.assertEqual(send_event.mock_calls, [ + unittest.mock.call(self.app, 'connection-established'), + unittest.mock.call('test-vm1', 'vm-stats', + cpu_time=stats1[2]['cpu_time'] // 1000000, + cpu_usage=stats1[2]['cpu_usage'], + memory_kb=stats1[2]['memory_kb']), + unittest.mock.call('test-vm1', 'vm-stats', + cpu_time=stats2[2]['cpu_time'] // 1000000, + cpu_usage=stats2[2]['cpu_usage'], + memory_kb=stats2[2]['memory_kb']), + ]) + def test_990_vm_unexpected_payload(self): methods_with_no_payload = [