diff --git a/core-modules/000QubesVm.py b/core-modules/000QubesVm.py index 4727178c..9d513723 100644 --- a/core-modules/000QubesVm.py +++ b/core-modules/000QubesVm.py @@ -43,6 +43,8 @@ from qubes.qubes import QubesVmCollection,QubesException,QubesHost,QubesVmLabels from qubes.qubes import defaults,system_path,vm_files,qubes_max_qid from qubes.qmemman_client import QMemmanClient +import qubes.qubesutils + xid_to_name_cache = {} class QubesVm(object): @@ -675,10 +677,7 @@ class QubesVm(object): return 0 def get_disk_utilization_root_img(self): - if not os.path.exists(self.root_img): - return 0 - - return self.get_disk_usage(self.root_img) + return qubes.qubesutils.get_disk_usage(self.root_img) def get_root_img_sz(self): if not os.path.exists(self.root_img): @@ -790,21 +789,11 @@ class QubesVm(object): return used_dmdev != current_dmdev - def get_disk_usage(self, file_or_dir): - if not os.path.exists(file_or_dir): - return 0 - p = subprocess.Popen (["du", "-s", "--block-size=1", file_or_dir], - stdout=subprocess.PIPE) - result = p.communicate() - m = re.match(r"^(\d+)\s.*", result[0]) - sz = int(m.group(1)) if m is not None else 0 - return sz - def get_disk_utilization(self): - return self.get_disk_usage(self.dir_path) + return qubes.qubesutils.get_disk_usage(self.dir_path) def get_disk_utilization_private_img(self): - return self.get_disk_usage(self.private_img) + return qubes.qubesutils.get_disk_usage(self.private_img) def get_private_img_sz(self): if not os.path.exists(self.private_img): diff --git a/core/backup.py b/core/backup.py index 52154a2d..623f93e1 100644 --- a/core/backup.py +++ b/core/backup.py @@ -25,7 +25,7 @@ from qubes import QubesException,QubesVmCollection from qubes import QubesVmClasses from qubes import system_path,vm_files -from qubesutils import size_to_human, print_stdout, print_stderr +from qubesutils import size_to_human, print_stdout, print_stderr, get_disk_usage import sys import os import subprocess @@ -69,17 +69,6 @@ class BackupHeader: hmac_algorithm = 'hmac-algorithm' bool_options = ['encrypted', 'compressed'] -def get_disk_usage(file_or_dir): - if not os.path.exists(file_or_dir): - return 0 - - p = subprocess.Popen (["du", "-s", "--block-size=1", file_or_dir], - stdout=subprocess.PIPE) - result = p.communicate() - m = re.match(r"^(\d+)\s.*", result[0]) - sz = int(m.group(1)) if m is not None else 0 - return sz - def file_to_backup (file_path, subdir = None): sz = get_disk_usage (file_path) diff --git a/core/qubesutils.py b/core/qubesutils.py index efc64c4f..deff3c98 100644 --- a/core/qubesutils.py +++ b/core/qubesutils.py @@ -3,6 +3,7 @@ # The Qubes OS Project, http://www.qubes-os.org # # Copyright (C) 2011 Marek Marczykowski +# Copyright (C) 2014 Wojciech Porczyk # # This program is free software; you can redistribute it and/or # modify it under the terms of the GNU General Public License @@ -85,6 +86,27 @@ def parse_size(size): raise QubesException("Invalid size: {0}.".format(size)) +def get_disk_usage_one(st): + try: + return st.st_blocks * BLKSIZE + except: + return st.st_size + +def get_disk_usage(path): + try: + st = os.stat(path) + except os.error: + return 0 + + ret = get_disk_usage_one(st) + + # if path is not a directory, this is skipped + for dirpath, dirnames, filenames in os.walk(path): + for name in dirnames + filenames: + ret += get_disk_usage_one(os.stat(os.path.join(dirpath, name))) + + return ret + def print_stdout(text): print (text) diff --git a/core/tests/test_qubesutils.py b/core/tests/test_qubesutils.py new file mode 100644 index 00000000..caa1593c --- /dev/null +++ b/core/tests/test_qubesutils.py @@ -0,0 +1,48 @@ +#!/usr/bin/python -O + +# +# The Qubes OS Project, http://www.qubes-os.org +# +# Copyright (C) 2014 Wojciech Porczyk +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation; either version 2 +# of the License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. +# + +import subprocess +import unittest + +import qubes.qubesutils + + +class TestCaseFunctionsAndConstants(unittest.TestCase): + def check_output_int(self, cmd): + return int(subprocess.check_output(cmd).strip().split(None, 1)[0]) + + def test_00_BLKSIZE(self): + # this may fail on systems without st_blocks + self.assertEqual(qubes.qubesutils.BLKSIZE, self.check_output_int(['stat', '-c%B', '.'])) + + def test_01_get_size_one(self): + # this may fail on systems without st_blocks + self.assertEqual(qubes.qubesutils.get_disk_usage_one(os.stat('.')), + self.check_output_int(['stat', '-c%b', '.']) * qubes.qubesutils.BLKSIZE) + + def test_02_get_size(self): + self.assertEqual(qubes.qubesutils.get_disk_usage('.'), + self.check_output_int(['du', '-s', '--block-size=1', '.'])) + + +if __name__ == '__main__': + unittest.main()