--- a/VirtualMailManager/Handler.py Tue Jul 27 19:29:44 2010 +0000
+++ b/VirtualMailManager/Handler.py Wed Jul 28 01:03:56 2010 +0000
@@ -59,7 +59,7 @@
class Handler(object):
"""Wrapper class to simplify the access on all the stuff from
VirtualMailManager"""
- __slots__ = ('_cfg', '_cfg_fname', '_dbh', '__warnings')
+ __slots__ = ('_cfg', '_cfg_fname', '_dbh', '_warnings')
def __init__(self, skip_some_checks=False):
"""Creates a new Handler instance.
@@ -72,21 +72,21 @@
Throws a NotRootError if your uid is greater 0.
"""
self._cfg_fname = ''
- self.__warnings = []
+ self._warnings = []
self._cfg = None
self._dbh = None
if os.geteuid():
raise NotRootError(_(u"You are not root.\n\tGood bye!\n"),
CONF_NOPERM)
- if self.__check_cfg_file():
+ if self._check_cfg_file():
self._cfg = Cfg(self._cfg_fname)
self._cfg.load()
if not skip_some_checks:
self._cfg.check()
self._chkenv()
- def __find_cfg_file(self):
+ def _find_cfg_file(self):
"""Search the CFG_FILE in CFG_PATH.
Raise a VMMError when no vmm.cfg could be found.
"""
@@ -98,11 +98,11 @@
if not self._cfg_fname:
raise VMMError(_(u"Could not find '%(cfg_file)s' in: "
u"'%(cfg_path)s'") % {'cfg_file': CFG_FILE,
- 'cfg_path' : CFG_PATH}, CONF_NOFILE)
+ 'cfg_path': CFG_PATH}, CONF_NOFILE)
- def __check_cfg_file(self):
+ def _check_cfg_file(self):
"""Checks the configuration file, returns bool"""
- self.__find_cfg_file()
+ self._find_cfg_file()
fstat = os.stat(self._cfg_fname)
fmode = int(oct(fstat.st_mode & 0777))
if fmode % 100 and fstat.st_uid != fstat.st_gid or \
@@ -148,7 +148,7 @@
else:
raise
- def __dbConnect(self):
+ def _db_connect(self):
"""Creates a pyPgSQL.PgSQL.connection instance."""
if self._dbh is None or (isinstance(self._dbh, PgSQL.Connection) and
not self._dbh._isOpen):
@@ -162,8 +162,8 @@
dbc = self._dbh.cursor()
dbc.execute("SET NAMES 'UTF8'")
dbc.close()
- except PgSQL.libpq.DatabaseError, e:
- raise VMMError(str(e), DATABASE_ERROR)
+ except PgSQL.libpq.DatabaseError, err:
+ raise VMMError(str(err), DATABASE_ERROR)
def _chk_other_address_types(self, address, exclude):
"""Checks if the EmailAddress *address* is known as `TYPE_ACCOUNT`,
@@ -202,44 +202,52 @@
raise VMMError(msg % {'a_type': OTHER_TYPES[other][0],
'address': address}, OTHER_TYPES[other][1])
- def __getAccount(self, address):
+ def _get_account(self, address):
+ """Return an Account instances for the given address (str)."""
address = EmailAddress(address)
- self.__dbConnect()
+ self._db_connect()
return Account(self._dbh, address)
- def __getAlias(self, address):
+ def _get_alias(self, address):
+ """Return an Alias instances for the given address (str)."""
address = EmailAddress(address)
- self.__dbConnect()
+ self._db_connect()
return Alias(self._dbh, address)
- def __getRelocated(self, address):
+ def _get_relocated(self, address):
+ """Return a Relocated instances for the given address (str)."""
address = EmailAddress(address)
- self.__dbConnect()
+ self._db_connect()
return Relocated(self._dbh, address)
- def __getDomain(self, domainname):
- self.__dbConnect()
+ def _get_domain(self, domainname):
+ """Return a Domain instances for the given domain name (str)."""
+ self._db_connect()
return Domain(self._dbh, domainname)
- def __getDiskUsage(self, directory):
+ def _get_disk_usage(self, directory):
"""Estimate file space usage for the given directory.
Keyword arguments:
directory -- the directory to summarize recursively disk usage for
"""
- if self.__isdir(directory):
+ if self._isdir(directory):
return Popen([self._cfg.dget('bin.du'), "-hs", directory],
stdout=PIPE).communicate()[0].split('\t')[0]
else:
return 0
- def __isdir(self, directory):
+ def _isdir(self, directory):
+ """Check if `directory` is a directory. Returns bool.
+ When `directory` isn't a directory, a warning will be appended to
+ _warnings."""
isdir = os.path.isdir(directory)
if not isdir:
- self.__warnings.append(_('No such directory: %s') % directory)
+ self._warnings.append(_('No such directory: %s') % directory)
return isdir
- def __make_domain_dir(self, domain):
+ def _make_domain_dir(self, domain):
+ """Create a directory for the `domain` and its accounts."""
cwd = os.getcwd()
hashdir, domdir = domain.directory.split(os.path.sep)[-2:]
os.chdir(self._cfg.dget('misc.base_directory'))
@@ -251,14 +259,15 @@
os.chown(domain.directory, 0, domain.gid)
os.chdir(cwd)
- def __make_home(self, account):
+ def _make_home(self, account):
"""Create a home directory for the new Account *account*."""
os.umask(0007)
os.chdir(account.domain_directory)
os.mkdir('%s' % account.uid, self._cfg.dget('account.directory_mode'))
os.chown('%s' % account.uid, account.uid, account.gid)
- def __userDirDelete(self, domdir, uid, gid):
+ def _delete_home(self, domdir, uid, gid):
+ """Delete a user's home directory."""
if uid > 0 and gid > 0:
userdir = '%s' % uid
if userdir.count('..') or domdir.count('..'):
@@ -278,9 +287,10 @@
os.path.join(domdir, userdir),
NO_SUCH_DIRECTORY)
- def __domDirDelete(self, domdir, gid):
+ def _delete_domain_dir(self, domdir, gid):
+ """Delete a domain's directory."""
if gid > 0:
- if not self.__isdir(domdir):
+ if not self._isdir(domdir):
return
basedir = self._cfg.dget('misc.base_directory')
domdirdirs = domdir.replace(basedir + '/', '').split('/')
@@ -295,17 +305,16 @@
u'directory.'), DOMAINDIR_GROUP_MISMATCH)
rmtree(domdirdirs[1], ignore_errors=True)
- def hasWarnings(self):
+ def has_warnings(self):
"""Checks if warnings are present, returns bool."""
- return bool(len(self.__warnings))
+ return bool(len(self._warnings))
- def getWarnings(self):
+ def get_warnings(self):
"""Returns a list with all available warnings and resets all
warnings.
-
"""
- ret_val = self.__warnings[:]
- del self.__warnings[:]
+ ret_val = self._warnings[:]
+ del self._warnings[:]
return ret_val
def cfg_dget(self, option):
@@ -325,8 +334,9 @@
assert 'cfg_dget' not in __builtin__.__dict__
__builtin__.__dict__['cfg_dget'] = self._cfg.dget
- def domainAdd(self, domainname, transport=None):
- dom = self.__getDomain(domainname)
+ def domain_add(self, domainname, transport=None):
+ """Wrapper around Domain.set_transport() and Domain.save()"""
+ dom = self._get_domain(domainname)
if transport is None:
dom.set_transport(Transport(self._dbh,
transport=self._cfg.dget('misc.transport')))
@@ -334,24 +344,26 @@
dom.set_transport(Transport(self._dbh, transport=transport))
dom.set_directory(self._cfg.dget('misc.base_directory'))
dom.save()
- self.__make_domain_dir(dom)
+ self._make_domain_dir(dom)
- def domainTransport(self, domainname, transport, force=None):
+ def domain_transport(self, domainname, transport, force=None):
+ """Wrapper around Domain.update_transport()"""
if force is not None and force != 'force':
raise DomainError(_(u"Invalid argument: '%s'") % force,
INVALID_ARGUMENT)
- dom = self.__getDomain(domainname)
+ dom = self._get_domain(domainname)
trsp = Transport(self._dbh, transport=transport)
if force is None:
dom.update_transport(trsp)
else:
dom.update_transport(trsp, force=True)
- def domainDelete(self, domainname, force=None):
+ def domain_delete(self, domainname, force=None):
+ """Wrapper around Domain.delete()"""
if force and force not in ('deluser', 'delalias', 'delall'):
raise DomainError(_(u"Invalid argument: '%s'") % force,
INVALID_ARGUMENT)
- dom = self.__getDomain(domainname)
+ dom = self._get_domain(domainname)
gid = dom.gid
domdir = dom.directory
if self._cfg.dget('domain.force_deletion') or force == 'delall':
@@ -363,14 +375,17 @@
else:
dom.delete()
if self._cfg.dget('domain.delete_directory'):
- self.__domDirDelete(domdir, gid)
+ self._delete_domain_dir(domdir, gid)
- def domainInfo(self, domainname, details=None):
+ def domain_info(self, domainname, details=None):
+ """Wrapper around Domain.get_info(), Domain.get_accounts(),
+ Domain.get_aliase_names(), Domain.get_aliases() and
+ Domain.get_relocated."""
if details not in [None, 'accounts', 'aliasdomains', 'aliases', 'full',
'relocated']:
raise VMMError(_(u'Invalid argument: ā%sā') % details,
INVALID_ARGUMENT)
- dom = self.__getDomain(domainname)
+ dom = self._get_domain(domainname)
dominfo = dom.get_info()
if dominfo['domainname'].startswith('xn--'):
dominfo['domainname'] += ' (%s)' % \
@@ -389,7 +404,7 @@
return (dominfo, dom.get_aliase_names(), dom.get_accounts(),
dom.get_aliases(), dom.get_relocated())
- def aliasDomainAdd(self, aliasname, domainname):
+ def aliasdomain_add(self, aliasname, domainname):
"""Adds an alias domain to the domain.
Arguments:
@@ -399,19 +414,19 @@
`domainname` : basestring
The name of the target domain
"""
- dom = self.__getDomain(domainname)
- aliasDom = AliasDomain(self._dbh, aliasname)
- aliasDom.set_destination(dom)
- aliasDom.save()
+ dom = self._get_domain(domainname)
+ alias_dom = AliasDomain(self._dbh, aliasname)
+ alias_dom.set_destination(dom)
+ alias_dom.save()
- def aliasDomainInfo(self, aliasname):
+ def aliasdomain_info(self, aliasname):
"""Returns a dict (keys: "alias" and "domain") with the names of
the alias domain and its primary domain."""
- self.__dbConnect()
- aliasDom = AliasDomain(self._dbh, aliasname)
- return aliasDom.info()
+ self._db_connect()
+ alias_dom = AliasDomain(self._dbh, aliasname)
+ return alias_dom.info()
- def aliasDomainSwitch(self, aliasname, domainname):
+ def aliasdomain_switch(self, aliasname, domainname):
"""Modifies the target domain of an existing alias domain.
Arguments:
@@ -421,12 +436,12 @@
`domainname` : basestring
The name of the new target domain
"""
- dom = self.__getDomain(domainname)
- aliasDom = AliasDomain(self._dbh, aliasname)
- aliasDom.set_destination(dom)
- aliasDom.switch()
+ dom = self._get_domain(domainname)
+ alias_dom = AliasDomain(self._dbh, aliasname)
+ alias_dom.set_destination(dom)
+ alias_dom.switch()
- def aliasDomainDelete(self, aliasname):
+ def aliasdomain_delete(self, aliasname):
"""Deletes the given alias domain.
Argument:
@@ -434,11 +449,12 @@
`aliasname` : basestring
The name of the alias domain
"""
- self.__dbConnect()
- aliasDom = AliasDomain(self._dbh, aliasname)
- aliasDom.delete()
+ self._db_connect()
+ alias_dom = AliasDomain(self._dbh, aliasname)
+ alias_dom.delete()
- def domainList(self, pattern=None):
+ def domain_list(self, pattern=None):
+ """Wrapper around function search() from module Domain."""
from VirtualMailManager.Domain import search
like = False
if pattern and (pattern.startswith('%') or pattern.endswith('%')):
@@ -446,16 +462,16 @@
if not re.match(RE_DOMAIN_SEARCH, pattern.strip('%')):
raise VMMError(_(u"The pattern '%s' contains invalid "
u"characters.") % pattern, DOMAIN_INVALID)
- self.__dbConnect()
+ self._db_connect()
return search(self._dbh, pattern=pattern, like=like)
def user_add(self, emailaddress, password):
"""Wrapper around Account.set_password() and Account.save()."""
- acc = self.__getAccount(emailaddress)
+ acc = self._get_account(emailaddress)
acc.set_password(password)
acc.save()
oldpwd = os.getcwd()
- self.__make_home(acc)
+ self._make_home(acc)
mailbox = new_mailbox(acc)
mailbox.create()
folders = self._cfg.dget('mailbox.folders').split(':')
@@ -463,32 +479,32 @@
bad = mailbox.add_boxes(folders,
self._cfg.dget('mailbox.subscribe'))
if bad:
- self.__warnings.append(_(u"Skipped mailbox folders:") +
- '\n\t- ' + '\n\t- '.join(bad))
+ self._warnings.append(_(u"Skipped mailbox folders:") +
+ '\n\t- ' + '\n\t- '.join(bad))
os.chdir(oldpwd)
- def aliasAdd(self, aliasaddress, *targetaddresses):
+ def alias_add(self, aliasaddress, *targetaddresses):
"""Creates a new `Alias` entry for the given *aliasaddress* with
the given *targetaddresses*."""
- alias = self.__getAlias(aliasaddress)
+ alias = self._get_alias(aliasaddress)
destinations = [EmailAddress(address) for address in targetaddresses]
warnings = []
destinations = alias.add_destinations(destinations, warnings)
if warnings:
- self.__warnings.append(_('Ignored destination addresses:'))
- self.__warnings.extend((' * %s' % w for w in warnings))
+ self._warnings.append(_('Ignored destination addresses:'))
+ self._warnings.extend((' * %s' % w for w in warnings))
for destination in destinations:
if get_gid(self._dbh, destination.domainname) and \
not self._chk_other_address_types(destination, TYPE_RELOCATED):
- self.__warnings.append(_(u"The destination account/alias '%s' "
- u"doesn't exist.") % destination)
+ self._warnings.append(_(u"The destination account/alias '%s' "
+ u"doesn't exist.") % destination)
def user_delete(self, emailaddress, force=None):
"""Wrapper around Account.delete(...)"""
if force not in (None, 'delalias'):
raise VMMError(_(u"Invalid argument: '%s'") % force,
INVALID_ARGUMENT)
- acc = self.__getAccount(emailaddress)
+ acc = self._get_account(emailaddress)
if not acc:
raise VMMError(_(u"The account '%s' doesn't exist.") %
acc.address, NO_SUCH_ACCOUNT)
@@ -499,7 +515,7 @@
acc.delete(bool(force))
if self._cfg.dget('account.delete_directory'):
try:
- self.__userDirDelete(dom_dir, uid, gid)
+ self._delete_home(dom_dir, uid, gid)
except VMMError, err:
if err.code in (FOUND_DOTS_IN_PATH, MAILDIR_PERM_MISMATCH,
NO_SUCH_DIRECTORY):
@@ -509,25 +525,25 @@
ā%(directory)sā
Reason: %(reason)s""") % \
{'directory': acc_dir, 'reason': err.msg}
- self.__warnings.append(warning)
+ self._warnings.append(warning)
else:
raise
- def aliasInfo(self, aliasaddress):
+ def alias_info(self, aliasaddress):
"""Returns an iterator object for all destinations (`EmailAddress`
instances) for the `Alias` with the given *aliasaddress*."""
- alias = self.__getAlias(aliasaddress)
+ alias = self._get_alias(aliasaddress)
if alias:
return alias.get_destinations()
if not self._is_other_address(alias.address, TYPE_ALIAS):
raise VMMError(_(u"The alias '%s' doesn't exist.") %
alias.address, NO_SUCH_ALIAS)
- def aliasDelete(self, aliasaddress, targetaddress=None):
+ def alias_delete(self, aliasaddress, targetaddress=None):
"""Deletes the `Alias` *aliasaddress* with all its destinations from
the database. If *targetaddress* is not ``None``, only this
destination will be removed from the alias."""
- alias = self.__getAlias(aliasaddress)
+ alias = self._get_alias(aliasaddress)
if targetaddress is None:
alias.delete()
else:
@@ -538,7 +554,7 @@
if details not in (None, 'du', 'aliases', 'full'):
raise VMMError(_(u"Invalid argument: '%s'") % details,
INVALID_ARGUMENT)
- acc = self.__getAccount(emailaddress)
+ acc = self._get_account(emailaddress)
if not acc:
if not self._is_other_address(acc.address, TYPE_ACCOUNT):
raise VMMError(_(u"The account '%s' doesn't exist.") %
@@ -546,7 +562,7 @@
info = acc.get_info()
if self._cfg.dget('account.disk_usage') or details in ('du', 'full'):
path = os.path.join(acc.home, acc.mail_location.directory)
- info['disk usage'] = self.__getDiskUsage(path)
+ info['disk usage'] = self._get_disk_usage(path)
if details in (None, 'du'):
return info
if details in ('aliases', 'full'):
@@ -557,7 +573,7 @@
"""Search for an Account by its *uid*.
Returns a dict (address, uid and gid) if a user could be found."""
from VirtualMailManager.Account import get_account_by_uid
- self.__dbConnect()
+ self._db_connect()
return get_account_by_uid(uid, self._dbh)
def user_password(self, emailaddress, password):
@@ -565,7 +581,7 @@
if not isinstance(password, basestring) or not password:
raise VMMError(_(u"Could not accept password: '%s'") % password,
INVALID_ARGUMENT)
- acc = self.__getAccount(emailaddress)
+ acc = self._get_account(emailaddress)
if not acc:
raise VMMError(_(u"The account '%s' doesn't exist.") %
acc.address, NO_SUCH_ACCOUNT)
@@ -576,7 +592,7 @@
if not isinstance(name, basestring) or not name:
raise VMMError(_(u"Could not accept name: '%s'") % name,
INVALID_ARGUMENT)
- acc = self.__getAccount(emailaddress)
+ acc = self._get_account(emailaddress)
if not acc:
raise VMMError(_(u"The account '%s' doesn't exist.") %
acc.address, NO_SUCH_ACCOUNT)
@@ -587,7 +603,7 @@
if not isinstance(transport, basestring) or not transport:
raise VMMError(_(u"Could not accept transport: '%s'") % transport,
INVALID_ARGUMENT)
- acc = self.__getAccount(emailaddress)
+ acc = self._get_account(emailaddress)
if not acc:
raise VMMError(_(u"The account '%s' doesn't exist.") %
acc.address, NO_SUCH_ACCOUNT)
@@ -598,7 +614,7 @@
if service not in (None, 'all', 'imap', 'pop3', 'smtp', 'sieve'):
raise VMMError(_(u"Could not accept service: '%s'") % service,
INVALID_ARGUMENT)
- acc = self.__getAccount(emailaddress)
+ acc = self._get_account(emailaddress)
if not acc:
raise VMMError(_(u"The account '%s' doesn't exist.") %
acc.address, NO_SUCH_ACCOUNT)
@@ -609,37 +625,33 @@
if service not in (None, 'all', 'imap', 'pop3', 'smtp', 'sieve'):
raise VMMError(_(u"Could not accept service: '%s'") % service,
INVALID_ARGUMENT)
- acc = self.__getAccount(emailaddress)
+ acc = self._get_account(emailaddress)
if not acc:
raise VMMError(_(u"The account '%s' doesn't exist.") %
acc.address, NO_SUCH_ACCOUNT)
acc.enable(service)
- def relocatedAdd(self, emailaddress, targetaddress):
+ def relocated_add(self, emailaddress, targetaddress):
"""Creates a new `Relocated` entry in the database. If there is
already a relocated user with the given *emailaddress*, only the
*targetaddress* for the relocated user will be updated."""
- relocated = self.__getRelocated(emailaddress)
+ relocated = self._get_relocated(emailaddress)
relocated.set_destination(EmailAddress(targetaddress))
- def relocatedInfo(self, emailaddress):
+ def relocated_info(self, emailaddress):
"""Returns the target address of the relocated user with the given
*emailaddress*."""
- relocated = self.__getRelocated(emailaddress)
+ relocated = self._get_relocated(emailaddress)
if relocated:
return relocated.get_info()
if not self._is_other_address(relocated.address, TYPE_RELOCATED):
raise VMMError(_(u"The relocated user '%s' doesn't exist.") %
relocated.address, NO_SUCH_RELOCATED)
- def relocatedDelete(self, emailaddress):
+ def relocated_delete(self, emailaddress):
"""Deletes the relocated user with the given *emailaddress* from
the database."""
- relocated = self.__getRelocated(emailaddress)
+ relocated = self._get_relocated(emailaddress)
relocated.delete()
- def __del__(self):
- if isinstance(self._dbh, PgSQL.Connection) and self._dbh._isOpen:
- self._dbh.close()
-
del _