core.py 2.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960
  1. from __future__ import print_function
  2. import json
  3. import logging
  4. import time
  5. from websocket import WebSocketApp
  6. class Context(dict):
  7. """dot.notation access to dictionary attributes"""
  8. __getattr__ = dict.get
  9. __setattr__ = dict.__setitem__
  10. __delattr__ = dict.__delitem__
  11. class CertStreamClient(WebSocketApp):
  12. _context = Context()
  13. def __init__(self, message_callback, url, skip_heartbeats=True, on_open=None, on_error=None):
  14. self.message_callback = message_callback
  15. self.skip_heartbeats = skip_heartbeats
  16. self.on_open_handler = on_open
  17. self.on_error_handler = on_error
  18. super(CertStreamClient, self).__init__(
  19. url=url,
  20. on_open=self._on_open,
  21. on_message=self._on_message,
  22. on_error=self._on_error,
  23. )
  24. def _on_open(self):
  25. certstream_logger.info("Connection established to CertStream! Listening for events...")
  26. if self.on_open_handler:
  27. self.on_open_handler()
  28. def _on_message(self, message):
  29. frame = json.loads(message)
  30. if frame.get('message_type', None) == "heartbeat" and self.skip_heartbeats:
  31. return
  32. self.message_callback(frame, self._context)
  33. def _on_error(self, ex):
  34. if type(ex) == KeyboardInterrupt:
  35. raise
  36. if self.on_error_handler:
  37. self.on_error_handler(ex)
  38. certstream_logger.error("Error connecting to CertStream - {} - Sleeping for a few seconds and trying again...".format(ex))
  39. def listen_for_events(message_callback, url, skip_heartbeats=True, setup_logger=True, on_open=None, on_error=None, **kwargs):
  40. try:
  41. while True:
  42. c = CertStreamClient(message_callback, url, skip_heartbeats=skip_heartbeats, on_open=on_open, on_error=on_error)
  43. c.run_forever(ping_interval=15, **kwargs)
  44. time.sleep(5)
  45. except KeyboardInterrupt:
  46. certstream_logger.info("Kill command received, exiting!!")
  47. certstream_logger = logging.getLogger('certstream')
  48. certstream_logger.setLevel(logging.INFO)