tests/tools: add MockEventsReader

Make it easy to test things listening for events
This commit is contained in:
Marek Marczykowski-Górecki 2017-07-06 21:51:30 +02:00
parent 389252f386
commit 6b1c6141f6
No known key found for this signature in database
GPG Key ID: 063938BA42CFA724

View File

@ -22,6 +22,8 @@
import io
import sys
import asyncio
class StdoutBuffer(object):
def __init__(self):
@ -57,3 +59,27 @@ class StderrBuffer(object):
def __exit__(self, exc_type, exc_val, exc_tb):
sys.stderr = self.orig_stderr
return False
class MockEventsReader(object):
def __init__(self, events, delay=0.05):
self.events = events
self.delay = delay
self.current_event = None
def at_eof(self):
return not bool(self.events)
@asyncio.coroutine
def readuntil(self, delim):
if not self.current_event:
if not self.events:
raise asyncio.IncompleteReadError(b'', delim)
yield from asyncio.sleep(self.delay)
self.current_event = self.events.pop(0)
data, rest = self.current_event.split(delim, 1)
self.current_event = rest
return data + delim
@asyncio.coroutine
def __call__(self, vm=None):
return self, (lambda: None)