tools/qvm-firewall: Show EXPIRE column in list output

This commit is contained in:
Peter Gerber 2018-04-30 16:26:54 +02:00
parent ab79bd2a44
commit ed9b42d5b4
No known key found for this signature in database
GPG Key ID: 07C068AEE44683A1
3 changed files with 44 additions and 12 deletions

View File

@ -34,6 +34,11 @@ class RuleOption(object):
'''API representation of this rule element''' '''API representation of this rule element'''
raise NotImplementedError raise NotImplementedError
@property
def pretty_value(self):
'''Human readable representation'''
return str(self)
def __str__(self): def __str__(self):
return self._value return self._value
@ -212,9 +217,16 @@ class Expire(RuleOption):
@property @property
def expired(self): def expired(self):
'''Have this rule expired already?''' '''Has this rule expired already?'''
return self.datetime < datetime.datetime.utcnow() return self.datetime < datetime.datetime.utcnow()
@property
def pretty_value(self):
'''Human readable representation'''
now = datetime.datetime.utcnow()
duration = (self.datetime - now).total_seconds()
return "{:+.0f}s".format(duration)
class Comment(RuleOption): class Comment(RuleOption):
'''User comment''' '''User comment'''

View File

@ -22,6 +22,8 @@
import argparse import argparse
import datetime import datetime
import re
import time
import qubesadmin.firewall import qubesadmin.firewall
import qubesadmin.tests import qubesadmin.tests
@ -103,11 +105,11 @@ class TC_10_qvm_firewall(qubesadmin.tests.QubesTestCase):
self.assertEqual( self.assertEqual(
[l.strip() for l in stdout.getvalue().splitlines()], [l.strip() for l in stdout.getvalue().splitlines()],
['NO ACTION HOST PROTOCOL PORT(S) SPECIAL ' ['NO ACTION HOST PROTOCOL PORT(S) SPECIAL '
'TARGET ICMP TYPE COMMENT', 'TARGET ICMP TYPE EXPIRE COMMENT',
'0 accept qubes-os.org - - - ' '0 accept qubes-os.org - - - '
' - -', ' - - -',
'1 drop - icmp - - ' '1 drop - icmp - - '
' - -', ' - - -',
]) ])
def test_001_list_default(self): def test_001_list_default(self):
@ -122,15 +124,33 @@ class TC_10_qvm_firewall(qubesadmin.tests.QubesTestCase):
self.assertEqual( self.assertEqual(
[l.strip() for l in stdout.getvalue().splitlines()], [l.strip() for l in stdout.getvalue().splitlines()],
['NO ACTION HOST PROTOCOL PORT(S) SPECIAL ' ['NO ACTION HOST PROTOCOL PORT(S) SPECIAL '
'TARGET ICMP TYPE COMMENT', 'TARGET ICMP TYPE EXPIRE COMMENT',
'0 accept qubes-os.org tcp 443 - ' '0 accept qubes-os.org tcp 443 - '
' - -', ' - - -',
'1 drop - icmp - - ' '1 drop - icmp - - '
' 8 -', ' 8 - -',
'2 accept - - - dns ' '2 accept - - - dns '
' - Allow DNS', ' - - Allow DNS',
]) ])
def test_002_list_expire(self):
in_1h = int(time.time()) + 3600
self.app.expected_calls[('test-vm', 'admin.vm.firewall.Get',
None, None)] = \
'0\0action=accept dsthost=qubes-os.org proto=tcp ' \
'dstports=443-443 expire={}\n'.format(in_1h).encode()
with qubesadmin.tests.tools.StdoutBuffer() as stdout:
qubesadmin.tools.qvm_firewall.main(['test-vm'], app=self.app)
line = stdout.getvalue().splitlines()[-1]
match = re.match(
'0 accept qubes-os.org tcp 443 - '
' - \+(.{4})s -',
line)
self.assertTrue(match, "no match for: {!r}".format(line))
duration = int(match.group(1))
self.assertTrue(3590 < duration <= 3600)
def test_002_list_raw(self): def test_002_list_raw(self):
self.app.expected_calls[('test-vm', 'admin.vm.firewall.Get', self.app.expected_calls[('test-vm', 'admin.vm.firewall.Get',
None, None)] = \ None, None)] = \

View File

@ -135,20 +135,20 @@ def rules_list_table(vm):
:return: None :return: None
''' '''
header = ['NO', 'ACTION', 'HOST', 'PROTOCOL', 'PORT(S)', header = ['NO', 'ACTION', 'HOST', 'PROTOCOL', 'PORT(S)',
'SPECIAL TARGET', 'ICMP TYPE', 'COMMENT'] 'SPECIAL TARGET', 'ICMP TYPE', 'EXPIRE', 'COMMENT']
rows = [] rows = []
for (rule, rule_no) in zip(vm.firewall.rules, itertools.count()): for (rule, rule_no) in zip(vm.firewall.rules, itertools.count()):
row = [str(x) if x is not None else '-' for x in [ row = [x.pretty_value if x is not None else '-' for x in [
rule_no,
rule.action, rule.action,
rule.dsthost, rule.dsthost,
rule.proto, rule.proto,
rule.dstports, rule.dstports,
rule.specialtarget, rule.specialtarget,
rule.icmptype, rule.icmptype,
rule.expire,
rule.comment, rule.comment,
]] ]]
rows.append(row) rows.append([str(rule_no)] + row)
qubesadmin.tools.print_table([header] + rows) qubesadmin.tools.print_table([header] + rows)