Merge branch 'sane-and-pretty' of ssh://git.qubes-os.org/var/lib/qubes/git/rafal/core into r1-beta1-fixes

Marek Marczykowski 2011-05-09 14:30:55 +02:00
commit b842155516
10 changed files with 477 additions and 296 deletions

View File

@ -8,7 +8,7 @@
#include <stdio.h>
#include "filecopy.h"
char namebuf[MAX_PATH_LENGTH];
char untrusted_namebuf[MAX_PATH_LENGTH];
void notify_progress(int p1, int p2)
{
}
@ -22,28 +22,30 @@ void do_exit(int code)
}
void fix_times_and_perms(struct file_header *hdr, char *name)
void fix_times_and_perms(struct file_header *untrusted_hdr,
char *untrusted_name)
{
struct timeval times[2] =
{ {hdr->atime, hdr->atime_nsec / 1000}, {hdr->mtime,
hdr->mtime_nsec / 1000}
{ {untrusted_hdr->atime, untrusted_hdr->atime_nsec / 1000},
{untrusted_hdr->mtime,
untrusted_hdr->mtime_nsec / 1000}
};
if (chmod(name, hdr->mode & 07777))
if (chmod(untrusted_name, untrusted_hdr->mode & 07777)) /* safe because of chroot */
do_exit(errno);
if (utimes(name, times))
if (utimes(untrusted_name, times)) /* as above */
do_exit(errno);
}
void process_one_file_reg(struct file_header *hdr, char *name)
void process_one_file_reg(struct file_header *untrusted_hdr,
char *untrusted_name)
{
int ret;
int fdout =
open(name, O_WRONLY | O_CREAT | O_EXCL | O_NOFOLLOW, 0700);
int fdout = open(untrusted_name, O_WRONLY | O_CREAT | O_EXCL | O_NOFOLLOW, 0700); /* safe because of chroot */
if (fdout < 0)
do_exit(errno);
ret = copy_file(fdout, 0, hdr->filelen);
ret = copy_file(fdout, 0, untrusted_hdr->filelen);
if (ret != COPY_FILE_OK) {
if (ret == COPY_FILE_READ_EOF
|| ret == COPY_FILE_READ_ERROR)
@ -52,47 +54,53 @@ void process_one_file_reg(struct file_header *hdr, char *name)
do_exit(errno);
}
close(fdout);
fix_times_and_perms(hdr, name);
fix_times_and_perms(untrusted_hdr, untrusted_name);
}
void process_one_file_dir(struct file_header *hdr, char *name)
void process_one_file_dir(struct file_header *untrusted_hdr,
char *untrusted_name)
{
// fix perms only when the directory is sent for the second time
// this allows transferring r-x directory contents, as we create it rwx initially
if (!mkdir(name, 0700))
if (!mkdir(untrusted_name, 0700)) /* safe because of chroot */
return;
if (errno != EEXIST)
do_exit(errno);
fix_times_and_perms(hdr, name);
fix_times_and_perms(untrusted_hdr, untrusted_name);
}
void process_one_file_link(struct file_header *hdr, char *name)
void process_one_file_link(struct file_header *untrusted_hdr,
char *untrusted_name)
{
char content[MAX_PATH_LENGTH];
if (hdr->filelen > MAX_PATH_LENGTH - 1)
char untrusted_content[MAX_PATH_LENGTH];
unsigned int filelen;
if (untrusted_hdr->filelen > MAX_PATH_LENGTH - 1)
do_exit(ENAMETOOLONG);
if (!read_all(0, content, hdr->filelen))
filelen = untrusted_hdr->filelen; /* sanitized above */
if (!read_all(0, untrusted_content, filelen))
do_exit(LEGAL_EOF); // hopefully remote has produced error message
content[hdr->filelen] = 0;
if (symlink(content, name))
untrusted_content[filelen] = 0;
if (symlink(untrusted_content, untrusted_name)) /* safe because of chroot */
do_exit(errno);
}
void process_one_file(struct file_header *hdr)
void process_one_file(struct file_header *untrusted_hdr)
{
if (hdr->namelen > MAX_PATH_LENGTH - 1)
unsigned int namelen;
if (untrusted_hdr->namelen > MAX_PATH_LENGTH - 1)
do_exit(ENAMETOOLONG);
if (!read_all(0, namebuf, hdr->namelen))
namelen = untrusted_hdr->namelen; /* sanitized above */
if (!read_all(0, untrusted_namebuf, namelen))
do_exit(LEGAL_EOF); // hopefully remote has produced error message
namebuf[hdr->namelen] = 0;
if (S_ISREG(hdr->mode))
process_one_file_reg(hdr, namebuf);
else if (S_ISLNK(hdr->mode))
process_one_file_link(hdr, namebuf);
else if (S_ISDIR(hdr->mode))
process_one_file_dir(hdr, namebuf);
untrusted_namebuf[namelen] = 0;
if (S_ISREG(untrusted_hdr->mode))
process_one_file_reg(untrusted_hdr, untrusted_namebuf);
else if (S_ISLNK(untrusted_hdr->mode))
process_one_file_link(untrusted_hdr, untrusted_namebuf);
else if (S_ISDIR(untrusted_hdr->mode))
process_one_file_dir(untrusted_hdr, untrusted_namebuf);
else
do_exit(EINVAL);
}
@ -100,9 +108,9 @@ void process_one_file(struct file_header *hdr)
void do_unpack(int fd)
{
global_status_fd = fd;
struct file_header hdr;
while (read_all(0, &hdr, sizeof hdr))
process_one_file(&hdr);
struct file_header untrusted_hdr;
while (read_all(0, &untrusted_hdr, sizeof untrusted_hdr))
process_one_file(&untrusted_hdr);
if (errno)
do_exit(errno);
else

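The hunks above follow one convention worth stating explicitly: every value received from the remote side keeps an untrusted_ prefix until it passes a bounds check, and only the checked result is used under a plain name. A minimal standalone sketch of the idiom (hypothetical helper; the MAX_PATH_LENGTH value is assumed here, the real one lives in filecopy.h):

#include <errno.h>
#include <stdlib.h>

#define MAX_PATH_LENGTH 16384	/* assumed value for this sketch */

/* Return the length only after the bounds check has passed, so the
   caller can bind the result to a name without the untrusted_ prefix. */
static unsigned int sanitize_namelen(unsigned int untrusted_namelen)
{
	if (untrusted_namelen > MAX_PATH_LENGTH - 1)
		exit(ENAMETOOLONG);	/* reject before first use */
	return untrusted_namelen;	/* sanitized */
}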
View File

@ -7,11 +7,11 @@ import os
class DomainState:
def __init__(self, id):
self.meminfo = None
self.memory_actual = None
self.mem_used = None
self.id = id
self.last_target = 0
self.meminfo = None #dictionary of memory info read from client
self.memory_actual = None #the current memory size
self.mem_used = None #used memory, computed based on meminfo
self.id = id #domain id
self.last_target = 0 #the last memset target
class SystemState:
def __init__(self):
@ -36,6 +36,7 @@ class SystemState:
# ret = host_metrics_record["memory_free"]
# return long(ret)
#refresh information on memory assigned to all domains
def refresh_memactual(self):
for domain in self.xc.domain_getinfo():
id = str(domain['domid'])
@ -67,6 +68,7 @@ class SystemState:
except XenAPI.Failure:
pass
#perform memory ballooning, across all domains, to add "memsize" to Xen free memory
def do_balloon(self, memsize):
MAX_TRIES = 20
niter = 0
@ -82,6 +84,7 @@ class SystemState:
if prev_memory_actual is not None:
for i in prev_memory_actual.keys():
if prev_memory_actual[i] == self.domdict[i].memory_actual:
#domain not responding to memset requests, remove it from donors
self.domdict[i].no_progress = True
print 'domain', i, 'stuck at', self.domdict[i].memory_actual
memset_reqs = qmemman_algo.balloon(memsize + self.XEN_FREE_MEM_LEFT - xenfree, self.domdict)
@ -96,10 +99,12 @@ class SystemState:
time.sleep(self.BALOON_DELAY)
niter = niter + 1
def refresh_meminfo(self, domid, val):
qmemman_algo.refresh_meminfo_for_domain(self.domdict[domid], val)
def refresh_meminfo(self, domid, untrusted_meminfo_key):
qmemman_algo.refresh_meminfo_for_domain(self.domdict[domid], untrusted_meminfo_key)
self.do_balance()
#is the computed balance request big enough,
#so that we do not thrash with small adjustments?
def is_balance_req_significant(self, memset_reqs, xenfree):
total_memory_transfer = 0
MIN_TOTAL_MEMORY_TRANSFER = 150*1024*1024

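For example: if after one BALOON_DELAY iteration a donor's memory_actual has not moved despite a memset request, it is marked no_progress and excluded from the donor list that qmemman_algo.balloon() rebuilds on the next of the (at most MAX_TRIES) iterations.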
View File

@ -1,74 +1,90 @@
import string
def parse_meminfo(meminfo):
dict = {}
l1 = string.split(meminfo,"\n")
for i in l1:
l2 = string.split(i)
if len(l2) >= 2:
dict[string.rstrip(l2[0], ":")] = l2[1]
#untrusted meminfo is taken from a xenstore key, thus its size is limited
#and the splits below do not require excessive memory
def parse_meminfo(untrusted_meminfo):
untrusted_dict = {}
#split meminfo contents into lines
untrusted_lines = string.split(untrusted_meminfo,"\n")
for untrusted_lines_iterator in untrusted_lines:
#split a single meminfo line into words
untrusted_words = string.split(untrusted_lines_iterator)
if len(untrusted_words) >= 2:
untrusted_dict[string.rstrip(untrusted_words[0], ":")] = untrusted_words[1]
return untrusted_dict
def is_meminfo_suspicious(domain, untrusted_meminfo):
ret = False
#check whether the required keys exist and are not negative
try:
for i in ('MemTotal', 'MemFree', 'Buffers', 'Cached', 'SwapTotal', 'SwapFree'):
val = int(dict[i])*1024
val = int(untrusted_meminfo[i])*1024
if (val < 0):
return None
dict[i] = val
ret = True
untrusted_meminfo[i] = val
except:
return None
if dict['SwapTotal'] < dict['SwapFree']:
return None
return dict
def is_suspicious(dom):
ret = False
if dom.meminfo['SwapTotal'] < dom.meminfo['SwapFree']:
ret = True
if dom.meminfo['MemTotal'] < dom.meminfo['MemFree'] + dom.meminfo['Cached'] + dom.meminfo['Buffers']:
if not ret and untrusted_meminfo['SwapTotal'] < untrusted_meminfo['SwapFree']:
ret = True
if not ret and untrusted_meminfo['MemTotal'] < untrusted_meminfo['MemFree'] + untrusted_meminfo['Cached'] + untrusted_meminfo['Buffers']:
ret = True
#we could also impose some limits on all the above values
#but it would serve little purpose - all a domain can gain by passing e.g.
#a very large SwapTotal is being assigned all free Xen memory;
#it can achieve that with legal values too, and by design it cannot
#starve existing domains
if ret:
print 'suspicious meminfo for domain', dom.id, 'mem actual', dom.memory_actual, dom.meminfo
print 'suspicious meminfo for domain', domain.id, 'mem actual', domain.memory_actual, untrusted_meminfo
return ret
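A worked example: an untrusted_meminfo claiming MemTotal=1000000, MemFree=800000, Cached=200000, Buffers=100000 (all in kB) passes the per-key checks, yet is flagged suspicious because MemFree + Cached + Buffers = 1100000 kB exceeds MemTotal; refresh_meminfo_for_domain() below then discards it instead of trusting it.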
def refresh_meminfo_for_domain(dom, xenstore_key):
meminfo = parse_meminfo(xenstore_key)
dom.meminfo = meminfo
if meminfo is None:
#called when a domain updates its 'meminfo' xenstore key
def refresh_meminfo_for_domain(domain, untrusted_xenstore_key):
untrusted_meminfo = parse_meminfo(untrusted_xenstore_key)
if untrusted_meminfo is None:
domain.meminfo = None
return
if is_suspicious(dom):
dom.meminfo = None
dom.mem_used = None
#sanitize start
if is_meminfo_suspicious(domain, untrusted_meminfo):
#sanitize end
domain.meminfo = None
domain.mem_used = None
else:
dom.mem_used = dom.meminfo['MemTotal'] - dom.meminfo['MemFree'] - dom.meminfo['Cached'] - dom.meminfo['Buffers'] + dom.meminfo['SwapTotal'] - dom.meminfo['SwapFree']
#sanitized, can assign
domain.meminfo = untrusted_meminfo
domain.mem_used = domain.meminfo['MemTotal'] - domain.meminfo['MemFree'] - domain.meminfo['Cached'] - domain.meminfo['Buffers'] + domain.meminfo['SwapTotal'] - domain.meminfo['SwapFree']
def prefmem(dom):
def prefmem(domain):
CACHE_FACTOR = 1.3
#dom0 is special, as it must have large cache, for vbds. Thus, give it a special boost
if dom.id == '0':
return dom.mem_used*CACHE_FACTOR + 350*1024*1024
return dom.mem_used*CACHE_FACTOR
if domain.id == '0':
return domain.mem_used*CACHE_FACTOR + 350*1024*1024
return domain.mem_used*CACHE_FACTOR
def memneeded(dom):
def memory_needed(domain):
#do not change
#in balance(), "distribute totalsum proportionally to mempref" relies on this exact formula
ret = prefmem(dom) - dom.memory_actual
#in balance(), "distribute total_available_memory proportionally to mempref" relies on this exact formula
ret = prefmem(domain) - domain.memory_actual
return ret
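A worked example: a non-dom0 domain with mem_used of 400MB has prefmem of 520MB (400MB * 1.3); if its memory_actual is 600MB, memory_needed() returns -80MB, i.e. the domain is a donor able to yield about 80MB.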
def balloon(memsize, domdict):
#prepare list of (domain, memory_target) pairs that need to be passed
#to "xm memset" equivalent in order to obtain "memsize" of memory
#return empty list when the request cannot be satisfied
def balloon(memsize, domain_dictionary):
REQ_SAFETY_NET_FACTOR = 1.05
donors = list()
request = list()
available = 0
for i in domdict.keys():
if domdict[i].meminfo is None:
for i in domain_dictionary.keys():
if domain_dictionary[i].meminfo is None:
continue
if domdict[i].no_progress:
if domain_dictionary[i].no_progress:
continue
need = memneeded(domdict[i])
need = memory_needed(domain_dictionary[i])
if need < 0:
print 'balloon: dom' , i, 'has actual memory', domdict[i].memory_actual
print 'balloon: dom' , i, 'has actual memory', domain_dictionary[i].memory_actual
donors.append((i,-need))
available-=need
print 'req=', memsize, 'avail=', available, 'donors', donors
@ -79,78 +95,92 @@ def balloon(memsize, domdict):
id, mem = donors_iter
memborrowed = mem*scale*REQ_SAFETY_NET_FACTOR
print 'borrow' , memborrowed, 'from', id
memtarget = int(domdict[id].memory_actual - memborrowed)
memtarget = int(domain_dictionary[id].memory_actual - memborrowed)
request.append((id, memtarget))
return request
# REQ_SAFETY_NET_FACTOR is a bit greater than 1, so that if the domain yields a bit less than requested, due
# to e.g. rounding errors, we will not get stuck. The surplus will return to the VM during the "balance" call.
#redistribute positive "totalsum" of memory between domains, proportionally to prefmem
def balance_when_enough_memory(domdict, xenfree, total_mem_pref, totalsum):
#redistribute positive "total_available_memory" of memory between domains, proportionally to prefmem
def balance_when_enough_memory(domain_dictionary, xen_free_memory, total_mem_pref, total_available_memory):
donors_rq = list()
acceptors_rq = list()
for i in domdict.keys():
if domdict[i].meminfo is None:
for i in domain_dictionary.keys():
if domain_dictionary[i].meminfo is None:
continue
#distribute totalsum proportionally to mempref
scale = 1.0*prefmem(domdict[i])/total_mem_pref
target_nonint = prefmem(domdict[i]) + scale*totalsum
#distribute total_available_memory proportionally to mempref
scale = 1.0*prefmem(domain_dictionary[i])/total_mem_pref
target_nonint = prefmem(domain_dictionary[i]) + scale*total_available_memory
#prevent rounding errors
target = int(0.999*target_nonint)
if (target < domdict[i].memory_actual):
if (target < domain_dictionary[i].memory_actual):
donors_rq.append((i, target))
else:
acceptors_rq.append((i, target))
# print 'balance(enough): xenfree=', xenfree, 'requests:', donors_rq + acceptors_rq
# print 'balance(enough): xen_free_memory=', xen_free_memory, 'requests:', donors_rq + acceptors_rq
return donors_rq + acceptors_rq
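A worked example: two domains with prefmem of 100MB and 300MB and a total_available_memory of 200MB get scales of 0.25 and 0.75, hence targets of about 150MB and 450MB respectively (each then trimmed slightly by the 0.999 rounding guard).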
#when there is not enough memory to bring everyone above prefmem, put donors at prefmem, and
#redistribute anything left among acceptors
def balance_when_low_on_memory(domdict, xenfree, total_mem_pref_acceptors, donors, acceptors):
def balance_when_low_on_memory(domain_dictionary, xen_free_memory, total_mem_pref_acceptors, donors, acceptors):
donors_rq = list()
acceptors_rq = list()
squeezed_mem = xenfree
squeezed_mem = xen_free_memory
for i in donors:
avail = -memneeded(domdict[i])
avail = -memory_needed(domain_dictionary[i])
if avail < 10*1024*1024:
#probably we have already tried making it exactly at prefmem, give up
continue
squeezed_mem -= avail
donors_rq.append((i, prefmem(domdict[i])))
donors_rq.append((i, prefmem(domain_dictionary[i])))
#the below can happen if initially xen free memory is below 50M
if squeezed_mem < 0:
return donors_rq
for i in acceptors:
scale = 1.0*prefmem(domdict[i])/total_mem_pref_acceptors
target_nonint = domdict[i].memory_actual + scale*squeezed_mem
scale = 1.0*prefmem(domain_dictionary[i])/total_mem_pref_acceptors
target_nonint = domain_dictionary[i].memory_actual + scale*squeezed_mem
acceptors_rq.append((i, int(target_nonint)))
# print 'balance(low): xenfree=', xenfree, 'requests:', donors_rq + acceptors_rq
# print 'balance(low): xen_free_memory=', xen_free_memory, 'requests:', donors_rq + acceptors_rq
return donors_rq + acceptors_rq
def balance(xenfree, domdict):
total_memneeded = 0
#redistribute memory across domains
#called when one of the domains updates its 'meminfo' xenstore key
#return the list of (domain, memory_target) pairs to be passed to
#"xm memset" equivalent
def balance(xen_free_memory, domain_dictionary):
#sum of all memory requirements - in other words, the difference between
#memory required to be added to domains (acceptors) to bring them to their
#preferred memory, and memory that can be taken from domains (donors) that
#have a surplus. So it can be negative when there is plenty of memory.
total_memory_needed = 0
#sum of memory preferences of all domains
total_mem_pref = 0
#sum of memory preferences of all domains that require more memory
total_mem_pref_acceptors = 0
donors = list()
acceptors = list()
donors = list() # domains that can yield memory
acceptors = list() # domains that require more memory
#pass 1: compute the above "total" values
for i in domdict.keys():
if domdict[i].meminfo is None:
for i in domain_dictionary.keys():
if domain_dictionary[i].meminfo is None:
continue
need = memneeded(domdict[i])
# print 'domain' , i, 'act/pref', domdict[i].memory_actual, prefmem(domdict[i]), 'need=', need
need = memory_needed(domain_dictionary[i])
# print 'domain' , i, 'act/pref', domain_dictionary[i].memory_actual, prefmem(domain_dictionary[i]), 'need=', need
if need < 0:
donors.append(i)
else:
acceptors.append(i)
total_mem_pref_acceptors += prefmem(domdict[i])
total_memneeded += need
total_mem_pref += prefmem(domdict[i])
total_mem_pref_acceptors += prefmem(domain_dictionary[i])
total_memory_needed += need
total_mem_pref += prefmem(domain_dictionary[i])
totalsum = xenfree - total_memneeded
if totalsum > 0:
return balance_when_enough_memory(domdict, xenfree, total_mem_pref, totalsum)
total_available_memory = xen_free_memory - total_memory_needed
if total_available_memory > 0:
return balance_when_enough_memory(domain_dictionary, xen_free_memory, total_mem_pref, total_available_memory)
else:
return balance_when_low_on_memory(domdict, xenfree, total_mem_pref_acceptors, donors, acceptors)
return balance_when_low_on_memory(domain_dictionary, xen_free_memory, total_mem_pref_acceptors, donors, acceptors)

View File

@ -17,7 +17,7 @@ def only_in_first_list(l1, l2):
ret.append(i)
return ret
def get_req_node(domain_id):
def get_domain_meminfo_key(domain_id):
return '/local/domain/'+domain_id+'/memory/meminfo'
@ -29,31 +29,33 @@ class WatchType:
class XS_Watcher:
def __init__(self):
self.handle = xen.lowlevel.xs.xs()
self.handle.watch('/vm', WatchType(XS_Watcher.dom_list_change, None))
self.handle.watch('/vm', WatchType(XS_Watcher.domain_list_changed, None))
self.watch_token_dict = {}
def dom_list_change(self, param):
def domain_list_changed(self, param):
curr = self.handle.ls('', '/local/domain')
if curr == None:
return
global_lock.acquire()
for i in only_in_first_list(curr, self.watch_token_dict.keys()):
watch = WatchType(XS_Watcher.request, i)
#new domain has been created
watch = WatchType(XS_Watcher.meminfo_changed, i)
self.watch_token_dict[i] = watch
self.handle.watch(get_req_node(i), watch)
self.handle.watch(get_domain_meminfo_key(i), watch)
system_state.add_domain(i)
for i in only_in_first_list(self.watch_token_dict.keys(), curr):
self.handle.unwatch(get_req_node(i), self.watch_token_dict[i])
#domain destroyed
self.handle.unwatch(get_domain_meminfo_key(i), self.watch_token_dict[i])
self.watch_token_dict.pop(i)
system_state.del_domain(i)
global_lock.release()
def request(self, domain_id):
ret = self.handle.read('', get_req_node(domain_id))
if ret == None or ret == '':
def meminfo_changed(self, domain_id):
untrusted_meminfo_key = self.handle.read('', get_domain_meminfo_key(domain_id))
if untrusted_meminfo_key == None or untrusted_meminfo_key == '':
return
global_lock.acquire()
system_state.refresh_meminfo(domain_id, ret)
system_state.refresh_meminfo(domain_id, untrusted_meminfo_key)
global_lock.release()
def watch_loop(self):

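The flow, end to end: the '/vm' watch fires domain_list_changed(), which registers a per-domain watch on /local/domain/<id>/memory/meminfo; when a domain writes that key, meminfo_changed() reads it and hands the still-untrusted value to system_state.refresh_meminfo(), where sanitization finally happens.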
View File

@ -61,6 +61,12 @@ void buffer_free(struct buffer *b)
buffer_init(b);
}
/*
The following two functions could be made much more efficient.
Yet the profiling output shows they are not significant CPU hogs, so
we keep them this simple to make them obviously correct.
*/
void buffer_append(struct buffer *b, char *data, int len)
{
int newsize = len + b->buflen;

View File

@ -41,8 +41,8 @@ enum {
WRITE_STDIN_ERROR
};
int flush_client_data(int fd, int clid, struct buffer *buffer);
int write_stdin(int fd, int clid, char *data, int len,
int flush_client_data(int fd, int client_id, struct buffer *buffer);
int write_stdin(int fd, int client_id, char *data, int len,
struct buffer *buffer);
void set_nonblock(int fd);
int fork_and_flush_stdin(int fd, struct buffer *buffer);

View File

@ -56,7 +56,7 @@ enum {
struct server_header {
unsigned int type;
unsigned int clid;
unsigned int client_id;
unsigned int len;
};

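For orientation: every message exchanged over the vchan is this fixed-size header followed by exactly len bytes of payload. A sketch of reading one message, reusing read_all_vchan_ext() and MAX_DATA_CHUNK from this codebase (handle_payload is a hypothetical callback, not part of the diff):

/* Read one header+payload message; the header arrives untrusted, and its
   len field must be checked before it is used as a read size. */
void read_one_message(void (*handle_payload)(struct server_header *, char *, int))
{
	struct server_header untrusted_hdr;
	static char buf[MAX_DATA_CHUNK];

	read_all_vchan_ext(&untrusted_hdr, sizeof untrusted_hdr);
	if (untrusted_hdr.len > MAX_DATA_CHUNK)	/* sanitize before use */
		exit(1);
	read_all_vchan_ext(buf, untrusted_hdr.len);
	handle_payload(&untrusted_hdr, buf, untrusted_hdr.len);
}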
View File

@ -42,7 +42,7 @@ enum fdtype {
};
struct _process_fd {
int clid;
int client_id;
int type;
int is_blocked;
};
@ -122,7 +122,7 @@ void do_exec(char *cmd)
exit(1);
}
void handle_just_exec(int clid, int len)
void handle_just_exec(int client_id, int len)
{
char buf[len];
int fdn, pid;
@ -143,7 +143,7 @@ void handle_just_exec(int clid, int len)
fprintf(stderr, "executed (nowait) %s pid %d\n", buf, pid);
}
void handle_exec(int clid, int len)
void handle_exec(int client_id, int len)
{
char buf[len];
int pid, stdin_fd, stdout_fd, stderr_fd;
@ -152,10 +152,10 @@ void handle_exec(int clid, int len)
do_fork_exec(buf, &pid, &stdin_fd, &stdout_fd, &stderr_fd);
process_fd[stdout_fd].clid = clid;
process_fd[stdout_fd].client_id = client_id;
process_fd[stdout_fd].type = FDTYPE_STDOUT;
process_fd[stdout_fd].is_blocked = 0;
process_fd[stderr_fd].clid = clid;
process_fd[stderr_fd].client_id = client_id;
process_fd[stderr_fd].type = FDTYPE_STDERR;
process_fd[stderr_fd].is_blocked = 0;
@ -166,13 +166,13 @@ void handle_exec(int clid, int len)
set_nonblock(stdin_fd);
client_info[clid].stdin_fd = stdin_fd;
client_info[clid].stdout_fd = stdout_fd;
client_info[clid].stderr_fd = stderr_fd;
client_info[clid].pid = pid;
client_info[clid].is_blocked = 0;
client_info[clid].is_close_after_flush_needed = 0;
buffer_init(&client_info[clid].buffer);
client_info[client_id].stdin_fd = stdin_fd;
client_info[client_id].stdout_fd = stdout_fd;
client_info[client_id].stderr_fd = stderr_fd;
client_info[client_id].pid = pid;
client_info[client_id].is_blocked = 0;
client_info[client_id].is_close_after_flush_needed = 0;
buffer_init(&client_info[client_id].buffer);
fprintf(stderr, "executed %s pid %d\n", buf, pid);
@ -187,79 +187,81 @@ void update_max_process_fd()
max_process_fd = i;
}
void send_exit_code(int clid, int status)
void send_exit_code(int client_id, int status)
{
struct server_header s_hdr;
s_hdr.type = MSG_AGENT_TO_SERVER_EXIT_CODE;
s_hdr.clid = clid;
s_hdr.client_id = client_id;
s_hdr.len = sizeof status;
write_all_vchan_ext(&s_hdr, sizeof s_hdr);
write_all_vchan_ext(&status, sizeof(status));
fprintf(stderr, "send exit code for clid %d pid %d\n", clid,
client_info[clid].pid);
fprintf(stderr, "send exit code for client_id %d pid %d\n",
client_id, client_info[client_id].pid);
}
// erase process data structures, possibly forced by remote
void remove_process(int clid, int status)
void remove_process(int client_id, int status)
{
int i;
if (!client_info[clid].pid)
if (!client_info[client_id].pid)
return;
fork_and_flush_stdin(client_info[clid].stdin_fd, &client_info[clid].buffer);
fork_and_flush_stdin(client_info[client_id].stdin_fd,
&client_info[client_id].buffer);
#if 0
// let's let it die by itself, possibly after it has received buffered stdin
kill(client_info[clid].pid, SIGKILL);
kill(client_info[client_id].pid, SIGKILL);
#endif
if (status != -1)
send_exit_code(clid, status);
send_exit_code(client_id, status);
close(client_info[clid].stdin_fd);
client_info[clid].pid = 0;
client_info[clid].stdin_fd = -1;
client_info[clid].is_blocked = 0;
buffer_free(&client_info[clid].buffer);
close(client_info[client_id].stdin_fd);
client_info[client_id].pid = 0;
client_info[client_id].stdin_fd = -1;
client_info[client_id].is_blocked = 0;
buffer_free(&client_info[client_id].buffer);
for (i = 0; i <= max_process_fd; i++)
if (process_fd[i].type != FDTYPE_INVALID
&& process_fd[i].clid == clid) {
&& process_fd[i].client_id == client_id) {
process_fd[i].type = FDTYPE_INVALID;
process_fd[i].clid = -1;
process_fd[i].client_id = -1;
process_fd[i].is_blocked = 0;
close(i);
}
update_max_process_fd();
}
void handle_input(int clid, int len)
void handle_input(int client_id, int len)
{
char buf[len];
read_all_vchan_ext(buf, len);
if (!client_info[clid].pid)
if (!client_info[client_id].pid)
return;
if (len == 0) {
if (client_info[clid].is_blocked)
client_info[clid].is_close_after_flush_needed = 1;
if (client_info[client_id].is_blocked)
client_info[client_id].
is_close_after_flush_needed = 1;
else {
close(client_info[clid].stdin_fd);
client_info[clid].stdin_fd = -1;
close(client_info[client_id].stdin_fd);
client_info[client_id].stdin_fd = -1;
}
return;
}
switch (write_stdin
(client_info[clid].stdin_fd, clid, buf, len,
&client_info[clid].buffer)) {
(client_info[client_id].stdin_fd, client_id, buf, len,
&client_info[client_id].buffer)) {
case WRITE_STDIN_OK:
break;
case WRITE_STDIN_BUFFERED:
client_info[clid].is_blocked = 1;
client_info[client_id].is_blocked = 1;
break;
case WRITE_STDIN_ERROR:
remove_process(clid, 128);
remove_process(client_id, 128);
break;
default:
fprintf(stderr, "unknown write_stdin?\n");
@ -268,10 +270,10 @@ void handle_input(int clid, int len)
}
void set_blocked_outerr(int clid, int val)
void set_blocked_outerr(int client_id, int val)
{
process_fd[client_info[clid].stdout_fd].is_blocked = val;
process_fd[client_info[clid].stderr_fd].is_blocked = val;
process_fd[client_info[client_id].stdout_fd].is_blocked = val;
process_fd[client_info[client_id].stderr_fd].is_blocked = val;
}
void handle_server_data()
@ -279,27 +281,27 @@ void handle_server_data()
struct server_header s_hdr;
read_all_vchan_ext(&s_hdr, sizeof s_hdr);
// fprintf(stderr, "got %x %x %x\n", s_hdr.type, s_hdr.clid,
// fprintf(stderr, "got %x %x %x\n", s_hdr.type, s_hdr.client_id,
// s_hdr.len);
switch (s_hdr.type) {
case MSG_XON:
set_blocked_outerr(s_hdr.clid, 0);
set_blocked_outerr(s_hdr.client_id, 0);
break;
case MSG_XOFF:
set_blocked_outerr(s_hdr.clid, 1);
set_blocked_outerr(s_hdr.client_id, 1);
break;
case MSG_SERVER_TO_AGENT_EXEC_CMDLINE:
handle_exec(s_hdr.clid, s_hdr.len);
handle_exec(s_hdr.client_id, s_hdr.len);
break;
case MSG_SERVER_TO_AGENT_JUST_EXEC:
handle_just_exec(s_hdr.clid, s_hdr.len);
handle_just_exec(s_hdr.client_id, s_hdr.len);
break;
case MSG_SERVER_TO_AGENT_INPUT:
handle_input(s_hdr.clid, s_hdr.len);
handle_input(s_hdr.client_id, s_hdr.len);
break;
case MSG_SERVER_TO_AGENT_CLIENT_END:
remove_process(s_hdr.clid, -1);
remove_process(s_hdr.client_id, -1);
break;
default:
fprintf(stderr, "msg type from daemon is %d ?\n",
@ -320,15 +322,15 @@ void handle_process_data(int fd)
return;
ret = read(fd, buf, len - sizeof s_hdr);
s_hdr.clid = process_fd[fd].clid;
s_hdr.client_id = process_fd[fd].client_id;
if (process_fd[fd].type == FDTYPE_STDOUT)
s_hdr.type = MSG_AGENT_TO_SERVER_STDOUT;
else if (process_fd[fd].type == FDTYPE_STDERR)
s_hdr.type = MSG_AGENT_TO_SERVER_STDERR;
else {
fprintf(stderr, "fd=%d, clid=%d, type=%d ?\n", fd,
process_fd[fd].clid, process_fd[fd].type);
fprintf(stderr, "fd=%d, client_id=%d, type=%d ?\n", fd,
process_fd[fd].client_id, process_fd[fd].type);
exit(1);
}
s_hdr.len = ret;
@ -338,13 +340,13 @@ void handle_process_data(int fd)
}
if (ret == 0) {
process_fd[fd].type = FDTYPE_INVALID;
process_fd[fd].clid = -1;
process_fd[fd].client_id = -1;
process_fd[fd].is_blocked = 0;
close(fd);
update_max_process_fd();
}
if (ret < 0)
remove_process(process_fd[fd].clid, 127);
remove_process(process_fd[fd].client_id, 127);
}
volatile int child_exited;
@ -375,7 +377,7 @@ void handle_process_data_all(fd_set * select_fds)
}
void flush_out_err(int clid)
void flush_out_err(int client_id)
{
fd_set select_set;
int fd_max = -1;
@ -387,7 +389,7 @@ void flush_out_err(int clid)
for (i = 0; i <= max_process_fd; i++) {
if (process_fd[i].type != FDTYPE_INVALID
&& !process_fd[i].is_blocked
&& process_fd[i].clid == clid) {
&& process_fd[i].client_id == client_id) {
FD_SET(i, &select_set);
fd_max = i;
}
@ -411,13 +413,13 @@ void reap_children()
{
int status;
int pid;
int clid;
int client_id;
while ((pid = waitpid(-1, &status, WNOHANG)) > 0) {
clid = find_info(pid);
if (clid < 0)
client_id = find_info(pid);
if (client_id < 0)
continue;
flush_out_err(clid);
remove_process(clid, status);
flush_out_err(client_id);
remove_process(client_id, status);
}
child_exited = 0;
}
@ -450,10 +452,11 @@ int fill_fds_for_select(fd_set * rdset, fd_set * wrset)
return max;
}
void flush_client_data_agent(int clid)
void flush_client_data_agent(int client_id)
{
struct _client_info *info = &client_info[clid];
switch (flush_client_data(info->stdin_fd, clid, &info->buffer)) {
struct _client_info *info = &client_info[client_id];
switch (flush_client_data
(info->stdin_fd, client_id, &info->buffer)) {
case WRITE_STDIN_OK:
info->is_blocked = 0;
if (info->is_close_after_flush_needed) {
@ -463,7 +466,7 @@ void flush_client_data_agent(int clid)
}
break;
case WRITE_STDIN_ERROR:
remove_process(clid, 128);
remove_process(client_id, 128);
break;
case WRITE_STDIN_BUFFERED:
break;
@ -479,15 +482,16 @@ void handle_trigger_io()
char buf[5];
int ret;
s_hdr.clid = 0;
s_hdr.client_id = 0;
s_hdr.len = 0;
if ((ret = read(trigger_fd, buf, 4)) == 4) {
buf[4] = 0;
if (!strcmp(buf, "FCPR"))
s_hdr.clid = QREXEC_EXECUTE_FILE_COPY;
s_hdr.client_id = QREXEC_EXECUTE_FILE_COPY;
else if (!strcmp(buf, "DVMR"))
s_hdr.clid = QREXEC_EXECUTE_FILE_COPY_FOR_DISPVM;
if (s_hdr.clid) {
s_hdr.client_id =
QREXEC_EXECUTE_FILE_COPY_FOR_DISPVM;
if (s_hdr.client_id) {
s_hdr.type = MSG_AGENT_TO_SERVER_TRIGGER_EXEC;
write_all_vchan_ext(&s_hdr, sizeof s_hdr);
}

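Note how the two ends meet: the agent maps the 4-byte trigger command ("FCPR" or "DVMR") to a QREXEC_EXECUTE_* code and ships it in the client_id field of MSG_AGENT_TO_SERVER_TRIGGER_EXEC; the daemon's sanitize_message_from_agent() (next file) accepts exactly those two codes and aborts on anything else before handle_execute_predefined_command() runs.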
View File

@ -34,24 +34,30 @@
#include "glue.h"
enum client_flags {
CLIENT_INVALID = 0,
CLIENT_CMDLINE = 1,
CLIENT_DATA = 2,
CLIENT_DONT_READ = 4,
CLIENT_OUTQ_FULL = 8
CLIENT_INVALID = 0, // table slot not used
CLIENT_CMDLINE = 1, // waiting for cmdline from client
CLIENT_DATA = 2, // waiting for data from client
CLIENT_DONT_READ = 4, // don't read from the client; the other side's pipe is full, or EOF was seen
CLIENT_OUTQ_FULL = 8 // don't write to client, its stdin pipe is full
};
struct _client {
int state;
struct buffer buffer;
int state; // combination of above enum client_flags
struct buffer buffer; // buffered data to client, if any
};
struct _client clients[MAX_FDS];
/*
The "clients" array is indexed by client's fd.
Thus its size must be equal MAX_FDS; defining MAX_CLIENTS for clarity.
*/
int max_client_fd = -1;
int server_fd;
#define MAX_CLIENTS MAX_FDS
struct _client clients[MAX_CLIENTS]; // data on all qrexec_client connections
void handle_usr1(int x)
int max_client_fd = -1; // current max fd of all clients; so that we need not scan the whole "clients" table
int qrexec_daemon_unix_socket_fd; // /var/run/qubes/qrexec.xid descriptor
void sigusr1_handler(int x)
{
fprintf(stderr, "connected\n");
exit(0);
@ -59,18 +65,19 @@ void handle_usr1(int x)
void sigchld_handler(int x);
char *remote_domain_name;
char *remote_domain_name; // guess what
/* do the preparatory tasks needed before entering the main event loop */
void init(int xid)
{
char dbg_log[256];
char qrexec_error_log_name[256];
int logfd;
if (xid <= 0) {
fprintf(stderr, "domain id=0?\n");
exit(1);
}
signal(SIGUSR1, handle_usr1);
signal(SIGUSR1, sigusr1_handler);
switch (fork()) {
case -1:
perror("fork");
@ -86,10 +93,17 @@ void init(int xid)
exit(0);
}
close(0);
snprintf(dbg_log, sizeof(dbg_log),
snprintf(qrexec_error_log_name, sizeof(qrexec_error_log_name),
"/var/log/qubes/qrexec.%d.log", xid);
umask(0007);
logfd = open(dbg_log, O_WRONLY | O_CREAT | O_TRUNC, 0640);
umask(0007); // make the log readable by the "qubes" group
logfd =
open(qrexec_error_log_name, O_WRONLY | O_CREAT | O_TRUNC,
0640);
if (logfd < 0) {
perror("open");
exit(1);
}
dup2(logfd, 1);
dup2(logfd, 2);
@ -104,18 +118,19 @@ void init(int xid)
setuid(getuid());
/* When running as root, make the socket accessible; perms on /var/run/qubes still apply */
umask(0);
server_fd = get_server_socket(xid, remote_domain_name);
qrexec_daemon_unix_socket_fd =
get_server_socket(xid, remote_domain_name);
umask(0077);
signal(SIGPIPE, SIG_IGN);
signal(SIGCHLD, sigchld_handler);
signal(SIGUSR1, SIG_DFL);
kill(getppid(), SIGUSR1);
kill(getppid(), SIGUSR1); // let the parent know we are ready
}
void handle_new_client()
{
int fd = do_accept(server_fd);
if (fd >= MAX_FDS) {
int fd = do_accept(qrexec_daemon_unix_socket_fd);
if (fd >= MAX_CLIENTS) {
fprintf(stderr, "too many clients ?\n");
exit(1);
}
@ -125,9 +140,13 @@ void handle_new_client()
max_client_fd = fd;
}
/*
we need to track the number of children, so that excessive QREXEC_EXECUTE_*
commands do not fork-bomb dom0
*/
int children_count;
void flush_client(int fd)
void terminate_client_and_flush_data(int fd)
{
int i;
struct server_header s_hdr;
@ -143,29 +162,31 @@ void flush_client(int fd)
max_client_fd = i;
}
s_hdr.type = MSG_SERVER_TO_AGENT_CLIENT_END;
s_hdr.clid = fd;
s_hdr.client_id = fd;
s_hdr.len = 0;
write_all_vchan_ext(&s_hdr, sizeof(s_hdr));
}
void pass_to_agent(int fd, struct server_header *s_hdr)
void get_cmdline_body_from_client_and_pass_to_agent(int fd,
struct server_header
*s_hdr)
{
int len = s_hdr->len;
char buf[len];
if (!read_all(fd, buf, len)) {
flush_client(fd);
terminate_client_and_flush_data(fd);
return;
}
write_all_vchan_ext(s_hdr, sizeof(*s_hdr));
write_all_vchan_ext(buf, len);
}
void handle_client_cmdline(int fd)
void handle_cmdline_message_from_client(int fd)
{
struct client_header hdr;
struct server_header s_hdr;
if (!read_all(fd, &hdr, sizeof hdr)) {
flush_client(fd);
terminate_client_and_flush_data(fd);
return;
}
switch (hdr.type) {
@ -176,59 +197,69 @@ void handle_client_cmdline(int fd)
s_hdr.type = MSG_SERVER_TO_AGENT_JUST_EXEC;
break;
default:
flush_client(fd);
terminate_client_and_flush_data(fd);
return;
}
s_hdr.clid = fd;
s_hdr.client_id = fd;
s_hdr.len = hdr.len;
pass_to_agent(fd, &s_hdr);
get_cmdline_body_from_client_and_pass_to_agent(fd, &s_hdr);
clients[fd].state = CLIENT_DATA;
set_nonblock(fd);
set_nonblock(fd); // so that we can detect full queue without blocking
if (hdr.type == MSG_CLIENT_TO_SERVER_JUST_EXEC)
flush_client(fd);
terminate_client_and_flush_data(fd);
}
void handle_client_data(int fd)
/* handle data received from one of qrexec_client processes */
void handle_message_from_client(int fd)
{
struct server_header s_hdr;
char buf[MAX_DATA_CHUNK];
int len, ret;
if (clients[fd].state == CLIENT_CMDLINE) {
handle_client_cmdline(fd);
handle_cmdline_message_from_client(fd);
return;
}
// We have already passed cmdline from client.
// Now the client passes us raw data from its stdin.
len = buffer_space_vchan_ext();
if (len <= sizeof s_hdr)
return;
/* Read at most the amount of data that we have room for in vchan */
ret = read(fd, buf, len - sizeof(s_hdr));
if (ret < 0) {
perror("read client");
flush_client(fd);
terminate_client_and_flush_data(fd);
return;
}
s_hdr.clid = fd;
s_hdr.client_id = fd;
s_hdr.len = ret;
s_hdr.type = MSG_SERVER_TO_AGENT_INPUT;
write_all_vchan_ext(&s_hdr, sizeof(s_hdr));
write_all_vchan_ext(buf, ret);
if (ret == 0)
if (ret == 0) // EOF - so don't select() on this client
clients[fd].state |= CLIENT_DONT_READ;
}
void flush_client_data_daemon(int clid)
/*
Called when there is buffered data for this client, and select() reports
that client's pipe is writable; so we should be able to flush some
buffered data.
*/
void write_buffered_data_to_client(int client_id)
{
switch (flush_client_data(clid, clid, &clients[clid].buffer)) {
case WRITE_STDIN_OK:
clients[clid].state &= ~CLIENT_OUTQ_FULL;
switch (flush_client_data
(client_id, client_id, &clients[client_id].buffer)) {
case WRITE_STDIN_OK: // no more buffered data
clients[client_id].state &= ~CLIENT_OUTQ_FULL;
break;
case WRITE_STDIN_ERROR:
flush_client(clid);
terminate_client_and_flush_data(client_id);
break;
case WRITE_STDIN_BUFFERED:
case WRITE_STDIN_BUFFERED: // no room for all data, don't clear CLIENT_OUTQ_FULL flag
break;
default:
fprintf(stderr, "unknown flush_client_data?\n");
@ -236,24 +267,31 @@ void flush_client_data_daemon(int clid)
}
}
void pass_to_client(int clid, struct client_header *hdr)
/*
The header (hdr argument) is already built. Just read the raw data from
the packet, and pass it along with the header to the client.
*/
void get_packet_data_from_agent_and_pass_to_client(int client_id,
struct client_header
*hdr)
{
int len = hdr->len;
char buf[sizeof(*hdr) + len];
/* make both the header and data be consecutive in the buffer */
*(struct client_header *) buf = *hdr;
read_all_vchan_ext(buf + sizeof(*hdr), len);
switch (write_stdin
(clid, clid, buf, len + sizeof(*hdr),
&clients[clid].buffer)) {
(client_id, client_id, buf, len + sizeof(*hdr),
&clients[client_id].buffer)) {
case WRITE_STDIN_OK:
break;
case WRITE_STDIN_BUFFERED:
clients[clid].state |= CLIENT_OUTQ_FULL;
case WRITE_STDIN_BUFFERED: // some data have been buffered
clients[client_id].state |= CLIENT_OUTQ_FULL;
break;
case WRITE_STDIN_ERROR:
flush_client(clid);
terminate_client_and_flush_data(client_id);
break;
default:
fprintf(stderr, "unknown write_stdin?\n");
@ -261,6 +299,12 @@ void pass_to_client(int clid, struct client_header *hdr)
}
}
/*
The signal handler executes asynchronously; therefore all it should do is
set a "signal has arrived" flag, and let the main event loop react to this
flag at an appropriate moment.
*/
int child_exited;
void sigchld_handler(int x)
@ -269,6 +313,7 @@ void sigchld_handler(int x)
signal(SIGCHLD, sigchld_handler);
}
/* clean zombies, update children_count */
void reap_children()
{
int status;
@ -277,6 +322,7 @@ void reap_children()
child_exited = 0;
}
/* too many children - wait for one of them to terminate */
void wait_for_child()
{
int status;
@ -285,7 +331,7 @@ void wait_for_child()
}
#define MAX_CHILDREN 10
void check_children_count()
void check_children_count_and_wait_if_too_many()
{
if (children_count > MAX_CHILDREN) {
fprintf(stderr,
@ -296,12 +342,16 @@ void check_children_count()
}
}
void handle_trigger_exec(int req)
/*
Called when the agent sends a message asking to execute a predefined command.
*/
void handle_execute_predefined_command(int req)
{
char *rcmd = NULL, *lcmd = NULL;
int i;
check_children_count();
check_children_count_and_wait_if_too_many();
switch (req) {
case QREXEC_EXECUTE_FILE_COPY:
rcmd = "directly:user:/usr/lib/qubes/qfile-agent";
@ -311,7 +361,7 @@ void handle_trigger_exec(int req)
rcmd = "directly:user:/usr/lib/qubes/qfile-agent-dvm";
lcmd = "/usr/lib/qubes/qfile-daemon-dvm";
break;
default:
default: /* cannot happen, already sanitized */
fprintf(stderr, "got trigger exec no %d\n", req);
exit(1);
}
@ -325,7 +375,7 @@ void handle_trigger_exec(int req)
children_count++;
return;
}
for (i = 3; i < 256; i++)
for (i = 3; i < MAX_FDS; i++)
close(i);
signal(SIGCHLD, SIG_DFL);
signal(SIGPIPE, SIG_DFL);
@ -335,31 +385,79 @@ void handle_trigger_exec(int req)
exit(1);
}
void handle_agent_data()
void check_client_id_in_range(unsigned int untrusted_client_id)
{
if (untrusted_client_id >= MAX_CLIENTS || untrusted_client_id < 0) {
fprintf(stderr, "from agent: client_id=%d\n",
untrusted_client_id);
exit(1);
}
}
void sanitize_message_from_agent(struct server_header *untrusted_header)
{
int untrusted_cmd;
switch (untrusted_header->type) {
case MSG_AGENT_TO_SERVER_TRIGGER_EXEC:
untrusted_cmd = untrusted_header->client_id;
if (untrusted_cmd != QREXEC_EXECUTE_FILE_COPY &&
untrusted_cmd != QREXEC_EXECUTE_FILE_COPY_FOR_DISPVM) {
fprintf(stderr,
"received MSG_AGENT_TO_SERVER_TRIGGER_EXEC cmd %d ?\n",
untrusted_cmd);
exit(1);
}
break;
case MSG_AGENT_TO_SERVER_STDOUT:
case MSG_AGENT_TO_SERVER_STDERR:
case MSG_AGENT_TO_SERVER_EXIT_CODE:
check_client_id_in_range(untrusted_header->client_id);
if (untrusted_header->len > MAX_DATA_CHUNK
|| untrusted_header->len < 0) {
fprintf(stderr, "agent feeded %d of data bytes?\n",
untrusted_header->len);
exit(1);
}
break;
case MSG_XOFF:
case MSG_XON:
check_client_id_in_range(untrusted_header->client_id);
break;
default:
fprintf(stderr, "unknown mesage type %d from agent\n",
untrusted_header->type);
exit(1);
}
}
void handle_message_from_agent()
{
struct client_header hdr;
struct server_header s_hdr;
read_all_vchan_ext(&s_hdr, sizeof s_hdr);
struct server_header s_hdr, untrusted_s_hdr;
// fprintf(stderr, "got %x %x %x\n", s_hdr.type, s_hdr.clid,
read_all_vchan_ext(&untrusted_s_hdr, sizeof untrusted_s_hdr);
/* sanitize start */
sanitize_message_from_agent(&untrusted_s_hdr);
s_hdr = untrusted_s_hdr;
/* sanitize end */
// fprintf(stderr, "got %x %x %x\n", s_hdr.type, s_hdr.client_id,
// s_hdr.len);
if (s_hdr.type == MSG_AGENT_TO_SERVER_TRIGGER_EXEC) {
handle_trigger_exec(s_hdr.clid);
handle_execute_predefined_command(s_hdr.client_id);
return;
}
if (s_hdr.clid >= MAX_FDS || s_hdr.clid < 0) {
fprintf(stderr, "from agent: clid=%d\n", s_hdr.clid);
exit(1);
}
if (s_hdr.type == MSG_XOFF) {
clients[s_hdr.clid].state |= CLIENT_DONT_READ;
clients[s_hdr.client_id].state |= CLIENT_DONT_READ;
return;
}
if (s_hdr.type == MSG_XON) {
clients[s_hdr.clid].state &= ~CLIENT_DONT_READ;
clients[s_hdr.client_id].state &= ~CLIENT_DONT_READ;
return;
}
@ -373,54 +471,57 @@ void handle_agent_data()
case MSG_AGENT_TO_SERVER_EXIT_CODE:
hdr.type = MSG_SERVER_TO_CLIENT_EXIT_CODE;
break;
default:
default: /* cannot happen, already sanitized */
fprintf(stderr, "from agent: type=%d\n", s_hdr.type);
exit(1);
}
hdr.len = s_hdr.len;
if (hdr.len > MAX_DATA_CHUNK) {
fprintf(stderr, "agent feeded %d of data bytes?\n",
hdr.len);
exit(1);
}
if (clients[s_hdr.clid].state == CLIENT_INVALID) {
if (clients[s_hdr.client_id].state == CLIENT_INVALID) {
// benefit of the doubt - maybe the client exited earlier
// just eat the packet data and continue
char buf[MAX_DATA_CHUNK];
read_all_vchan_ext(buf, s_hdr.len);
return;
}
pass_to_client(s_hdr.clid, &hdr);
get_packet_data_from_agent_and_pass_to_client(s_hdr.client_id,
&hdr);
if (s_hdr.type == MSG_AGENT_TO_SERVER_EXIT_CODE)
flush_client(s_hdr.clid);
terminate_client_and_flush_data(s_hdr.client_id);
}
int fill_fds_for_select(fd_set * rdset, fd_set * wrset)
/*
Scan the "clients" table, add ones we want to read from (because the other
end has not sent MSG_XOFF on them) to read_fdset, add ones we want to write
to (because their pipe is full) to write_fdset. Return the highest used file
descriptor number, needed for the first select() parameter.
*/
int fill_fdsets_for_select(fd_set * read_fdset, fd_set * write_fdset)
{
int i;
int max = -1;
FD_ZERO(rdset);
FD_ZERO(wrset);
FD_ZERO(read_fdset);
FD_ZERO(write_fdset);
for (i = 0; i <= max_client_fd; i++) {
if (clients[i].state != CLIENT_INVALID
&& !(clients[i].state & CLIENT_DONT_READ)) {
FD_SET(i, rdset);
FD_SET(i, read_fdset);
max = i;
}
if (clients[i].state != CLIENT_INVALID
&& clients[i].state & CLIENT_OUTQ_FULL) {
FD_SET(i, wrset);
FD_SET(i, write_fdset);
max = i;
}
}
FD_SET(server_fd, rdset);
if (server_fd > max)
max = server_fd;
FD_SET(qrexec_daemon_unix_socket_fd, read_fdset);
if (qrexec_daemon_unix_socket_fd > max)
max = qrexec_daemon_unix_socket_fd;
return max;
}
int main(int argc, char **argv)
{
fd_set rdset, wrset;
fd_set read_fdset, write_fdset;
int i;
int max;
@ -429,29 +530,36 @@ int main(int argc, char **argv)
exit(1);
}
init(atoi(argv[1]));
/*
The main event loop. Waits for one of the following events:
- message from client
- message from agent
- new client
- child exited
*/
for (;;) {
max = fill_fds_for_select(&rdset, &wrset);
max = fill_fdsets_for_select(&read_fdset, &write_fdset);
if (buffer_space_vchan_ext() <=
sizeof(struct server_header))
FD_ZERO(&rdset);
FD_ZERO(&read_fdset); // vchan full - don't read from clients
wait_for_vchan_or_argfd(max, &rdset, &wrset);
wait_for_vchan_or_argfd(max, &read_fdset, &write_fdset);
if (FD_ISSET(server_fd, &rdset))
if (FD_ISSET(qrexec_daemon_unix_socket_fd, &read_fdset))
handle_new_client();
while (read_ready_vchan_ext())
handle_agent_data();
handle_message_from_agent();
for (i = 0; i <= max_client_fd; i++)
if (clients[i].state != CLIENT_INVALID
&& FD_ISSET(i, &rdset))
handle_client_data(i);
&& FD_ISSET(i, &read_fdset))
handle_message_from_client(i);
for (i = 0; i <= max_client_fd; i++)
if (clients[i].state != CLIENT_INVALID
&& FD_ISSET(i, &wrset))
flush_client_data_daemon(i);
&& FD_ISSET(i, &write_fdset))
write_buffered_data_to_client(i);
if (child_exited)
reap_children();

View File

@ -29,7 +29,12 @@
#include "buffer.h"
#include "glue.h"
int flush_client_data(int fd, int clid, struct buffer *buffer)
/*
There is buffered data in "buffer" for client id "client_id", and select()
reports that "fd" is writable. Write as much as possible to fd, if all sent,
notify the peer that this client's pipe is no longer full.
*/
int flush_client_data(int fd, int client_id, struct buffer *buffer)
{
int ret;
int len;
@ -44,12 +49,15 @@ int flush_client_data(int fd, int clid, struct buffer *buffer)
} else
return WRITE_STDIN_BUFFERED;
}
buffer_remove(buffer, len);
// we previously called buffer_remove(buffer, len)
// that would be wrong if we changed MAX_DATA_CHUNK to something large,
// as pipe writes are atomic only up to the PIPE_BUF limit
buffer_remove(buffer, ret);
len = buffer_len(buffer);
if (!len) {
struct server_header s_hdr;
s_hdr.type = MSG_XON;
s_hdr.clid = clid;
s_hdr.client_id = client_id;
s_hdr.len = 0;
write_all_vchan_ext(&s_hdr, sizeof s_hdr);
return WRITE_STDIN_OK;
@ -58,7 +66,12 @@ int flush_client_data(int fd, int clid, struct buffer *buffer)
}
int write_stdin(int fd, int clid, char *data, int len,
/*
Write "len" bytes from "data" to "fd". If not all written, buffer the rest
to "buffer", and notify the peer that the client "client_id" pipe is full via
MSG_XOFF message.
*/
int write_stdin(int fd, int client_id, char *data, int len,
struct buffer *buffer)
{
int ret;
@ -84,7 +97,7 @@ int write_stdin(int fd, int clid, char *data, int len,
len - written);
s_hdr.type = MSG_XOFF;
s_hdr.clid = clid;
s_hdr.client_id = client_id;
s_hdr.len = 0;
write_all_vchan_ext(&s_hdr, sizeof s_hdr);
@ -108,6 +121,11 @@ void set_block(int fd)
fcntl(fd, F_SETFL, fl & ~O_NONBLOCK);
}
/*
The data-feeding process has exited, so we need to clear all control
structures for the client. However, if we have buffered data for the client
(which is rare), fire-and-forget a separate process to flush it.
*/
int fork_and_flush_stdin(int fd, struct buffer *buffer)
{
int i;
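Taken together, write_stdin() and flush_client_data() implement the MSG_XOFF/MSG_XON flow control used throughout this commit. A condensed sketch of the writer-side decision, simplified to a single write() (the real code also has to cope with data that is already buffered):

#include <errno.h>
#include <unistd.h>

/* Simplified: try to write everything; on a short write, buffer the tail
   and tell the peer to stop sending data for this client for now. */
int write_stdin_sketch(int fd, int client_id, char *data, int len,
		       struct buffer *buffer)
{
	struct server_header s_hdr;
	int ret = write(fd, data, len);
	if (ret == len)
		return WRITE_STDIN_OK;	/* all sent, nothing buffered */
	if (ret < 0 && errno != EAGAIN)
		return WRITE_STDIN_ERROR;
	if (ret < 0)
		ret = 0;		/* EAGAIN: nothing was written */
	buffer_append(buffer, data + ret, len - ret);	/* keep the tail */
	s_hdr.type = MSG_XOFF;		/* peer must pause this client */
	s_hdr.client_id = client_id;
	s_hdr.len = 0;
	write_all_vchan_ext(&s_hdr, sizeof s_hdr);
	return WRITE_STDIN_BUFFERED;
}

When flush_client_data() later drains the buffer completely, it sends MSG_XON the same way, and the peer clears the corresponding blocked flag (CLIENT_DONT_READ in the daemon, is_blocked in the agent).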