Minor codestyle fix in qubesadmin/firewall.py

Fix indentation, use double-quotes for docstrings.
This commit is contained in:
Marek Marczykowski-Górecki 2019-09-18 00:10:51 +02:00
parent 0bb7463d8b
commit 73648ca038
No known key found for this signature in database
GPG Key ID: 063938BA42CFA724

View File

@ -51,13 +51,13 @@ class FirewallWorker(object):
self.log.addHandler(logging.StreamHandler(sys.stderr))
def init(self):
'''Create appropriate chains/tables'''
"""Create appropriate chains/tables"""
raise NotImplementedError
def sd_notify(self, state):
'''Send notification to systemd, if available'''
"""Send notification to systemd, if available"""
# based on sdnotify python module
if not 'NOTIFY_SOCKET' in os.environ:
if 'NOTIFY_SOCKET' not in os.environ:
return
addr = os.environ['NOTIFY_SOCKET']
if addr[0] == '@':
@ -71,35 +71,35 @@ class FirewallWorker(object):
pass
def cleanup(self):
'''Remove tables/chains - reverse work done by init'''
"""Remove tables/chains - reverse work done by init"""
raise NotImplementedError
def apply_rules(self, source_addr, rules):
'''Apply rules in given source address'''
"""Apply rules in given source address"""
raise NotImplementedError
def run_firewall_dir(self):
'''Run scripts dir contents, before user script'''
"""Run scripts dir contents, before user script"""
script_dir_paths = ['/etc/qubes/qubes-firewall.d',
'/rw/config/qubes-firewall.d']
'/rw/config/qubes-firewall.d']
for script_dir_path in script_dir_paths:
if not os.path.isdir(script_dir_path):
continue
for d_script in sorted(os.listdir(script_dir_path)):
d_script_path = os.path.join(script_dir_path, d_script)
if os.path.isfile(d_script_path) and \
os.access(d_script_path, os.X_OK):
subprocess.call([d_script_path])
if not os.path.isdir(script_dir_path):
continue
for d_script in sorted(os.listdir(script_dir_path)):
d_script_path = os.path.join(script_dir_path, d_script)
if os.path.isfile(d_script_path) and \
os.access(d_script_path, os.X_OK):
subprocess.call([d_script_path])
def run_user_script(self):
'''Run user script in /rw/config'''
"""Run user script in /rw/config"""
user_script_path = '/rw/config/qubes-firewall-user-script'
if os.path.isfile(user_script_path) and \
os.access(user_script_path, os.X_OK):
subprocess.call([user_script_path])
def read_rules(self, target):
'''Read rules from QubesDB and return them as a list of dicts'''
"""Read rules from QubesDB and return them as a list of dicts"""
entries = self.qdb.multiread('/qubes-firewall/{}/'.format(target))
assert isinstance(entries, dict)
# drop full path
@ -196,7 +196,7 @@ class FirewallWorker(object):
class IptablesWorker(FirewallWorker):
supported_rule_opts = ['action', 'proto', 'dst4', 'dst6', 'dsthost',
'dstports', 'specialtarget', 'icmptype']
'dstports', 'specialtarget', 'icmptype']
def __init__(self):
super(IptablesWorker, self).__init__()
@ -207,7 +207,7 @@ class IptablesWorker(FirewallWorker):
@staticmethod
def chain_for_addr(addr):
'''Generate iptables chain name for given source address address'''
"""Generate iptables chain name for given source address address"""
return 'qbs-' + addr.replace('.', '-').replace(':', '-')[-20:]
def run_ipt(self, family, args, **kwargs):
@ -221,17 +221,17 @@ class IptablesWorker(FirewallWorker):
# pylint: disable=no-self-use
if family == 6:
return subprocess.Popen(['ip6tables-restore'] + args,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
else:
return subprocess.Popen(['iptables-restore'] + args,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
def create_chain(self, addr, chain, family):
'''
"""
Create iptables chain and hook traffic coming from `addr` to it.
:param addr: source IP from which traffic should be handled by the
@ -239,7 +239,7 @@ class IptablesWorker(FirewallWorker):
:param chain: name of the chain to create
:param family: address family (4 or 6)
:return: None
'''
"""
self.run_ipt(family, ['-N', chain])
self.run_ipt(family,
@ -247,7 +247,7 @@ class IptablesWorker(FirewallWorker):
self.chains[family].add(chain)
def prepare_rules(self, chain, rules, family):
'''
"""
Helper function to translate rules list into input for iptables-restore
:param chain: name of the chain to put rules into
@ -255,7 +255,7 @@ class IptablesWorker(FirewallWorker):
:param family: address family (4 or 6)
:return: input for iptables-restore
:rtype: str
'''
"""
iptables = "*filter\n"
@ -359,7 +359,7 @@ class IptablesWorker(FirewallWorker):
return iptables
def apply_rules_family(self, source, rules, family):
'''
"""
Apply rules for given source address.
Handle only rules for given address family (IPv4 or IPv6).
@ -367,7 +367,7 @@ class IptablesWorker(FirewallWorker):
:param rules: rules list
:param family: address family, either 4 or 6
:return: None
'''
"""
chain = self.chain_for_addr(source)
if chain not in self.chains[family]:
@ -417,7 +417,7 @@ class IptablesWorker(FirewallWorker):
class NftablesWorker(FirewallWorker):
supported_rule_opts = ['action', 'proto', 'dst4', 'dst6', 'dsthost',
'dstports', 'specialtarget', 'icmptype']
'dstports', 'specialtarget', 'icmptype']
def __init__(self):
super(NftablesWorker, self).__init__()
@ -428,21 +428,21 @@ class NftablesWorker(FirewallWorker):
@staticmethod
def chain_for_addr(addr):
'''Generate iptables chain name for given source address address'''
"""Generate iptables chain name for given source address address"""
return 'qbs-' + addr.replace('.', '-').replace(':', '-')
def run_nft(self, nft_input):
# pylint: disable=no-self-use
p = subprocess.Popen(['nft', '-f', '/dev/stdin'],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
stdout, _ = p.communicate(nft_input)
if p.returncode != 0:
raise RuleApplyError('nft failed: {}'.format(stdout))
def create_chain(self, addr, chain, family):
'''
"""
Create iptables chain and hook traffic coming from `addr` to it.
:param addr: source IP from which traffic should be handled by the
@ -450,7 +450,7 @@ class NftablesWorker(FirewallWorker):
:param chain: name of the chain to create
:param family: address family (4 or 6)
:return: None
'''
"""
nft_input = (
'table {family} {table} {{\n'
' chain {chain} {{\n'
@ -469,7 +469,7 @@ class NftablesWorker(FirewallWorker):
self.chains[family].add(chain)
def prepare_rules(self, chain, rules, family):
'''
"""
Helper function to translate rules list into input for iptables-restore
:param chain: name of the chain to put rules into
@ -477,7 +477,7 @@ class NftablesWorker(FirewallWorker):
:param family: address family (4 or 6)
:return: input for iptables-restore
:rtype: str
'''
"""
assert family in (4, 6)
nft_rules = []
@ -517,7 +517,6 @@ class NftablesWorker(FirewallWorker):
else rule['proto']
nft_rule += ' ip6 nexthdr {}'.format(proto)
if 'dst4' in rule:
nft_rule += ' ip daddr {}'.format(rule['dst4'])
elif 'dst6' in rule:
@ -587,7 +586,7 @@ class NftablesWorker(FirewallWorker):
))
def apply_rules_family(self, source, rules, family):
'''
"""
Apply rules for given source address.
Handle only rules for given address family (IPv4 or IPv6).
@ -595,7 +594,7 @@ class NftablesWorker(FirewallWorker):
:param rules: rules list
:param family: address family, either 4 or 6
:return: None
'''
"""
chain = self.chain_for_addr(source)
if chain not in self.chains[family]:
@ -649,5 +648,6 @@ def main():
with context:
worker.main()
if __name__ == '__main__':
main()