tests: py3k related fixes - bytes/str
Adjust usage of bytes vs str type.
This commit is contained in:
parent
d68499f17f
commit
9cad353939
@ -80,16 +80,16 @@ class BackupTestsMixin(qubes.tests.SystemTestsMixin):
|
|||||||
block_size = 4096
|
block_size = 4096
|
||||||
|
|
||||||
self.log.debug("Filling %s" % path)
|
self.log.debug("Filling %s" % path)
|
||||||
f = open(path, 'w+')
|
f = open(path, 'wb+')
|
||||||
if size is None:
|
if size is None:
|
||||||
f.seek(0, 2)
|
f.seek(0, 2)
|
||||||
size = f.tell()
|
size = f.tell()
|
||||||
f.seek(0)
|
f.seek(0)
|
||||||
|
|
||||||
for block_num in range(size/block_size):
|
for block_num in range(int(size/block_size)):
|
||||||
if sparse:
|
if sparse:
|
||||||
f.seek(block_size, 1)
|
f.seek(block_size, 1)
|
||||||
f.write('a' * block_size)
|
f.write(b'a' * block_size)
|
||||||
|
|
||||||
f.close()
|
f.close()
|
||||||
|
|
||||||
@ -232,7 +232,7 @@ class BackupTestsMixin(qubes.tests.SystemTestsMixin):
|
|||||||
continue
|
continue
|
||||||
vol_path = vm.storage.get_pool(volume).export(volume)
|
vol_path = vm.storage.get_pool(volume).export(volume)
|
||||||
hasher = hashlib.sha1()
|
hasher = hashlib.sha1()
|
||||||
with open(vol_path) as afile:
|
with open(vol_path, 'rb') as afile:
|
||||||
for buf in iter(lambda: afile.read(4096000), b''):
|
for buf in iter(lambda: afile.read(4096000), b''):
|
||||||
hasher.update(buf)
|
hasher.update(buf)
|
||||||
hashes[vm.name][name] = hasher.hexdigest()
|
hashes[vm.name][name] = hasher.hexdigest()
|
||||||
|
@ -502,7 +502,7 @@ class TC_30_Gui_daemon(qubes.tests.SystemTestsMixin, qubes.tests.QubesTestCase):
|
|||||||
# And compare the result
|
# And compare the result
|
||||||
(test_output, _) = testvm2.run('cat test.txt',
|
(test_output, _) = testvm2.run('cat test.txt',
|
||||||
passio_popen=True).communicate()
|
passio_popen=True).communicate()
|
||||||
self.assertEquals(test_string, test_output.strip())
|
self.assertEquals(test_string, test_output.strip().decode('ascii'))
|
||||||
|
|
||||||
clipboard_content = \
|
clipboard_content = \
|
||||||
open('/var/run/qubes/qubes-clipboard.bin', 'r').read().strip()
|
open('/var/run/qubes/qubes-clipboard.bin', 'r').read().strip()
|
||||||
|
@ -58,7 +58,7 @@ class TC_00_Devices_PCI(qubes.tests.SystemTestsMixin,
|
|||||||
# get a dict: BDF -> description
|
# get a dict: BDF -> description
|
||||||
actual_devices = dict(
|
actual_devices = dict(
|
||||||
l.split(' (')[0].split(' ', 1)
|
l.split(' (')[0].split(' ', 1)
|
||||||
for l in p.communicate()[0].splitlines())
|
for l in p.communicate()[0].decode().splitlines())
|
||||||
for dev in self.app.domains[0].devices['pci']:
|
for dev in self.app.domains[0].devices['pci']:
|
||||||
self.assertIsInstance(dev, qubes.ext.pci.PCIDevice)
|
self.assertIsInstance(dev, qubes.ext.pci.PCIDevice)
|
||||||
self.assertEqual(dev.backend_domain, self.app.domains[0])
|
self.assertEqual(dev.backend_domain, self.app.domains[0])
|
||||||
|
@ -72,9 +72,9 @@ class TC_04_DispVM(qubes.tests.SystemTestsMixin,
|
|||||||
self.testvm.start()
|
self.testvm.start()
|
||||||
|
|
||||||
p = self.testvm.run("qvm-run --dispvm bash; true", passio_popen=True)
|
p = self.testvm.run("qvm-run --dispvm bash; true", passio_popen=True)
|
||||||
p.stdin.write("qubesdb-read /name\n")
|
p.stdin.write(b"qubesdb-read /name\n")
|
||||||
p.stdin.write("echo ERROR\n")
|
p.stdin.write(b"echo ERROR\n")
|
||||||
p.stdin.write("sudo poweroff\n")
|
p.stdin.write(b"sudo poweroff\n")
|
||||||
# do not close p.stdin on purpose - wait to automatic disconnect when
|
# do not close p.stdin on purpose - wait to automatic disconnect when
|
||||||
# domain is destroyed
|
# domain is destroyed
|
||||||
timeout = 30
|
timeout = 30
|
||||||
@ -88,7 +88,7 @@ class TC_04_DispVM(qubes.tests.SystemTestsMixin,
|
|||||||
lines = p.stdout.read().splitlines()
|
lines = p.stdout.read().splitlines()
|
||||||
self.assertTrue(lines, 'No output received from DispVM')
|
self.assertTrue(lines, 'No output received from DispVM')
|
||||||
dispvm_name = lines[0]
|
dispvm_name = lines[0]
|
||||||
self.assertNotEquals(dispvm_name, "ERROR")
|
self.assertNotEquals(dispvm_name, b"ERROR")
|
||||||
|
|
||||||
self.reload_db()
|
self.reload_db()
|
||||||
self.assertNotIn(dispvm_name, self.app.domains)
|
self.assertNotIn(dispvm_name, self.app.domains)
|
||||||
@ -111,8 +111,8 @@ class TC_20_DispVMMixin(qubes.tests.SystemTestsMixin):
|
|||||||
try:
|
try:
|
||||||
dispvm.start()
|
dispvm.start()
|
||||||
p = dispvm.run_service('qubes.VMShell', passio_popen=True)
|
p = dispvm.run_service('qubes.VMShell', passio_popen=True)
|
||||||
(stdout, _) = p.communicate(input="echo test")
|
(stdout, _) = p.communicate(input=b"echo test")
|
||||||
self.assertEqual(stdout, "test\n")
|
self.assertEqual(stdout, b"test\n")
|
||||||
finally:
|
finally:
|
||||||
dispvm.cleanup()
|
dispvm.cleanup()
|
||||||
|
|
||||||
@ -124,17 +124,17 @@ class TC_20_DispVMMixin(qubes.tests.SystemTestsMixin):
|
|||||||
dispvm.start()
|
dispvm.start()
|
||||||
p = dispvm.run_service('qubes.VMShell', passio_popen=True)
|
p = dispvm.run_service('qubes.VMShell', passio_popen=True)
|
||||||
# wait for DispVM startup:
|
# wait for DispVM startup:
|
||||||
p.stdin.write("echo test\n")
|
p.stdin.write(b"echo test\n")
|
||||||
p.stdin.flush()
|
p.stdin.flush()
|
||||||
l = p.stdout.readline()
|
l = p.stdout.readline()
|
||||||
self.assertEqual(l, "test\n")
|
self.assertEqual(l, b"test\n")
|
||||||
|
|
||||||
self.assertTrue(dispvm.is_running())
|
self.assertTrue(dispvm.is_running())
|
||||||
try:
|
try:
|
||||||
window_title = 'user@%s' % (dispvm.name,)
|
window_title = 'user@%s' % (dispvm.name,)
|
||||||
p.stdin.write("xterm -e "
|
p.stdin.write("xterm -e "
|
||||||
"\"sh -c 'echo \\\"\033]0;{}\007\\\";read x;'\"\n".
|
"\"sh -c 'echo \\\"\033]0;{}\007\\\";read x;'\"\n".
|
||||||
format(window_title))
|
format(window_title).encode())
|
||||||
self.wait_for_window(window_title)
|
self.wait_for_window(window_title)
|
||||||
|
|
||||||
time.sleep(0.5)
|
time.sleep(0.5)
|
||||||
@ -154,7 +154,7 @@ class TC_20_DispVMMixin(qubes.tests.SystemTestsMixin):
|
|||||||
(window_title, _) = subprocess.Popen(
|
(window_title, _) = subprocess.Popen(
|
||||||
['xdotool', 'getwindowname', winid], stdout=subprocess.PIPE).\
|
['xdotool', 'getwindowname', winid], stdout=subprocess.PIPE).\
|
||||||
communicate()
|
communicate()
|
||||||
window_title = window_title.strip().\
|
window_title = window_title.decode().strip().\
|
||||||
replace('(', '\(').replace(')', '\)')
|
replace('(', '\(').replace(')', '\)')
|
||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
if "gedit" in window_title:
|
if "gedit" in window_title:
|
||||||
@ -235,7 +235,7 @@ class TC_20_DispVMMixin(qubes.tests.SystemTestsMixin):
|
|||||||
(window_title, _) = subprocess.Popen(
|
(window_title, _) = subprocess.Popen(
|
||||||
['xdotool', 'getwindowname', winid], stdout=subprocess.PIPE). \
|
['xdotool', 'getwindowname', winid], stdout=subprocess.PIPE). \
|
||||||
communicate()
|
communicate()
|
||||||
window_title = window_title.strip()
|
window_title = window_title.decode().strip()
|
||||||
# ignore LibreOffice splash screen and window with no title
|
# ignore LibreOffice splash screen and window with no title
|
||||||
# set yet
|
# set yet
|
||||||
if window_title and not window_title.startswith("LibreOffice")\
|
if window_title and not window_title.startswith("LibreOffice")\
|
||||||
@ -253,9 +253,9 @@ class TC_20_DispVMMixin(qubes.tests.SystemTestsMixin):
|
|||||||
passio_popen=True)
|
passio_popen=True)
|
||||||
(test_txt_content, _) = p.communicate()
|
(test_txt_content, _) = p.communicate()
|
||||||
# Drop BOM if added by editor
|
# Drop BOM if added by editor
|
||||||
if test_txt_content.startswith('\xef\xbb\xbf'):
|
if test_txt_content.startswith(b'\xef\xbb\xbf'):
|
||||||
test_txt_content = test_txt_content[3:]
|
test_txt_content = test_txt_content[3:]
|
||||||
self.assertEqual(test_txt_content, "Test test 2\ntest1\n")
|
self.assertEqual(test_txt_content, b"Test test 2\ntest1\n")
|
||||||
|
|
||||||
def load_tests(loader, tests, pattern):
|
def load_tests(loader, tests, pattern):
|
||||||
try:
|
try:
|
||||||
|
@ -54,7 +54,7 @@ Key-Usage: sign
|
|||||||
Name-Real: Qubes test
|
Name-Real: Qubes test
|
||||||
Expire-Date: 0
|
Expire-Date: 0
|
||||||
%commit
|
%commit
|
||||||
'''.format(keydir=keydir))
|
'''.format(keydir=keydir).encode())
|
||||||
p.stdin.close()
|
p.stdin.close()
|
||||||
p.wait()
|
p.wait()
|
||||||
|
|
||||||
@ -63,7 +63,7 @@ Expire-Date: 0
|
|||||||
p = subprocess.Popen(gpg_opts + ['--with-colons', '--list-keys'],
|
p = subprocess.Popen(gpg_opts + ['--with-colons', '--list-keys'],
|
||||||
stdout=subprocess.PIPE)
|
stdout=subprocess.PIPE)
|
||||||
for line in p.stdout.readlines():
|
for line in p.stdout.readlines():
|
||||||
fields = line.split(':')
|
fields = line.decode().split(':')
|
||||||
if fields[0] == 'pub':
|
if fields[0] == 'pub':
|
||||||
return fields[4][-8:].lower()
|
return fields[4][-8:].lower()
|
||||||
raise RuntimeError
|
raise RuntimeError
|
||||||
@ -79,7 +79,7 @@ Expire-Date: 0
|
|||||||
p = subprocess.Popen(['sudo', 'dd',
|
p = subprocess.Popen(['sudo', 'dd',
|
||||||
'status=none', 'of=/etc/yum.repos.d/test.repo'],
|
'status=none', 'of=/etc/yum.repos.d/test.repo'],
|
||||||
stdin=subprocess.PIPE)
|
stdin=subprocess.PIPE)
|
||||||
p.stdin.write('''
|
p.stdin.write(b'''
|
||||||
[test]
|
[test]
|
||||||
name = Test
|
name = Test
|
||||||
baseurl = http://localhost:8080/
|
baseurl = http://localhost:8080/
|
||||||
@ -173,7 +173,7 @@ Test package
|
|||||||
p = self.updatevm.run('mkdir -p /tmp/repo; cat > /tmp/repo/{}'.format(
|
p = self.updatevm.run('mkdir -p /tmp/repo; cat > /tmp/repo/{}'.format(
|
||||||
os.path.basename(
|
os.path.basename(
|
||||||
filename)), passio_popen=True)
|
filename)), passio_popen=True)
|
||||||
p.stdin.write(open(filename).read())
|
p.stdin.write(open(filename, 'rb').read())
|
||||||
p.stdin.close()
|
p.stdin.close()
|
||||||
p.wait()
|
p.wait()
|
||||||
retcode = self.updatevm.run('cd /tmp/repo; createrepo .', wait=True)
|
retcode = self.updatevm.run('cd /tmp/repo; createrepo .', wait=True)
|
||||||
|
@ -51,7 +51,7 @@ class VmNetworkingMixin(qubes.tests.SystemTestsMixin):
|
|||||||
def run_cmd(self, vm, cmd, user="root"):
|
def run_cmd(self, vm, cmd, user="root"):
|
||||||
p = vm.run(cmd, user=user, passio_popen=True, ignore_stderr=True)
|
p = vm.run(cmd, user=user, passio_popen=True, ignore_stderr=True)
|
||||||
p.stdin.close()
|
p.stdin.close()
|
||||||
p.stdout.read()
|
p.stdout.read().decode()
|
||||||
return p.wait()
|
return p.wait()
|
||||||
|
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
@ -350,7 +350,7 @@ class VmNetworkingMixin(qubes.tests.SystemTestsMixin):
|
|||||||
passio_popen=True,
|
passio_popen=True,
|
||||||
ignore_stderr=True)
|
ignore_stderr=True)
|
||||||
p.stdin.close()
|
p.stdin.close()
|
||||||
output = p.stdout.read()
|
output = p.stdout.read().decode()
|
||||||
self.assertEqual(p.wait(), 0, 'ip addr show dev eth0 failed')
|
self.assertEqual(p.wait(), 0, 'ip addr show dev eth0 failed')
|
||||||
self.assertIn('192.168.1.128', output)
|
self.assertIn('192.168.1.128', output)
|
||||||
self.assertNotIn(self.testvm1.ip, output)
|
self.assertNotIn(self.testvm1.ip, output)
|
||||||
@ -359,7 +359,7 @@ class VmNetworkingMixin(qubes.tests.SystemTestsMixin):
|
|||||||
passio_popen=True,
|
passio_popen=True,
|
||||||
ignore_stderr=True)
|
ignore_stderr=True)
|
||||||
p.stdin.close()
|
p.stdin.close()
|
||||||
output = p.stdout.read()
|
output = p.stdout.read().decode()
|
||||||
self.assertEqual(p.wait(), 0, 'ip route show failed')
|
self.assertEqual(p.wait(), 0, 'ip route show failed')
|
||||||
self.assertIn('192.168.1.1', output)
|
self.assertIn('192.168.1.1', output)
|
||||||
self.assertNotIn(self.testvm1.netvm.ip, output)
|
self.assertNotIn(self.testvm1.netvm.ip, output)
|
||||||
@ -375,7 +375,7 @@ class VmNetworkingMixin(qubes.tests.SystemTestsMixin):
|
|||||||
passio_popen=True,
|
passio_popen=True,
|
||||||
ignore_stderr=True)
|
ignore_stderr=True)
|
||||||
p.stdin.close()
|
p.stdin.close()
|
||||||
output = p.stdout.read()
|
output = p.stdout.read().decode()
|
||||||
self.assertEqual(p.wait(), 0, 'ip addr show dev eth0 failed')
|
self.assertEqual(p.wait(), 0, 'ip addr show dev eth0 failed')
|
||||||
self.assertIn('192.168.1.128', output)
|
self.assertIn('192.168.1.128', output)
|
||||||
self.assertNotIn(self.testvm1.ip, output)
|
self.assertNotIn(self.testvm1.ip, output)
|
||||||
@ -473,7 +473,7 @@ class VmNetworkingMixin(qubes.tests.SystemTestsMixin):
|
|||||||
(stdout, _) = p.communicate()
|
(stdout, _) = p.communicate()
|
||||||
self.assertEqual(p.returncode, 0,
|
self.assertEqual(p.returncode, 0,
|
||||||
'{} failed with {}'.format(cmd, p.returncode))
|
'{} failed with {}'.format(cmd, p.returncode))
|
||||||
self.assertNotEqual(stdout.split()[0], '0',
|
self.assertNotEqual(stdout.decode().split()[0], '0',
|
||||||
'Packets didn\'t managed to the VM')
|
'Packets didn\'t managed to the VM')
|
||||||
|
|
||||||
def test_204_fake_ip_proxy(self):
|
def test_204_fake_ip_proxy(self):
|
||||||
@ -501,7 +501,7 @@ class VmNetworkingMixin(qubes.tests.SystemTestsMixin):
|
|||||||
passio_popen=True,
|
passio_popen=True,
|
||||||
ignore_stderr=True)
|
ignore_stderr=True)
|
||||||
p.stdin.close()
|
p.stdin.close()
|
||||||
output = p.stdout.read()
|
output = p.stdout.read().decode()
|
||||||
self.assertEqual(p.wait(), 0, 'ip addr show dev eth0 failed')
|
self.assertEqual(p.wait(), 0, 'ip addr show dev eth0 failed')
|
||||||
self.assertIn('192.168.1.128', output)
|
self.assertIn('192.168.1.128', output)
|
||||||
self.assertNotIn(self.testvm1.ip, output)
|
self.assertNotIn(self.testvm1.ip, output)
|
||||||
@ -510,7 +510,7 @@ class VmNetworkingMixin(qubes.tests.SystemTestsMixin):
|
|||||||
passio_popen=True,
|
passio_popen=True,
|
||||||
ignore_stderr=True)
|
ignore_stderr=True)
|
||||||
p.stdin.close()
|
p.stdin.close()
|
||||||
output = p.stdout.read()
|
output = p.stdout.read().decode()
|
||||||
self.assertEqual(p.wait(), 0, 'ip route show failed')
|
self.assertEqual(p.wait(), 0, 'ip route show failed')
|
||||||
self.assertIn('192.168.1.1', output)
|
self.assertIn('192.168.1.1', output)
|
||||||
self.assertNotIn(self.testvm1.netvm.ip, output)
|
self.assertNotIn(self.testvm1.netvm.ip, output)
|
||||||
@ -519,7 +519,7 @@ class VmNetworkingMixin(qubes.tests.SystemTestsMixin):
|
|||||||
passio_popen=True,
|
passio_popen=True,
|
||||||
ignore_stderr=True)
|
ignore_stderr=True)
|
||||||
p.stdin.close()
|
p.stdin.close()
|
||||||
output = p.stdout.read()
|
output = p.stdout.read().decode()
|
||||||
self.assertEqual(p.wait(), 0, 'ip addr show dev eth0 failed')
|
self.assertEqual(p.wait(), 0, 'ip addr show dev eth0 failed')
|
||||||
self.assertNotIn('192.168.1.128', output)
|
self.assertNotIn('192.168.1.128', output)
|
||||||
self.assertIn(self.testvm1.ip, output)
|
self.assertIn(self.testvm1.ip, output)
|
||||||
@ -528,7 +528,7 @@ class VmNetworkingMixin(qubes.tests.SystemTestsMixin):
|
|||||||
passio_popen=True,
|
passio_popen=True,
|
||||||
ignore_stderr=True)
|
ignore_stderr=True)
|
||||||
p.stdin.close()
|
p.stdin.close()
|
||||||
output = p.stdout.read()
|
output = p.stdout.read().decode()
|
||||||
self.assertEqual(p.wait(), 0, 'ip route show failed')
|
self.assertEqual(p.wait(), 0, 'ip route show failed')
|
||||||
self.assertIn('192.168.1.128', output)
|
self.assertIn('192.168.1.128', output)
|
||||||
self.assertNotIn(self.proxy.ip, output)
|
self.assertNotIn(self.proxy.ip, output)
|
||||||
@ -688,7 +688,7 @@ class VmUpdatesMixin(qubes.tests.SystemTestsMixin):
|
|||||||
def run_cmd(self, vm, cmd, user="root"):
|
def run_cmd(self, vm, cmd, user="root"):
|
||||||
p = vm.run(cmd, user=user, passio_popen=True, ignore_stderr=True)
|
p = vm.run(cmd, user=user, passio_popen=True, ignore_stderr=True)
|
||||||
p.stdin.close()
|
p.stdin.close()
|
||||||
p.stdout.read()
|
p.stdout.read().decode()
|
||||||
return p.wait()
|
return p.wait()
|
||||||
|
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
@ -778,7 +778,7 @@ class VmUpdatesMixin(qubes.tests.SystemTestsMixin):
|
|||||||
p.stdin.close()
|
p.stdin.close()
|
||||||
if p.wait() != 0:
|
if p.wait() != 0:
|
||||||
raise RuntimeError("Failed to write Packages file: {}".format(
|
raise RuntimeError("Failed to write Packages file: {}".format(
|
||||||
p.stderr.read()))
|
p.stderr.read().decode()))
|
||||||
|
|
||||||
p = self.netvm_repo.run(
|
p = self.netvm_repo.run(
|
||||||
"mkdir -p /tmp/apt-repo/dists/test && "
|
"mkdir -p /tmp/apt-repo/dists/test && "
|
||||||
@ -804,7 +804,7 @@ class VmUpdatesMixin(qubes.tests.SystemTestsMixin):
|
|||||||
p.stdin.close()
|
p.stdin.close()
|
||||||
if p.wait() != 0:
|
if p.wait() != 0:
|
||||||
raise RuntimeError("Failed to write Release file: {}".format(
|
raise RuntimeError("Failed to write Release file: {}".format(
|
||||||
p.stderr.read()))
|
p.stderr.read().decode()))
|
||||||
|
|
||||||
def create_repo_yum(self):
|
def create_repo_yum(self):
|
||||||
pkg_file_name = "test-pkg-1.0-1.fc21.x86_64.rpm"
|
pkg_file_name = "test-pkg-1.0-1.fc21.x86_64.rpm"
|
||||||
@ -815,7 +815,7 @@ class VmUpdatesMixin(qubes.tests.SystemTestsMixin):
|
|||||||
p.stdin.close()
|
p.stdin.close()
|
||||||
if p.wait() != 0:
|
if p.wait() != 0:
|
||||||
raise RuntimeError("Failed to write {}: {}".format(pkg_file_name,
|
raise RuntimeError("Failed to write {}: {}".format(pkg_file_name,
|
||||||
p.stderr.read()))
|
p.stderr.read().decode()))
|
||||||
|
|
||||||
# createrepo is installed by default in Fedora template
|
# createrepo is installed by default in Fedora template
|
||||||
p = self.netvm_repo.run("createrepo /tmp/yum-repo",
|
p = self.netvm_repo.run("createrepo /tmp/yum-repo",
|
||||||
@ -823,7 +823,7 @@ class VmUpdatesMixin(qubes.tests.SystemTestsMixin):
|
|||||||
passio_stderr=True)
|
passio_stderr=True)
|
||||||
if p.wait() != 0:
|
if p.wait() != 0:
|
||||||
raise RuntimeError("Failed to create yum metadata: {}".format(
|
raise RuntimeError("Failed to create yum metadata: {}".format(
|
||||||
p.stderr.read()))
|
p.stderr.read().decode()))
|
||||||
|
|
||||||
def create_repo_and_serve(self):
|
def create_repo_and_serve(self):
|
||||||
if self.template.count("debian") or self.template.count("whonix"):
|
if self.template.count("debian") or self.template.count("whonix"):
|
||||||
|
@ -87,13 +87,13 @@ class TC_00_qvm_run(qubes.tests.SystemTestsMixin, qubes.tests.QubesTestCase):
|
|||||||
self.sharedopts + ['--pass-io', self.vm1.name, 'false']))
|
self.sharedopts + ['--pass-io', self.vm1.name, 'false']))
|
||||||
|
|
||||||
def test_002_passio_localcmd(self):
|
def test_002_passio_localcmd(self):
|
||||||
self.assertEqual('aqq', self.get_qvm_run_output(
|
self.assertEqual(b'aqq', self.get_qvm_run_output(
|
||||||
self.sharedopts + [self.vm1.name, 'printf aqq']))
|
self.sharedopts + [self.vm1.name, 'printf aqq']))
|
||||||
|
|
||||||
def test_003_user(self):
|
def test_003_user(self):
|
||||||
self.assertNotEqual('0\n', self.get_qvm_run_output(
|
self.assertNotEqual(b'0\n', self.get_qvm_run_output(
|
||||||
self.sharedopts + ['--user', 'user', self.vm1.name, 'id -u']))
|
self.sharedopts + ['--user', 'user', self.vm1.name, 'id -u']))
|
||||||
self.assertEqual('0\n', self.get_qvm_run_output(
|
self.assertEqual(b'0\n', self.get_qvm_run_output(
|
||||||
self.sharedopts + ['--user', 'root', self.vm1.name, 'id -u']))
|
self.sharedopts + ['--user', 'root', self.vm1.name, 'id -u']))
|
||||||
|
|
||||||
def test_004_autostart(self):
|
def test_004_autostart(self):
|
||||||
@ -114,13 +114,13 @@ class TC_00_qvm_run(qubes.tests.SystemTestsMixin, qubes.tests.QubesTestCase):
|
|||||||
sys.stdout = io.StringIO()
|
sys.stdout = io.StringIO()
|
||||||
qubes.tools.qvm_run.main(
|
qubes.tools.qvm_run.main(
|
||||||
self.sharedopts + ['--colour-output', '32', self.vm1.name, 'true'])
|
self.sharedopts + ['--colour-output', '32', self.vm1.name, 'true'])
|
||||||
self.assertEqual('\033[0;32m\033[0m', sys.stdout.getvalue())
|
self.assertEqual(b'\033[0;32m\033[0m', sys.stdout.getvalue())
|
||||||
|
|
||||||
def test_006_filter_esc(self):
|
def test_006_filter_esc(self):
|
||||||
self.assertEqual('\033', self.get_qvm_run_output(
|
self.assertEqual(b'\033', self.get_qvm_run_output(
|
||||||
self.sharedopts + ['--no-filter-escape-chars', self.vm1.name,
|
self.sharedopts + ['--no-filter-escape-chars', self.vm1.name,
|
||||||
r'printf \\033']))
|
r'printf \\033']))
|
||||||
self.assertEqual('_', self.get_qvm_run_output(
|
self.assertEqual(b'_', self.get_qvm_run_output(
|
||||||
self.sharedopts + ['--filter-escape-chars', self.vm1.name,
|
self.sharedopts + ['--filter-escape-chars', self.vm1.name,
|
||||||
r'printf \\033']))
|
r'printf \\033']))
|
||||||
|
|
||||||
|
@ -34,7 +34,7 @@ import qubes.vm.appvm
|
|||||||
import qubes.vm.templatevm
|
import qubes.vm.templatevm
|
||||||
import re
|
import re
|
||||||
|
|
||||||
TEST_DATA = "0123456789" * 1024
|
TEST_DATA = b"0123456789" * 1024
|
||||||
|
|
||||||
|
|
||||||
class TC_00_AppVMMixin(qubes.tests.SystemTestsMixin):
|
class TC_00_AppVMMixin(qubes.tests.SystemTestsMixin):
|
||||||
@ -222,7 +222,7 @@ class TC_00_AppVMMixin(qubes.tests.SystemTestsMixin):
|
|||||||
stdout = p.stdout.read()
|
stdout = p.stdout.read()
|
||||||
p.stdin.write(TEST_DATA)
|
p.stdin.write(TEST_DATA)
|
||||||
p.stdin.close()
|
p.stdin.close()
|
||||||
if stdout.strip() != "test":
|
if stdout.strip() != b"test":
|
||||||
result.value = 1
|
result.value = 1
|
||||||
# this may hang in some buggy cases
|
# this may hang in some buggy cases
|
||||||
elif len(p.stderr.read()) > 0:
|
elif len(p.stderr.read()) > 0:
|
||||||
@ -258,14 +258,14 @@ class TC_00_AppVMMixin(qubes.tests.SystemTestsMixin):
|
|||||||
">&$SAVED_FD_1'" % self.testvm2.name,
|
">&$SAVED_FD_1'" % self.testvm2.name,
|
||||||
passio_popen=True)
|
passio_popen=True)
|
||||||
(stdout, stderr) = p.communicate()
|
(stdout, stderr) = p.communicate()
|
||||||
if stdout != "test\n":
|
if stdout != b"test\n":
|
||||||
result.value = 1
|
result.value = 1
|
||||||
|
|
||||||
self.testvm1.start()
|
self.testvm1.start()
|
||||||
self.testvm2.start()
|
self.testvm2.start()
|
||||||
p = self.testvm2.run("cat > /etc/qubes-rpc/test.EOF", user="root",
|
p = self.testvm2.run("cat > /etc/qubes-rpc/test.EOF", user="root",
|
||||||
passio_popen=True)
|
passio_popen=True)
|
||||||
p.stdin.write("/bin/cat")
|
p.stdin.write(b"/bin/cat")
|
||||||
p.stdin.close()
|
p.stdin.close()
|
||||||
p.wait()
|
p.wait()
|
||||||
policy = open("/etc/qubes-rpc/policy/test.EOF", "w")
|
policy = open("/etc/qubes-rpc/policy/test.EOF", "w")
|
||||||
@ -293,14 +293,14 @@ class TC_00_AppVMMixin(qubes.tests.SystemTestsMixin):
|
|||||||
% self.testvm2.name,
|
% self.testvm2.name,
|
||||||
passio_popen=True)
|
passio_popen=True)
|
||||||
(stdout, stderr) = p.communicate()
|
(stdout, stderr) = p.communicate()
|
||||||
if stdout != "test\n":
|
if stdout != b"test\n":
|
||||||
result.value = 1
|
result.value = 1
|
||||||
|
|
||||||
self.testvm1.start()
|
self.testvm1.start()
|
||||||
self.testvm2.start()
|
self.testvm2.start()
|
||||||
p = self.testvm2.run("cat > /etc/qubes-rpc/test.EOF", user="root",
|
p = self.testvm2.run("cat > /etc/qubes-rpc/test.EOF", user="root",
|
||||||
passio_popen=True)
|
passio_popen=True)
|
||||||
p.stdin.write("echo test; exec >&-; cat >/dev/null")
|
p.stdin.write(b"echo test; exec >&-; cat >/dev/null")
|
||||||
p.stdin.close()
|
p.stdin.close()
|
||||||
p.wait()
|
p.wait()
|
||||||
policy = open("/etc/qubes-rpc/policy/test.EOF", "w")
|
policy = open("/etc/qubes-rpc/policy/test.EOF", "w")
|
||||||
@ -376,7 +376,7 @@ class TC_00_AppVMMixin(qubes.tests.SystemTestsMixin):
|
|||||||
|
|
||||||
p = self.testvm2.run("cat > /etc/qubes-rpc/test.Retcode", user="root",
|
p = self.testvm2.run("cat > /etc/qubes-rpc/test.Retcode", user="root",
|
||||||
passio_popen=True)
|
passio_popen=True)
|
||||||
p.stdin.write("exit 0")
|
p.stdin.write(b"exit 0")
|
||||||
p.stdin.close()
|
p.stdin.close()
|
||||||
p.wait()
|
p.wait()
|
||||||
|
|
||||||
@ -385,11 +385,11 @@ class TC_00_AppVMMixin(qubes.tests.SystemTestsMixin):
|
|||||||
% self.testvm1.name,
|
% self.testvm1.name,
|
||||||
passio_popen=True)
|
passio_popen=True)
|
||||||
(stdout, stderr) = p.communicate()
|
(stdout, stderr) = p.communicate()
|
||||||
self.assertEqual(stdout, "0\n")
|
self.assertEqual(stdout, b"0\n")
|
||||||
|
|
||||||
p = self.testvm2.run("cat > /etc/qubes-rpc/test.Retcode", user="root",
|
p = self.testvm2.run("cat > /etc/qubes-rpc/test.Retcode", user="root",
|
||||||
passio_popen=True)
|
passio_popen=True)
|
||||||
p.stdin.write("exit 3")
|
p.stdin.write(b"exit 3")
|
||||||
p.stdin.close()
|
p.stdin.close()
|
||||||
p.wait()
|
p.wait()
|
||||||
|
|
||||||
@ -398,7 +398,7 @@ class TC_00_AppVMMixin(qubes.tests.SystemTestsMixin):
|
|||||||
% self.testvm1.name,
|
% self.testvm1.name,
|
||||||
passio_popen=True)
|
passio_popen=True)
|
||||||
(stdout, stderr) = p.communicate()
|
(stdout, stderr) = p.communicate()
|
||||||
self.assertEqual(stdout, "3\n")
|
self.assertEqual(stdout, b"3\n")
|
||||||
|
|
||||||
def test_070_qrexec_vm_simultaneous_write(self):
|
def test_070_qrexec_vm_simultaneous_write(self):
|
||||||
"""Test for simultaneous write in VM(src)->VM(dst) connection
|
"""Test for simultaneous write in VM(src)->VM(dst) connection
|
||||||
@ -432,9 +432,9 @@ class TC_00_AppVMMixin(qubes.tests.SystemTestsMixin):
|
|||||||
p = self.testvm2.run("cat > /etc/qubes-rpc/test.write", user="root",
|
p = self.testvm2.run("cat > /etc/qubes-rpc/test.write", user="root",
|
||||||
passio_popen=True)
|
passio_popen=True)
|
||||||
# first write a lot of data
|
# first write a lot of data
|
||||||
p.stdin.write("dd if=/dev/zero bs=993 count=10000 iflag=fullblock\n")
|
p.stdin.write(b"dd if=/dev/zero bs=993 count=10000 iflag=fullblock\n")
|
||||||
# and only then read something
|
# and only then read something
|
||||||
p.stdin.write("dd of=/dev/null bs=993 count=10000 iflag=fullblock\n")
|
p.stdin.write(b"dd of=/dev/null bs=993 count=10000 iflag=fullblock\n")
|
||||||
p.stdin.close()
|
p.stdin.close()
|
||||||
p.wait()
|
p.wait()
|
||||||
policy = open("/etc/qubes-rpc/policy/test.write", "w")
|
policy = open("/etc/qubes-rpc/policy/test.write", "w")
|
||||||
@ -473,9 +473,9 @@ class TC_00_AppVMMixin(qubes.tests.SystemTestsMixin):
|
|||||||
p = self.testvm2.run("cat > /etc/qubes-rpc/test.write", user="root",
|
p = self.testvm2.run("cat > /etc/qubes-rpc/test.write", user="root",
|
||||||
passio_popen=True)
|
passio_popen=True)
|
||||||
# first write a lot of data
|
# first write a lot of data
|
||||||
p.stdin.write("dd if=/dev/zero bs=993 count=10000 iflag=fullblock\n")
|
p.stdin.write(b"dd if=/dev/zero bs=993 count=10000 iflag=fullblock\n")
|
||||||
# and only then read something
|
# and only then read something
|
||||||
p.stdin.write("dd of=/dev/null bs=993 count=10000 iflag=fullblock\n")
|
p.stdin.write(b"dd of=/dev/null bs=993 count=10000 iflag=fullblock\n")
|
||||||
p.stdin.close()
|
p.stdin.close()
|
||||||
p.wait()
|
p.wait()
|
||||||
policy = open("/etc/qubes-rpc/policy/test.write", "w")
|
policy = open("/etc/qubes-rpc/policy/test.write", "w")
|
||||||
@ -512,11 +512,11 @@ class TC_00_AppVMMixin(qubes.tests.SystemTestsMixin):
|
|||||||
p = self.testvm2.run("cat > /etc/qubes-rpc/test.write", user="root",
|
p = self.testvm2.run("cat > /etc/qubes-rpc/test.write", user="root",
|
||||||
passio_popen=True)
|
passio_popen=True)
|
||||||
# first write a lot of data
|
# first write a lot of data
|
||||||
p.stdin.write("dd if=/dev/zero bs=993 count=10000 iflag=fullblock &\n")
|
p.stdin.write(b"dd if=/dev/zero bs=993 count=10000 iflag=fullblock &\n")
|
||||||
# and only then read something
|
# and only then read something
|
||||||
p.stdin.write("dd of=/dev/null bs=993 count=10000 iflag=fullblock\n")
|
p.stdin.write(b"dd of=/dev/null bs=993 count=10000 iflag=fullblock\n")
|
||||||
p.stdin.write("sleep 1; \n")
|
p.stdin.write(b"sleep 1; \n")
|
||||||
p.stdin.write("wait\n")
|
p.stdin.write(b"wait\n")
|
||||||
p.stdin.close()
|
p.stdin.close()
|
||||||
p.wait()
|
p.wait()
|
||||||
policy = open("/etc/qubes-rpc/policy/test.write", "w")
|
policy = open("/etc/qubes-rpc/policy/test.write", "w")
|
||||||
@ -538,7 +538,7 @@ class TC_00_AppVMMixin(qubes.tests.SystemTestsMixin):
|
|||||||
self.testvm2.start()
|
self.testvm2.start()
|
||||||
p = self.testvm2.run("cat > /etc/qubes-rpc/test.Argument", user="root",
|
p = self.testvm2.run("cat > /etc/qubes-rpc/test.Argument", user="root",
|
||||||
passio_popen=True)
|
passio_popen=True)
|
||||||
p.communicate("/bin/echo $1")
|
p.communicate(b"/bin/echo $1")
|
||||||
|
|
||||||
with open("/etc/qubes-rpc/policy/test.Argument", "w") as policy:
|
with open("/etc/qubes-rpc/policy/test.Argument", "w") as policy:
|
||||||
policy.write("%s %s allow" % (self.testvm1.name, self.testvm2.name))
|
policy.write("%s %s allow" % (self.testvm1.name, self.testvm2.name))
|
||||||
@ -548,7 +548,7 @@ class TC_00_AppVMMixin(qubes.tests.SystemTestsMixin):
|
|||||||
"test.Argument+argument".format(self.testvm2.name),
|
"test.Argument+argument".format(self.testvm2.name),
|
||||||
passio_popen=True)
|
passio_popen=True)
|
||||||
(stdout, stderr) = p.communicate()
|
(stdout, stderr) = p.communicate()
|
||||||
self.assertEqual(stdout, "argument\n")
|
self.assertEqual(stdout, b"argument\n")
|
||||||
|
|
||||||
def test_081_qrexec_service_argument_allow_specific(self):
|
def test_081_qrexec_service_argument_allow_specific(self):
|
||||||
"""Qrexec service call with argument - allow only specific value"""
|
"""Qrexec service call with argument - allow only specific value"""
|
||||||
@ -556,7 +556,7 @@ class TC_00_AppVMMixin(qubes.tests.SystemTestsMixin):
|
|||||||
self.testvm2.start()
|
self.testvm2.start()
|
||||||
p = self.testvm2.run("cat > /etc/qubes-rpc/test.Argument", user="root",
|
p = self.testvm2.run("cat > /etc/qubes-rpc/test.Argument", user="root",
|
||||||
passio_popen=True)
|
passio_popen=True)
|
||||||
p.communicate("/bin/echo $1")
|
p.communicate(b"/bin/echo $1")
|
||||||
|
|
||||||
with open("/etc/qubes-rpc/policy/test.Argument", "w") as policy:
|
with open("/etc/qubes-rpc/policy/test.Argument", "w") as policy:
|
||||||
policy.write("$anyvm $anyvm deny")
|
policy.write("$anyvm $anyvm deny")
|
||||||
@ -572,7 +572,7 @@ class TC_00_AppVMMixin(qubes.tests.SystemTestsMixin):
|
|||||||
"test.Argument+argument".format(self.testvm2.name),
|
"test.Argument+argument".format(self.testvm2.name),
|
||||||
passio_popen=True)
|
passio_popen=True)
|
||||||
(stdout, stderr) = p.communicate()
|
(stdout, stderr) = p.communicate()
|
||||||
self.assertEqual(stdout, "argument\n")
|
self.assertEqual(stdout, b"argument\n")
|
||||||
|
|
||||||
def test_082_qrexec_service_argument_deny_specific(self):
|
def test_082_qrexec_service_argument_deny_specific(self):
|
||||||
"""Qrexec service call with argument - deny specific value"""
|
"""Qrexec service call with argument - deny specific value"""
|
||||||
@ -580,7 +580,7 @@ class TC_00_AppVMMixin(qubes.tests.SystemTestsMixin):
|
|||||||
self.testvm2.start()
|
self.testvm2.start()
|
||||||
p = self.testvm2.run("cat > /etc/qubes-rpc/test.Argument", user="root",
|
p = self.testvm2.run("cat > /etc/qubes-rpc/test.Argument", user="root",
|
||||||
passio_popen=True)
|
passio_popen=True)
|
||||||
p.communicate("/bin/echo $1")
|
p.communicate(b"/bin/echo $1")
|
||||||
|
|
||||||
with open("/etc/qubes-rpc/policy/test.Argument", "w") as policy:
|
with open("/etc/qubes-rpc/policy/test.Argument", "w") as policy:
|
||||||
policy.write("$anyvm $anyvm allow")
|
policy.write("$anyvm $anyvm allow")
|
||||||
@ -596,7 +596,7 @@ class TC_00_AppVMMixin(qubes.tests.SystemTestsMixin):
|
|||||||
"test.Argument+argument".format(self.testvm2.name),
|
"test.Argument+argument".format(self.testvm2.name),
|
||||||
passio_popen=True)
|
passio_popen=True)
|
||||||
(stdout, stderr) = p.communicate()
|
(stdout, stderr) = p.communicate()
|
||||||
self.assertEqual(stdout, "")
|
self.assertEqual(stdout, b"")
|
||||||
self.assertEqual(p.returncode, 1, "Service request should be denied")
|
self.assertEqual(p.returncode, 1, "Service request should be denied")
|
||||||
|
|
||||||
def test_083_qrexec_service_argument_specific_implementation(self):
|
def test_083_qrexec_service_argument_specific_implementation(self):
|
||||||
@ -606,11 +606,11 @@ class TC_00_AppVMMixin(qubes.tests.SystemTestsMixin):
|
|||||||
self.testvm2.start()
|
self.testvm2.start()
|
||||||
p = self.testvm2.run("cat > /etc/qubes-rpc/test.Argument", user="root",
|
p = self.testvm2.run("cat > /etc/qubes-rpc/test.Argument", user="root",
|
||||||
passio_popen=True)
|
passio_popen=True)
|
||||||
p.communicate("/bin/echo $1")
|
p.communicate(b"/bin/echo $1")
|
||||||
|
|
||||||
p = self.testvm2.run("cat > /etc/qubes-rpc/test.Argument+argument",
|
p = self.testvm2.run("cat > /etc/qubes-rpc/test.Argument+argument",
|
||||||
user="root", passio_popen=True)
|
user="root", passio_popen=True)
|
||||||
p.communicate("/bin/echo specific: $1")
|
p.communicate(b"/bin/echo specific: $1")
|
||||||
|
|
||||||
with open("/etc/qubes-rpc/policy/test.Argument", "w") as policy:
|
with open("/etc/qubes-rpc/policy/test.Argument", "w") as policy:
|
||||||
policy.write("%s %s allow" % (self.testvm1.name, self.testvm2.name))
|
policy.write("%s %s allow" % (self.testvm1.name, self.testvm2.name))
|
||||||
@ -620,7 +620,7 @@ class TC_00_AppVMMixin(qubes.tests.SystemTestsMixin):
|
|||||||
"test.Argument+argument".format(self.testvm2.name),
|
"test.Argument+argument".format(self.testvm2.name),
|
||||||
passio_popen=True)
|
passio_popen=True)
|
||||||
(stdout, stderr) = p.communicate()
|
(stdout, stderr) = p.communicate()
|
||||||
self.assertEqual(stdout, "specific: argument\n")
|
self.assertEqual(stdout, b"specific: argument\n")
|
||||||
|
|
||||||
def test_084_qrexec_service_argument_extra_env(self):
|
def test_084_qrexec_service_argument_extra_env(self):
|
||||||
"""Qrexec service call with argument - extra env variables"""
|
"""Qrexec service call with argument - extra env variables"""
|
||||||
@ -628,8 +628,8 @@ class TC_00_AppVMMixin(qubes.tests.SystemTestsMixin):
|
|||||||
self.testvm2.start()
|
self.testvm2.start()
|
||||||
p = self.testvm2.run("cat > /etc/qubes-rpc/test.Argument", user="root",
|
p = self.testvm2.run("cat > /etc/qubes-rpc/test.Argument", user="root",
|
||||||
passio_popen=True)
|
passio_popen=True)
|
||||||
p.communicate("/bin/echo $QREXEC_SERVICE_FULL_NAME "
|
p.communicate(b"/bin/echo $QREXEC_SERVICE_FULL_NAME "
|
||||||
"$QREXEC_SERVICE_ARGUMENT")
|
b"$QREXEC_SERVICE_ARGUMENT")
|
||||||
|
|
||||||
with open("/etc/qubes-rpc/policy/test.Argument", "w") as policy:
|
with open("/etc/qubes-rpc/policy/test.Argument", "w") as policy:
|
||||||
policy.write("%s %s allow" % (self.testvm1.name, self.testvm2.name))
|
policy.write("%s %s allow" % (self.testvm1.name, self.testvm2.name))
|
||||||
@ -639,7 +639,7 @@ class TC_00_AppVMMixin(qubes.tests.SystemTestsMixin):
|
|||||||
"test.Argument+argument".format(self.testvm2.name),
|
"test.Argument+argument".format(self.testvm2.name),
|
||||||
passio_popen=True)
|
passio_popen=True)
|
||||||
(stdout, stderr) = p.communicate()
|
(stdout, stderr) = p.communicate()
|
||||||
self.assertEqual(stdout, "test.Argument+argument argument\n")
|
self.assertEqual(stdout, b"test.Argument+argument argument\n")
|
||||||
|
|
||||||
def test_100_qrexec_filecopy(self):
|
def test_100_qrexec_filecopy(self):
|
||||||
self.testvm1.start()
|
self.testvm1.start()
|
||||||
@ -784,7 +784,7 @@ class TC_00_AppVMMixin(qubes.tests.SystemTestsMixin):
|
|||||||
# Check if reverting back to UTC works
|
# Check if reverting back to UTC works
|
||||||
(vm_tz, _) = self.testvm1.run("TZ=UTC date +%Z",
|
(vm_tz, _) = self.testvm1.run("TZ=UTC date +%Z",
|
||||||
passio_popen=True).communicate()
|
passio_popen=True).communicate()
|
||||||
self.assertEqual(vm_tz.strip(), "UTC")
|
self.assertEqual(vm_tz.strip(), b"UTC")
|
||||||
|
|
||||||
def test_210_time_sync(self):
|
def test_210_time_sync(self):
|
||||||
"""Test time synchronization mechanism"""
|
"""Test time synchronization mechanism"""
|
||||||
@ -919,7 +919,7 @@ class TC_00_AppVMMixin(qubes.tests.SystemTestsMixin):
|
|||||||
"}\n")
|
"}\n")
|
||||||
|
|
||||||
p = self.testvm1.run("cat > allocator.c", passio_popen=True)
|
p = self.testvm1.run("cat > allocator.c", passio_popen=True)
|
||||||
p.communicate(allocator_c)
|
p.communicate(allocator_c.encode())
|
||||||
p = self.testvm1.run("gcc allocator.c -o allocator",
|
p = self.testvm1.run("gcc allocator.c -o allocator",
|
||||||
passio_popen=True, passio_stderr=True)
|
passio_popen=True, passio_stderr=True)
|
||||||
(stdout, stderr) = p.communicate()
|
(stdout, stderr) = p.communicate()
|
||||||
@ -941,14 +941,14 @@ class TC_00_AppVMMixin(qubes.tests.SystemTestsMixin):
|
|||||||
user="root", passio_popen=True, passio_stderr=True)
|
user="root", passio_popen=True, passio_stderr=True)
|
||||||
# wait for memory being allocated; can't use just .read(), because EOF
|
# wait for memory being allocated; can't use just .read(), because EOF
|
||||||
# passing is unreliable while the process is still running
|
# passing is unreliable while the process is still running
|
||||||
alloc1.stdin.write("\n")
|
alloc1.stdin.write(b"\n")
|
||||||
alloc1.stdin.flush()
|
alloc1.stdin.flush()
|
||||||
alloc_out = alloc1.stdout.read(len("Stage1\nStage2\nStage3\n"))
|
alloc_out = alloc1.stdout.read(len("Stage1\nStage2\nStage3\n"))
|
||||||
|
|
||||||
if "Stage3" not in alloc_out:
|
if b"Stage3" not in alloc_out:
|
||||||
# read stderr only in case of failed assert, but still have nice
|
# read stderr only in case of failed assert, but still have nice
|
||||||
# failure message (don't use self.fail() directly)
|
# failure message (don't use self.fail() directly)
|
||||||
self.assertIn("Stage3", alloc_out, alloc1.stderr.read())
|
self.assertIn(b"Stage3", alloc_out, alloc1.stderr.read())
|
||||||
|
|
||||||
# now, launch some window - it should get fragmented composition buffer
|
# now, launch some window - it should get fragmented composition buffer
|
||||||
# it is important to have some changing content there, to generate
|
# it is important to have some changing content there, to generate
|
||||||
@ -965,10 +965,10 @@ class TC_00_AppVMMixin(qubes.tests.SystemTestsMixin):
|
|||||||
winid = search.communicate()[0].strip()
|
winid = search.communicate()[0].strip()
|
||||||
xprop = subprocess.Popen(['xprop', '-notype', '-id', winid,
|
xprop = subprocess.Popen(['xprop', '-notype', '-id', winid,
|
||||||
'_QUBES_VMWINDOWID'], stdout=subprocess.PIPE)
|
'_QUBES_VMWINDOWID'], stdout=subprocess.PIPE)
|
||||||
vm_winid = xprop.stdout.read().strip().split(' ')[4]
|
vm_winid = xprop.stdout.read().decode().strip().split(' ')[4]
|
||||||
|
|
||||||
# now free the fragmented memory and trigger compaction
|
# now free the fragmented memory and trigger compaction
|
||||||
alloc1.stdin.write("\n")
|
alloc1.stdin.write(b"\n")
|
||||||
alloc1.wait()
|
alloc1.wait()
|
||||||
self.testvm1.run("echo 1 > /proc/sys/vm/compact_memory", user="root")
|
self.testvm1.run("echo 1 > /proc/sys/vm/compact_memory", user="root")
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user