7c1dfe9266
This change allows the same policy mechanism to be used to control clipboard copy between domains.
194 lines
6.5 KiB
Python
Executable File
194 lines
6.5 KiB
Python
Executable File
#!/usr/bin/python
|
|
import sys
|
|
import os
|
|
import os.path
|
|
import subprocess
|
|
import xen.lowlevel.xl
|
|
import qubes.guihelpers
|
|
from optparse import OptionParser
|
|
from qubes.qubes import QubesVmCollection
|
|
import fcntl
|
|
|
|
# Directory searched for policy files; read_policy_file() and add_always_allow()
# append "/" + <service name> to it to build the path of a service's policy file.
POLICY_FILE_DIR="/etc/qubes_rpc/policy"
|
|
# Path of the qrexec_client binary; this script finishes by exec()ing it, both
# to run an allowed call and to report a denied one.
QREXEC_CLIENT="/usr/lib/qubes/qrexec_client"
|
|
|
|
class UserChoice:
    # Symbolic names for the answers main() compares against the value
    # returned by confirm_execution():
    #   ALLOW        - permit this single call
    #   DENY         - refuse the call
    #   ALWAYS_ALLOW - permit the call and record an "allow" rule in the policy
    ALLOW, DENY, ALWAYS_ALLOW = 0, 1, 2
|
|
|
|
def line_to_dict(line):
    """Parse one line of a policy file into a policy dictionary.

    A policy line consists of at least three whitespace-separated tokens:
    ``<source> <dest> <action>[,<param>=<value>...]``.  Tokens after the
    third one are ignored.

    Returns None when the line has fewer than three tokens or when its first
    token starts with '#'.  Otherwise returns a dict with these keys:

    * 'source'         - first token
    * 'dest'           - second token
    * 'full-action'    - third token, unmodified
    * 'action'         - part of the third token before the first comma
    * 'action.<param>' - one entry per ``<param>=<value>`` option

    Raises ValueError when an option in the action token has no '='.
    """
    tokens = line.split()
    if len(tokens) < 3:
        return None
    if tokens[0].startswith('#'):
        return None

    policy = {
        'source': tokens[0],
        'dest': tokens[1],
        'full-action': tokens[2],
    }

    action_parts = tokens[2].split(',')
    policy['action'] = action_parts[0]

    for option in action_parts[1:]:
        # Split on the first '=' only, so a value may itself contain '='.
        name, separator, value = option.partition('=')
        if not separator:
            # Refuse to guess what a malformed option means: silently
            # dropping it could widen what the rule permits.
            raise ValueError(
                "malformed policy option %r in action %r" % (option, tokens[2]))
        policy['action.' + name] = value

    return policy
|
|
|
|
|
|
def read_policy_file(exec_index):
    """Load the policy rules for the service named `exec_index`.

    The policy file is POLICY_FILE_DIR + "/" + exec_index.  It is read while
    holding a shared flock so that a concurrent add_always_allow() (which
    takes an exclusive lock) cannot be observed half-written.

    Returns None when the policy file does not exist (or is not a regular
    file); otherwise returns a list of the dictionaries produced by
    line_to_dict() for every line it accepts, in file order.
    """
    policy_file = POLICY_FILE_DIR + "/" + exec_index
    if not os.path.isfile(policy_file):
        return None

    policy_list = []
    # The with-block closes the file (and thereby drops the lock) even when
    # parsing a line raises.
    with open(policy_file) as f:
        fcntl.flock(f, fcntl.LOCK_SH)
        for line in f:
            entry = line_to_dict(line)
            if entry is not None:
                policy_list.append(entry)
    return policy_list
|
|
|
|
def is_match(item, config_term):
    """Return True if the domain name `item` is matched by `config_term`.

    `config_term` matches when it is equal to `item`, or when it is the
    wildcard "$anyvm" and `item` is not "dom0"; "dom0" is only ever matched
    by an explicit "dom0" term.
    """
    if item == config_term:
        return True
    # Compare by value: an identity test ("is not") against a string literal
    # is true for any name built at runtime, which let "$anyvm" match dom0.
    return config_term == "$anyvm" and item != "dom0"
|
|
|
|
def get_default_policy():
    """Return the policy used when no rule matches: a fresh dict denying the call.

    A new dictionary is built on every call, so callers may modify the result.
    """
    return {"action": "deny"}
