qmemmand.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306
  1. #
  2. # The Qubes OS Project, http://www.qubes-os.org
  3. #
  4. # Copyright (C) 2010 Rafal Wojtczuk <rafal@invisiblethingslab.com>
  5. #
  6. # This library is free software; you can redistribute it and/or
  7. # modify it under the terms of the GNU Lesser General Public
  8. # License as published by the Free Software Foundation; either
  9. # version 2.1 of the License, or (at your option) any later version.
  10. #
  11. # This library is distributed in the hope that it will be useful,
  12. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  13. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  14. # Lesser General Public License for more details.
  15. #
  16. # You should have received a copy of the GNU Lesser General Public
  17. # License along with this library; if not, see <https://www.gnu.org/licenses/>.
  18. #
  19. #
  20. # pylint: disable=global-statement
  21. import configparser
  22. import socketserver
  23. import logging
  24. import logging.handlers
  25. import os
  26. import socket
  27. import sys
  28. import threading
  29. from dataclasses import dataclass
  30. from typing import Callable, Any
  31. import xen.lowlevel.xs # pylint: disable=import-error
  32. import qubes.qmemman
  33. import qubes.qmemman.algo
  34. import qubes.utils
# Path of the UNIX socket on which the daemon accepts requests.
SOCK_PATH = '/var/run/qubes/qmemman.sock'

# Memory-balancing state shared by the xenstore watcher and the socket
# request handler.
system_state = qubes.qmemman.SystemState()

# Taken by the xenstore watcher while it updates system_state, and by the
# request handler for as long as a client connection holds a memory request.
global_lock = threading.Lock()

# If XSWatcher handles a meminfo event before @introduceDomain, it uses an
# incomplete domain list and may redistribute memory that is already
# allocated to some VM but not yet used by it (see #1389).
# To prevent that, system_state must be updated (the domain list refreshed)
# before any other change is processed, every time some process has requested
# memory for a new VM. The request handler therefore sets this flag before
# releasing the lock, and XSWatcher checks it before processing the next
# meminfo event.
force_refresh_domain_list = False
  47. def only_in_first_list(list1, list2):
  48. ret = []
  49. for i in list1:
  50. if i not in list2:
  51. ret.append(i)
  52. return ret
  53. def get_domain_meminfo_key(domain_id):
  54. return '/local/domain/'+domain_id+'/memory/meminfo'
