Added webtech, still alpha

This commit is contained in:
Giulio 2019-04-14 16:04:33 +02:00
parent 8a0e31a66a
commit d0ba4c3b75
15 changed files with 18663 additions and 6 deletions

View File

@ -1,12 +1,20 @@
import ripe
import censys
import bong
import webtech
import sys
import json
import ripe
import censys
import bong
import webtech
import sys
import json
r = ripe.Ripe()
c = censys.Censys_WEB("dummyuser", "dummypass")
c = censys.Censys_WEB("stripped", "stripped")
b = bong.Bing()
w = webtech.WebTech(options={'json': True})
targets = r.search(sys.argv[1])
print("Found " + str(len(targets)) + " ranges from Ripe")
hosts = c.search_ipv4(c.build_query_ipv4(targets))
@ -23,5 +31,44 @@ for host in hosts:
for vhost in host_bing['vhosts']:
if vhost not in result_vhosts:
result_vhosts.append(vhost)
result.append({'ip': result_ip, 'urls': result_urls, 'vhosts': result_vhosts, 'protocols': host['protocols']})
print(json.dumps(result))
result.append({'ip': result_ip, 'urls': result_urls, 'vhosts': list(dict.fromkeys(result_vhosts)), 'protocols': host['protocols']})
print("Result has " + str(len(result)) + " entries")
final = {}
for host in result:
if "443/https" in host['protocols']:
try:
url = 'https://' + host['ip']
report = w.start_from_url(url, timeout=2)
final[url] = report
except webtech.utils.ConnectionException:
print("Site down " + url)
if "80/http" in host['protocols']:
try:
url = 'http://' + host['ip']
report = w.start_from_url('http://' + host['ip'], timeout=2)
final[url] = report
except webtech.utils.ConnectionException:
print("Site down " + url)
for vhost in host['vhosts']:
if "443/https" in host['protocols']:
try:
url = 'https://' + host['ip'] + ' (' + vhost + ')'
report = w.start_from_url(url, headers={'Host': vhost}, timeout=2)
final[url] = report
except webtech.utils.ConnectionException:
print("Site down " + url)
if "80/http" in host['protocols']:
try:
url = 'http://' + host['ip'] + ' (' + vhost + ')'
report = w.start_from_url('http://' + host['ip'], headers={'Host': vhost}, timeout=2)
final[url] = report
except webtech.utils.ConnectionException:
print("Site down " + url)
for urls in host['urls']:
try:
report = w.start_from_url(url, timeout=2)
final[url] = report
except webtech.utils.ConnectionException:
print("Site down " + url)
print(json.dumps(final, indent=4))

View File

@ -53,13 +53,17 @@ class Censys_WEB:
self.url = 'https://censys.io/'
self.username = username
self.password = password
if self.login():
self.session = self.login()
self.session = self.login()
self.ipv4 = []
def login(self):
s = requests.session()
requests.get(self.url)
r = s.get(self.url + "/login")
html = BeautifulSoup(r.text, "lxml")
csrf = html.find('input', {'name': 'csrf_token'})['value']
r = s.post(self.url + "/login", data={'login': self.username, 'password': self.password, 'csrf_token': csrf, 'came_from': '/'}, allow_redirects=False)
if r.status_code != 302:
print("Wrong creds for Censys")
return s
def build_query_ipv4(self, targets):

110
webtech/.gitignore vendored Normal file
View File

@ -0,0 +1,110 @@
webtech/apps.json
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
.hypothesis/
.pytest_cache/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
target/
# Jupyter Notebook
.ipynb_checkpoints
# pyenv
.python-version
# celery beat schedule file
celerybeat-schedule
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
# editors
.vscode

View File

@ -0,0 +1,3 @@
from .webtech import WebTech
name = "webtech"

58
webtech/__main__.py Normal file
View File

