cli.py 2.8 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667
  1. import argparse
  2. import datetime
  3. import json
  4. import logging
  5. import sys
  6. import termcolor
  7. from signal import signal, SIGPIPE, SIG_DFL
  8. import certstream
  9. parser = argparse.ArgumentParser(description='Connect to the CertStream and process CTL list updates.')
  10. parser.add_argument('--json', action='store_true', help='Output raw JSON to the console.')
  11. parser.add_argument('--full', action='store_true', help='Output all SAN addresses as well')
  12. parser.add_argument('--disable-colors', action='store_true', help='Disable colors when writing a human readable ')
  13. parser.add_argument('--verbose', action='store_true', default=False, dest='verbose', help='Display debug logging.')
  14. parser.add_argument('--url', default="wss://certstream.calidog.io", dest='url', help='Connect to a certstream server.')
  15. def main():
  16. args = parser.parse_args()
  17. # Ignore broken pipes
  18. signal(SIGPIPE, SIG_DFL)
  19. log_level = logging.INFO
  20. if args.verbose:
  21. log_level = logging.DEBUG
  22. logging.basicConfig(format='[%(levelname)s:%(name)s] %(asctime)s - %(message)s', level=log_level)
  23. def _handle_messages(message, context):
  24. if args.json:
  25. sys.stdout.flush()
  26. sys.stdout.write(json.dumps(message) + "\n")
  27. sys.stdout.flush()
  28. else:
  29. if args.disable_colors:
  30. logging.debug("Starting normal output.")
  31. payload = "{} {} - {} {}\n".format(
  32. "[{}]".format(datetime.datetime.fromtimestamp(message['data']['seen']).isoformat()),
  33. message['data']['source']['url'],
  34. message['data']['leaf_cert']['subject']['CN'],
  35. "[{}]".format(", ".join(message['data']['leaf_cert']['all_domains'])) if args.full else ""
  36. )
  37. sys.stdout.write(payload)
  38. else:
  39. logging.debug("Starting colored output.")
  40. payload = "{} {} - {} {}\n".format(
  41. termcolor.colored("[{}]".format(datetime.datetime.fromtimestamp(message['data']['seen']).isoformat()), 'cyan', attrs=["bold", ]),
  42. termcolor.colored(message['data']['source']['url'], 'blue', attrs=["bold",]),
  43. termcolor.colored(message['data']['leaf_cert']['subject']['CN'], 'green', attrs=["bold",]),
  44. termcolor.colored("[", 'blue') + "{}".format(
  45. termcolor.colored(", ", 'blue').join(
  46. [termcolor.colored(x, 'white', attrs=["bold",]) for x in message['data']['leaf_cert']['all_domains']]
  47. )
  48. ) + termcolor.colored("]", 'blue') if args.full else "",
  49. )
  50. sys.stdout.write(payload)
  51. sys.stdout.flush()
  52. certstream.listen_for_events(_handle_messages, args.url, skip_heartbeats=True)
  53. if __name__ == "__main__":
  54. main()