|
@@ -17,6 +17,19 @@
|
|
|
#
|
|
|
# You should have received a copy of the GNU Lesser General Public License along
|
|
|
# with this program; if not, see <http://www.gnu.org/licenses/>.
|
|
|
+import os
|
|
|
+import shutil
|
|
|
+import socket
|
|
|
+import subprocess
|
|
|
+import tempfile
|
|
|
+import unittest
|
|
|
+
|
|
|
+import multiprocessing
|
|
|
+
|
|
|
+try:
|
|
|
+ import unittest.mock as mock
|
|
|
+except ImportError:
|
|
|
+ import mock
|
|
|
|
|
|
import qubesmgmt.tests
|
|
|
|
|
@@ -181,3 +194,188 @@ class TC_10_QubesBase(qubesmgmt.tests.QubesTestCase):
|
|
|
pools={'private': 'some-pool', 'volatile': 'other-pool'})
|
|
|
self.assertEqual(new_vm.name, 'new-name')
|
|
|
self.assertAllCalled()
|
|
|
+
|
|
|
+
|
|
|
+class TC_20_QubesLocal(unittest.TestCase):
|
|
|
+ def setUp(self):
|
|
|
+ super(TC_20_QubesLocal, self).setUp()
|
|
|
+ self.socket_dir = tempfile.mkdtemp()
|
|
|
+ self.orig_sock = qubesmgmt.config.QUBESD_SOCKET
|
|
|
+ qubesmgmt.config.QUBESD_SOCKET = os.path.join(self.socket_dir, 'sock')
|
|
|
+ self.proc = None
|
|
|
+ self.app = qubesmgmt.app.QubesLocal()
|
|
|
+
|
|
|
+ def listen_and_send(self, send_data):
|
|
|
+ '''Listen on socket and send data in response.
|
|
|
+
|
|
|
+ :param bytes send_data: data to send
|
|
|
+ '''
|
|
|
+ self.socket_pipe, child_pipe = multiprocessing.Pipe()
|
|
|
+ self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
|
|
+ self.socket.bind(os.path.join(self.socket_dir, 'sock'))
|
|
|
+ self.socket.listen(1)
|
|
|
+
|
|
|
+ def worker(sock, pipe, send_data_):
|
|
|
+ conn, addr = sock.accept()
|
|
|
+ pipe.send(conn.makefile('rb').read())
|
|
|
+ conn.sendall(send_data_)
|
|
|
+ conn.close()
|
|
|
+ self.proc = multiprocessing.Process(target=worker,
|
|
|
+ args=(self.socket, child_pipe, send_data))
|
|
|
+ self.proc.start()
|
|
|
+ self.socket.close()
|
|
|
+
|
|
|
+ def get_request(self):
|
|
|
+ '''Get request sent to "qubesd" mock'''
|
|
|
+ return self.socket_pipe.recv()
|
|
|
+
|
|
|
+ def tearDown(self):
|
|
|
+ qubesmgmt.config.QUBESD_SOCKET = self.orig_sock
|
|
|
+ if self.proc is not None:
|
|
|
+ try:
|
|
|
+ self.proc.terminate()
|
|
|
+ except OSError:
|
|
|
+ pass
|
|
|
+ shutil.rmtree(self.socket_dir)
|
|
|
+ super(TC_20_QubesLocal, self).tearDown()
|
|
|
+
|
|
|
+ def test_000_qubesd_call(self):
|
|
|
+ self.listen_and_send(b'0\0')
|
|
|
+ self.app.qubesd_call('test-vm', 'some.method', 'arg1', b'payload')
|
|
|
+ self.assertEqual(self.get_request(),
|
|
|
+ b'dom0\0some.method\0test-vm\0arg1\0payload')
|
|
|
+
|
|
|
+ def test_001_qubesd_call_none_arg(self):
|
|
|
+ self.listen_and_send(b'0\0')
|
|
|
+ self.app.qubesd_call('test-vm', 'some.method', None, b'payload')
|
|
|
+ self.assertEqual(self.get_request(),
|
|
|
+ b'dom0\0some.method\0test-vm\0\0payload')
|
|
|
+
|
|
|
+ def test_002_qubesd_call_none_payload(self):
|
|
|
+ self.listen_and_send(b'0\0')
|
|
|
+ self.app.qubesd_call('test-vm', 'some.method', None, None)
|
|
|
+ self.assertEqual(self.get_request(),
|
|
|
+ b'dom0\0some.method\0test-vm\0\0')
|
|
|
+
|
|
|
+ def test_010_run_service(self):
|
|
|
+ self.listen_and_send(b'0\0')
|
|
|
+ with mock.patch('subprocess.Popen') as mock_proc:
|
|
|
+ p = self.app.run_service('some-vm', 'service.name')
|
|
|
+ mock_proc.assert_called_once_with([
|
|
|
+ qubesmgmt.config.QREXEC_CLIENT,
|
|
|
+ '-d', 'some-vm', 'DEFAULT:QUBESRPC service.name dom0'],
|
|
|
+ stdin=subprocess.PIPE, stdout=subprocess.PIPE,
|
|
|
+ stderr=subprocess.PIPE)
|
|
|
+
|
|
|
+ self.assertEqual(self.get_request(),
|
|
|
+ b'dom0\0mgmt.vm.Start\0some-vm\0\0')
|
|
|
+
|
|
|
+ def test_011_run_service_filter_esc(self):
|
|
|
+ self.listen_and_send(b'0\0')
|
|
|
+ with mock.patch('subprocess.Popen') as mock_proc:
|
|
|
+ p = self.app.run_service('some-vm', 'service.name', filter_esc=True)
|
|
|
+ mock_proc.assert_called_once_with([
|
|
|
+ qubesmgmt.config.QREXEC_CLIENT,
|
|
|
+ '-d', 'some-vm', '-t', '-T',
|
|
|
+ 'DEFAULT:QUBESRPC service.name dom0'],
|
|
|
+ stdin=subprocess.PIPE, stdout=subprocess.PIPE,
|
|
|
+ stderr=subprocess.PIPE)
|
|
|
+
|
|
|
+ self.assertEqual(self.get_request(),
|
|
|
+ b'dom0\0mgmt.vm.Start\0some-vm\0\0')
|
|
|
+
|
|
|
+ def test_012_run_service_user(self):
|
|
|
+ self.listen_and_send(b'0\0')
|
|
|
+ with mock.patch('subprocess.Popen') as mock_proc:
|
|
|
+ p = self.app.run_service('some-vm', 'service.name', user='user')
|
|
|
+ mock_proc.assert_called_once_with([
|
|
|
+ qubesmgmt.config.QREXEC_CLIENT,
|
|
|
+ '-d', 'some-vm',
|
|
|
+ 'user:QUBESRPC service.name dom0'],
|
|
|
+ stdin=subprocess.PIPE, stdout=subprocess.PIPE,
|
|
|
+ stderr=subprocess.PIPE)
|
|
|
+
|
|
|
+ self.assertEqual(self.get_request(),
|
|
|
+ b'dom0\0mgmt.vm.Start\0some-vm\0\0')
|
|
|
+
|
|
|
+ def test_013_run_service_default_target(self):
|
|
|
+ with self.assertRaises(ValueError):
|
|
|
+ self.app.run_service('', 'service.name')
|
|
|
+
|
|
|
+
|
|
|
+class TC_30_QubesRemote(unittest.TestCase):
|
|
|
+ def setUp(self):
|
|
|
+ super(TC_30_QubesRemote, self).setUp()
|
|
|
+ self.proc_mock = mock.Mock()
|
|
|
+ self.proc_mock.configure_mock(**{
|
|
|
+ 'return_value.returncode': 0
|
|
|
+ })
|
|
|
+ self.proc_patch = mock.patch('subprocess.Popen', self.proc_mock)
|
|
|
+ self.proc_patch.start()
|
|
|
+ self.app = qubesmgmt.app.QubesRemote()
|
|
|
+
|
|
|
+ def set_proc_stdout(self, send_data):
|
|
|
+ self.proc_mock.configure_mock(**{
|
|
|
+ 'return_value.communicate.return_value': (send_data, None)
|
|
|
+ })
|
|
|
+
|
|
|
+ def tearDown(self):
|
|
|
+ self.proc_patch.stop()
|
|
|
+ super(TC_30_QubesRemote, self).tearDown()
|
|
|
+
|
|
|
+ def test_000_qubesd_call(self):
|
|
|
+ self.set_proc_stdout(b'0\0')
|
|
|
+ self.app.qubesd_call('test-vm', 'some.method', 'arg1', b'payload')
|
|
|
+ self.assertEqual(self.proc_mock.mock_calls, [
|
|
|
+ mock.call([qubesmgmt.config.QREXEC_CLIENT_VM, 'test-vm',
|
|
|
+ 'some.method+arg1'],
|
|
|
+ stdin=subprocess.PIPE, stdout=subprocess.PIPE,
|
|
|
+ stderr=subprocess.PIPE),
|
|
|
+ mock.call().communicate(b'payload')
|
|
|
+ ])
|
|
|
+
|
|
|
+ def test_001_qubesd_call_none_arg(self):
|
|
|
+ self.set_proc_stdout(b'0\0')
|
|
|
+ self.app.qubesd_call('test-vm', 'some.method', None, b'payload')
|
|
|
+ self.assertEqual(self.proc_mock.mock_calls, [
|
|
|
+ mock.call([qubesmgmt.config.QREXEC_CLIENT_VM, 'test-vm',
|
|
|
+ 'some.method'],
|
|
|
+ stdin=subprocess.PIPE, stdout=subprocess.PIPE,
|
|
|
+ stderr=subprocess.PIPE),
|
|
|
+ mock.call().communicate(b'payload')
|
|
|
+ ])
|
|
|
+
|
|
|
+ def test_002_qubesd_call_none_payload(self):
|
|
|
+ self.set_proc_stdout(b'0\0')
|
|
|
+ self.app.qubesd_call('test-vm', 'some.method', None, None)
|
|
|
+ self.assertEqual(self.proc_mock.mock_calls, [
|
|
|
+ mock.call([qubesmgmt.config.QREXEC_CLIENT_VM, 'test-vm',
|
|
|
+ 'some.method'],
|
|
|
+ stdin=subprocess.PIPE, stdout=subprocess.PIPE,
|
|
|
+ stderr=subprocess.PIPE),
|
|
|
+ mock.call().communicate(None)
|
|
|
+ ])
|
|
|
+
|
|
|
+ def test_010_run_service(self):
|
|
|
+ self.app.run_service('some-vm', 'service.name')
|
|
|
+ self.proc_mock.assert_called_once_with([
|
|
|
+ qubesmgmt.config.QREXEC_CLIENT_VM,
|
|
|
+ 'some-vm', 'service.name'],
|
|
|
+ stdin=subprocess.PIPE, stdout=subprocess.PIPE,
|
|
|
+ stderr=subprocess.PIPE)
|
|
|
+
|
|
|
+ def test_011_run_service_filter_esc(self):
|
|
|
+ with self.assertRaises(NotImplementedError):
|
|
|
+ p = self.app.run_service('some-vm', 'service.name', filter_esc=True)
|
|
|
+
|
|
|
+ def test_012_run_service_user(self):
|
|
|
+ with self.assertRaises(ValueError):
|
|
|
+ p = self.app.run_service('some-vm', 'service.name', user='user')
|
|
|
+
|
|
|
+ def test_013_run_service_default_target(self):
|
|
|
+ self.app.run_service('', 'service.name')
|
|
|
+ self.proc_mock.assert_called_once_with([
|
|
|
+ qubesmgmt.config.QREXEC_CLIENT_VM,
|
|
|
+ '', 'service.name'],
|
|
|
+ stdin=subprocess.PIPE, stdout=subprocess.PIPE,
|
|
|
+ stderr=subprocess.PIPE)
|