# HG changeset patch # User Pascal Volk # Date 1280279036 0 # Node ID f4956b4ceba1e75ce24c00725cea7607478424f4 # Parent 4dc2edf02d114c8190fc92d5d33fb35ba5c5967d VMM//{,cli/}Handler: PEP-8-ified the Handler classes: * converted CamelCase method names to lower_case_with_underscores * eliminated __names * added missing docstrings diff -r 4dc2edf02d11 -r f4956b4ceba1 VirtualMailManager/Handler.py --- a/VirtualMailManager/Handler.py Tue Jul 27 19:29:44 2010 +0000 +++ b/VirtualMailManager/Handler.py Wed Jul 28 01:03:56 2010 +0000 @@ -59,7 +59,7 @@ class Handler(object): """Wrapper class to simplify the access on all the stuff from VirtualMailManager""" - __slots__ = ('_cfg', '_cfg_fname', '_dbh', '__warnings') + __slots__ = ('_cfg', '_cfg_fname', '_dbh', '_warnings') def __init__(self, skip_some_checks=False): """Creates a new Handler instance. @@ -72,21 +72,21 @@ Throws a NotRootError if your uid is greater 0. """ self._cfg_fname = '' - self.__warnings = [] + self._warnings = [] self._cfg = None self._dbh = None if os.geteuid(): raise NotRootError(_(u"You are not root.\n\tGood bye!\n"), CONF_NOPERM) - if self.__check_cfg_file(): + if self._check_cfg_file(): self._cfg = Cfg(self._cfg_fname) self._cfg.load() if not skip_some_checks: self._cfg.check() self._chkenv() - def __find_cfg_file(self): + def _find_cfg_file(self): """Search the CFG_FILE in CFG_PATH. Raise a VMMError when no vmm.cfg could be found. """ @@ -98,11 +98,11 @@ if not self._cfg_fname: raise VMMError(_(u"Could not find '%(cfg_file)s' in: " u"'%(cfg_path)s'") % {'cfg_file': CFG_FILE, - 'cfg_path' : CFG_PATH}, CONF_NOFILE) + 'cfg_path': CFG_PATH}, CONF_NOFILE) - def __check_cfg_file(self): + def _check_cfg_file(self): """Checks the configuration file, returns bool""" - self.__find_cfg_file() + self._find_cfg_file() fstat = os.stat(self._cfg_fname) fmode = int(oct(fstat.st_mode & 0777)) if fmode % 100 and fstat.st_uid != fstat.st_gid or \ @@ -148,7 +148,7 @@ else: raise - def __dbConnect(self): + def _db_connect(self): """Creates a pyPgSQL.PgSQL.connection instance.""" if self._dbh is None or (isinstance(self._dbh, PgSQL.Connection) and not self._dbh._isOpen): @@ -162,8 +162,8 @@ dbc = self._dbh.cursor() dbc.execute("SET NAMES 'UTF8'") dbc.close() - except PgSQL.libpq.DatabaseError, e: - raise VMMError(str(e), DATABASE_ERROR) + except PgSQL.libpq.DatabaseError, err: + raise VMMError(str(err), DATABASE_ERROR) def _chk_other_address_types(self, address, exclude): """Checks if the EmailAddress *address* is known as `TYPE_ACCOUNT`, @@ -202,44 +202,52 @@ raise VMMError(msg % {'a_type': OTHER_TYPES[other][0], 'address': address}, OTHER_TYPES[other][1]) - def __getAccount(self, address): + def _get_account(self, address): + """Return an Account instances for the given address (str).""" address = EmailAddress(address) - self.__dbConnect() + self._db_connect() return Account(self._dbh, address) - def __getAlias(self, address): + def _get_alias(self, address): + """Return an Alias instances for the given address (str).""" address = EmailAddress(address) - self.__dbConnect() + self._db_connect() return Alias(self._dbh, address) - def __getRelocated(self, address): + def _get_relocated(self, address): + """Return a Relocated instances for the given address (str).""" address = EmailAddress(address) - self.__dbConnect() + self._db_connect() return Relocated(self._dbh, address) - def __getDomain(self, domainname): - self.__dbConnect() + def _get_domain(self, domainname): + """Return a Domain instances for the given domain name (str).""" + self._db_connect() return Domain(self._dbh, domainname) - def __getDiskUsage(self, directory): + def _get_disk_usage(self, directory): """Estimate file space usage for the given directory. Keyword arguments: directory -- the directory to summarize recursively disk usage for """ - if self.__isdir(directory): + if self._isdir(directory): return Popen([self._cfg.dget('bin.du'), "-hs", directory], stdout=PIPE).communicate()[0].split('\t')[0] else: return 0 - def __isdir(self, directory): + def _isdir(self, directory): + """Check if `directory` is a directory. Returns bool. + When `directory` isn't a directory, a warning will be appended to + _warnings.""" isdir = os.path.isdir(directory) if not isdir: - self.__warnings.append(_('No such directory: %s') % directory) + self._warnings.append(_('No such directory: %s') % directory) return isdir - def __make_domain_dir(self, domain): + def _make_domain_dir(self, domain): + """Create a directory for the `domain` and its accounts.""" cwd = os.getcwd() hashdir, domdir = domain.directory.split(os.path.sep)[-2:] os.chdir(self._cfg.dget('misc.base_directory')) @@ -251,14 +259,15 @@ os.chown(domain.directory, 0, domain.gid) os.chdir(cwd) - def __make_home(self, account): + def _make_home(self, account): """Create a home directory for the new Account *account*.""" os.umask(0007) os.chdir(account.domain_directory) os.mkdir('%s' % account.uid, self._cfg.dget('account.directory_mode')) os.chown('%s' % account.uid, account.uid, account.gid) - def __userDirDelete(self, domdir, uid, gid): + def _delete_home(self, domdir, uid, gid): + """Delete a user's home directory.""" if uid > 0 and gid > 0: userdir = '%s' % uid if userdir.count('..') or domdir.count('..'): @@ -278,9 +287,10 @@ os.path.join(domdir, userdir), NO_SUCH_DIRECTORY) - def __domDirDelete(self, domdir, gid): + def _delete_domain_dir(self, domdir, gid): + """Delete a domain's directory.""" if gid > 0: - if not self.__isdir(domdir): + if not self._isdir(domdir): return basedir = self._cfg.dget('misc.base_directory') domdirdirs = domdir.replace(basedir + '/', '').split('/') @@ -295,17 +305,16 @@ u'directory.'), DOMAINDIR_GROUP_MISMATCH) rmtree(domdirdirs[1], ignore_errors=True) - def hasWarnings(self): + def has_warnings(self): """Checks if warnings are present, returns bool.""" - return bool(len(self.__warnings)) + return bool(len(self._warnings)) - def getWarnings(self): + def get_warnings(self): """Returns a list with all available warnings and resets all warnings. - """ - ret_val = self.__warnings[:] - del self.__warnings[:] + ret_val = self._warnings[:] + del self._warnings[:] return ret_val def cfg_dget(self, option): @@ -325,8 +334,9 @@ assert 'cfg_dget' not in __builtin__.__dict__ __builtin__.__dict__['cfg_dget'] = self._cfg.dget - def domainAdd(self, domainname, transport=None): - dom = self.__getDomain(domainname) + def domain_add(self, domainname, transport=None): + """Wrapper around Domain.set_transport() and Domain.save()""" + dom = self._get_domain(domainname) if transport is None: dom.set_transport(Transport(self._dbh, transport=self._cfg.dget('misc.transport'))) @@ -334,24 +344,26 @@ dom.set_transport(Transport(self._dbh, transport=transport)) dom.set_directory(self._cfg.dget('misc.base_directory')) dom.save() - self.__make_domain_dir(dom) + self._make_domain_dir(dom) - def domainTransport(self, domainname, transport, force=None): + def domain_transport(self, domainname, transport, force=None): + """Wrapper around Domain.update_transport()""" if force is not None and force != 'force': raise DomainError(_(u"Invalid argument: '%s'") % force, INVALID_ARGUMENT) - dom = self.__getDomain(domainname) + dom = self._get_domain(domainname) trsp = Transport(self._dbh, transport=transport) if force is None: dom.update_transport(trsp) else: dom.update_transport(trsp, force=True) - def domainDelete(self, domainname, force=None): + def domain_delete(self, domainname, force=None): + """Wrapper around Domain.delete()""" if force and force not in ('deluser', 'delalias', 'delall'): raise DomainError(_(u"Invalid argument: '%s'") % force, INVALID_ARGUMENT) - dom = self.__getDomain(domainname) + dom = self._get_domain(domainname) gid = dom.gid domdir = dom.directory if self._cfg.dget('domain.force_deletion') or force == 'delall': @@ -363,14 +375,17 @@ else: dom.delete() if self._cfg.dget('domain.delete_directory'): - self.__domDirDelete(domdir, gid) + self._delete_domain_dir(domdir, gid) - def domainInfo(self, domainname, details=None): + def domain_info(self, domainname, details=None): + """Wrapper around Domain.get_info(), Domain.get_accounts(), + Domain.get_aliase_names(), Domain.get_aliases() and + Domain.get_relocated.""" if details not in [None, 'accounts', 'aliasdomains', 'aliases', 'full', 'relocated']: raise VMMError(_(u'Invalid argument: ā€œ%sā€') % details, INVALID_ARGUMENT) - dom = self.__getDomain(domainname) + dom = self._get_domain(domainname) dominfo = dom.get_info() if dominfo['domainname'].startswith('xn--'): dominfo['domainname'] += ' (%s)' % \ @@ -389,7 +404,7 @@ return (dominfo, dom.get_aliase_names(), dom.get_accounts(), dom.get_aliases(), dom.get_relocated()) - def aliasDomainAdd(self, aliasname, domainname): + def aliasdomain_add(self, aliasname, domainname): """Adds an alias domain to the domain. Arguments: @@ -399,19 +414,19 @@ `domainname` : basestring The name of the target domain """ - dom = self.__getDomain(domainname) - aliasDom = AliasDomain(self._dbh, aliasname) - aliasDom.set_destination(dom) - aliasDom.save() + dom = self._get_domain(domainname) + alias_dom = AliasDomain(self._dbh, aliasname) + alias_dom.set_destination(dom) + alias_dom.save() - def aliasDomainInfo(self, aliasname): + def aliasdomain_info(self, aliasname): """Returns a dict (keys: "alias" and "domain") with the names of the alias domain and its primary domain.""" - self.__dbConnect() - aliasDom = AliasDomain(self._dbh, aliasname) - return aliasDom.info() + self._db_connect() + alias_dom = AliasDomain(self._dbh, aliasname) + return alias_dom.info() - def aliasDomainSwitch(self, aliasname, domainname): + def aliasdomain_switch(self, aliasname, domainname): """Modifies the target domain of an existing alias domain. Arguments: @@ -421,12 +436,12 @@ `domainname` : basestring The name of the new target domain """ - dom = self.__getDomain(domainname) - aliasDom = AliasDomain(self._dbh, aliasname) - aliasDom.set_destination(dom) - aliasDom.switch() + dom = self._get_domain(domainname) + alias_dom = AliasDomain(self._dbh, aliasname) + alias_dom.set_destination(dom) + alias_dom.switch() - def aliasDomainDelete(self, aliasname): + def aliasdomain_delete(self, aliasname): """Deletes the given alias domain. Argument: @@ -434,11 +449,12 @@ `aliasname` : basestring The name of the alias domain """ - self.__dbConnect() - aliasDom = AliasDomain(self._dbh, aliasname) - aliasDom.delete() + self._db_connect() + alias_dom = AliasDomain(self._dbh, aliasname) + alias_dom.delete() - def domainList(self, pattern=None): + def domain_list(self, pattern=None): + """Wrapper around function search() from module Domain.""" from VirtualMailManager.Domain import search like = False if pattern and (pattern.startswith('%') or pattern.endswith('%')): @@ -446,16 +462,16 @@ if not re.match(RE_DOMAIN_SEARCH, pattern.strip('%')): raise VMMError(_(u"The pattern '%s' contains invalid " u"characters.") % pattern, DOMAIN_INVALID) - self.__dbConnect() + self._db_connect() return search(self._dbh, pattern=pattern, like=like) def user_add(self, emailaddress, password): """Wrapper around Account.set_password() and Account.save().""" - acc = self.__getAccount(emailaddress) + acc = self._get_account(emailaddress) acc.set_password(password) acc.save() oldpwd = os.getcwd() - self.__make_home(acc) + self._make_home(acc) mailbox = new_mailbox(acc) mailbox.create() folders = self._cfg.dget('mailbox.folders').split(':') @@ -463,32 +479,32 @@ bad = mailbox.add_boxes(folders, self._cfg.dget('mailbox.subscribe')) if bad: - self.__warnings.append(_(u"Skipped mailbox folders:") + - '\n\t- ' + '\n\t- '.join(bad)) + self._warnings.append(_(u"Skipped mailbox folders:") + + '\n\t- ' + '\n\t- '.join(bad)) os.chdir(oldpwd) - def aliasAdd(self, aliasaddress, *targetaddresses): + def alias_add(self, aliasaddress, *targetaddresses): """Creates a new `Alias` entry for the given *aliasaddress* with the given *targetaddresses*.""" - alias = self.__getAlias(aliasaddress) + alias = self._get_alias(aliasaddress) destinations = [EmailAddress(address) for address in targetaddresses] warnings = [] destinations = alias.add_destinations(destinations, warnings) if warnings: - self.__warnings.append(_('Ignored destination addresses:')) - self.__warnings.extend((' * %s' % w for w in warnings)) + self._warnings.append(_('Ignored destination addresses:')) + self._warnings.extend((' * %s' % w for w in warnings)) for destination in destinations: if get_gid(self._dbh, destination.domainname) and \ not self._chk_other_address_types(destination, TYPE_RELOCATED): - self.__warnings.append(_(u"The destination account/alias '%s' " - u"doesn't exist.") % destination) + self._warnings.append(_(u"The destination account/alias '%s' " + u"doesn't exist.") % destination) def user_delete(self, emailaddress, force=None): """Wrapper around Account.delete(...)""" if force not in (None, 'delalias'): raise VMMError(_(u"Invalid argument: '%s'") % force, INVALID_ARGUMENT) - acc = self.__getAccount(emailaddress) + acc = self._get_account(emailaddress) if not acc: raise VMMError(_(u"The account '%s' doesn't exist.") % acc.address, NO_SUCH_ACCOUNT) @@ -499,7 +515,7 @@ acc.delete(bool(force)) if self._cfg.dget('account.delete_directory'): try: - self.__userDirDelete(dom_dir, uid, gid) + self._delete_home(dom_dir, uid, gid) except VMMError, err: if err.code in (FOUND_DOTS_IN_PATH, MAILDIR_PERM_MISMATCH, NO_SUCH_DIRECTORY): @@ -509,25 +525,25 @@ ā€œ%(directory)sā€ Reason: %(reason)s""") % \ {'directory': acc_dir, 'reason': err.msg} - self.__warnings.append(warning) + self._warnings.append(warning) else: raise - def aliasInfo(self, aliasaddress): + def alias_info(self, aliasaddress): """Returns an iterator object for all destinations (`EmailAddress` instances) for the `Alias` with the given *aliasaddress*.""" - alias = self.__getAlias(aliasaddress) + alias = self._get_alias(aliasaddress) if alias: return alias.get_destinations() if not self._is_other_address(alias.address, TYPE_ALIAS): raise VMMError(_(u"The alias '%s' doesn't exist.") % alias.address, NO_SUCH_ALIAS) - def aliasDelete(self, aliasaddress, targetaddress=None): + def alias_delete(self, aliasaddress, targetaddress=None): """Deletes the `Alias` *aliasaddress* with all its destinations from the database. If *targetaddress* is not ``None``, only this destination will be removed from the alias.""" - alias = self.__getAlias(aliasaddress) + alias = self._get_alias(aliasaddress) if targetaddress is None: alias.delete() else: @@ -538,7 +554,7 @@ if details not in (None, 'du', 'aliases', 'full'): raise VMMError(_(u"Invalid argument: '%s'") % details, INVALID_ARGUMENT) - acc = self.__getAccount(emailaddress) + acc = self._get_account(emailaddress) if not acc: if not self._is_other_address(acc.address, TYPE_ACCOUNT): raise VMMError(_(u"The account '%s' doesn't exist.") % @@ -546,7 +562,7 @@ info = acc.get_info() if self._cfg.dget('account.disk_usage') or details in ('du', 'full'): path = os.path.join(acc.home, acc.mail_location.directory) - info['disk usage'] = self.__getDiskUsage(path) + info['disk usage'] = self._get_disk_usage(path) if details in (None, 'du'): return info if details in ('aliases', 'full'): @@ -557,7 +573,7 @@ """Search for an Account by its *uid*. Returns a dict (address, uid and gid) if a user could be found.""" from VirtualMailManager.Account import get_account_by_uid - self.__dbConnect() + self._db_connect() return get_account_by_uid(uid, self._dbh) def user_password(self, emailaddress, password): @@ -565,7 +581,7 @@ if not isinstance(password, basestring) or not password: raise VMMError(_(u"Could not accept password: '%s'") % password, INVALID_ARGUMENT) - acc = self.__getAccount(emailaddress) + acc = self._get_account(emailaddress) if not acc: raise VMMError(_(u"The account '%s' doesn't exist.") % acc.address, NO_SUCH_ACCOUNT) @@ -576,7 +592,7 @@ if not isinstance(name, basestring) or not name: raise VMMError(_(u"Could not accept name: '%s'") % name, INVALID_ARGUMENT) - acc = self.__getAccount(emailaddress) + acc = self._get_account(emailaddress) if not acc: raise VMMError(_(u"The account '%s' doesn't exist.") % acc.address, NO_SUCH_ACCOUNT) @@ -587,7 +603,7 @@ if not isinstance(transport, basestring) or not transport: raise VMMError(_(u"Could not accept transport: '%s'") % transport, INVALID_ARGUMENT) - acc = self.__getAccount(emailaddress) + acc = self._get_account(emailaddress) if not acc: raise VMMError(_(u"The account '%s' doesn't exist.") % acc.address, NO_SUCH_ACCOUNT) @@ -598,7 +614,7 @@ if service not in (None, 'all', 'imap', 'pop3', 'smtp', 'sieve'): raise VMMError(_(u"Could not accept service: '%s'") % service, INVALID_ARGUMENT) - acc = self.__getAccount(emailaddress) + acc = self._get_account(emailaddress) if not acc: raise VMMError(_(u"The account '%s' doesn't exist.") % acc.address, NO_SUCH_ACCOUNT) @@ -609,37 +625,33 @@ if service not in (None, 'all', 'imap', 'pop3', 'smtp', 'sieve'): raise VMMError(_(u"Could not accept service: '%s'") % service, INVALID_ARGUMENT) - acc = self.__getAccount(emailaddress) + acc = self._get_account(emailaddress) if not acc: raise VMMError(_(u"The account '%s' doesn't exist.") % acc.address, NO_SUCH_ACCOUNT) acc.enable(service) - def relocatedAdd(self, emailaddress, targetaddress): + def relocated_add(self, emailaddress, targetaddress): """Creates a new `Relocated` entry in the database. If there is already a relocated user with the given *emailaddress*, only the *targetaddress* for the relocated user will be updated.""" - relocated = self.__getRelocated(emailaddress) + relocated = self._get_relocated(emailaddress) relocated.set_destination(EmailAddress(targetaddress)) - def relocatedInfo(self, emailaddress): + def relocated_info(self, emailaddress): """Returns the target address of the relocated user with the given *emailaddress*.""" - relocated = self.__getRelocated(emailaddress) + relocated = self._get_relocated(emailaddress) if relocated: return relocated.get_info() if not self._is_other_address(relocated.address, TYPE_RELOCATED): raise VMMError(_(u"The relocated user '%s' doesn't exist.") % relocated.address, NO_SUCH_RELOCATED) - def relocatedDelete(self, emailaddress): + def relocated_delete(self, emailaddress): """Deletes the relocated user with the given *emailaddress* from the database.""" - relocated = self.__getRelocated(emailaddress) + relocated = self._get_relocated(emailaddress) relocated.delete() - def __del__(self): - if isinstance(self._dbh, PgSQL.Connection) and self._dbh._isOpen: - self._dbh.close() - del _ diff -r 4dc2edf02d11 -r f4956b4ceba1 VirtualMailManager/cli/Handler.py --- a/VirtualMailManager/cli/Handler.py Tue Jul 27 19:29:44 2010 +0000 +++ b/VirtualMailManager/cli/Handler.py Wed Jul 28 01:03:56 2010 +0000 @@ -48,7 +48,8 @@ self._cfg.check() self._chkenv() - def cfgSet(self, option, value): + def cfg_set(self, option, value): + """Set a new value for the given option.""" return self._cfg.set(option, value) def configure(self, section=None):