VMM/handler: Reworked methods _get_disk_usage, _delete_home and v0.6.x
authorPascal Volk <neverseen@users.sourceforge.net>
Thu, 29 Jul 2010 04:01:43 +0000
branchv0.6.x
changeset 325 06c0457036a0
parent 324 38e344ba3d0f
child 326 8f8d9c4c8332
VMM/handler: Reworked methods _get_disk_usage, _delete_home and _delete_domain_dir. Deleted method _isdir.
VirtualMailManager/handler.py
--- a/VirtualMailManager/handler.py	Thu Jul 29 01:48:25 2010 +0000
+++ b/VirtualMailManager/handler.py	Thu Jul 29 04:01:43 2010 +0000
@@ -14,6 +14,7 @@
 
 import os
 import re
+import stat
 
 from shutil import rmtree
 from subprocess import Popen, PIPE
@@ -25,7 +26,7 @@
 from VirtualMailManager.aliasdomain import AliasDomain
 from VirtualMailManager.common import exec_ok
 from VirtualMailManager.config import Config as Cfg
-from VirtualMailManager.constants import \
+from VirtualMailManager.constants import MIN_GID, MIN_UID, \
      ACCOUNT_EXISTS, ALIAS_EXISTS, CONF_NOFILE, CONF_NOPERM, CONF_WRONGPERM, \
      DATABASE_ERROR, DOMAINDIR_GROUP_MISMATCH, DOMAIN_INVALID, \
      FOUND_DOTS_IN_PATH, INVALID_ARGUMENT, MAILDIR_PERM_MISMATCH, \
@@ -36,7 +37,7 @@
 from VirtualMailManager.errors import \
      DomainError, NotRootError, PermissionError, VMMError
 from VirtualMailManager.mailbox import new as new_mailbox
-from VirtualMailManager.pycompat import any
+from VirtualMailManager.pycompat import all, any
 from VirtualMailManager.relocated import Relocated
 from VirtualMailManager.transport import Transport
 
@@ -228,23 +229,17 @@
     def _get_disk_usage(self, directory):
         """Estimate file space usage for the given directory.
 
-        Keyword arguments:
-        directory -- the directory to summarize recursively disk usage for
-        """
-        if self._isdir(directory):
-            return Popen([self._cfg.dget('bin.du'), "-hs", directory],
-                stdout=PIPE).communicate()[0].split('\t')[0]
-        else:
-            return 0
+        Arguments:
 
