# pylint: disable=protected-access,pointless-statement

#
# The Qubes OS Project, https://www.qubes-os.org/
#
# Copyright (C) 2015-2016  Joanna Rutkowska <joanna@invisiblethingslab.com>
# Copyright (C) 2015-2016  Wojtek Porczyk <woju@invisiblethingslab.com>
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, see <https://www.gnu.org/licenses/>.
#

import os
import subprocess
import time
import unittest

import qubes.devices
import qubes.ext.pci
import qubes.tests


@qubes.tests.skipUnlessEnv('QUBES_TEST_PCIDEV')
class TC_00_Devices_PCI(qubes.tests.SystemTestCase):
    def setUp(self):
        super(TC_00_Devices_PCI, self).setUp()
        if self._testMethodName not in ['test_000_list']:
            pcidev = os.environ['QUBES_TEST_PCIDEV']
            self.dev = self.app.domains[0].devices['pci'][pcidev]
            self.assignment = qubes.devices.DeviceAssignment(
                backend_domain=self.dev.backend_domain,
                ident=self.dev.ident,
                persistent=True)
            if isinstance(self.dev, qubes.devices.UnknownDevice):
                self.skipTest('Specified device {} does not exists'.format(pcidev))
            self.init_default_template()
            self.vm = self.app.add_new_vm(qubes.vm.appvm.AppVM,
                name=self.make_vm_name('vm'),
                label='red',
            )
            self.vm.virt_mode = 'hvm'
            self.loop.run_until_complete(
                self.vm.create_on_disk())
            self.vm.features['pci-no-strict-reset/' + pcidev] = True
            self.app.save()

    @unittest.expectedFailure
    def test_000_list(self):
        p = subprocess.Popen(['lspci'], stdout=subprocess.PIPE)
        # get a dict: BDF -> description
        actual_devices = dict(
            l.split(' (')[0].split(' ', 1)
                for l in p.communicate()[0].decode().splitlines())
        for dev in self.app.domains[0].devices['pci']:
            lspci_ident = dev.ident.replace('_', ':')
            self.assertIsInstance(dev, qubes.ext.pci.PCIDevice)
            self.assertEqual(dev.backend_domain, self.app.domains[0])
            self.assertIn(lspci_ident, actual_devices)
            self.assertEqual(dev.description, actual_devices[lspci_ident])
            actual_devices.pop(lspci_ident)

        if actual_devices:
            self.fail('Not all devices listed, missing: {}'.format(
                actual_devices))

    def assertDeviceNotInCollection(self, dev, dev_col):
        self.assertNotIn(dev, dev_col.attached())
        self.assertNotIn(dev, dev_col.persistent())
        self.assertNotIn(dev, dev_col.assignments())
        self.assertNotIn(dev, dev_col.assignments(persistent=True))

    def test_010_attach_offline_persistent(self):
        dev_col = self.vm.devices['pci']
        self.assertDeviceNotInCollection(self.dev, dev_col)
        self.loop.run_until_complete(
            dev_col.attach(self.assignment))
        self.app.save()
        self.assertNotIn(self.dev, dev_col.attached())
        self.assertIn(self.dev, dev_col.persistent())
        self.assertIn(self.dev, dev_col.assignments())
        self.assertIn(self.dev, dev_col.assignments(persistent=True))
        self.assertNotIn(self.dev, dev_col.assignments(persistent=False))

        self.loop.run_until_complete(self.vm.start())

        self.assertIn(self.dev, dev_col.attached())
        (stdout, _) = self.loop.run_until_complete(
            self.vm.run_for_stdio('lspci'))
        self.assertIn(self.dev.description, stdout.decode())


    def test_011_attach_offline_temp_fail(self):
        dev_col = self.vm.devices['pci']
        self.assertDeviceNotInCollection(self.dev, dev_col)
        self.assignment.persistent = False
        with self.assertRaises(qubes.exc.QubesVMNotRunningError):
            self.loop.run_until_complete(
                dev_col.attach(self.assignment))


    def test_020_attach_online_persistent(self):
        self.loop.run_until_complete(
            self.vm.start())
        dev_col = self.vm.devices['pci']
        self.assertDeviceNotInCollection(self.dev, dev_col)
        self.loop.run_until_complete(
            dev_col.attach(self.assignment))

        self.assertIn(self.dev, dev_col.attached())
        self.assertIn(self.dev, dev_col.persistent())
        self.assertIn(self.dev, dev_col.assignments())
        self.assertIn(self.dev, dev_col.assignments(persistent=True))
        self.assertNotIn(self.dev, dev_col.assignments(persistent=False))

        # give VM kernel some time to discover new device
        time.sleep(1)
        (stdout, _) = self.loop.run_until_complete(
            self.vm.run_for_stdio('lspci'))
        self.assertIn(self.dev.description, stdout.decode())


    def test_021_persist_detach_online_fail(self):
        dev_col = self.vm.devices['pci']
        self.assertDeviceNotInCollection(self.dev, dev_col)
        self.loop.run_until_complete(
            dev_col.attach(self.assignment))
        self.app.save()
        self.loop.run_until_complete(
            self.vm.start())
        with self.assertRaises(qubes.exc.QubesVMNotHaltedError):
            self.loop.run_until_complete(
                self.vm.devices['pci'].detach(self.assignment))

    def test_030_persist_attach_detach_offline(self):
        dev_col = self.vm.devices['pci']
        self.assertDeviceNotInCollection(self.dev, dev_col)
        self.loop.run_until_complete(
            dev_col.attach(self.assignment))
        self.app.save()
        self.assertNotIn(self.dev, dev_col.attached())
        self.assertIn(self.dev, dev_col.persistent())
        self.assertIn(self.dev, dev_col.assignments())
        self.assertIn(self.dev, dev_col.assignments(persistent=True))
        self.assertNotIn(self.dev, dev_col.assignments(persistent=False))
        self.loop.run_until_complete(
            dev_col.detach(self.assignment))
        self.assertDeviceNotInCollection(self.dev, dev_col)

    def test_031_attach_detach_online_temp(self):
        dev_col = self.vm.devices['pci']
        self.loop.run_until_complete(
            self.vm.start())
        self.assignment.persistent = False
        self.assertDeviceNotInCollection(self.dev, dev_col)
        self.loop.run_until_complete(
            dev_col.attach(self.assignment))

        self.assertIn(self.dev, dev_col.attached())
        self.assertNotIn(self.dev, dev_col.persistent())
        self.assertIn(self.dev, dev_col.assignments())
        self.assertIn(self.dev, dev_col.assignments(persistent=False))
        self.assertNotIn(self.dev, dev_col.assignments(persistent=True))
        self.assertIn(self.dev, dev_col.assignments(persistent=False))


        # give VM kernel some time to discover new device
        time.sleep(1)
        (stdout, _) = self.loop.run_until_complete(
            self.vm.run_for_stdio('lspci'))

        self.assertIn(self.dev.description, stdout.decode())
        self.loop.run_until_complete(
            dev_col.detach(self.assignment))
        self.assertDeviceNotInCollection(self.dev, dev_col)

        (stdout, _) = self.loop.run_until_complete(
            self.vm.run_for_stdio('lspci'))
        self.assertNotIn(self.dev.description, stdout.decode())