Merge branch 'sane-and-pretty' of ssh://git.qubes-os.org/var/lib/qubes/git/rafal/core

Marek Marczykowski, 2011-05-09 12:25:14 +02:00
commit 3d92e50792
10 changed files with 477 additions and 296 deletions


@@ -8,7 +8,7 @@
 #include <stdio.h>
 #include "filecopy.h"
-char namebuf[MAX_PATH_LENGTH];
+char untrusted_namebuf[MAX_PATH_LENGTH];
 void notify_progress(int p1, int p2)
 {
 }
@@ -22,28 +22,30 @@ void do_exit(int code)
 }
-void fix_times_and_perms(struct file_header *hdr, char *name)
+void fix_times_and_perms(struct file_header *untrusted_hdr,
+             char *untrusted_name)
 {
     struct timeval times[2] =
-        { {hdr->atime, hdr->atime_nsec / 1000}, {hdr->mtime,
-                        hdr->mtime_nsec / 1000}
+        { {untrusted_hdr->atime, untrusted_hdr->atime_nsec / 1000},
+          {untrusted_hdr->mtime,
+           untrusted_hdr->mtime_nsec / 1000}
     };
-    if (chmod(name, hdr->mode & 07777))
+    if (chmod(untrusted_name, untrusted_hdr->mode & 07777))    /* safe because of chroot */
        do_exit(errno);
-    if (utimes(name, times))
+    if (utimes(untrusted_name, times))    /* as above */
        do_exit(errno);
 }
-void process_one_file_reg(struct file_header *hdr, char *name)
+void process_one_file_reg(struct file_header *untrusted_hdr,
+              char *untrusted_name)
 {
     int ret;
-    int fdout =
-        open(name, O_WRONLY | O_CREAT | O_EXCL | O_NOFOLLOW, 0700);
+    int fdout = open(untrusted_name, O_WRONLY | O_CREAT | O_EXCL | O_NOFOLLOW, 0700);    /* safe because of chroot */
     if (fdout < 0)
        do_exit(errno);
-    ret = copy_file(fdout, 0, hdr->filelen);
+    ret = copy_file(fdout, 0, untrusted_hdr->filelen);
     if (ret != COPY_FILE_OK) {
        if (ret == COPY_FILE_READ_EOF
            || ret == COPY_FILE_READ_ERROR)
@@ -52,47 +54,53 @@ void process_one_file_reg(struct file_header *hdr, char *name)
            do_exit(errno);
     }
     close(fdout);
-    fix_times_and_perms(hdr, name);
+    fix_times_and_perms(untrusted_hdr, untrusted_name);
 }
-void process_one_file_dir(struct file_header *hdr, char *name)
+void process_one_file_dir(struct file_header *untrusted_hdr,
+              char *untrusted_name)
 {
     // fix perms only when the directory is sent for the second time
     // it allows to transfer r.x directory contents, as we create it rwx initially
-    if (!mkdir(name, 0700))
+    if (!mkdir(untrusted_name, 0700))    /* safe because of chroot */
        return;
     if (errno != EEXIST)
        do_exit(errno);
-    fix_times_and_perms(hdr, name);
+    fix_times_and_perms(untrusted_hdr, untrusted_name);
 }
-void process_one_file_link(struct file_header *hdr, char *name)
+void process_one_file_link(struct file_header *untrusted_hdr,
+               char *untrusted_name)
 {
-    char content[MAX_PATH_LENGTH];
-    if (hdr->filelen > MAX_PATH_LENGTH - 1)
+    char untrusted_content[MAX_PATH_LENGTH];
+    unsigned int filelen;
+    if (untrusted_hdr->filelen > MAX_PATH_LENGTH - 1)
        do_exit(ENAMETOOLONG);
-    if (!read_all(0, content, hdr->filelen))
+    filelen = untrusted_hdr->filelen;    /* sanitized above */
+    if (!read_all(0, untrusted_content, filelen))
        do_exit(LEGAL_EOF);    // hopefully remote has produced error message
-    content[hdr->filelen] = 0;
-    if (symlink(content, name))
+    untrusted_content[filelen] = 0;
+    if (symlink(untrusted_content, untrusted_name))    /* safe because of chroot */
        do_exit(errno);
 }
-void process_one_file(struct file_header *hdr)
+void process_one_file(struct file_header *untrusted_hdr)
 {
-    if (hdr->namelen > MAX_PATH_LENGTH - 1)
+    unsigned int namelen;
+    if (untrusted_hdr->namelen > MAX_PATH_LENGTH - 1)
        do_exit(ENAMETOOLONG);
-    if (!read_all(0, namebuf, hdr->namelen))
+    namelen = untrusted_hdr->namelen;    /* sanitized above */
+    if (!read_all(0, untrusted_namebuf, namelen))
        do_exit(LEGAL_EOF);    // hopefully remote has produced error message
-    namebuf[hdr->namelen] = 0;
-    if (S_ISREG(hdr->mode))
-        process_one_file_reg(hdr, namebuf);
-    else if (S_ISLNK(hdr->mode))
-        process_one_file_link(hdr, namebuf);
-    else if (S_ISDIR(hdr->mode))
-        process_one_file_dir(hdr, namebuf);
+    untrusted_namebuf[namelen] = 0;
+    if (S_ISREG(untrusted_hdr->mode))
+        process_one_file_reg(untrusted_hdr, untrusted_namebuf);
+    else if (S_ISLNK(untrusted_hdr->mode))
+        process_one_file_link(untrusted_hdr, untrusted_namebuf);
+    else if (S_ISDIR(untrusted_hdr->mode))
+        process_one_file_dir(untrusted_hdr, untrusted_namebuf);
     else
        do_exit(EINVAL);
 }
@@ -100,9 +108,9 @@ void process_one_file(struct file_header *hdr)
 void do_unpack(int fd)
 {
     global_status_fd = fd;
-    struct file_header hdr;
-    while (read_all(0, &hdr, sizeof hdr))
-        process_one_file(&hdr);
+    struct file_header untrusted_hdr;
+    while (read_all(0, &untrusted_hdr, sizeof untrusted_hdr))
+        process_one_file(&untrusted_hdr);
     if (errno)
        do_exit(errno);
     else
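The convention introduced in this file: every value that originates from the sending VM keeps an untrusted_ prefix until it has passed a bounds check, and only then may it be copied into a plainly named variable (see the /* sanitized above */ comments). A minimal self-contained sketch of the convention, with hypothetical names rather than the actual Qubes sources:

#include <errno.h>
#include <stdint.h>
#include <stdlib.h>

#define MAX_PATH_LENGTH 16384

static void do_exit(int code)
{
    exit(code);
}

/* Reject the untrusted length before its first use; only the checked
   value is returned under a normal name. */
static size_t sanitize_namelen(uint32_t untrusted_namelen)
{
    if (untrusted_namelen > MAX_PATH_LENGTH - 1)
        do_exit(ENAMETOOLONG);
    return untrusted_namelen;    /* sanitized */
}

The path arguments, by contrast, are deliberately not validated further; the /* safe because of chroot */ comments record that the unpacker is confined to the destination directory, so even a hostile name cannot escape it.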


@@ -7,11 +7,11 @@ import os
 class DomainState:
     def __init__(self, id):
-        self.meminfo = None
-        self.memory_actual = None
-        self.mem_used = None
-        self.id = id
-        self.last_target = 0
+        self.meminfo = None        #dictionary of memory info read from client
+        self.memory_actual = None  #the current memory size
+        self.mem_used = None       #used memory, computed based on meminfo
+        self.id = id               #domain id
+        self.last_target = 0       #the last memset target
 class SystemState:
     def __init__(self):
@@ -36,6 +36,7 @@ class SystemState:
 #        ret = host_metrics_record["memory_free"]
 #        return long(ret)
+    #refresh information on memory assigned to all domains
     def refresh_memactual(self):
         for domain in self.xc.domain_getinfo():
             id = str(domain['domid'])
@@ -67,6 +68,7 @@ class SystemState:
         except XenAPI.Failure:
             pass
+    #perform memory ballooning, across all domains, to add "memsize" to Xen free memory
     def do_balloon(self, memsize):
         MAX_TRIES = 20
         niter = 0
@@ -82,6 +84,7 @@ class SystemState:
         if prev_memory_actual is not None:
             for i in prev_memory_actual.keys():
                 if prev_memory_actual[i] == self.domdict[i].memory_actual:
+                    #domain not responding to memset requests, remove it from donors
                     self.domdict[i].no_progress = True
                     print 'domain', i, 'stuck at', self.domdict[i].memory_actual
         memset_reqs = qmemman_algo.balloon(memsize + self.XEN_FREE_MEM_LEFT - xenfree, self.domdict)
@@ -96,10 +99,12 @@ class SystemState:
             time.sleep(self.BALOON_DELAY)
             niter = niter + 1
-    def refresh_meminfo(self, domid, val):
-        qmemman_algo.refresh_meminfo_for_domain(self.domdict[domid], val)
+    def refresh_meminfo(self, domid, untrusted_meminfo_key):
+        qmemman_algo.refresh_meminfo_for_domain(self.domdict[domid], untrusted_meminfo_key)
         self.do_balance()
+    #is the computed balance request big enough ?
+    #so that we do not trash with small adjustments
     def is_balance_req_significant(self, memset_reqs, xenfree):
         total_memory_transfer = 0
         MIN_TOTAL_MEMORY_TRANSFER = 150*1024*1024
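The body of is_balance_req_significant is cut off by this hunk; the name and the MIN_TOTAL_MEMORY_TRANSFER constant suggest it sums the requested transfers and compares the total against the threshold. A sketch of that kind of test, in C for uniformity with the other examples here (the request layout is an assumption, not the actual code):

#include <stdlib.h>

#define MIN_TOTAL_MEMORY_TRANSFER (150LL * 1024 * 1024)

/* assumed shape of one memset request */
struct memset_req {
    long long current;
    long long target;
};

static int is_significant(const struct memset_req *reqs, int n)
{
    long long total_memory_transfer = 0;
    int i;
    for (i = 0; i < n; i++)
        total_memory_transfer += llabs(reqs[i].target - reqs[i].current);
    return total_memory_transfer > MIN_TOTAL_MEMORY_TRANSFER;
}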


@@ -1,74 +1,90 @@
 import string
-def parse_meminfo(meminfo):
-    dict = {}
-    l1 = string.split(meminfo,"\n")
-    for i in l1:
-        l2 = string.split(i)
-        if len(l2) >= 2:
-            dict[string.rstrip(l2[0], ":")] = l2[1]
-    try:
-        for i in ('MemTotal', 'MemFree', 'Buffers', 'Cached', 'SwapTotal', 'SwapFree'):
-            val = int(dict[i])*1024
-            if (val < 0):
-                return None
-            dict[i] = val
-    except:
-        return None
-    if dict['SwapTotal'] < dict['SwapFree']:
-        return None
-    return dict
-def is_suspicious(dom):
-    ret = False
-    if dom.meminfo['SwapTotal'] < dom.meminfo['SwapFree']:
-        ret = True
-    if dom.meminfo['MemTotal'] < dom.meminfo['MemFree'] + dom.meminfo['Cached'] + dom.meminfo['Buffers']:
-        ret = True
-    if ret:
-        print 'suspicious meminfo for domain', dom.id, 'mem actual', dom.memory_actual, dom.meminfo
-    return ret
+#untrusted meminfo size is taken from xenstore key, thus its size is limited
+#so splits do not require excessive memory
+def parse_meminfo(untrusted_meminfo):
+    untrusted_dict = {}
+    #split meminfo contents into lines
+    untrusted_lines = string.split(untrusted_meminfo,"\n")
+    for untrusted_lines_iterator in untrusted_lines:
+        #split a single meminfo line into words
+        untrusted_words = string.split(untrusted_lines_iterator)
+        if len(untrusted_words) >= 2:
+            untrusted_dict[string.rstrip(untrusted_words[0], ":")] = untrusted_words[1]
+    return untrusted_dict
+def is_meminfo_suspicious(domain, untrusted_meminfo):
+    ret = False
+    #check whether the required keys exist and are not negative
+    try:
+        for i in ('MemTotal', 'MemFree', 'Buffers', 'Cached', 'SwapTotal', 'SwapFree'):
+            val = int(untrusted_meminfo[i])*1024
+            if (val < 0):
+                ret = True
+            untrusted_meminfo[i] = val
+    except:
+        ret = True
+    if not ret and untrusted_meminfo['SwapTotal'] < untrusted_meminfo['SwapFree']:
+        ret = True
+    if not ret and untrusted_meminfo['MemTotal'] < untrusted_meminfo['MemFree'] + untrusted_meminfo['Cached'] + untrusted_meminfo['Buffers']:
+        ret = True
+    #we could also impose some limits on all the above values
+    #but it has little purpose - all the domain can gain by passing e.g.
+    #very large SwapTotal is that it will be assigned all free Xen memory
+    #it can be achieved with legal values, too, and it will not allow to
+    #starve existing domains, by design
+    if ret:
+        print 'suspicious meminfo for domain', domain.id, 'mem actual', domain.memory_actual, untrusted_meminfo
+    return ret
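The two "if not ret" checks encode invariants that any honest meminfo must satisfy: free swap cannot exceed total swap, and free plus cached plus buffered memory cannot exceed total memory. Restated as a standalone C sketch (field names mirror /proc/meminfo; values assumed already converted to bytes):

struct meminfo {
    long long MemTotal, MemFree, Buffers, Cached, SwapTotal, SwapFree;
};

static int meminfo_suspicious(const struct meminfo *m)
{
    if (m->SwapTotal < m->SwapFree)
        return 1;    /* more swap free than swap exists */
    if (m->MemTotal < m->MemFree + m->Cached + m->Buffers)
        return 1;    /* free + cache + buffers exceed total RAM */
    return 0;
}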
-def refresh_meminfo_for_domain(dom, xenstore_key):
-    meminfo = parse_meminfo(xenstore_key)
-    dom.meminfo = meminfo
-    if meminfo is None:
-        return
-    if is_suspicious(dom):
-        dom.meminfo = None
-        dom.mem_used = None
-    else:
-        dom.mem_used = dom.meminfo['MemTotal'] - dom.meminfo['MemFree'] - dom.meminfo['Cached'] - dom.meminfo['Buffers'] + dom.meminfo['SwapTotal'] - dom.meminfo['SwapFree']
+#called when a domain updates its 'meminfo' xenstore key
+def refresh_meminfo_for_domain(domain, untrusted_xenstore_key):
+    untrusted_meminfo = parse_meminfo(untrusted_xenstore_key)
+    if untrusted_meminfo is None:
+        domain.meminfo = None
+        return
+    #sanitize start
+    if is_meminfo_suspicious(domain, untrusted_meminfo):
+    #sanitize end
+        domain.meminfo = None
+        domain.mem_used = None
+    else:
+        #sanitized, can assign
+        domain.meminfo = untrusted_meminfo
+        domain.mem_used = domain.meminfo['MemTotal'] - domain.meminfo['MemFree'] - domain.meminfo['Cached'] - domain.meminfo['Buffers'] + domain.meminfo['SwapTotal'] - domain.meminfo['SwapFree']
-def prefmem(dom):
+def prefmem(domain):
     CACHE_FACTOR = 1.3
     #dom0 is special, as it must have large cache, for vbds. Thus, give it a special boost
-    if dom.id == '0':
-        return dom.mem_used*CACHE_FACTOR + 350*1024*1024
-    return dom.mem_used*CACHE_FACTOR
-def memneeded(dom):
+    if domain.id == '0':
+        return domain.mem_used*CACHE_FACTOR + 350*1024*1024
+    return domain.mem_used*CACHE_FACTOR
+def memory_needed(domain):
     #do not change
-    #in balance(), "distribute totalsum proportionally to mempref" relies on this exact formula
-    ret = prefmem(dom) - dom.memory_actual
+    #in balance(), "distribute total_available_memory proportionally to mempref" relies on this exact formula
+    ret = prefmem(domain) - domain.memory_actual
     return ret
-def balloon(memsize, domdict):
+#prepare list of (domain, memory_target) pairs that need to be passed
+#to "xm memset" equivalent in order to obtain "memsize" of memory
+#return empty list when the request cannot be satisfied
+def balloon(memsize, domain_dictionary):
     REQ_SAFETY_NET_FACTOR = 1.05
     donors = list()
     request = list()
     available = 0
-    for i in domdict.keys():
-        if domdict[i].meminfo is None:
+    for i in domain_dictionary.keys():
+        if domain_dictionary[i].meminfo is None:
             continue
-        if domdict[i].no_progress:
+        if domain_dictionary[i].no_progress:
             continue
-        need = memneeded(domdict[i])
+        need = memory_needed(domain_dictionary[i])
         if need < 0:
-            print 'balloon: dom' , i, 'has actual memory', domdict[i].memory_actual
+            print 'balloon: dom' , i, 'has actual memory', domain_dictionary[i].memory_actual
             donors.append((i,-need))
             available-=need
     print 'req=', memsize, 'avail=', available, 'donors', donors
@@ -79,78 +95,92 @@ def balloon(memsize, domdict):
         id, mem = donors_iter
         memborrowed = mem*scale*REQ_SAFETY_NET_FACTOR
         print 'borrow' , memborrowed, 'from', id
-        memtarget = int(domdict[id].memory_actual - memborrowed)
+        memtarget = int(domain_dictionary[id].memory_actual - memborrowed)
         request.append((id, memtarget))
     return request
 # REQ_SAFETY_NET_FACTOR is a bit greater that 1. So that if the domain yields a bit less than requested, due
 # to e.g. rounding errors, we will not get stuck. The surplus will return to the VM during "balance" call.
-#redistribute positive "totalsum" of memory between domains, proportionally to prefmem
-def balance_when_enough_memory(domdict, xenfree, total_mem_pref, totalsum):
+#redistribute positive "total_available_memory" of memory between domains, proportionally to prefmem
+def balance_when_enough_memory(domain_dictionary, xen_free_memory, total_mem_pref, total_available_memory):
     donors_rq = list()
     acceptors_rq = list()
-    for i in domdict.keys():
-        if domdict[i].meminfo is None:
+    for i in domain_dictionary.keys():
+        if domain_dictionary[i].meminfo is None:
             continue
-        #distribute totalsum proportionally to mempref
-        scale = 1.0*prefmem(domdict[i])/total_mem_pref
-        target_nonint = prefmem(domdict[i]) + scale*totalsum
+        #distribute total_available_memory proportionally to mempref
+        scale = 1.0*prefmem(domain_dictionary[i])/total_mem_pref
+        target_nonint = prefmem(domain_dictionary[i]) + scale*total_available_memory
         #prevent rounding errors
         target = int(0.999*target_nonint)
-        if (target < domdict[i].memory_actual):
+        if (target < domain_dictionary[i].memory_actual):
             donors_rq.append((i, target))
         else:
             acceptors_rq.append((i, target))
-#    print 'balance(enough): xenfree=', xenfree, 'requests:', donors_rq + acceptors_rq
+#    print 'balance(enough): xen_free_memory=', xen_free_memory, 'requests:', donors_rq + acceptors_rq
     return donors_rq + acceptors_rq
 #when not enough mem to make everyone be above prefmem, make donors be at prefmem, and
 #redistribute anything left between acceptors
-def balance_when_low_on_memory(domdict, xenfree, total_mem_pref_acceptors, donors, acceptors):
+def balance_when_low_on_memory(domain_dictionary, xen_free_memory, total_mem_pref_acceptors, donors, acceptors):
     donors_rq = list()
     acceptors_rq = list()
-    squeezed_mem = xenfree
+    squeezed_mem = xen_free_memory
     for i in donors:
-        avail = -memneeded(domdict[i])
+        avail = -memory_needed(domain_dictionary[i])
         if avail < 10*1024*1024:
             #probably we have already tried making it exactly at prefmem, give up
             continue
         squeezed_mem -= avail
-        donors_rq.append((i, prefmem(domdict[i])))
+        donors_rq.append((i, prefmem(domain_dictionary[i])))
     #the below can happen if initially xen free memory is below 50M
     if squeezed_mem < 0:
         return donors_rq
     for i in acceptors:
-        scale = 1.0*prefmem(domdict[i])/total_mem_pref_acceptors
-        target_nonint = domdict[i].memory_actual + scale*squeezed_mem
+        scale = 1.0*prefmem(domain_dictionary[i])/total_mem_pref_acceptors
+        target_nonint = domain_dictionary[i].memory_actual + scale*squeezed_mem
         acceptors_rq.append((i, int(target_nonint)))
-#    print 'balance(low): xenfree=', xenfree, 'requests:', donors_rq + acceptors_rq
+#    print 'balance(low): xen_free_memory=', xen_free_memory, 'requests:', donors_rq + acceptors_rq
     return donors_rq + acceptors_rq
-def balance(xenfree, domdict):
-    total_memneeded = 0
+#redistribute memory across domains
+#called when one of domains update its 'meminfo' xenstore key
+#return the list of (domain, memory_target) pairs to be passed to
+#"xm memset" equivalent
+def balance(xen_free_memory, domain_dictionary):
+    #sum of all memory requirements - in other words, the difference between
+    #memory required to be added to domains (acceptors) to make them be at their
+    #preferred memory, and memory that can be taken from domains (donors) that
+    #can provide memory. So, it can be negative when plenty of memory.
+    total_memory_needed = 0
+    #sum of memory preferences of all domains
     total_mem_pref = 0
+    #sum of memory preferences of all domains that require more memory
     total_mem_pref_acceptors = 0
-    donors = list()
-    acceptors = list()
+    donors = list()      # domains that can yield memory
+    acceptors = list()   # domains that require more memory
     #pass 1: compute the above "total" values
-    for i in domdict.keys():
-        if domdict[i].meminfo is None:
+    for i in domain_dictionary.keys():
+        if domain_dictionary[i].meminfo is None:
             continue
-        need = memneeded(domdict[i])
-#        print 'domain' , i, 'act/pref', domdict[i].memory_actual, prefmem(domdict[i]), 'need=', need
+        need = memory_needed(domain_dictionary[i])
+#        print 'domain' , i, 'act/pref', domain_dictionary[i].memory_actual, prefmem(domain_dictionary[i]), 'need=', need
         if need < 0:
             donors.append(i)
         else:
             acceptors.append(i)
-            total_mem_pref_acceptors += prefmem(domdict[i])
-        total_memneeded += need
-        total_mem_pref += prefmem(domdict[i])
-    totalsum = xenfree - total_memneeded
-    if totalsum > 0:
-        return balance_when_enough_memory(domdict, xenfree, total_mem_pref, totalsum)
+            total_mem_pref_acceptors += prefmem(domain_dictionary[i])
+        total_memory_needed += need
+        total_mem_pref += prefmem(domain_dictionary[i])
+    total_available_memory = xen_free_memory - total_memory_needed
+    if total_available_memory > 0:
+        return balance_when_enough_memory(domain_dictionary, xen_free_memory, total_mem_pref, total_available_memory)
     else:
-        return balance_when_low_on_memory(domdict, xenfree, total_mem_pref_acceptors, donors, acceptors)
+        return balance_when_low_on_memory(domain_dictionary, xen_free_memory, total_mem_pref_acceptors, donors, acceptors)
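In the "enough memory" regime each domain ends at its preference plus a surplus share proportional to that preference, with the 0.999 factor keeping the rounded targets from summing to more than is actually available. The per-domain target restated as a C sketch under the same assumptions:

struct dom {
    long long memory_actual;
    double prefmem;    /* result of prefmem() for this domain */
};

static long long target_when_enough(const struct dom *d,
                                    double total_mem_pref,
                                    double total_available_memory)
{
    double scale = d->prefmem / total_mem_pref;
    double target_nonint = d->prefmem + scale * total_available_memory;
    return (long long) (0.999 * target_nonint);    /* prevent rounding errors */
}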


@@ -17,7 +17,7 @@ def only_in_first_list(l1, l2):
             ret.append(i)
     return ret
-def get_req_node(domain_id):
+def get_domain_meminfo_key(domain_id):
     return '/local/domain/'+domain_id+'/memory/meminfo'
@@ -29,31 +29,33 @@ class WatchType:
 class XS_Watcher:
     def __init__(self):
         self.handle = xen.lowlevel.xs.xs()
-        self.handle.watch('/vm', WatchType(XS_Watcher.dom_list_change, None))
+        self.handle.watch('/vm', WatchType(XS_Watcher.domain_list_changed, None))
         self.watch_token_dict = {}
-    def dom_list_change(self, param):
+    def domain_list_changed(self, param):
         curr = self.handle.ls('', '/local/domain')
         if curr == None:
             return
         global_lock.acquire()
         for i in only_in_first_list(curr, self.watch_token_dict.keys()):
-            watch = WatchType(XS_Watcher.request, i)
+            #new domain has been created
+            watch = WatchType(XS_Watcher.meminfo_changed, i)
             self.watch_token_dict[i] = watch
-            self.handle.watch(get_req_node(i), watch)
+            self.handle.watch(get_domain_meminfo_key(i), watch)
             system_state.add_domain(i)
         for i in only_in_first_list(self.watch_token_dict.keys(), curr):
-            self.handle.unwatch(get_req_node(i), self.watch_token_dict[i])
+            #domain destroyed
+            self.handle.unwatch(get_domain_meminfo_key(i), self.watch_token_dict[i])
             self.watch_token_dict.pop(i)
             system_state.del_domain(i)
         global_lock.release()
-    def request(self, domain_id):
-        ret = self.handle.read('', get_req_node(domain_id))
-        if ret == None or ret == '':
+    def meminfo_changed(self, domain_id):
+        untrusted_meminfo_key = self.handle.read('', get_domain_meminfo_key(domain_id))
+        if untrusted_meminfo_key == None or untrusted_meminfo_key == '':
             return
         global_lock.acquire()
-        system_state.refresh_meminfo(domain_id, ret)
+        system_state.refresh_meminfo(domain_id, untrusted_meminfo_key)
         global_lock.release()
     def watch_loop(self):
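Every handler above runs between global_lock.acquire() and global_lock.release(), so domain-list changes and meminfo updates never interleave. The same serialization shape in C with pthreads (a sketch of the pattern only; the real code is the Python above):

#include <pthread.h>

static pthread_mutex_t global_lock = PTHREAD_MUTEX_INITIALIZER;

static void on_meminfo_changed(const char *domain_id)
{
    pthread_mutex_lock(&global_lock);
    /* read the xenstore key and refresh this domain's meminfo */
    pthread_mutex_unlock(&global_lock);
}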


@@ -61,6 +61,12 @@ void buffer_free(struct buffer *b)
     buffer_init(b);
 }
+/*
+The following two functions can be made much more efficient.
+Yet the profiling output show they are not significant CPU hogs, so
+we keep them so simple to make them obviously correct.
+*/
 void buffer_append(struct buffer *b, char *data, int len)
 {
     int newsize = len + b->buflen;
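The new comment is explicit that buffer_append trades speed for obvious correctness. If profiling ever did flag it, the standard alternative is geometric growth, which makes repeated appends amortized O(1). A sketch of that variant (not the actual buffer.c):

#include <stdlib.h>
#include <string.h>

struct growbuf {
    char *data;
    int buflen;
    int bufsize;
};

void growbuf_append(struct growbuf *b, const char *data, int len)
{
    if (b->buflen + len > b->bufsize) {
        while (b->buflen + len > b->bufsize)
            b->bufsize = b->bufsize ? 2 * b->bufsize : 64;
        b->data = realloc(b->data, b->bufsize);
        if (!b->data)
            abort();    /* keep the sketch obviously correct, too */
    }
    memcpy(b->data + b->buflen, data, len);
    b->buflen += len;
}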


@@ -41,8 +41,8 @@ enum {
     WRITE_STDIN_ERROR
 };
-int flush_client_data(int fd, int clid, struct buffer *buffer);
-int write_stdin(int fd, int clid, char *data, int len,
+int flush_client_data(int fd, int client_id, struct buffer *buffer);
+int write_stdin(int fd, int client_id, char *data, int len,
         struct buffer *buffer);
 void set_nonblock(int fd);
 int fork_and_flush_stdin(int fd, struct buffer *buffer);


@@ -56,7 +56,7 @@ enum {
 struct server_header {
     unsigned int type;
-    unsigned int clid;
+    unsigned int client_id;
     unsigned int len;
 };
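Every message on the daemon/agent channel is this fixed-size header followed by len payload bytes. A self-contained sketch of the framing over a plain file descriptor (the real code uses write_all_vchan_ext, which also handles short writes):

#include <unistd.h>

struct server_header {
    unsigned int type;
    unsigned int client_id;
    unsigned int len;
};

/* Simplified: a production version must loop on short writes. */
static int send_framed(int fd, unsigned int type, unsigned int client_id,
                       const void *data, unsigned int len)
{
    struct server_header h = { type, client_id, len };
    if (write(fd, &h, sizeof h) != sizeof h)
        return -1;
    if (len && write(fd, data, len) != (ssize_t) len)
        return -1;
    return 0;
}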


@@ -42,7 +42,7 @@ enum fdtype {
 };
 struct _process_fd {
-    int clid;
+    int client_id;
     int type;
     int is_blocked;
 };
@@ -122,7 +122,7 @@ void do_exec(char *cmd)
     exit(1);
 }
-void handle_just_exec(int clid, int len)
+void handle_just_exec(int client_id, int len)
 {
     char buf[len];
     int fdn, pid;
@@ -143,7 +143,7 @@ void handle_just_exec(int clid, int len)
     fprintf(stderr, "executed (nowait) %s pid %d\n", buf, pid);
 }
-void handle_exec(int clid, int len)
+void handle_exec(int client_id, int len)
 {
     char buf[len];
     int pid, stdin_fd, stdout_fd, stderr_fd;
@@ -152,10 +152,10 @@ void handle_exec(int clid, int len)
     do_fork_exec(buf, &pid, &stdin_fd, &stdout_fd, &stderr_fd);
-    process_fd[stdout_fd].clid = clid;
+    process_fd[stdout_fd].client_id = client_id;
     process_fd[stdout_fd].type = FDTYPE_STDOUT;
     process_fd[stdout_fd].is_blocked = 0;
-    process_fd[stderr_fd].clid = clid;
+    process_fd[stderr_fd].client_id = client_id;
     process_fd[stderr_fd].type = FDTYPE_STDERR;
     process_fd[stderr_fd].is_blocked = 0;
@@ -166,13 +166,13 @@ void handle_exec(int clid, int len)
     set_nonblock(stdin_fd);
-    client_info[clid].stdin_fd = stdin_fd;
-    client_info[clid].stdout_fd = stdout_fd;
-    client_info[clid].stderr_fd = stderr_fd;
-    client_info[clid].pid = pid;
-    client_info[clid].is_blocked = 0;
-    client_info[clid].is_close_after_flush_needed = 0;
-    buffer_init(&client_info[clid].buffer);
+    client_info[client_id].stdin_fd = stdin_fd;
+    client_info[client_id].stdout_fd = stdout_fd;
+    client_info[client_id].stderr_fd = stderr_fd;
+    client_info[client_id].pid = pid;
+    client_info[client_id].is_blocked = 0;
+    client_info[client_id].is_close_after_flush_needed = 0;
+    buffer_init(&client_info[client_id].buffer);
     fprintf(stderr, "executed %s pid %d\n", buf, pid);
@@ -187,79 +187,81 @@ void update_max_process_fd()
         max_process_fd = i;
 }
-void send_exit_code(int clid, int status)
+void send_exit_code(int client_id, int status)
 {
     struct server_header s_hdr;
     s_hdr.type = MSG_AGENT_TO_SERVER_EXIT_CODE;
-    s_hdr.clid = clid;
+    s_hdr.client_id = client_id;
     s_hdr.len = sizeof status;
     write_all_vchan_ext(&s_hdr, sizeof s_hdr);
     write_all_vchan_ext(&status, sizeof(status));
-    fprintf(stderr, "send exit code for clid %d pid %d\n", clid,
-        client_info[clid].pid);
+    fprintf(stderr, "send exit code for client_id %d pid %d\n",
+        client_id, client_info[client_id].pid);
 }
 // erase process data structures, possibly forced by remote
-void remove_process(int clid, int status)
+void remove_process(int client_id, int status)
 {
     int i;
-    if (!client_info[clid].pid)
+    if (!client_info[client_id].pid)
        return;
-    fork_and_flush_stdin(client_info[clid].stdin_fd, &client_info[clid].buffer);
+    fork_and_flush_stdin(client_info[client_id].stdin_fd,
+                 &client_info[client_id].buffer);
 #if 0
     // let's let it die by itself, possibly after it has received buffered stdin
-    kill(client_info[clid].pid, SIGKILL);
+    kill(client_info[client_id].pid, SIGKILL);
 #endif
     if (status != -1)
-        send_exit_code(clid, status);
-    close(client_info[clid].stdin_fd);
-    client_info[clid].pid = 0;
-    client_info[clid].stdin_fd = -1;
-    client_info[clid].is_blocked = 0;
-    buffer_free(&client_info[clid].buffer);
+        send_exit_code(client_id, status);
+    close(client_info[client_id].stdin_fd);
+    client_info[client_id].pid = 0;
+    client_info[client_id].stdin_fd = -1;
+    client_info[client_id].is_blocked = 0;
+    buffer_free(&client_info[client_id].buffer);
     for (i = 0; i <= max_process_fd; i++)
        if (process_fd[i].type != FDTYPE_INVALID
-            && process_fd[i].clid == clid) {
+            && process_fd[i].client_id == client_id) {
            process_fd[i].type = FDTYPE_INVALID;
-            process_fd[i].clid = -1;
+            process_fd[i].client_id = -1;
            process_fd[i].is_blocked = 0;
            close(i);
        }
     update_max_process_fd();
 }
-void handle_input(int clid, int len)
+void handle_input(int client_id, int len)
 {
     char buf[len];
     read_all_vchan_ext(buf, len);
-    if (!client_info[clid].pid)
+    if (!client_info[client_id].pid)
        return;
     if (len == 0) {
-        if (client_info[clid].is_blocked)
-            client_info[clid].is_close_after_flush_needed = 1;
+        if (client_info[client_id].is_blocked)
+            client_info[client_id].
+                is_close_after_flush_needed = 1;
        else {
-            close(client_info[clid].stdin_fd);
-            client_info[clid].stdin_fd = -1;
+            close(client_info[client_id].stdin_fd);
+            client_info[client_id].stdin_fd = -1;
        }
        return;
     }
     switch (write_stdin
-        (client_info[clid].stdin_fd, clid, buf, len,
-         &client_info[clid].buffer)) {
+        (client_info[client_id].stdin_fd, client_id, buf, len,
+         &client_info[client_id].buffer)) {
     case WRITE_STDIN_OK:
        break;
     case WRITE_STDIN_BUFFERED:
-        client_info[clid].is_blocked = 1;
+        client_info[client_id].is_blocked = 1;
        break;
     case WRITE_STDIN_ERROR:
-        remove_process(clid, 128);
+        remove_process(client_id, 128);
        break;
     default:
        fprintf(stderr, "unknown write_stdin?\n");
@@ -268,10 +270,10 @@ void handle_input(int clid, int len)
 }
-void set_blocked_outerr(int clid, int val)
+void set_blocked_outerr(int client_id, int val)
 {
-    process_fd[client_info[clid].stdout_fd].is_blocked = val;
-    process_fd[client_info[clid].stderr_fd].is_blocked = val;
+    process_fd[client_info[client_id].stdout_fd].is_blocked = val;
+    process_fd[client_info[client_id].stderr_fd].is_blocked = val;
 }
 void handle_server_data()
@@ -279,27 +281,27 @@ void handle_server_data()
     struct server_header s_hdr;
     read_all_vchan_ext(&s_hdr, sizeof s_hdr);
-//  fprintf(stderr, "got %x %x %x\n", s_hdr.type, s_hdr.clid,
+//  fprintf(stderr, "got %x %x %x\n", s_hdr.type, s_hdr.client_id,
 //       s_hdr.len);
     switch (s_hdr.type) {
     case MSG_XON:
-        set_blocked_outerr(s_hdr.clid, 0);
+        set_blocked_outerr(s_hdr.client_id, 0);
        break;
     case MSG_XOFF:
-        set_blocked_outerr(s_hdr.clid, 1);
+        set_blocked_outerr(s_hdr.client_id, 1);
        break;
     case MSG_SERVER_TO_AGENT_EXEC_CMDLINE:
-        handle_exec(s_hdr.clid, s_hdr.len);
+        handle_exec(s_hdr.client_id, s_hdr.len);
        break;
     case MSG_SERVER_TO_AGENT_JUST_EXEC:
-        handle_just_exec(s_hdr.clid, s_hdr.len);
+        handle_just_exec(s_hdr.client_id, s_hdr.len);
        break;
     case MSG_SERVER_TO_AGENT_INPUT:
-        handle_input(s_hdr.clid, s_hdr.len);
+        handle_input(s_hdr.client_id, s_hdr.len);
        break;
     case MSG_SERVER_TO_AGENT_CLIENT_END:
-        remove_process(s_hdr.clid, -1);
+        remove_process(s_hdr.client_id, -1);
        break;
     default:
        fprintf(stderr, "msg type from daemon is %d ?\n",
@@ -320,15 +322,15 @@ void handle_process_data(int fd)
        return;
     ret = read(fd, buf, len - sizeof s_hdr);
-    s_hdr.clid = process_fd[fd].clid;
+    s_hdr.client_id = process_fd[fd].client_id;
     if (process_fd[fd].type == FDTYPE_STDOUT)
        s_hdr.type = MSG_AGENT_TO_SERVER_STDOUT;
     else if (process_fd[fd].type == FDTYPE_STDERR)
        s_hdr.type = MSG_AGENT_TO_SERVER_STDERR;
     else {
-        fprintf(stderr, "fd=%d, clid=%d, type=%d ?\n", fd,
-            process_fd[fd].clid, process_fd[fd].type);
+        fprintf(stderr, "fd=%d, client_id=%d, type=%d ?\n", fd,
+            process_fd[fd].client_id, process_fd[fd].type);
        exit(1);
     }
     s_hdr.len = ret;
@@ -338,13 +340,13 @@ void handle_process_data(int fd)
     }
     if (ret == 0) {
        process_fd[fd].type = FDTYPE_INVALID;
-        process_fd[fd].clid = -1;
+        process_fd[fd].client_id = -1;
        process_fd[fd].is_blocked = 0;
        close(fd);
        update_max_process_fd();
     }
     if (ret < 0)
-        remove_process(process_fd[fd].clid, 127);
+        remove_process(process_fd[fd].client_id, 127);
 }
 volatile int child_exited;
@@ -375,7 +377,7 @@ void handle_process_data_all(fd_set * select_fds)
 }
-void flush_out_err(int clid)
+void flush_out_err(int client_id)
 {
     fd_set select_set;
     int fd_max = -1;
@@ -387,7 +389,7 @@ void flush_out_err(int clid)
     for (i = 0; i <= max_process_fd; i++) {
        if (process_fd[i].type != FDTYPE_INVALID
            && !process_fd[i].is_blocked
-            && process_fd[i].clid == clid) {
+            && process_fd[i].client_id == client_id) {
            FD_SET(i, &select_set);
            fd_max = i;
        }
@@ -411,13 +413,13 @@ void reap_children()
 {
     int status;
     int pid;
-    int clid;
+    int client_id;
     while ((pid = waitpid(-1, &status, WNOHANG)) > 0) {
-        clid = find_info(pid);
-        if (clid < 0)
+        client_id = find_info(pid);
+        if (client_id < 0)
            continue;
-        flush_out_err(clid);
-        remove_process(clid, status);
+        flush_out_err(client_id);
+        remove_process(client_id, status);
     }
     child_exited = 0;
 }
@@ -450,10 +452,11 @@ int fill_fds_for_select(fd_set * rdset, fd_set * wrset)
     return max;
 }
-void flush_client_data_agent(int clid)
+void flush_client_data_agent(int client_id)
 {
-    struct _client_info *info = &client_info[clid];
-    switch (flush_client_data(info->stdin_fd, clid, &info->buffer)) {
+    struct _client_info *info = &client_info[client_id];
+    switch (flush_client_data
+        (info->stdin_fd, client_id, &info->buffer)) {
     case WRITE_STDIN_OK:
        info->is_blocked = 0;
        if (info->is_close_after_flush_needed) {
@@ -463,7 +466,7 @@ void flush_client_data_agent(int clid)
        }
        break;
     case WRITE_STDIN_ERROR:
-        remove_process(clid, 128);
+        remove_process(client_id, 128);
        break;
     case WRITE_STDIN_BUFFERED:
        break;
@@ -479,15 +482,16 @@ void handle_trigger_io()
     char buf[5];
     int ret;
-    s_hdr.clid = 0;
+    s_hdr.client_id = 0;
     s_hdr.len = 0;
     if ((ret = read(trigger_fd, buf, 4)) == 4) {
        buf[4] = 0;
        if (!strcmp(buf, "FCPR"))
-            s_hdr.clid = QREXEC_EXECUTE_FILE_COPY;
+            s_hdr.client_id = QREXEC_EXECUTE_FILE_COPY;
        else if (!strcmp(buf, "DVMR"))
-            s_hdr.clid = QREXEC_EXECUTE_FILE_COPY_FOR_DISPVM;
-        if (s_hdr.clid) {
+            s_hdr.client_id =
+                QREXEC_EXECUTE_FILE_COPY_FOR_DISPVM;
+        if (s_hdr.client_id) {
            s_hdr.type = MSG_AGENT_TO_SERVER_TRIGGER_EXEC;
            write_all_vchan_ext(&s_hdr, sizeof s_hdr);
        }
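For context: the requesting side of handle_trigger_io() is any VM tool that writes one of the 4-byte commands ("FCPR" or "DVMR") to the trigger pipe; the agent then forwards a MSG_AGENT_TO_SERVER_TRIGGER_EXEC to the daemon. A sketch of that side, with the pipe path assumed for illustration (it is not shown in this diff):

#include <fcntl.h>
#include <unistd.h>

static int trigger_file_copy(void)
{
    /* path is an assumption for the example */
    int fd = open("/var/run/qubes/qrexec_trigger", O_WRONLY);
    if (fd < 0)
        return -1;
    int ok = write(fd, "FCPR", 4) == 4;    /* QREXEC_EXECUTE_FILE_COPY */
    close(fd);
    return ok ? 0 : -1;
}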


@@ -34,24 +34,30 @@
 #include "glue.h"
 enum client_flags {
-    CLIENT_INVALID = 0,
-    CLIENT_CMDLINE = 1,
-    CLIENT_DATA = 2,
-    CLIENT_DONT_READ = 4,
-    CLIENT_OUTQ_FULL = 8
+    CLIENT_INVALID = 0,    // table slot not used
+    CLIENT_CMDLINE = 1,    // waiting for cmdline from client
+    CLIENT_DATA = 2,       // waiting for data from client
+    CLIENT_DONT_READ = 4,  // don't read from the client, the other side pipe is full, or EOF
+    CLIENT_OUTQ_FULL = 8   // don't write to client, its stdin pipe is full
 };
 struct _client {
-    int state;
-    struct buffer buffer;
+    int state;             // combination of above enum client_flags
+    struct buffer buffer;  // buffered data to client, if any
 };
-struct _client clients[MAX_FDS];
-int max_client_fd = -1;
-int server_fd;
-void handle_usr1(int x)
+/*
+The "clients" array is indexed by client's fd.
+Thus its size must be equal MAX_FDS; defining MAX_CLIENTS for clarity.
+*/
+#define MAX_CLIENTS MAX_FDS
+struct _client clients[MAX_CLIENTS];    // data on all qrexec_client connections
+int max_client_fd = -1;        // current max fd of all clients; so that we need not to scan all the "clients" table
+int qrexec_daemon_unix_socket_fd;    // /var/run/qubes/qrexec.xid descriptor
+void sigusr1_handler(int x)
 {
     fprintf(stderr, "connected\n");
     exit(0);
@@ -59,18 +65,19 @@ void handle_usr1(int x)
 void sigchld_handler(int x);
-char *remote_domain_name;
+char *remote_domain_name;    // guess what
+/* do the preparatory tasks, needed before entering the main event loop */
 void init(int xid)
 {
-    char dbg_log[256];
+    char qrexec_error_log_name[256];
     int logfd;
     if (xid <= 0) {
        fprintf(stderr, "domain id=0?\n");
        exit(1);
     }
-    signal(SIGUSR1, handle_usr1);
+    signal(SIGUSR1, sigusr1_handler);
     switch (fork()) {
     case -1:
        perror("fork");
@@ -86,10 +93,17 @@ void init(int xid)
        exit(0);
     }
     close(0);
-    snprintf(dbg_log, sizeof(dbg_log),
+    snprintf(qrexec_error_log_name, sizeof(qrexec_error_log_name),
         "/var/log/qubes/qrexec.%d.log", xid);
-    umask(0007);
-    logfd = open(dbg_log, O_WRONLY | O_CREAT | O_TRUNC, 0640);
+    umask(0007);    // make the log readable by the "qubes" group
+    logfd =
+        open(qrexec_error_log_name, O_WRONLY | O_CREAT | O_TRUNC,
+             0640);
+    if (logfd < 0) {
+        perror("open");
+        exit(1);
+    }
     dup2(logfd, 1);
     dup2(logfd, 2);
@@ -104,18 +118,19 @@ void init(int xid)
     setuid(getuid());
     /* When running as root, make the socket accessible; perms on /var/run/qubes still apply */
     umask(0);
-    server_fd = get_server_socket(xid, remote_domain_name);
+    qrexec_daemon_unix_socket_fd =
+        get_server_socket(xid, remote_domain_name);
     umask(0077);
     signal(SIGPIPE, SIG_IGN);
     signal(SIGCHLD, sigchld_handler);
     signal(SIGUSR1, SIG_DFL);
-    kill(getppid(), SIGUSR1);
+    kill(getppid(), SIGUSR1);    // let the parent know we are ready
 }
 void handle_new_client()
 {
-    int fd = do_accept(server_fd);
-    if (fd >= MAX_FDS) {
+    int fd = do_accept(qrexec_daemon_unix_socket_fd);
+    if (fd >= MAX_CLIENTS) {
        fprintf(stderr, "too many clients ?\n");
        exit(1);
     }
@@ -125,9 +140,13 @@ void handle_new_client()
        max_client_fd = fd;
 }
+/*
+we need to track the number of children, so that excessive QREXEC_EXECUTE_*
+commands do not fork-bomb dom0
+*/
 int children_count;
-void flush_client(int fd)
+void terminate_client_and_flush_data(int fd)
 {
     int i;
     struct server_header s_hdr;
@@ -143,29 +162,31 @@ void flush_client(int fd)
            max_client_fd = i;
     }
     s_hdr.type = MSG_SERVER_TO_AGENT_CLIENT_END;
-    s_hdr.clid = fd;
+    s_hdr.client_id = fd;
     s_hdr.len = 0;
     write_all_vchan_ext(&s_hdr, sizeof(s_hdr));
 }
-void pass_to_agent(int fd, struct server_header *s_hdr)
+void get_cmdline_body_from_client_and_pass_to_agent(int fd,
                            struct server_header
                            *s_hdr)
 {
     int len = s_hdr->len;
     char buf[len];
     if (!read_all(fd, buf, len)) {
-        flush_client(fd);
+        terminate_client_and_flush_data(fd);
        return;
     }
     write_all_vchan_ext(s_hdr, sizeof(*s_hdr));
     write_all_vchan_ext(buf, len);
 }
-void handle_client_cmdline(int fd)
+void handle_cmdline_message_from_client(int fd)
 {
     struct client_header hdr;
     struct server_header s_hdr;
     if (!read_all(fd, &hdr, sizeof hdr)) {
-        flush_client(fd);
+        terminate_client_and_flush_data(fd);
        return;
     }
     switch (hdr.type) {
@@ -176,59 +197,69 @@ void handle_client_cmdline(int fd)
        s_hdr.type = MSG_SERVER_TO_AGENT_JUST_EXEC;
        break;
     default:
-        flush_client(fd);
+        terminate_client_and_flush_data(fd);
        return;
     }
-    s_hdr.clid = fd;
+    s_hdr.client_id = fd;
     s_hdr.len = hdr.len;
-    pass_to_agent(fd, &s_hdr);
+    get_cmdline_body_from_client_and_pass_to_agent(fd, &s_hdr);
     clients[fd].state = CLIENT_DATA;
-    set_nonblock(fd);
+    set_nonblock(fd);    // so that we can detect full queue without blocking
     if (hdr.type == MSG_CLIENT_TO_SERVER_JUST_EXEC)
-        flush_client(fd);
+        terminate_client_and_flush_data(fd);
 }
-void handle_client_data(int fd)
+/* handle data received from one of qrexec_client processes */
+void handle_message_from_client(int fd)
 {
     struct server_header s_hdr;
     char buf[MAX_DATA_CHUNK];
     int len, ret;
     if (clients[fd].state == CLIENT_CMDLINE) {
-        handle_client_cmdline(fd);
+        handle_cmdline_message_from_client(fd);
        return;
     }
+    // We have already passed cmdline from client.
+    // Now the client passes us raw data from its stdin.
     len = buffer_space_vchan_ext();
     if (len <= sizeof s_hdr)
        return;
+    /* Read at most the amount of data that we have room for in vchan */
     ret = read(fd, buf, len - sizeof(s_hdr));
     if (ret < 0) {
        perror("read client");
-        flush_client(fd);
+        terminate_client_and_flush_data(fd);
        return;
     }
-    s_hdr.clid = fd;
+    s_hdr.client_id = fd;
     s_hdr.len = ret;
     s_hdr.type = MSG_SERVER_TO_AGENT_INPUT;
     write_all_vchan_ext(&s_hdr, sizeof(s_hdr));
     write_all_vchan_ext(buf, ret);
-    if (ret == 0)
+    if (ret == 0)    // EOF - so don't select() on this client
        clients[fd].state |= CLIENT_DONT_READ;
 }
-void flush_client_data_daemon(int clid)
+/*
+Called when there is buffered data for this client, and select() reports
+that client's pipe is writable; so we should be able to flush some
+buffered data.
+*/
+void write_buffered_data_to_client(int client_id)
 {
-    switch (flush_client_data(clid, clid, &clients[clid].buffer)) {
-    case WRITE_STDIN_OK:
-        clients[clid].state &= ~CLIENT_OUTQ_FULL;
+    switch (flush_client_data
+        (client_id, client_id, &clients[client_id].buffer)) {
+    case WRITE_STDIN_OK:    // no more buffered data
+        clients[client_id].state &= ~CLIENT_OUTQ_FULL;
        break;
     case WRITE_STDIN_ERROR:
-        flush_client(clid);
+        terminate_client_and_flush_data(client_id);
        break;
-    case WRITE_STDIN_BUFFERED:
+    case WRITE_STDIN_BUFFERED:    // no room for all data, don't clear CLIENT_OUTQ_FULL flag
        break;
     default:
        fprintf(stderr, "unknown flush_client_data?\n");
@@ -236,24 +267,31 @@ void flush_client_data_daemon(int clid)
     }
 }
-void pass_to_client(int clid, struct client_header *hdr)
+/*
+The header (hdr argument) is already built. Just read the raw data from
+the packet, and pass it along with the header to the client.
+*/
+void get_packet_data_from_agent_and_pass_to_client(int client_id,
                           struct client_header
                           *hdr)
 {
     int len = hdr->len;
     char buf[sizeof(*hdr) + len];
+    /* make both the header and data be consecutive in the buffer */
     *(struct client_header *) buf = *hdr;
     read_all_vchan_ext(buf + sizeof(*hdr), len);
     switch (write_stdin
-        (clid, clid, buf, len + sizeof(*hdr),
-         &clients[clid].buffer)) {
+        (client_id, client_id, buf, len + sizeof(*hdr),
+         &clients[client_id].buffer)) {
     case WRITE_STDIN_OK:
        break;
-    case WRITE_STDIN_BUFFERED:
-        clients[clid].state |= CLIENT_OUTQ_FULL;
+    case WRITE_STDIN_BUFFERED:    // some data have been buffered
+        clients[client_id].state |= CLIENT_OUTQ_FULL;
        break;
     case WRITE_STDIN_ERROR:
-        flush_client(clid);
+        terminate_client_and_flush_data(client_id);
        break;
     default:
        fprintf(stderr, "unknown write_stdin?\n");
@@ -261,6 +299,12 @@ void pass_to_client(int clid, struct client_header *hdr)
     }
 }
+/*
+The signal handler executes asynchronously; therefore all it should do is
+to set a flag "signal has arrived", and let the main event loop react to this
+flag in appropriate moment.
+*/
 int child_exited;
 void sigchld_handler(int x)
@@ -269,6 +313,7 @@ void sigchld_handler(int x)
     signal(SIGCHLD, sigchld_handler);
 }
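The comment block added above states the classic async-signal-safety rule: a handler only records that the signal arrived, and the main loop reacts at a safe point. The complete pattern in miniature:

#include <signal.h>

static volatile sig_atomic_t child_exited;

static void sigchld_handler(int x)
{
    child_exited = 1;
    signal(SIGCHLD, sigchld_handler);    /* re-arm, as the daemon does */
}

/* in the main loop: if (child_exited) reap_children(); */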
+/* clean zombies, update children_count */
 void reap_children()
 {
     int status;
@@ -277,6 +322,7 @@ void reap_children()
     child_exited = 0;
 }
+/* too many children - wait for one of them to terminate */
 void wait_for_child()
 {
     int status;
@@ -285,7 +331,7 @@ void wait_for_child()
 }
 #define MAX_CHILDREN 10
-void check_children_count()
+void check_children_count_and_wait_if_too_many()
 {
     if (children_count > MAX_CHILDREN) {
        fprintf(stderr,
@@ -296,12 +342,16 @@ void check_children_count()
     }
 }
-void handle_trigger_exec(int req)
+/*
+Called when agent sends a message asking to execute a predefined command.
+*/
+void handle_execute_predefined_command(int req)
 {
     char *rcmd = NULL, *lcmd = NULL;
     int i;
-    check_children_count();
+    check_children_count_and_wait_if_too_many();
     switch (req) {
     case QREXEC_EXECUTE_FILE_COPY:
        rcmd = "directly:user:/usr/lib/qubes/qfile-agent";
@@ -311,7 +361,7 @@ void handle_trigger_exec(int req)
        rcmd = "directly:user:/usr/lib/qubes/qfile-agent-dvm";
        lcmd = "/usr/lib/qubes/qfile-daemon-dvm";
        break;
-    default:
+    default:    /* cannot happen, already sanitized */
        fprintf(stderr, "got trigger exec no %d\n", req);
        exit(1);
     }
@@ -325,7 +375,7 @@ void handle_trigger_exec(int req)
        children_count++;
        return;
     }
-    for (i = 3; i < 256; i++)
+    for (i = 3; i < MAX_FDS; i++)
        close(i);
     signal(SIGCHLD, SIG_DFL);
     signal(SIGPIPE, SIG_DFL);
@@ -335,31 +385,79 @@ void handle_trigger_exec(int req)
     exit(1);
 }
-void handle_agent_data()
-{
-    struct client_header hdr;
-    struct server_header s_hdr;
-    read_all_vchan_ext(&s_hdr, sizeof s_hdr);
-//  fprintf(stderr, "got %x %x %x\n", s_hdr.type, s_hdr.clid,
-//       s_hdr.len);
-    if (s_hdr.type == MSG_AGENT_TO_SERVER_TRIGGER_EXEC) {
-        handle_trigger_exec(s_hdr.clid);
-        return;
-    }
-    if (s_hdr.clid >= MAX_FDS || s_hdr.clid < 0) {
-        fprintf(stderr, "from agent: clid=%d\n", s_hdr.clid);
-        exit(1);
-    }
-    if (s_hdr.type == MSG_XOFF) {
-        clients[s_hdr.clid].state |= CLIENT_DONT_READ;
-        return;
-    }
-    if (s_hdr.type == MSG_XON) {
-        clients[s_hdr.clid].state &= ~CLIENT_DONT_READ;
-        return;
-    }
+void check_client_id_in_range(unsigned int untrusted_client_id)
+{
+    if (untrusted_client_id >= MAX_CLIENTS || untrusted_client_id < 0) {
+        fprintf(stderr, "from agent: client_id=%d\n",
+            untrusted_client_id);
+        exit(1);
+    }
+}
+void sanitize_message_from_agent(struct server_header *untrusted_header)
+{
+    int untrusted_cmd;
+    switch (untrusted_header->type) {
+    case MSG_AGENT_TO_SERVER_TRIGGER_EXEC:
+        untrusted_cmd = untrusted_header->client_id;
+        if (untrusted_cmd != QREXEC_EXECUTE_FILE_COPY &&
+            untrusted_cmd != QREXEC_EXECUTE_FILE_COPY_FOR_DISPVM) {
+            fprintf(stderr,
+                "received MSG_AGENT_TO_SERVER_TRIGGER_EXEC cmd %d ?\n",
+                untrusted_cmd);
+            exit(1);
+        }
+        break;
+    case MSG_AGENT_TO_SERVER_STDOUT:
+    case MSG_SERVER_TO_CLIENT_STDERR:
+    case MSG_AGENT_TO_SERVER_EXIT_CODE:
+        check_client_id_in_range(untrusted_header->client_id);
+        if (untrusted_header->len > MAX_DATA_CHUNK
+            || untrusted_header->len < 0) {
+            fprintf(stderr, "agent feeded %d of data bytes?\n",
+                untrusted_header->len);
+            exit(1);
+        }
+        break;
+    case MSG_XOFF:
+    case MSG_XON:
+        check_client_id_in_range(untrusted_header->client_id);
+        break;
+    default:
+        fprintf(stderr, "unknown mesage type %d from agent\n",
+            untrusted_header->type);
+        exit(1);
+    }
+}
+void handle_message_from_agent()
+{
+    struct client_header hdr;
+    struct server_header s_hdr, untrusted_s_hdr;
+    read_all_vchan_ext(&untrusted_s_hdr, sizeof untrusted_s_hdr);
+    /* sanitize start */
+    sanitize_message_from_agent(&untrusted_s_hdr);
+    s_hdr = untrusted_s_hdr;
+    /* sanitize end */
+//  fprintf(stderr, "got %x %x %x\n", s_hdr.type, s_hdr.client_id,
+//       s_hdr.len);
+    if (s_hdr.type == MSG_AGENT_TO_SERVER_TRIGGER_EXEC) {
+        handle_execute_predefined_command(s_hdr.client_id);
+        return;
+    }
+    if (s_hdr.type == MSG_XOFF) {
+        clients[s_hdr.client_id].state |= CLIENT_DONT_READ;
+        return;
+    }
+    if (s_hdr.type == MSG_XON) {
+        clients[s_hdr.client_id].state &= ~CLIENT_DONT_READ;
+        return;
+    }
@@ -373,54 +471,57 @@ void handle_agent_data()
     case MSG_AGENT_TO_SERVER_EXIT_CODE:
        hdr.type = MSG_SERVER_TO_CLIENT_EXIT_CODE;
        break;
-    default:
+    default:    /* cannot happen, already sanitized */
        fprintf(stderr, "from agent: type=%d\n", s_hdr.type);
        exit(1);
     }
     hdr.len = s_hdr.len;
-    if (hdr.len > MAX_DATA_CHUNK) {
-        fprintf(stderr, "agent feeded %d of data bytes?\n",
-            hdr.len);
-        exit(1);
-    }
-    if (clients[s_hdr.clid].state == CLIENT_INVALID) {
+    if (clients[s_hdr.client_id].state == CLIENT_INVALID) {
        // benefit of doubt - maybe client exited earlier
+        // just eat the packet data and continue
        char buf[MAX_DATA_CHUNK];
        read_all_vchan_ext(buf, s_hdr.len);
        return;
     }
-    pass_to_client(s_hdr.clid, &hdr);
+    get_packet_data_from_agent_and_pass_to_client(s_hdr.client_id,
+                              &hdr);
     if (s_hdr.type == MSG_AGENT_TO_SERVER_EXIT_CODE)
-        flush_client(s_hdr.clid);
+        terminate_client_and_flush_data(s_hdr.client_id);
 }
-int fill_fds_for_select(fd_set * rdset, fd_set * wrset)
+/*
+Scan the "clients" table, add ones we want to read from (because the other
+end has not send MSG_XOFF on them) to read_fdset, add ones we want to write
+to (because its pipe is full) to write_fdset. Return the highest used file
+descriptor number, needed for the first select() parameter.
+*/
+int fill_fdsets_for_select(fd_set * read_fdset, fd_set * write_fdset)
 {
     int i;
     int max = -1;
-    FD_ZERO(rdset);
-    FD_ZERO(wrset);
+    FD_ZERO(read_fdset);
+    FD_ZERO(write_fdset);
     for (i = 0; i <= max_client_fd; i++) {
        if (clients[i].state != CLIENT_INVALID
            && !(clients[i].state & CLIENT_DONT_READ)) {
-            FD_SET(i, rdset);
+            FD_SET(i, read_fdset);
            max = i;
        }
        if (clients[i].state != CLIENT_INVALID
            && clients[i].state & CLIENT_OUTQ_FULL) {
-            FD_SET(i, wrset);
+            FD_SET(i, write_fdset);
            max = i;
        }
     }
-    FD_SET(server_fd, rdset);
-    if (server_fd > max)
-        max = server_fd;
+    FD_SET(qrexec_daemon_unix_socket_fd, read_fdset);
+    if (qrexec_daemon_unix_socket_fd > max)
+        max = qrexec_daemon_unix_socket_fd;
     return max;
 }
 int main(int argc, char **argv)
 {
-    fd_set rdset, wrset;
+    fd_set read_fdset, write_fdset;
     int i;
     int max;
@@ -429,29 +530,36 @@ int main(int argc, char **argv)
        exit(1);
     }
     init(atoi(argv[1]));
+    /*
+       The main event loop. Waits for one of the following events:
+       - message from client
+       - message from agent
+       - new client
+       - child exited
+     */
     for (;;) {
-        max = fill_fds_for_select(&rdset, &wrset);
+        max = fill_fdsets_for_select(&read_fdset, &write_fdset);
        if (buffer_space_vchan_ext() <=
            sizeof(struct server_header))
-            FD_ZERO(&rdset);
-        wait_for_vchan_or_argfd(max, &rdset, &wrset);
-        if (FD_ISSET(server_fd, &rdset))
+            FD_ZERO(&read_fdset);    // vchan full - don't read from clients
+        wait_for_vchan_or_argfd(max, &read_fdset, &write_fdset);
+        if (FD_ISSET(qrexec_daemon_unix_socket_fd, &read_fdset))
            handle_new_client();
        while (read_ready_vchan_ext())
-            handle_agent_data();
+            handle_message_from_agent();
        for (i = 0; i <= max_client_fd; i++)
            if (clients[i].state != CLIENT_INVALID
-                && FD_ISSET(i, &rdset))
-                handle_client_data(i);
+                && FD_ISSET(i, &read_fdset))
+                handle_message_from_client(i);
        for (i = 0; i <= max_client_fd; i++)
            if (clients[i].state != CLIENT_INVALID
-                && FD_ISSET(i, &wrset))
-                flush_client_data_daemon(i);
+                && FD_ISSET(i, &write_fdset))
+                write_buffered_data_to_client(i);
        if (child_exited)
            reap_children();


@@ -29,7 +29,12 @@
 #include "buffer.h"
 #include "glue.h"
-int flush_client_data(int fd, int clid, struct buffer *buffer)
+/*
+There is buffered data in "buffer" for client id "client_id", and select()
+reports that "fd" is writable. Write as much as possible to fd, if all sent,
+notify the peer that this client's pipe is no longer full.
+*/
+int flush_client_data(int fd, int client_id, struct buffer *buffer)
 {
     int ret;
     int len;
@@ -44,12 +49,15 @@ int flush_client_data(int fd, int clid, struct buffer *buffer)
        } else
            return WRITE_STDIN_BUFFERED;
     }
-    buffer_remove(buffer, len);
+    // we previously called buffer_remove(buffer, len)
+    // it will be wrong if we change MAX_DATA_CHUNK to something large
+    // as pipes writes are atomic only to PIPE_MAX limit
+    buffer_remove(buffer, ret);
     len = buffer_len(buffer);
     if (!len) {
        struct server_header s_hdr;
        s_hdr.type = MSG_XON;
-        s_hdr.clid = clid;
+        s_hdr.client_id = client_id;
        s_hdr.len = 0;
        write_all_vchan_ext(&s_hdr, sizeof s_hdr);
        return WRITE_STDIN_OK;
@@ -58,7 +66,12 @@ int flush_client_data(int fd, int clid, struct buffer *buffer)
 }
-int write_stdin(int fd, int clid, char *data, int len,
+/*
+Write "len" bytes from "data" to "fd". If not all written, buffer the rest
+to "buffer", and notify the peer that the client "client_id" pipe is full via
+MSG_XOFF message.
+*/
+int write_stdin(int fd, int client_id, char *data, int len,
         struct buffer *buffer)
 {
     int ret;
@@ -84,7 +97,7 @@ int write_stdin(int fd, int clid, char *data, int len,
            len - written);
        s_hdr.type = MSG_XOFF;
-        s_hdr.clid = clid;
+        s_hdr.client_id = client_id;
        s_hdr.len = 0;
        write_all_vchan_ext(&s_hdr, sizeof s_hdr);
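The buffer_remove(buffer, ret) fix earlier in this file matters precisely here: once a chunk can exceed PIPE_BUF, a write() to a non-blocking pipe may consume only part of it, so only the bytes actually written may be dropped from the pending buffer. The rule in isolation:

#include <errno.h>
#include <unistd.h>

/* Returns how many bytes were actually consumed and may be removed
   from the pending buffer; never assume the full len went through.
   A negative return (other than EAGAIN) is a real write error. */
static ssize_t flush_some(int fd, const char *buf, size_t len)
{
    ssize_t ret = write(fd, buf, len);
    if (ret < 0 && (errno == EAGAIN || errno == EWOULDBLOCK))
        return 0;    /* pipe full: keep the whole buffer */
    return ret;      /* possibly < len: keep the tail */
}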
@@ -108,6 +121,11 @@ void set_block(int fd)
     fcntl(fd, F_SETFL, fl & ~O_NONBLOCK);
 }
+/*
+Data feed process has exited, so we need to clear all control structures for
+the client. However, if we have buffered data for the client (which is rare btw),
+fire&forget a separate process to flush them.
+*/
 int fork_and_flush_stdin(int fd, struct buffer *buffer)
 {
     int i;