qubes: pylint fixes (small mistakes and wrong names)
This commit is contained in:
parent
4a85c823c3
commit
adb144acfe
@ -486,7 +486,7 @@ class VMCollection(object):
|
||||
|
||||
def get_vms_connected_to(self, netvm):
|
||||
new_vms = set([netvm])
|
||||
dependend_vms = set()
|
||||
dependent_vms = set()
|
||||
|
||||
# Dependency resolving only makes sense on NetVM (or derivative)
|
||||
# if not self[netvm_qid].is_netvm():
|
||||
@ -495,9 +495,9 @@ class VMCollection(object):
|
||||
while len(new_vms) > 0:
|
||||
cur_vm = new_vms.pop()
|
||||
for vm in cur_vm.connected_vms.values():
|
||||
if vm in dependend_vms:
|
||||
if vm in dependent_vms:
|
||||
continue
|
||||
dependend_vms.add(vm.qid)
|
||||
dependent_vms.add(vm.qid)
|
||||
# if vm.is_netvm():
|
||||
new_vms.append(vm.qid)
|
||||
|
||||
@ -926,7 +926,7 @@ class PropertyHolder(qubes.events.Emitter):
|
||||
try:
|
||||
value = getattr(
|
||||
self, (prop.__name__ if with_defaults else prop._attr_name))
|
||||
except AttributeError as e:
|
||||
except AttributeError:
|
||||
continue
|
||||
|
||||
try:
|
||||
@ -1361,7 +1361,7 @@ class Qubes(PropertyHolder):
|
||||
|
||||
|
||||
@qubes.events.handler('property-set:default_fw_netvm')
|
||||
def on_property_set_default_netvm(self, event, name, newvalue,
|
||||
def on_property_set_default_fw_netvm(self, event, name, newvalue,
|
||||
oldvalue=None):
|
||||
# pylint: disable=unused-argument,invalid-name
|
||||
for vm in self.domains:
|
||||
|
@ -71,7 +71,7 @@ def ticket(name, rawtext, text, lineno, inliner, options={}, content=[]):
|
||||
:param content: The directive content for customisation
|
||||
''' # pylint: disable=unused-argument
|
||||
|
||||
ticket = text.lstrip('#')
|
||||
ticketno = text.lstrip('#')
|
||||
if not ticket.isdigit():
|
||||
msg = inliner.reporter.error(
|
||||
'Invalid ticket identificator: {!r}'.format(text), line=lineno)
|
||||
@ -79,7 +79,7 @@ def ticket(name, rawtext, text, lineno, inliner, options={}, content=[]):
|
||||
return [prb], [msg]
|
||||
|
||||
app = inliner.document.settings.env.app
|
||||
uri = posixpath.join(app.config.ticket_base_uri, ticket)
|
||||
uri = posixpath.join(app.config.ticket_base_uri, ticketno)
|
||||
try:
|
||||
info = fetch_ticket_info(uri)
|
||||
except urllib2.HTTPError, e:
|
||||
@ -92,7 +92,7 @@ def ticket(name, rawtext, text, lineno, inliner, options={}, content=[]):
|
||||
|
||||
node = docutils.nodes.reference(
|
||||
rawtext,
|
||||
'#{} ({})'.format(ticket, info['summary']),
|
||||
'#{} ({})'.format(ticketno, info['summary']),
|
||||
refuri=uri,
|
||||
**options)
|
||||
|
||||
|
@ -47,14 +47,14 @@ def handler(*events):
|
||||
:param str event: event type
|
||||
'''
|
||||
|
||||
def decorator(f):
|
||||
f.ha_events = events
|
||||
return f
|
||||
def decorator(func):
|
||||
func.ha_events = events
|
||||
return func
|
||||
|
||||
return decorator
|
||||
|
||||
|
||||
def ishandler(o):
|
||||
def ishandler(obj):
|
||||
'''Test if a method is hooked to an event.
|
||||
|
||||
:param object o: suspected hook
|
||||
@ -62,8 +62,8 @@ def ishandler(o):
|
||||
:rtype: bool
|
||||
'''
|
||||
|
||||
return callable(o) \
|
||||
and hasattr(o, 'ha_events')
|
||||
return callable(obj) \
|
||||
and hasattr(obj, 'ha_events')
|
||||
|
||||
|
||||
class EmitterMeta(type):
|
||||
@ -103,7 +103,7 @@ class Emitter(object):
|
||||
|
||||
|
||||
@classmethod
|
||||
def add_handler(cls, event, handler):
|
||||
def add_handler(cls, event, func):
|
||||
'''Add event handler to subject's class.
|
||||
|
||||
:param str event: event identificator
|
||||
@ -111,7 +111,7 @@ class Emitter(object):
|
||||
'''
|
||||
|
||||
# pylint: disable=no-member
|
||||
cls.__handlers__[event].add(handler)
|
||||
cls.__handlers__[event].add(func)
|
||||
|
||||
|
||||
def _fire_event_in_order(self, order, event, *args, **kwargs):
|
||||
@ -127,10 +127,10 @@ class Emitter(object):
|
||||
for cls in order:
|
||||
if not hasattr(cls, '__handlers__'):
|
||||
continue
|
||||
for handler in sorted(cls.__handlers__[event],
|
||||
for func in sorted(cls.__handlers__[event],
|
||||
key=(lambda handler: hasattr(handler, 'ha_bound')),
|
||||
reverse=True):
|
||||
handler(self, event, *args, **kwargs)
|
||||
func(self, event, *args, **kwargs)
|
||||
|
||||
|
||||
def fire_event(self, event, *args, **kwargs):
|
||||
|
@ -85,17 +85,17 @@ def handler(*events, **kwargs):
|
||||
to any VM)
|
||||
'''
|
||||
|
||||
def decorator(f):
|
||||
f.ha_events = events
|
||||
def decorator(func):
|
||||
func.ha_events = events
|
||||
|
||||
if kwargs.get('system', False):
|
||||
f.ha_vm = None
|
||||
func.ha_vm = None
|
||||
elif 'vm' in kwargs:
|
||||
f.ha_vm = kwargs['vm']
|
||||
func.ha_vm = kwargs['vm']
|
||||
else:
|
||||
f.ha_vm = qubes.vm.BaseVM
|
||||
func.ha_vm = qubes.vm.BaseVM
|
||||
|
||||
return f
|
||||
return func
|
||||
|
||||
return decorator
|
||||
|
||||
|
@ -114,13 +114,13 @@ class Element(object):
|
||||
parent = xml.getparent()
|
||||
qname = lxml.etree.QName(parent)
|
||||
if parent == self.xml:
|
||||
n = '1'
|
||||
number = '1'
|
||||
elif qname.localname == 'optional':
|
||||
n = '?'
|
||||
number = '?'
|
||||
elif qname.localname == 'zeroOrMore':
|
||||
n = '\\*'
|
||||
number = '\\*'
|
||||
elif qname.localname == 'oneOrMore':
|
||||
n = '\\+'
|
||||
number = '\\+'
|
||||
else:
|
||||
print(parent.tag)
|
||||
|
||||
@ -129,7 +129,7 @@ class Element(object):
|
||||
if xml is None:
|
||||
continue
|
||||
|
||||
yield (self.schema.elements[xml.get('name')], n)
|
||||
yield (self.schema.elements[xml.get('name')], number)
|
||||
|
||||
|
||||
def write_rst(self, stream):
|
||||
@ -157,7 +157,7 @@ class Element(object):
|
||||
for child, n in self.get_child_elements()]
|
||||
if childtable:
|
||||
stream.write(make_rst_section('Child elements', '^'))
|
||||
write_rst_table(stream, childtable, ('element', 'n'))
|
||||
write_rst_table(stream, childtable, ('element', 'number'))
|
||||
|
||||
|
||||
class Schema(object):
|
||||
@ -174,23 +174,23 @@ class Schema(object):
|
||||
break_long_words=False, break_on_hyphens=False)
|
||||
|
||||
self.elements = {}
|
||||
for x in self.xml.xpath('//rng:element', namespaces=self.nsmap):
|
||||
element = Element(self, x)
|
||||
for node in self.xml.xpath('//rng:element', namespaces=self.nsmap):
|
||||
element = Element(self, node)
|
||||
self.elements[element.name] = element
|
||||
|
||||
|
||||
def make_rst_section(heading, c):
|
||||
return '{}\n{}\n\n'.format(heading, c[0] * len(heading))
|
||||
def make_rst_section(heading, char):
|
||||
return '{}\n{}\n\n'.format(heading, char[0] * len(heading))
|
||||
|
||||
|
||||
def write_rst_table(stream, it, heads):
|
||||
def write_rst_table(stream, itr, heads):
|
||||
stream.write('.. csv-table::\n')
|
||||
stream.write(' :header: {}\n'.format(', '.join('"{}"'.format(c)
|
||||
for c in heads)))
|
||||
stream.write(' :widths: {}\n\n'.format(', '.join('1'
|
||||
for c in heads)))
|
||||
|
||||
for row in it:
|
||||
for row in itr:
|
||||
stream.write(' {}\n'.format(', '.join('"{}"'.format(i) for i in row)))
|
||||
|
||||
stream.write('\n')
|
||||
|
@ -66,11 +66,11 @@ class TC_00_Label(qubes.tests.QubesTestCase):
|
||||
class TC_10_property(qubes.tests.QubesTestCase):
|
||||
def setUp(self):
|
||||
try:
|
||||
class TestHolder(qubes.tests.TestEmitter, qubes.PropertyHolder):
|
||||
class MyTestHolder(qubes.tests.TestEmitter, qubes.PropertyHolder):
|
||||
testprop1 = qubes.property('testprop1')
|
||||
except: # pylint: disable=bare-except
|
||||
self.skipTest('TestHolder class definition failed')
|
||||
self.holder = TestHolder(None)
|
||||
self.skipTest('MyTestHolder class definition failed')
|
||||
self.holder = MyTestHolder(None)
|
||||
|
||||
def test_000_init(self):
|
||||
pass
|
||||
@ -96,17 +96,17 @@ class TC_10_property(qubes.tests.QubesTestCase):
|
||||
self.holder.testprop1
|
||||
|
||||
def test_022_get_default(self):
|
||||
class TestHolder(qubes.tests.TestEmitter, qubes.PropertyHolder):
|
||||
class MyTestHolder(qubes.tests.TestEmitter, qubes.PropertyHolder):
|
||||
testprop1 = qubes.property('testprop1', default='defaultvalue')
|
||||
holder = TestHolder(None)
|
||||
holder = MyTestHolder(None)
|
||||
|
||||
self.assertEqual(holder.testprop1, 'defaultvalue')
|
||||
|
||||
def test_023_get_default_func(self):
|
||||
class TestHolder(qubes.tests.TestEmitter, qubes.PropertyHolder):
|
||||
class MyTestHolder(qubes.tests.TestEmitter, qubes.PropertyHolder):
|
||||
testprop1 = qubes.property('testprop1',
|
||||
default=(lambda self: 'defaultvalue'))
|
||||
holder = TestHolder(None)
|
||||
holder = MyTestHolder(None)
|
||||
|
||||
self.assertEqual(holder.testprop1, 'defaultvalue')
|
||||
holder.testprop1 = 'testvalue'
|
||||
@ -115,20 +115,20 @@ class TC_10_property(qubes.tests.QubesTestCase):
|
||||
def test_030_set_setter(self):
|
||||
def setter(self2, prop, value):
|
||||
self.assertIs(self2, holder)
|
||||
self.assertIs(prop, TestHolder.testprop1)
|
||||
self.assertIs(prop, MyTestHolder.testprop1)
|
||||
self.assertEquals(value, 'testvalue')
|
||||
return 'settervalue'
|
||||
class TestHolder(qubes.tests.TestEmitter, qubes.PropertyHolder):
|
||||
class MyTestHolder(qubes.tests.TestEmitter, qubes.PropertyHolder):
|
||||
testprop1 = qubes.property('testprop1', setter=setter)
|
||||
holder = TestHolder(None)
|
||||
holder = MyTestHolder(None)
|
||||
|
||||
holder.testprop1 = 'testvalue'
|
||||
self.assertEqual(holder.testprop1, 'settervalue')
|
||||
|
||||
def test_031_set_type(self):
|
||||
class TestHolder(qubes.tests.TestEmitter, qubes.PropertyHolder):
|
||||
class MyTestHolder(qubes.tests.TestEmitter, qubes.PropertyHolder):
|
||||
testprop1 = qubes.property('testprop1', type=int)
|
||||
holder = TestHolder(None)
|
||||
holder = MyTestHolder(None)
|
||||
|
||||
holder.testprop1 = '5'
|
||||
self.assertEqual(holder.testprop1, 5)
|
||||
@ -161,9 +161,9 @@ class TC_10_property(qubes.tests.QubesTestCase):
|
||||
self.holder.testprop
|
||||
|
||||
def test_092_delete_default(self):
|
||||
class TestHolder(qubes.tests.TestEmitter, qubes.PropertyHolder):
|
||||
class MyTestHolder(qubes.tests.TestEmitter, qubes.PropertyHolder):
|
||||
testprop1 = qubes.property('testprop1', default='defaultvalue')
|
||||
holder = TestHolder(None)
|
||||
holder = MyTestHolder(None)
|
||||
holder.testprop1 = 'testvalue'
|
||||
|
||||
try:
|
||||
|
@ -82,8 +82,7 @@ class ANSITestResult(unittest.TestResult):
|
||||
self.color = ANSIColor()
|
||||
|
||||
def _fmtexc(self, err):
|
||||
s = str(err[1])
|
||||
if s:
|
||||
if str(err[1]):
|
||||
return '{color[bold]}{}:{color[normal]} {!s}'.format(
|
||||
err[0].__name__, err[1], color=self.color)
|
||||
else:
|
||||
@ -196,7 +195,7 @@ class ANSITestResult(unittest.TestResult):
|
||||
|
||||
def demo(verbosity=2):
|
||||
import qubes.tests
|
||||
class TC_Demo(qubes.tests.QubesTestCase):
|
||||
class TC_00_Demo(qubes.tests.QubesTestCase):
|
||||
'''Demo class'''
|
||||
# pylint: disable=no-self-use
|
||||
def test_0_success(self):
|
||||
@ -224,7 +223,7 @@ def demo(verbosity=2):
|
||||
'''Demo test (unexpected success)'''
|
||||
pass
|
||||
|
||||
suite = unittest.TestLoader().loadTestsFromTestCase(TC_Demo)
|
||||
suite = unittest.TestLoader().loadTestsFromTestCase(TC_00_Demo)
|
||||
runner = unittest.TextTestRunner(stream=sys.stdout, verbosity=verbosity)
|
||||
runner.resultclass = ANSITestResult
|
||||
return runner.run(suite).wasSuccessful()
|
||||
|
@ -134,7 +134,7 @@ class TC_10_BaseVM(qubes.tests.QubesTestCase):
|
||||
</qubes>
|
||||
''')
|
||||
|
||||
def test_000_BaseVM_load(self):
|
||||
def test_000_load(self):
|
||||
node = self.xml.xpath('//domain')[0]
|
||||
vm = TestVM.fromxml(None, node)
|
||||
|
||||
@ -153,7 +153,7 @@ class TC_10_BaseVM(qubes.tests.QubesTestCase):
|
||||
|
||||
self.assertXMLIsValid(vm.__xml__(), 'domain.rng')
|
||||
|
||||
def test_001_BaseVM_nxproperty(self):
|
||||
def test_001_nxproperty(self):
|
||||
xml = lxml.etree.XML('''
|
||||
<qubes version="3">
|
||||
<domains>
|
||||
|
@ -336,7 +336,7 @@ class BaseVM(qubes.PropertyHolder):
|
||||
:rtype: lxml.etree._Element
|
||||
'''
|
||||
|
||||
dev_match = re.match('([0-9a-f]+):([0-9a-f]+)\.([0-9a-f]+)', address)
|
||||
dev_match = re.match(r'([0-9a-f]+):([0-9a-f]+)\.([0-9a-f]+)', address)
|
||||
if not dev_match:
|
||||
raise qubes.QubesException(
|
||||
'Invalid PCI device address: {}'.format(address))
|
||||
@ -509,9 +509,9 @@ class BaseVM(qubes.PropertyHolder):
|
||||
|
||||
try:
|
||||
old_umask = os.umask(002)
|
||||
with open(self.firewall_conf, 'w') as f:
|
||||
tree.write(f, encoding="UTF-8", pretty_print=True)
|
||||
f.close()
|
||||
with open(self.firewall_conf, 'w') as fd:
|
||||
tree.write(fd, encoding="UTF-8", pretty_print=True)
|
||||
fd.close()
|
||||
os.umask(old_umask)
|
||||
except EnvironmentError as err:
|
||||
print >> sys.stderr, "{0}: save error: {1}".format(
|
||||
|
@ -91,12 +91,13 @@ def _setter_kernel(self, prop, value):
|
||||
if not os.path.exists(os.path.join(
|
||||
qubes.config.system_path['qubes_kernels_base_dir'], value)):
|
||||
raise qubes.QubesException('Kernel {!r} not installed'.format(value))
|
||||
for f in ('vmlinuz', 'modules.img'):
|
||||
for filename in ('vmlinuz', 'modules.img'):
|
||||
if not os.path.exists(os.path.join(
|
||||
qubes.config.system_path['qubes_kernels_base_dir'], value, f)):
|
||||
qubes.config.system_path['qubes_kernels_base_dir'],
|
||||
value, filename)):
|
||||
raise qubes.QubesException(
|
||||
'Kernel {!r} not properly installed: missing {!r} file'.format(
|
||||
value, f))
|
||||
value, filename))
|
||||
return value
|
||||
|
||||
|
||||
@ -541,7 +542,8 @@ class QubesVM(qubes.vm.BaseVM):
|
||||
|
||||
|
||||
@qubes.events.handler('property-pre-set:dir_path')
|
||||
def on_property_pre_set_name(self, event, name, newvalue, oldvalue=None):
|
||||
def on_property_pre_set_dir_path(self, event, name, newvalue,
|
||||
oldvalue=None):
|
||||
# pylint: disable=unused-argument
|
||||
# TODO not self.is_stopped() would be more appropriate
|
||||
if self.is_running():
|
||||
@ -690,10 +692,10 @@ class QubesVM(qubes.vm.BaseVM):
|
||||
|
||||
# Bind pci devices to pciback driver
|
||||
for pci in self.devices['pci']:
|
||||
nd = self.app.vmm.libvirt_conn.nodeDeviceLookupByName(
|
||||
node = self.app.vmm.libvirt_conn.nodeDeviceLookupByName(
|
||||
'pci_0000_' + pci.replace(':', '_').replace('.', '_'))
|
||||
try:
|
||||
nd.dettach()
|
||||
node.dettach()
|
||||
except libvirt.libvirtError:
|
||||
if self.app.vmm.libvirt_conn.virConnGetLastError()[0] == \
|
||||
libvirt.VIR_ERR_INTERNAL_ERROR:
|
||||
@ -1056,10 +1058,10 @@ class QubesVM(qubes.vm.BaseVM):
|
||||
kernels_dir))
|
||||
|
||||
os.mkdir(self.dir_path + '/kernels')
|
||||
for f in ("vmlinuz", "initramfs", "modules.img"):
|
||||
shutil.copy(os.path.join(kernels_dir, f),
|
||||
for filename in ("vmlinuz", "initramfs", "modules.img"):
|
||||
shutil.copy(os.path.join(kernels_dir, filename),
|
||||
os.path.join(self.dir_path,
|
||||
qubes.config.vm_files["kernels_subdir"], f))
|
||||
qubes.config.vm_files["kernels_subdir"], filename))
|
||||
|
||||
self.log.info('Creating icon symlink: {} -> {}'.format(
|
||||
self.icon_path, self.label.icon_path))
|
||||
@ -1107,12 +1109,12 @@ class QubesVM(qubes.vm.BaseVM):
|
||||
:param qubes.vm.QubesVM src: source VM
|
||||
'''
|
||||
|
||||
if src_vm.is_running():
|
||||
if src.is_running():
|
||||
raise qubes.QubesException('Attempt to clone a running VM!')
|
||||
|
||||
self.storage.clone_disk_files(src, verbose=False)
|
||||
|
||||
if srv.icon_path is not None \
|
||||
if src.icon_path is not None \
|
||||
and os.path.exists(src.dir_path) \
|
||||
and self.icon_path is not None:
|
||||
if os.path.islink(src.icon_path):
|
||||
|
Loading…
Reference in New Issue
Block a user