core3 move: AdminVM class

This commit is contained in:
Wojtek Porczyk 2015-01-15 16:05:42 +01:00
parent 8afba4c5e9
commit 8805db5e5f
6 changed files with 223 additions and 112 deletions

View File

@ -1,107 +0,0 @@
#!/usr/bin/python2
# -*- coding: utf-8 -*-
#
# The Qubes OS Project, http://www.qubes-os.org
#
# Copyright (C) 2010 Joanna Rutkowska <joanna@invisiblethingslab.com>
# Copyright (C) 2013 Marek Marczykowski <marmarek@invisiblethingslab.com>
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#
#
from qubes.qubes import QubesNetVm,register_qubes_vm_class
from qubes.qubes import defaults
from qubes.qubes import QubesException,dry_run,vmm
import psutil
class QubesAdminVm(QubesNetVm):
# In which order load this VM type from qubes.xml
load_order = 10
def get_attrs_config(self):
attrs = super(QubesAdminVm, self).get_attrs_config()
attrs.pop('kernel')
attrs.pop('kernels_dir')
attrs.pop('kernelopts')
attrs.pop('uses_default_kernel')
attrs.pop('uses_default_kernelopts')
return attrs
def __init__(self, **kwargs):
super(QubesAdminVm, self).__init__(qid=0, name="dom0", netid=0,
dir_path=None,
private_img = None,
template = None,
maxmem = 0,
vcpus = 0,
label = defaults["template_label"],
**kwargs)
@property
def xid(self):
return 0
@property
def libvirt_domain(self):
raise ValueError("Dom0 do not have libvirt object")
@property
def type(self):
return "AdminVM"
def is_running(self):
return True
def get_power_state(self):
return "Running"
def get_mem(self):
return psutil.virtual_memory().total/1024
def get_mem_static_max(self):
return vmm.libvirt_conn.getInfo()[1]
def get_cputime(self):
# TODO: measure it somehow
return 0
def get_disk_usage(self, file_or_dir):
return 0
def get_disk_utilization(self):
return 0
def get_disk_utilization_private_img(self):
return 0
def get_private_img_sz(self):
return 0
@property
def ip(self):
return "10.137.0.2"
def start(self, **kwargs):
raise QubesException ("Cannot start Dom0 fake domain!")
def suspend(self):
return
def verify_files(self):
return True
register_qubes_vm_class(QubesAdminVm)

View File

@ -61,5 +61,6 @@ endif
cp \
tests/vm/__init__.py* \
tests/vm/init.py* \
tests/vm/adminvm.py* \
tests/vm/qubesvm.py* \
$(DESTDIR)$(PYTHON_QUBESPATH)/tests/vm

View File

@ -9,6 +9,7 @@ test_order = [
'qubes.tests.events',
'qubes.tests.vm.init',
'qubes.tests.vm.qubesvm',
'qubes.tests.vm.adminvm',
'qubes.tests.init'
]

82
qubes/tests/vm/adminvm.py Normal file
View File

@ -0,0 +1,82 @@
#!/usr/bin/python2 -O
import sys
import unittest
import qubes
import qubes.vm.adminvm
import qubes.tests
class TestVMM(object):
def __init__(self, offline_mode=False):
self.offline_mode = offline_mode
class TestHost(object):
def __init__(self, offline_mode=False):
self.memory_total = 1000
# this probably can be shared and not as dummy as is
class TestApp(qubes.tests.TestEmitter):
def __init__(self):
self.vmm = TestVMM()
self.host = TestHost()
@qubes.tests.skipUnlessDom0
class TC_00_AdminVM(qubes.tests.QubesTestCase):
def setUp(self):
try:
self.app = TestApp()
self.vm = qubes.vm.adminvm.AdminVM(self.app,
xml=None, qid=0, name='dom0')
except:
if self.id().endswith('.test_000_init'):
raise
self.skipTest('setup failed')
def test_000_init(self):
pass
def test_100_xid(self):
self.assertEqual(self.vm.xid, 0)
def test_101_libvirt_domain(self):
self.assertIs(self.vm.libvirt_domain, None)
def test_200_libvirt_netvm(self):
self.assertIs(self.vm.netvm, None)
def test_300_is_running(self):
self.assertTrue(self.vm.is_running())
def test_301_get_power_state(self):
self.assertEqual(self.vm.get_power_state(), 'Running')
def test_302_get_mem(self):
self.assertGreater(self.vm.get_mem(), 0)
@unittest.skip('mock object does not support this')
def test_303_get_mem_static_max(self):
self.assertGreater(self.vm.get_mem_static_max(), 0)
def test_304_get_disk_utilization(self):
self.assertEqual(self.vm.get_disk_utilization(), 0)
def test_305_get_disk_utilization_private_img(self):
self.assertEqual(self.vm.get_disk_utilization_private_img(), 0)
def test_306_get_private_img_sz(self):
self.assertEqual(self.vm.get_private_img_sz(), 0)
def test_307_verify_files(self):
self.assertEqual(self.vm.get_private_img_sz(), 0)
def test_310_start(self):
with self.assertRaises(qubes.QubesException):
self.vm.start()
@unittest.skip('this functionality is undecided')
def test_311_suspend(self):
with self.assertRaises(qubes.QubesException):
self.vm.suspend()

