Browse Source

tools: restore qvm-start --cdrom and similar options

Booting a VM from cdrom require attaching the device before VM startup,
which is possible only in persistent mode. But for qvm-start --cdrom
adding a cdrom only temporarily, use new update_persistence() function
to convert the assignment to temporary one.

Fixes QubesOS/qubes-issues#3055
Marek Marczykowski-Górecki 6 years ago
parent
commit
5e2638ab5e
3 changed files with 308 additions and 2 deletions
  1. 29 1
      doc/manpages/qvm-start.rst
  2. 157 0
      qubesadmin/tests/tools/qvm_start.py
  3. 122 1
      qubesadmin/tools/qvm_start.py

+ 29 - 1
doc/manpages/qvm-start.rst

@@ -15,7 +15,7 @@
 Synopsis
 --------
 
-:command:`qvm-start` [-h] [--verbose] [--quiet] *VMNAME*
+:command:`qvm-start` [-h] [options] *VMNAME*
 
 Options
 -------
@@ -45,6 +45,34 @@ Options
    exclude the qube from --all
 
 
+.. option:: --drive=DRIVE
+
+   Temporarily attach specified drive as CD/DVD or hard disk (can be specified with prefix "hd:" or "cdrom:", default is cdrom).
+   The syntax for the device itself is "qube_name:device_name", meaning *device_name* served by *qube_name*.
+   See `qvm-block` output for a list of available devices.
+
+   Additionally, "qube_name:path" syntax can be used. This
+   will setup loop device inside *qube_name*, pointing at *path*, and will use
+   it as device. You need to clean up that loop device yourself, but it will
+   also cleanup itself at next qube restart. This syntax is available only when
+   calling this tool from dom0.
+
+.. option:: --hddisk=DRIVE
+
+   Temporarily attach specified drive as hard disk. This is equivalent with
+   `--drive=hd:DRIVE`.
+
+.. option:: --cdrom=DRIVE
+
+   Temporarily attach specified drive as CD/DVD. This is equivalent with
+   `--drive=cdrom:DRIVE`.
+
+.. option:: --install-windows-tools
+
+   Temporarily attach Windows tools CDROM to the domain. This is equivalent with
+   `--cdrom=dom0:/usr/lib/qubes/qubes-windows-tools.iso`.
+
+
 Authors
 -------
 

+ 157 - 0
qubesadmin/tests/tools/qvm_start.py

@@ -18,6 +18,10 @@
 # You should have received a copy of the GNU Lesser General Public License along
 # with this program; if not, see <http://www.gnu.org/licenses/>.
 
+import unittest.mock
+
+import subprocess
+
 import qubesadmin.tests
 import qubesadmin.tests.tools
 import qubesadmin.tools.qvm_start
@@ -65,3 +69,156 @@ class TC_00_qvm_start(qubesadmin.tests.QubesTestCase):
             1)
         self.assertAllCalled()
 
