From af8d7cf765dadb811965075819bc774cc9481a10 Mon Sep 17 00:00:00 2001 From: Giulio Date: Wed, 13 May 2020 11:53:58 +0200 Subject: [PATCH] Working rel --- Procfile | 2 +- Readme.md | 42 ++++++++++++++++++++++++ certstream/__init__.py | 1 + certstream/cli.py | 67 +++++++++++++++++++++++++++++++++++++++ certstream/core.py | 60 +++++++++++++++++++++++++++++++++++ certstream_consumer.py | 20 +++++++----- notifications_consumer.py | 10 +++--- php/bootstrap.php | 4 +-- php/certalert.php | 21 +++++++++--- 9 files changed, 208 insertions(+), 19 deletions(-) create mode 100644 Readme.md create mode 100644 certstream/__init__.py create mode 100644 certstream/cli.py create mode 100644 certstream/core.py diff --git a/Procfile b/Procfile index 5e3c061..69b75b0 100644 --- a/Procfile +++ b/Procfile @@ -1,3 +1,3 @@ certstream_producer: python3 certstream_producer.py certstream_consumer: python3 certstream_consumer.py -notifications_consumer: python3 notifications_consumer.py +notifications_consumer: TOKEN={{ TOKEN }} python3 notifications_consumer.py diff --git a/Readme.md b/Readme.md new file mode 100644 index 0000000..7e8928a --- /dev/null +++ b/Readme.md @@ -0,0 +1,42 @@ +## CertAlert bot +### Intro +CertalAlert with the purpose of sending live notifications from the Certificate Transparency stream. Users can set custom rules and receive notifications only about their domains or keyword of interest. [The bot is live here](https://t.me/certalertbot). + +### Info +The source for the Certificate Transparency data is [CertStream](https://certstream.calidog.io/). Currently this script it's using their official demo server but it's not super reliable and it may miss some entries. Due to the this issue, a self deployment of CertStream is highly recommended. + + * `certstream_producer.py` This file push the stream from CertStream to a local Redis queue. + * `certstream_consumer.py` This file consumes the previous queue and checks for matching domains. If a match is found, it is puhed on another Redis queue which contains the notifications. + * `notifications_consumer.py` This file consumes the notifications queue and so is responsible for using the Telegram API. + +Users rules will be stored directly in MySQL for persistence. When a rule is added, it is inserted in both MySQL and in another specific Redis queue. `certstream_consumer.py` will consume this queue loading rule changes every 1000 domains. + + +File `bootstrap.php` needs to be run when the bot is started in order to load the saved rules in MySQL into Redis. + +File `certalertbot.php` has the actual bot logic and is used as a webhook for telegram. + +[hivemind](https://github.com/DarthSim/hivemind) is used as a process supervisor and requires `tmux`. Processes are defined in `Procfile`. + +To start the bot, configure MySQL in `certalertbot.php` and in `botostrap.php`, insert the Telergam API key in `certalertbot.php` and in `Procfile`, configure Redis, publish `certalertbot.php`, run `bootstrap.php` and then start everything with `hivemind Procfile`. + +### /start + +*CertAlert* bot +This bot sends an alert when a certificate matching a certain rule is logged in the Certificate Trasparency. + + +```/list``` + +To list the current rules. + +```/delete ``` + +To delete a rule. + +```/add ``` + +To add a rule. +_in_ matches the given substring in any postition, _start_ at the beginning and _end_ at the end. + +For special characters use the IDNA encoding. \ No newline at end of file diff --git a/certstream/__init__.py b/certstream/__init__.py new file mode 100644 index 0000000..b38213c --- /dev/null +++ b/certstream/__init__.py @@ -0,0 +1 @@ +from .core import listen_for_events \ No newline at end of file diff --git a/certstream/cli.py b/certstream/cli.py new file mode 100644 index 0000000..ae29fe1 --- /dev/null +++ b/certstream/cli.py @@ -0,0 +1,67 @@ +import argparse +import datetime +import json +import logging +import sys +import termcolor + +from signal import signal, SIGPIPE, SIG_DFL + +import certstream + +parser = argparse.ArgumentParser(description='Connect to the CertStream and process CTL list updates.') + +parser.add_argument('--json', action='store_true', help='Output raw JSON to the console.') +parser.add_argument('--full', action='store_true', help='Output all SAN addresses as well') +parser.add_argument('--disable-colors', action='store_true', help='Disable colors when writing a human readable ') +parser.add_argument('--verbose', action='store_true', default=False, dest='verbose', help='Display debug logging.') +parser.add_argument('--url', default="wss://certstream.calidog.io", dest='url', help='Connect to a certstream server.') + +def main(): + args = parser.parse_args() + + # Ignore broken pipes + signal(SIGPIPE, SIG_DFL) + + log_level = logging.INFO + if args.verbose: + log_level = logging.DEBUG + + logging.basicConfig(format='[%(levelname)s:%(name)s] %(asctime)s - %(message)s', level=log_level) + + def _handle_messages(message, context): + if args.json: + sys.stdout.flush() + sys.stdout.write(json.dumps(message) + "\n") + sys.stdout.flush() + else: + if args.disable_colors: + logging.debug("Starting normal output.") + payload = "{} {} - {} {}\n".format( + "[{}]".format(datetime.datetime.fromtimestamp(message['data']['seen']).isoformat()), + message['data']['source']['url'], + message['data']['leaf_cert']['subject']['CN'], + "[{}]".format(", ".join(message['data']['leaf_cert']['all_domains'])) if args.full else "" + ) + + sys.stdout.write(payload) + else: + logging.debug("Starting colored output.") + payload = "{} {} - {} {}\n".format( + termcolor.colored("[{}]".format(datetime.datetime.fromtimestamp(message['data']['seen']).isoformat()), 'cyan', attrs=["bold", ]), + termcolor.colored(message['data']['source']['url'], 'blue', attrs=["bold",]), + termcolor.colored(message['data']['leaf_cert']['subject']['CN'], 'green', attrs=["bold",]), + termcolor.colored("[", 'blue') + "{}".format( + termcolor.colored(", ", 'blue').join( + [termcolor.colored(x, 'white', attrs=["bold",]) for x in message['data']['leaf_cert']['all_domains']] + ) + ) + termcolor.colored("]", 'blue') if args.full else "", + ) + sys.stdout.write(payload) + + sys.stdout.flush() + + certstream.listen_for_events(_handle_messages, args.url, skip_heartbeats=True) + +if __name__ == "__main__": + main() diff --git a/certstream/core.py b/certstream/core.py new file mode 100644 index 0000000..d590413 --- /dev/null +++ b/certstream/core.py @@ -0,0 +1,60 @@ +from __future__ import print_function + +import json +import logging + +import time +from websocket import WebSocketApp + +class Context(dict): + """dot.notation access to dictionary attributes""" + __getattr__ = dict.get + __setattr__ = dict.__setitem__ + __delattr__ = dict.__delitem__ + +class CertStreamClient(WebSocketApp): + _context = Context() + + def __init__(self, message_callback, url, skip_heartbeats=True, on_open=None, on_error=None): + self.message_callback = message_callback + self.skip_heartbeats = skip_heartbeats + self.on_open_handler = on_open + self.on_error_handler = on_error + super(CertStreamClient, self).__init__( + url=url, + on_open=self._on_open, + on_message=self._on_message, + on_error=self._on_error, + ) + + def _on_open(self): + certstream_logger.info("Connection established to CertStream! Listening for events...") + if self.on_open_handler: + self.on_open_handler() + + def _on_message(self, message): + frame = json.loads(message) + + if frame.get('message_type', None) == "heartbeat" and self.skip_heartbeats: + return + + self.message_callback(frame, self._context) + + def _on_error(self, ex): + if type(ex) == KeyboardInterrupt: + raise + if self.on_error_handler: + self.on_error_handler(ex) + certstream_logger.error("Error connecting to CertStream - {} - Sleeping for a few seconds and trying again...".format(ex)) + +def listen_for_events(message_callback, url, skip_heartbeats=True, setup_logger=True, on_open=None, on_error=None, **kwargs): + try: + while True: + c = CertStreamClient(message_callback, url, skip_heartbeats=skip_heartbeats, on_open=on_open, on_error=on_error) + c.run_forever(ping_interval=15, **kwargs) + time.sleep(5) + except KeyboardInterrupt: + certstream_logger.info("Kill command received, exiting!!") + +certstream_logger = logging.getLogger('certstream') +certstream_logger.setLevel(logging.INFO) diff --git a/certstream_consumer.py b/certstream_consumer.py index 8022be2..f529f65 100755 --- a/certstream_consumer.py +++ b/certstream_consumer.py @@ -10,31 +10,35 @@ while True: count = 0 todel = 1 while (toadd := r.lpop('toadd')) is not None: - toadd = json.loads(toadd.decode('ascii')) + toadd = json.loads(toadd.decode('utf-8')) rules[toadd['id']] = toadd['value'] + print("Added rule " + str(toadd['id'])) while (todel := r.lpop('todel')) is not None: try: - del rules[todel.decode('ascii')] - except: + del rules[int(todel.decode('utf-8'))] + print("Delete rule " + str(todel.decode('utf-8'))) + except Exception as e: + print(e) pass - domain = r.blpop('certstream')[1].decode('ascii') + domain = r.blpop('certstream')[1].decode('utf-8') for rule in rules.values(): notify = False + v = str(rule['v']) if rule['t'] == 0: - if rule['v'] in domain: + if v in domain: notify = True elif rule['t'] == 1: - if domain.startswith(rule['v']): + if domain.startswith(v): notify = True elif rule['t'] == 2: - if domain.endswith(rule['v']): + if domain.endswith(v): notify = True if notify: print(domain) - r.rpush('notifications', json.dumps({"domain": domain, "chats": rule['c']})) + r.rpush('notifications', json.dumps({"domain": domain, "chat": rule['c']})) count += 1 diff --git a/notifications_consumer.py b/notifications_consumer.py index ab5c641..ef70fb8 100755 --- a/notifications_consumer.py +++ b/notifications_consumer.py @@ -1,12 +1,14 @@ import redis import json import os +import requests r = redis.Redis() -token = os.environ.get('token') +token = os.environ.get('TOKEN') +print(token) while True: - notification = json.loads(r.blpop('notifications')[1].decode('ascii')) + notification = json.loads(r.blpop('notifications')[1].decode('utf-8')) print(notification) - for chat in notification['chats']: - requests.get("https://api.telegram.org/bot{}/sendMessage".format(token), params={"chatid": chat, "text": "New cert for *{}*".format(domain), "parse_mode": "Markdown"}) + res = requests.get("https://api.telegram.org/bot{}/sendMessage".format(token), params={"chat_id": notification["chat"], "text": "New cert for *{}*".format(notification["domain"]), "parse_mode": "Markdown"}) + print(res.text) diff --git a/php/bootstrap.php b/php/bootstrap.php index 40d6a64..e5b5882 100644 --- a/php/bootstrap.php +++ b/php/bootstrap.php @@ -2,8 +2,8 @@ $redis = new Redis(); $redis->connect('127.0.0.1', 6379); -$mysql_user = ''; -$mysql_pass = ''; +$mysql_user = 'mysql_user'; +$mysql_pass = 'mysql_password'; $db = new PDO('mysql:host=127.0.0.1;dbname=certalertbot;charset=utf8mb4', $mysql_user, $mysql_pass); diff --git a/php/certalert.php b/php/certalert.php index 6330a9e..b00c1d9 100644 --- a/php/certalert.php +++ b/php/certalert.php @@ -1,10 +1,13 @@ '); define('API_URL', 'https://api.telegram.org/bot'.BOT_TOKEN.'/'); -$mysql_user = ''; -$mysql_pass = ''; +$redis = new Redis(); +$redis->connect('127.0.0.1', 6379); + +$mysql_user = 'mysql_user'; +$mysql_pass = 'mysql_password'; $db = new PDO('mysql:host=127.0.0.1;dbname=certalertbot;charset=utf8mb4', $mysql_user, $mysql_pass); @@ -24,7 +27,7 @@ To delete a rule.
/add <in/start/end> <string>
 
