devices_pci.py 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175
  1. # pylint: disable=protected-access,pointless-statement
  2. #
  3. # The Qubes OS Project, https://www.qubes-os.org/
  4. #
  5. # Copyright (C) 2015-2016 Joanna Rutkowska <joanna@invisiblethingslab.com>
  6. # Copyright (C) 2015-2016 Wojtek Porczyk <woju@invisiblethingslab.com>
  7. #
  8. # This program is free software; you can redistribute it and/or modify
  9. # it under the terms of the GNU General Public License as published by
  10. # the Free Software Foundation; either version 2 of the License, or
  11. # (at your option) any later version.
  12. #
  13. # This program is distributed in the hope that it will be useful,
  14. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  15. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  16. # GNU General Public License for more details.
  17. #
  18. # You should have received a copy of the GNU General Public License along
  19. # with this program; if not, write to the Free Software Foundation, Inc.,
  20. # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
  21. #
  22. import os
  23. import subprocess
  24. import time
  25. import unittest
  26. import qubes.devices
  27. import qubes.ext.pci
  28. import qubes.tests
  29. class TC_00_Devices_PCI(qubes.tests.SystemTestsMixin,
  30. qubes.tests.QubesTestCase):
  31. def setUp(self):
  32. super(TC_00_Devices_PCI, self).setUp()
  33. if self._testMethodName not in ['test_000_list']:
  34. pcidev = os.environ.get('QUBES_TEST_PCIDEV', None)
  35. if pcidev is None:
  36. self.skipTest('Specify PCI device with QUBES_TEST_PCIDEV '
  37. 'environment variable')
  38. self.dev = self.app.domains[0].devices['pci'][pcidev]
  39. self.assignment = qubes.devices.DeviceAssignment(backend_domain=self.dev.backend_domain, ident=self.dev.ident, persistent=True)
  40. if isinstance(self.dev, qubes.devices.UnknownDevice):
  41. self.skipTest('Specified device {} does not exists'.format(pcidev))
  42. self.init_default_template()
  43. self.vm = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  44. name=self.make_vm_name('vm'),
  45. label='red',
  46. )
  47. self.vm.create_on_disk()
  48. self.vm.features['pci-no-strict-reset/' + pcidev] = True
  49. self.app.save()
  50. @unittest.expectedFailure
  51. def test_000_list(self):
  52. p = subprocess.Popen(['lspci'], stdout=subprocess.PIPE)
  53. # get a dict: BDF -> description
  54. actual_devices = dict(
  55. l.split(' (')[0].split(' ', 1)
  56. for l in p.communicate()[0].decode().splitlines())
  57. for dev in self.app.domains[0].devices['pci']:
  58. self.assertIsInstance(dev, qubes.ext.pci.PCIDevice)
  59. self.assertEqual(dev.backend_domain, self.app.domains[0])
  60. self.assertIn(dev.ident, actual_devices)
  61. self.assertEqual(dev.description, actual_devices[dev.ident])
  62. actual_devices.pop(dev.ident)
  63. if actual_devices:
  64. self.fail('Not all devices listed, missing: {}'.format(
  65. actual_devices))
  66. def assertDeviceNotInCollection(self, dev, dev_col):
  67. self.assertNotIn(dev, dev_col.attached())
  68. self.assertNotIn(dev, dev_col.persistent())
  69. self.assertNotIn(dev, dev_col.assignments())
  70. self.assertNotIn(dev, dev_col.assignments(persistent=True))
  71. def test_010_attach_offline_persistent(self):
  72. dev_col = self.vm.devices['pci']
  73. self.assertDeviceNotInCollection(self.dev, dev_col)
  74. dev_col.attach(self.assignment)
  75. self.app.save()
  76. self.assertNotIn(self.dev, dev_col.attached())
  77. self.assertIn(self.dev, dev_col.persistent())
  78. self.assertIn(self.dev, dev_col.assignments())
  79. self.assertIn(self.dev, dev_col.assignments(persistent=True))
  80. self.assertNotIn(self.dev, dev_col.assignments(persistent=False))
  81. self.vm.start()
  82. self.assertIn(self.dev, dev_col.attached())
  83. p = self.vm.run('lspci', passio_popen=True)
  84. (stdout, _) = p.communicate()
  85. self.assertIn(self.dev.description, stdout.decode())
  86. def test_011_attach_offline_temp_fail(self):
  87. dev_col = self.vm.devices['pci']
  88. self.assertDeviceNotInCollection(self.dev, dev_col)
  89. self.assignment.persistent = False
  90. with self.assertRaises(qubes.exc.QubesVMNotRunningError):
  91. dev_col.attach(self.assignment)
  92. def test_020_attach_online_persistent(self):
  93. self.vm.start()
  94. dev_col = self.vm.devices['pci']
  95. self.assertDeviceNotInCollection(self.dev, dev_col)
  96. dev_col.attach(self.assignment)
  97. self.assertIn(self.dev, dev_col.attached())
  98. self.assertIn(self.dev, dev_col.persistent())
  99. self.assertIn(self.dev, dev_col.assignments())
  100. self.assertIn(self.dev, dev_col.assignments(persistent=True))
  101. self.assertNotIn(self.dev, dev_col.assignments(persistent=False))
  102. # give VM kernel some time to discover new device
  103. time.sleep(1)
  104. p = self.vm.run('lspci', passio_popen=True)
  105. (stdout, _) = p.communicate()
  106. self.assertIn(self.dev.description, stdout.decode())
  107. def test_021_persist_detach_online_fail(self):
  108. dev_col = self.vm.devices['pci']
  109. self.assertDeviceNotInCollection(self.dev, dev_col)
  110. dev_col.attach(self.assignment)
  111. self.app.save()
  112. self.vm.start()
  113. with self.assertRaises(qubes.exc.QubesVMNotHaltedError):
  114. self.vm.devices['pci'].detach(self.assignment)
  115. def test_030_persist_attach_detach_offline(self):
  116. dev_col = self.vm.devices['pci']
  117. self.assertDeviceNotInCollection(self.dev, dev_col)
  118. dev_col.attach(self.assignment)
  119. self.app.save()
  120. self.assertNotIn(self.dev, dev_col.attached())
  121. self.assertIn(self.dev, dev_col.persistent())
  122. self.assertIn(self.dev, dev_col.assignments())
  123. self.assertIn(self.dev, dev_col.assignments(persistent=True))
  124. self.assertNotIn(self.dev, dev_col.assignments(persistent=False))
  125. dev_col.detach(self.assignment)
  126. self.assertDeviceNotInCollection(self.dev, dev_col)
  127. def test_031_attach_detach_online_temp(self):
  128. dev_col = self.vm.devices['pci']
  129. self.vm.start()
  130. self.assignment.persistent = False
  131. self.assertDeviceNotInCollection(self.dev, dev_col)
  132. dev_col.attach(self.assignment)
  133. self.assertIn(self.dev, dev_col.attached())
  134. self.assertNotIn(self.dev, dev_col.persistent())
  135. self.assertIn(self.dev, dev_col.assignments())
  136. self.assertIn(self.dev, dev_col.assignments(persistent=False))
  137. self.assertNotIn(self.dev, dev_col.assignments(persistent=True))
  138. self.assertIn(self.dev, dev_col.assignments(persistent=False))
  139. # give VM kernel some time to discover new device
  140. time.sleep(1)
  141. p = self.vm.run('lspci', passio_popen=True)
  142. (stdout, _) = p.communicate()
  143. self.assertIn(self.dev.description, stdout.decode())
  144. dev_col.detach(self.assignment)
  145. self.assertDeviceNotInCollection(self.dev, dev_col)
  146. p = self.vm.run('lspci', passio_popen=True)
  147. (stdout, _) = p.communicate()
  148. self.assertNotIn(self.dev.description, stdout.decode())