68 lines
2.8 KiB
Python
68 lines
2.8 KiB
Python
|
import argparse
|
||
|
import datetime
|
||
|
import json
|
||
|
import logging
|
||
|
import sys
|
||
|
import termcolor
|
||
|
|
||
|
from signal import signal, SIGPIPE, SIG_DFL
|
||
|
|
||
|
import certstream
|
||
|
|
||
|
parser = argparse.ArgumentParser(description='Connect to the CertStream and process CTL list updates.')
|
||
|
|
||
|
parser.add_argument('--json', action='store_true', help='Output raw JSON to the console.')
|
||
|
parser.add_argument('--full', action='store_true', help='Output all SAN addresses as well')
|
||
|
parser.add_argument('--disable-colors', action='store_true', help='Disable colors when writing a human readable ')
|
||
|
parser.add_argument('--verbose', action='store_true', default=False, dest='verbose', help='Display debug logging.')
|
||
|
parser.add_argument('--url', default="wss://certstream.calidog.io", dest='url', help='Connect to a certstream server.')
|
||
|
|
||
|
def main():
|
||
|
args = parser.parse_args()
|
||
|
|
||
|
# Ignore broken pipes
|
||
|
signal(SIGPIPE, SIG_DFL)
|
||
|
|
||
|
log_level = logging.INFO
|
||
|
if args.verbose:
|
||
|
log_level = logging.DEBUG
|
||
|
|
||
|
logging.basicConfig(format='[%(levelname)s:%(name)s] %(asctime)s - %(message)s', level=log_level)
|
||
|
|
||
|
def _handle_messages(message, context):
|
||
|
if args.json:
|
||
|
sys.stdout.flush()
|
||
|
sys.stdout.write(json.dumps(message) + "\n")
|
||
|
sys.stdout.flush()
|
||
|
else:
|
||
|
if args.disable_colors:
|
||
|
logging.debug("Starting normal output.")
|
||
|
payload = "{} {} - {} {}\n".format(
|
||
|
"[{}]".format(datetime.datetime.fromtimestamp(message['data']['seen']).isoformat()),
|
||
|
message['data']['source']['url'],
|
||
|
message['data']['leaf_cert']['subject']['CN'],
|
||
|
"[{}]".format(", ".join(message['data']['leaf_cert']['all_domains'])) if args.full else ""
|
||
|
)
|
||
|
|
||
|
sys.stdout.write(payload)
|
||
|
else:
|
||
|
logging.debug("Starting colored output.")
|
||
|
payload = "{} {} - {} {}\n".format(
|
||
|
termcolor.colored("[{}]".format(datetime.datetime.fromtimestamp(message['data']['seen']).isoformat()), 'cyan', attrs=["bold", ]),
|
||
|
termcolor.colored(message['data']['source']['url'], 'blue', attrs=["bold",]),
|
||
|
termcolor.colored(message['data']['leaf_cert']['subject']['CN'], 'green', attrs=["bold",]),
|
||
|
termcolor.colored("[", 'blue') + "{}".format(
|
||
|
termcolor.colored(", ", 'blue').join(
|
||
|
[termcolor.colored(x, 'white', attrs=["bold",]) for x in message['data']['leaf_cert']['all_domains']]
|
||
|
)
|
||
|
) + termcolor.colored("]", 'blue') if args.full else "",
|
||
|
)
|
||
|
sys.stdout.write(payload)
|
||
|
|
||
|
sys.stdout.flush()
|
||
|
|
||
|
certstream.listen_for_events(_handle_messages, args.url, skip_heartbeats=True)
|
||
|
|
||
|
if __name__ == "__main__":
|
||
|
main()
|