To add a rule. -in mtaches the given substring in any postition, start at the beginning and end at the end +in matches the given substring in any postition, start at the beginning and end at the end. For special characters use the IDNA encoding. "; @@ -75,6 +78,10 @@ switch($command) { $exp = explode(" ", $update['message']['text']); $type = $exp[1]; $value = $exp[2]; + if (strlen($value) < 5) { + $reply = "The filter must be at least 5 chars."; + break; + } switch($type) { case 'in': $type = 0; @@ -92,6 +99,11 @@ switch($command) { if ($type > -1) { $stmt = $db->prepare("INSERT INTO rules (userid, chatid, type, value, timestamp) VALUES (?, ?, ?, ?, CURRENT_TIMESTAMP())"); $stmt->execute(array($fromid, $chatid, $type, $value)); + $id = $db->lastInsertId(); + $toadd["id"] = $id; + $toadd["value"] = array("t" => $type, "v" => $value, "c" => $chatid); + $toadd = json_encode($toadd, JSON_NUMERIC_CHECK); + $redis->rPush('toadd', $toadd); $reply = "Rule added, check with /list"; } else { $reply = "Invalid rule type."; @@ -103,6 +115,7 @@ switch($command) { $id = $exp[1]; $stmt = $db->prepare("DELETE FROM rules WHERE id = ? AND userid = ?"); $stmt->execute(array($id, $fromid)); + $redis->rPush('todel', $id); $reply = "Rule ".$id." deleted"; break; default: