#!/usr/bin/env python from builtins import bytes from functools import total_ordering import sys, math from math import ceil, log from collections import defaultdict, OrderedDict import attr # must match enum nl80211_reg_rule_flags flag_definitions = { 'NO-OFDM': 1<<0, 'NO-CCK': 1<<1, 'NO-INDOOR': 1<<2, 'NO-OUTDOOR': 1<<3, 'DFS': 1<<4, 'PTP-ONLY': 1<<5, 'PTMP-ONLY': 1<<6, 'NO-IR': 1<<7, # hole at bit 8 # hole at bit 9. FIXME: Where is NO-HT40 defined? 'NO-HT40': 1<<10, 'AUTO-BW': 1<<11, } dfs_regions = { 'DFS-FCC': 1, 'DFS-ETSI': 2, 'DFS-JP': 3, } @total_ordering @attr.s(frozen=True, cmp=False) class WmmRule(object): vo_c = attr.ib() vi_c = attr.ib() be_c = attr.ib() bk_c = attr.ib() vo_ap = attr.ib() vi_ap = attr.ib() be_ap = attr.ib() bk_ap = attr.ib() def _as_tuple(self): return (self.vo_c, self.vi_c, self.be_c, self.bk_c, self.vo_ap, self.vi_ap, self.be_ap, self.bk_ap) def __eq__(self, other): if other is None: return False return (self._as_tuple() == other._as_tuple()) def __ne__(self, other): return not (self == other) def __lt__(self, other): if other is None: return False return (self._as_tuple() < other._as_tuple()) def __hash__(self): return hash(self._as_tuple()) class FreqBand(object): def __init__(self, start, end, bw, comments=None): self.start = start self.end = end self.maxbw = bw self.comments = comments or [] def _as_tuple(self): return (self.start, self.end, self.maxbw) def __eq__(self, other): return (self._as_tuple() == other._as_tuple()) def __ne__(self, other): return not (self == other) def __lt__(self, other): return (self._as_tuple() < other._as_tuple()) def __hash__(self): return hash(self._as_tuple()) def __str__(self): return '' % ( self.start, self.end, self.maxbw) @total_ordering class PowerRestriction(object): def __init__(self, max_ant_gain, max_eirp, comments = None): self.max_ant_gain = max_ant_gain self.max_eirp = max_eirp self.comments = comments or [] def _as_tuple(self): return (self.max_ant_gain, self.max_eirp) def __eq__(self, other): return (self._as_tuple() == other._as_tuple()) def __ne__(self, other): return not (self == other) def __lt__(self, other): return (self._as_tuple() < other._as_tuple()) def __hash__(self): return hash(self._as_tuple()) def __str__(self): return '' class DFSRegionError(Exception): def __init__(self, dfs_region): self.dfs_region = dfs_region class FlagError(Exception): def __init__(self, flag): self.flag = flag @total_ordering class Permission(object): def __init__(self, freqband, power, flags, wmmrule): assert isinstance(freqband, FreqBand) assert isinstance(power, PowerRestriction) assert isinstance(wmmrule, WmmRule) or wmmrule is None self.freqband = freqband self.power = power self.wmmrule = wmmrule self.flags = 0 for flag in flags: if not flag in flag_definitions: raise FlagError(flag) self.flags |= flag_definitions[flag] self.textflags = flags def _as_tuple(self): return (self.freqband, self.power, self.flags, self.wmmrule) def __eq__(self, other): return (self._as_tuple() == other._as_tuple()) def __ne__(self, other): return not (self == other) def __lt__(self, other): return (self._as_tuple() < other._as_tuple()) def __hash__(self): return hash(self._as_tuple()) def __str__(self): return str(self.freqband) + str(self.power) + str(self.wmmrule) class Country(object): def __init__(self, dfs_region, permissions=None, comments=None): self._permissions = permissions or [] self.comments = comments or [] self.dfs_region = 0 if dfs_region: if not dfs_region in dfs_regions: raise DFSRegionError(dfs_region) self.dfs_region = dfs_regions[dfs_region] def add(self, perm): assert isinstance(perm, Permission) self._permissions.append(perm) self._permissions.sort() def __contains__(self, perm): assert isinstance(perm, Permission) return perm in self._permissions def __str__(self): r = ['(%s, %s)' % (str(b), str(p)) for b, p in self._permissions] return '' % (', '.join(r)) def _get_permissions_tuple(self): return tuple(self._permissions) permissions = property(_get_permissions_tuple) class SyntaxError(Exception): pass class DBParser(object): def __init__(self, warn=None): self._warn_callout = warn or sys.stderr.write def _syntax_error(self, txt=None): txt = txt and ' (%s)' % txt or '' raise SyntaxError("Syntax error in line %d%s" % (self._lineno, txt)) def _warn(self, txt): self._warn_callout("Warning (line %d): %s\n" % (self._lineno, txt)) def _parse_band_def(self, bname, banddef, dupwarn=True): try: freqs, bw = banddef.split('@') bw = float(bw) except ValueError: bw = 20.0 try: start, end = freqs.split('-') start = float(start) end = float(end) # The kernel will reject these, so might as well reject this # upon building it. if start <= 0: self._syntax_error("Invalid start freq (%d)" % start) if end <= 0: self._syntax_error("Invalid end freq (%d)" % end) if start > end: self._syntax_error("Inverted freq range (%d - %d)" % (start, end)) if start == end: self._syntax_error("Start and end freqs are equal (%d)" % start) except ValueError: self._syntax_error("band must have frequency range") b = FreqBand(start, end, bw, comments=self._comments) self._comments = [] self._banddup[bname] = bname if b in self._bandrev: if dupwarn: self._warn('Duplicate band definition ("%s" and "%s")' % ( bname, self._bandrev[b])) self._banddup[bname] = self._bandrev[b] self._bands[bname] = b self._bandrev[b] = bname self._bandline[bname] = self._lineno def _parse_band(self, line): try: bname, line = line.split(':', 1) if not bname: self._syntax_error("'band' keyword must be followed by name") except ValueError: self._syntax_error("band name must be followed by colon") if bname in flag_definitions: self._syntax_error("Invalid band name") self._parse_band_def(bname, line) def _parse_power(self, line): try: pname, line = line.split(':', 1) if not pname: self._syntax_error("'power' keyword must be followed by name") except ValueError: self._syntax_error("power name must be followed by colon") if pname in flag_definitions: self._syntax_error("Invalid power name") self._parse_power_def(pname, line) def _parse_power_def(self, pname, line, dupwarn=True): try: max_eirp = line if max_eirp == 'N/A': max_eirp = '0' max_ant_gain = float(0) def conv_pwr(pwr): if pwr.endswith('mW'): pwr = float(pwr[:-2]) return 10.0 * math.log10(pwr) else: return float(pwr) max_eirp = conv_pwr(max_eirp) except ValueError: self._syntax_error("invalid power data") p = PowerRestriction(max_ant_gain, max_eirp, comments=self._comments) self._comments = [] self._powerdup[pname] = pname if p in self._powerrev: if dupwarn: self._warn('Duplicate power definition ("%s" and "%s")' % ( pname, self._powerrev[p])) self._powerdup[pname] = self._powerrev[p] self._power[pname] = p self._powerrev[p] = pname self._powerline[pname] = self._lineno def _parse_wmmrule(self, line): regions = line[:-1].strip() if not regions: self._syntax_error("'wmmrule' keyword must be followed by region") regions = regions.split(',') self._current_regions = {} for region in regions: if region in self._wmm_rules: self._warn("region %s was added already to wmm rules" % region) self._current_regions[region] = 1 self._comments = [] def _validate_input(self, cw_min, cw_max, aifsn, cot): if cw_min < 1: self._syntax_error("Invalid cw_min value (%d)" % cw_min) if cw_max < 1: self._syntax_error("Invalid cw_max value (%d)" % cw_max) if cw_min > cw_max: self._syntax_error("Inverted contention window (%d - %d)" % (cw_min, cw_max)) if not (bin(cw_min + 1).count('1') == 1 and cw_min < 2**15): self._syntax_error("Invalid cw_min value should be power of 2 - 1 (%d)" % cw_min) if not (bin(cw_max + 1).count('1') == 1 and cw_max < 2**15): self._syntax_error("Invalid cw_max value should be power of 2 - 1 (%d)" % cw_max) if aifsn < 1: self._syntax_error("Invalid aifsn value (%d)" % aifsn) if cot < 0: self._syntax_error("Invalid cot value (%d)" % cot) def _validate_size(self, var, bytcnt): return bytcnt < ceil(len(bin(var)[2:]) / 8.0) def _parse_wmmrule_item(self, line): bytcnt = (2.0, 2.0, 1.0, 2.0) try: ac, cval = line.split(':') if not ac: self._syntax_error("wmm item must have ac prefix") except ValueError: self._syntax_error("access category must be followed by colon") p = tuple([int(v.split('=', 1)[1]) for v in cval.split(',')]) self._validate_input(*p) for v, b in zip(p, bytcnt): if self._validate_size(v, b): self._syntax_error("unexpected input size expect %d got %d" % (b, v)) for r in self._current_regions: self._wmm_rules[r][ac] = p def _parse_country(self, line): try: cname, cvals= line.split(':', 1) dfs_region = cvals.strip() if not cname: self._syntax_error("'country' keyword must be followed by name") except ValueError: self._syntax_error("country name must be followed by colon") cnames = cname.split(',') self._current_countries = {} for cname in cnames: if len(cname) != 2: self._warn("country '%s' not alpha2" % cname) cname = bytes(cname, 'ascii') if not cname in self._countries: self._countries[cname] = Country(dfs_region, comments=self._comments) self._current_countries[cname] = self._countries[cname] self._comments = [] def _parse_country_item(self, line): if line[0] == '(': try: band, line = line[1:].split('),', 1) bname = 'UNNAMED %d' % self._lineno self._parse_band_def(bname, band, dupwarn=False) except: self._syntax_error("Badly parenthesised band definition") else: try: bname, line = line.split(',', 1) if not bname: self._syntax_error("country definition must have band") if not line: self._syntax_error("country definition must have power") except ValueError: self._syntax_error("country definition must have band and power") if line[0] == '(': items = line.split('),', 1) if len(items) == 1: pname = items[0] line = '' if not pname[-1] == ')': self._syntax_error("Badly parenthesised power definition") pname = pname[:-1] flags = [] else: pname = items[0] flags = items[1].split(',') power = pname[1:] pname = 'UNNAMED %d' % self._lineno self._parse_power_def(pname, power, dupwarn=False) else: line = line.split(',') pname = line[0] flags = line[1:] w = None if flags and 'wmmrule' in flags[-1]: try: region = flags.pop().split('=', 1)[1] if region not in self._wmm_rules.keys(): self._syntax_error("No wmm rule for %s" % region) except IndexError: self._syntax_error("flags is empty list or no region was found") w = WmmRule(*self._wmm_rules[region].values()) if not bname in self._bands: self._syntax_error("band does not exist") if not pname in self._power: self._syntax_error("power does not exist") self._bands_used[bname] = True self._power_used[pname] = True # de-duplicate so binary database is more compact bname = self._banddup[bname] pname = self._powerdup[pname] b = self._bands[bname] p = self._power[pname] try: perm = Permission(b, p, flags, w) except FlagError as e: self._syntax_error("Invalid flag '%s'" % e.flag) for cname, c in self._current_countries.items(): if perm in c: self._warn('Rule "%s, %s" added to "%s" twice' % ( bname, pname, cname)) else: c.add(perm) def parse(self, f): self._current_countries = None self._current_regions = None self._bands = {} self._power = {} self._countries = {} self._bands_used = {} self._power_used = {} self._bandrev = {} self._powerrev = {} self._banddup = {} self._powerdup = {} self._bandline = {} self._powerline = {} self._wmm_rules = defaultdict(lambda: OrderedDict()) self._comments = [] self._lineno = 0 for line in f: self._lineno += 1 line = line.strip() if line[0:1] == '#': self._comments.append(line[1:].strip()) line = line.replace(' ', '').replace('\t', '') if not line: self._current_regions = None self._comments = [] line = line.split('#')[0] if not line: continue if line[0:4] == 'band': self._parse_band(line[4:]) self._current_countries = None self._current_regions = None self._comments = [] elif line[0:5] == 'power': self._parse_power(line[5:]) self._current_countries = None self._current_regions = None self._comments = [] elif line[0:7] == 'country': self._parse_country(line[7:]) self._comments = [] self._current_regions = None elif self._current_countries is not None: self._current_regions = None self._parse_country_item(line) self._comments = [] elif line[0:7] == 'wmmrule': self._parse_wmmrule(line[7:]) self._current_countries = None self._comments = [] elif self._current_regions is not None: self._parse_wmmrule_item(line) self._current_countries = None self._comments = [] else: self._syntax_error("Expected band, power or country definition") countries = self._countries bands = {} for k, v in self._bands.items(): if k in self._bands_used: bands[self._banddup[k]] = v continue # we de-duplicated, but don't warn again about the dupes if self._banddup[k] == k: self._lineno = self._bandline[k] self._warn('Unused band definition "%s"' % k) power = {} for k, v in self._power.items(): if k in self._power_used: power[self._powerdup[k]] = v continue # we de-duplicated, but don't warn again about the dupes if self._powerdup[k] == k: self._lineno = self._powerline[k] self._warn('Unused power definition "%s"' % k) return countries