@ -0,0 +1,58 @@
#!/usr/bin/env python
import sys
from optparse import OptionParser
from .__version__ import __version__ as VERSION
from .webtech import WebTech
def split_on_comma(option, opt_str, value, parser):
setattr(parser.values, option.dest, value.split(','))
def main():
"""
Main function when running from command line.
"""
parser = OptionParser(prog="webtech", version="%prog {}".format(VERSION))
parser.add_option(
"-u", "--urls",
help="url(s) to scan", type="string", action="callback", callback=split_on_comma)
parser.add_option(
"--urls-file", "--ul",
help="url(s) list file to scan", type="string")
parser.add_option(
"--user-agent", "--ua",
help="use this user agent")
parser.add_option(
"--random-user-agent", "--rua", action="store_true",
help="use a random user agent", default=False)
parser.add_option(
"--database-file", "--db",
help="custom database file")
parser.add_option(
"--json", "--oj", action="store_true",
help="output json-encoded report", default=False)
parser.add_option(
"--grep", "--og", action="store_true",
help="output grepable report", default=False)
parser.add_option(
"--update-db", "--udb", action="store_true",
help="force update of remote db files", default=False)
parser.add_option(
"--timeout", type="float", help="maximum timeout for scrape requests", default=10)
(options, _args) = parser.parse_args(sys.argv)
options = vars(options)
if options.get('urls') is None and options.get('urls_file') is None and options.get('update_db') is None:
print("No URL(s) given!")
parser.print_help()
exit()
wt = WebTech(options)
wt.start()
if __name__ == "__main__":
main()

2
webtech/__version__.py Normal file
View File

@ -0,0 +1,2 @@
# DON'T EDIT THIS FILE
__version__ = "1.2.5"

13379
webtech/apps.json Normal file

File diff suppressed because it is too large Load Diff

146
webtech/database.py Normal file
View File

@ -0,0 +1,146 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os.path
import time
try:
from urllib.request import urlopen
from urllib.error import URLError
except ImportError as e:
from urllib2 import urlopen, URLError
from .utils import UpdateInBurpException
INSTALLATION_DIR = os.path.realpath(os.path.dirname(__file__))
DATABASE_FILE = os.path.join(INSTALLATION_DIR, "webtech.json")
WAPPALYZER_DATABASE_FILE = os.path.join(INSTALLATION_DIR, "apps.json")
WAPPALYZER_DATABASE_URL = "https://raw.githubusercontent.com/AliasIO/Wappalyzer/master/src/apps.json"
WEBTECH_DATABASE_URL = "https://raw.githubusercontent.com/ShielderSec/webtech/master/webtech/webtech.json"
DAYS = 60 * 60 * 24
def download_database_file(url, target_file):
"""
Download the database file from the WAPPPALIZER repository
"""
print("Updating database...")
response = urlopen(url)
with open(target_file, 'wb') as out_file:
out_file.write(response.read())
print("Database updated successfully!")
def save_database_file(content, target_file):
with open(target_file, 'wb') as out_file:
out_file.write(content)
print("Database updated successfully!")
def download(webfile, dbfile, name, force=False, burp=False):
"""
Check if outdated and download file
"""
now = int(time.time())
if not os.path.isfile(dbfile):
print("{} Database file not present.".format(name))
if burp:
raise UpdateInBurpException()
download_database_file(webfile, dbfile)
# set timestamp in filename
else:
last_update = int(os.path.getmtime(dbfile))
if last_update < now - 30 * DAYS or force:
if burp:
raise UpdateInBurpException()
if force:
print("Force update of {} Database file".format(name))
else:
print("{} Database file is older than 30 days.".format(name))
os.remove(dbfile)
download_database_file(webfile, dbfile)
def update_database(args=None, force=False, burp=False):
"""
Update the database if it's not present or too old
"""
try:
download(WAPPALYZER_DATABASE_URL, WAPPALYZER_DATABASE_FILE, "Wappalyzer", force=force, burp=burp)
download(WEBTECH_DATABASE_URL, DATABASE_FILE, "WebTech", force=force, burp=burp)
return True
except URLError as e:
print("Unable to update database, check your internet connection and Github.com availability.")
return False
def merge_databases(db1, db2):
"""
This helper function merge elements from two databases without overrding its elements
This function is not generic and *follow the Wappalyzer db scheme*
"""
# Wappalyzer DB format must have an apps object
db1 = db1['apps']
db2 = db2['apps']
merged_db = db1
for prop in db2:
if merged_db.get(prop) is None:
# if the element appears only in db2, add it to db1
# TODO: Validate type of db2[prop]
merged_db[prop] = db2[prop]
else:
# both db contains the same property, merge its children
element = merged_db[prop]
for key, value in db2[prop].items():
if merged_db[prop].get(key) is None:
# db1's prop doesn't have this key, add it freely
if type(value) in [str, list, dict]:
element[key] = value
else:
raise ValueError('Wrong type in database: only "dict", "list" or "str" are permitted - element of type {}'.format(type(value).__name__))
else:
# both db's prop have the same key, pretty disappointing :(
element[key] = merge_elements(merged_db[prop][key], value)
merged_db[prop] = element
return {'apps': merged_db}
def merge_elements(el1, el2):
"""
Helper function to merge 2 element of different types
Note: el2 has priority over el1 and can override it
The possible cases are:
dict & dict -> merge keys and values
list & list -> merge arrays and remove duplicates
list & str -> add str to array and remove duplicates
str & str -> make a list and remove duplicates
all other cases will raise a ValueError exception
"""
if isinstance(el1, dict):
if isinstance(el2, dict):
# merge keys and value
el1.update(el2)
return el1
else:
raise ValueError('Incompatible types when merging databases: element1 of type {}, element2 of type {}'.format(type(el1).__name__, type(el2).__name__))
elif isinstance(el1, list):
if isinstance(el2, list):
# merge arrays and remove duplicates
el1.extend(el2)
return list(set(el1))
elif isinstance(el2, str):
# add string to array and remove duplicates
el1.append(el2)
return list(set(el1))
else:
raise ValueError('Incompatible types when merging databases: element1 of type {}, element2 of type {}'.format(type(el1).__name__, type(el2).__name__))
elif isinstance(el1, str):
if isinstance(el2, str):
# make a list and remove duplicates
return list(set([el1, el2]))
else:
return merge_elements(el2, el1)
raise ValueError('Wrong type in database: only "dict", "list" or "str" are permitted - element of type {}'.format(type(el1).__name__))

9
webtech/encoder.py Normal file
View File

@ -0,0 +1,9 @@
#!/usr/bin/env python
import json
class Encoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, set):
return list({"name": entry.name, "version": entry.version} for entry in obj)
return json.JSONEncoder.default(self, obj)

28
webtech/parser.py Normal file
View File

@ -0,0 +1,28 @@
#!/usr/bin/env python
try:
from html.parser import HTMLParser
except ImportError:
from HTMLParser import HTMLParser
# Don't blame on me for this mess, we can't use external libs and all we have is HTMLParser
class WTParser(HTMLParser):
def __init__(self):
HTMLParser.__init__(self)
self.meta = {}
self.scripts = []
def handle_starttag(self, tag, attrs):
if tag == 'meta':
m = {}
for name, value in attrs:
m[name] = value
name = m.get('name') or m.get('property')
if name:
self.meta[name] = m.get('content', '')
elif tag == 'script':
for name, value in attrs:
if name == 'src':
self.scripts.append(value)
return

371
webtech/target.py Normal file
View File

