123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516 |
- #!/usr/bin/env python
- from builtins import bytes
- from functools import total_ordering
- import sys, math
- from math import ceil, log
- from collections import defaultdict, OrderedDict
- import attr
- # must match <linux/nl80211.h> enum nl80211_reg_rule_flags
- flag_definitions = {
- 'NO-OFDM': 1<<0,
- 'NO-CCK': 1<<1,
- 'NO-INDOOR': 1<<2,
- 'NO-OUTDOOR': 1<<3,
- 'DFS': 1<<4,
- 'PTP-ONLY': 1<<5,
- 'PTMP-ONLY': 1<<6,
- 'NO-IR': 1<<7,
- # hole at bit 8
- # hole at bit 9. FIXME: Where is NO-HT40 defined?
- 'NO-HT40': 1<<10,
- 'AUTO-BW': 1<<11,
- }
- dfs_regions = {
- 'DFS-FCC': 1,
- 'DFS-ETSI': 2,
- 'DFS-JP': 3,
- }
- @total_ordering
- @attr.s(frozen=True, cmp=False)
- class WmmRule(object):
- vo_c = attr.ib()
- vi_c = attr.ib()
- be_c = attr.ib()
- bk_c = attr.ib()
- vo_ap = attr.ib()
- vi_ap = attr.ib()
- be_ap = attr.ib()
- bk_ap = attr.ib()
- def _as_tuple(self):
- return (self.vo_c, self.vi_c, self.be_c, self.bk_c,
- self.vo_ap, self.vi_ap, self.be_ap, self.bk_ap)
- def __eq__(self, other):
- if other is None:
- return False
- return (self._as_tuple() == other._as_tuple())
- def __ne__(self, other):
- return not (self == other)
- def __lt__(self, other):
- if other is None:
- return False
- return (self._as_tuple() < other._as_tuple())
- def __hash__(self):
- return hash(self._as_tuple())
- class FreqBand(object):
- def __init__(self, start, end, bw, comments=None):
- self.start = start
- self.end = end
- self.maxbw = bw
- self.comments = comments or []
- def _as_tuple(self):
- return (self.start, self.end, self.maxbw)
- def __eq__(self, other):
- return (self._as_tuple() == other._as_tuple())
- def __ne__(self, other):
- return not (self == other)
- def __lt__(self, other):
- return (self._as_tuple() < other._as_tuple())
- def __hash__(self):
- return hash(self._as_tuple())
- def __str__(self):
- return '<FreqBand %.3f - %.3f @ %.3f>' % (
- self.start, self.end, self.maxbw)
- @total_ordering
- class PowerRestriction(object):
- def __init__(self, max_ant_gain, max_eirp, comments = None):
- self.max_ant_gain = max_ant_gain
- self.max_eirp = max_eirp
- self.comments = comments or []
- def _as_tuple(self):
- return (self.max_ant_gain, self.max_eirp)
- def __eq__(self, other):
- return (self._as_tuple() == other._as_tuple())
- def __ne__(self, other):
- return not (self == other)
- def __lt__(self, other):
- return (self._as_tuple() < other._as_tuple())
- def __hash__(self):
- return hash(self._as_tuple())
- def __str__(self):
- return '<PowerRestriction ...>'
- class DFSRegionError(Exception):
- def __init__(self, dfs_region):
- self.dfs_region = dfs_region
- class FlagError(Exception):
- def __init__(self, flag):
- self.flag = flag
- @total_ordering
- class Permission(object):
- def __init__(self, freqband, power, flags, wmmrule):
- assert isinstance(freqband, FreqBand)
- assert isinstance(power, PowerRestriction)
- assert isinstance(wmmrule, WmmRule) or wmmrule is None
- self.freqband = freqband
- self.power = power
- self.wmmrule = wmmrule
- self.flags = 0
- for flag in flags:
- if not flag in flag_definitions:
- raise FlagError(flag)
- self.flags |= flag_definitions[flag]
- self.textflags = flags
- def _as_tuple(self):
- return (self.freqband, self.power, self.flags, self.wmmrule)
- def __eq__(self, other):
- return (self._as_tuple() == other._as_tuple())
- def __ne__(self, other):
- return not (self == other)
- def __lt__(self, other):
- return (self._as_tuple() < other._as_tuple())
- def __hash__(self):
- return hash(self._as_tuple())
- def __str__(self):
- return str(self.freqband) + str(self.power) + str(self.wmmrule)
- class Country(object):
- def __init__(self, dfs_region, permissions=None, comments=None):
- self._permissions = permissions or []
- self.comments = comments or []
- self.dfs_region = 0
- if dfs_region:
- if not dfs_region in dfs_regions:
- raise DFSRegionError(dfs_region)
- self.dfs_region = dfs_regions[dfs_region]
- def add(self, perm):
- assert isinstance(perm, Permission)
- self._permissions.append(perm)
- self._permissions.sort()
- def __contains__(self, perm):
- assert isinstance(perm, Permission)
- return perm in self._permissions
- def __str__(self):
- r = ['(%s, %s)' % (str(b), str(p)) for b, p in self._permissions]
- return '<Country (%s)>' % (', '.join(r))
- def _get_permissions_tuple(self):
- return tuple(self._permissions)
- permissions = property(_get_permissions_tuple)
- class SyntaxError(Exception):
- pass
- class DBParser(object):
- def __init__(self, warn=None):
- self._warn_callout = warn or sys.stderr.write
- def _syntax_error(self, txt=None):
- txt = txt and ' (%s)' % txt or ''
- raise SyntaxError("Syntax error in line %d%s" % (self._lineno, txt))
- def _warn(self, txt):
- self._warn_callout("Warning (line %d): %s\n" % (self._lineno, txt))
- def _parse_band_def(self, bname, banddef, dupwarn=True):
- try:
- freqs, bw = banddef.split('@')
- bw = float(bw)
- except ValueError:
- bw = 20.0
- try:
- start, end = freqs.split('-')
- start = float(start)
- end = float(end)
- # The kernel will reject these, so might as well reject this
- # upon building it.
- if start <= 0:
- self._syntax_error("Invalid start freq (%d)" % start)
- if end <= 0:
- self._syntax_error("Invalid end freq (%d)" % end)
- if start > end:
- self._syntax_error("Inverted freq range (%d - %d)" % (start, end))
- if start == end:
- self._syntax_error("Start and end freqs are equal (%d)" % start)
- except ValueError:
- self._syntax_error("band must have frequency range")
- b = FreqBand(start, end, bw, comments=self._comments)
- self._comments = []
- self._banddup[bname] = bname
- if b in self._bandrev:
- if dupwarn:
- self._warn('Duplicate band definition ("%s" and "%s")' % (
- bname, self._bandrev[b]))
- self._banddup[bname] = self._bandrev[b]
- self._bands[bname] = b
- self._bandrev[b] = bname
- self._bandline[bname] = self._lineno
- def _parse_band(self, line):
- try:
- bname, line = line.split(':', 1)
- if not bname:
- self._syntax_error("'band' keyword must be followed by name")
- except ValueError:
- self._syntax_error("band name must be followed by colon")
- if bname in flag_definitions:
- self._syntax_error("Invalid band name")
- self._parse_band_def(bname, line)
- def _parse_power(self, line):
- try:
- pname, line = line.split(':', 1)
- if not pname:
- self._syntax_error("'power' keyword must be followed by name")
- except ValueError:
- self._syntax_error("power name must be followed by colon")
- if pname in flag_definitions:
- self._syntax_error("Invalid power name")
- self._parse_power_def(pname, line)
- def _parse_power_def(self, pname, line, dupwarn=True):
- try:
- max_eirp = line
- if max_eirp == 'N/A':
- max_eirp = '0'
- max_ant_gain = float(0)
- def conv_pwr(pwr):
- if pwr.endswith('mW'):
- pwr = float(pwr[:-2])
- return 10.0 * math.log10(pwr)
- else:
- return float(pwr)
- max_eirp = conv_pwr(max_eirp)
- except ValueError:
- self._syntax_error("invalid power data")
- p = PowerRestriction(max_ant_gain, max_eirp,
- comments=self._comments)
- self._comments = []
- self._powerdup[pname] = pname
- if p in self._powerrev:
- if dupwarn:
- self._warn('Duplicate power definition ("%s" and "%s")' % (
- pname, self._powerrev[p]))
- self._powerdup[pname] = self._powerrev[p]
- self._power[pname] = p
- self._powerrev[p] = pname
- self._powerline[pname] = self._lineno
- def _parse_wmmrule(self, line):
- regions = line[:-1].strip()
- if not regions:
- self._syntax_error("'wmmrule' keyword must be followed by region")
- regions = regions.split(',')
- self._current_regions = {}
- for region in regions:
- if region in self._wmm_rules:
- self._warn("region %s was added already to wmm rules" % region)
- self._current_regions[region] = 1
- self._comments = []
- def _validate_input(self, cw_min, cw_max, aifsn, cot):
- if cw_min < 1:
- self._syntax_error("Invalid cw_min value (%d)" % cw_min)
- if cw_max < 1:
- self._syntax_error("Invalid cw_max value (%d)" % cw_max)
- if cw_min > cw_max:
- self._syntax_error("Inverted contention window (%d - %d)" %
- (cw_min, cw_max))
- if not (bin(cw_min + 1).count('1') == 1 and cw_min < 2**15):
- self._syntax_error("Invalid cw_min value should be power of 2 - 1 (%d)"
- % cw_min)
- if not (bin(cw_max + 1).count('1') == 1 and cw_max < 2**15):
- self._syntax_error("Invalid cw_max value should be power of 2 - 1 (%d)"
- % cw_max)
- if aifsn < 1:
- self._syntax_error("Invalid aifsn value (%d)" % aifsn)
- if cot < 0:
- self._syntax_error("Invalid cot value (%d)" % cot)
- def _validate_size(self, var, bytcnt):
- return bytcnt < ceil(len(bin(var)[2:]) / 8.0)
- def _parse_wmmrule_item(self, line):
- bytcnt = (2.0, 2.0, 1.0, 2.0)
- try:
- ac, cval = line.split(':')
- if not ac:
- self._syntax_error("wmm item must have ac prefix")
- except ValueError:
- self._syntax_error("access category must be followed by colon")
- p = tuple([int(v.split('=', 1)[1]) for v in cval.split(',')])
- self._validate_input(*p)
- for v, b in zip(p, bytcnt):
- if self._validate_size(v, b):
- self._syntax_error("unexpected input size expect %d got %d"
- % (b, v))
- for r in self._current_regions:
- self._wmm_rules[r][ac] = p
- def _parse_country(self, line):
- try:
- cname, cvals= line.split(':', 1)
- dfs_region = cvals.strip()
- if not cname:
- self._syntax_error("'country' keyword must be followed by name")
- except ValueError:
- self._syntax_error("country name must be followed by colon")
- cnames = cname.split(',')
- self._current_countries = {}
- for cname in cnames:
- if len(cname) != 2:
- self._warn("country '%s' not alpha2" % cname)
- cname = bytes(cname, 'ascii')
- if not cname in self._countries:
- self._countries[cname] = Country(dfs_region, comments=self._comments)
- self._current_countries[cname] = self._countries[cname]
- self._comments = []
- def _parse_country_item(self, line):
- if line[0] == '(':
- try:
- band, line = line[1:].split('),', 1)
- bname = 'UNNAMED %d' % self._lineno
- self._parse_band_def(bname, band, dupwarn=False)
- except:
- self._syntax_error("Badly parenthesised band definition")
- else:
- try:
- bname, line = line.split(',', 1)
- if not bname:
- self._syntax_error("country definition must have band")
- if not line:
- self._syntax_error("country definition must have power")
- except ValueError:
- self._syntax_error("country definition must have band and power")
- if line[0] == '(':
- items = line.split('),', 1)
- if len(items) == 1:
- pname = items[0]
- line = ''
- if not pname[-1] == ')':
- self._syntax_error("Badly parenthesised power definition")
- pname = pname[:-1]
- flags = []
- else:
- pname = items[0]
- flags = items[1].split(',')
- power = pname[1:]
- pname = 'UNNAMED %d' % self._lineno
- self._parse_power_def(pname, power, dupwarn=False)
- else:
- line = line.split(',')
- pname = line[0]
- flags = line[1:]
- w = None
- if flags and 'wmmrule' in flags[-1]:
- try:
- region = flags.pop().split('=', 1)[1]
- if region not in self._wmm_rules.keys():
- self._syntax_error("No wmm rule for %s" % region)
- except IndexError:
- self._syntax_error("flags is empty list or no region was found")
- w = WmmRule(*self._wmm_rules[region].values())
- if not bname in self._bands:
- self._syntax_error("band does not exist")
- if not pname in self._power:
- self._syntax_error("power does not exist")
- self._bands_used[bname] = True
- self._power_used[pname] = True
- # de-duplicate so binary database is more compact
- bname = self._banddup[bname]
- pname = self._powerdup[pname]
- b = self._bands[bname]
- p = self._power[pname]
- try:
- perm = Permission(b, p, flags, w)
- except FlagError as e:
- self._syntax_error("Invalid flag '%s'" % e.flag)
- for cname, c in self._current_countries.items():
- if perm in c:
- self._warn('Rule "%s, %s" added to "%s" twice' % (
- bname, pname, cname))
- else:
- c.add(perm)
- def parse(self, f):
- self._current_countries = None
- self._current_regions = None
- self._bands = {}
- self._power = {}
- self._countries = {}
- self._bands_used = {}
- self._power_used = {}
- self._bandrev = {}
- self._powerrev = {}
- self._banddup = {}
- self._powerdup = {}
- self._bandline = {}
- self._powerline = {}
- self._wmm_rules = defaultdict(lambda: OrderedDict())
- self._comments = []
- self._lineno = 0
- for line in f:
- self._lineno += 1
- line = line.strip()
- if line[0:1] == '#':
- self._comments.append(line[1:].strip())
- line = line.replace(' ', '').replace('\t', '')
- if not line:
- self._current_regions = None
- self._comments = []
- line = line.split('#')[0]
- if not line:
- continue
- if line[0:4] == 'band':
- self._parse_band(line[4:])
- self._current_countries = None
- self._current_regions = None
- self._comments = []
- elif line[0:5] == 'power':
- self._parse_power(line[5:])
- self._current_countries = None
- self._current_regions = None
- self._comments = []
- elif line[0:7] == 'country':
- self._parse_country(line[7:])
- self._comments = []
- self._current_regions = None
- elif self._current_countries is not None:
- self._current_regions = None
- self._parse_country_item(line)
- self._comments = []
- elif line[0:7] == 'wmmrule':
- self._parse_wmmrule(line[7:])
- self._current_countries = None
- self._comments = []
- elif self._current_regions is not None:
- self._parse_wmmrule_item(line)
- self._current_countries = None
- self._comments = []
- else:
- self._syntax_error("Expected band, power or country definition")
- countries = self._countries
- bands = {}
- for k, v in self._bands.items():
- if k in self._bands_used:
- bands[self._banddup[k]] = v
- continue
- # we de-duplicated, but don't warn again about the dupes
- if self._banddup[k] == k:
- self._lineno = self._bandline[k]
- self._warn('Unused band definition "%s"' % k)
- power = {}
- for k, v in self._power.items():
- if k in self._power_used:
- power[self._powerdup[k]] = v
- continue
- # we de-duplicated, but don't warn again about the dupes
- if self._powerdup[k] == k:
- self._lineno = self._powerline[k]
- self._warn('Unused power definition "%s"' % k)
- return countries
|