Fix pylint warnings.
This commit is contained in:
parent
8a4b5e683a
commit
faef52e61a
Binary file not shown.
@ -3,7 +3,6 @@
|
||||
import argparse
|
||||
import datetime
|
||||
import enum
|
||||
import math
|
||||
import os
|
||||
import shutil
|
||||
import subprocess
|
||||
@ -11,7 +10,6 @@ import sys
|
||||
import tempfile
|
||||
import time
|
||||
|
||||
import dnf
|
||||
import qubesadmin
|
||||
import qubesadmin.tools
|
||||
import rpm
|
||||
@ -26,14 +24,14 @@ UNVERIFIED_SUFFIX = '.unverified'
|
||||
|
||||
def qubes_release():
|
||||
if os.path.exists('/usr/share/qubes/marker-vm'):
|
||||
with open('/usr/share/qubes/marker-vm', 'r') as f:
|
||||
with open('/usr/share/qubes/marker-vm', 'r') as fd:
|
||||
# Get last line (in the format `x.x`)
|
||||
return f.readlines()[-1].strip()
|
||||
return fd.readlines()[-1].strip()
|
||||
return subprocess.check_output(['lsb_release', '-sr'],
|
||||
encoding='UTF-8').strip()
|
||||
|
||||
parser = argparse.ArgumentParser(description='Qubes Template Manager')
|
||||
parser.add_argument('operation', type=str)
|
||||
parser.add_argument('operation', type=str)
|
||||
parser.add_argument('templates', nargs='*')
|
||||
|
||||
# qrexec related
|
||||
@ -82,9 +80,9 @@ class TemplateState(enum.Enum):
|
||||
def verify_rpm(path, nogpgcheck=False, transaction_set=None):
|
||||
if transaction_set is None:
|
||||
transaction_set = rpm.TransactionSet()
|
||||
with open(path, 'rb') as f:
|
||||
with open(path, 'rb') as fd:
|
||||
try:
|
||||
hdr = transaction_set.hdrFromFdno(f)
|
||||
hdr = transaction_set.hdrFromFdno(fd)
|
||||
if hdr[rpm.RPMTAG_SIGSIZE] is None \
|
||||
and hdr[rpm.RPMTAG_SIGPGP] is None \
|
||||
and hdr[rpm.RPMTAG_SIGGPG] is None:
|
||||
@ -99,8 +97,8 @@ def verify_rpm(path, nogpgcheck=False, transaction_set=None):
|
||||
def get_package_hdr(path, transaction_set=None):
|
||||
if transaction_set is None:
|
||||
transaction_set = rpm.TransactionSet()
|
||||
with open(path, 'rb') as f:
|
||||
hdr = transaction_set.hdrFromFdno(f)
|
||||
with open(path, 'rb') as fd:
|
||||
hdr = transaction_set.hdrFromFdno(fd)
|
||||
return hdr
|
||||
|
||||
def extract_rpm(name, path, target):
|
||||
@ -116,8 +114,8 @@ def extract_rpm(name, path, target):
|
||||
return rpm2cpio.wait() == 0 and cpio.wait() == 0
|
||||
|
||||
def parse_config(path):
|
||||
with open(path, 'r') as f:
|
||||
return dict(line.rstrip('\n').split('=', 1) for line in f)
|
||||
with open(path, 'r') as fd:
|
||||
return dict(line.rstrip('\n').split('=', 1) for line in fd)
|
||||
|
||||
def install(args, app):
|
||||
# TODO: Lock, mentioned in the note above
|
||||
@ -151,10 +149,10 @@ def install(args, app):
|
||||
download(args, app, path_override=args.cachedir,
|
||||
dl_list=dl_list, suffix=UNVERIFIED_SUFFIX)
|
||||
|
||||
for idx, rpmfile in enumerate(rpm_list):
|
||||
for rpmfile in rpm_list:
|
||||
path = rpmfile + UNVERIFIED_SUFFIX
|
||||
if not verify_rpm(path, args.nogpgcheck, transaction_set):
|
||||
parser.error('Package \'%s\' verification failed.' % template)
|
||||
parser.error('Package \'%s\' verification failed.' % rpmfile)
|
||||
os.rename(path, rpmfile)
|
||||
|
||||
for rpmfile in rpm_list:
|
||||
@ -208,44 +206,45 @@ def qrexec_popen(args, app, service, stdout=subprocess.PIPE, filter_esc=True):
|
||||
service,
|
||||
filter_esc=filter_esc,
|
||||
stdout=stdout)
|
||||
else:
|
||||
return subprocess.Popen([
|
||||
'/etc/qubes-rpc/%s' % service,
|
||||
],
|
||||
stdin=subprocess.PIPE,
|
||||
stdout=stdout,
|
||||
stderr=subprocess.PIPE)
|
||||
return subprocess.Popen([
|
||||
'/etc/qubes-rpc/%s' % service,
|
||||
],
|
||||
stdin=subprocess.PIPE,
|
||||
stdout=stdout,
|
||||
stderr=subprocess.PIPE)
|
||||
|
||||
def qrexec_payload(args, app, spec):
|
||||
_ = app # unused
|
||||
|
||||
def check_newline(string, name):
|
||||
if '\n' in string:
|
||||
parser.error(f"Malformed {name}:" +
|
||||
" argument should not contain '\\n'.")
|
||||
|
||||
payload = ''
|
||||
for r in args.enablerepo if args.enablerepo else []:
|
||||
check_newline(r, '--enablerepo')
|
||||
payload += '--enablerepo=%s\n' % r
|
||||
for r in args.disablerepo if args.disablerepo else []:
|
||||
check_newline(r, '--disablerepo')
|
||||
payload += '--disablerepo=%s\n' % r
|
||||
for r in args.repoid if args.repoid else []:
|
||||
check_newline(r, '--repoid')
|
||||
payload += '--repoid=%s\n' % r
|
||||
for repo in args.enablerepo if args.enablerepo else []:
|
||||
check_newline(repo, '--enablerepo')
|
||||
payload += '--enablerepo=%s\n' % repo
|
||||
for repo in args.disablerepo if args.disablerepo else []:
|
||||
check_newline(repo, '--disablerepo')
|
||||
payload += '--disablerepo=%s\n' % repo
|
||||
for repo in args.repoid if args.repoid else []:
|
||||
check_newline(repo, '--repoid')
|
||||
payload += '--repoid=%s\n' % repo
|
||||
check_newline(args.releasever, '--releasever')
|
||||
payload += '--releasever=%s\n' % args.releasever
|
||||
check_newline(spec, 'template name')
|
||||
payload += spec + '\n'
|
||||
payload += '---\n'
|
||||
for fn in args.repo_files:
|
||||
with open(fn, 'r') as f:
|
||||
payload += f.read() + '\n'
|
||||
for path in args.repo_files:
|
||||
with open(path, 'r') as fd:
|
||||
payload += fd.read() + '\n'
|
||||
return payload
|
||||
|
||||
def qrexec_repoquery(args, app, spec='*'):
|
||||
proc = qrexec_popen(args, app, 'qubes.TemplateSearch')
|
||||
payload = qrexec_payload(args, app, spec)
|
||||
stdout, stderr = proc.communicate(payload.encode('UTF-8'))
|
||||
stdout, _ = proc.communicate(payload.encode('UTF-8'))
|
||||
stdout = stdout.decode('ASCII')
|
||||
if proc.wait() != 0:
|
||||
raise ConnectionError("qrexec call 'qubes.TemplateSearch' failed.")
|
||||
@ -259,18 +258,18 @@ def qrexec_repoquery(args, app, spec='*'):
|
||||
return result
|
||||
|
||||
def qrexec_download(args, app, spec, path, dlsize=None):
|
||||
with open(path, 'wb') as f:
|
||||
with open(path, 'wb') as fd:
|
||||
# Don't filter ESCs for binary files
|
||||
proc = qrexec_popen(args, app, 'qubes.TemplateDownload',
|
||||
stdout=f, filter_esc=False)
|
||||
stdout=fd, filter_esc=False)
|
||||
payload = qrexec_payload(args, app, spec)
|
||||
proc.stdin.write(payload.encode('UTF-8'))
|
||||
proc.stdin.close()
|
||||
with tqdm.tqdm(desc=spec, total=dlsize, unit_scale=True,
|
||||
unit_divisor=1000, unit='B') as pbar:
|
||||
last = 0
|
||||
while proc.poll() == None:
|
||||
cur = f.tell()
|
||||
while proc.poll() is None:
|
||||
cur = fd.tell()
|
||||
pbar.update(cur - last)
|
||||
last = cur
|
||||
time.sleep(0.1)
|
||||
@ -304,6 +303,7 @@ def do_list(args, app):
|
||||
(vm.name, version_str, TemplateState.INSTALLED.value))
|
||||
|
||||
if args.available or args.all:
|
||||
#pylint: disable=unused-variable
|
||||
for name, epoch, version, release, reponame, dlsize, summary \
|
||||
in query_res:
|
||||
version_str = build_version_str((epoch, version, release))
|
||||
@ -353,6 +353,7 @@ def get_dl_list(args, app):
|
||||
parser.error('Package \'%s\' not found.' % template)
|
||||
sys.exit(1)
|
||||
# We only select one (latest) package for each distinct package name
|
||||
#pylint: disable=unused-variable
|
||||
for name, epoch, version, release, reponame, dlsize, summary \
|
||||
in query_res:
|
||||
ver = (epoch, version, release)
|
||||
@ -364,7 +365,7 @@ def download(args, app, path_override=None, dl_list=None, suffix=''):
|
||||
if dl_list is None:
|
||||
dl_list = get_dl_list(args, app)
|
||||
|
||||
path = path_override if path_override != None else args.downloaddir
|
||||
path = path_override if path_override is not None else args.downloaddir
|
||||
for name, (ver, dlsize) in dl_list.items():
|
||||
version_str = build_version_str(ver)
|
||||
spec = PACKAGE_NAME_PREFIX + name + '-' + version_str
|
||||
@ -380,27 +381,31 @@ def download(args, app, path_override=None, dl_list=None, suffix=''):
|
||||
os.rename(target, target_suffix)
|
||||
else:
|
||||
print('Downloading \'%s\'...' % spec, file=sys.stderr)
|
||||
ok = False
|
||||
done = False
|
||||
for attempt in range(args.retries):
|
||||
try:
|
||||
qrexec_download(args, app, spec, target_suffix, dlsize)
|
||||
ok = True
|
||||
done = True
|
||||
break
|
||||
except ConnectionError:
|
||||
if attempt + 1 < args.retries:
|
||||
print('\'%s\' download failed, retrying...' % spec,
|
||||
file=sys.stderr)
|
||||
if not ok:
|
||||
if not done:
|
||||
print('\'%s\' download failed.' % spec, file=sys.stderr)
|
||||
os.remove(target_suffix)
|
||||
sys.exit(1)
|
||||
|
||||
def remove(args, app):
|
||||
_ = app # unused
|
||||
|
||||
# Use exec so stdio can be shared easily
|
||||
os.execvp('qvm-remove', ['qvm-remove'] + args.templates)
|
||||
|
||||
def clean(args, app):
|
||||
# TODO: More fine-grained options
|
||||
_ = app # unused
|
||||
|
||||
shutil.rmtree(args.cachedir)
|
||||
|
||||
def main(args=None, app=None):
|
||||
@ -410,17 +415,17 @@ def main(args=None, app=None):
|
||||
app = qubesadmin.Qubes()
|
||||
|
||||
if args.operation == 'install':
|
||||
install(args, app)
|
||||
install(args, app)
|
||||
elif args.operation == 'list':
|
||||
do_list(args, app)
|
||||
do_list(args, app)
|
||||
elif args.operation == 'download':
|
||||
download(args, app)
|
||||
download(args, app)
|
||||
elif args.operation == 'remove':
|
||||
remove(args, app)
|
||||
elif args.operation == 'clean':
|
||||
clean(args, app)
|
||||
else:
|
||||
parser.error('Operation \'%s\' not supported.' % args.operation)
|
||||
parser.error('Operation \'%s\' not supported.' % args.operation)
|
||||
|
||||
return 0
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user