import re
import sys

import requests
from bs4 import BeautifulSoup


class Censys_API:
    """Query the Censys IPv4 search API (https://censys.io/api/v1) with HTTP basic auth."""

    def __init__(self, uid, secret):
        self.url = 'https://censys.io/api/v1'
        self.uid = uid
        self.secret = secret
        self.login()  # exits the process if the credentials are rejected
        self.ipv4 = []  # accumulated results: {'ip', 'protocols', 'vhosts'} dicts

    def login(self):
        """Validate the API credentials against the /data endpoint.

        Exits the process with status 1 when the credentials are rejected;
        returns True on success.
        """
        r = requests.get(self.url + "/data", auth=(self.uid, self.secret))
        if r.status_code != 200:
            print("Wrong creds for Censys")
            sys.exit(1)
        return True

    def build_query_ipv4(self, targets):
        """Build an OR-joined 'ip:[start TO end]' query string.

        `targets` is an iterable of dicts with 'start' and 'end' IP strings.
        Returns '' for an empty iterable.
        """
        return " OR ".join(
            "ip:[" + t['start'] + " TO " + t['end'] + "]" for t in targets
        )

    def search_ipv4(self, query):
        """Run an IPv4 search and walk every result page.

        Appends parsed hosts to self.ipv4 and returns it. Error payloads
        (status != 'ok') are skipped instead of crashing on the missing
        'results' key.
        """
        r = requests.post(self.url + "/search/ipv4",
                          json={'query': query},
                          auth=(self.uid, self.secret))
        data = r.json()
        if data.get('status') == 'ok':
            self.parse_ipv4(data)
            pages = data['metadata']['pages']
            for page in range(2, pages + 1):
                r = requests.post(self.url + "/search/ipv4",
                                  json={'query': query, 'page': page},
                                  auth=(self.uid, self.secret))
                self.parse_ipv4(r.json())
        return self.ipv4

    def parse_ipv4(self, data):
        """Fetch per-host detail for each search result and append to self.ipv4."""
        for host in data['results']:
            r = requests.get(self.url + "/view/ipv4/" + host['ip'],
                             auth=(self.uid, self.secret))
            detail = r.json()
            try:
                vhosts = detail['443']['https']['tls']['certificate']['parsed']['names']
            except (KeyError, TypeError):
                # Host exposes no HTTPS certificate data.
                vhosts = []
            self.ipv4.append({'ip': host['ip'],
                              'protocols': host['protocols'],
                              'vhosts': vhosts})
        return True


class Censys_WEB:
    """Scrape Censys IPv4 search results through the web UI (form login, no API key)."""

    def __init__(self, username, password):
        self.url = 'https://censys.io/'
        self.username = username
        self.password = password
        self.session = self.login()  # exits the process if login fails
        self.ipv4 = []  # accumulated results: {'ip', 'protocols', 'vhosts', 'urls'} dicts

    def login(self):
        """Log in through the web form and return an authenticated session.

        Scrapes the CSRF token from the login page first. A successful login
        answers with a 302 redirect; anything else means bad credentials and
        the process exits (consistent with Censys_API.login — the original
        code printed a warning but kept using the unauthenticated session).
        """
        s = requests.session()
        r = s.get(self.url + "/login")
        html = BeautifulSoup(r.text, "lxml")
        csrf = html.find('input', {'name': 'csrf_token'})['value']
        r = s.post(self.url + "/login",
                   data={'login': self.username, 'password': self.password,
                         'csrf_token': csrf, 'came_from': '/'},
                   allow_redirects=False)
        if r.status_code != 302:
            print("Wrong creds for Censys")
            sys.exit(1)
        return s

    def build_query_ipv4(self, targets):
        """Build an OR-joined 'ip:[start TO end]' query string.

        Same contract as Censys_API.build_query_ipv4.
        """
        return " OR ".join(
            "ip:[" + t['start'] + " TO " + t['end'] + "]" for t in targets
        )

    def search_ipv4(self, query):
        """Scrape every result page for `query`; returns self.ipv4."""
        r = self.session.get(self.url + "ipv4/_search?q=",
                             params={"q": query, "page": 1})
        data = r.text
        self.parse_ipv4(data)
        html = BeautifulSoup(data, "lxml")
        spans = html.find_all('span',
                              {'class': 'SearchResultSectionHeader__statistic'})
        # First statistic reads "current/total" pages — take the total.
        pages = int(spans[0].text.split('/')[1].strip())
        for page in range(2, pages + 1):
            r = self.session.get(self.url + "ipv4/_search?q=",
                                 params={"q": query, "page": page})
            self.parse_ipv4(r.text)
        return self.ipv4

    def parse_ipv4(self, data):
        """Extract ip/protocols/vhosts from one page of search-result HTML."""
        html = BeautifulSoup(data, "lxml")
        for raw in html.find_all('div', {'class': 'SearchResult result'}):
            ip = raw.find_all('span', {'class': 'dns'})[0].get('id')

            vhosts = []
            vhosts_html = raw.find_all('i', {'title': 'names on certificate'})
            if vhosts_html:
                # The names follow the icon as comma-separated text.
                vhosts = vhosts_html[0].next_sibling.replace(' ', '').split(',')

            protocols = []
            protocols_html = raw.find_all('i', {'title': 'public protocols'})
            if protocols_html:
                protocols = protocols_html[0].next_sibling.replace(' ', '').split(',')

            self.ipv4.append({'ip': ip, 'protocols': protocols,
                              'vhosts': vhosts, 'urls': []})
        return True