acasown/censys/__init__.py

109 lines
3.3 KiB
Python
Raw Normal View History

2018-11-08 12:56:36 +01:00
import requests
2018-11-08 22:24:53 +01:00
from bs4 import BeautifulSoup
import re
2018-11-08 12:56:36 +01:00
2018-11-08 22:24:53 +01:00
class Censys_API:
    """Client for the Censys v1 REST API, authenticated with API id/secret.

    Every host found by a search is appended to ``self.ipv4`` as a dict with
    the keys ``ip``, ``protocols`` and ``vhosts``. The list is kept on the
    instance, so results accumulate across successive searches.
    """

    def __init__(self, uid, secret):
        """Store the credentials and verify them with a request to the API.

        Raises:
            SystemExit: if the API rejects the credentials (see ``login``).
        """
        self.url = 'https://censys.io/api/v1'
        self.uid = uid
        self.secret = secret
        self.ipv4 = []
        self.login()

    def login(self):
        """Check the credentials against the ``/data`` endpoint.

        Returns:
            True when the endpoint answers with HTTP 200.

        Raises:
            SystemExit: with exit code 1 on any other status code.
        """
        r = requests.get(self.url + "/data", auth=(self.uid, self.secret))
        if r.status_code != 200:
            print("Wrong creds for Censys")
            # Equivalent to sys.exit(1) without depending on a `sys` import,
            # which this module does not have.
            raise SystemExit(1)
        return True

    def build_query_ipv4(self, targets):
        """Build a search query matching any of the given IP ranges.

        Args:
            targets: iterable of dicts with ``start`` and ``end`` keys.

        Returns:
            The range clauses joined with ``" OR "``; an empty string when
            ``targets`` is empty.
        """
        return " OR ".join(
            "ip:[{} TO {}]".format(t['start'], t['end']) for t in targets
        )

    def _search_page(self, query, page=None):
        """POST one search request and return the decoded JSON payload."""
        payload = {'query': query}
        if page is not None:
            payload['page'] = page
        r = requests.post(self.url + "/search/ipv4", json=payload,
                          auth=(self.uid, self.secret))
        return r.json()

    def search_ipv4(self, query):
        """Run ``query`` and collect the hosts from every result page.

        Returns:
            ``self.ipv4``, including hosts collected by earlier searches.
        """
        data = self._search_page(query)
        # Check the status before parsing: a non-ok response has no
        # 'results' entry to iterate over.
        if data.get('status') != 'ok':
            return self.ipv4
        self.parse_ipv4(data)
        pages = data['metadata']['pages']
        for page in range(2, pages + 1):
            data = self._search_page(query, page)
            if data.get('status') != 'ok':
                # Keep what was collected so far instead of crashing on a
                # failed page.
                break
            self.parse_ipv4(data)
        return self.ipv4

    def parse_ipv4(self, data):
        """Append one entry to ``self.ipv4`` for each host in ``data``.

        For every host an additional ``/view/ipv4/<ip>`` request is made to
        read the names on the certificate served on port 443; hosts without
        that information get an empty ``vhosts`` list.

        Args:
            data: decoded search response; a missing ``results`` key is
                treated as no results.

        Returns:
            True.
        """
        for host in data.get('results', []):
            r = requests.get(self.url + "/view/ipv4/" + host['ip'],
                             auth=(self.uid, self.secret))
            details = r.json()
            try:
                vhosts = details['443']['https']['tls']['certificate']['parsed']['names']
            except (KeyError, TypeError):
                # The host exposes no HTTPS certificate data on port 443.
                vhosts = []
            self.ipv4.append({'ip': host['ip'], 'protocols': host['protocols'], 'vhosts': vhosts})
        return True
class Censys_WEB:
    """Scraper for the Censys web search pages.

    Every search result is appended to ``self.ipv4`` as a dict with the keys
    ``ip``, ``protocols``, ``vhosts`` and ``urls``. The list is kept on the
    instance, so results accumulate across successive searches.
    """

    def __init__(self, username, password):
        """Store the credentials and open the HTTP session.

        NOTE(review): ``username`` and ``password`` are stored but ``login``
        does not submit them anywhere; confirm whether an authenticated
        login is still to be implemented.
        """
        self.url = 'https://censys.io/'
        self.username = username
        self.password = password
        self.ipv4 = []
        # Call login() once; calling it twice opened two sessions.
        self.session = self.login()

    def login(self):
        """Create a session and load the landing page through it.

        Returns:
            The ``requests`` session used for all later requests.
        """
        s = requests.session()
        # Fetch through the session so cookies set by the site are kept.
        s.get(self.url)
        return s

    def build_query_ipv4(self, targets):
        """Build a search query matching any of the given IP ranges.

        Args:
            targets: iterable of dicts with ``start`` and ``end`` keys.

        Returns:
            The range clauses joined with ``" OR "``; an empty string when
            ``targets`` is empty.
        """
        return " OR ".join(
            "ip:[{} TO {}]".format(t['start'], t['end']) for t in targets
        )

    def _fetch_page(self, query, page):
        """Return the HTML of one search result page."""
        # The query travels only in `params`; also hard-coding "?q=" in the
        # URL produced a duplicated, empty `q` parameter.
        r = self.session.get(self.url + "ipv4/_search",
                             params={"q": query, "page": page})
        return r.text

    @staticmethod
    def _page_count(data):
        """Read the total page count from the statistics header.

        Falls back to 1 when the header is missing or cannot be parsed.
        """
        html = BeautifulSoup(data, "lxml")
        spans = html.find_all('span', {'class': 'SearchResultSectionHeader__statistic'})
        try:
            return int(spans[0].text.split('/')[1].strip())
        except (IndexError, ValueError):
            return 1

    def search_ipv4(self, query):
        """Run ``query`` and collect the hosts from every result page.

        Returns:
            ``self.ipv4``, including hosts collected by earlier searches.
        """
        # The returned HTML is malformed; using etree would require fixing
        # it first, so BeautifulSoup is used instead.
        data = self._fetch_page(query, 1)
        self.parse_ipv4(data)
        for page in range(2, self._page_count(data) + 1):
            self.parse_ipv4(self._fetch_page(query, page))
        return self.ipv4

    @staticmethod
    def _listed_values(raw, title):
        """Return the comma-separated values that follow an ``<i title=...>``.

        Spaces are removed before splitting. Returns an empty list when the
        icon is absent or is not followed by a text node.
        """
        icons = raw.find_all('i', {'title': title})
        if not icons:
            return []
        sibling = icons[0].next_sibling
        if not isinstance(sibling, str):
            # No sibling at all, or the sibling is a tag rather than text.
            return []
        return sibling.replace(' ', '').split(',')

    def parse_ipv4(self, data):
        """Append one entry to ``self.ipv4`` for each result in the HTML.

        Args:
            data: HTML text of a search result page.

        Returns:
            True.
        """
        html = BeautifulSoup(data, "lxml")
        for raw in html.find_all('div', {'class': 'SearchResult result'}):
            dns_spans = raw.find_all('span', {'class': 'dns'})
            if not dns_spans:
                # The IP is read from this span's id; skip results without it.
                continue
            self.ipv4.append({
                'ip': dns_spans[0].get('id'),
                'protocols': self._listed_values(raw, 'public protocols'),
                'vhosts': self._listed_values(raw, 'names on certificate'),
                'urls': [],
            })
        return True