@dataclass
class WatchType:
    """Token attached to a xenstore watch, naming the handler to run.

    XSWatcher registers one instance per watch; when the watch fires,
    XSWatcher.watch_loop() calls ``func(watcher, param)``.
    """
    # Handler to call; XSWatcher passes its own methods unbound, so the
    # watcher instance is supplied as the first argument at dispatch time.
    func: Callable
    # Single argument forwarded to func: the refresh_only flag for
    # domain_list_changed, or the domain id for meminfo_changed.
    param: Any
  59. class XSWatcher:
  60. def __init__(self):
  61. self.log = logging.getLogger('qmemman.daemon.xswatcher')
  62. self.log.debug('XS_Watcher()')
  63. self.handle = xen.lowlevel.xs.xs()
  64. self.handle.watch('@introduceDomain', WatchType(
  65. XSWatcher.domain_list_changed, False))
  66. self.handle.watch('@releaseDomain', WatchType(
  67. XSWatcher.domain_list_changed, False))
  68. self.watch_token_dict = {}
  69. def domain_list_changed(self, refresh_only=False):
  70. """
  71. Check if any domain was created/destroyed. If it was, update
  72. appropriate list. Then redistribute memory.
  73. :param refresh_only If True, only refresh domain list, do not
  74. redistribute memory. In this mode, caller must already hold
  75. global_lock.
  76. """
  77. self.log.debug('domain_list_changed(only_refresh={!r})'.format(
  78. refresh_only))
  79. got_lock = False
  80. if not refresh_only:
  81. self.log.debug('acquiring global_lock')
  82. global_lock.acquire()
  83. got_lock = True
  84. self.log.debug('global_lock acquired')
  85. try:
  86. curr = self.handle.ls('', '/local/domain')
  87. if curr is None:
  88. return
  89. # check if domain is really there, it may happen that some empty
  90. # directories are left in xenstore
  91. curr = list(filter(
  92. lambda x:
  93. self.handle.read('',
  94. '/local/domain/{}/domid'.format(x)
  95. ) is not None,
  96. curr
  97. ))
  98. self.log.debug('curr={!r}'.format(curr))
  99. for i in only_in_first_list(curr, self.watch_token_dict.keys()):
  100. # new domain has been created
  101. watch = WatchType(XSWatcher.meminfo_changed, i)
  102. self.watch_token_dict[i] = watch
  103. self.handle.watch(get_domain_meminfo_key(i), watch)
  104. system_state.add_domain(i)
  105. for i in only_in_first_list(self.watch_token_dict.keys(), curr):
  106. # domain destroyed
  107. self.handle.unwatch(get_domain_meminfo_key(i),
  108. self.watch_token_dict[i])
  109. self.watch_token_dict.pop(i)
  110. system_state.del_domain(i)
  111. except: # pylint: disable=bare-except
  112. self.log.exception('Updating domain list failed')
  113. finally:
  114. if got_lock:
  115. global_lock.release()
  116. self.log.debug('global_lock released')
  117. if not refresh_only:
  118. try:
  119. system_state.do_balance()
  120. except: # pylint: disable=bare-except
  121. self.log.exception('do_balance() failed')
  122. def meminfo_changed(self, domain_id):
  123. self.log.debug('meminfo_changed(domain_id={!r})'.format(domain_id))
  124. untrusted_meminfo_key = self.handle.read(
  125. '', get_domain_meminfo_key(domain_id))
  126. if untrusted_meminfo_key is None or untrusted_meminfo_key == b'':
  127. return
  128. self.log.debug('acquiring global_lock')
  129. global_lock.acquire()
  130. self.log.debug('global_lock acquired')
  131. try:
  132. global force_refresh_domain_list
  133. if force_refresh_domain_list:
  134. self.domain_list_changed(refresh_only=True)
  135. force_refresh_domain_list = False
  136. if domain_id not in self.watch_token_dict:
  137. # domain just destroyed
  138. return
  139. system_state.refresh_meminfo(domain_id, untrusted_meminfo_key)
  140. except: # pylint: disable=bare-except
  141. self.log.exception('Updating meminfo for %d failed', domain_id)
  142. finally:
  143. global_lock.release()
  144. self.log.debug('global_lock released')
  145. def watch_loop(self):
  146. self.log.debug('watch_loop()')
  147. while True:
  148. result = self.handle.read_watch()
  149. self.log.debug('watch_loop result={!r}'.format(result))
  150. token = result[1]
  151. token.func(self, token.param)
  152. class QMemmanReqHandler(socketserver.BaseRequestHandler):
  153. """
  154. The RequestHandler class for our server.
  155. It is instantiated once per connection to the server, and must
  156. override the handle() method to implement communication to the
  157. client.
  158. """
  159. def handle(self):
  160. self.log = logging.getLogger('qmemman.daemon.reqhandler')
  161. got_lock = False
  162. try:
  163. # self.request is the TCP socket connected to the client
  164. while True:
  165. self.data = self.request.recv(1024).strip()
  166. self.log.debug('data={!r}'.format(self.data))
  167. if len(self.data) == 0:
  168. self.log.info('client disconnected, resuming membalance')
  169. if got_lock:
  170. global force_refresh_domain_list
  171. force_refresh_domain_list = True
  172. return
  173. # XXX something is wrong here: return without release?
  174. if got_lock:
  175. self.log.warning('Second request over qmemman.sock?')
  176. return
  177. self.log.debug('acquiring global_lock')
  178. global_lock.acquire()
  179. self.log.debug('global_lock acquired')
  180. got_lock = True
  181. if system_state.do_balloon(int(self.data.decode('ascii'))):
  182. resp = b"OK\n"
  183. else:
  184. resp = b"FAIL\n"
  185. self.log.debug('resp={!r}'.format(resp))
  186. self.request.send(resp)
  187. except BaseException as e:
  188. self.log.exception(
  189. "exception while handling request: {!r}".format(e))
  190. finally:
  191. if got_lock:
  192. global_lock.release()
  193. self.log.debug('global_lock released')