+    def test_010_drive_cdrom(self):
+        self.app.expected_calls[
+            ('dom0', 'admin.vm.List', None, None)] = \
+            b'0\x00dom0 class=AdminVM state=Running\n' \
+            b'some-vm class=AppVM state=Running\n'
+        self.app.expected_calls[
+            ('dom0', 'admin.vm.device.block.Available', None, None)] = \
+            b'0\x00sr0\n'
+        self.app.expected_calls[
+            ('some-vm', 'admin.vm.Start', None, None)] = b'0\x00'
+        self.app.expected_calls[
+            ('some-vm', 'admin.vm.device.block.Attach', 'dom0+sr0',
+                b'devtype=cdrom persistent=True')] = b'0\x00'
+        self.app.expected_calls[
+            ('some-vm', 'admin.vm.device.block.Set.persistent', 'dom0+sr0',
+            b'False')] = b'0\x00'
+        qubesadmin.tools.qvm_start.main(['--cdrom=dom0:sr0', 'some-vm'],
+            app=self.app)
+        self.assertAllCalled()
+
+    def test_011_drive_disk(self):
+        self.app.expected_calls[
+            ('dom0', 'admin.vm.List', None, None)] = \
+            b'0\x00dom0 class=AdminVM state=Running\n' \
+            b'some-vm class=AppVM state=Running\n'
+        self.app.expected_calls[
+            ('dom0', 'admin.vm.device.block.Available', None, None)] = \
+            b'0\x00sdb1\n'
+        self.app.expected_calls[
+            ('some-vm', 'admin.vm.Start', None, None)] = b'0\x00'
+        self.app.expected_calls[
+            ('some-vm', 'admin.vm.device.block.Attach', 'dom0+sdb1',
+                b'devtype=disk persistent=True')] = b'0\x00'
+        self.app.expected_calls[
+            ('some-vm', 'admin.vm.device.block.Set.persistent', 'dom0+sdb1',
+            b'False')] = b'0\x00'
+        qubesadmin.tools.qvm_start.main(['--hd=dom0:sdb1', 'some-vm'],
+            app=self.app)
+        self.assertAllCalled()
+
+    def test_012_drive_disk(self):
+        self.app.expected_calls[
+            ('dom0', 'admin.vm.List', None, None)] = \
+            b'0\x00dom0 class=AdminVM state=Running\n' \
+            b'some-vm class=AppVM state=Running\n'
+        self.app.expected_calls[
+            ('dom0', 'admin.vm.device.block.Available', None, None)] = \
+            b'0\x00sdb1\n'
+        self.app.expected_calls[
+            ('some-vm', 'admin.vm.Start', None, None)] = b'0\x00'
+        self.app.expected_calls[
+            ('some-vm', 'admin.vm.device.block.Attach', 'dom0+sdb1',
+                b'devtype=disk persistent=True')] = b'0\x00'
+        self.app.expected_calls[
+            ('some-vm', 'admin.vm.device.block.Set.persistent', 'dom0+sdb1',
+            b'False')] = b'0\x00'
+        qubesadmin.tools.qvm_start.main(['--drive=hd:dom0:sdb1', 'some-vm'],
+            app=self.app)
+        self.assertAllCalled()
+
+    @unittest.mock.patch('subprocess.check_output')
+    def test_013_drive_loop_local(self, mock_subprocess):
+        self.app.expected_calls[
+            ('dom0', 'admin.vm.List', None, None)] = \
+            b'0\x00dom0 class=AdminVM state=Running\n' \
+            b'some-vm class=AppVM state=Running\n'
+        self.app.expected_calls[
+            ('dom0', 'admin.vm.device.block.Available', None, None)] = \
+            b'0\x00loop12\n'
+        self.app.expected_calls[
+            ('some-vm', 'admin.vm.Start', None, None)] = b'0\x00'
+        self.app.expected_calls[
+            ('some-vm', 'admin.vm.device.block.Attach', 'dom0+loop12',
+                b'devtype=cdrom persistent=True')] = b'0\x00'
+        self.app.expected_calls[
+            ('some-vm', 'admin.vm.device.block.Set.persistent', 'dom0+loop12',
+            b'False')] = b'0\x00'
+        mock_subprocess.return_value = b'/dev/loop12'
+        qubesadmin.tools.qvm_start.main([
+            '--cdrom=dom0:/home/some/image.iso',
+            'some-vm'],
+            app=self.app)
+        self.assertAllCalled()
+        mock_subprocess.assert_called_once_with(
+            ['sudo', 'losetup', '-f', '--show', '/home/some/image.iso'])
+
+    def test_014_drive_loop_remote(self):
+        self.app.expected_calls[
+            ('dom0', 'admin.vm.List', None, None)] = \
+            b'0\x00dom0 class=AdminVM state=Running\n' \
+            b'other-vm class=AppVM state=Running\n' \
+            b'some-vm class=AppVM state=Running\n'
+        self.app.expected_calls[
+            ('other-vm', 'admin.vm.device.block.Available', None, None)] = \
+            b'0\x00loop7\n'
+        self.app.expected_calls[
+            ('some-vm', 'admin.vm.Start', None, None)] = b'0\x00'
+        self.app.expected_calls[
+            ('some-vm', 'admin.vm.device.block.Attach', 'other-vm+loop7',
+                b'devtype=cdrom persistent=True')] = b'0\x00'
+        self.app.expected_calls[
+            ('some-vm', 'admin.vm.device.block.Set.persistent',
+            'other-vm+loop7',
+            b'False')] = b'0\x00'
+        with unittest.mock.patch.object(self.app.domains['other-vm'], 'run') \
+                as mock_run:
+            mock_run.return_value = (b'/dev/loop7', b'')
+            qubesadmin.tools.qvm_start.main([
+                '--cdrom=other-vm:/home/some/image.iso',
+                'some-vm'],
+                app=self.app)
+            mock_run.assert_called_once_with(
+                'losetup -f --show /home/some/image.iso',
+                user='root')
+        self.assertAllCalled()
+
+    def test_015_drive_failed_start(self):
+        self.app.expected_calls[
+            ('dom0', 'admin.vm.List', None, None)] = \
+            b'0\x00dom0 class=AdminVM state=Running\n' \
+            b'other-vm class=AppVM state=Running\n' \
+            b'some-vm class=AppVM state=Running\n'
+        self.app.expected_calls[
+            ('some-vm', 'admin.vm.device.block.Attach', 'other-vm+loop7',
+                b'devtype=cdrom persistent=True')] = b'0\x00'
+        self.app.expected_calls[
+            ('some-vm', 'admin.vm.Start', None, None)] = \
+            b'2\x00QubesException\x00\x00An error occurred\x00'
+        self.app.expected_calls[
+            ('some-vm', 'admin.vm.device.block.Detach',
+            'other-vm+loop7', None)] = b'0\x00'
+        qubesadmin.tools.qvm_start.main([
+            '--cdrom=other-vm:loop7',
+            'some-vm'],
+            app=self.app)
+        self.assertAllCalled()
+
+    def test_016_drive_failed_attach(self):
+        self.app.expected_calls[
+            ('dom0', 'admin.vm.List', None, None)] = \
+            b'0\x00dom0 class=AdminVM state=Running\n' \
+            b'other-vm class=AppVM state=Running\n' \
+            b'some-vm class=AppVM state=Running\n'
+        self.app.expected_calls[
+            ('some-vm', 'admin.vm.device.block.Attach', 'other-vm+loop7',
+                b'devtype=cdrom persistent=True')] = \
+            b'2\x00QubesException\x00\x00An error occurred\x00'
+        retcode = qubesadmin.tools.qvm_start.main([
+            '--cdrom=other-vm:loop7',
+            'some-vm'],
+            app=self.app)
+        self.assertEqual(retcode, 1)
+        self.assertAllCalled()