@ -0,0 +1,371 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import sys
import json
import re
from io import open
# From now on, hacky hack to work on Burp Jython2.7 without external modules
BURP = False
try:
from requests import get
from requests.utils import dict_from_cookiejar
from requests.structures import CaseInsensitiveDict
from requests.exceptions import RequestException
# Disable warning about Insecure SSL
from requests.packages.urllib3 import disable_warnings
from requests.packages.urllib3.exceptions import InsecureRequestWarning
disable_warnings(InsecureRequestWarning)
except ImportError as e:
BURP = True
pass
from . import encoder
from .utils import ConnectionException, FileNotFoundException, Format, Tech, caseinsensitive_in, dict_from_caseinsensitivedict
from .parser import WTParser
# Hacky hack to hack ack. Support python2 and python3 without depending on six
if sys.version_info[0] > 2:
unicode = str
def parse_regex_string(string):
"""
Parse header string according to wappalizer DB format
strings follow the below format:
<string>[\\;version:\\\d][\\;confidence:\d]
"string" is a mandatory regex string followed by 0 or more parameters (key:value), can be empty
parameters are divided by a \\; sequence (a backslash followed by a semicolon)
examples of parameters are:
"version": indicate wich regex group store the version information
"confidence": indicate a rate of confidence
"""
parts = string.split(r"\;")
if len(parts) == 1:
return parts[0], None
else:
extra = {}
for p in parts[1:]:
p = p.split(":")
extra[p[0]] = p[1]
return parts[0], extra
class Target():
"""
This class represents a single Target (from scraping a page, from a response file, from a replayed request or from a JSON request-response exchange)
The only self attribues MUST be self.data that contains the fetched data and self.report that contains the results from various checks.response
Every function MUST do only 1 action since we are need to parallelize this and all the data must be source-independent
"""
def __init__(self):
# self.data contains the data fetched from the request
# this object SHOULD be append-only and immutable after the scraping/whitelist process
self.data = {
'url': None,
'html': None,
'headers': {},
'cookies': {},
'meta': {},
'script': {}
}
# self.report contains the information about the technologies detected
self.report = {
'tech': set(),
'headers': [],
}
def scrape_url(self, url, headers={}, cookies={}, timeout=10):
"""
Scrape the target URL and collects all the data that will be filtered afterwards
"""
if BURP:
# Burp flag is set when requests is not installed.
# When using Burp we shouldn't end up in this function so we are in a Python CLI env without requests
raise ImportError("Missing Requests module")
# By default we don't verify SSL certificates, we are only performing some useless GETs
try:
response = get(url, headers=headers, cookies=cookies, verify=False, allow_redirects=True, timeout=timeout)
except RequestException as e:
raise ConnectionException(e)
# print("status: {}".format(response.status_code))
# TODO: switch-case for various response.status_code
self.data['url'] = url
self.data['html'] = response.text
self.data['headers'] = dict_from_caseinsensitivedict(response.headers)
self.data['cookies'] = dict_from_cookiejar(response.cookies)
self.parse_html_page()
def parse_http_file(self, url):
"""
Receives an HTTP request/response file and redirect to request/response parsing
"""
path = url.replace('file://', '')
data = open(path, encoding="ISO-8859-1").read()
# e.g. HTTP/1.1 200 OK -> that's a response!
# does not check HTTP/1 since it might be HTTP/2 :)
if data.startswith("HTTP/"):
# BUG: path is not a reliable information. url matching will always fail
self.data['url'] = path
return self.parse_http_response(data)
return self.parse_http_request(data)
def parse_http_response(self, response):
"""
Parse an HTTP response file and collects all the data that will be filtered afterwards
TODO: find a better way to do this :(
"""
response = response.replace('\r', '')
headers_raw, self.data['html'] = response.split('\n\n', 1)
self.data['cookies'] = {}
for header in headers_raw.split('\n'):
header = [x.strip() for x in header.split(":", 1)]
# might be first row: HTTP/1.1 200
if len(header) != 2:
continue
if "set-cookie" in header[0].lower():
# 'Set-Cookie: dr=gonzo; path=/trmon'
cookie = [x.strip() for x in header[1].split(";", 1)[0].split("=", 1)]
# BUG: if there are cookies for different domains with the same name
# they are going to be overwritten (last occurrence will last)...
# ¯\_(ツ)_/¯
self.data['cookies'][cookie[0]] = cookie[1]
else:
self.data['headers'][header[0].lower()] = (header[1], header[0])
self.parse_html_page()
def parse_http_request(self, request, replay=True):
"""
Parse an HTTP request file and collects all the headers
TODO: find a better way to do this :(
TODO: should we support POST request?
"""
# GET / HTTP/1.1 -> /
request = request.replace('\r', '')
replay_uri = request.split('\n', 1)[0].split(" ")[1]
replay_headers = {}
replay_cookies = {}
headers_raw = request.split('\n\n', 1)[0]
for header in headers_raw.split('\n'):
header = [x.strip() for x in header.split(":", 1)]
# might be first row: GET / HTTP/1.1
if len(header) != 2:
continue
if "cookie" not in header[0].lower():
if "host" in header[0].lower():
host = header[1]
else:
replay_headers[header[0]] = header[1]
else:
# 'Cookie: dr=gonzo; mamm=ta; trmo=n'
for cookie in header[1].split(';'):
cookie = [x.strip() for x in cookie.split("=", 1)]
# BUG: if there are cookies for different domains with the same name
# they are going to be overwritten (last occurrence will last)...
# ¯\_(ツ)_/¯
replay_cookies[cookie[0]] = cookie[1]
# BUG: we don't know for sure if it's through HTTP or HTTPS
replay_url = "https://" + host + replay_uri
if replay:
self.scrape_url(replay_url, headers=replay_headers, cookies=replay_cookies)
else:
# The URL is the only usefull information when parsing a request without replaying it
self.data['url'] = replay_url
def parse_html_page(self):
"""
Parse HTML content to get meta tag and script-src
"""
p = WTParser()
p.feed(self.data['html'])
self.data['meta'] = p.meta
self.data['script'] = p.scripts
p.close()
def whitelist_data(self, common_headers):
"""
Whitelist collected data to report the important/uncommon data BEFORE matching with the database
This function is useful for CMS/technologies that are not in the database
"""
for key, value in self.data['headers'].items():
if key not in common_headers:
# In value[1] it's stored the original header name
self.report['headers'].append({"name": value[1], "value": value[0]})
def check_html(self, tech, html):
"""
Check if request html contains some database matches
"""
if isinstance(html, str) or isinstance(html, unicode):
html = [html]
for source in html:
matches = re.search(source, self.data['html'], re.IGNORECASE)
if matches is not None:
matched_tech = Tech(name=tech, version=None)
self.report['tech'].add(matched_tech)
# this tech is matched, GOTO next
return
def check_headers(self, tech, headers):
"""
Check if request headers match some database headers
"""
if not isinstance(headers, dict):
raise ValueError('Invalid headers data in database: {}'.format(headers))
# For every tech header check if there is a match in our target
for header in headers:
content = self.data['headers'].get(header.lower())
if content is None:
# Tech not found
return
else:
# Get the real content
content = content[0]
# Parse the matching regex
attr, extra = parse_regex_string(headers[header])
matches = re.search(attr, content, re.IGNORECASE)
# Attr is empty for a "generic" tech header
if attr is '' or matches is not None:
matched_tech = Tech(name=tech, version=None)
# The version extra data is present
if extra and extra['version']:
if matches.group(1):
matched_tech = matched_tech._replace(version=matches.group(1))
self.report['tech'].add(matched_tech)
# remove ALL the tech headers from the Custom Header list
# first make a list of tech headers
tech_headers = list(map(str, headers.keys()))
# then filter them in target headers case insensitively
self.report['headers'] = list(filter(lambda h: not caseinsensitive_in(str(h['name']), tech_headers), self.report['headers']))
# this tech is matched, GOTO next
return
def check_meta(self, tech, meta):
"""
Check if request meta from page's HTML contains some database matches
"""
for m in meta:
content = self.data['meta'].get(m)
# filter not-available meta
if content is None:
continue
attr, extra = parse_regex_string(meta[m])
matches = re.search(attr, content, re.IGNORECASE)
# Attr is empty for a "generic" tech meta
if attr is '' or matches is not None:
matched_tech = Tech(name=tech, version=None)
# The version extra data is present
if extra and extra['version']:
if matches.group(1):
matched_tech = matched_tech._replace(version=matches.group(1))
self.report['tech'].add(matched_tech)
# this tech is matched, GOTO next
return
def check_script(self, tech, script):
"""
Check if request script src from page's HTML contains some database matches
"""
# FIX repair to some database inconsistencies
if isinstance(script, str) or isinstance(script, unicode):
script = [script]
for source in script:
attr, extra = parse_regex_string(source)
for src in self.data['script']:
matches = re.search(attr, src, re.IGNORECASE)
# Attr is empty for a "generic" tech meta
if attr is '' or matches is not None:
matched_tech = Tech(name=tech, version=None)
# The version extra data is present
if extra and extra['version']:
if matches.group(1):
matched_tech = matched_tech._replace(version=matches.group(1))
self.report['tech'].add(matched_tech)
# this tech is matched, GOTO next
return
def check_cookies(self, tech, cookies):
"""
Check if request cookies match some database cookies
"""
for cookie in cookies:
# cookies in db are regexes so we must test them all
cookie = cookie.replace("*","") # FIX for "Fe26.2**" hapi.js cookie in the database
for biscuit in self.data['cookies'].keys():
matches = re.search(cookie, biscuit, re.IGNORECASE)
if matches is not None:
if cookies[cookie] != '':
# Let's check the cookie content
content = self.data['cookies'][biscuit]
matches = re.search(cookies[cookie], content, re.IGNORECASE)
if matches is None:
# No match, exit
return
matched_tech = Tech(name=tech, version=None)
self.report['tech'].add(matched_tech)
# this tech is matched, GOTO next
return
def check_url(self, tech, url):
"""
Check if request url match some database url rules
"""
if isinstance(url, str) or isinstance(url, unicode):
url = [url]
for source in url:
matches = re.search(source, self.data['url'], re.IGNORECASE)
if matches is not None:
matched_tech = Tech(name=tech, version=None)
self.report['tech'].add(matched_tech)
# this tech is matched, GOTO next
return
def generate_report(self, output_format):
"""
Generate a report
"""
if output_format == Format['grep']:
techs = ""
for tech in self.report['tech']:
if len(techs): techs += "//"
techs += "{}/{}".format(tech.name, 'unknown' if tech.version is None else tech.version)
headers = ""
for header in self.report['headers']:
if len(headers): headers += "//"
headers += "{}:{}".format(header["name"], header["value"])
return "Url>{}\tTechs>{}\tHeaders>{}".format(self.data['url'], techs, headers)
elif output_format == Format['json']:
return json.loads(json.dumps(self.report, cls=encoder.Encoder))
else:
retval = ""
retval += "Target URL: {}\n".format(self.data['url'])
if self.report['tech']:
retval += "Detected technologies:\n"
for tech in self.report['tech']:
retval += "\t- {} {}\n".format(tech.name, '' if tech.version is None else tech.version)
if self.report['headers']:
retval += "Detected the following interesting custom headers:\n"
for header in self.report['headers']:
retval += "\t- {}: {}\n".format(header["name"], header["value"])
return retval