# Command-line interface of the daemon; the arguments are parsed in main().
parser = qubes.tools.QubesArgumentParser(want_app=False)

# Configuration file read by main() to override the balancing parameters.
parser.add_argument('--config', '-c', metavar='FILE',
    action='store', default='/etc/qubes/qmemman.conf',
    help='qmemman config file')

# When given, main() additionally sends log messages to stderr.
parser.add_argument('--foreground',
    action='store_true', default=False,
    help='do not close stdio')
  201. def main():
  202. args = parser.parse_args()
  203. # setup logging
  204. ha_syslog = logging.handlers.SysLogHandler('/dev/log')
  205. ha_syslog.setFormatter(
  206. logging.Formatter('%(name)s[%(process)d]: %(message)s'))
  207. logging.root.addHandler(ha_syslog)
  208. if args.foreground:
  209. ha_stderr = logging.StreamHandler(sys.stderr)
  210. ha_stderr.setFormatter(
  211. logging.Formatter('%(asctime)s %(name)s[%(process)d]: %(message)s'))
  212. logging.root.addHandler(ha_stderr)
  213. sys.stdin.close()
  214. log = logging.getLogger('qmemman.daemon')
  215. config = configparser.SafeConfigParser({
  216. 'vm-min-mem': str(qubes.qmemman.algo.MIN_PREFMEM),
  217. 'dom0-mem-boost': str(qubes.qmemman.algo.DOM0_MEM_BOOST),
  218. 'cache-margin-factor': str(qubes.qmemman.algo.CACHE_FACTOR)
  219. })
  220. config.read(args.config)
  221. if config.has_section('global'):
  222. qubes.qmemman.algo.MIN_PREFMEM = \
  223. qubes.utils.parse_size(config.get('global', 'vm-min-mem'))
  224. qubes.qmemman.algo.DOM0_MEM_BOOST = \
  225. qubes.utils.parse_size(config.get('global', 'dom0-mem-boost'))
  226. qubes.qmemman.algo.CACHE_FACTOR = \
  227. config.getfloat('global', 'cache-margin-factor')
  228. loglevel = config.getint('global', 'log-level', fallback=30)
  229. logging.root.setLevel(loglevel)
  230. log.info('MIN_PREFMEM={algo.MIN_PREFMEM}'
  231. ' DOM0_MEM_BOOST={algo.DOM0_MEM_BOOST}'
  232. ' CACHE_FACTOR={algo.CACHE_FACTOR}'.format(
  233. algo=qubes.qmemman.algo))
  234. try:
  235. os.unlink(SOCK_PATH)
  236. except FileNotFoundError:
  237. pass
  238. log.debug('instantiating server')
  239. os.umask(0)
  240. # Initialize the connection to Xen and to XenStore
  241. system_state.init()
  242. server = socketserver.UnixStreamServer(SOCK_PATH, QMemmanReqHandler)
  243. os.umask(0o077)
  244. # notify systemd
  245. nofity_socket = os.getenv('NOTIFY_SOCKET')
  246. if nofity_socket:
  247. log.debug('notifying systemd')
  248. sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
  249. if nofity_socket.startswith('@'):
  250. nofity_socket = '\0%s' % nofity_socket[1:]
  251. sock.connect(nofity_socket)
  252. sock.sendall(b"READY=1")
  253. sock.close()
  254. threading.Thread(target=server.serve_forever).start()
  255. XSWatcher().watch_loop()