Browse Source

Cleanup Admin API denial reporting

Rename QubesDaemonNoResponseError to more intuitive
QubesDaemonAccessError (keep legacy name still working).
Use QubesPropertyAccessError whenever the access is about @property -
this makes it easy to use `getattr` to use default value instead.

QubesOS/qubes-issues#5811
Marek Marczykowski-Górecki 3 years ago
parent
commit
7425a5359b
4 changed files with 81 additions and 31 deletions
  1. 1 1
      qubesadmin/app.py
  2. 17 11
      qubesadmin/base.py
  3. 7 3
      qubesadmin/exc.py
  4. 56 16
      qubesadmin/storage.py

+ 1 - 1
qubesadmin/app.py

@@ -838,7 +838,7 @@ class QubesRemote(QubesBase):
                                  stderr=subprocess.PIPE)
             (stdout, stderr) = p.communicate(payload)
         if p.returncode != 0:
-            raise qubesadmin.exc.QubesDaemonNoResponseError(
+            raise qubesadmin.exc.QubesDaemonAccessError(
                 'Service call error: %s', stderr.decode())
 
         return self._parse_qubesd_response(stdout)

+ 17 - 11
qubesadmin/base.py

@@ -78,7 +78,7 @@ class PropertyHolder(object):
         '''
 
         if response_data == b'':
-            raise qubesadmin.exc.QubesDaemonNoResponseError(
+            raise qubesadmin.exc.QubesDaemonAccessError(
                 'Got empty response from qubesd. See journalctl in dom0 for '
                 'details.')
 
@@ -151,11 +151,14 @@ class PropertyHolder(object):
         # cached properties list
         if self._properties is not None and item not in self._properties:
             raise AttributeError(item)
-        property_str = self.qubesd_call(
-            self._method_dest,
-            self._method_prefix + 'Get',
-            item,
-            None)
+        try:
+            property_str = self.qubesd_call(
+                self._method_dest,
+                self._method_prefix + 'Get',
+                item,
+                None)
+        except qubesadmin.exc.QubesDaemonAccessError:
+            raise qubesadmin.exc.QubesPropertyAccessError(item)
         is_default, value = self._deserialize_property(property_str)
         if self.app.cache_enabled:
             self._properties_cache[item] = (is_default, value)
@@ -170,11 +173,14 @@ class PropertyHolder(object):
         '''
         if item.startswith('_'):
             raise AttributeError(item)
-        property_str = self.qubesd_call(
-            self._method_dest,
-            self._method_prefix + 'GetDefault',
-            item,
-            None)
+        try:
+            property_str = self.qubesd_call(
+                self._method_dest,
+                self._method_prefix + 'GetDefault',
+                item,
+                None)
+        except qubesadmin.exc.QubesDaemonAccessError:
+            raise qubesadmin.exc.QubesPropertyAccessError(item)
         if not property_str:
             raise AttributeError(item + ' has no default')
         (prop_type, value) = property_str.split(b' ', 1)

+ 7 - 3
qubesadmin/exc.py

@@ -161,13 +161,17 @@ class BackupRestoreError(QubesException):
         self.backup_log = backup_log
 
 # pylint: disable=too-many-ancestors
-class QubesDaemonNoResponseError(QubesDaemonCommunicationError):
-    '''Got empty response from qubesd'''
+class QubesDaemonAccessError(QubesDaemonCommunicationError):
+    '''Got empty response from qubesd. This can be lack of permission,
+    or some server-side issue.'''
 
 
-class QubesPropertyAccessError(QubesException, AttributeError):
+class QubesPropertyAccessError(QubesDaemonAccessError, AttributeError):
     '''Failed to read/write property value, cause is unknown (insufficient
     permissions, no such property, invalid value, other)'''
     def __init__(self, prop):
         super(QubesPropertyAccessError, self).__init__(
             'Failed to access \'%s\' property' % prop)
+
+# legacy name
+QubesDaemonNoResponseError = QubesDaemonAccessError

+ 56 - 16
qubesadmin/storage.py

@@ -19,6 +19,7 @@
 # with this program; if not, see <http://www.gnu.org/licenses/>.
 
 '''Storage subsystem.'''
+import qubesadmin.exc
 
 class Volume(object):
     '''Storage volume.'''
@@ -112,7 +113,10 @@ class Volume(object):
         '''Storage volume pool name.'''
         if self._pool is not None:
             return self._pool
-        self._fetch_info()
+        try:
+            self._fetch_info()
+        except qubesadmin.exc.QubesDaemonAccessError:
+            raise qubesadmin.exc.QubesPropertyAccessError('pool')
         return str(self._info['pool'])
 
     @property
@@ -120,25 +124,37 @@ class Volume(object):
         '''Storage volume id, unique within given pool.'''
         if self._vid is not None:
             return self._vid
-        self._fetch_info()
+        try:
+            self._fetch_info()
+        except qubesadmin.exc.QubesDaemonAccessError:
+            raise qubesadmin.exc.QubesPropertyAccessError('vid')
         return str(self._info['vid'])
 
     @property
     def size(self):
         '''Size of volume, in bytes.'''
-        self._fetch_info(True)
+        try:
+            self._fetch_info()
+        except qubesadmin.exc.QubesDaemonAccessError:
+            raise qubesadmin.exc.QubesPropertyAccessError('size')
         return int(self._info['size'])
 
     @property
     def usage(self):
         '''Used volume space, in bytes.'''
-        self._fetch_info(True)
+        try:
+            self._fetch_info()
+        except qubesadmin.exc.QubesDaemonAccessError:
+            raise qubesadmin.exc.QubesPropertyAccessError('usage')
         return int(self._info['usage'])
 
     @property
     def rw(self):
         '''True if volume is read-write.'''
-        self._fetch_info()
+        try:
+            self._fetch_info()
+        except qubesadmin.exc.QubesDaemonAccessError:
+            raise qubesadmin.exc.QubesPropertyAccessError('rw')
         return self._info['rw'] == 'True'
 
     @rw.setter
@@ -150,13 +166,19 @@ class Volume(object):
     @property
     def snap_on_start(self):
         '''Create a snapshot from source on VM start.'''
-        self._fetch_info()
+        try:
+            self._fetch_info()
+        except qubesadmin.exc.QubesDaemonAccessError:
+            raise qubesadmin.exc.QubesPropertyAccessError('snap_on_start')
         return self._info['snap_on_start'] == 'True'
 
     @property
     def save_on_stop(self):
         '''Commit changes to original volume on VM stop.'''
-        self._fetch_info()
+        try:
+            self._fetch_info()
+        except qubesadmin.exc.QubesDaemonAccessError:
+            raise qubesadmin.exc.QubesPropertyAccessError('save_on_stop')
         return self._info['save_on_stop'] == 'True'
 
     @property
@@ -165,7 +187,10 @@ class Volume(object):
 
         If None, this volume itself will be used.
         '''
-        self._fetch_info()
+        try:
+            self._fetch_info()
+        except qubesadmin.exc.QubesDaemonAccessError:
+            raise qubesadmin.exc.QubesPropertyAccessError('source')
         if self._info['source']:
             return self._info['source']
         return None
@@ -173,7 +198,10 @@ class Volume(object):
     @property
     def revisions_to_keep(self):
         '''Number of revisions to keep around'''
-        self._fetch_info()
+        try:
+            self._fetch_info()
+        except qubesadmin.exc.QubesDaemonAccessError:
+            raise qubesadmin.exc.QubesPropertyAccessError('revisions_to_keep')
         return int(self._info['revisions_to_keep'])
 
     @revisions_to_keep.setter
@@ -186,7 +214,10 @@ class Volume(object):
         '''Returns `True` if this snapshot of a source volume (for
         `snap_on_start`=True) is outdated.
         '''
-        self._fetch_info(True)
+        try:
+            self._fetch_info()
+        except qubesadmin.exc.QubesDaemonAccessError:
+            raise qubesadmin.exc.QubesPropertyAccessError('is_outdated')
         return self._info.get('is_outdated', False) == 'True'
 
     def resize(self, size):
@@ -290,8 +321,11 @@ class Pool(object):
     @property
     def usage_details(self):
         ''' Storage pool usage details (current - not cached) '''
-        pool_usage_data = self.app.qubesd_call(
-            'dom0', 'admin.pool.UsageDetails', self.name, None)
+        try:
+            pool_usage_data = self.app.qubesd_call(
+                'dom0', 'admin.pool.UsageDetails', self.name, None)
+        except qubesadmin.exc.QubesDaemonAccessError:
+            raise qubesadmin.exc.QubesPropertyAccessError('usage_details')
         pool_usage_data = pool_usage_data.decode('utf-8')
         assert pool_usage_data.endswith('\n') or pool_usage_data == ''
         pool_usage_data = pool_usage_data[:-1]
@@ -306,8 +340,11 @@ class Pool(object):
     def config(self):
         ''' Storage pool config '''
         if self._config is None:
-            pool_info_data = self.app.qubesd_call(
-                'dom0', 'admin.pool.Info', self.name, None)
+            try:
+                pool_info_data = self.app.qubesd_call(
+                    'dom0', 'admin.pool.Info', self.name, None)
+            except qubesadmin.exc.QubesDaemonAccessError:
+                raise qubesadmin.exc.QubesPropertyAccessError('config')
             pool_info_data = pool_info_data.decode('utf-8')
             assert pool_info_data.endswith('\n')
             pool_info_data = pool_info_data[:-1]
@@ -355,8 +392,11 @@ class Pool(object):
     @property
     def volumes(self):
         ''' Volumes managed by this pool '''
-        volumes_data = self.app.qubesd_call(
-            'dom0', 'admin.pool.volume.List', self.name, None)
+        try:
+            volumes_data = self.app.qubesd_call(
+                'dom0', 'admin.pool.volume.List', self.name, None)
+        except qubesadmin.exc.QubesDaemonAccessError:
+            raise qubesadmin.exc.QubesPropertyAccessError('volumes')
         assert volumes_data.endswith(b'\n')
         volumes_data = volumes_data[:-1].decode('ascii')
         for vid in volumes_data.splitlines():