qmemman_server.py 9.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288
  1. #!/usr/bin/python2
  2. # -*- coding: utf-8 -*-
  3. #
  4. # The Qubes OS Project, http://www.qubes-os.org
  5. #
  6. # Copyright (C) 2010 Rafal Wojtczuk <rafal@invisiblethingslab.com>
  7. #
  8. # This program is free software; you can redistribute it and/or
  9. # modify it under the terms of the GNU General Public License
  10. # as published by the Free Software Foundation; either version 2
  11. # of the License, or (at your option) any later version.
  12. #
  13. # This program is distributed in the hope that it will be useful,
  14. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  15. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  16. # GNU General Public License for more details.
  17. #
  18. # You should have received a copy of the GNU General Public License
  19. # along with this program; if not, write to the Free Software
  20. # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
  21. #
  22. #
  23. import SocketServer
  24. import thread
  25. import time
  26. import xen.lowlevel.xs
  27. import sys
  28. import os
  29. import socket
  30. from qmemman import SystemState
  31. import qmemman_algo
  32. from ConfigParser import SafeConfigParser
  33. from optparse import OptionParser
  34. from qubesutils import parse_size
  35. import logging
  36. import logging.handlers
# Default configuration file; can be overridden with -c/--config.
config_path = '/etc/qubes/qmemman.conf'
# Path of the Unix stream socket on which the daemon accepts requests.
SOCK_PATH='/var/run/qubes/qmemman.sock'
# Log file kept (in addition to syslog) for backwards compatibility.
LOG_PATH='/var/log/qubes/qmemman.log'

# State shared between the xenstore watcher and the request handler.
system_state = SystemState()
# Serializes access to system_state between XS_Watcher and
# QMemmanReqHandler.
global_lock = thread.allocate_lock()

# If XS_Watcher handles a meminfo event before @introduceDomain, it will
# use an incomplete domain list and may redistribute memory that is
# already allocated to some VM but not yet used (see #1389).
# To fix that, system_state must be updated (domain list refreshed) before
# any other change is processed, every time some process has requested
# memory for a new VM. The requester sets this flag before releasing the
# lock, and XS_Watcher checks it before processing the next event.
force_refresh_domain_list = False
  51. def only_in_first_list(l1, l2):
  52. ret=[]
  53. for i in l1:
  54. if not i in l2:
  55. ret.append(i)
  56. return ret
  57. def get_domain_meminfo_key(domain_id):
  58. return '/local/domain/'+domain_id+'/memory/meminfo'
  59. class WatchType:
  60. def __init__(self, fn, param):
  61. self.fn = fn
  62. self.param = param
  63. class XS_Watcher:
  64. def __init__(self):
  65. self.log = logging.getLogger('qmemman.daemon.xswatcher')
  66. self.log.debug('XS_Watcher()')
  67. self.handle = xen.lowlevel.xs.xs()
  68. self.handle.watch('@introduceDomain', WatchType(
  69. XS_Watcher.domain_list_changed, False))
  70. self.handle.watch('@releaseDomain', WatchType(
  71. XS_Watcher.domain_list_changed, False))
  72. self.watch_token_dict = {}
  73. def domain_list_changed(self, refresh_only=False):
  74. """
  75. Check if any domain was created/destroyed. If it was, update
  76. appropriate list. Then redistribute memory.
  77. :param refresh_only If True, only refresh domain list, do not
  78. redistribute memory. In this mode, caller must already hold
  79. global_lock.
  80. """
  81. self.log.debug('domain_list_changed(only_refresh={!r})'.format(
  82. refresh_only))
  83. got_lock = False
  84. if not refresh_only:
  85. self.log.debug('acquiring global_lock')
  86. global_lock.acquire()
  87. got_lock = True
  88. self.log.debug('global_lock acquired')
  89. try:
  90. curr = self.handle.ls('', '/local/domain')
  91. if curr is None:
  92. return
  93. # check if domain is really there, it may happen that some empty
  94. # directories are left in xenstore
  95. curr = filter(
  96. lambda x:
  97. self.handle.read('',
  98. '/local/domain/{}/domid'.format(x)
  99. ) is not None,
  100. curr
  101. )
  102. self.log.debug('curr={!r}'.format(curr))
  103. for i in only_in_first_list(curr, self.watch_token_dict.keys()):
  104. # new domain has been created
  105. watch = WatchType(XS_Watcher.meminfo_changed, i)
  106. self.watch_token_dict[i] = watch
  107. self.handle.watch(get_domain_meminfo_key(i), watch)
  108. system_state.add_domain(i)
  109. for i in only_in_first_list(self.watch_token_dict.keys(), curr):
  110. # domain destroyed
  111. self.handle.unwatch(get_domain_meminfo_key(i), self.watch_token_dict[i])
  112. self.watch_token_dict.pop(i)
  113. system_state.del_domain(i)
  114. finally:
  115. if got_lock:
  116. global_lock.release()
  117. self.log.debug('global_lock released')
  118. if not refresh_only:
  119. system_state.do_balance()
  120. def meminfo_changed(self, domain_id):
  121. self.log.debug('meminfo_changed(domain_id={!r})'.format(domain_id))
  122. untrusted_meminfo_key = self.handle.read('', get_domain_meminfo_key(domain_id))
  123. if untrusted_meminfo_key == None or untrusted_meminfo_key == '':
  124. return
  125. self.log.debug('acquiring global_lock')
  126. global_lock.acquire()
  127. self.log.debug('global_lock acquired')
  128. if force_refresh_domain_list:
  129. self.domain_list_changed(refresh_only=True)
  130. system_state.refresh_meminfo(domain_id, untrusted_meminfo_key)
  131. global_lock.release()
  132. self.log.debug('global_lock released')
  133. def watch_loop(self):
  134. self.log.debug('watch_loop()')
  135. while True:
  136. result = self.handle.read_watch()
  137. self.log.debug('watch_loop result={!r}'.format(result))
  138. token = result[1]
  139. token.fn(self, token.param)
  140. class QMemmanReqHandler(SocketServer.BaseRequestHandler):
  141. """
  142. The RequestHandler class for our server.
  143. It is instantiated once per connection to the server, and must
  144. override the handle() method to implement communication to the
  145. client.
  146. """
  147. def handle(self):
  148. self.log = logging.getLogger('qmemman.daemon.reqhandler')
  149. got_lock = False
  150. try:
  151. # self.request is the TCP socket connected to the client
  152. while True:
  153. self.data = self.request.recv(1024).strip()
  154. self.log.debug('data={!r}'.format(self.data))
  155. if len(self.data) == 0:
  156. self.log.info('EOF')
  157. if got_lock:
  158. global force_refresh_domain_list
  159. force_refresh_domain_list = True
  160. return
  161. # XXX something is wrong here: return without release?
  162. if got_lock:
  163. self.log.warning('Second request over qmemman.sock?')
  164. return
  165. self.log.debug('acquiring global_lock')
  166. global_lock.acquire()
  167. self.log.debug('global_lock acquired')
  168. got_lock = True
  169. if system_state.do_balloon(int(self.data)):
  170. resp = "OK\n"
  171. else:
  172. resp = "FAIL\n"
  173. self.log.debug('resp={!r}'.format(resp))
  174. self.request.send(resp)
  175. except BaseException as e:
  176. self.log.exception(
  177. "exception while handling request: {!r}".format(e))
  178. finally:
  179. if got_lock:
  180. global_lock.release()
  181. self.log.debug('global_lock released')
