#!/usr/bin/env python3 import subprocess import argparse import time import sys import os from os.path import expanduser #the term qube refers to a qubes vm def is_program_installed_in_qube( program, qube_name ): is_installed = True try: command = 'command -v ' + program subprocess.check_call([ 'qvm-run', '--no-color-output', qube_name, command ], stdout = open( os.devnull, 'w' ) ) except subprocess.CalledProcessError: is_installed = False return is_installed #this function requires virsh #domstate only works for Xen domU (guests) def is_qube_running( qube_name ): runs = False out = subprocess.check_output([ "virsh", "-c", "xen:///", "domstate", qube_name ]) out = out.decode('utf-8').replace('\n', '') if 'running' == out: runs = True return runs def get_qube_packages( qube_name ): content = "## Qubes Packages\n\n" #a qube can have more than one package manager installed (only one is functional) pkg_cmd = { 'dpkg' : 'dpkg -l qubes-*', 'pacman' : 'pacman -Qs qubes', 'rpm' : 'rpm -qa qubes-*' } if is_qube_running( qube_name ): for package_manager in pkg_cmd.keys(): if is_program_installed_in_qube( package_manager, qube_name ): pkg_list_cmd = pkg_cmd[package_manager] try: out = subprocess.check_output([ 'qvm-run', qube_name, '--pass-io', '--no-color-output', pkg_list_cmd ], stderr = open( os.devnull, 'w' ) ) out = out.decode('utf-8') content += create_heading( ( "Package Manager: " + package_manager ), 3 ) content += wrap_code( out ) except subprocess.CalledProcessError: pass #do nothing else: content += "**No packages listed, because Qube " + qube_name + " was not running**\n\n" return content def get_dom0_packages(): content = create_heading( "Dom0 Packages", 2 ) out = subprocess.check_output([ "rpm", "-qa", "qubes-*" ]) out = out.decode('utf-8') content += wrap_code( out ) return content def wrap_code( text ): code = "~~~\n" + text + "~~~\n\n" return code def create_heading( heading, level ): heading = heading + "\n\n" if 1 == level: heading = "# " + heading elif 2 == level: heading = "## " + heading else: heading = "### " + heading return heading def get_log_file_content( qube_name ): content = "## Log Files\n\n" qubes_os_log = "/var/log/qubes/" ext = ".log" log_prefix = [ "guid", "pacat", "qubesdb", "qrexec" ] #constructs for each log file prefix the full path and reads the log file for prefix in log_prefix: log_file = prefix + "." + qube_name + ext content += create_heading( ( "Log File: " + log_file ), 3 ) content += wrap_code( get_log_file( qubes_os_log + log_file ) ) return content def get_qube_prefs( qube_name ): qube_prefs = subprocess.check_output([ "qvm-prefs", qube_name ]) qube_prefs = qube_prefs.decode('utf-8') content = create_heading( "Qube Prefs", 2 ) content += wrap_code( qube_prefs ) return content def report( qube_name ): template = '''{title} {content} ''' title_text = create_heading( "Bug report: " + qube_name, 1 ) content_text = get_qube_prefs( qube_name ) content_text += get_dom0_packages() content_text += get_log_file_content( qube_name ) content_text += get_qube_packages( qube_name ) report = template.format( title=title_text, content=content_text ) return report def write_report( report_content, file_path ): with open( file_path, 'w' ) as report_file: report_file.write( report_content ) def send_report( dest_qube, file_path): #if dest_qube is not running -> start dest_qube if not is_qube_running( dest_qube ): try: subprocess.check_call([ "qvm-start", dest_qube ]) except subprocess.CalledProcessError: print( "Error while starting: " + dest_qube, file = sys.stderr ) try: subprocess.check_call([ "qvm-move-to-vm", dest_qube, file_path ]) except subprocess.calledProcessError: print( "Moving file bug-report failed", file = sys.stderr ) def get_log_file( log_file ): data = "" #open and close the file with open( log_file ) as log: data = log.read() return data def qube_exist( qube_name ): exists = True try: #calls: qvm-check --quiet vmanme subprocess.check_call([ "qvm-check", "--quiet", qube_name ]) except subprocess.CalledProcessError: exists = False return exists def get_report_file_path( qube_name ): #exapanduser -> works corss platform home_dir = expanduser("~") date = time.strftime("%H%M%S") file_path = home_dir + "/" + qube_name + "_bug-report_" + date + ".md" return file_path def main(): parser = argparse.ArgumentParser( description = 'Generates a bug report for a specific qube (Qubes VM)' ) parser.add_argument( 'vmname', metavar = '', type = str ) parser.add_argument( '-d', '--dest-vm', metavar = '', dest = "destvm", type = str, default = 'dom0', help = "send the report to the destination VM" ) parser.add_argument( '-p', '--print-report', action = 'store_const', const = "print_report", required = False, help = "prints the report without writing it or sending it to a destination VM" ) args = parser.parse_args() if qube_exist( args.vmname ): if qube_exist( args.destvm ): #get the report report_content = report( args.vmname ) #if -p or --print-report is an argument print the report if args.print_report: print( report_content ) #write and send the report else: file_path = get_report_file_path( args.vmname ) write_report( report_content, file_path ) print( "Report written to: " + file_path ) if 'dom0' != args.destvm: send_report( args.destvm, file_path ) print( "Report send to VM: " + args.destvm ) exit(0) else: print ( "Destination VM does not exist" ) exit(1) else: print( "VM does not exist" ) exit(1) #calls the main function -> program start point main()