|
|
|
|
|
|
def find_policy(policy, domain, target):
    """Return the first rule in `policy` that applies to this call.

    `policy` is a sequence of rule dictionaries with 'source' and 'dest'
    keys.  Rules are tried in order; a rule applies when is_match() accepts
    `domain` for its 'source' and `target` for its 'dest'.  When no rule
    applies, the result of get_default_policy() is returned.
    """
    for rule in policy:
        if is_match(domain, rule["source"]) and is_match(target, rule["dest"]):
            return rule
    return get_default_policy()
|
|
|
|
def is_domain_running(target):
    """Return True if a domain named `target` is in the xl domain list.

    Every domain returned by the xl context's list_domains() has its domid
    translated to a name, which is compared with `target`.
    """
    ctx = xen.lowlevel.xl.ctx()
    return any(
        ctx.domid_to_name(info.domid) == target
        for info in ctx.list_domains()
    )
|
|
|
|
def spawn_target_if_necessary(target):
    """Start the domain `target` unless is_domain_running() already finds it.

    The domain is started by running ``qvm-run -a -q <target> true`` with
    stdin and stdout connected to /dev/null.  The exit status of qvm-run is
    not checked.
    """
    if is_domain_running(target):
        return
    # The with-block closes /dev/null even if qvm-run cannot be executed.
    with open("/dev/null", "r+") as null:
        subprocess.call(["qvm-run", "-a", "-q", target, "true"], stdin=null, stdout=null)
|
|
|
|
def do_execute(domain, target, user, exec_index, process_ident):
    """Replace this process with qrexec_client running the requested service.

    The local command handed to qrexec_client depends on `target`:

    * "dom0"    - the RPC multiplexer, given the service and source domain
    * "$dispvm" - qfile-daemon-dvm, given the service, source domain and user
    * otherwise - a nested qrexec_client invocation addressed to `target`,
                  after making sure that domain has been started

    This function does not return on success because it ends in os.execl().
    """
    if target == "dom0":
        cmd = " ".join(["/usr/lib/qubes/qubes_rpc_multiplexer", exec_index, domain])
    elif target == "$dispvm":
        cmd = " ".join(["/usr/lib/qubes/qfile-daemon-dvm", exec_index, domain, user])
    else:
        # see the previous commit why "qvm-run -a" is broken and dangerous
        # also, dangling "xl" would keep stderr open and may prevent closing connection
        spawn_target_if_necessary(target)
        remote_call = user + ":QUBESRPC " + exec_index + " " + domain
        cmd = QREXEC_CLIENT + " -d " + target + " '" + remote_call + "'"
    os.execl(QREXEC_CLIENT, "qrexec_client", "-d", domain, "-l", cmd, "-c", process_ident)
|
|
|
|
def confirm_execution(domain, target, exec_index):
    """Ask the user whether `domain` may run `exec_index` on `target`.

    Builds the question text and passes it to qubes.guihelpers.ask() with the
    "Yes to All" option enabled.  Returns whatever ask() returns; main()
    compares that value with the UserChoice constants.
    """
    question = "".join([
        "Do you allow domain \"", domain, "\" to execute ", exec_index,
        " operation on the domain \"", target, "\"?<br>",
        " \"Yes to All\" option will automatically allow this operation in the future.",
    ])
    return qubes.guihelpers.ask(question, yestoall=True)
|
|
|
|
def add_always_allow(domain, target, exec_index, options):
    """Prepend an "allow" rule for (`domain`, `target`) to a service's policy.

    The rule ``<domain>\\t<target>\\tallow<options>`` is inserted as the first
    line of POLICY_FILE_DIR + "/" + exec_index, so it takes precedence over
    the existing rules.  `options` is appended verbatim after "allow"; it is
    expected to be empty or to start with a comma.

    The file is rewritten while holding an exclusive flock.  Nothing is done
    (and None is returned) when the policy file does not exist.
    """
    policy_file = POLICY_FILE_DIR + "/" + exec_index
    if not os.path.isfile(policy_file):
        return None

    new_rule = "%s\t%s\tallow%s\n" % (domain, target, options)
    # The with-block closes the file (releasing the lock) even if the read
    # or write fails.
    with open(policy_file, 'r+') as f:
        fcntl.flock(f, fcntl.LOCK_EX)
        existing = f.read()
        # Rewrite from the start; the new content is always longer than the
        # old one, so no stale tail can remain.
        f.seek(0)
        f.write(new_rule + existing)
