Due to the UNION query in address_list, the assumption that the list of
gids received from the database would be continuous does not hold.
To prevent addresses for domains with multiple entry types from being
listed, it is necessary to check the list of domain gids for every
entry.
Signed-off-by: martin f. krafft <madduck@debian.org>
---
VirtualMailManager/common.py | 4 +---
1 file changed, 1 insertion(+), 3 deletions(-)
# -*- coding: UTF-8 -*-# Copyright (c) 2007 - 2012, Pascal Volk# See COPYING for distribution information.""" VirtualMailManager.handler ~~~~~~~~~~~~~~~~~~~~~~~~~~ A wrapper class. It wraps round all other classes and does some dependencies checks. Additionally it communicates with the PostgreSQL database, creates or deletes directories of domains or users."""importosimportrefromshutilimportrmtreefromsubprocessimportPopen,PIPEfromVirtualMailManager.accountimportAccountfromVirtualMailManager.aliasimportAliasfromVirtualMailManager.aliasdomainimportAliasDomainfromVirtualMailManager.catchallimportCatchallAliasfromVirtualMailManager.commonimportexec_ok,lisdirfromVirtualMailManager.configimportConfigasCfgfromVirtualMailManager.constantsimportMIN_GID,MIN_UID, \ACCOUNT_EXISTS,ALIAS_EXISTS,CONF_NOFILE,CONF_NOPERM,CONF_WRONGPERM, \DATABASE_ERROR,DOMAINDIR_GROUP_MISMATCH,DOMAIN_INVALID, \FOUND_DOTS_IN_PATH,INVALID_ARGUMENT,MAILDIR_PERM_MISMATCH, \NOT_EXECUTABLE,NO_SUCH_ACCOUNT,NO_SUCH_ALIAS,NO_SUCH_BINARY, \NO_SUCH_DIRECTORY,NO_SUCH_RELOCATED,RELOCATED_EXISTS,UNKNOWN_SERVICE, \VMM_ERROR,LOCALPART_INVALID,TYPE_ACCOUNT,TYPE_ALIAS,TYPE_RELOCATEDfromVirtualMailManager.domainimportDomainfromVirtualMailManager.emailaddressimportDestinationEmailAddress, \EmailAddress,RE_LOCALPARTfromVirtualMailManager.errorsimport \DomainError,NotRootError,PermissionError,VMMErrorfromVirtualMailManager.mailboximportnewasnew_mailboxfromVirtualMailManager.pycompatimportall,anyfromVirtualMailManager.quotalimitimportQuotaLimitfromVirtualMailManager.relocatedimportRelocatedfromVirtualMailManager.servicesetimportServiceSet,SERVICESfromVirtualMailManager.transportimportTransport_=lambdamsg:msg_db_mod=NoneCFG_FILE='vmm.cfg'CFG_PATH='/root:/usr/local/etc:/etc'RE_DOMAIN_SEARCH="""^[a-z0-9-\.]+$"""OTHER_TYPES={TYPE_ACCOUNT:(_(u'an account'),ACCOUNT_EXISTS),TYPE_ALIAS:(_(u'an alias'),ALIAS_EXISTS),TYPE_RELOCATED:(_(u'a relocated user'),RELOCATED_EXISTS),}classHandler(object):"""Wrapper class to simplify the access on all the stuff from VirtualMailManager"""__slots__=('_cfg','_cfg_fname','_db_connect','_dbh','_warnings')def__init__(self,skip_some_checks=False):"""Creates a new Handler instance. ``skip_some_checks`` : bool When a derived class knows how to handle all checks this argument may be ``True``. By default it is ``False`` and all checks will be performed. Throws a NotRootError if your uid is greater 0. """self._cfg_fname=''self._warnings=[]self._cfg=Noneself._dbh=Noneself._db_connect=Noneifos.geteuid():raiseNotRootError(_(u"You are not root.\n\tGood bye!\n"),CONF_NOPERM)ifself._check_cfg_file():self._cfg=Cfg(self._cfg_fname)self._cfg.load()ifnotskip_some_checks:self._cfg.check()self._chkenv()self._set_db_connect()def_find_cfg_file(self):"""Search the CFG_FILE in CFG_PATH. Raise a VMMError when no vmm.cfg could be found. """forpathinCFG_PATH.split(':'):tmp=os.path.join(path,CFG_FILE)ifos.path.isfile(tmp):self._cfg_fname=tmpbreakifnotself._cfg_fname:raiseVMMError(_(u"Could not find '%(cfg_file)s' in: "u"'%(cfg_path)s'")%{'cfg_file':CFG_FILE,'cfg_path':CFG_PATH},CONF_NOFILE)def_check_cfg_file(self):"""Checks the configuration file, returns bool"""self._find_cfg_file()fstat=os.stat(self._cfg_fname)fmode=int(oct(fstat.st_mode&0777))iffmode%100andfstat.st_uid!=fstat.st_gidor \fmode%10andfstat.st_uid==fstat.st_gid:# TP: Please keep the backticks around the command. `chmod 0600 …`raisePermissionError(_(u"wrong permissions for '%(file)s': "u"%(perms)s\n`chmod 0600 %(file)s` would "u"be great.")%{'file':self._cfg_fname,'perms':fmode},CONF_WRONGPERM)else:returnTruedef_chkenv(self):"""Make sure our base_directory is a directory and that all required executables exists and are executable. If not, a VMMError will be raised"""dir_created=Falsebasedir=self._cfg.dget('misc.base_directory')ifnotos.path.exists(basedir):old_umask=os.umask(0006)os.makedirs(basedir,0771)os.chown(basedir,0,0)os.umask(old_umask)dir_created=Trueifnotdir_createdandnotlisdir(basedir):raiseVMMError(_(u"'%(path)s' is not a directory.\n(%(cfg_file)s: "u"section 'misc', option 'base_directory')")%{'path':basedir,'cfg_file':self._cfg_fname},NO_SUCH_DIRECTORY)foropt,valinself._cfg.items('bin'):try:exec_ok(val)exceptVMMError,err:iferr.codein(NO_SUCH_BINARY,NOT_EXECUTABLE):raiseVMMError(err.msg+_(u"\n(%(cfg_file)s: section "u"'bin', option '%(option)s')")%{'cfg_file':self._cfg_fname,'option':opt},err.code)else:raisedef_set_db_connect(self):"""check which module to use and set self._db_connect"""global_db_modifself._cfg.dget('database.module').lower()=='psycopg2':try:_db_mod=__import__('psycopg2')exceptImportError:raiseVMMError(_(u"Unable to import database module '%s'.")%'psycopg2',VMM_ERROR)self._db_connect=self._psycopg2_connectelse:try:tmp=__import__('pyPgSQL',globals(),locals(),['PgSQL'])exceptImportError:raiseVMMError(_(u"Unable to import database module '%s'.")%'pyPgSQL',VMM_ERROR)_db_mod=tmp.PgSQLself._db_connect=self._pypgsql_connectdef_pypgsql_connect(self):"""Creates a pyPgSQL.PgSQL.connection instance."""ifself._dbhisNoneor(isinstance(self._dbh,_db_mod.Connection)andnotself._dbh._isOpen):try:self._dbh=_db_mod.connect(database=self._cfg.dget('database.name'),user=self._cfg.pget('database.user'),host=self._cfg.dget('database.host'),port=self._cfg.dget('database.port'),password=self._cfg.pget('database.pass'),client_encoding='utf8',unicode_results=True)dbc=self._dbh.cursor()dbc.execute("SET NAMES 'UTF8'")dbc.close()except_db_mod.libpq.DatabaseError,err:raiseVMMError(str(err),DATABASE_ERROR)def_psycopg2_connect(self):"""Return a new psycopg2 connection object."""ifself._dbhisNoneor \(isinstance(self._dbh,_db_mod.extensions.connection)andself._dbh.closed):try:self._dbh=_db_mod.connect(host=self._cfg.dget('database.host'),sslmode=self._cfg.dget('database.sslmode'),port=self._cfg.dget('database.port'),database=self._cfg.dget('database.name'),user=self._cfg.pget('database.user'),password=self._cfg.pget('database.pass'))self._dbh.set_client_encoding('utf8')_db_mod.extensions.register_type(_db_mod.extensions.UNICODE)dbc=self._dbh.cursor()dbc.execute("SET NAMES 'UTF8'")dbc.close()except_db_mod.DatabaseError,err:raiseVMMError(str(err),DATABASE_ERROR)def_chk_other_address_types(self,address,exclude):"""Checks if the EmailAddress *address* is known as `TYPE_ACCOUNT`, `TYPE_ALIAS` or `TYPE_RELOCATED`, but not as the `TYPE_*` specified by *exclude*. If the *address* is known as one of the `TYPE_*`s the according `TYPE_*` constant will be returned. Otherwise 0 will be returned."""assertexcludein(TYPE_ACCOUNT,TYPE_ALIAS,TYPE_RELOCATED)and \isinstance(address,EmailAddress)ifexcludeisnotTYPE_ACCOUNT:account=Account(self._dbh,address)ifaccount:returnTYPE_ACCOUNTifexcludeisnotTYPE_ALIAS:alias=Alias(self._dbh,address)ifalias:returnTYPE_ALIASifexcludeisnotTYPE_RELOCATED:relocated=Relocated(self._dbh,address)ifrelocated:returnTYPE_RELOCATEDreturn0def_is_other_address(self,address,exclude):"""Checks if *address* is known for an Account (TYPE_ACCOUNT), Alias (TYPE_ALIAS) or Relocated (TYPE_RELOCATED), except for *exclude*. Returns `False` if the address is not known for other types. Raises a `VMMError` if the address is known. """other=self._chk_other_address_types(address,exclude)ifnotother:returnFalse# TP: %(a_type)s will be one of: 'an account', 'an alias' or# 'a relocated user'msg=_(u"There is already %(a_type)s with the address '%(address)s'.")raiseVMMError(msg%{'a_type':OTHER_TYPES[other][0],'address':address},OTHER_TYPES[other][1])def_get_account(self,address):"""Return an Account instances for the given address (str)."""address=EmailAddress(address)self._db_connect()returnAccount(self._dbh,address)def_get_alias(self,address):"""Return an Alias instances for the given address (str)."""address=EmailAddress(address)self._db_connect()returnAlias(self._dbh,address)def_get_catchall(self,domain):"""Return a CatchallAlias instances for the given domain (str)."""self._db_connect()returnCatchallAlias(self._dbh,domain)def_get_relocated(self,address):"""Return a Relocated instances for the given address (str)."""address=EmailAddress(address)self._db_connect()returnRelocated(self._dbh,address)def_get_domain(self,domainname):"""Return a Domain instances for the given domain name (str)."""self._db_connect()returnDomain(self._dbh,domainname)def_get_disk_usage(self,directory):"""Estimate file space usage for the given directory. Arguments: `directory` : basestring The directory to summarize recursively disk usage for """iflisdir(directory):returnPopen([self._cfg.dget('bin.du'),"-hs",directory],stdout=PIPE).communicate()[0].split('\t')[0]else:self._warnings.append(_('No such directory: %s')%directory)return0def_make_domain_dir(self,domain):"""Create a directory for the `domain` and its accounts."""cwd=os.getcwd()hashdir,domdir=domain.directory.split(os.path.sep)[-2:]dir_created=Falseos.chdir(self._cfg.dget('misc.base_directory'))old_umask=os.umask(0022)ifnotos.path.exists(hashdir):os.mkdir(hashdir,0711)os.chown(hashdir,0,0)dir_created=Trueifnotdir_createdandnotlisdir(hashdir):raiseVMMError(_(u"'%s' is not a directory.")%hashdir,NO_SUCH_DIRECTORY)ifos.path.exists(domain.directory):raiseVMMError(_(u"The file/directory '%s' already exists.")%domain.directory,VMM_ERROR)os.mkdir(os.path.join(hashdir,domdir),self._cfg.dget('domain.directory_mode'))os.chown(domain.directory,0,domain.gid)os.umask(old_umask)os.chdir(cwd)def_make_home(self,account):"""Create a home directory for the new Account *account*."""domdir=account.domain.directoryifnotlisdir(domdir):self._make_domain_dir(account.domain)os.umask(0007)uid=account.uidos.chdir(domdir)os.mkdir('%s'%uid,self._cfg.dget('account.directory_mode'))os.chown('%s'%uid,uid,account.gid)def_make_account_dirs(self,account):"""Create all necessary directories for the account."""oldpwd=os.getcwd()self._make_home(account)mailbox=new_mailbox(account)mailbox.create()folders=self._cfg.dget('mailbox.folders').split(':')ifany(folders):bad=mailbox.add_boxes(folders,self._cfg.dget('mailbox.subscribe'))ifbad:self._warnings.append(_(u"Skipped mailbox folders:")+'\n\t- '+'\n\t- '.join(bad))os.chdir(oldpwd)def_delete_home(self,domdir,uid,gid):"""Delete a user's home directory. Arguments: `domdir` : basestring The directory of the domain the user belongs to (commonly AccountObj.domain.directory) `uid` : int/long The user's UID (commonly AccountObj.uid) `gid` : int/long The user's GID (commonly AccountObj.gid) """assertall(isinstance(xid,(long,int))forxidin(uid,gid))and \isinstance(domdir,basestring)ifuid<MIN_UIDorgid<MIN_GID:raiseVMMError(_(u"UID '%(uid)u' and/or GID '%(gid)u' are less "u"than %(min_uid)u/%(min_gid)u.")%{'uid':uid,'gid':gid,'min_gid':MIN_GID,'min_uid':MIN_UID},MAILDIR_PERM_MISMATCH)ifdomdir.count('..'):raiseVMMError(_(u'Found ".." in domain directory path: %s')%domdir,FOUND_DOTS_IN_PATH)ifnotlisdir(domdir):raiseVMMError(_(u"No such directory: %s")%domdir,NO_SUCH_DIRECTORY)os.chdir(domdir)userdir='%s'%uidifnotlisdir(userdir):self._warnings.append(_(u"No such directory: %s")%os.path.join(domdir,userdir))returnmdstat=os.lstat(userdir)if(mdstat.st_uid,mdstat.st_gid)!=(uid,gid):raiseVMMError(_(u'Detected owner/group mismatch in home 'u'directory.'),MAILDIR_PERM_MISMATCH)rmtree(userdir,ignore_errors=True)def_delete_domain_dir(self,domdir,gid):"""Delete a domain's directory. Arguments: `domdir` : basestring The domain's directory (commonly DomainObj.directory) `gid` : int/long The domain's GID (commonly DomainObj.gid) """assertisinstance(domdir,basestring)andisinstance(gid,(long,int))ifgid<MIN_GID:raiseVMMError(_(u"GID '%(gid)u' is less than '%(min_gid)u'.")%{'gid':gid,'min_gid':MIN_GID},DOMAINDIR_GROUP_MISMATCH)ifdomdir.count('..'):raiseVMMError(_(u'Found ".." in domain directory path: %s')%domdir,FOUND_DOTS_IN_PATH)ifnotlisdir(domdir):self._warnings.append(_('No such directory: %s')%domdir)returndirst=os.lstat(domdir)ifdirst.st_gid!=gid:raiseVMMError(_(u'Detected group mismatch in domain directory: 'u'%s')%domdir,DOMAINDIR_GROUP_MISMATCH)rmtree(domdir,ignore_errors=True)defhas_warnings(self):"""Checks if warnings are present, returns bool."""returnbool(len(self._warnings))defget_warnings(self):"""Returns a list with all available warnings and resets all warnings. """ret_val=self._warnings[:]delself._warnings[:]returnret_valdefcfg_dget(self,option):"""Get the configured value of the *option* (section.option). When the option was not configured its default value will be returned."""returnself._cfg.dget(option)defcfg_pget(self,option):"""Get the configured value of the *option* (section.option)."""returnself._cfg.pget(option)defcfg_install(self):"""Installs the cfg_dget method as ``cfg_dget`` into the built-in namespace."""import__builtin__assert'cfg_dget'notin__builtin__.__dict____builtin__.__dict__['cfg_dget']=self._cfg.dgetdefdomain_add(self,domainname,transport=None):"""Wrapper around Domain's set_quotalimit, set_transport and save."""dom=self._get_domain(domainname)iftransportisNone:dom.set_transport(Transport(self._dbh,transport=self._cfg.dget('domain.transport')))else:dom.set_transport(Transport(self._dbh,transport=transport))dom.set_quotalimit(QuotaLimit(self._dbh,bytes=long(self._cfg.dget('domain.quota_bytes')),messages=self._cfg.dget('domain.quota_messages')))dom.set_serviceset(ServiceSet(self._dbh,imap=self._cfg.dget('domain.imap'),pop3=self._cfg.dget('domain.pop3'),sieve=self._cfg.dget('domain.sieve'),smtp=self._cfg.dget('domain.smtp')))dom.set_directory(self._cfg.dget('misc.base_directory'))dom.save()self._make_domain_dir(dom)defdomain_quotalimit(self,domainname,bytes_,messages=0,force=None):"""Wrapper around Domain.update_quotalimit()."""ifnotall(isinstance(i,(int,long))foriin(bytes_,messages)):raiseTypeError("'bytes_' and 'messages' have to be ""integers or longs.")ifforceisnotNoneandforce!='force':raiseDomainError(_(u"Invalid argument: '%s'")%force,INVALID_ARGUMENT)dom=self._get_domain(domainname)quotalimit=QuotaLimit(self._dbh,bytes=bytes_,messages=messages)ifforceisNone:dom.update_quotalimit(quotalimit)else:dom.update_quotalimit(quotalimit,force=True)defdomain_services(self,domainname,force=None,*services):"""Wrapper around Domain.update_serviceset()."""kwargs=dict.fromkeys(SERVICES,False)ifforceisnotNoneandforce!='force':raiseDomainError(_(u"Invalid argument: '%s'")%force,INVALID_ARGUMENT)forserviceinset(services):ifservicenotinSERVICES:raiseDomainError(_(u"Unknown service: '%s'")%service,UNKNOWN_SERVICE)kwargs[service]=Truedom=self._get_domain(domainname)serviceset=ServiceSet(self._dbh,**kwargs)dom.update_serviceset(serviceset,(True,False)[notforce])defdomain_transport(self,domainname,transport,force=None):"""Wrapper around Domain.update_transport()"""ifforceisnotNoneandforce!='force':raiseDomainError(_(u"Invalid argument: '%s'")%force,INVALID_ARGUMENT)dom=self._get_domain(domainname)trsp=Transport(self._dbh,transport=transport)ifforceisNone:dom.update_transport(trsp)else:dom.update_transport(trsp,force=True)defdomain_note(self,domainname,note):"""Wrapper around Domain.update_note()"""dom=self._get_domain(domainname)dom.update_note(note)defdomain_delete(self,domainname,force=False):"""Wrapper around Domain.delete()"""ifnotisinstance(force,bool):raiseTypeError('force must be a bool')dom=self._get_domain(domainname)gid=dom.giddomdir=dom.directoryifself._cfg.dget('domain.force_deletion')orforce:dom.delete(True)else:dom.delete(False)ifself._cfg.dget('domain.delete_directory'):self._delete_domain_dir(domdir,gid)defdomain_info(self,domainname,details=None):"""Wrapper around Domain.get_info(), Domain.get_accounts(), Domain.get_aliase_names(), Domain.get_aliases() and Domain.get_relocated."""ifdetailsnotin[None,'accounts','aliasdomains','aliases','full','relocated','catchall']:raiseVMMError(_(u"Invalid argument: '%s'")%details,INVALID_ARGUMENT)dom=self._get_domain(domainname)dominfo=dom.get_info()ifdominfo['domain name'].startswith('xn--'):dominfo['domain name']+=' (%s)'% \dominfo['domain name'].decode('idna')ifdetailsisNone:returndominfoelifdetails=='accounts':return(dominfo,dom.get_accounts())elifdetails=='aliasdomains':return(dominfo,dom.get_aliase_names())elifdetails=='aliases':return(dominfo,dom.get_aliases())elifdetails=='relocated':return(dominfo,dom.get_relocated())elifdetails=='catchall':return(dominfo,dom.get_catchall())else:return(dominfo,dom.get_aliase_names(),dom.get_accounts(),dom.get_aliases(),dom.get_relocated(),dom.get_catchall())defaliasdomain_add(self,aliasname,domainname):"""Adds an alias domain to the domain. Arguments: `aliasname` : basestring The name of the alias domain `domainname` : basestring The name of the target domain """dom=self._get_domain(domainname)alias_dom=AliasDomain(self._dbh,aliasname)alias_dom.set_destination(dom)alias_dom.save()defaliasdomain_info(self,aliasname):"""Returns a dict (keys: "alias" and "domain") with the names of the alias domain and its primary domain."""self._db_connect()alias_dom=AliasDomain(self._dbh,aliasname)returnalias_dom.info()defaliasdomain_switch(self,aliasname,domainname):"""Modifies the target domain of an existing alias domain. Arguments: `aliasname` : basestring The name of the alias domain `domainname` : basestring The name of the new target domain """dom=self._get_domain(domainname)alias_dom=AliasDomain(self._dbh,aliasname)alias_dom.set_destination(dom)alias_dom.switch()defaliasdomain_delete(self,aliasname):"""Deletes the given alias domain. Argument: `aliasname` : basestring The name of the alias domain """self._db_connect()alias_dom=AliasDomain(self._dbh,aliasname)alias_dom.delete()defdomain_list(self,pattern=None):"""Wrapper around function search() from module Domain."""fromVirtualMailManager.domainimportsearchlike=Falseifpatternand(pattern.startswith('%')orpattern.endswith('%')):like=Trueifnotre.match(RE_DOMAIN_SEARCH,pattern.strip('%')):raiseVMMError(_(u"The pattern '%s' contains invalid "u"characters.")%pattern,DOMAIN_INVALID)self._db_connect()returnsearch(self._dbh,pattern=pattern,like=like)defaddress_list(self,typelimit,pattern=None):"""TODO"""llike=dlike=Falselpattern=dpattern=Noneifpattern:parts=pattern.split('@',2)iflen(parts)==2:# The pattern includes '@', so let's treat the# parts separately to allow for pattern search like %@domain.%lpattern=parts[0]llike=lpattern.startswith('%')orlpattern.endswith('%')dpattern=parts[1]dlike=dpattern.startswith('%')ordpattern.endswith('%')ifllike:checkp=lpattern.strip('%')else:checkp=lpatterniflen(checkp)>0andre.search(RE_LOCALPART,checkp):raiseVMMError(_(u"The pattern '%s' contains invalid "u"characters.")%pattern,LOCALPART_INVALID)else:# else just match on domains# (or should that be local part, I don't know…)dpattern=parts[0]dlike=dpattern.startswith('%')ordpattern.endswith('%')ifdlike:checkp=dpattern.strip('%')else:checkp=dpatterniflen(checkp)>0andnotre.match(RE_DOMAIN_SEARCH,checkp):raiseVMMError(_(u"The pattern '%s' contains invalid "u"characters.")%pattern,DOMAIN_INVALID)self._db_connect()fromVirtualMailManager.commonimportsearch_addressesreturnsearch_addresses(self._dbh,typelimit=typelimit,lpattern=lpattern,llike=llike,dpattern=dpattern,dlike=dlike)defuser_add(self,emailaddress,password):"""Wrapper around Account.set_password() and Account.save()."""acc=self._get_account(emailaddress)ifacc:raiseVMMError(_(u"The account '%s' already exists.")%acc.address,ACCOUNT_EXISTS)self._is_other_address(acc.address,TYPE_ACCOUNT)acc.set_password(password)acc.save()self._make_account_dirs(acc)defalias_add(self,aliasaddress,*targetaddresses):"""Creates a new `Alias` entry for the given *aliasaddress* with the given *targetaddresses*."""alias=self._get_alias(aliasaddress)ifnotalias:self._is_other_address(alias.address,TYPE_ALIAS)destinations=[DestinationEmailAddress(addr,self._dbh) \foraddrintargetaddresses]warnings=[]destinations=alias.add_destinations(destinations,warnings)ifwarnings:self._warnings.append(_('Ignored destination addresses:'))self._warnings.extend((' * %s'%wforwinwarnings))fordestinationindestinations:ifdestination.gidand \notself._chk_other_address_types(destination,TYPE_RELOCATED):self._warnings.append(_(u"The destination account/alias '%s' "u"does not exist.")%destination)defuser_delete(self,emailaddress,force=False):"""Wrapper around Account.delete(...)"""ifnotisinstance(force,bool):raiseTypeError('force must be a bool')acc=self._get_account(emailaddress)ifnotacc:raiseVMMError(_(u"The account '%s' does not exist.")%acc.address,NO_SUCH_ACCOUNT)uid=acc.uidgid=acc.giddom_dir=acc.domain.directoryacc_dir=acc.homeacc.delete(force)ifself._cfg.dget('account.delete_directory'):try:self._delete_home(dom_dir,uid,gid)exceptVMMError,err:iferr.codein(FOUND_DOTS_IN_PATH,MAILDIR_PERM_MISMATCH,NO_SUCH_DIRECTORY):warning=_(u"""\The account has been successfully deleted from the database. But an error occurred while deleting the following directory: '%(directory)s' Reason: %(reason)s""")%{'directory':acc_dir,'reason':err.msg}self._warnings.append(warning)else:raisedefalias_info(self,aliasaddress):"""Returns an iterator object for all destinations (`EmailAddress` instances) for the `Alias` with the given *aliasaddress*."""alias=self._get_alias(aliasaddress)ifalias:returnalias.get_destinations()ifnotself._is_other_address(alias.address,TYPE_ALIAS):raiseVMMError(_(u"The alias '%s' does not exist.")%alias.address,NO_SUCH_ALIAS)defalias_delete(self,aliasaddress,targetaddress=None):"""Deletes the `Alias` *aliasaddress* with all its destinations from the database. If *targetaddress* is not ``None``, only this destination will be removed from the alias."""alias=self._get_alias(aliasaddress)iftargetaddressisNone:alias.delete()else:alias.del_destination(DestinationEmailAddress(targetaddress,self._dbh))defcatchall_add(self,domain,*targetaddresses):"""Creates a new `CatchallAlias` entry for the given *domain* with the given *targetaddresses*."""catchall=self._get_catchall(domain)destinations=[DestinationEmailAddress(addr,self._dbh) \foraddrintargetaddresses]warnings=[]destinations=catchall.add_destinations(destinations,warnings)ifwarnings:self._warnings.append(_('Ignored destination addresses:'))self._warnings.extend((' * %s'%wforwinwarnings))fordestinationindestinations:ifdestination.gidand \notself._chk_other_address_types(destination,TYPE_RELOCATED):self._warnings.append(_(u"The destination account/alias '%s' "u"does not exist.")%destination)defcatchall_info(self,domain):"""Returns an iterator object for all destinations (`EmailAddress` instances) for the `CatchallAlias` with the given *domain*."""returnself._get_catchall(domain).get_destinations()defcatchall_delete(self,domain,targetaddress=None):"""Deletes the `CatchallAlias` for domain *domain* with all its destinations from the database. If *targetaddress* is not ``None``, only this destination will be removed from the alias."""catchall=self._get_catchall(domain)iftargetaddressisNone:catchall.delete()else:catchall.del_destination(DestinationEmailAddress(targetaddress,self._dbh))defuser_info(self,emailaddress,details=None):"""Wrapper around Account.get_info(...)"""ifdetailsnotin(None,'du','aliases','full'):raiseVMMError(_(u"Invalid argument: '%s'")%details,INVALID_ARGUMENT)acc=self._get_account(emailaddress)ifnotacc:ifnotself._is_other_address(acc.address,TYPE_ACCOUNT):raiseVMMError(_(u"The account '%s' does not exist.")%acc.address,NO_SUCH_ACCOUNT)info=acc.get_info()ifself._cfg.dget('account.disk_usage')ordetailsin('du','full'):path=os.path.join(acc.home,acc.mail_location.directory)info['disk usage']=self._get_disk_usage(path)ifdetailsin(None,'du'):returninfoifdetailsin('aliases','full'):return(info,acc.get_aliases())returninfodefuser_by_uid(self,uid):"""Search for an Account by its *uid*. Returns a dict (address, uid and gid) if a user could be found."""fromVirtualMailManager.accountimportget_account_by_uidself._db_connect()returnget_account_by_uid(uid,self._dbh)defuser_password(self,emailaddress,password):"""Wrapper for Account.modify('password' ...)."""ifnotisinstance(password,basestring)ornotpassword:raiseVMMError(_(u"Could not accept password: '%s'")%password,INVALID_ARGUMENT)acc=self._get_account(emailaddress)ifnotacc:raiseVMMError(_(u"The account '%s' does not exist.")%acc.address,NO_SUCH_ACCOUNT)acc.modify('password',password)defuser_name(self,emailaddress,name):"""Wrapper for Account.modify('name', ...)."""acc=self._get_account(emailaddress)ifnotacc:raiseVMMError(_(u"The account '%s' does not exist.")%acc.address,NO_SUCH_ACCOUNT)acc.modify('name',name)defuser_note(self,emailaddress,note):"""Wrapper for Account.modify('note', ...)."""acc=self._get_account(emailaddress)ifnotacc:raiseVMMError(_(u"The account '%s' does not exist.")%acc.address,NO_SUCH_ACCOUNT)acc.modify('note',note)defuser_quotalimit(self,emailaddress,bytes_,messages=0):"""Wrapper for Account.update_quotalimit(QuotaLimit)."""acc=self._get_account(emailaddress)ifnotacc:raiseVMMError(_(u"The account '%s' does not exist.")%acc.address,NO_SUCH_ACCOUNT)ifbytes_=='default':quotalimit=Noneelse:ifnotall(isinstance(i,(int,long))foriin(bytes_,messages)):raiseTypeError("'bytes_' and 'messages' have to be ""integers or longs.")quotalimit=QuotaLimit(self._dbh,bytes=bytes_,messages=messages)acc.update_quotalimit(quotalimit)defuser_transport(self,emailaddress,transport):"""Wrapper for Account.update_transport(Transport)."""ifnotisinstance(transport,basestring)ornottransport:raiseVMMError(_(u"Could not accept transport: '%s'")%transport,INVALID_ARGUMENT)acc=self._get_account(emailaddress)ifnotacc:raiseVMMError(_(u"The account '%s' does not exist.")%acc.address,NO_SUCH_ACCOUNT)iftransport=='default':transport=Noneelse:transport=Transport(self._dbh,transport=transport)acc.update_transport(transport)defuser_services(self,emailaddress,*services):"""Wrapper around Account.update_serviceset()."""acc=self._get_account(emailaddress)ifnotacc:raiseVMMError(_(u"The account '%s' does not exist.")%acc.address,NO_SUCH_ACCOUNT)iflen(services)==1andservices[0]=='default':serviceset=Noneelse:kwargs=dict.fromkeys(SERVICES,False)forserviceinset(services):ifservicenotinSERVICES:raiseVMMError(_(u"Unknown service: '%s'")%service,UNKNOWN_SERVICE)kwargs[service]=Trueserviceset=ServiceSet(self._dbh,**kwargs)acc.update_serviceset(serviceset)defrelocated_add(self,emailaddress,targetaddress):"""Creates a new `Relocated` entry in the database. If there is already a relocated user with the given *emailaddress*, only the *targetaddress* for the relocated user will be updated."""relocated=self._get_relocated(emailaddress)ifnotrelocated:self._is_other_address(relocated.address,TYPE_RELOCATED)destination=DestinationEmailAddress(targetaddress,self._dbh)relocated.set_destination(destination)ifdestination.gidand \notself._chk_other_address_types(destination,TYPE_RELOCATED):self._warnings.append(_(u"The destination account/alias '%s' "u"does not exist.")%destination)defrelocated_info(self,emailaddress):"""Returns the target address of the relocated user with the given *emailaddress*."""relocated=self._get_relocated(emailaddress)ifrelocated:returnrelocated.get_info()ifnotself._is_other_address(relocated.address,TYPE_RELOCATED):raiseVMMError(_(u"The relocated user '%s' does not exist.")%relocated.address,NO_SUCH_RELOCATED)defrelocated_delete(self,emailaddress):"""Deletes the relocated user with the given *emailaddress* from the database."""relocated=self._get_relocated(emailaddress)relocated.delete()del_