4195
webtech/ua.txt Normal file

File diff suppressed because it is too large Load Diff

37
webtech/utils.py Normal file
View File

@ -0,0 +1,37 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from collections import namedtuple
try:
FileNotFoundException = FileNotFoundError
except NameError:
FileNotFoundException = IOError
Format = {
'text': 0,
'grep': 1,
'json': 2
}
Tech = namedtuple('Tech', ['name', 'version'])
class ConnectionException(Exception):
pass
class UpdateInBurpException:
pass
def caseinsensitive_in(element, elist):
"""
Given a list and an element, return true if the element is present in the list
in a case-insensitive flavor
"""
return element.lower() in map(str.lower, elist)
def dict_from_caseinsensitivedict(cidict):
# This is pretty bad, but in Python2 we don't have CaseInsensitiveDict and with Burp we cannot use requests's implementation
d = {}
for key, value in cidict.items():
d[key.lower()] = (value, key)
return d

36
webtech/webtech.json Normal file
View File

@ -0,0 +1,36 @@
{
"apps": {
"Wix": {
"cookies": {
"svSession": ""
}
},
"Google QUIC": {
"headers": {
"Alt-Svc": "quic"
}
},
"IIS": {
"headers": {
"Server": "^(?:Microsoft-)?IIS(?:/([\\d.]+))?\\;version:\\1"
}
},
"BigIP - F5": {
"headers": {
"Server": "BigIP"
}
},
"Outlook Web Access": {
"headers": {
"X-OWA-Version": "([\\d.]+)\\;version:\\1",
"X-OWA-DiagnosticsInfo": "",
"X-OWA-MinimumSupportedOWSVersion": "",
"X-OWA-OWSVersion": ""
},
"script": [
".*/([\\d.]+)/scripts/microsoft.owa\\S*.js\\;version:\\1",
".*/([\\d.]+)/scripts/owa.mail.js\\;version:\\1"
]
}
}
}