+ 122 - 1
qubesadmin/tools/qvm_start.py

@@ -19,12 +19,35 @@
 # with this program; if not, see <http://www.gnu.org/licenses/>.
 
 '''qvm-start - start a domain'''
+import argparse
+import sys
 
+import subprocess
 
-import sys
+import qubesadmin.devices
 import qubesadmin.exc
 import qubesadmin.tools
 
+class DriveAction(argparse.Action):
+    '''Action for argument parser that stores drive image path.'''
+
+    # pylint: disable=redefined-builtin,too-few-public-methods
+    def __init__(self,
+            option_strings,
+            dest='drive',
+            prefix='cdrom:',
+            metavar='IMAGE',
+            required=False,
+            help='Attach drive'):
+        super(DriveAction, self).__init__(option_strings, dest,
+            metavar=metavar, help=help)
+        self.prefix = prefix
+
+    def __call__(self, parser, namespace, values, option_string=None):
+        # pylint: disable=redefined-outer-name
+        setattr(namespace, self.dest, self.prefix + values)
+
+
 parser = qubesadmin.tools.QubesArgumentParser(
     description='start a domain', vmname_nargs='+')
 
@@ -32,6 +55,85 @@ parser.add_argument('--skip-if-running',
     action='store_true', default=False,
     help='Do not fail if the qube is already runnning')
 
