dbparse.py 17 KB


  1. #!/usr/bin/env python
  2. from builtins import bytes
  3. from functools import total_ordering
  4. import sys, math
  5. from math import ceil, log
  6. from collections import defaultdict, OrderedDict
  7. import attr
  8. # must match <linux/nl80211.h> enum nl80211_reg_rule_flags
  9. flag_definitions = {
  10. 'NO-OFDM': 1<<0,
  11. 'NO-CCK': 1<<1,
  12. 'NO-INDOOR': 1<<2,
  13. 'NO-OUTDOOR': 1<<3,
  14. 'DFS': 1<<4,
  15. 'PTP-ONLY': 1<<5,
  16. 'PTMP-ONLY': 1<<6,
  17. 'NO-IR': 1<<7,
  18. # hole at bit 8
  19. # hole at bit 9. FIXME: Where is NO-HT40 defined?
  20. 'NO-HT40': 1<<10,
  21. 'AUTO-BW': 1<<11,
  22. }
  23. dfs_regions = {
  24. 'DFS-FCC': 1,
  25. 'DFS-ETSI': 2,
  26. 'DFS-JP': 3,
  27. }
  28. @total_ordering
  29. @attr.s(frozen=True, cmp=False)
  30. class WmmRule(object):
  31. vo_c = attr.ib()
  32. vi_c = attr.ib()
  33. be_c = attr.ib()
  34. bk_c = attr.ib()
  35. vo_ap = attr.ib()
  36. vi_ap = attr.ib()
  37. be_ap = attr.ib()
  38. bk_ap = attr.ib()
  39. def _as_tuple(self):
  40. return (self.vo_c, self.vi_c, self.be_c, self.bk_c,
  41. self.vo_ap, self.vi_ap, self.be_ap, self.bk_ap)
  42. def __eq__(self, other):
  43. if other is None:
  44. return False
  45. return (self._as_tuple() == other._as_tuple())
  46. def __ne__(self, other):
  47. return not (self == other)
  48. def __lt__(self, other):
  49. if other is None:
  50. return False
  51. return (self._as_tuple() < other._as_tuple())
  52. def __hash__(self):
  53. return hash(self._as_tuple())
  54. class FreqBand(object):
  55. def __init__(self, start, end, bw, comments=None):
  56. self.start = start
  57. self.end = end
  58. self.maxbw = bw
  59. self.comments = comments or []
  60. def _as_tuple(self):
  61. return (self.start, self.end, self.maxbw)
  62. def __eq__(self, other):
  63. return (self._as_tuple() == other._as_tuple())
  64. def __ne__(self, other):
  65. return not (self == other)
  66. def __lt__(self, other):
  67. return (self._as_tuple() < other._as_tuple())
  68. def __hash__(self):
  69. return hash(self._as_tuple())
  70. def __str__(self):
  71. return '<FreqBand %.3f - %.3f @ %.3f>' % (
  72. self.start, self.end, self.maxbw)
  73. @total_ordering
  74. class PowerRestriction(object):
  75. def __init__(self, max_ant_gain, max_eirp, comments = None):
  76. self.max_ant_gain = max_ant_gain
  77. self.max_eirp = max_eirp
  78. self.comments = comments or []
  79. def _as_tuple(self):
  80. return (self.max_ant_gain, self.max_eirp)
  81. def __eq__(self, other):
  82. return (self._as_tuple() == other._as_tuple())
  83. def __ne__(self, other):
  84. return not (self == other)
  85. def __lt__(self, other):
  86. return (self._as_tuple() < other._as_tuple())
  87. def __hash__(self):
  88. return hash(self._as_tuple())
  89. def __str__(self):
  90. return '<PowerRestriction ...>'
  91. class DFSRegionError(Exception):
  92. def __init__(self, dfs_region):
  93. self.dfs_region = dfs_region
  94. class FlagError(Exception):
  95. def __init__(self, flag):
  96. self.flag = flag
  97. @total_ordering
  98. class Permission(object):
  99. def __init__(self, freqband, power, flags, wmmrule):
  100. assert isinstance(freqband, FreqBand)
  101. assert isinstance(power, PowerRestriction)
  102. assert isinstance(wmmrule, WmmRule) or wmmrule is None
  103. self.freqband = freqband
  104. self.power = power
  105. self.wmmrule = wmmrule
  106. self.flags = 0
  107. for flag in flags:
  108. if not flag in flag_definitions:
  109. raise FlagError(flag)
  110. self.flags |= flag_definitions[flag]
  111. self.textflags = flags
  112. def _as_tuple(self):
  113. return (self.freqband, self.power, self.flags, self.wmmrule)
  114. def __eq__(self, other):
  115. return (self._as_tuple() == other._as_tuple())
  116. def __ne__(self, other):
  117. return not (self == other)
  118. def __lt__(self, other):
  119. return (self._as_tuple() < other._as_tuple())
  120. def __hash__(self):
  121. return hash(self._as_tuple())
  122. def __str__(self):
  123. return str(self.freqband) + str(self.power) + str(self.wmmrule)
  124. class Country(object):
  125. def __init__(self, dfs_region, permissions=None, comments=None):
  126. self._permissions = permissions or []
  127. self.comments = comments or []
  128. self.dfs_region = 0
  129. if dfs_region:
  130. if not dfs_region in dfs_regions:
  131. raise DFSRegionError(dfs_region)
  132. self.dfs_region = dfs_regions[dfs_region]
  133. def add(self, perm):
  134. assert isinstance(perm, Permission)
  135. self._permissions.append(perm)
  136. self._permissions.sort()
  137. def __contains__(self, perm):
  138. assert isinstance(perm, Permission)
  139. return perm in self._permissions
  140. def __str__(self):
  141. r = ['(%s, %s)' % (str(b), str(p)) for b, p in self._permissions]
  142. return '<Country (%s)>' % (', '.join(r))
  143. def _get_permissions_tuple(self):
  144. return tuple(self._permissions)
  145. permissions = property(_get_permissions_tuple)
  146. class SyntaxError(Exception):
  147. pass
  148. class DBParser(object):
  149. def __init__(self, warn=None):
  150. self._warn_callout = warn or sys.stderr.write
  151. def _syntax_error(self, txt=None):
  152. txt = txt and ' (%s)' % txt or ''
  153. raise SyntaxError("Syntax error in line %d%s" % (self._lineno, txt))
  154. def _warn(self, txt):
  155. self._warn_callout("Warning (line %d): %s\n" % (self._lineno, txt))
  156. def _parse_band_def(self, bname, banddef, dupwarn=True):
  157. try:
  158. freqs, bw = banddef.split('@')
  159. bw = float(bw)
  160. except ValueError:
  161. bw = 20.0
  162. try:
  163. start, end = freqs.split('-')
  164. start = float(start)
  165. end = float(end)
  166. # The kernel will reject these, so might as well reject this
  167. # upon building it.
  168. if start <= 0:
  169. self._syntax_error("Invalid start freq (%d)" % start)
  170. if end <= 0:
  171. self._syntax_error("Invalid end freq (%d)" % end)
  172. if start > end:
  173. self._syntax_error("Inverted freq range (%d - %d)" % (start, end))
  174. if start == end:
  175. self._syntax_error("Start and end freqs are equal (%d)" % start)
  176. except ValueError:
  177. self._syntax_error("band must have frequency range")
  178. b = FreqBand(start, end, bw, comments=self._comments)
  179. self._comments = []
  180. self._banddup[bname] = bname
  181. if b in self._bandrev:
  182. if dupwarn:
  183. self._warn('Duplicate band definition ("%s" and "%s")' % (
  184. bname, self._bandrev[b]))
  185. self._banddup[bname] = self._bandrev[b]
  186. self._bands[bname] = b
  187. self._bandrev[b] = bname
  188. self._bandline[bname] = self._lineno
  189. def _parse_band(self, line):
  190. try:
  191. bname, line = line.split(':', 1)
  192. if not bname:
  193. self._syntax_error("'band' keyword must be followed by name")
  194. except ValueError:
  195. self._syntax_error("band name must be followed by colon")
  196. if bname in flag_definitions:
  197. self._syntax_error("Invalid band name")
  198. self._parse_band_def(bname, line)
  199. def _parse_power(self, line):
  200. try:
  201. pname, line = line.split(':', 1)
  202. if not pname:
  203. self._syntax_error("'power' keyword must be followed by name")
  204. except ValueError:
  205. self._syntax_error("power name must be followed by colon")
  206. if pname in flag_definitions:
  207. self._syntax_error("Invalid power name")
  208. self._parse_power_def(pname, line)
  209. def _parse_power_def(self, pname, line, dupwarn=True):
  210. try:
  211. max_eirp = line
  212. if max_eirp == 'N/A':
  213. max_eirp = '0'
  214. max_ant_gain = float(0)
  215. def conv_pwr(pwr):
  216. if pwr.endswith('mW'):
  217. pwr = float(pwr[:-2])
  218. return 10.0 * math.log10(pwr)
  219. else:
  220. return float(pwr)
  221. max_eirp = conv_pwr(max_eirp)
  222. except ValueError:
  223. self._syntax_error("invalid power data")
  224. p = PowerRestriction(max_ant_gain, max_eirp,
  225. comments=self._comments)
  226. self._comments = []
  227. self._powerdup[pname] = pname
  228. if p in self._powerrev:
  229. if dupwarn:
  230. self._warn('Duplicate power definition ("%s" and "%s")' % (
  231. pname, self._powerrev[p]))
  232. self._powerdup[pname] = self._powerrev[p]
  233. self._power[pname] = p
  234. self._powerrev[p] = pname
  235. self._powerline[pname] = self._lineno
  236. def _parse_wmmrule(self, line):
  237. regions = line[:-1].strip()
  238. if not regions:
  239. self._syntax_error("'wmmrule' keyword must be followed by region")
  240. regions = regions.split(',')
  241. self._current_regions = {}
  242. for region in regions:
  243. if region in self._wmm_rules:
  244. self._warn("region %s was added already to wmm rules" % region)
  245. self._current_regions[region] = 1
  246. self._comments = []
  247. def _validate_input(self, cw_min, cw_max, aifsn, cot):
  248. if cw_min < 1:
  249. self._syntax_error("Invalid cw_min value (%d)" % cw_min)
  250. if cw_max < 1:
  251. self._syntax_error("Invalid cw_max value (%d)" % cw_max)
  252. if cw_min > cw_max:
  253. self._syntax_error("Inverted contention window (%d - %d)" %
  254. (cw_min, cw_max))
  255. if not (bin(cw_min + 1).count('1') == 1 and cw_min < 2**15):
  256. self._syntax_error("Invalid cw_min value should be power of 2 - 1 (%d)"
  257. % cw_min)
  258. if not (bin(cw_max + 1).count('1') == 1 and cw_max < 2**15):
  259. self._syntax_error("Invalid cw_max value should be power of 2 - 1 (%d)"
  260. % cw_max)
  261. if aifsn < 1:
  262. self._syntax_error("Invalid aifsn value (%d)" % aifsn)
  263. if cot < 0:
  264. self._syntax_error("Invalid cot value (%d)" % cot)
  265. def _validate_size(self, var, bytcnt):
  266. return bytcnt < ceil(len(bin(var)[2:]) / 8.0)
  267. def _parse_wmmrule_item(self, line):
  268. bytcnt = (2.0, 2.0, 1.0, 2.0)
  269. try:
  270. ac, cval = line.split(':')
  271. if not ac:
  272. self._syntax_error("wmm item must have ac prefix")
  273. except ValueError:
  274. self._syntax_error("access category must be followed by colon")
  275. p = tuple([int(v.split('=', 1)[1]) for v in cval.split(',')])
  276. self._validate_input(*p)
  277. for v, b in zip(p, bytcnt):
  278. if self._validate_size(v, b):
  279. self._syntax_error("unexpected input size expect %d got %d"
  280. % (b, v))
  281. for r in self._current_regions:
  282. self._wmm_rules[r][ac] = p
  283. def _parse_country(self, line):
  284. try:
  285. cname, cvals= line.split(':', 1)
  286. dfs_region = cvals.strip()
  287. if not cname:
  288. self._syntax_error("'country' keyword must be followed by name")
  289. except ValueError:
  290. self._syntax_error("country name must be followed by colon")
  291. cnames = cname.split(',')
  292. self._current_countries = {}
  293. for cname in cnames:
  294. if len(cname) != 2:
  295. self._warn("country '%s' not alpha2" % cname)
  296. cname = bytes(cname, 'ascii')
  297. if not cname in self._countries:
  298. self._countries[cname] = Country(dfs_region, comments=self._comments)
  299. self._current_countries[cname] = self._countries[cname]
  300. self._comments = []
  301. def _parse_country_item(self, line):
  302. if line[0] == '(':
  303. try:
  304. band, line = line[1:].split('),', 1)
  305. bname = 'UNNAMED %d' % self._lineno
  306. self._parse_band_def(bname, band, dupwarn=False)
  307. except:
  308. self._syntax_error("Badly parenthesised band definition")
  309. else:
  310. try:
  311. bname, line = line.split(',', 1)
  312. if not bname:
  313. self._syntax_error("country definition must have band")
  314. if not line:
  315. self._syntax_error("country definition must have power")
  316. except ValueError:
  317. self._syntax_error("country definition must have band and power")
  318. if line[0] == '(':
  319. items = line.split('),', 1)
  320. if len(items) == 1:
  321. pname = items[0]
  322. line = ''
  323. if not pname[-1] == ')':
  324. self._syntax_error("Badly parenthesised power definition")
  325. pname = pname[:-1]
  326. flags = []
  327. else:
  328. pname = items[0]
  329. flags = items[1].split(',')
  330. power = pname[1:]
  331. pname = 'UNNAMED %d' % self._lineno
  332. self._parse_power_def(pname, power, dupwarn=False)
  333. else:
  334. line = line.split(',')
  335. pname = line[0]
  336. flags = line[1:]
  337. w = None
  338. if flags and 'wmmrule' in flags[-1]:
  339. try:
  340. region = flags.pop().split('=', 1)[1]
  341. if region not in self._wmm_rules.keys():
  342. self._syntax_error("No wmm rule for %s" % region)
  343. except IndexError:
  344. self._syntax_error("flags is empty list or no region was found")
  345. w = WmmRule(*self._wmm_rules[region].values())
  346. if not bname in self._bands:
  347. self._syntax_error("band does not exist")
  348. if not pname in self._power:
  349. self._syntax_error("power does not exist")
  350. self._bands_used[bname] = True
  351. self._power_used[pname] = True
  352. # de-duplicate so binary database is more compact
  353. bname = self._banddup[bname]
  354. pname = self._powerdup[pname]
  355. b = self._bands[bname]
  356. p = self._power[pname]
  357. try:
  358. perm = Permission(b, p, flags, w)
  359. except FlagError as e:
  360. self._syntax_error("Invalid flag '%s'" % e.flag)
  361. for cname, c in self._current_countries.items():
  362. if perm in c:
  363. self._warn('Rule "%s, %s" added to "%s" twice' % (
  364. bname, pname, cname))
  365. else:
  366. c.add(perm)
  367. def parse(self, f):
  368. self._current_countries = None
  369. self._current_regions = None
  370. self._bands = {}
  371. self._power = {}
  372. self._countries = {}
  373. self._bands_used = {}
  374. self._power_used = {}
  375. self._bandrev = {}
  376. self._powerrev = {}
  377. self._banddup = {}
  378. self._powerdup = {}
  379. self._bandline = {}
  380. self._powerline = {}
  381. self._wmm_rules = defaultdict(lambda: OrderedDict())
  382. self._comments = []
  383. self._lineno = 0
  384. for line in f:
  385. self._lineno += 1
  386. line = line.strip()
  387. if line[0:1] == '#':
  388. self._comments.append(line[1:].strip())
  389. line = line.replace(' ', '').replace('\t', '')
  390. if not line:
  391. self._current_regions = None
  392. self._comments = []
  393. line = line.split('#')[0]
  394. if not line:
  395. continue
  396. if line[0:4] == 'band':
  397. self._parse_band(line[4:])
  398. self._current_countries = None
  399. self._current_regions = None
  400. self._comments = []
  401. elif line[0:5] == 'power':
  402. self._parse_power(line[5:])
  403. self._current_countries = None
  404. self._current_regions = None
  405. self._comments = []
  406. elif line[0:7] == 'country':
  407. self._parse_country(line[7:])
  408. self._comments = []
  409. self._current_regions = None
  410. elif self._current_countries is not None:
  411. self._current_regions = None
  412. self._parse_country_item(line)
  413. self._comments = []
  414. elif line[0:7] == 'wmmrule':
  415. self._parse_wmmrule(line[7:])
  416. self._current_countries = None
  417. self._comments = []
  418. elif self._current_regions is not None:
  419. self._parse_wmmrule_item(line)
  420. self._current_countries = None
  421. self._comments = []
  422. else:
  423. self._syntax_error("Expected band, power or country definition")
  424. countries = self._countries
  425. bands = {}
  426. for k, v in self._bands.items():
  427. if k in self._bands_used:
  428. bands[self._banddup[k]] = v
  429. continue
  430. # we de-duplicated, but don't warn again about the dupes
  431. if self._banddup[k] == k:
  432. self._lineno = self._bandline[k]
  433. self._warn('Unused band definition "%s"' % k)
  434. power = {}
  435. for k, v in self._power.items():
  436. if k in self._power_used:
  437. power[self._powerdup[k]] = v
  438. continue
  439. # we de-duplicated, but don't warn again about the dupes
  440. if self._powerdup[k] == k:
  441. self._lineno = self._powerline[k]
  442. self._warn('Unused power definition "%s"' % k)
  443. return countries