12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485 |
- # encoding=utf-8
- #
- # The Qubes OS Project, https://www.qubes-os.org/
- #
- # Copyright (C) 2015 Marek Marczykowski-Górecki
- # <marmarek@invisiblethingslab.com>
- #
- # This program is free software; you can redistribute it and/or modify
- # it under the terms of the GNU Lesser General Public License as published by
- # the Free Software Foundation; either version 2.1 of the License, or
- # (at your option) any later version.
- #
- # This program is distributed in the hope that it will be useful,
- # but WITHOUT ANY WARRANTY; without even the implied warranty of
- # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- # GNU Lesser General Public License for more details.
- #
- # You should have received a copy of the GNU Lesser General Public License along
- # with this program; if not, see <http://www.gnu.org/licenses/>.
- import io
- import sys
- import asyncio
- class StdoutBuffer(object):
- def __init__(self):
- self.orig_stdout = None
- if sys.version_info[0] >= 3:
- self.stdout = io.StringIO()
- else:
- self.stdout = io.BytesIO()
- def __enter__(self):
- self.orig_stdout = sys.stdout
- sys.stdout = self.stdout
- return self.stdout
- def __exit__(self, exc_type, exc_val, exc_tb):
- sys.stdout = self.orig_stdout
- return False
- class StderrBuffer(object):
- def __init__(self):
- self.orig_stderr = None
- if sys.version_info[0] >= 3:
- self.stderr = io.StringIO()
- else:
- self.stderr = io.BytesIO()
- def __enter__(self):
- self.orig_stderr = sys.stderr
- sys.stderr = self.stderr
- return self.stderr
- def __exit__(self, exc_type, exc_val, exc_tb):
- sys.stderr = self.orig_stderr
- return False
- class MockEventsReader(object):
- def __init__(self, events, delay=0.05):
- self.events = events
- self.delay = delay
- self.current_event = None
- def at_eof(self):
- return not bool(self.events)
- @asyncio.coroutine
- def readuntil(self, delim):
- if not self.current_event:
- if not self.events:
- raise asyncio.IncompleteReadError(b'', delim)
- yield from asyncio.sleep(self.delay)
- self.current_event = self.events.pop(0)
- data, rest = self.current_event.split(delim, 1)
- self.current_event = rest
- return data + delim
- @asyncio.coroutine
- def __call__(self, vm=None):
- return self, (lambda: None)
|