+parser_drive = parser.add_mutually_exclusive_group()
+
+parser_drive.add_argument('--drive', metavar='DRIVE',
+    help='temporarily attach specified drive as CD/DVD or hard disk (can be'
+        ' specified with prefix "hd:" or "cdrom:", default is cdrom)')
+
+parser_drive.add_argument('--hddisk',
+    action=DriveAction, dest='drive', prefix='hd:',
+    help='temporarily attach specified drive as hard disk')
+
+parser_drive.add_argument('--cdrom', metavar='IMAGE',
+    action=DriveAction, dest='drive', prefix='cdrom:',
+    help='temporarily attach specified drive as CD/DVD')
+
+parser_drive.add_argument('--install-windows-tools',
+    action='store_const', dest='drive', default=False,
+    const='cdrom:dom0:/usr/lib/qubes/qubes-windows-tools.iso',
+    help='temporarily attach Windows tools CDROM to the domain')
+
+
+def get_drive_assignment(app, drive_str):
+    ''' Prepare :py:class:`qubesadmin.devices.DeviceAssignment` object for a
+    given drive.
+
+    If running in dom0, it will also take care about creating appropriate
+    loop device (if necessary). Otherwise, only existing block devices are
+    supported.
+
+    :param app: Qubes() instance
+    :param drive_str: drive argument
+    :return: DeviceAssignment matching *drive_str*
+    '''
+    devtype = 'cdrom'
+    if drive_str.startswith('cdrom:'):
+        devtype = 'cdrom'
+        drive_str = drive_str[len('cdrom:'):]
+    elif drive_str.startswith('hd:'):
+        devtype = 'disk'
+        drive_str = drive_str[len('hd:'):]
+
+    backend_domain_name, ident = drive_str.split(':', 1)
+    try:
+        backend_domain = app.domains[backend_domain_name]
+    except KeyError:
+        raise qubesadmin.exc.QubesVMNotFoundError(
+            'No such VM: %s', backend_domain_name)
+    if ident.startswith('/'):
+        # it is a path - if we're running in dom0, try to call losetup to
+        # export the device, otherwise reject
+        if app.qubesd_connection_type == 'qrexec':
+            raise qubesadmin.exc.QubesException(
+                'Existing block device identifier needed when running from '
+                'outside of dom0 (see qvm-block)')
+        try:
+            if isinstance(backend_domain, qubesadmin.vm.AdminVM):
+                loop_name = subprocess.check_output(
+                    ['sudo', 'losetup', '-f', '--show', ident])
+            else:
+                loop_name, _ = backend_domain.run(
+                    'losetup -f --show ' + ident, user='root')
+        except subprocess.CalledProcessError:
+            raise qubesadmin.exc.QubesException(
+                'Failed to setup loop device for %s', ident)
+        assert loop_name.startswith(b'/dev/loop')
+        ident = loop_name.decode().split('/')[2]
+        # FIXME: synchronize with udev + exposing device in qubesdb
+
+    options = {}
+    if devtype:
+        options['devtype'] = devtype
+    assignment = qubesadmin.devices.DeviceAssignment(
+        backend_domain,
+        ident,
+        options=options,
+        persistent=True)
+
+    return assignment
+
+
 def main(args=None, app=None):
     '''Main routine of :program:`qvm-start`.
 
@@ -45,9 +147,28 @@ def main(args=None, app=None):
     for domain in args.domains:
         if args.skip_if_running and domain.is_running():
             continue
+        drive_assignment = None
         try:
+            if args.drive:
+                drive_assignment = get_drive_assignment(args.app, args.drive)
+                try:
+                    domain.devices['block'].attach(drive_assignment)
+                except:
+                    drive_assignment = None
+                    raise
+
             domain.start()
+
+            if drive_assignment:
+                # don't reconnect this device after VM reboot
+                domain.devices['block'].update_persistent(
+                    drive_assignment.device, False)
         except (IOError, OSError, qubesadmin.exc.QubesException) as e:
+            if drive_assignment:
+                try:
+                    domain.devices['block'].detach(drive_assignment)
+                except qubesadmin.exc.QubesException:
+                    pass
             exit_code = 1
             parser.print_error(str(e))