# -*- coding: UTF-8 -*-# Copyright (c) 2007 - 2011, Pascal Volk# See COPYING for distribution information.""" VirtualMailManager.handler ~~~~~~~~~~~~~~~~~~~~~~~~~~ A wrapper class. It wraps round all other classes and does some dependencies checks. Additionally it communicates with the PostgreSQL database, creates or deletes directories of domains or users."""importosimportrefromshutilimportrmtreefromsubprocessimportPopen,PIPEfromVirtualMailManager.accountimportAccountfromVirtualMailManager.aliasimportAliasfromVirtualMailManager.aliasdomainimportAliasDomainfromVirtualMailManager.commonimportexec_ok,lisdirfromVirtualMailManager.configimportConfigasCfgfromVirtualMailManager.constantsimportMIN_GID,MIN_UID, \ACCOUNT_EXISTS,ALIAS_EXISTS,CONF_NOFILE,CONF_NOPERM,CONF_WRONGPERM, \DATABASE_ERROR,DOMAINDIR_GROUP_MISMATCH,DOMAIN_INVALID, \FOUND_DOTS_IN_PATH,INVALID_ARGUMENT,MAILDIR_PERM_MISMATCH, \NOT_EXECUTABLE,NO_SUCH_ACCOUNT,NO_SUCH_ALIAS,NO_SUCH_BINARY, \NO_SUCH_DIRECTORY,NO_SUCH_RELOCATED,RELOCATED_EXISTS,VMM_ERRORfromVirtualMailManager.domainimportDomain,get_gidfromVirtualMailManager.emailaddressimportDestinationEmailAddress, \EmailAddressfromVirtualMailManager.errorsimport \DomainError,NotRootError,PermissionError,VMMErrorfromVirtualMailManager.mailboximportnewasnew_mailboxfromVirtualMailManager.pycompatimportall,anyfromVirtualMailManager.quotalimitimportQuotaLimitfromVirtualMailManager.relocatedimportRelocatedfromVirtualMailManager.transportimportTransport_=lambdamsg:msg_db_mod=NoneCFG_FILE='vmm.cfg'CFG_PATH='/root:/usr/local/etc:/etc'RE_DOMAIN_SEARCH="""^[a-z0-9-\.]+$"""TYPE_ACCOUNT=0x1TYPE_ALIAS=0x2TYPE_RELOCATED=0x4OTHER_TYPES={TYPE_ACCOUNT:(_(u'an account'),ACCOUNT_EXISTS),TYPE_ALIAS:(_(u'an alias'),ALIAS_EXISTS),TYPE_RELOCATED:(_(u'a relocated user'),RELOCATED_EXISTS),}classHandler(object):"""Wrapper class to simplify the access on all the stuff from VirtualMailManager"""__slots__=('_cfg','_cfg_fname','_db_connect','_dbh','_warnings')def__init__(self,skip_some_checks=False):"""Creates a new Handler instance. ``skip_some_checks`` : bool When a derived class knows how to handle all checks this argument may be ``True``. By default it is ``False`` and all checks will be performed. Throws a NotRootError if your uid is greater 0. """self._cfg_fname=''self._warnings=[]self._cfg=Noneself._dbh=Noneself._db_connect=Noneifos.geteuid():raiseNotRootError(_(u"You are not root.\n\tGood bye!\n"),CONF_NOPERM)ifself._check_cfg_file():self._cfg=Cfg(self._cfg_fname)self._cfg.load()ifnotskip_some_checks:self._cfg.check()self._chkenv()self._set_db_connect()def_find_cfg_file(self):"""Search the CFG_FILE in CFG_PATH. Raise a VMMError when no vmm.cfg could be found. """forpathinCFG_PATH.split(':'):tmp=os.path.join(path,CFG_FILE)ifos.path.isfile(tmp):self._cfg_fname=tmpbreakifnotself._cfg_fname:raiseVMMError(_(u"Could not find '%(cfg_file)s' in: "u"'%(cfg_path)s'")%{'cfg_file':CFG_FILE,'cfg_path':CFG_PATH},CONF_NOFILE)def_check_cfg_file(self):"""Checks the configuration file, returns bool"""self._find_cfg_file()fstat=os.stat(self._cfg_fname)fmode=int(oct(fstat.st_mode&0777))iffmode%100andfstat.st_uid!=fstat.st_gidor \fmode%10andfstat.st_uid==fstat.st_gid:raisePermissionError(_(u"wrong permissions for '%(file)s': "u"%(perms)s\n`chmod 0600 %(file)s` would "u"be great.")%{'file':self._cfg_fname,'perms':fmode},CONF_WRONGPERM)else:returnTruedef_chkenv(self):"""Make sure our base_directory is a directory and that all required executables exists and are executable. If not, a VMMError will be raised"""dir_created=Falsebasedir=self._cfg.dget('misc.base_directory')ifnotos.path.exists(basedir):old_umask=os.umask(0006)os.makedirs(basedir,0771)os.chown(basedir,0,0)os.umask(old_umask)dir_created=Trueifnotdir_createdandnotlisdir(basedir):raiseVMMError(_(u"'%(path)s' is not a directory.\n(%(cfg_file)s: "u"section 'misc', option 'base_directory')")%{'path':basedir,'cfg_file':self._cfg_fname},NO_SUCH_DIRECTORY)foropt,valinself._cfg.items('bin'):try:exec_ok(val)exceptVMMError,err:iferr.codein(NO_SUCH_BINARY,NOT_EXECUTABLE):raiseVMMError(err.msg+_(u"\n(%(cfg_file)s: section "u"'bin', option '%(option)s')")%{'cfg_file':self._cfg_fname,'option':opt},err.code)else:raisedef_set_db_connect(self):"""check which module to use and set self._db_connect"""global_db_modifself._cfg.dget('database.module').lower()=='psycopg2':try:_db_mod=__import__('psycopg2')exceptImportError:raiseVMMError(_(u"Unable to import database module '%s'.")%'psycopg2',VMM_ERROR)self._db_connect=self._psycopg2_connectelse:try:tmp=__import__('pyPgSQL',globals(),locals(),['PgSQL'])exceptImportError:raiseVMMError(_(u"Unable to import database module '%s'.")%'pyPgSQL',VMM_ERROR)_db_mod=tmp.PgSQLself._db_connect=self._pypgsql_connectdef_pypgsql_connect(self):"""Creates a pyPgSQL.PgSQL.connection instance."""ifself._dbhisNoneor(isinstance(self._dbh,_db_mod.Connection)andnotself._dbh._isOpen):try:self._dbh=_db_mod.connect(database=self._cfg.dget('database.name'),user=self._cfg.pget('database.user'),host=self._cfg.dget('database.host'),port=self._cfg.dget('database.port'),password=self._cfg.pget('database.pass'),client_encoding='utf8',unicode_results=True)dbc=self._dbh.cursor()dbc.execute("SET NAMES 'UTF8'")dbc.close()except_db_mod.libpq.DatabaseError,err:raiseVMMError(str(err),DATABASE_ERROR)def_psycopg2_connect(self):"""Return a new psycopg2 connection object."""ifself._dbhisNoneor \(isinstance(self._dbh,_db_mod.extensions.connection)andself._dbh.closed):try:self._dbh=_db_mod.connect(host=self._cfg.dget('database.host'),sslmode=self._cfg.dget('database.sslmode'),port=self._cfg.dget('database.port'),database=self._cfg.dget('database.name'),user=self._cfg.pget('database.user'),password=self._cfg.pget('database.pass'))self._dbh.set_client_encoding('utf8')_db_mod.extensions.register_type(_db_mod.extensions.UNICODE)dbc=self._dbh.cursor()dbc.execute("SET NAMES 'UTF8'")dbc.close()except_db_mod.DatabaseError,err:raiseVMMError(str(err),DATABASE_ERROR)def_chk_other_address_types(self,address,exclude):"""Checks if the EmailAddress *address* is known as `TYPE_ACCOUNT`, `TYPE_ALIAS` or `TYPE_RELOCATED`, but not as the `TYPE_*` specified by *exclude*. If the *address* is known as one of the `TYPE_*`s the according `TYPE_*` constant will be returned. Otherwise 0 will be returned."""assertexcludein(TYPE_ACCOUNT,TYPE_ALIAS,TYPE_RELOCATED)and \isinstance(address,EmailAddress)ifexcludeisnotTYPE_ACCOUNT:account=Account(self._dbh,address)ifaccount:returnTYPE_ACCOUNTifexcludeisnotTYPE_ALIAS:alias=Alias(self._dbh,address)ifalias:returnTYPE_ALIASifexcludeisnotTYPE_RELOCATED:relocated=Relocated(self._dbh,address)ifrelocated:returnTYPE_RELOCATEDreturn0def_is_other_address(self,address,exclude):"""Checks if *address* is known for an Account (TYPE_ACCOUNT), Alias (TYPE_ALIAS) or Relocated (TYPE_RELOCATED), except for *exclude*. Returns `False` if the address is not known for other types. Raises a `VMMError` if the address is known. """other=self._chk_other_address_types(address,exclude)ifnotother:returnFalse# TP: %(a_type)s will be one of: 'an account', 'an alias' or# 'a relocated user'msg=_(u"There is already %(a_type)s with the address '%(address)s'.")raiseVMMError(msg%{'a_type':OTHER_TYPES[other][0],'address':address},OTHER_TYPES[other][1])def_get_account(self,address):"""Return an Account instances for the given address (str)."""address=EmailAddress(address)self._db_connect()returnAccount(self._dbh,address)def_get_alias(self,address):"""Return an Alias instances for the given address (str)."""address=EmailAddress(address)self._db_connect()returnAlias(self._dbh,address)def_get_relocated(self,address):"""Return a Relocated instances for the given address (str)."""address=EmailAddress(address)self._db_connect()returnRelocated(self._dbh,address)def_get_domain(self,domainname):"""Return a Domain instances for the given domain name (str)."""self._db_connect()returnDomain(self._dbh,domainname)def_get_disk_usage(self,directory):"""Estimate file space usage for the given directory. Arguments: `directory` : basestring The directory to summarize recursively disk usage for """iflisdir(directory):returnPopen([self._cfg.dget('bin.du'),"-hs",directory],stdout=PIPE).communicate()[0].split('\t')[0]else:self._warnings.append(_('No such directory: %s')%directory)return0def_make_domain_dir(self,domain):"""Create a directory for the `domain` and its accounts."""cwd=os.getcwd()hashdir,domdir=domain.directory.split(os.path.sep)[-2:]dir_created=Falseos.chdir(self._cfg.dget('misc.base_directory'))ifnotos.path.exists(hashdir):os.mkdir(hashdir,0711)os.chown(hashdir,0,0)dir_created=Trueifnotdir_createdandnotlisdir(hashdir):raiseVMMError(_(u"'%s' is not a directory.")%hashdir,NO_SUCH_DIRECTORY)ifos.path.exists(domain.directory):raiseVMMError(_(u"The file/directory '%s' already exists.")%domain.directory,VMM_ERROR)os.mkdir(os.path.join(hashdir,domdir),self._cfg.dget('domain.directory_mode'))os.chown(domain.directory,0,domain.gid)os.chdir(cwd)def_make_home(self,account):"""Create a home directory for the new Account *account*."""domdir=account.domain.directoryifnotlisdir(domdir):self._make_domain_dir(account.domain)os.umask(0007)uid=account.uidos.chdir(domdir)os.mkdir('%s'%uid,self._cfg.dget('account.directory_mode'))os.chown('%s'%uid,uid,account.gid)def_make_account_dirs(self,account):"""Create all necessary directories for the account."""oldpwd=os.getcwd()self._make_home(account)mailbox=new_mailbox(account)mailbox.create()folders=self._cfg.dget('mailbox.folders').split(':')ifany(folders):bad=mailbox.add_boxes(folders,self._cfg.dget('mailbox.subscribe'))ifbad:self._warnings.append(_(u"Skipped mailbox folders:")+'\n\t- '+'\n\t- '.join(bad))os.chdir(oldpwd)def_delete_home(self,domdir,uid,gid):"""Delete a user's home directory. Arguments: `domdir` : basestring The directory of the domain the user belongs to (commonly AccountObj.domain.directory) `uid` : int/long The user's UID (commonly AccountObj.uid) `gid` : int/long The user's GID (commonly AccountObj.gid) """assertall(isinstance(xid,(long,int))forxidin(uid,gid))and \isinstance(domdir,basestring)ifuid<MIN_UIDorgid<MIN_GID:raiseVMMError(_(u"UID '%(uid)u' and/or GID '%(gid)u' are less "u"than %(min_uid)u/%(min_gid)u.")%{'uid':uid,'gid':gid,'min_gid':MIN_GID,'min_uid':MIN_UID},MAILDIR_PERM_MISMATCH)ifdomdir.count('..'):raiseVMMError(_(u'Found ".." in domain directory path: %s')%domdir,FOUND_DOTS_IN_PATH)ifnotlisdir(domdir):raiseVMMError(_(u"No such directory: %s")%domdir,NO_SUCH_DIRECTORY)os.chdir(domdir)userdir='%s'%uidifnotlisdir(userdir):self._warnings.append(_(u"No such directory: %s")%os.path.join(domdir,userdir))returnmdstat=os.lstat(userdir)if(mdstat.st_uid,mdstat.st_gid)!=(uid,gid):raiseVMMError(_(u'Detected owner/group mismatch in home 'u'directory.'),MAILDIR_PERM_MISMATCH)rmtree(userdir,ignore_errors=True)def_delete_domain_dir(self,domdir,gid):"""Delete a domain's directory. Arguments: `domdir` : basestring The domain's directory (commonly DomainObj.directory) `gid` : int/long The domain's GID (commonly DomainObj.gid) """assertisinstance(domdir,basestring)andisinstance(gid,(long,int))ifgid<MIN_GID:raiseVMMError(_(u"GID '%(gid)u' is less than '%(min_gid)u'.")%{'gid':gid,'min_gid':MIN_GID},DOMAINDIR_GROUP_MISMATCH)ifdomdir.count('..'):raiseVMMError(_(u'Found ".." in domain directory path: %s')%domdir,FOUND_DOTS_IN_PATH)ifnotlisdir(domdir):self._warnings.append(_('No such directory: %s')%domdir)returndirst=os.lstat(domdir)ifdirst.st_gid!=gid:raiseVMMError(_(u'Detected group mismatch in domain directory: 'u'%s')%domdir,DOMAINDIR_GROUP_MISMATCH)rmtree(domdir,ignore_errors=True)defhas_warnings(self):"""Checks if warnings are present, returns bool."""returnbool(len(self._warnings))defget_warnings(self):"""Returns a list with all available warnings and resets all warnings. """ret_val=self._warnings[:]delself._warnings[:]returnret_valdefcfg_dget(self,option):"""Get the configured value of the *option* (section.option). When the option was not configured its default value will be returned."""returnself._cfg.dget(option)defcfg_pget(self,option):"""Get the configured value of the *option* (section.option)."""returnself._cfg.pget(option)defcfg_install(self):"""Installs the cfg_dget method as ``cfg_dget`` into the built-in namespace."""import__builtin__assert'cfg_dget'notin__builtin__.__dict____builtin__.__dict__['cfg_dget']=self._cfg.dgetdefdomain_add(self,domainname,transport=None):"""Wrapper around Domain's set_quotalimit, set_transport and save."""dom=self._get_domain(domainname)iftransportisNone:dom.set_transport(Transport(self._dbh,transport=self._cfg.dget('misc.transport')))else:dom.set_transport(Transport(self._dbh,transport=transport))dom.set_quotalimit(QuotaLimit(self._dbh,bytes=long(self._cfg.dget('misc.quota_bytes')),messages=self._cfg.dget('misc.quota_messages')))dom.set_directory(self._cfg.dget('misc.base_directory'))dom.save()self._make_domain_dir(dom)defdomain_quotalimit(self,domainname,bytes_,messages=0,force=None):"""Wrapper around Domain.update_quotalimit()."""ifnotall(isinstance(i,(int,long))foriin(bytes_,messages)):raiseTypeError("'bytes_' and 'messages' have to be ""integers or longs.")ifforceisnotNoneandforce!='force':raiseDomainError(_(u"Invalid argument: '%s'")%force,INVALID_ARGUMENT)dom=self._get_domain(domainname)quotalimit=QuotaLimit(self._dbh,bytes=bytes_,messages=messages)ifforceisNone:dom.update_quotalimit(quotalimit)else:dom.update_quotalimit(quotalimit,force=True)defdomain_transport(self,domainname,transport,force=None):"""Wrapper around Domain.update_transport()"""ifforceisnotNoneandforce!='force':raiseDomainError(_(u"Invalid argument: '%s'")%force,INVALID_ARGUMENT)dom=self._get_domain(domainname)trsp=Transport(self._dbh,transport=transport)ifforceisNone:dom.update_transport(trsp)else:dom.update_transport(trsp,force=True)defdomain_delete(self,domainname,force=False):"""Wrapper around Domain.delete()"""ifnotisinstance(force,bool):raiseTypeError('force must be a bool')dom=self._get_domain(domainname)gid=dom.giddomdir=dom.directoryifself._cfg.dget('domain.force_deletion')orforce:dom.delete(True)else:dom.delete(False)ifself._cfg.dget('domain.delete_directory'):self._delete_domain_dir(domdir,gid)defdomain_info(self,domainname,details=None):"""Wrapper around Domain.get_info(), Domain.get_accounts(), Domain.get_aliase_names(), Domain.get_aliases() and Domain.get_relocated."""ifdetailsnotin[None,'accounts','aliasdomains','aliases','full','relocated']:raiseVMMError(_(u"Invalid argument: '%s'")%details,INVALID_ARGUMENT)dom=self._get_domain(domainname)dominfo=dom.get_info()ifdominfo['domainname'].startswith('xn--'):dominfo['domainname']+=' (%s)'% \dominfo['domainname'].decode('idna')ifdetailsisNone:returndominfoelifdetails=='accounts':return(dominfo,dom.get_accounts())elifdetails=='aliasdomains':return(dominfo,dom.get_aliase_names())elifdetails=='aliases':return(dominfo,dom.get_aliases())elifdetails=='relocated':return(dominfo,dom.get_relocated())else:return(dominfo,dom.get_aliase_names(),dom.get_accounts(),dom.get_aliases(),dom.get_relocated())defaliasdomain_add(self,aliasname,domainname):"""Adds an alias domain to the domain. Arguments: `aliasname` : basestring The name of the alias domain `domainname` : basestring The name of the target domain """dom=self._get_domain(domainname)alias_dom=AliasDomain(self._dbh,aliasname)alias_dom.set_destination(dom)alias_dom.save()defaliasdomain_info(self,aliasname):"""Returns a dict (keys: "alias" and "domain") with the names of the alias domain and its primary domain."""self._db_connect()alias_dom=AliasDomain(self._dbh,aliasname)returnalias_dom.info()defaliasdomain_switch(self,aliasname,domainname):"""Modifies the target domain of an existing alias domain. Arguments: `aliasname` : basestring The name of the alias domain `domainname` : basestring The name of the new target domain """dom=self._get_domain(domainname)alias_dom=AliasDomain(self._dbh,aliasname)alias_dom.set_destination(dom)alias_dom.switch()defaliasdomain_delete(self,aliasname):"""Deletes the given alias domain. Argument: `aliasname` : basestring The name of the alias domain """self._db_connect()alias_dom=AliasDomain(self._dbh,aliasname)alias_dom.delete()defdomain_list(self,pattern=None):"""Wrapper around function search() from module Domain."""fromVirtualMailManager.domainimportsearchlike=Falseifpatternand(pattern.startswith('%')orpattern.endswith('%')):like=Trueifnotre.match(RE_DOMAIN_SEARCH,pattern.strip('%')):raiseVMMError(_(u"The pattern '%s' contains invalid "u"characters.")%pattern,DOMAIN_INVALID)self._db_connect()returnsearch(self._dbh,pattern=pattern,like=like)defuser_add(self,emailaddress,password):"""Wrapper around Account.set_password() and Account.save()."""acc=self._get_account(emailaddress)ifacc:raiseVMMError(_(u"The account '%s' already exists.")%acc.address,ACCOUNT_EXISTS)self._is_other_address(acc.address,TYPE_ACCOUNT)acc.set_password(password)acc.save()self._make_account_dirs(acc)defalias_add(self,aliasaddress,*targetaddresses):"""Creates a new `Alias` entry for the given *aliasaddress* with the given *targetaddresses*."""alias=self._get_alias(aliasaddress)ifnotalias:self._is_other_address(alias.address,TYPE_ALIAS)destinations=[DestinationEmailAddress(addr,self._dbh) \foraddrintargetaddresses]warnings=[]destinations=alias.add_destinations(destinations,warnings)ifwarnings:self._warnings.append(_('Ignored destination addresses:'))self._warnings.extend((' * %s'%wforwinwarnings))fordestinationindestinations:ifdestination.gidand \notself._chk_other_address_types(destination,TYPE_RELOCATED):self._warnings.append(_(u"The destination account/alias '%s' "u"does not exist.")%destination)defuser_delete(self,emailaddress,force=False):"""Wrapper around Account.delete(...)"""ifnotisinstance(force,bool):raiseTypeError('force must be a bool')acc=self._get_account(emailaddress)ifnotacc:raiseVMMError(_(u"The account '%s' does not exist.")%acc.address,NO_SUCH_ACCOUNT)uid=acc.uidgid=acc.giddom_dir=acc.domain.directoryacc_dir=acc.homeacc.delete(force)ifself._cfg.dget('account.delete_directory'):try:self._delete_home(dom_dir,uid,gid)exceptVMMError,err:iferr.codein(FOUND_DOTS_IN_PATH,MAILDIR_PERM_MISMATCH,NO_SUCH_DIRECTORY):warning=_(u"""\The account has been successfully deleted from the database. But an error occurred while deleting the following directory: '%(directory)s' Reason: %(reason)s""")%{'directory':acc_dir,'reason':err.msg}self._warnings.append(warning)else:raisedefalias_info(self,aliasaddress):"""Returns an iterator object for all destinations (`EmailAddress` instances) for the `Alias` with the given *aliasaddress*."""alias=self._get_alias(aliasaddress)ifalias:returnalias.get_destinations()ifnotself._is_other_address(alias.address,TYPE_ALIAS):raiseVMMError(_(u"The alias '%s' does not exist.")%alias.address,NO_SUCH_ALIAS)defalias_delete(self,aliasaddress,targetaddress=None):"""Deletes the `Alias` *aliasaddress* with all its destinations from the database. If *targetaddress* is not ``None``, only this destination will be removed from the alias."""alias=self._get_alias(aliasaddress)iftargetaddressisNone:alias.delete()else:alias.del_destination(DestinationEmailAddress(targetaddress,self._dbh))defuser_info(self,emailaddress,details=None):"""Wrapper around Account.get_info(...)"""ifdetailsnotin(None,'du','aliases','full'):raiseVMMError(_(u"Invalid argument: '%s'")%details,INVALID_ARGUMENT)acc=self._get_account(emailaddress)ifnotacc:ifnotself._is_other_address(acc.address,TYPE_ACCOUNT):raiseVMMError(_(u"The account '%s' does not exist.")%acc.address,NO_SUCH_ACCOUNT)info=acc.get_info()ifself._cfg.dget('account.disk_usage')ordetailsin('du','full'):path=os.path.join(acc.home,acc.mail_location.directory)info['disk usage']=self._get_disk_usage(path)ifdetailsin(None,'du'):returninfoifdetailsin('aliases','full'):return(info,acc.get_aliases())returninfodefuser_by_uid(self,uid):"""Search for an Account by its *uid*. Returns a dict (address, uid and gid) if a user could be found."""fromVirtualMailManager.accountimportget_account_by_uidself._db_connect()returnget_account_by_uid(uid,self._dbh)defuser_password(self,emailaddress,password):"""Wrapper for Account.modify('password' ...)."""ifnotisinstance(password,basestring)ornotpassword:raiseVMMError(_(u"Could not accept password: '%s'")%password,INVALID_ARGUMENT)acc=self._get_account(emailaddress)ifnotacc:raiseVMMError(_(u"The account '%s' does not exist.")%acc.address,NO_SUCH_ACCOUNT)acc.modify('password',password)defuser_name(self,emailaddress,name):"""Wrapper for Account.modify('name', ...)."""ifnotisinstance(name,basestring)ornotname:raiseVMMError(_(u"Could not accept name: '%s'")%name,INVALID_ARGUMENT)acc=self._get_account(emailaddress)ifnotacc:raiseVMMError(_(u"The account '%s' does not exist.")%acc.address,NO_SUCH_ACCOUNT)acc.modify('name',name)defuser_quotalimit(self,emailaddress,bytes_,messages=0):"""Wrapper for Account.update_quotalimit(QuotaLimit)."""ifnotall(isinstance(i,(int,long))foriin(bytes_,messages)):raiseTypeError("'bytes_' and 'messages' have to be ""integers or longs.")acc=self._get_account(emailaddress)ifnotacc:raiseVMMError(_(u"The account '%s' does not exist.")%acc.address,NO_SUCH_ACCOUNT)acc.update_quotalimit(QuotaLimit(self._dbh,bytes=bytes_,messages=messages))defuser_transport(self,emailaddress,transport):"""Wrapper for Account.update_transport(Transport)."""ifnotisinstance(transport,basestring)ornottransport:raiseVMMError(_(u"Could not accept transport: '%s'")%transport,INVALID_ARGUMENT)acc=self._get_account(emailaddress)ifnotacc:raiseVMMError(_(u"The account '%s' does not exist.")%acc.address,NO_SUCH_ACCOUNT)acc.update_transport(Transport(self._dbh,transport=transport))defuser_disable(self,emailaddress,services=None):"""Wrapper for Account.disable(*services)"""ifservicesisNone:services=[]else:assertisinstance(services,list)acc=self._get_account(emailaddress)ifnotacc:raiseVMMError(_(u"The account '%s' does not exist.")%acc.address,NO_SUCH_ACCOUNT)acc.disable(*services)defuser_enable(self,emailaddress,services=None):"""Wrapper for Account.enable(*services)"""ifservicesisNone:services=[]else:assertisinstance(services,list)acc=self._get_account(emailaddress)ifnotacc:raiseVMMError(_(u"The account '%s' does not exist.")%acc.address,NO_SUCH_ACCOUNT)acc.enable(*services)defrelocated_add(self,emailaddress,targetaddress):"""Creates a new `Relocated` entry in the database. If there is already a relocated user with the given *emailaddress*, only the *targetaddress* for the relocated user will be updated."""relocated=self._get_relocated(emailaddress)ifnotrelocated:self._is_other_address(relocated.address,TYPE_RELOCATED)destination=DestinationEmailAddress(targetaddress,self._dbh)relocated.set_destination(destination)ifdestination.gidand \notself._chk_other_address_types(destination,TYPE_RELOCATED):self._warnings.append(_(u"The destination account/alias '%s' "u"does not exist.")%destination)defrelocated_info(self,emailaddress):"""Returns the target address of the relocated user with the given *emailaddress*."""relocated=self._get_relocated(emailaddress)ifrelocated:returnrelocated.get_info()ifnotself._is_other_address(relocated.address,TYPE_RELOCATED):raiseVMMError(_(u"The relocated user '%s' does not exist.")%relocated.address,NO_SUCH_RELOCATED)defrelocated_delete(self,emailaddress):"""Deletes the relocated user with the given *emailaddress* from the database."""relocated=self._get_relocated(emailaddress)relocated.delete()del_