#!/usr/bin/env python
# -*- coding: utf-8 -*-
import sys
import json
import re

from io import open

# From now on, hacky hack to work on Burp Jython2.7 without external modules
BURP = False
try:
    from requests import get
    from requests.utils import dict_from_cookiejar
    from requests.structures import CaseInsensitiveDict
    from requests.exceptions import RequestException

    # Disable warnings about insecure SSL
    from requests.packages.urllib3 import disable_warnings
    from requests.packages.urllib3.exceptions import InsecureRequestWarning
    disable_warnings(InsecureRequestWarning)
except ImportError:
    BURP = True

from . import encoder
from .utils import ConnectionException, FileNotFoundException, Format, Tech, caseinsensitive_in, dict_from_caseinsensitivedict
from .parser import WTParser

# Hacky hack to hack ack. Support python2 and python3 without depending on six
if sys.version_info[0] > 2:
    unicode = str


def parse_regex_string(string):
    r"""
    Parse a header string according to the Wappalyzer DB format.

    Strings follow the format:
    <string>[\;version:\1][\;confidence:50]

    "string" is a mandatory regex (it can be empty), followed by 0 or more
    parameters (key:value); parameters are separated by a \; sequence
    (a backslash followed by a semicolon).

    Examples of parameters are:
    "version": indicates which regex group stores the version information
    "confidence": indicates a confidence percentage
    """
    parts = string.split(r"\;")
    if len(parts) == 1:
        return parts[0], None
    else:
        extra = {}
        for p in parts[1:]:
            p = p.split(":")
            extra[p[0]] = p[1]
        return parts[0], extra
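
# Illustrative example (values are hypothetical, not taken from the DB):
# parse_regex_string(r"nginx/?([\d.]+)?\;version:\1") returns
# ("nginx/?([\d.]+)?", {"version": "\\1"}), while a bare string such as
# "nginx" returns ("nginx", None).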


class Target():
    """
    This class represents a single Target (from scraping a page, from a response file, from a replayed request or from a JSON request-response exchange)

    The only self attributes MUST be self.data, which contains the fetched data, and self.report, which contains the results of the various checks

    Every function MUST perform only one action, since we need to parallelize this and all the data must be source-independent
    """
    def __init__(self):
        # self.data contains the data fetched from the request
        # this object SHOULD be append-only and immutable after the scraping/whitelist process
        self.data = {
            'url': None,
            'html': None,
            'headers': {},
            'cookies': {},
            'meta': {},
            'script': {}
        }

        # self.report contains the information about the technologies detected
        self.report = {
            'tech': set(),
            'headers': [],
        }

    def scrape_url(self, url, headers={}, cookies={}, timeout=10):
        """
        Scrape the target URL and collect all the data that will be filtered afterwards
        """
        if BURP:
            # The BURP flag is set when requests is not installed.
            # When using Burp we shouldn't end up in this function, so we are in a Python CLI env without requests
            raise ImportError("Missing Requests module")

        # By default we don't verify SSL certificates, we are only performing some useless GETs
        try:
            response = get(url, headers=headers, cookies=cookies, verify=False, allow_redirects=True, timeout=timeout)
        except RequestException as e:
            raise ConnectionException(e)

        # TODO: switch-case for various response.status_code

        self.data['url'] = url
        self.data['html'] = response.text
        self.data['headers'] = dict_from_caseinsensitivedict(response.headers)
        self.data['cookies'] = dict_from_cookiejar(response.cookies)
        self.parse_html_page()
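
    # Typical standalone usage (illustrative sketch, outside of Burp):
    #   t = Target()
    #   t.scrape_url("https://example.com", timeout=5)
    #   t.whitelist_data(common_headers)  # common_headers is supplied by the caller
    #   print(t.generate_report(Format['text']))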

    def parse_http_file(self, url):
        """
        Receive an HTTP request/response file and dispatch it to the request/response parser
        """
        path = url.replace('file://', '')
        with open(path, encoding="ISO-8859-1") as f:
            data = f.read()

        # e.g. HTTP/1.1 200 OK -> that's a response!
        # does not check for HTTP/1 only, since it might be HTTP/2 :)
        if data.startswith("HTTP/"):
            # BUG: path is not reliable information, so url matching will always fail
            self.data['url'] = path
            return self.parse_http_response(data)
        return self.parse_http_request(data)

    def parse_http_response(self, response):
        """
        Parse an HTTP response file and collect all the data that will be filtered afterwards

        TODO: find a better way to do this :(
        """
        response = response.replace('\r', '')
        headers_raw, self.data['html'] = response.split('\n\n', 1)
        self.data['cookies'] = {}
        for header in headers_raw.split('\n'):
            header = [x.strip() for x in header.split(":", 1)]
            # might be the first row: HTTP/1.1 200
            if len(header) != 2:
                continue
            if "set-cookie" in header[0].lower():
                # 'Set-Cookie: dr=gonzo; path=/trmon'
                cookie = [x.strip() for x in header[1].split(";", 1)[0].split("=", 1)]
                # BUG: if there are cookies for different domains with the same name
                # they are going to be overwritten (last occurrence will last)...
                # ¯\_(ツ)_/¯
                self.data['cookies'][cookie[0]] = cookie[1]
            else:
                # store (value, original header name) keyed by the lowercased header name
                self.data['headers'][header[0].lower()] = (header[1], header[0])

        self.parse_html_page()
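
    # Expected input shape for parse_http_response (illustrative):
    #   HTTP/1.1 200 OK
    #   Server: nginx
    #   Set-Cookie: dr=gonzo; path=/trmon
    #   <empty line>
    #   <html>...</html>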

    def parse_http_request(self, request, replay=True):
        """
        Parse an HTTP request file and collect all the headers

        TODO: find a better way to do this :(
        TODO: should we support POST requests?
        """
        # GET / HTTP/1.1 -> /
        request = request.replace('\r', '')
        replay_uri = request.split('\n', 1)[0].split(" ")[1]
        replay_headers = {}
        replay_cookies = {}

        headers_raw = request.split('\n\n', 1)[0]
        for header in headers_raw.split('\n'):
            header = [x.strip() for x in header.split(":", 1)]
            # might be the first row: GET / HTTP/1.1
            if len(header) != 2:
                continue
            if "cookie" not in header[0].lower():
                if "host" in header[0].lower():
                    host = header[1]
                else:
                    replay_headers[header[0]] = header[1]
            else:
                # 'Cookie: dr=gonzo; mamm=ta; trmo=n'
                for cookie in header[1].split(';'):
                    cookie = [x.strip() for x in cookie.split("=", 1)]
                    # BUG: if there are cookies for different domains with the same name
                    # they are going to be overwritten (last occurrence will last)...
                    # ¯\_(ツ)_/¯
                    replay_cookies[cookie[0]] = cookie[1]

        # BUG: we don't know for sure whether it went over HTTP or HTTPS, and
        # 'host' is unbound if the request has no Host header
        replay_url = "https://" + host + replay_uri

        if replay:
            self.scrape_url(replay_url, headers=replay_headers, cookies=replay_cookies)
        else:
            # The URL is the only useful information when parsing a request without replaying it
            self.data['url'] = replay_url
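
    # Expected input shape for parse_http_request (illustrative):
    #   GET /index.php HTTP/1.1
    #   Host: example.com
    #   Cookie: dr=gonzo; mamm=ta
    #   <empty line>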

    def parse_html_page(self):
        """
        Parse the HTML content to collect the meta tags and the script sources
        """
        p = WTParser()
        p.feed(self.data['html'])
        self.data['meta'] = p.meta
        self.data['script'] = p.scripts
        p.close()

    def whitelist_data(self, common_headers):
        """
        Whitelist collected data to report the important/uncommon data BEFORE matching with the database

        This function is useful for CMS/technologies that are not in the database
        """
        for key, value in self.data['headers'].items():
            if key not in common_headers:
                # value[1] stores the original header name
                self.report['headers'].append({"name": value[1], "value": value[0]})

    def check_html(self, tech, html):
        """
        Check if the page HTML contains some database matches
        """
        if isinstance(html, (str, unicode)):
            html = [html]

        for source in html:
            matches = re.search(source, self.data['html'], re.IGNORECASE)
            if matches is not None:
                matched_tech = Tech(name=tech, version=None)
                self.report['tech'].add(matched_tech)
                # this tech is matched, GOTO next
                return

    def check_headers(self, tech, headers):
        """
        Check if the response headers match some database headers
        """
        if not isinstance(headers, dict):
            raise ValueError('Invalid headers data in database: {}'.format(headers))

        # For every tech header check if there is a match in our target
        for header in headers:
            content = self.data['headers'].get(header.lower())
            if content is None:
                # Tech not found
                return
            # Get the real content
            content = content[0]

            # Parse the matching regex
            attr, extra = parse_regex_string(headers[header])
            matches = re.search(attr, content, re.IGNORECASE)
            # An empty attr matches a "generic" tech header
            if attr == '' or matches is not None:
                matched_tech = Tech(name=tech, version=None)
                # The version extra data is present
                if extra and extra.get('version'):
                    if matches.group(1):
                        matched_tech = matched_tech._replace(version=matches.group(1))
                self.report['tech'].add(matched_tech)
                # remove ALL the tech headers from the Custom Header list
                # first make a list of tech headers
                tech_headers = list(map(str, headers.keys()))
                # then filter them out of the target headers, case-insensitively
                self.report['headers'] = list(filter(lambda h: not caseinsensitive_in(str(h['name']), tech_headers), self.report['headers']))
                # this tech is matched, GOTO next
                return
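
    # Illustrative database entry handled above (hypothetical values):
    #   {"headers": {"X-Powered-By": "PHP/?([\\d.]+)?\\;version:\\1"}}
    # matched against the response header "X-Powered-By: PHP/7.4.3", this
    # would report the tech with version "7.4.3" extracted from group 1.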

    def check_meta(self, tech, meta):
        """
        Check if the meta tags from the page's HTML contain some database matches
        """
        for m in meta:
            content = self.data['meta'].get(m)
            # filter not-available meta
            if content is None:
                continue

            attr, extra = parse_regex_string(meta[m])
            matches = re.search(attr, content, re.IGNORECASE)
            # An empty attr matches a "generic" tech meta
            if attr == '' or matches is not None:
                matched_tech = Tech(name=tech, version=None)
                # The version extra data is present
                if extra and extra.get('version'):
                    if matches.group(1):
                        matched_tech = matched_tech._replace(version=matches.group(1))
                self.report['tech'].add(matched_tech)
                # this tech is matched, GOTO next
                return

    def check_script(self, tech, script):
        """
        Check if the script sources from the page's HTML contain some database matches
        """
        # FIX for some database inconsistencies
        if isinstance(script, (str, unicode)):
            script = [script]

        for source in script:
            attr, extra = parse_regex_string(source)
            for src in self.data['script']:
                matches = re.search(attr, src, re.IGNORECASE)
                # An empty attr matches a "generic" tech script
                if attr == '' or matches is not None:
                    matched_tech = Tech(name=tech, version=None)
                    # The version extra data is present
                    if extra and extra.get('version'):
                        if matches.group(1):
                            matched_tech = matched_tech._replace(version=matches.group(1))
                    self.report['tech'].add(matched_tech)
                    # this tech is matched, GOTO next
                    return

    def check_cookies(self, tech, cookies):
        """
        Check if the response cookies match some database cookies
        """
        for cookie in cookies:
            # cookies in the db are regexes, so we must test them all
            cookie = cookie.replace("*", "")  # FIX for the "Fe26.2**" hapi.js cookie in the database
            for biscuit in self.data['cookies'].keys():
                matches = re.search(cookie, biscuit, re.IGNORECASE)
                if matches is not None:
                    if cookies[cookie] != '':
                        # Let's check the cookie content
                        content = self.data['cookies'][biscuit]
                        matches = re.search(cookies[cookie], content, re.IGNORECASE)
                        if matches is None:
                            # The cookie name matched but the content didn't, exit
                            return
                    matched_tech = Tech(name=tech, version=None)
                    self.report['tech'].add(matched_tech)
                    # this tech is matched, GOTO next
                    return
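
    # Illustrative database entry handled above (hypothetical values):
    #   {"cookies": {"laravel_session": ""}}
    # an empty value means any cookie named "laravel_session" marks the tech
    # as detected, without inspecting the cookie content.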

    def check_url(self, tech, url):
        """
        Check if the request URL matches some database URL rules
        """
        if isinstance(url, (str, unicode)):
            url = [url]

        for source in url:
            matches = re.search(source, self.data['url'], re.IGNORECASE)
            if matches is not None:
                matched_tech = Tech(name=tech, version=None)
                self.report['tech'].add(matched_tech)
                # this tech is matched, GOTO next
                return

    def generate_report(self, output_format):
        """
        Generate a report in the requested output format
        """
        if output_format == Format['grep']:
            techs = ""
            for tech in self.report['tech']:
                if techs:
                    techs += "//"
                techs += "{}/{}".format(tech.name, 'unknown' if tech.version is None else tech.version)

            headers = ""
            for header in self.report['headers']:
                if headers:
                    headers += "//"
                headers += "{}:{}".format(header["name"], header["value"])

            return "Url>{}\tTechs>{}\tHeaders>{}".format(self.data['url'], techs, headers)
        elif output_format == Format['json']:
            # round-trip through the custom encoder so sets and namedtuples become plain JSON types
            return json.loads(json.dumps(self.report, cls=encoder.Encoder))
        else:
            retval = ""
            retval += "Target URL: {}\n".format(self.data['url'])
            if self.report['tech']:
                retval += "Detected technologies:\n"
                for tech in self.report['tech']:
                    retval += "\t- {} {}\n".format(tech.name, '' if tech.version is None else tech.version)
            if self.report['headers']:
                retval += "Detected the following interesting custom headers:\n"
                for header in self.report['headers']:
                    retval += "\t- {}: {}\n".format(header["name"], header["value"])
            return retval
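
    # Illustrative grep-format output (hypothetical values), tab-separated:
    #   Url>https://example.com  Techs>nginx/1.19.0//PHP/unknown  Headers>X-Custom:foo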