Merge remote-tracking branch 'woju/bug-822'
This commit is contained in:
commit
8ac5981fdd
@ -43,6 +43,8 @@ from qubes.qubes import QubesVmCollection,QubesException,QubesHost,QubesVmLabels
|
||||
from qubes.qubes import defaults,system_path,vm_files,qubes_max_qid
|
||||
from qubes.qmemman_client import QMemmanClient
|
||||
|
||||
import qubes.qubesutils
|
||||
|
||||
xid_to_name_cache = {}
|
||||
|
||||
class QubesVm(object):
|
||||
@ -675,10 +677,7 @@ class QubesVm(object):
|
||||
return 0
|
||||
|
||||
def get_disk_utilization_root_img(self):
|
||||
if not os.path.exists(self.root_img):
|
||||
return 0
|
||||
|
||||
return self.get_disk_usage(self.root_img)
|
||||
return qubes.qubesutils.get_disk_usage(self.root_img)
|
||||
|
||||
def get_root_img_sz(self):
|
||||
if not os.path.exists(self.root_img):
|
||||
@ -790,21 +789,11 @@ class QubesVm(object):
|
||||
|
||||
return used_dmdev != current_dmdev
|
||||
|
||||
def get_disk_usage(self, file_or_dir):
|
||||
if not os.path.exists(file_or_dir):
|
||||
return 0
|
||||
p = subprocess.Popen (["du", "-s", "--block-size=1", file_or_dir],
|
||||
stdout=subprocess.PIPE)
|
||||
result = p.communicate()
|
||||
m = re.match(r"^(\d+)\s.*", result[0])
|
||||
sz = int(m.group(1)) if m is not None else 0
|
||||
return sz
|
||||
|
||||
def get_disk_utilization(self):
|
||||
return self.get_disk_usage(self.dir_path)
|
||||
return qubes.qubesutils.get_disk_usage(self.dir_path)
|
||||
|
||||
def get_disk_utilization_private_img(self):
|
||||
return self.get_disk_usage(self.private_img)
|
||||
return qubes.qubesutils.get_disk_usage(self.private_img)
|
||||
|
||||
def get_private_img_sz(self):
|
||||
if not os.path.exists(self.private_img):
|
||||
|
@ -52,9 +52,6 @@ class QubesAdminVm(QubesNetVm):
|
||||
def get_power_state(self):
|
||||
return "Running"
|
||||
|
||||
def get_disk_usage(self, file_or_dir):
|
||||
return 0
|
||||
|
||||
def get_disk_utilization(self):
|
||||
return 0
|
||||
|
||||
|
@ -236,9 +236,6 @@ class QubesHVm(QubesVm):
|
||||
for hook in self.hooks_create_on_disk:
|
||||
hook(self, verbose, source_template=source_template)
|
||||
|
||||
def get_disk_utilization_private_img(self):
|
||||
return self.get_disk_usage(self.private_img)
|
||||
|
||||
def get_private_img_sz(self):
|
||||
if not os.path.exists(self.private_img):
|
||||
return 0
|
||||
|
@ -25,7 +25,7 @@
|
||||
from qubes import QubesException,QubesVmCollection
|
||||
from qubes import QubesVmClasses
|
||||
from qubes import system_path,vm_files
|
||||
from qubesutils import size_to_human, print_stdout, print_stderr
|
||||
from qubesutils import size_to_human, print_stdout, print_stderr, get_disk_usage
|
||||
import sys
|
||||
import os
|
||||
import subprocess
|
||||
@ -69,17 +69,6 @@ class BackupHeader:
|
||||
hmac_algorithm = 'hmac-algorithm'
|
||||
bool_options = ['encrypted', 'compressed']
|
||||
|
||||
def get_disk_usage(file_or_dir):
|
||||
if not os.path.exists(file_or_dir):
|
||||
return 0
|
||||
|
||||
p = subprocess.Popen (["du", "-s", "--block-size=1", file_or_dir],
|
||||
stdout=subprocess.PIPE)
|
||||
result = p.communicate()
|
||||
m = re.match(r"^(\d+)\s.*", result[0])
|
||||
sz = int(m.group(1)) if m is not None else 0
|
||||
return sz
|
||||
|
||||
|
||||
def file_to_backup (file_path, subdir = None):
|
||||
sz = get_disk_usage (file_path)
|
||||
|
@ -3,6 +3,7 @@
|
||||
# The Qubes OS Project, http://www.qubes-os.org
|
||||
#
|
||||
# Copyright (C) 2011 Marek Marczykowski <marmarek@invisiblethingslab.com>
|
||||
# Copyright (C) 2014 Wojciech Porczyk <wojciech@porczyk.eu>
|
||||
#
|
||||
# This program is free software; you can redistribute it and/or
|
||||
# modify it under the terms of the GNU General Public License
|
||||
@ -20,8 +21,7 @@
|
||||
#
|
||||
#
|
||||
|
||||
from qubes import QubesVm,QubesException,QubesVmCollection
|
||||
from qubes import QubesVmClasses
|
||||
from qubes import QubesException
|
||||
from qubes import xs, xl_ctx
|
||||
from qubes import system_path,vm_files
|
||||
import sys
|
||||
@ -85,6 +85,27 @@ def parse_size(size):
|
||||
|
||||
raise QubesException("Invalid size: {0}.".format(size))
|
||||
|
||||
def get_disk_usage_one(st):
|
||||
try:
|
||||
return st.st_blocks * BLKSIZE
|
||||
except:
|
||||
return st.st_size
|
||||
|
||||
def get_disk_usage(path):
|
||||
try:
|
||||
st = os.stat(path)
|
||||
except os.error:
|
||||
return 0
|
||||
|
||||
ret = get_disk_usage_one(st)
|
||||
|
||||
# if path is not a directory, this is skipped
|
||||
for dirpath, dirnames, filenames in os.walk(path):
|
||||
for name in dirnames + filenames:
|
||||
ret += get_disk_usage_one(os.stat(os.path.join(dirpath, name)))
|
||||
|
||||
return ret
|
||||
|
||||
def print_stdout(text):
|
||||
print (text)
|
||||
|
||||
|
48
core/tests/test_qubesutils.py
Normal file
48
core/tests/test_qubesutils.py
Normal file
@ -0,0 +1,48 @@
|
||||
#!/usr/bin/python -O
|
||||
|
||||
#
|
||||
# The Qubes OS Project, http://www.qubes-os.org
|
||||
#
|
||||
# Copyright (C) 2014 Wojciech Porczyk <wojciech@porczyk.eu>
|
||||
#
|
||||
# This program is free software; you can redistribute it and/or
|
||||
# modify it under the terms of the GNU General Public License
|
||||
# as published by the Free Software Foundation; either version 2
|
||||
# of the License, or (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program; if not, write to the Free Software
|
||||
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
#
|
||||
|
||||
import subprocess
|
||||
import unittest
|
||||
|
||||
import qubes.qubesutils
|
||||
|
||||
|
||||
class TestCaseFunctionsAndConstants(unittest.TestCase):
|
||||
def check_output_int(self, cmd):
|
||||
return int(subprocess.check_output(cmd).strip().split(None, 1)[0])
|
||||
|
||||
def test_00_BLKSIZE(self):
|
||||
# this may fail on systems without st_blocks
|
||||
self.assertEqual(qubes.qubesutils.BLKSIZE, self.check_output_int(['stat', '-c%B', '.']))
|
||||
|
||||
def test_01_get_size_one(self):
|
||||
# this may fail on systems without st_blocks
|
||||
self.assertEqual(qubes.qubesutils.get_disk_usage_one(os.stat('.')),
|
||||
self.check_output_int(['stat', '-c%b', '.']) * qubes.qubesutils.BLKSIZE)
|
||||
|
||||
def test_02_get_size(self):
|
||||
self.assertEqual(qubes.qubesutils.get_disk_usage('.'),
|
||||
self.check_output_int(['du', '-s', '--block-size=1', '.']))
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
Loading…
Reference in New Issue
Block a user