# -*- coding: UTF-8 -*-
# Copyright (c) 2007 - 2014, Pascal Volk
# See COPYING for distribution information.
"""
    VirtualMailManager.handler
    ~~~~~~~~~~~~~~~~~~~~~~~~~~

    A wrapper class. It wraps round all other classes and does some
    dependencies checks.

    Additionally it communicates with the PostgreSQL database, creates
    or deletes directories of domains or users.
"""

import os
import re

from shutil import rmtree
from stat import S_IRGRP, S_IROTH, S_IWGRP, S_IWOTH
from subprocess import Popen, PIPE

import psycopg2

from VirtualMailManager.account import Account
from VirtualMailManager.alias import Alias
from VirtualMailManager.aliasdomain import AliasDomain
from VirtualMailManager.catchall import CatchallAlias
from VirtualMailManager.common import exec_ok, lisdir
from VirtualMailManager.config import Config as Cfg
from VirtualMailManager.constants import MIN_GID, MIN_UID, \
     ACCOUNT_EXISTS, ALIAS_EXISTS, CONF_NOFILE, CONF_NOPERM, CONF_WRONGPERM, \
     DATABASE_ERROR, DOMAINDIR_GROUP_MISMATCH, DOMAIN_INVALID, \
     FOUND_DOTS_IN_PATH, INVALID_ARGUMENT, MAILDIR_PERM_MISMATCH, \
     NOT_EXECUTABLE, NO_SUCH_ACCOUNT, NO_SUCH_ALIAS, NO_SUCH_BINARY, \
     NO_SUCH_DIRECTORY, NO_SUCH_RELOCATED, RELOCATED_EXISTS, UNKNOWN_SERVICE, \
     VMM_ERROR, LOCALPART_INVALID, TYPE_ACCOUNT, TYPE_ALIAS, TYPE_RELOCATED
from VirtualMailManager.domain import Domain
from VirtualMailManager.emailaddress import DestinationEmailAddress, \
     EmailAddress, RE_LOCALPART
from VirtualMailManager.errors import \
     DomainError, NotRootError, PermissionError, VMMError
from VirtualMailManager.mailbox import new as new_mailbox
from VirtualMailManager.quotalimit import QuotaLimit
from VirtualMailManager.relocated import Relocated
from VirtualMailManager.serviceset import ServiceSet, SERVICES
from VirtualMailManager.transport import Transport


# Temporary no-op marker so translatable strings can be wrapped in `_()`
# while the module is being defined; it is removed again by the `del _`
# at the very end of the file.
_ = lambda msg: msg

CFG_FILE = 'vmm.cfg'
CFG_PATH = '/root:/usr/local/etc:/etc'
# Raw string: the pattern contains `\.`, which is an invalid escape
# sequence in a normal string literal.
RE_DOMAIN_SEARCH = r"""^[a-z0-9-\.]+$"""
# Maps a TYPE_* constant to (translatable description, error code).
OTHER_TYPES = {
    TYPE_ACCOUNT: (_('an account'), ACCOUNT_EXISTS),
    TYPE_ALIAS: (_('an alias'), ALIAS_EXISTS),
    TYPE_RELOCATED: (_('a relocated user'), RELOCATED_EXISTS),
}