-    def _isdir(self, directory):
-        """Check if `directory` is a directory. Returns bool.
-        When `directory` isn't a directory, a warning will be appended to
-        _warnings."""
-        isdir = os.path.isdir(directory)
-        if not isdir:
+        `directory` : basestring
+          The directory to summarize recursively disk usage for
+        """
+        if os.path.isdir(directory):
+            return Popen([self._cfg.dget('bin.du'), "-hs", directory],
+                         stdout=PIPE).communicate()[0].split('\t')[0]
+        else:
             self._warnings.append(_('No such directory: %s') % directory)
-        return isdir
+            return 0
 
     def _make_domain_dir(self, domain):
         """Create a directory for the `domain` and its accounts."""
@@ -267,43 +262,72 @@
         os.chown('%s' % account.uid, account.uid, account.gid)
 
     def _delete_home(self, domdir, uid, gid):
-        """Delete a user's home directory."""
-        if uid > 0 and gid > 0:
-            userdir = '%s' % uid
-            if userdir.count('..') or domdir.count('..'):
-                raise VMMError(_(u'Found ".." in home directory path.'),
-                               FOUND_DOTS_IN_PATH)
-            if os.path.isdir(domdir):
-                os.chdir(domdir)
-                if os.path.isdir(userdir):
-                    mdstat = os.stat(userdir)
-                    if (mdstat.st_uid, mdstat.st_gid) != (uid, gid):
-                        raise VMMError(_(u'Detected owner/group mismatch in '
-                                         u'home directory.'),
-                                       MAILDIR_PERM_MISMATCH)
-                    rmtree(userdir, ignore_errors=True)
-                else:
-                    raise VMMError(_(u"No such directory: %s") %
-                                   os.path.join(domdir, userdir),
-                                   NO_SUCH_DIRECTORY)
+        """Delete a user's home directory.
+
+        Arguments:
+
+        `domdir` : basestring
+          The directory of the domain the user belongs to
+          (commonly AccountObj.domain_directory)
+        `uid` : int/long
+          The user's UID (commonly AccountObj.uid)
+        `gid` : int/long
+          The user's GID (commonly AccountObj.gid)
+        """
+        assert all(isinstance(xid, (long, int)) for xid in (uid, gid)) and \
+                isinstance(domdir, basestring)
+        if uid < MIN_UID or gid < MIN_GID:
+            raise VMMError(_(u"UID '%(uid)u' and/or GID '%(gid)u' are less "
+                             u"than %(min_uid)u/%(min_gid)u.") % {'uid': uid,
+                           'gid': gid, 'min_gid': MIN_GID, 'min_uid': MIN_UID},
+                           MAILDIR_PERM_MISMATCH)
+        if domdir.count('..'):
+            raise VMMError(_(u'Found ".." in domain directory path: %s') %
+                           domdir, FOUND_DOTS_IN_PATH)
+        if not os.path.isdir(domdir):
+            raise VMMError(_(u"No such directory: %s") % domdir,
+                           NO_SUCH_DIRECTORY)
+        os.chdir(domdir)
+        userdir = '%s' % uid
+        if not os.path.isdir(userdir):
+            self._warnings.append(_(u"No such directory: %s") %
+                                  os.path.join(domdir, userdir))
+            return
+        mdstat = os.lstat(userdir)
+        if (mdstat.st_uid, mdstat.st_gid) != (uid, gid):
+            raise VMMError(_(u'Detected owner/group mismatch in home '
+                             u'directory.'), MAILDIR_PERM_MISMATCH)
+        rmtree(userdir, ignore_errors=True)
 
     def _delete_domain_dir(self, domdir, gid):
-        """Delete a domain's directory."""
-        if gid > 0:
-            if not self._isdir(domdir):
-                return
-            basedir = self._cfg.dget('misc.base_directory')
-            domdirdirs = domdir.replace(basedir + '/', '').split('/')
-            domdirparent = os.path.join(basedir, domdirdirs[0])
-            if basedir.count('..') or domdir.count('..'):
-                raise VMMError(_(u'Found ".." in domain directory path.'),
-                               FOUND_DOTS_IN_PATH)
-            if os.path.isdir(domdirparent):
-                os.chdir(domdirparent)
-                if os.lstat(domdirdirs[1]).st_gid != gid:
-                    raise VMMError(_(u'Detected group mismatch in domain '
-                                     u'directory.'), DOMAINDIR_GROUP_MISMATCH)
-                rmtree(domdirdirs[1], ignore_errors=True)
+        """Delete a domain's directory.
+
+        Arguments:
+
+        `domdir` : basestring
+          The domain's directory (commonly DomainObj.directory)
+        `gid` : int/long
+          The domain's GID (commonly DomainObj.gid)
+        """
+        assert isinstance(domdir, basestring) and isinstance(gid, (long, int))
+        if gid < MIN_GID:
+            raise VMMError(_(u"GID '%(gid)u' is less than '%(min_gid)u'.") %
+                           {'gid': gid, 'min_gid': MIN_GID},
+                           DOMAINDIR_GROUP_MISMATCH)
+        if domdir.count('..'):
+            raise VMMError(_(u'Found ".." in domain directory path: %s') %
+                           domdir, FOUND_DOTS_IN_PATH)
+        try:
+            dirst = os.lstat(domdir)
+        except OSError:
+            dirst = None
+        if not dirst or not stat.S_ISDIR(dirst.st_mode):
+            self._warnings.append(_('No such directory: %s') % domdir)
+            return
+        if dirst.st_gid != gid:
+            raise VMMError(_(u'Detected group mismatch in domain directory: '
+                             u'%s') % domdir, DOMAINDIR_GROUP_MISMATCH)
+        rmtree(domdir, ignore_errors=True)
 
     def has_warnings(self):
         """Checks if warnings are present, returns bool."""