events.py 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167
  1. #
  2. # The Qubes OS Project, https://www.qubes-os.org/
  3. #
  4. # Copyright (C) 2014-2015 Joanna Rutkowska <joanna@invisiblethingslab.com>
  5. # Copyright (C) 2014-2015 Wojtek Porczyk <woju@invisiblethingslab.com>
  6. #
  7. # This library is free software; you can redistribute it and/or
  8. # modify it under the terms of the GNU Lesser General Public
  9. # License as published by the Free Software Foundation; either
  10. # version 2.1 of the License, or (at your option) any later version.
  11. #
  12. # This library is distributed in the hope that it will be useful,
  13. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  14. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  15. # Lesser General Public License for more details.
  16. #
  17. # You should have received a copy of the GNU Lesser General Public
  18. # License along with this library; if not, see <https://www.gnu.org/licenses/>.
  19. #
  20. import asyncio
  21. import qubes.events
  22. import qubes.tests
  23. class TC_00_Emitter(qubes.tests.QubesTestCase):
  24. def test_000_add_handler(self):
  25. # need something mutable
  26. testevent_fired = [False]
  27. def on_testevent(subject, event):
  28. # pylint: disable=unused-argument
  29. if event == 'testevent':
  30. testevent_fired[0] = True
  31. emitter = qubes.events.Emitter()
  32. emitter.add_handler('testevent', on_testevent)
  33. emitter.events_enabled = True
  34. emitter.fire_event('testevent')
  35. self.assertTrue(testevent_fired[0])
  36. def test_001_decorator(self):
  37. class TestEmitter(qubes.events.Emitter):
  38. def __init__(self):
  39. # pylint: disable=bad-super-call
  40. super(TestEmitter, self).__init__()
  41. self.testevent_fired = False
  42. @qubes.events.handler('testevent')
  43. def on_testevent(self, event):
  44. if event == 'testevent':
  45. self.testevent_fired = True
  46. emitter = TestEmitter()
  47. emitter.events_enabled = True
  48. emitter.fire_event('testevent')
  49. self.assertTrue(emitter.testevent_fired)
  50. def test_002_fire_for_effect(self):
  51. class TestEmitter(qubes.events.Emitter):
  52. @qubes.events.handler('testevent')
  53. def on_testevent_1(self, event):
  54. pass
  55. @qubes.events.handler('testevent')
  56. def on_testevent_2(self, event):
  57. yield 'testvalue1'
  58. yield 'testvalue2'
  59. @qubes.events.handler('testevent')
  60. def on_testevent_3(self, event):
  61. return ('testvalue3', 'testvalue4')
  62. emitter = TestEmitter()
  63. emitter.events_enabled = True
  64. effect = emitter.fire_event('testevent')
  65. self.assertCountEqual(effect,
  66. ('testvalue1', 'testvalue2', 'testvalue3', 'testvalue4'))
  67. def test_004_catch_all(self):
  68. # need something mutable
  69. testevent_fired = [0]
  70. def on_all(subject, event, *args, **kwargs):
  71. # pylint: disable=unused-argument
  72. testevent_fired[0] += 1
  73. def on_foo(subject, event, *args, **kwargs):
  74. # pylint: disable=unused-argument
  75. testevent_fired[0] += 1
  76. emitter = qubes.events.Emitter()
  77. emitter.add_handler('*', on_all)
  78. emitter.add_handler('foo', on_foo)
  79. emitter.events_enabled = True
  80. emitter.fire_event('testevent')
  81. self.assertEqual(testevent_fired[0], 1)
  82. emitter.fire_event('foo')
  83. # now catch-all and foo should be executed
  84. self.assertEqual(testevent_fired[0], 3)
  85. emitter.fire_event('bar')
  86. self.assertEqual(testevent_fired[0], 4)
  87. def test_005_instance_handlers(self):
  88. class TestEmitter(qubes.events.Emitter):
  89. @qubes.events.handler('testevent')
  90. def on_testevent_1(self, event):
  91. yield 'testevent_1'
  92. def on_testevent_2(subject, event):
  93. yield 'testevent_2'
  94. emitter = TestEmitter()
  95. emitter.add_handler('testevent', on_testevent_2)
  96. emitter.events_enabled = True
  97. emitter2 = TestEmitter()
  98. emitter2.events_enabled = True
  99. with self.subTest('fire_event'):
  100. effect = emitter.fire_event('testevent')
  101. effect2 = emitter2.fire_event('testevent')
  102. self.assertEqual(list(effect),
  103. ['testevent_1', 'testevent_2'])
  104. self.assertEqual(list(effect2),
  105. ['testevent_1'])
  106. with self.subTest('fire_event_pre'):
  107. effect = emitter.fire_event('testevent', pre_event=True)
  108. effect2 = emitter2.fire_event('testevent', pre_event=True)
  109. self.assertEqual(list(effect),
  110. ['testevent_2', 'testevent_1'])
  111. self.assertEqual(list(effect2),
  112. ['testevent_1'])
  113. def test_005_fire_for_effect_async(self):
  114. class TestEmitter(qubes.events.Emitter):
  115. @qubes.events.handler('testevent')
  116. @asyncio.coroutine
  117. def on_testevent_1(self, event):
  118. pass
  119. @qubes.events.handler('testevent')
  120. @asyncio.coroutine
  121. def on_testevent_2(self, event):
  122. yield from asyncio.sleep(0.01)
  123. return ['testvalue1']
  124. @qubes.events.handler('testevent')
  125. @asyncio.coroutine
  126. def on_testevent_3(self, event):
  127. return ('testvalue2', 'testvalue3')
  128. @qubes.events.handler('testevent')
  129. def on_testevent_4(self, event):
  130. return ('testvalue4',)
  131. loop = asyncio.get_event_loop()
  132. emitter = TestEmitter()
  133. emitter.events_enabled = True
  134. effect = loop.run_until_complete(emitter.fire_event_async('testevent'))
  135. self.assertCountEqual(effect,
  136. ('testvalue1', 'testvalue2', 'testvalue3', 'testvalue4'))