qmemmand.py 9.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294
  1. # pylint: skip-file
  2. #
  3. # The Qubes OS Project, http://www.qubes-os.org
  4. #
  5. # Copyright (C) 2010 Rafal Wojtczuk <rafal@invisiblethingslab.com>
  6. #
  7. # This library is free software; you can redistribute it and/or
  8. # modify it under the terms of the GNU Lesser General Public
  9. # License as published by the Free Software Foundation; either
  10. # version 2.1 of the License, or (at your option) any later version.
  11. #
  12. # This library is distributed in the hope that it will be useful,
  13. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  14. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  15. # Lesser General Public License for more details.
  16. #
  17. # You should have received a copy of the GNU Lesser General Public
  18. # License along with this library; if not, see <https://www.gnu.org/licenses/>.
  19. #
  20. #
  21. import configparser
  22. import socketserver
  23. import logging
  24. import logging.handlers
  25. import os
  26. import socket
  27. import sys
  28. import threading
  29. import xen.lowlevel.xs
  30. import qubes.qmemman
  31. import qubes.qmemman.algo
  32. import qubes.utils
  33. SOCK_PATH = '/var/run/qubes/qmemman.sock'
  34. LOG_PATH = '/var/log/qubes/qmemman.log'
  35. system_state = qubes.qmemman.SystemState()
  36. global_lock = threading.Lock()
  37. # If XS_Watcher will
  38. # handle meminfo event before @introduceDomain, it will use
  39. # incomplete domain list for that and may redistribute memory
  40. # allocated to some VM, but not yet used (see #1389).
  41. # To fix that, system_state should be updated (refresh domain
  42. # list) before processing other changes, every time some process requested
  43. # memory for a new VM, before releasing the lock. Then XS_Watcher will check
  44. # this flag before processing other event.
  45. force_refresh_domain_list = False
  46. def only_in_first_list(l1, l2):
  47. ret = []
  48. for i in l1:
  49. if not i in l2:
  50. ret.append(i)
  51. return ret
  52. def get_domain_meminfo_key(domain_id):
  53. return '/local/domain/'+domain_id+'/memory/meminfo'
  54. class WatchType(object):
  55. def __init__(self, fn, param):
  56. self.fn = fn
  57. self.param = param
  58. class XS_Watcher(object):
  59. def __init__(self):
  60. self.log = logging.getLogger('qmemman.daemon.xswatcher')
  61. self.log.debug('XS_Watcher()')
  62. self.handle = xen.lowlevel.xs.xs()
  63. self.handle.watch('@introduceDomain', WatchType(
  64. XS_Watcher.domain_list_changed, False))
  65. self.handle.watch('@releaseDomain', WatchType(
  66. XS_Watcher.domain_list_changed, False))
  67. self.watch_token_dict = {}
  68. def domain_list_changed(self, refresh_only=False):
  69. """
  70. Check if any domain was created/destroyed. If it was, update
  71. appropriate list. Then redistribute memory.
  72. :param refresh_only If True, only refresh domain list, do not
  73. redistribute memory. In this mode, caller must already hold
  74. global_lock.
  75. """
  76. self.log.debug('domain_list_changed(only_refresh={!r})'.format(
  77. refresh_only))
  78. got_lock = False
  79. if not refresh_only:
  80. self.log.debug('acquiring global_lock')
  81. global_lock.acquire()
  82. got_lock = True
  83. self.log.debug('global_lock acquired')
  84. try:
  85. curr = self.handle.ls('', '/local/domain')
  86. if curr is None:
  87. return
  88. # check if domain is really there, it may happen that some empty
  89. # directories are left in xenstore
  90. curr = list(filter(
  91. lambda x:
  92. self.handle.read('',
  93. '/local/domain/{}/domid'.format(x)
  94. ) is not None,
  95. curr
  96. ))
  97. self.log.debug('curr={!r}'.format(curr))
  98. for i in only_in_first_list(curr, self.watch_token_dict.keys()):
  99. # new domain has been created
  100. watch = WatchType(XS_Watcher.meminfo_changed, i)
  101. self.watch_token_dict[i] = watch
  102. self.handle.watch(get_domain_meminfo_key(i), watch)
  103. system_state.add_domain(i)
  104. for i in only_in_first_list(self.watch_token_dict.keys(), curr):
  105. # domain destroyed
  106. self.handle.unwatch(get_domain_meminfo_key(i), self.watch_token_dict[i])
  107. self.watch_token_dict.pop(i)
  108. system_state.del_domain(i)
  109. finally:
  110. if got_lock:
  111. global_lock.release()
  112. self.log.debug('global_lock released')
  113. if not refresh_only:
  114. system_state.do_balance()
  115. def meminfo_changed(self, domain_id):
  116. self.log.debug('meminfo_changed(domain_id={!r})'.format(domain_id))
  117. untrusted_meminfo_key = self.handle.read(
  118. '', get_domain_meminfo_key(domain_id))
  119. if untrusted_meminfo_key == None or untrusted_meminfo_key == b'':
  120. return
  121. self.log.debug('acquiring global_lock')
  122. global_lock.acquire()
  123. self.log.debug('global_lock acquired')
  124. try:
  125. if force_refresh_domain_list:
  126. self.domain_list_changed(refresh_only=True)
  127. system_state.refresh_meminfo(domain_id, untrusted_meminfo_key)
  128. finally:
  129. global_lock.release()
  130. self.log.debug('global_lock released')
  131. def watch_loop(self):
  132. self.log.debug('watch_loop()')
  133. while True:
  134. result = self.handle.read_watch()
  135. self.log.debug('watch_loop result={!r}'.format(result))
  136. token = result[1]
  137. token.fn(self, token.param)
  138. class QMemmanReqHandler(socketserver.BaseRequestHandler):
  139. """
  140. The RequestHandler class for our server.
  141. It is instantiated once per connection to the server, and must
  142. override the handle() method to implement communication to the
  143. client.
  144. """
  145. def handle(self):
  146. self.log = logging.getLogger('qmemman.daemon.reqhandler')
  147. got_lock = False
  148. try:
  149. # self.request is the TCP socket connected to the client
  150. while True:
  151. self.data = self.request.recv(1024).strip()
  152. self.log.debug('data={!r}'.format(self.data))
  153. if len(self.data) == 0:
  154. self.log.info('EOF')
  155. if got_lock:
  156. global force_refresh_domain_list
  157. force_refresh_domain_list = True
  158. return
  159. # XXX something is wrong here: return without release?
  160. if got_lock:
  161. self.log.warning('Second request over qmemman.sock?')
  162. return
  163. self.log.debug('acquiring global_lock')
  164. global_lock.acquire()
  165. self.log.debug('global_lock acquired')
  166. got_lock = True
  167. if system_state.do_balloon(int(self.data.decode('ascii'))):
  168. resp = b"OK\n"
  169. else:
  170. resp = b"FAIL\n"
  171. self.log.debug('resp={!r}'.format(resp))
  172. self.request.send(resp)
  173. except BaseException as e:
  174. self.log.exception(
  175. "exception while handling request: {!r}".format(e))
  176. finally:
  177. if got_lock:
  178. global_lock.release()
  179. self.log.debug('global_lock released')
  180. parser = qubes.tools.QubesArgumentParser(want_app=False)
  181. parser.add_argument('--config', '-c', metavar='FILE',
  182. action='store', default='/etc/qubes/qmemman.conf',
  183. help='qmemman config file')
  184. parser.add_argument('--foreground',
  185. action='store_true', default=False,
  186. help='do not close stdio')
  187. def main():
  188. args = parser.parse_args()
  189. # setup logging
  190. ha_syslog = logging.handlers.SysLogHandler('/dev/log')
  191. ha_syslog.setFormatter(
  192. logging.Formatter('%(name)s[%(process)d]: %(message)s'))
  193. logging.root.addHandler(ha_syslog)
  194. # leave log for backwards compatibility
  195. ha_file = logging.FileHandler(LOG_PATH)
  196. ha_file.setFormatter(
  197. logging.Formatter('%(asctime)s %(name)s[%(process)d]: %(message)s'))
  198. logging.root.addHandler(ha_file)
  199. if args.foreground:
  200. ha_stderr = logging.StreamHandler(sys.stderr)
  201. ha_file.setFormatter(
  202. logging.Formatter('%(asctime)s %(name)s[%(process)d]: %(message)s'))
  203. logging.root.addHandler(ha_stderr)
  204. sys.stdin.close()
  205. logging.root.setLevel(parser.get_loglevel_from_verbosity(args))
  206. log = logging.getLogger('qmemman.daemon')
  207. config = configparser.SafeConfigParser({
  208. 'vm-min-mem': str(qubes.qmemman.algo.MIN_PREFMEM),
  209. 'dom0-mem-boost': str(qubes.qmemman.algo.DOM0_MEM_BOOST),
  210. 'cache-margin-factor': str(qubes.qmemman.algo.CACHE_FACTOR)
  211. })
  212. config.read(args.config)
  213. if config.has_section('global'):
  214. qubes.qmemman.algo.MIN_PREFMEM = \
  215. qubes.utils.parse_size(config.get('global', 'vm-min-mem'))
  216. qubes.qmemman.algo.DOM0_MEM_BOOST = \
  217. qubes.utils.parse_size(config.get('global', 'dom0-mem-boost'))
  218. qubes.qmemman.algo.CACHE_FACTOR = \
  219. config.getfloat('global', 'cache-margin-factor')
  220. log.info('MIN_PREFMEM={algo.MIN_PREFMEM}'
  221. ' DOM0_MEM_BOOST={algo.DOM0_MEM_BOOST}'
  222. ' CACHE_FACTOR={algo.CACHE_FACTOR}'.format(
  223. algo=qubes.qmemman.algo))
  224. try:
  225. os.unlink(SOCK_PATH)
  226. except:
  227. pass
  228. log.debug('instantiating server')
  229. os.umask(0)
  230. server = socketserver.UnixStreamServer(SOCK_PATH, QMemmanReqHandler)
  231. os.umask(0o077)
  232. # notify systemd
  233. nofity_socket = os.getenv('NOTIFY_SOCKET')
  234. if nofity_socket:
  235. log.debug('notifying systemd')
  236. s = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
  237. if nofity_socket.startswith('@'):
  238. nofity_socket = '\0%s' % nofity_socket[1:]
  239. s.connect(nofity_socket)
  240. s.sendall(b"READY=1")
  241. s.close()
  242. threading.Thread(target=server.serve_forever).start()
  243. XS_Watcher().watch_loop()