__init__.py 2.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485
  1. # encoding=utf-8
  2. #
  3. # The Qubes OS Project, https://www.qubes-os.org/
  4. #
  5. # Copyright (C) 2015 Marek Marczykowski-Górecki
  6. # <marmarek@invisiblethingslab.com>
  7. #
  8. # This program is free software; you can redistribute it and/or modify
  9. # it under the terms of the GNU Lesser General Public License as published by
  10. # the Free Software Foundation; either version 2.1 of the License, or
  11. # (at your option) any later version.
  12. #
  13. # This program is distributed in the hope that it will be useful,
  14. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  15. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  16. # GNU Lesser General Public License for more details.
  17. #
  18. # You should have received a copy of the GNU Lesser General Public License along
  19. # with this program; if not, see <http://www.gnu.org/licenses/>.
  20. import io
  21. import sys
  22. import asyncio
  23. class StdoutBuffer(object):
  24. def __init__(self):
  25. self.orig_stdout = None
  26. if sys.version_info[0] >= 3:
  27. self.stdout = io.StringIO()
  28. else:
  29. self.stdout = io.BytesIO()
  30. def __enter__(self):
  31. self.orig_stdout = sys.stdout
  32. sys.stdout = self.stdout
  33. return self.stdout
  34. def __exit__(self, exc_type, exc_val, exc_tb):
  35. sys.stdout = self.orig_stdout
  36. return False
  37. class StderrBuffer(object):
  38. def __init__(self):
  39. self.orig_stderr = None
  40. if sys.version_info[0] >= 3:
  41. self.stderr = io.StringIO()
  42. else:
  43. self.stderr = io.BytesIO()
  44. def __enter__(self):
  45. self.orig_stderr = sys.stderr
  46. sys.stderr = self.stderr
  47. return self.stderr
  48. def __exit__(self, exc_type, exc_val, exc_tb):
  49. sys.stderr = self.orig_stderr
  50. return False
  51. class MockEventsReader(object):
  52. def __init__(self, events, delay=0.05):
  53. self.events = events
  54. self.delay = delay
  55. self.current_event = None
  56. def at_eof(self):
  57. return not bool(self.events)
  58. @asyncio.coroutine
  59. def readuntil(self, delim):
  60. if not self.current_event:
  61. if not self.events:
  62. raise asyncio.IncompleteReadError(b'', delim)
  63. yield from asyncio.sleep(self.delay)
  64. self.current_event = self.events.pop(0)
  65. data, rest = self.current_event.split(delim, 1)
  66. self.current_event = rest
  67. return data + delim
  68. @asyncio.coroutine
  69. def __call__(self, vm=None):
  70. return self, (lambda: None)