Remove QubesCowVm class
StandaloneVM isn't really CowVM; also most AppVM/CowVM features applies also to TemplateVM. So CowVM class is meaningless.
This commit is contained in:
parent
3a5cc0cc21
commit
1f5c03da3f
@ -177,6 +177,9 @@ class QubesVm(object):
|
||||
root_img = None,
|
||||
private_img = None,
|
||||
memory = default_memory,
|
||||
template_vm = None,
|
||||
firewall_conf = None,
|
||||
volatile_img = None,
|
||||
vcpus = None):
|
||||
|
||||
|
||||
@ -199,9 +202,6 @@ class QubesVm(object):
|
||||
self.uses_default_netvm = uses_default_netvm
|
||||
self.netvm_vm = netvm_vm
|
||||
|
||||
# Create template_vm property - used in AppVM, NetVM, ProxyVM
|
||||
self.template_vm = None
|
||||
|
||||
# We use it in remove from disk to avoid removing rpm files (for templates)
|
||||
self.installed_by_rpm = installed_by_rpm
|
||||
|
||||
@ -220,6 +220,11 @@ class QubesVm(object):
|
||||
self.private_img = dir_path + "/" + (
|
||||
private_img if private_img is not None else default_private_img)
|
||||
|
||||
if firewall_conf is None:
|
||||
self.firewall_conf = dir_path + "/" + default_firewall_conf_file
|
||||
else:
|
||||
self.firewall_conf = firewall_conf
|
||||
|
||||
self.updateable = updateable
|
||||
self.label = label if label is not None else QubesVmLabels["red"]
|
||||
if self.dir_path is not None:
|
||||
@ -232,6 +237,21 @@ class QubesVm(object):
|
||||
|
||||
self.memory = memory
|
||||
|
||||
if template_vm is not None:
|
||||
if updateable:
|
||||
print "ERROR: Template based VM cannot be updateable!"
|
||||
return false
|
||||
if not template_vm.is_template():
|
||||
print "ERROR: template_qid={0} doesn't point to a valid TemplateVM".\
|
||||
format(template_vm.qid)
|
||||
return False
|
||||
|
||||
template_vm.appvms[qid] = self
|
||||
else:
|
||||
assert self.root_img is not None, "Missing root_img for standalone VM!"
|
||||
|
||||
self.template_vm = template_vm
|
||||
|
||||
# By default allow use all VCPUs
|
||||
if vcpus is None:
|
||||
qubes_host = QubesHost()
|
||||
@ -534,6 +554,220 @@ class QubesVm(object):
|
||||
"/local/domain/{0}/qubes_secondary_dns".format(xid),
|
||||
self.netvm_vm.secondary_dns])
|
||||
|
||||
def create_config_file(self):
|
||||
assert self.template_vm is not None
|
||||
|
||||
conf_template = None
|
||||
if self.type == "NetVM":
|
||||
conf_template = open (self.template_vm.netvms_conf_file, "r")
|
||||
elif self.updateable:
|
||||
conf_template = open (self.template_vm.standalonevms_conf_file, "r")
|
||||
else:
|
||||
conf_template = open (self.template_vm.appvms_conf_file, "r")
|
||||
if os.path.isfile(self.conf_file):
|
||||
shutil.copy(self.conf_file, self.conf_file + ".backup")
|
||||
conf_appvm = open(self.conf_file, "w")
|
||||
rx_vmname = re.compile (r"%VMNAME%")
|
||||
rx_vmdir = re.compile (r"%VMDIR%")
|
||||
rx_template = re.compile (r"%TEMPLATEDIR%")
|
||||
rx_pcidevs = re.compile (r"%PCIDEVS%")
|
||||
rx_mem = re.compile (r"%MEM%")
|
||||
rx_vcpus = re.compile (r"%VCPUS%")
|
||||
|
||||
for line in conf_template:
|
||||
line = rx_vmname.sub (self.name, line)
|
||||
line = rx_vmdir.sub (self.dir_path, line)
|
||||
line = rx_template.sub (self.template_vm.dir_path, line)
|
||||
line = rx_pcidevs.sub (self.pcidevs, line)
|
||||
line = rx_mem.sub (str(self.memory), line)
|
||||
line = rx_vcpus.sub (str(self.vcpus), line)
|
||||
conf_appvm.write(line)
|
||||
|
||||
conf_template.close()
|
||||
conf_appvm.close()
|
||||
|
||||
def create_on_disk(self, verbose):
|
||||
assert self.template_vm is not None
|
||||
|
||||
if dry_run:
|
||||
return
|
||||
|
||||
if verbose:
|
||||
print "--> Creating directory: {0}".format(self.dir_path)
|
||||
os.mkdir (self.dir_path)
|
||||
|
||||
if verbose:
|
||||
print "--> Creating the VM config file: {0}".format(self.conf_file)
|
||||
|
||||
self.create_config_file()
|
||||
|
||||
template_priv = self.template_vm.private_img
|
||||
if verbose:
|
||||
print "--> Copying the template's private image: {0}".\
|
||||
format(template_priv)
|
||||
|
||||
# We prefer to use Linux's cp, because it nicely handles sparse files
|
||||
retcode = subprocess.call (["cp", template_priv, self.private_img])
|
||||
if retcode != 0:
|
||||
raise IOError ("Error while copying {0} to {1}".\
|
||||
format(template_priv, self.private_img))
|
||||
|
||||
if self.is_updateable():
|
||||
template_root = self.template_vm.root_img
|
||||
if verbose:
|
||||
print "--> Copying the template's root image: {0}".\
|
||||
format(template_root)
|
||||
|
||||
# We prefer to use Linux's cp, because it nicely handles sparse files
|
||||
retcode = subprocess.call (["cp", template_root, self.root_img])
|
||||
if retcode != 0:
|
||||
raise IOError ("Error while copying {0} to {1}".\
|
||||
format(template_root, self.root_img))
|
||||
|
||||
kernels_dir = self.dir_path + '/' + default_kernels_subdir
|
||||
if verbose:
|
||||
print "--> Copying the template's kernel dir: {0}".\
|
||||
format(self.template_vm.kernels_dir)
|
||||
shutil.copytree (self.template_vm.kernels_dir, kernels_dir)
|
||||
|
||||
|
||||
# Create volatile.img
|
||||
self.reset_volatile_storage()
|
||||
|
||||
def verify_files(self):
|
||||
if dry_run:
|
||||
return
|
||||
|
||||
if not os.path.exists (self.dir_path):
|
||||
raise QubesException (
|
||||
"VM directory doesn't exist: {0}".\
|
||||
format(self.dir_path))
|
||||
|
||||
if not os.path.exists (self.conf_file):
|
||||
raise QubesException (
|
||||
"VM config file doesn't exist: {0}".\
|
||||
format(self.conf_file))
|
||||
|
||||
if self.is_updateable() and not os.path.exists (self.root_img):
|
||||
raise QubesException (
|
||||
"VM root image file doesn't exist: {0}".\
|
||||
format(self.root_img))
|
||||
|
||||
if not os.path.exists (self.private_img):
|
||||
raise QubesException (
|
||||
"VM private image file doesn't exist: {0}".\
|
||||
format(self.private_img))
|
||||
return True
|
||||
|
||||
def reset_volatile_storage(self):
|
||||
assert not self.is_running(), "Attempt to clean volatile image of running VM!"
|
||||
|
||||
# Only makes sense on template based VM
|
||||
if self.template_vm is None:
|
||||
return
|
||||
|
||||
print "--> Cleaning volatile image: {0}...".format (self.volatile_img)
|
||||
if dry_run:
|
||||
return
|
||||
if os.path.exists (self.volatile_img):
|
||||
os.remove (self.volatile_img)
|
||||
|
||||
retcode = subprocess.call (["tar", "xf", self.template_vm.clean_volatile_img, "-C", self.dir_path])
|
||||
if retcode != 0:
|
||||
raise IOError ("Error while unpacking {0} to {1}".\
|
||||
format(self.template_vm.clean_volatile_img, self.volatile_img))
|
||||
|
||||
def remove_from_disk(self):
|
||||
if dry_run:
|
||||
return
|
||||
|
||||
shutil.rmtree (self.dir_path)
|
||||
|
||||
def write_firewall_conf(self, conf):
|
||||
root = xml.etree.ElementTree.Element(
|
||||
"QubesFirwallRules",
|
||||
policy = "allow" if conf["allow"] else "deny",
|
||||
dns = "allow" if conf["allowDns"] else "deny",
|
||||
icmp = "allow" if conf["allowIcmp"] else "deny"
|
||||
)
|
||||
|
||||
for rule in conf["rules"]:
|
||||
element = xml.etree.ElementTree.Element(
|
||||
"rule",
|
||||
address=rule["address"],
|
||||
port=str(rule["portBegin"]),
|
||||
)
|
||||
if rule["netmask"] is not None and rule["netmask"] != 32:
|
||||
element.set("netmask", str(rule["netmask"]))
|
||||
if rule["portEnd"] is not None:
|
||||
element.set("toport", str(rule["portEnd"]))
|
||||
root.append(element)
|
||||
|
||||
tree = xml.etree.ElementTree.ElementTree(root)
|
||||
|
||||
try:
|
||||
f = open(self.firewall_conf, 'a') # create the file if not exist
|
||||
f.close()
|
||||
|
||||
with open(self.firewall_conf, 'w') as f:
|
||||
fcntl.lockf(f, fcntl.LOCK_EX)
|
||||
tree.write(f, "UTF-8")
|
||||
fcntl.lockf(f, fcntl.LOCK_UN)
|
||||
f.close()
|
||||
except EnvironmentError as err:
|
||||
print "{0}: save error: {1}".format(
|
||||
os.path.basename(sys.argv[0]), err)
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
def has_firewall(self):
|
||||
return os.path.exists (self.firewall_conf)
|
||||
|
||||
def get_firewall_conf(self):
|
||||
conf = { "rules": list(), "allow": True, "allowDns": True, "allowIcmp": True }
|
||||
|
||||
try:
|
||||
tree = xml.etree.ElementTree.parse(self.firewall_conf)
|
||||
root = tree.getroot()
|
||||
|
||||
conf["allow"] = (root.get("policy") == "allow")
|
||||
conf["allowDns"] = (root.get("dns") == "allow")
|
||||
conf["allowIcmp"] = (root.get("icmp") == "allow")
|
||||
|
||||
for element in root:
|
||||
rule = {}
|
||||
attr_list = ("address", "netmask", "port", "toport")
|
||||
|
||||
for attribute in attr_list:
|
||||
rule[attribute] = element.get(attribute)
|
||||
|
||||
if rule["netmask"] is not None:
|
||||
rule["netmask"] = int(rule["netmask"])
|
||||
else:
|
||||
rule["netmask"] = 32
|
||||
|
||||
rule["portBegin"] = int(rule["port"])
|
||||
|
||||
if rule["toport"] is not None:
|
||||
rule["portEnd"] = int(rule["toport"])
|
||||
else:
|
||||
rule["portEnd"] = None
|
||||
|
||||
del(rule["port"])
|
||||
del(rule["toport"])
|
||||
|
||||
conf["rules"].append(rule)
|
||||
|
||||
except EnvironmentError as err:
|
||||
return conf
|
||||
except (xml.parsers.expat.ExpatError,
|
||||
ValueError, LookupError) as err:
|
||||
print("{0}: load error: {1}".format(
|
||||
os.path.basename(sys.argv[0]), err))
|
||||
return None
|
||||
|
||||
return conf
|
||||
|
||||
def get_total_xen_memory(self):
|
||||
hosts = xend_session.session.xenapi.host.get_all()
|
||||
@ -549,6 +783,8 @@ class QubesVm(object):
|
||||
if self.is_running():
|
||||
raise QubesException ("VM is already running!")
|
||||
|
||||
self.reset_volatile_storage()
|
||||
|
||||
if verbose:
|
||||
print "--> Rereading the VM's conf file ({0})...".format(self.conf_file)
|
||||
self.update_xen_storage()
|
||||
@ -656,6 +892,7 @@ class QubesVm(object):
|
||||
attrs["uses_default_netvm"] = str(self.uses_default_netvm)
|
||||
attrs["netvm_qid"] = str(self.netvm_vm.qid) if self.netvm_vm is not None else "none"
|
||||
attrs["installed_by_rpm"] = str(self.installed_by_rpm)
|
||||
attrs["template_qid"] = str(self.template_vm.qid) if self.template_vm and not self.is_updateable() else "none"
|
||||
attrs["updateable"] = str(self.updateable)
|
||||
attrs["label"] = self.label.name
|
||||
attrs["memory"] = str(self.memory)
|
||||
@ -939,196 +1176,7 @@ class QubesTemplateVm(QubesVm):
|
||||
attrs["rootcow_img"] = self.rootcow_img
|
||||
return attrs
|
||||
|
||||
class QubesCowVm(QubesVm):
|
||||
"""
|
||||
A class that represent a VM that may be based on some template, i.e. doesn't have own root.img
|
||||
|
||||
"""
|
||||
def __init__(self, **kwargs):
|
||||
if "dir_path" not in kwargs or kwargs["dir_path"] is None:
|
||||
kwargs["dir_path"] = qubes_appvms_dir + "/" + kwargs["name"]
|
||||
|
||||
if "updateable" not in kwargs or kwargs["updateable"] is None:
|
||||
kwargs["updateable"] = False
|
||||
|
||||
if "template_vm" in kwargs:
|
||||
template_vm = kwargs.pop("template_vm")
|
||||
else:
|
||||
template_vm = None
|
||||
|
||||
super(QubesCowVm, self).__init__(**kwargs)
|
||||
qid = kwargs["qid"]
|
||||
dir_path = kwargs["dir_path"]
|
||||
if not self.is_updateable():
|
||||
# Dirty hack for QubesDom0NetVm...
|
||||
if not isinstance(self, QubesDom0NetVm):
|
||||
assert template_vm is not None, "Missing template_vm for template based VM!"
|
||||
if not template_vm.is_template():
|
||||
print "ERROR: template_qid={0} doesn't point to a valid TemplateVM".\
|
||||
format(template_vm.qid)
|
||||
return False
|
||||
|
||||
template_vm.appvms[qid] = self
|
||||
else:
|
||||
assert self.root_img is not None, "Missing root_img for standalone VM!"
|
||||
|
||||
self.template_vm = template_vm
|
||||
|
||||
def set_updateable(self):
|
||||
if self.is_updateable():
|
||||
return
|
||||
|
||||
raise QubesException ("Change 'updateable' flag is not supported. Please use qvm-create.")
|
||||
|
||||
def set_nonupdateable(self):
|
||||
if self.is_updateable():
|
||||
return
|
||||
|
||||
raise QubesException ("Change 'updateable' flag is not supported. Please use qvm-create.")
|
||||
|
||||
def create_config_file(self):
|
||||
conf_template = None
|
||||
if self.type == "NetVM":
|
||||
conf_template = open (self.template_vm.netvms_conf_file, "r")
|
||||
elif self.updateable:
|
||||
conf_template = open (self.template_vm.standalonevms_conf_file, "r")
|
||||
else:
|
||||
conf_template = open (self.template_vm.appvms_conf_file, "r")
|
||||
if os.path.isfile(self.conf_file):
|
||||
shutil.copy(self.conf_file, self.conf_file + ".backup")
|
||||
conf_appvm = open(self.conf_file, "w")
|
||||
rx_vmname = re.compile (r"%VMNAME%")
|
||||
rx_vmdir = re.compile (r"%VMDIR%")
|
||||
rx_template = re.compile (r"%TEMPLATEDIR%")
|
||||
rx_pcidevs = re.compile (r"%PCIDEVS%")
|
||||
rx_mem = re.compile (r"%MEM%")
|
||||
rx_vcpus = re.compile (r"%VCPUS%")
|
||||
|
||||
for line in conf_template:
|
||||
line = rx_vmname.sub (self.name, line)
|
||||
line = rx_vmdir.sub (self.dir_path, line)
|
||||
line = rx_template.sub (self.template_vm.dir_path, line)
|
||||
line = rx_pcidevs.sub (self.pcidevs, line)
|
||||
line = rx_mem.sub (str(self.memory), line)
|
||||
line = rx_vcpus.sub (str(self.vcpus), line)
|
||||
conf_appvm.write(line)
|
||||
|
||||
conf_template.close()
|
||||
conf_appvm.close()
|
||||
|
||||
def create_on_disk(self, verbose):
|
||||
if dry_run:
|
||||
return
|
||||
|
||||
if verbose:
|
||||
print "--> Creating directory: {0}".format(self.dir_path)
|
||||
os.mkdir (self.dir_path)
|
||||
|
||||
if verbose:
|
||||
print "--> Creating the VM config file: {0}".format(self.conf_file)
|
||||
|
||||
self.create_config_file()
|
||||
|
||||
template_priv = self.template_vm.private_img
|
||||
if verbose:
|
||||
print "--> Copying the template's private image: {0}".\
|
||||
format(template_priv)
|
||||
|
||||
# We prefer to use Linux's cp, because it nicely handles sparse files
|
||||
retcode = subprocess.call (["cp", template_priv, self.private_img])
|
||||
if retcode != 0:
|
||||
raise IOError ("Error while copying {0} to {1}".\
|
||||
format(template_priv, self.private_img))
|
||||
|
||||
if self.is_updateable():
|
||||
template_root = self.template_vm.root_img
|
||||
if verbose:
|
||||
print "--> Copying the template's root image: {0}".\
|
||||
format(template_root)
|
||||
|
||||
# We prefer to use Linux's cp, because it nicely handles sparse files
|
||||
retcode = subprocess.call (["cp", template_root, self.root_img])
|
||||
if retcode != 0:
|
||||
raise IOError ("Error while copying {0} to {1}".\
|
||||
format(template_root, self.root_img))
|
||||
|
||||
kernels_dir = self.dir_path + '/' + default_kernels_subdir
|
||||
if verbose:
|
||||
print "--> Copying the template's kernel dir: {0}".\
|
||||
format(self.template_vm.kernels_dir)
|
||||
shutil.copytree (self.template_vm.kernels_dir, kernels_dir)
|
||||
|
||||
|
||||
# Create volatile.img
|
||||
self.reset_volatile_storage()
|
||||
|
||||
def verify_files(self):
|
||||
if dry_run:
|
||||
return
|
||||
|
||||
if not os.path.exists (self.dir_path):
|
||||
raise QubesException (
|
||||
"VM directory doesn't exist: {0}".\
|
||||
format(self.dir_path))
|
||||
|
||||
if not os.path.exists (self.conf_file):
|
||||
raise QubesException (
|
||||
"VM config file doesn't exist: {0}".\
|
||||
format(self.conf_file))
|
||||
|
||||
if self.is_updateable() and not os.path.exists (self.root_img):
|
||||
raise QubesException (
|
||||
"VM root image file doesn't exist: {0}".\
|
||||
format(self.root_img))
|
||||
|
||||
if not os.path.exists (self.private_img):
|
||||
raise QubesException (
|
||||
"VM private image file doesn't exist: {0}".\
|
||||
format(self.private_img))
|
||||
return True
|
||||
|
||||
def start(self, debug_console = False, verbose = False, preparing_dvm = False):
|
||||
if dry_run:
|
||||
return
|
||||
|
||||
if self.is_running():
|
||||
raise QubesException("VM is already running!")
|
||||
|
||||
self.reset_volatile_storage()
|
||||
|
||||
return super(QubesCowVm, self).start(debug_console=debug_console, verbose=verbose, preparing_dvm=preparing_dvm)
|
||||
|
||||
def reset_volatile_storage(self):
|
||||
assert not self.is_running(), "Attempt to clean volatile image of running VM!"
|
||||
|
||||
# Only makes sense on template based VM
|
||||
if not self.template_vm:
|
||||
return
|
||||
|
||||
print "--> Cleaning volatile image: {0}...".format (self.volatile_img)
|
||||
if dry_run:
|
||||
return
|
||||
if os.path.exists (self.volatile_img):
|
||||
os.remove (self.volatile_img)
|
||||
|
||||
retcode = subprocess.call (["tar", "xf", self.template_vm.clean_volatile_img, "-C", self.dir_path])
|
||||
if retcode != 0:
|
||||
raise IOError ("Error while unpacking {0} to {1}".\
|
||||
format(self.template_vm.clean_volatile_img, self.volatile_img))
|
||||
|
||||
def remove_from_disk(self):
|
||||
if dry_run:
|
||||
return
|
||||
|
||||
subprocess.check_call ([qubes_appmenu_remove_cmd, self.name])
|
||||
shutil.rmtree (self.dir_path)
|
||||
|
||||
def get_xml_attrs(self):
|
||||
attrs = super(QubesCowVm, self).get_xml_attrs()
|
||||
attrs["template_qid"] = str(self.template_vm.qid) if not self.is_updateable() else "none"
|
||||
return attrs
|
||||
|
||||
class QubesNetVm(QubesCowVm):
|
||||
class QubesNetVm(QubesVm):
|
||||
"""
|
||||
A class that represents a NetVM. A child of QubesCowVM.
|
||||
"""
|
||||
@ -1438,19 +1486,10 @@ class QubesDisposableVm(QubesVm):
|
||||
"""
|
||||
def __init__(self, **kwargs):
|
||||
|
||||
template_vm = kwargs.pop("template_vm")
|
||||
template_vm = kwargs["template_vm"]
|
||||
assert template_vm is not None, "Missing template_vm for DisposableVM!"
|
||||
|
||||
super(QubesDisposableVm, self).__init__(dir_path="/nonexistent", **kwargs)
|
||||
qid = kwargs["qid"]
|
||||
|
||||
assert template_vm is not None, "Missing template_vm for DisposableVM!"
|
||||
if not template_vm.is_template():
|
||||
print "ERROR: template_qid={0} doesn't point to a valid TemplateVM".\
|
||||
format(new_vm.template_vm.qid)
|
||||
return False
|
||||
|
||||
self.template_vm = template_vm
|
||||
template_vm.appvms[qid] = self
|
||||
|
||||
@property
|
||||
def type(self):
|
||||
@ -1468,19 +1507,16 @@ class QubesDisposableVm(QubesVm):
|
||||
return True
|
||||
|
||||
|
||||
class QubesAppVm(QubesCowVm):
|
||||
class QubesAppVm(QubesVm):
|
||||
"""
|
||||
A class that represents an AppVM. A child of QubesVm.
|
||||
"""
|
||||
def __init__(self, **kwargs):
|
||||
|
||||
if "dir_path" not in kwargs or kwargs["dir_path"] is None:
|
||||
kwargs["dir_path"] = qubes_appvms_dir + "/" + kwargs["name"]
|
||||
|
||||
super(QubesAppVm, self).__init__(**kwargs)
|
||||
dir_path = self.dir_path
|
||||
|
||||
if "firewall_conf" not in kwargs or kwargs["firewall_conf"] is None:
|
||||
kwargs["firewall_conf"] = dir_path + "/" + default_firewall_conf_file
|
||||
|
||||
self.firewall_conf = kwargs["firewall_conf"]
|
||||
|
||||
@property
|
||||
def type(self):
|
||||
@ -1498,6 +1534,13 @@ class QubesAppVm(QubesCowVm):
|
||||
|
||||
self.create_appmenus (verbose)
|
||||
|
||||
def remove_from_disk(self):
|
||||
if dry_run:
|
||||
return
|
||||
|
||||
subprocess.check_call ([qubes_appmenu_remove_cmd, self.name])
|
||||
super(QubesAppVm, self).remove_from_disk()
|
||||
|
||||
def create_appmenus(self, verbose):
|
||||
if self.template_vm is not None:
|
||||
subprocess.check_call ([qubes_appmenu_create_cmd, self.template_vm.appmenus_templates_dir, self.name])
|
||||
@ -1505,88 +1548,6 @@ class QubesAppVm(QubesCowVm):
|
||||
# Only add apps to menu
|
||||
subprocess.check_call ([qubes_appmenu_create_cmd, "none", self.name])
|
||||
|
||||
def write_firewall_conf(self, conf):
|
||||
root = xml.etree.ElementTree.Element(
|
||||
"QubesFirwallRules",
|
||||
policy = "allow" if conf["allow"] else "deny",
|
||||
dns = "allow" if conf["allowDns"] else "deny",
|
||||
icmp = "allow" if conf["allowIcmp"] else "deny"
|
||||
)
|
||||
|
||||
for rule in conf["rules"]:
|
||||
element = xml.etree.ElementTree.Element(
|
||||
"rule",
|
||||
address=rule["address"],
|
||||
port=str(rule["portBegin"]),
|
||||
)
|
||||
if rule["netmask"] is not None and rule["netmask"] != 32:
|
||||
element.set("netmask", str(rule["netmask"]))
|
||||
if rule["portEnd"] is not None:
|
||||
element.set("toport", str(rule["portEnd"]))
|
||||
root.append(element)
|
||||
|
||||
tree = xml.etree.ElementTree.ElementTree(root)
|
||||
|
||||
try:
|
||||
f = open(self.firewall_conf, 'a') # create the file if not exist
|
||||
f.close()
|
||||
|
||||
with open(self.firewall_conf, 'w') as f:
|
||||
fcntl.lockf(f, fcntl.LOCK_EX)
|
||||
tree.write(f, "UTF-8")
|
||||
fcntl.lockf(f, fcntl.LOCK_UN)
|
||||
f.close()
|
||||
except EnvironmentError as err:
|
||||
print "{0}: save error: {1}".format(
|
||||
os.path.basename(sys.argv[0]), err)
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
def get_firewall_conf(self):
|
||||
conf = { "rules": list(), "allow": True, "allowDns": True, "allowIcmp": True }
|
||||
|
||||
try:
|
||||
tree = xml.etree.ElementTree.parse(self.firewall_conf)
|
||||
root = tree.getroot()
|
||||
|
||||
conf["allow"] = (root.get("policy") == "allow")
|
||||
conf["allowDns"] = (root.get("dns") == "allow")
|
||||
conf["allowIcmp"] = (root.get("icmp") == "allow")
|
||||
|
||||
for element in root:
|
||||
rule = {}
|
||||
attr_list = ("address", "netmask", "port", "toport")
|
||||
|
||||
for attribute in attr_list:
|
||||
rule[attribute] = element.get(attribute)
|
||||
|
||||
if rule["netmask"] is not None:
|
||||
rule["netmask"] = int(rule["netmask"])
|
||||
else:
|
||||
rule["netmask"] = 32
|
||||
|
||||
rule["portBegin"] = int(rule["port"])
|
||||
|
||||
if rule["toport"] is not None:
|
||||
rule["portEnd"] = int(rule["toport"])
|
||||
else:
|
||||
rule["portEnd"] = None
|
||||
|
||||
del(rule["port"])
|
||||
del(rule["toport"])
|
||||
|
||||
conf["rules"].append(rule)
|
||||
|
||||
except EnvironmentError as err:
|
||||
return conf
|
||||
except (xml.parsers.expat.ExpatError,
|
||||
ValueError, LookupError) as err:
|
||||
print("{0}: load error: {1}".format(
|
||||
os.path.basename(sys.argv[0]), err))
|
||||
return None
|
||||
|
||||
return conf
|
||||
|
||||
class QubesVmCollection(dict):
|
||||
"""
|
||||
|
Loading…
Reference in New Issue
Block a user