dbs.py 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177
  1. import yaml
  2. from dateutil import parser
  3. from datetime import datetime
  4. from utils import time_remaining, to_ita_tz
  5. class CTFDb(object):
  6. starting_template = "[{0}]({1}) ({2})\nStarting Date: *{3}*\n\n"
  7. finishing_template = "[{0}]({1}) ({2})\nFinishing Date: *{3}*\n\n"
  8. def __init__(self):
  9. self.events = {}
  10. def is_in_db(self, id):
  11. """Helper function to check if a CTF is in the db"""
  12. if self.events.get(id) is None:
  13. return False
  14. else:
  15. return True
  16. def is_past(self, event):
  17. """Helper function to check if an event is finished"""
  18. finish_date = parser.parse(event["finish_date"])
  19. if finish_date < datetime.utcnow():
  20. return True
  21. return False
  22. def is_ongoing(self, event):
  23. """Helper function to check if an event is finished"""
  24. start_date = parser.parse(event["start_date"])
  25. if start_date < datetime.utcnow():
  26. return True
  27. return False
  28. def add_events(self, events):
  29. """Add an array of events to the DB and delete past events"""
  30. new = []
  31. for event in events:
  32. r = self.add_event(event)
  33. if r is not None:
  34. new.append(r)
  35. self.delete_past_events()
  36. return new
  37. def add_event(self, event):
  38. """Add a single event to the database, performing checks"""
  39. if self.is_past(event):
  40. return
  41. if self.is_in_db(event["id"]):
  42. # Update the db with new info but don't emit message
  43. self.events[event["id"]] = event
  44. return
  45. else:
  46. self.events[event["id"]] = event
  47. return event
  48. def delete_past_events(self):
  49. """Delete past events from the db"""
  50. to_delete = []
  51. for id in self.events:
  52. if self.is_past(self.events[id]):
  53. to_delete.append(id)
  54. # Can't delete directly
  55. for id in to_delete:
  56. del self.events[id]
  57. def starting_ctf(self, delta=24):
  58. """
  59. Return CTF that start in delta hours
  60. This function should be repeated every hour
  61. """
  62. starting = []
  63. for id in self.events:
  64. start_date = parser.parse(self.events[id]["start_date"])
  65. hours_remaining = time_remaining(start_date) // (60 * 60)
  66. if hours_remaining == delta:
  67. starting.append(self.events[id])
  68. return starting
  69. def upcoming(self, count=5):
  70. """Return first `count` CTF that are starting soon"""
  71. self.delete_past_events()
  72. upcoming = []
  73. for id in self.events:
  74. if not self.is_ongoing(self.events[id]):
  75. upcoming.append(self.events[id])
  76. upcoming = sorted(upcoming, key=lambda i: i["start_date"])
  77. return upcoming[:count]
  78. def running(self):
  79. """Return currently running CTF"""
  80. self.delete_past_events()
  81. running = []
  82. for id in self.events:
  83. if self.is_ongoing(self.events[id]):
  84. running.append(self.events[id])
  85. running = sorted(running, key=lambda i: i["finish_date"])
  86. return running
  87. def starting_message(self, ctf):
  88. """Format a message for a starting event"""
  89. date = parser.parse(ctf["start_date"])
  90. m = CTFDb.starting_template
  91. m = m.format(ctf["title"], ctf["link"],
  92. str(ctf["id"]), to_ita_tz(date))
  93. return m
  94. def finishing_message(self, ctf):
  95. """Format a message for a starting event"""
  96. date = parser.parse(ctf["finish_date"])
  97. m = CTFDb.finishing_template
  98. m = m.format(ctf["title"], ctf["link"],
  99. str(ctf["id"]), to_ita_tz(date))
  100. return m
  101. def get_info_message(self, id):
  102. if not self.is_in_db(id):
  103. return "I can\'t find a CTF with this id"
  104. ctf = self.events[id]
  105. start = parser.parse(ctf["start_date"])
  106. finish = parser.parse(ctf["finish_date"])
  107. onsite = "On site" if ctf["onsite"] else "Online"
  108. message = "[{0}]({1})\n".format(ctf["title"], ctf["link"])
  109. message += "Type: *{} {}*\n".format(ctf["format_text"], onsite)
  110. message += "Restriction: *{}*\n".format(ctf["restrictions"])
  111. message += "Url: {}\n".format(ctf["url"])
  112. message += "Weight: {}\n".format(ctf["weight"])
  113. message += "Start Date: *{}*\n".format(to_ita_tz(start))
  114. message += "Finish Date: *{}*\n".format(to_ita_tz(finish))
  115. return message
  116. class GroupDb(object):
  117. groups_db = "./groups.yaml"
  118. def __init__(self):
  119. self.groups = set()
  120. self.load()
  121. def load(self):
  122. """Load groups from backup file"""
  123. try:
  124. with open(GroupDb.groups_db, 'r') as f:
  125. content = f.read()
  126. self.groups = yaml.load(content)
  127. except FileNotFoundError:
  128. self.commit()
  129. def commit(self):
  130. """Write group changes to backup file"""
  131. with open(GroupDb.groups_db, 'w') as f:
  132. yaml.dump(self.groups, f)
  133. def add(self, id):
  134. """Add a new group"""
  135. self.groups.add(id)
  136. self.commit()
  137. def remove(self, id):
  138. """Add a new group"""
  139. self.groups.remove(id)
  140. self.commit()