View File

@ -1,8 +1,142 @@
#!/usr/bin/python2 -O
import qubes.vm.netvm
import qubes
import qubes.vm.qubesvm
class AdminVM(qubes.vm.netvm.NetVM):
class AdminVM(qubes.vm.qubesvm.QubesVM):
'''Dom0'''
def __init__(self, D):
super(AdminVM, self).__init__(D)
netvm = qubes.property('netvm', setter=qubes.property.forbidden,
default=None,
doc='Dom0 cannot have netvm')
@property
def xid(self):
'''Always ``0``.
.. seealso:
:py:attr:`qubes.vm.qubesvm.QubesVM.xid`
'''
return 0
@property
def libvirt_domain(self):
'''Always :py:obj:`None`.
.. seealso:
:py:attr:`qubes.vm.qubesvm.QubesVM.libvirt_domain`
'''
return None
# XXX probably unneeded, will return None as we don't have netvm
# @property
# def ip(self):
# return "10.137.0.2"
def is_running(self):
'''Always :py:obj:`True`.
.. seealso:
:py:meth:`qubes.vm.qubesvm.QubesVM.is_running`
'''
return True
def get_power_state(self):
'''Always ``'Running'``.
.. seealso:
:py:meth:`qubes.vm.qubesvm.QubesVM.get_power_state`
'''
return 'Running'
def get_mem(self):
'''Get current memory usage of Dom0.
Unit is KiB.
.. seealso:
:py:meth:`qubes.vm.qubesvm.QubesVM.get_mem`
'''
#return psutil.virtual_memory().total/1024
for line in open('/proc/meminfo'):
if line.startswith('MemTotal:'):
return int(line.split(':')[1].strip().split()[0])
raise NotImplementedError()
def get_mem_static_max(self):
'''Get maximum memory available to Dom0.
.. seealso:
:py:meth:`qubes.vm.qubesvm.QubesVM.get_mem_static_max`
'''
return self.app.vmm.libvirt_conn.getInfo()[1]
def get_disk_utilization(self):
'''Always ``0``.
.. seealso:
:py:meth:`qubes.vm.qubesvm.QubesVM.get_disk_utilization`
'''
return 0
def get_disk_utilization_private_img(self):
'''Always ``0``.
.. seealso:
:py:meth:`qubes.vm.qubesvm.QubesVM.get_disk_utilization_private_img`
'''
return 0
def get_private_img_sz(self):
'''Always ``0``.
.. seealso:
:py:meth:`qubes.vm.qubesvm.QubesVM.get_private_img_sz`
'''
return 0
def verify_files(self):
'''Always :py:obj:`True`
.. seealso:
:py:meth:`qubes.vm.qubesvm.QubesVM.verify_files`
'''
return True
def start(self, **kwargs):
'''Always raises an exception.
.. seealso:
:py:meth:`qubes.vm.qubesvm.QubesVM.start`
'''
raise qubes.QubesException('Cannot start Dom0 fake domain!')
def suspend(self):
'''Does nothing.
.. seealso:
:py:meth:`qubes.vm.qubesvm.QubesVM.suspend`
'''
# XXX shouldn't we spew an exception?
return
# def __init__(self, **kwargs):
# super(QubesAdminVm, self).__init__(qid=0, name="dom0", netid=0,
# dir_path=None,
# private_img = None,
# template = None,
# maxmem = 0,
# vcpus = 0,
# label = defaults["template_label"],
# **kwargs)

View File

@ -56,7 +56,6 @@ Requires: python, pciutils, python-inotify, python-daemon
Requires: qubes-core-dom0-linux >= 2.0.24
Requires: qubes-db-dom0
Requires: python-lxml
Requires: python-psutil
# TODO: R: qubes-gui-dom0 >= 2.1.11
Conflicts: qubes-gui-dom0 < 1.1.13
Requires: libvirt-python
@ -228,6 +227,7 @@ fi
%dir %{python_sitearch}/qubes/tests/vm
%{python_sitearch}/qubes/tests/vm/__init__.py*
%{python_sitearch}/qubes/tests/vm/init.py*
%{python_sitearch}/qubes/tests/vm/adminvm.py*
%{python_sitearch}/qubes/tests/vm/qubesvm.py*
# qmemman