__init__.py 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108
import re
import sys

import requests
from bs4 import BeautifulSoup
  4. class Censys_API:
  5. def __init__(self, uid, secret):
  6. self.url = 'https://censys.io/api/v1'
  7. self.uid = uid
  8. self.secret = secret
  9. self.login()
  10. self.ipv4 = []
  11. def login(self):
  12. r = requests.get(self.url + "/data", auth=(self.uid, self.secret))
  13. if r.status_code != 200:
  14. print("Wrong creds for Censys")
  15. sys.exit(1)
  16. return True
  17. def build_query_ipv4(self, targets):
  18. query = ""
  19. for t in targets:
  20. query += "ip:[" + t['start'] + " TO " + t['end'] + "]"
  21. query += " OR "
  22. return query[:-4]
  23. def search_ipv4(self, query):
  24. r = requests.post(self.url + "/search/ipv4", json={'query': query}, auth=(self.uid, self.secret))
  25. data = r.json()
  26. self.parse_ipv4(data)
  27. if data['status'] == 'ok':
  28. count = data['metadata']['count']
  29. pages = data['metadata']['pages']
  30. for page in range(2, pages + 1):
  31. r = requests.post(self.url + "/search/ipv4", json={'query': query, 'page' : page}, auth=(self.uid, self.secret))
  32. data = r.json()
  33. self.parse_ipv4(data)
  34. return self.ipv4
  35. def parse_ipv4(self, data):
  36. for host in data['results']:
  37. r = requests.get(self.url + "/view/ipv4/" + host['ip'], auth=(self.uid, self.secret))
  38. data = r.json()
  39. try:
  40. vhosts = data['443']['https']['tls']['certificate']['parsed']['names']
  41. except:
  42. vhosts = []
  43. self.ipv4.append({'ip': host['ip'], 'protocols': host['protocols'], 'vhosts': vhosts})
  44. return True
  45. class Censys_WEB:
  46. def __init__(self, username, password):
  47. self.url = 'https://censys.io/'
  48. self.username = username
  49. self.password = password
  50. if self.login():
  51. self.session = self.login()
  52. self.ipv4 = []
  53. def login(self):
  54. s = requests.session()
  55. requests.get(self.url)
  56. return s
  57. def build_query_ipv4(self, targets):
  58. query = ""
  59. for t in targets:
  60. query += "ip:[" + t['start'] + " TO " + t['end'] + "]"
  61. query += " OR "
  62. return query[:-4]
  63. def search_ipv4(self, query):
  64. r = self.session.get(self.url + "ipv4/_search?q=", params={"q": query, "page": 1})
  65. data = r.text
  66. '''Per usare etree bisogna fixare l'html rotto
  67. data = "<root>" + data + "</root>"
  68. data = re.sub("\<a\ href=\/.*\>.*\<\/a\>", "", data)'''
  69. self.parse_ipv4(data)
  70. html = BeautifulSoup(data, "lxml")
  71. spans = html.find_all('span', {'class': 'SearchResultSectionHeader__statistic'})
  72. pages = int(spans[0].text.split('/')[1].strip())
  73. count = spans[1].text
  74. for page in range(2, pages + 1):
  75. r = self.session.get(self.url + "ipv4/_search?q=", params={"q": query, "page": page})
  76. data = r.text
  77. self.parse_ipv4(data)
  78. return self.ipv4
  79. def parse_ipv4(self, data):
  80. html = BeautifulSoup(data, "lxml")
  81. results = html.find_all('div', {'class': 'SearchResult result'})
  82. for raw in results:
  83. vhosts = []
  84. urls = []
  85. protocols = []
  86. ip = raw.find_all('span', {'class': 'dns'})[0].get('id')
  87. vhosts_html = raw.find_all('i', {'title': 'names on certificate'})
  88. if vhosts_html:
  89. l = vhosts_html[0].next_sibling.replace(' ', '')
  90. for vhost in l.split(','):
  91. vhosts.append(vhost)
  92. protocols_html = raw.find_all('i', {'title': 'public protocols'})
  93. if protocols_html:
  94. l = protocols_html[0].next_sibling.replace(' ', '')
  95. for protocol in l.split(','):
  96. protocols.append(protocol)
  97. self.ipv4.append({'ip': ip, 'protocols': protocols, 'vhosts': vhosts, 'urls': urls})
  98. return True