qubes/api: refactor creating multiple qubesd sockets
Now there is a single function to do this, shared with tests.
This commit is contained in:
parent
bec58fc861
commit
96a66ac6bd
@ -20,10 +20,12 @@
|
||||
# with this program; if not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
import asyncio
|
||||
import errno
|
||||
import functools
|
||||
import io
|
||||
import os
|
||||
import shutil
|
||||
import socket
|
||||
import struct
|
||||
import traceback
|
||||
|
||||
@ -105,6 +107,10 @@ class AbstractQubesAPI(object):
|
||||
There are also two helper functions for firing events associated with API
|
||||
calls.
|
||||
'''
|
||||
|
||||
#: the preferred socket location (to be overridden in child's class)
|
||||
SOCKNAME = None
|
||||
|
||||
def __init__(self, app, src, method_name, dest, arg, send_event=None):
|
||||
#: :py:class:`qubes.Qubes` object
|
||||
self.app = app
|
||||
@ -332,27 +338,61 @@ class QubesDaemonProtocol(asyncio.Protocol):
|
||||
self.transport.write(str(exc).encode('utf-8') + b'\0')
|
||||
|
||||
|
||||
_umask_lock = asyncio.Lock()
|
||||
|
||||
@asyncio.coroutine
|
||||
def create_server(sockpath, handler, app, debug=False, *, loop=None):
|
||||
def create_servers(*args, force=False, loop=None, **kwargs):
|
||||
'''Create multiple Qubes API servers
|
||||
|
||||
:param qubes.Qubes app: the app that is a backend of the servers
|
||||
:param bool force: if :py:obj:`True`, unconditionaly remove existing \
|
||||
sockets; if :py:obj:`False`, raise an error if there is some process \
|
||||
listening to such socket
|
||||
:param asyncio.Loop loop: loop
|
||||
|
||||
*args* are supposed to be classess inheriting from
|
||||
:py:class:`AbstractQubesAPI`
|
||||
|
||||
*kwargs* (like *app* or *debug* for example) are passed to
|
||||
:py:class:`QubesDaemonProtocol` constructor
|
||||
'''
|
||||
loop = loop or asyncio.get_event_loop()
|
||||
|
||||
servers = []
|
||||
old_umask = os.umask(0o007)
|
||||
try:
|
||||
os.unlink(sockpath)
|
||||
except FileNotFoundError:
|
||||
pass
|
||||
# XXX this can be optimised with asyncio.wait() to start servers in
|
||||
# parallel, but I currently don't see the need
|
||||
for handler in args:
|
||||
sockpath = handler.SOCKNAME
|
||||
assert sockpath is not None, \
|
||||
'SOCKNAME needs to be overloaded in {}'.format(
|
||||
type(handler).__name__)
|
||||
|
||||
if os.path.exists(sockpath):
|
||||
if force:
|
||||
os.unlink(sockpath)
|
||||
else:
|
||||
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
||||
try:
|
||||
sock.connect(sockpath)
|
||||
except ConnectionRefusedError:
|
||||
# dead socket, remove it anyway
|
||||
os.unlink(sockpath)
|
||||
else:
|
||||
# woops, someone is listening
|
||||
sock.close()
|
||||
raise FileExistsError(errno.EEXIST,
|
||||
'socket already exists: {!r}'.format(sockpath))
|
||||
|
||||
with (yield from _umask_lock):
|
||||
old_umask = os.umask(0o007)
|
||||
try:
|
||||
server = yield from loop.create_unix_server(
|
||||
functools.partial(QubesDaemonProtocol,
|
||||
handler, app=app, debug=debug),
|
||||
functools.partial(QubesDaemonProtocol, handler, **kwargs),
|
||||
sockpath)
|
||||
finally:
|
||||
os.umask(old_umask)
|
||||
|
||||
for sock in server.sockets:
|
||||
shutil.chown(sock.getsockname(), group='qubes')
|
||||
for sock in server.sockets:
|
||||
shutil.chown(sock.getsockname(), group='qubes')
|
||||
|
||||
return server
|
||||
servers.append(server)
|
||||
|
||||
finally:
|
||||
os.umask(old_umask)
|
||||
|
||||
return servers
|
||||
|
@ -35,8 +35,6 @@ import qubes.utils
|
||||
import qubes.vm
|
||||
import qubes.vm.qubesvm
|
||||
|
||||
QUBESD_ADMIN_SOCK = '/var/run/qubesd.sock'
|
||||
|
||||
|
||||
class QubesMgmtEventsDispatcher(object):
|
||||
def __init__(self, filters, send_event):
|
||||
@ -75,6 +73,8 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI):
|
||||
https://www.qubes-os.org/doc/mgmt1/
|
||||
'''
|
||||
|
||||
SOCKNAME = '/var/run/qubesd.sock'
|
||||
|
||||
@qubes.api.method('admin.vmclass.List', no_payload=True)
|
||||
@asyncio.coroutine
|
||||
def vmclass_list(self):
|
||||
|
@ -29,19 +29,12 @@ import qubes.api.admin
|
||||
import qubes.vm.adminvm
|
||||
import qubes.vm.dispvm
|
||||
|
||||
QUBESD_INTERNAL_SOCK = '/var/run/qubesd.internal.sock'
|
||||
|
||||
|
||||
class QubesInternalAPI(qubes.api.AbstractQubesAPI):
|
||||
''' Communication interface for dom0 components,
|
||||
by design the input here is trusted.'''
|
||||
#
|
||||
# PRIVATE METHODS, not to be called via RPC
|
||||
#
|
||||
|
||||
#
|
||||
# ACTUAL RPC CALLS
|
||||
#
|
||||
SOCKNAME = '/var/run/qubesd.internal.sock'
|
||||
|
||||
@qubes.api.method('internal.GetSystemInfo', no_payload=True)
|
||||
@asyncio.coroutine
|
||||
|
@ -28,10 +28,10 @@ import qubes.api
|
||||
import qubes.api.admin
|
||||
import qubes.vm.dispvm
|
||||
|
||||
QUBESD_MISC_SOCK = '/var/run/qubesd.misc.sock'
|
||||
|
||||
|
||||
class QubesMiscAPI(qubes.api.AbstractQubesAPI):
|
||||
SOCKNAME = '/var/run/qubesd.misc.sock'
|
||||
|
||||
@qubes.api.method('qubes.FeaturesRequest', no_payload=True)
|
||||
@asyncio.coroutine
|
||||
def qubes_features_request(self):
|
||||
|
@ -103,7 +103,6 @@ except OSError:
|
||||
# command not found; let's assume we're outside
|
||||
pass
|
||||
|
||||
|
||||
def skipUnlessDom0(test_item):
|
||||
'''Decorator that skips test outside dom0.
|
||||
|
||||
@ -591,12 +590,11 @@ class SystemTestsMixin(object):
|
||||
)
|
||||
os.environ['QUBES_XML_PATH'] = XMLPATH
|
||||
|
||||
self.qrexec_policy_server = self.loop.run_until_complete(
|
||||
qubes.api.create_server(
|
||||
qubes.api.internal.QUBESD_INTERNAL_SOCK,
|
||||
self.qubesd = self.loop.run_until_complete(
|
||||
qubes.api.create_servers(
|
||||
qubes.api.admin.QubesAdminAPI,
|
||||
qubes.api.internal.QubesInternalAPI,
|
||||
app=self.app,
|
||||
debug=True))
|
||||
app=self.app, debug=True))
|
||||
|
||||
def init_default_template(self, template=None):
|
||||
if template is None:
|
||||
@ -680,11 +678,13 @@ class SystemTestsMixin(object):
|
||||
self.reload_db()
|
||||
|
||||
def tearDown(self):
|
||||
# close the server before super(), because that might close the loop
|
||||
for sock in self.qrexec_policy_server.sockets:
|
||||
os.unlink(sock.getsockname())
|
||||
self.qrexec_policy_server.close()
|
||||
self.loop.run_until_complete(self.qrexec_policy_server.wait_closed())
|
||||
# close the servers before super(), because that might close the loop
|
||||
for server in self.qubesd:
|
||||
for sock in server.sockets:
|
||||
os.unlink(sock.getsockname())
|
||||
server.close()
|
||||
self.loop.run_until_complete(asyncio.wait([
|
||||
server.wait_closed() for server in self.qubesd]))
|
||||
|
||||
super(SystemTestsMixin, self).tearDown()
|
||||
self.remove_test_vms()
|
||||
|
@ -413,7 +413,7 @@ def main():
|
||||
logging.root.addHandler(ha_kmsg)
|
||||
|
||||
if not args.allow_running_along_qubesd \
|
||||
and os.path.exists(qubes.api.admin.QUBESD_ADMIN_SOCK):
|
||||
and os.path.exists(qubes.api.admin.QubesAdminAPI.SOCKNAME):
|
||||
parser.error('refusing to run until qubesd is disabled')
|
||||
|
||||
runner = unittest.TextTestRunner(stream=sys.stdout,
|
||||
|
@ -36,19 +36,11 @@ def main(args=None):
|
||||
|
||||
args.app.vmm.register_event_handlers(args.app)
|
||||
|
||||
servers = []
|
||||
servers.append(loop.run_until_complete(qubes.api.create_server(
|
||||
qubes.api.admin.QUBESD_ADMIN_SOCK,
|
||||
servers = loop.run_until_complete(qubes.api.create_servers(
|
||||
qubes.api.admin.QubesAdminAPI,
|
||||
app=args.app, debug=args.debug)))
|
||||
servers.append(loop.run_until_complete(qubes.api.create_server(
|
||||
qubes.api.internal.QUBESD_INTERNAL_SOCK,
|
||||
qubes.api.internal.QubesInternalAPI,
|
||||
app=args.app, debug=args.debug)))
|
||||
servers.append(loop.run_until_complete(qubes.api.create_server(
|
||||
qubes.api.misc.QUBESD_MISC_SOCK,
|
||||
qubes.api.misc.QubesMiscAPI,
|
||||
app=args.app, debug=args.debug)))
|
||||
app=args.app, debug=args.debug))
|
||||
|
||||
socknames = []
|
||||
for server in servers:
|
||||
@ -71,11 +63,9 @@ def main(args=None):
|
||||
try:
|
||||
os.unlink(sockname)
|
||||
except FileNotFoundError:
|
||||
# XXX
|
||||
# We had our socket unlinked by somebody else, possibly other
|
||||
# qubesd instance. That also means we probably unlinked their
|
||||
# socket when creating our server...
|
||||
pass
|
||||
args.app.log.warning(
|
||||
'socket {} got unlinked sometime before shutdown'.format(
|
||||
sockname))
|
||||
finally:
|
||||
loop.close()
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user