|
|
|
|
def policy_editor(domain, target, exec_index):
    """Tell the user how to create the missing policy file for `exec_index`.

    No editor exists yet: a zenity info dialog is shown with the line to add,
    and the call blocks until the dialog is closed.
    """
    message = "".join([
        "Policy editor not yet implemented. Please add a line in the form \"",
        domain, " ", target, " action_to_take\"",
        " to /etc/qubes_rpc/policy/", exec_index,
        " file in dom0, then close this info.",
    ])
    subprocess.call(["/usr/bin/zenity", "--info", "--text", message])
|
|
|
|
def main():
    """Evaluate the RPC policy for one call and run or refuse it.

    Command line: ``[options] <src-domain> <target-domain> <service> <process-ident>``.

    Steps:

    1. Load the policy file of the service.  If it is missing, show the
       policy_editor() notice and try once more; a still-missing file is
       treated as an empty policy.
    2. Pick the first matching rule (default: deny).
    3. Resolve an "ask" rule: straight to "allow" with --assume-yes-for-ask,
       otherwise through confirm_execution(); "Yes to All" also records a
       permanent allow rule that keeps the options of the "ask" rule.
    4. With --just-evaluate, exit with status 0 for "allow" and 1 otherwise.
    5. On "allow", apply the rule's optional target/user overrides (the user
       defaults to the target VM's default_user, or "user" when the VM is
       unknown) and exec the service through do_execute().
    6. Otherwise log the refusal and exec qrexec_client with /bin/false as
       the local command.

    Exits with a usage error (status 2) when fewer than four positional
    arguments are given.
    """
    usage = "usage: %prog [options] <src-domain> <target-domain> <service> <process-ident>"
    parser = OptionParser(usage)
    parser.add_option("--assume-yes-for-ask", action="store_true", dest="assume_yes_for_ask", default=False,
                      help="Allow run of service without confirmation if policy say 'ask'")
    parser.add_option("--just-evaluate", action="store_true", dest="just_evaluate", default=False,
                      help="Do not run the service, only evaluate policy; retcode=0 means 'allow'")

    (options, args) = parser.parse_args()
    if len(args) < 4:
        # Fail with a usage message instead of an IndexError traceback.
        parser.error("expected 4 arguments, got %d" % len(args))
    domain = args[0]
    target = args[1]
    exec_index = args[2]
    process_ident = args[3]

    policy_list = read_policy_file(exec_index)
    if policy_list is None:
        policy_editor(domain, target, exec_index)
        policy_list = read_policy_file(exec_index)
        if policy_list is None:
            policy_list = []

    policy_dict = find_policy(policy_list, domain, target)

    if policy_dict["action"] == "ask" and options.assume_yes_for_ask:
        policy_dict["action"] = "allow"

    if policy_dict["action"] == "ask":
        user_choice = confirm_execution(domain, target, exec_index)
        if user_choice == UserChoice.ALWAYS_ALLOW:
            # Keep the options that followed "ask" (",user=...", ...) on the
            # new "allow" rule.  Slice the prefix off: lstrip('ask') would
            # strip a set of characters, not the word.
            extra_options = policy_dict["full-action"][len("ask"):]
            add_always_allow(domain, target, exec_index, extra_options)
            policy_dict["action"] = "allow"
        elif user_choice == UserChoice.ALLOW:
            policy_dict["action"] = "allow"
        else:
            policy_dict["action"] = "deny"

    if options.just_evaluate:
        if policy_dict["action"] == "allow":
            sys.exit(0)
        else:
            sys.exit(1)

    if policy_dict["action"] == "allow":
        if "action.target" in policy_dict:
            target = policy_dict["action.target"]
        if "action.user" in policy_dict:
            user = policy_dict["action.user"]
        else:
            qvm_collection = QubesVmCollection()
            qvm_collection.lock_db_for_reading()
            try:
                qvm_collection.load()
            finally:
                qvm_collection.unlock_db()
            vm = qvm_collection.get_vm_by_name(target)
            if vm is None:
                sys.stderr.write(
                    "Cannot find settings of VM '%s', assuming default user 'user'\n" % target)
                user = "user"
            else:
                user = vm.default_user
        # do_execute() ends in exec, so control only continues past this
        # point when the call was not allowed.
        do_execute(domain, target, user, exec_index, process_ident)

    sys.stderr.write("Rpc denied: %s %s %s\n" % (domain, target, exec_index))
    os.execl(QREXEC_CLIENT, "qrexec_client", "-d", domain, "-l", "/bin/false", "-c", process_ident)
|
|
|
|
# Script entry point: evaluate the policy for the call described by argv.
main()
|