class Handler(object):
    """Wrapper class to simplify the access on all the stuff from
    VirtualMailManager"""
    __slots__ = ('_cfg', '_cfg_fname', '_dbh', '_warnings')

    def __init__(self, skip_some_checks=False):
        """Creates a new Handler instance.

        ``skip_some_checks`` : bool
            When a derived class knows how to handle all checks this
            argument may be ``True``. By default it is ``False`` and
            all checks will be performed.

        Throws a NotRootError if your uid is greater 0.
        """
        self._cfg_fname = ''
        self._warnings = []
        self._cfg = None
        self._dbh = None

        if os.geteuid():
            raise NotRootError(_("You are not root.\n\tGood bye!\n"),
                               CONF_NOPERM)
        if self._check_cfg_file():
            self._cfg = Cfg(self._cfg_fname)
            self._cfg.load()
        if not skip_some_checks:
            self._cfg.check()
            self._chkenv()

    def _find_cfg_file(self):
        """Search the CFG_FILE in CFG_PATH and store the first match in
        `self._cfg_fname`.

        Raise a VMMError when no vmm.cfg could be found.
        """
        for path in CFG_PATH.split(':'):
            tmp = os.path.join(path, CFG_FILE)
            if os.path.isfile(tmp):
                self._cfg_fname = tmp
                break
        if not self._cfg_fname:
            raise VMMError(_("Could not find '%(cfg_file)s' in: "
                             "'%(cfg_path)s'") % {'cfg_file': CFG_FILE,
                                                  'cfg_path': CFG_PATH},
                           CONF_NOFILE)

    def _check_cfg_file(self):
        """Checks the configuration file, returns bool

        Locates the file first (see `_find_cfg_file`). Returns ``True``
        when its permission bits are acceptable, otherwise a
        PermissionError (the VirtualMailManager one) is raised.
        """
        GRPRW = S_IRGRP | S_IWGRP
        OTHRW = S_IROTH | S_IWOTH
        self._find_cfg_file()
        fstat = os.stat(self._cfg_fname)
        # When the numeric owner and group ids are equal only the "other"
        # read/write bits are rejected; otherwise group read/write bits
        # are rejected as well.
        if (fstat.st_uid == fstat.st_gid and fstat.st_mode & OTHRW) or \
           (fstat.st_uid != fstat.st_gid and
                fstat.st_mode & (GRPRW | OTHRW)):
            # TP: Please keep the backticks around the command. `chmod 0600 …`
            raise PermissionError(_("wrong permissions for '%(file)s': "
                                    "%(perms)s\n`chmod 0600 %(file)s` would "
                                    "be great.") % {
                                        'file': self._cfg_fname,
                                        'perms': oct(fstat.st_mode)[-4:]},
                                  CONF_WRONGPERM)
        else:
            return True

    def _chkenv(self):
        """Make sure our base_directory is a directory and that all
        required executables exists and are executable.

        A missing base_directory is created (mode 0771, owner 0:0).
        If a check fails, a VMMError will be raised"""
        basedir = self._cfg.dget('misc.base_directory')
        if not os.path.exists(basedir):
            old_umask = os.umask(0o006)
            try:
                os.makedirs(basedir, 0o771)
                os.chown(basedir, 0, 0)
            finally:
                # Never leave the temporary umask behind, even when the
                # directory could not be created.
                os.umask(old_umask)
        elif not lisdir(basedir):
            raise VMMError(_("'%(path)s' is not a directory.\n(%(cfg_file)s: "
                             "section 'misc', option 'base_directory')") %
                           {'path': basedir, 'cfg_file': self._cfg_fname},
                           NO_SUCH_DIRECTORY)
        for opt, val in self._cfg.items('bin'):
            try:
                exec_ok(val)
            except VMMError as err:
                if err.code in (NO_SUCH_BINARY, NOT_EXECUTABLE):
                    # Re-raise with a hint which option caused the problem.
                    raise VMMError(err.msg + _("\n(%(cfg_file)s: section "
                                               "'bin', option '%(option)s')") %
                                   {'cfg_file': self._cfg_fname,
                                    'option': opt}, err.code)
                else:
                    raise

    def _db_connect(self):
        """Make sure `self._dbh` holds an open psycopg2 connection.

        A new connection is created when there is none yet, or when the
        existing connection object reports itself as closed. Nothing is
        returned. A `psycopg2.DatabaseError` is converted to a VMMError
        with the code `DATABASE_ERROR`.
        """
        if self._dbh is None or \
           (isinstance(self._dbh, psycopg2.extensions.connection) and
                self._dbh.closed):
            try:
                self._dbh = psycopg2.connect(
                    host=self._cfg.dget('database.host'),
                    sslmode=self._cfg.dget('database.sslmode'),
                    port=self._cfg.dget('database.port'),
                    database=self._cfg.dget('database.name'),
                    user=self._cfg.pget('database.user'),
                    password=self._cfg.pget('database.pass'))
                self._dbh.set_client_encoding('utf8')
                dbc = self._dbh.cursor()
                dbc.execute("SET NAMES 'UTF8'")
                dbc.close()
            except psycopg2.DatabaseError as err:
                raise VMMError(str(err), DATABASE_ERROR) from err

    def _chk_other_address_types(self, address, exclude):
        """Checks if the EmailAddress *address* is known as `TYPE_ACCOUNT`,
        `TYPE_ALIAS` or `TYPE_RELOCATED`, but not as the `TYPE_*` specified
        by *exclude*.  If the *address* is known as one of the `TYPE_*`s
        the according `TYPE_*` constant will be returned.  Otherwise 0 will
        be returned."""
        assert exclude in (TYPE_ACCOUNT, TYPE_ALIAS, TYPE_RELOCATED) and \
            isinstance(address, EmailAddress)
        if exclude is not TYPE_ACCOUNT:
            account = Account(self._dbh, address)
            if account:
                return TYPE_ACCOUNT
        if exclude is not TYPE_ALIAS:
            alias = Alias(self._dbh, address)
            if alias:
                return TYPE_ALIAS
        if exclude is not TYPE_RELOCATED:
            relocated = Relocated(self._dbh, address)
            if relocated:
                return TYPE_RELOCATED
        return 0

    def _is_other_address(self, address, exclude):
        """Checks if *address* is known for an Account (TYPE_ACCOUNT),
        Alias (TYPE_ALIAS) or Relocated (TYPE_RELOCATED), except for
        *exclude*.  Returns `False` if the address is not known for other
        types.

        Raises a `VMMError` if the address is known.
        """
        other = self._chk_other_address_types(address, exclude)
        if not other:
            return False
        # TP: %(a_type)s will be one of: 'an account', 'an alias' or
        # 'a relocated user'
        msg = _("There is already %(a_type)s with the address '%(address)s'.")
        raise VMMError(msg % {'a_type': OTHER_TYPES[other][0],
                              'address': address}, OTHER_TYPES[other][1])

    def _get_account(self, address):
        """Return an Account instances for the given address (str)."""
        address = EmailAddress(address)
        self._db_connect()
        return Account(self._dbh, address)

    def _require_account(self, emailaddress):
        """Return the Account for *emailaddress* (str).

        Raises a VMMError (`NO_SUCH_ACCOUNT`) when the Account object
        evaluates to false, i.e. the account does not exist.
        """
        acc = self._get_account(emailaddress)
        if not acc:
            raise VMMError(_("The account '%s' does not exist.") %
                           acc.address, NO_SUCH_ACCOUNT)
        return acc

    def _get_alias(self, address):
        """Return an Alias instances for the given address (str)."""
        address = EmailAddress(address)
        self._db_connect()
        return Alias(self._dbh, address)

    def _get_catchall(self, domain):
        """Return a CatchallAlias instances for the given domain (str)."""
        self._db_connect()
        return CatchallAlias(self._dbh, domain)

    def _get_relocated(self, address):
        """Return a Relocated instances for the given address (str)."""
        address = EmailAddress(address)
        self._db_connect()
        return Relocated(self._dbh, address)

    def _get_domain(self, domainname):
        """Return a Domain instances for the given domain name (str)."""
        self._db_connect()
        return Domain(self._dbh, domainname)

    def _get_disk_usage(self, directory):
        """Estimate file space usage for the given directory.

        Arguments:

        `directory` : basestring
          The directory to summarize recursively disk usage for

        Returns the first tab separated field of the output of the
        configured `bin.du` command, called with ``-hs``. When
        *directory* is not a directory a warning is recorded and 0 is
        returned.
        """
        if lisdir(directory):
            return Popen([self._cfg.dget('bin.du'), "-hs", directory],
                         stdout=PIPE).communicate()[0].decode().split('\t')[0]
        else:
            self._warnings.append(_('No such directory: %s') % directory)
            return 0

    def _make_domain_dir(self, domain):
        """Create a directory for the `domain` and its accounts.

        The last two components of `domain.directory` are treated as the
        hash directory and the domain directory. A missing hash directory
        is created (mode 0711, owner 0:0). A VMMError is raised when the
        hash directory is not a directory or when `domain.directory`
        already exists. The previous working directory and umask are
        always restored.
        """
        cwd = os.getcwd()
        hashdir, domdir = domain.directory.split(os.path.sep)[-2:]
        os.chdir(self._cfg.dget('misc.base_directory'))
        old_umask = os.umask(0o022)
        try:
            if not os.path.exists(hashdir):
                os.mkdir(hashdir, 0o711)
                os.chown(hashdir, 0, 0)
            elif not lisdir(hashdir):
                raise VMMError(_("'%s' is not a directory.") % hashdir,
                               NO_SUCH_DIRECTORY)
            if os.path.exists(domain.directory):
                raise VMMError(_("The file/directory '%s' already exists.") %
                               domain.directory, VMM_ERROR)
            os.mkdir(os.path.join(hashdir, domdir),
                     self._cfg.dget('domain.directory_mode'))
            os.chown(domain.directory, 0, domain.gid)
        finally:
            # Restore process state even when a step above failed.
            os.umask(old_umask)
            os.chdir(cwd)

    def _make_home(self, account):
        """Create a home directory for the new Account *account*.

        A missing domain directory is created first. Note that this
        method changes the working directory to the domain directory and
        sets the umask to 0007 without restoring either; the working
        directory is restored by `_make_account_dirs`.
        """
        domdir = account.domain.directory
        if not lisdir(domdir):
            self._make_domain_dir(account.domain)
        os.umask(0o007)
        uid = account.uid
        os.chdir(domdir)
        os.mkdir('%s' % uid, self._cfg.dget('account.directory_mode'))
        os.chown('%s' % uid, uid, account.gid)

    def _make_account_dirs(self, account):
        """Create all necessary directories for the account.

        Creates the home directory, the mailbox and the folders listed
        in the option `mailbox.folders` (colon separated). Folders which
        `add_boxes` reports back are recorded as a warning. The previous
        working directory is always restored.
        """
        oldpwd = os.getcwd()
        try:
            self._make_home(account)
            mailbox = new_mailbox(account)
            mailbox.create()
            folders = self._cfg.dget('mailbox.folders').split(':')
            if any(folders):
                bad = mailbox.add_boxes(folders,
                                        self._cfg.dget('mailbox.subscribe'))
                if bad:
                    self._warnings.append(_("Skipped mailbox folders:") +
                                          '\n\t- ' + '\n\t- '.join(bad))
        finally:
            os.chdir(oldpwd)

    def _delete_home(self, domdir, uid, gid):
        """Delete a user's home directory.

        Arguments:

        `domdir` : basestring
          The directory of the domain the user belongs to
          (commonly AccountObj.domain.directory)
        `uid` : int
          The user's UID (commonly AccountObj.uid)
        `gid` : int
          The user's GID (commonly AccountObj.gid)

        Raises a VMMError when *uid*/*gid* are below `MIN_UID`/`MIN_GID`,
        when *domdir* contains '..', when *domdir* is not a directory or
        when the owner/group of the home directory does not match
        *uid*/*gid*. A missing home directory only produces a warning.
        The working directory is changed to *domdir* and not restored.
        """
        assert all(isinstance(xid, int) for xid in (uid, gid)) and \
            isinstance(domdir, str)
        if uid < MIN_UID or gid < MIN_GID:
            raise VMMError(_("UID '%(uid)u' and/or GID '%(gid)u' are less "
                             "than %(min_uid)u/%(min_gid)u.") % {
                                 'uid': uid, 'gid': gid,
                                 'min_gid': MIN_GID, 'min_uid': MIN_UID},
                           MAILDIR_PERM_MISMATCH)
        if domdir.count('..'):
            raise VMMError(_('Found ".." in domain directory path: %s') %
                           domdir, FOUND_DOTS_IN_PATH)
        if not lisdir(domdir):
            raise VMMError(_("No such directory: %s") % domdir,
                           NO_SUCH_DIRECTORY)
        os.chdir(domdir)
        userdir = '%s' % uid
        if not lisdir(userdir):
            self._warnings.append(_("No such directory: %s") %
                                  os.path.join(domdir, userdir))
            return
        # lstat: inspect the directory entry itself, never a link target.
        mdstat = os.lstat(userdir)
        if (mdstat.st_uid, mdstat.st_gid) != (uid, gid):
            raise VMMError(_('Detected owner/group mismatch in home '
                             'directory.'), MAILDIR_PERM_MISMATCH)
        rmtree(userdir, ignore_errors=True)

    def _delete_domain_dir(self, domdir, gid):
        """Delete a domain's directory.

        Arguments:

        `domdir` : basestring
          The domain's directory (commonly DomainObj.directory)
        `gid` : int
          The domain's GID (commonly DomainObj.gid)

        Raises a VMMError when *gid* is below `MIN_GID`, when *domdir*
        contains '..' or when the group of the directory does not match
        *gid*. A missing directory only produces a warning.
        """
        assert isinstance(domdir, str) and isinstance(gid, int)
        if gid < MIN_GID:
            raise VMMError(_("GID '%(gid)u' is less than '%(min_gid)u'.") %
                           {'gid': gid, 'min_gid': MIN_GID},
                           DOMAINDIR_GROUP_MISMATCH)
        if domdir.count('..'):
            raise VMMError(_('Found ".." in domain directory path: %s') %
                           domdir, FOUND_DOTS_IN_PATH)
        if not lisdir(domdir):
            self._warnings.append(_('No such directory: %s') % domdir)
            return
        dirst = os.lstat(domdir)
        if dirst.st_gid != gid:
            raise VMMError(_('Detected group mismatch in domain directory: '
                             '%s') % domdir, DOMAINDIR_GROUP_MISMATCH)
        rmtree(domdir, ignore_errors=True)

    def _note_ignored_destinations(self, warnings):
        """Record the ignored destination addresses from the list
        *warnings* (if any) in the Handler's own warnings."""
        if warnings:
            self._warnings.append(_('Ignored destination addresses:'))
            self._warnings.extend((' * %s' % w for w in warnings))

    def _warn_missing_destinations(self, destinations):
        """Record a warning for each destination with a truthy `gid`
        which is neither known as an account nor as an alias."""
        for destination in destinations:
            if destination.gid and \
               not self._chk_other_address_types(destination,
                                                 TYPE_RELOCATED):
                self._warnings.append(_("The destination account/alias '%s' "
                                        "does not exist.") % destination)

    def _add_destinations(self, alias, targetaddresses):
        """Add the *targetaddresses* to *alias* (an Alias or
        CatchallAlias) and record all resulting warnings."""
        destinations = [DestinationEmailAddress(addr, self._dbh)
                        for addr in targetaddresses]
        warnings = []
        destinations = alias.add_destinations(destinations, warnings)
        self._note_ignored_destinations(warnings)
        self._warn_missing_destinations(destinations)

    def _remove_destinations(self, alias, targetaddresses):
        """Delete *alias* (an Alias or CatchallAlias) completely when
        *targetaddresses* is ``None``, otherwise remove only the given
        destinations.

        A VMMError raised by `del_destinations` is re-raised only after
        the ignored addresses have been recorded as warnings.
        """
        if targetaddresses is None:
            alias.delete()
            return
        destinations = [DestinationEmailAddress(addr, self._dbh)
                        for addr in targetaddresses]
        warnings = []
        error = None
        try:
            alias.del_destinations(destinations, warnings)
        except VMMError as err:
            # `err` is unbound after the except block, keep a reference.
            error = err
        self._note_ignored_destinations(warnings)
        if error is not None:
            raise error

    def has_warnings(self):
        """Checks if warnings are present, returns bool."""
        return bool(self._warnings)

    def get_warnings(self):
        """Returns a list with all available warnings and resets all
        warnings.
        """
        ret_val = self._warnings[:]
        del self._warnings[:]
        return ret_val

    def cfg_dget(self, option):
        """Get the configured value of the *option* (section.option).
        When the option was not configured its default value will be
        returned."""
        return self._cfg.dget(option)

    def cfg_pget(self, option):
        """Get the configured value of the *option* (section.option)."""
        return self._cfg.pget(option)

    def cfg_install(self):
        """Installs the cfg_dget method as ``cfg_dget`` into the built-in
        namespace."""
        import builtins
        assert 'cfg_dget' not in builtins.__dict__
        builtins.__dict__['cfg_dget'] = self._cfg.dget

    def domain_add(self, domainname, transport=None, note=None):
        """Wrapper around Domain's set_quotalimit, set_transport and save.

        When *transport* is ``None`` the configured `domain.transport`
        is used. Quota limit and service set are taken from the
        configuration. The domain's directory is created after saving.
        """
        dom = self._get_domain(domainname)
        if transport is None:
            dom.set_transport(Transport(
                self._dbh, transport=self._cfg.dget('domain.transport')))
        else:
            dom.set_transport(Transport(self._dbh, transport=transport))
        if note:
            dom.set_note(note)
        dom.set_quotalimit(QuotaLimit(
            self._dbh,
            bytes=int(self._cfg.dget('domain.quota_bytes')),
            messages=self._cfg.dget('domain.quota_messages')))
        dom.set_serviceset(ServiceSet(
            self._dbh,
            imap=self._cfg.dget('domain.imap'),
            pop3=self._cfg.dget('domain.pop3'),
            sieve=self._cfg.dget('domain.sieve'),
            smtp=self._cfg.dget('domain.smtp')))
        dom.set_directory(self._cfg.dget('misc.base_directory'))
        dom.save()
        self._make_domain_dir(dom)

    def domain_quotalimit(self, domainname, bytes_, messages=0, force=False):
        """Wrapper around Domain.update_quotalimit().

        Raises a TypeError when *bytes_* or *messages* is not an int.
        """
        if not all(isinstance(i, int) for i in (bytes_, messages)):
            raise TypeError("'bytes_' and 'messages' have to be "
                            "integers or longs.")
        assert isinstance(force, bool)
        dom = self._get_domain(domainname)
        quotalimit = QuotaLimit(self._dbh, bytes=bytes_, messages=messages)
        dom.update_quotalimit(quotalimit, force)

    def domain_services(self, domainname, force=False, *services):
        """Wrapper around Domain.update_serviceset().

        Every service named in *services* is enabled, all other known
        services are disabled. Raises a DomainError for a name which is
        not in `SERVICES`.
        """
        assert isinstance(force, bool)
        kwargs = dict.fromkeys(SERVICES, False)
        for service in set(services):
            if service not in SERVICES:
                raise DomainError(_("Unknown service: '%s'") % service,
                                  UNKNOWN_SERVICE)
            kwargs[service] = True

        dom = self._get_domain(domainname)
        serviceset = ServiceSet(self._dbh, **kwargs)
        dom.update_serviceset(serviceset, force)

    def domain_transport(self, domainname, transport, force=False):
        """Wrapper around Domain.update_transport()"""
        assert isinstance(force, bool)
        dom = self._get_domain(domainname)
        trsp = Transport(self._dbh, transport=transport)
        dom.update_transport(trsp, force)

    def domain_note(self, domainname, note):
        """Wrapper around Domain.update_note()"""
        dom = self._get_domain(domainname)
        dom.update_note(note)

    def domain_delete(self, domainname, force=False):
        """Wrapper around Domain.delete()

        The deletion is forced when *force* is true or when the option
        `domain.force_deletion` is set. The domain's directory is removed
        as well when the option `domain.delete_directory` is set.
        Raises a TypeError when *force* is not a bool.
        """
        if not isinstance(force, bool):
            raise TypeError('force must be a bool')
        dom = self._get_domain(domainname)
        # Read gid and directory before the domain is deleted.
        gid = dom.gid
        domdir = dom.directory
        if self._cfg.dget('domain.force_deletion') or force:
            dom.delete(True)
        else:
            dom.delete(False)
        if self._cfg.dget('domain.delete_directory'):
            self._delete_domain_dir(domdir, gid)

    def domain_info(self, domainname, details=None):
        """Wrapper around Domain.get_info(), Domain.get_accounts(),
        Domain.get_aliase_names(), Domain.get_aliases() and
        Domain.get_relocated.

        Returns only the info dict when *details* is ``None``, a tuple
        (info, requested details) for a single detail, or a tuple with
        all details for 'full'. Raises a VMMError (`INVALID_ARGUMENT`)
        for an unknown *details* value.
        """
        if details not in [None, 'accounts', 'aliasdomains', 'aliases',
                           'full', 'relocated', 'catchall']:
            raise VMMError(_("Invalid argument: '%s'") % details,
                           INVALID_ARGUMENT)
        dom = self._get_domain(domainname)
        dominfo = dom.get_info()
        # Append the decoded form to punycode ("xn--") domain names.
        if dominfo['domain name'].startswith('xn--') or \
           dominfo['domain name'].count('.xn--'):
            dominfo['domain name'] += ' (%s)' % \
                dominfo['domain name'].encode('utf-8').decode('idna')
        if details is None:
            return dominfo
        elif details == 'accounts':
            return (dominfo, dom.get_accounts())
        elif details == 'aliasdomains':
            return (dominfo, dom.get_aliase_names())
        elif details == 'aliases':
            return (dominfo, dom.get_aliases())
        elif details == 'relocated':
            return (dominfo, dom.get_relocated())
        elif details == 'catchall':
            return (dominfo, dom.get_catchall())
        else:
            return (dominfo, dom.get_aliase_names(), dom.get_accounts(),
                    dom.get_aliases(), dom.get_relocated(),
                    dom.get_catchall())

    def aliasdomain_add(self, aliasname, domainname):
        """Adds an alias domain to the domain.

        Arguments:

        `aliasname` : basestring
          The name of the alias domain
        `domainname` : basestring
          The name of the target domain
        """
        dom = self._get_domain(domainname)
        alias_dom = AliasDomain(self._dbh, aliasname)
        alias_dom.set_destination(dom)
        alias_dom.save()

    def aliasdomain_info(self, aliasname):
        """Returns a dict (keys: "alias" and "domain") with the names of
        the alias domain and its primary domain."""
        self._db_connect()
        alias_dom = AliasDomain(self._dbh, aliasname)
        return alias_dom.info()

    def aliasdomain_switch(self, aliasname, domainname):
        """Modifies the target domain of an existing alias domain.

        Arguments:

        `aliasname` : basestring
          The name of the alias domain
        `domainname` : basestring
          The name of the new target domain
        """
        dom = self._get_domain(domainname)
        alias_dom = AliasDomain(self._dbh, aliasname)
        alias_dom.set_destination(dom)
        alias_dom.switch()

    def aliasdomain_delete(self, aliasname):
        """Deletes the given alias domain.

        Argument:

        `aliasname` : basestring
          The name of the alias domain
        """
        self._db_connect()
        alias_dom = AliasDomain(self._dbh, aliasname)
        alias_dom.delete()

    def domain_list(self, pattern=None):
        """Wrapper around function search() from module Domain.

        A *pattern* starting or ending with '%' is validated against
        `RE_DOMAIN_SEARCH` (without the '%' signs) and passed on with
        ``like=True``. Raises a VMMError (`DOMAIN_INVALID`) when that
        validation fails.
        """
        from VirtualMailManager.domain import search
        like = False
        if pattern and (pattern.startswith('%') or pattern.endswith('%')):
            like = True
            if not re.match(RE_DOMAIN_SEARCH, pattern.strip('%')):
                raise VMMError(_("The pattern '%s' contains invalid "
                                 "characters.") % pattern, DOMAIN_INVALID)
        self._db_connect()
        return search(self._dbh, pattern=pattern, like=like)

    def address_list(self, typelimit, pattern=None):
        """Wrapper around function search_addresses() from module common.

        Arguments:

        `typelimit`
          Passed through unchanged to search_addresses()
        `pattern` : basestring
          Optional search pattern. When it contains exactly one '@' it
          is split into a local part and a domain pattern, otherwise it
          is used as domain pattern. Each part is searched with "like"
          semantics when it starts or ends with '%'.

        Raises a VMMError (`LOCALPART_INVALID` or `DOMAIN_INVALID`) when
        a part of the pattern contains invalid characters.
        """
        llike = dlike = False
        lpattern = dpattern = None
        if pattern:
            # With maxsplit=2 a pattern with two or more '@' gives three
            # parts and is handled by the else branch below.
            parts = pattern.split('@', 2)
            if len(parts) == 2:
                # The pattern includes '@', so let's treat the
                # parts separately to allow for pattern search like %@domain.%
                lpattern = parts[0]
                llike = lpattern.startswith('%') or lpattern.endswith('%')
                dpattern = parts[1]
                dlike = dpattern.startswith('%') or dpattern.endswith('%')

                checkp = lpattern.strip('%') if llike else lpattern
                if len(checkp) > 0 and re.search(RE_LOCALPART, checkp):
                    raise VMMError(_("The pattern '%s' contains invalid "
                                     "characters.") % pattern,
                                   LOCALPART_INVALID)
            else:
                # else just match on domains
                # (or should that be local part, I don't know…)
                dpattern = parts[0]
                dlike = dpattern.startswith('%') or dpattern.endswith('%')

            checkp = dpattern.strip('%') if dlike else dpattern
            if len(checkp) > 0 and not re.match(RE_DOMAIN_SEARCH, checkp):
                raise VMMError(_("The pattern '%s' contains invalid "
                                 "characters.") % pattern, DOMAIN_INVALID)
        self._db_connect()
        from VirtualMailManager.common import search_addresses
        return search_addresses(self._dbh, typelimit=typelimit,
                                lpattern=lpattern, llike=llike,
                                dpattern=dpattern, dlike=dlike)

    def user_add(self, emailaddress, password, note=None):
        """Wrapper around Account.set_password() and Account.save().

        Raises a VMMError when the account already exists or when the
        address is already used by an alias or a relocated user. The
        account's directories are created after saving.
        """
        acc = self._get_account(emailaddress)
        if acc:
            raise VMMError(_("The account '%s' already exists.") %
                           acc.address, ACCOUNT_EXISTS)
        self._is_other_address(acc.address, TYPE_ACCOUNT)
        acc.set_password(password)
        if note:
            acc.set_note(note)
        acc.save()
        self._make_account_dirs(acc)

    def alias_add(self, aliasaddress, *targetaddresses):
        """Creates a new `Alias` entry for the given *aliasaddress* with
        the given *targetaddresses*."""
        alias = self._get_alias(aliasaddress)
        if not alias:
            # A new alias must not collide with an account/relocated user.
            self._is_other_address(alias.address, TYPE_ALIAS)
        self._add_destinations(alias, targetaddresses)

    def user_delete(self, emailaddress, force=False):
        """Wrapper around Account.delete(...)

        The home directory is removed as well when the option
        `account.delete_directory` is set. Some failures while removing
        the directory are only recorded as a warning, because the account
        was already deleted from the database at that point.
        Raises a TypeError when *force* is not a bool.
        """
        if not isinstance(force, bool):
            raise TypeError('force must be a bool')
        acc = self._require_account(emailaddress)
        # Read everything needed for the cleanup before deleting.
        uid = acc.uid
        gid = acc.gid
        dom_dir = acc.domain.directory
        acc_dir = acc.home
        acc.delete(force)
        if self._cfg.dget('account.delete_directory'):
            try:
                self._delete_home(dom_dir, uid, gid)
            except VMMError as err:
                if err.code in (FOUND_DOTS_IN_PATH, MAILDIR_PERM_MISMATCH,
                                NO_SUCH_DIRECTORY):
                    # NOTE(review): the line breaks/indentation of this
                    # msgid were restored from a whitespace-collapsed
                    # source - confirm against the translation catalog.
                    warning = _("""\
The account has been successfully deleted from the database.
    But an error occurred while deleting the following directory:
    '%(directory)s'
    Reason: %(reason)s""") % {'directory': acc_dir, 'reason': err.msg}
                    self._warnings.append(warning)
                else:
                    raise

    def alias_info(self, aliasaddress):
        """Returns an iterator object for all destinations (`EmailAddress`
        instances) for the `Alias` with the given *aliasaddress*.

        Raises a VMMError when the alias does not exist, or when the
        address belongs to an account or a relocated user.
        """
        alias = self._get_alias(aliasaddress)
        if alias:
            return alias.get_destinations()
        if not self._is_other_address(alias.address, TYPE_ALIAS):
            raise VMMError(_("The alias '%s' does not exist.") %
                           alias.address, NO_SUCH_ALIAS)

    def alias_delete(self, aliasaddress, targetaddresses=None):
        """Deletes the `Alias` *aliasaddress* with all its destinations from
        the database. If *targetaddresses* is not ``None``, only the given
        destinations will be removed from the alias."""
        alias = self._get_alias(aliasaddress)
        self._remove_destinations(alias, targetaddresses)

    def catchall_add(self, domain, *targetaddresses):
        """Creates a new `CatchallAlias` entry for the given *domain* with
        the given *targetaddresses*."""
        catchall = self._get_catchall(domain)
        self._add_destinations(catchall, targetaddresses)

    def catchall_info(self, domain):
        """Returns an iterator object for all destinations (`EmailAddress`
        instances) for the `CatchallAlias` with the given *domain*."""
        return self._get_catchall(domain).get_destinations()

    def catchall_delete(self, domain, targetaddresses=None):
        """Deletes the `CatchallAlias` for domain *domain* with all its
        destinations from the database. If *targetaddresses* is not
        ``None``, only those destinations will be removed from the alias."""
        catchall = self._get_catchall(domain)
        self._remove_destinations(catchall, targetaddresses)

    def user_info(self, emailaddress, details=None):
        """Wrapper around Account.get_info(...)

        Returns the info dict, or a tuple (info, aliases) when *details*
        is 'aliases' or 'full'. The key 'disk usage' is added when the
        option `account.disk_usage` is set or *details* is 'du'/'full'.
        Raises a VMMError for an unknown *details* value, for a missing
        account, or when the address belongs to an alias or a relocated
        user.
        """
        if details not in (None, 'du', 'aliases', 'full'):
            raise VMMError(_("Invalid argument: '%s'") % details,
                           INVALID_ARGUMENT)
        acc = self._get_account(emailaddress)
        if not acc:
            # _is_other_address() raises on its own when the address is
            # used by another type.
            if not self._is_other_address(acc.address, TYPE_ACCOUNT):
                raise VMMError(_("The account '%s' does not exist.") %
                               acc.address, NO_SUCH_ACCOUNT)
        info = acc.get_info()
        if self._cfg.dget('account.disk_usage') or details in ('du', 'full'):
            path = os.path.join(acc.home, acc.mail_location.directory)
            info['disk usage'] = self._get_disk_usage(path)
        if details in ('aliases', 'full'):
            return (info, acc.get_aliases())
        return info

    def user_by_uid(self, uid):
        """Search for an Account by its *uid*.
        Returns a dict (address, uid and gid) if a user could be found."""
        from VirtualMailManager.account import get_account_by_uid
        self._db_connect()
        return get_account_by_uid(uid, self._dbh)

    def user_password(self, emailaddress, password):
        """Wrapper for Account.modify('password' ...).

        Raises a VMMError when *password* is not a non-empty str or when
        the account does not exist.
        """
        if not isinstance(password, str) or not password:
            raise VMMError(_("Could not accept password: '%s'") % password,
                           INVALID_ARGUMENT)
        acc = self._require_account(emailaddress)
        acc.modify('password', password)

    def user_name(self, emailaddress, name):
        """Wrapper for Account.modify('name', ...)."""
        acc = self._require_account(emailaddress)
        acc.modify('name', name)

    def user_note(self, emailaddress, note):
        """Wrapper for Account.modify('note', ...)."""
        acc = self._require_account(emailaddress)
        acc.modify('note', note)

    def user_quotalimit(self, emailaddress, bytes_, messages=0):
        """Wrapper for Account.update_quotalimit(QuotaLimit).

        When *bytes_* is the string 'domain', ``None`` is passed to
        update_quotalimit() instead of a QuotaLimit. Otherwise *bytes_*
        and *messages* have to be ints, else a TypeError is raised.
        """
        acc = self._require_account(emailaddress)
        if bytes_ == 'domain':
            quotalimit = None
        else:
            if not all(isinstance(i, int) for i in (bytes_, messages)):
                raise TypeError("'bytes_' and 'messages' have to be "
                                "integers or longs.")
            quotalimit = QuotaLimit(self._dbh, bytes=bytes_,
                                    messages=messages)
        acc.update_quotalimit(quotalimit)

    def user_transport(self, emailaddress, transport):
        """Wrapper for Account.update_transport(Transport).

        When *transport* is the string 'domain', ``None`` is passed to
        update_transport() instead of a Transport. Raises a VMMError
        when *transport* is not a non-empty str or when the account does
        not exist.
        """
        if not isinstance(transport, str) or not transport:
            raise VMMError(_("Could not accept transport: '%s'") % transport,
                           INVALID_ARGUMENT)
        acc = self._require_account(emailaddress)
        transport = None if transport == 'domain' \
            else Transport(self._dbh, transport=transport)
        acc.update_transport(transport)

    def user_services(self, emailaddress, *services):
        """Wrapper around Account.update_serviceset().

        When the only given service is the string 'domain', ``None`` is
        passed to update_serviceset(). Otherwise every named service is
        enabled and all other known services are disabled. Raises a
        VMMError for a name which is not in `SERVICES`.
        """
        acc = self._require_account(emailaddress)
        if len(services) == 1 and services[0] == 'domain':
            serviceset = None
        else:
            kwargs = dict.fromkeys(SERVICES, False)
            for service in set(services):
                if service not in SERVICES:
                    raise VMMError(_("Unknown service: '%s'") % service,
                                   UNKNOWN_SERVICE)
                kwargs[service] = True
            serviceset = ServiceSet(self._dbh, **kwargs)
        acc.update_serviceset(serviceset)

    def relocated_add(self, emailaddress, targetaddress):
        """Creates a new `Relocated` entry in the database. If there is
        already a relocated user with the given *emailaddress*, only the
        *targetaddress* for the relocated user will be updated."""
        relocated = self._get_relocated(emailaddress)
        if not relocated:
            self._is_other_address(relocated.address, TYPE_RELOCATED)
        destination = DestinationEmailAddress(targetaddress, self._dbh)
        relocated.set_destination(destination)
        self._warn_missing_destinations((destination,))

    def relocated_info(self, emailaddress):
        """Returns the target address of the relocated user with the given
        *emailaddress*.

        Raises a VMMError when the relocated user does not exist, or when
        the address belongs to an account or an alias.
        """
        relocated = self._get_relocated(emailaddress)
        if relocated:
            return relocated.get_info()
        if not self._is_other_address(relocated.address, TYPE_RELOCATED):
            raise VMMError(_("The relocated user '%s' does not exist.") %
                           relocated.address, NO_SUCH_RELOCATED)

    def relocated_delete(self, emailaddress):
        """Deletes the relocated user with the given *emailaddress* from
        the database."""
        relocated = self._get_relocated(emailaddress)
        relocated.delete()

# Remove the temporary marker function defined at the top of the module.
del _