232
webtech/webtech.py Executable file
View File

@ -0,0 +1,232 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import json
import random
try:
from urlparse import urlparse
except ImportError: # For Python 3
from urllib.parse import urlparse
from . import database
from .utils import Format, FileNotFoundException, ConnectionException
from .target import Target, BURP
from .__version__ import __version__ as VERSION
def default_user_agent():
return "webtech/{}".format(VERSION)
def get_random_user_agent():
"""
Get a random user agent from a file
"""
ua_file = os.path.join(os.path.realpath(os.path.dirname(__file__)), "ua.txt")
try:
with open(ua_file) as f:
agents = f.readlines()
return random.choice(agents).strip()
except FileNotFoundException as e:
print(e)
print('Please: Reinstall webtech correctly or provide a valid User-Agent list')
exit(-1)
class WebTech():
"""
Main class. The orchestrator that decides what to do.
This class is the bridge between the tech's database and the Targets' data
"""
COMMON_HEADERS = ['Accept-Ranges', 'Access-Control-Allow-Methods', 'Access-Control-Allow-Origin', 'Age', 'Cache-Control', 'Connection',
'Content-Encoding', 'Content-Language', 'Content-Length', 'Content-Security-Policy', 'Content-Type', 'Date', 'ETag', 'Expect-CT', 'Expires',
'Feature-Policy', 'Keep-Alive', 'Last-Modified', 'Link', 'Location', 'P3P', 'Pragma', 'Referrer-Policy', 'Set-Cookie',
'Strict-Transport-Security', 'Transfer-Encoding', 'Vary', 'X-Accel-Buffering', 'X-Cache', 'X-Cache-Hits', 'X-Content-Security-Policy',
'X-Content-Type-Options', 'X-Frame-Options', 'X-Timer', 'X-WebKit-CSP', 'X-XSS-Protection']
COMMON_HEADERS = [ch.lower() for ch in COMMON_HEADERS]
# 'cats' tech categories
# 'implies' website is using also this tech
# 'excludes' exclude this tech
# 'website' website for this tech
# 'icon' icon for this tech (useless)
# 'headers' check this patter in headers
# 'html' check this regex in html
# 'meta' check this patter in meta
# 'js' check this expression in javascript context
# 'cookies' check this patter in cookies
# 'script' check this pattern in scripts src
# 'url' check this patter in url
def __init__(self, options=None):
update = False if options is None else options.get('update_db', False)
success = database.update_database(force=update, burp=BURP)
self.fail = False
if not success:
# Hack for not crashing Burp
self.fail = True
return
with open(database.WAPPALYZER_DATABASE_FILE) as f:
self.db = json.load(f)
with open(database.DATABASE_FILE) as f:
self.db = database.merge_databases(self.db, json.load(f))
# Output text only
self.output_format = Format['text']
# Default user agent
self.USER_AGENT = default_user_agent()
if options is None:
return
if options.get('database_file'):
try:
with open(options.get('database_file')) as f:
self.db = database.merge_databases(self.db, json.load(f))
except (FileNotFoundException, ValueError) as e:
print(e)
exit(-1)
self.urls = options.get('urls', [])
if options.get('urls_file'):
try:
with open(options.get('urls_file')) as f:
self.urls = f.readlines()
except FileNotFoundException as e:
print(e)
exit(-1)
if options.get('user_agent'):
self.USER_AGENT = options.get('user_agent')
elif options.get('random_user_agent'):
self.USER_AGENT = get_random_user_agent()
if options.get('grep'):
# Greppable output
self.output_format = Format['grep']
elif options.get('json'):
# JSON output
self.output_format = Format['json']
try:
self.timeout = int(options.get('timeout', '10'))
except ValueError:
self.timeout = 10
def start(self):
"""
Start the engine, fetch an URL and report the findings
"""
if self.fail:
# Fail badly
exit(1)
self.output = {}
for url in self.urls:
try:
temp_output = self.start_from_url(url)
except (FileNotFoundException, ValueError) as e:
print(e)
continue
except ConnectionException as e:
print("Connection error while scanning {}".format(url))
continue
if self.output_format == Format['text']:
print(temp_output)
else:
self.output[url] = temp_output
if self.output_format == Format['json']:
print(self.output)
else:
for o in self.output.values():
print(o)
def start_from_url(self, url, headers={}, timeout=None):
"""
Start webtech on a single URL/target
Returns the report for that specific target
"""
timeout = timeout or self.timeout
target = Target()
parsed_url = urlparse(url)
if "http" in parsed_url.scheme:
# Scrape the URL by making a request
h = {'User-Agent': self.USER_AGENT}
h.update(headers)
target.scrape_url(url, headers=h, cookies={}, timeout=timeout)
elif "file" in parsed_url.scheme:
# Load the file and read it
target.parse_http_file(url)
else:
raise ValueError("Invalid scheme {} for URL {}. Only 'http', 'https' and 'file' are supported".format(parsed_url.scheme, url))
return self.perform(target)
def start_from_json(self, exchange):
"""
Start webtech on a single target from a HTTP request-response exchange as JSON serialized string
This function is the entry point for the Burp extension
"""
return self.start_from_exchange(json.loads(exchange))
def start_from_exchange(self, exchange):
"""
Start webtech on a single target from a HTTP request-response exchange as Object
"""
target = Target()
target.parse_http_response(exchange['response'])
target.parse_http_request(exchange['request'], replay=False)
return self.perform(target)
def perform(self, target):
"""
Performs all the checks on the current target received as argument
This function can be executed on multiple threads since "it doesn't access on shared data"
"""
if self.fail:
# Fail gracefully
if self.output_format == Format['json']:
return {}
else:
return ''
target.whitelist_data(self.COMMON_HEADERS)
# Cycle through all the db technologies and do all the checks
# It's more efficent cycling all technologies and match against the target once for tech
# instead of cycling each target feature against every technology
for tech in self.db["apps"]:
t = self.db["apps"][tech]
headers = t.get("headers")
html = t.get("html")
meta = t.get("meta")
cookies = t.get("cookies")
script = t.get("script")
url = t.get("url")
if headers:
target.check_headers(tech, headers)
if html:
target.check_html(tech, html)
if meta:
target.check_meta(tech, meta)
if cookies:
target.check_cookies(tech, cookies)
if script:
target.check_script(tech, script)
if url:
target.check_url(tech, url)
return target.generate_report(self.output_format)