def start_server(server):
    """
    Run the given server's request loop.

    Used as the target of the background thread started in
    QMemmanServer.main().

    :param server: object providing serve_forever(); main() passes a
        SocketServer.UnixStreamServer
    """
    server.serve_forever()
  184. class QMemmanServer:
  185. @staticmethod
  186. def main():
  187. # setup logging
  188. ha_syslog = logging.handlers.SysLogHandler('/dev/log')
  189. ha_syslog.setFormatter(
  190. logging.Formatter('%(name)s[%(process)d]: %(message)s'))
  191. logging.root.addHandler(ha_syslog)
  192. # leave log for backwards compatibility
  193. ha_file = logging.FileHandler(LOG_PATH)
  194. ha_file.setFormatter(
  195. logging.Formatter('%(asctime)s %(name)s[%(process)d]: %(message)s'))
  196. logging.root.addHandler(ha_file)
  197. log = logging.getLogger('qmemman.daemon')
  198. usage = "usage: %prog [options]"
  199. parser = OptionParser(usage)
  200. parser.add_option("-c", "--config", action="store", dest="config", default=config_path)
  201. parser.add_option("-d", "--debug", action="store_true", dest="debug",
  202. default=False, help="Enable debugging")
  203. (options, args) = parser.parse_args()
  204. if options.debug:
  205. logging.root.setLevel(logging.DEBUG)
  206. # close io
  207. sys.stdin.close()
  208. sys.stdout.close()
  209. sys.stderr.close()
  210. config = SafeConfigParser({
  211. 'vm-min-mem': str(qmemman_algo.MIN_PREFMEM),
  212. 'dom0-mem-boost': str(qmemman_algo.DOM0_MEM_BOOST),
  213. 'cache-margin-factor': str(qmemman_algo.CACHE_FACTOR)
  214. })
  215. config.read(options.config)
  216. if config.has_section('global'):
  217. qmemman_algo.MIN_PREFMEM = parse_size(config.get('global', 'vm-min-mem'))
  218. qmemman_algo.DOM0_MEM_BOOST = parse_size(config.get('global', 'dom0-mem-boost'))
  219. qmemman_algo.CACHE_FACTOR = config.getfloat('global', 'cache-margin-factor')
  220. log.info('MIN_PREFMEM={qmemman_algo.MIN_PREFMEM}'
  221. ' DOM0_MEM_BOOST={qmemman_algo.DOM0_MEM_BOOST}'
  222. ' CACHE_FACTOR={qmemman_algo.CACHE_FACTOR}'.format(
  223. qmemman_algo=qmemman_algo))
  224. try:
  225. os.unlink(SOCK_PATH)
  226. except:
  227. pass
  228. log.debug('instantiating server')
  229. os.umask(0)
  230. server = SocketServer.UnixStreamServer(SOCK_PATH, QMemmanReqHandler)
  231. os.umask(077)
  232. # notify systemd
  233. nofity_socket = os.getenv('NOTIFY_SOCKET')
  234. if nofity_socket:
  235. log.debug('notifying systemd')
  236. s = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
  237. if nofity_socket.startswith('@'):
  238. nofity_socket = '\0%s' % nofity_socket[1:]
  239. s.connect(nofity_socket)
  240. s.sendall("READY=1")
  241. s.close()
  242. thread.start_new_thread(start_server, tuple([server]))
  243. XS_Watcher().watch_loop()