VMM/Handler: import the errors before raising them.
Rephrased the 'permission error' message.
# -*- coding: UTF-8 -*-# Copyright (c) 2007 - 2010, Pascal Volk# See COPYING for distribution information.""" VirtualMailManager.Handler A wrapper class. It wraps round all other classes and does some dependencies checks. Additionally it communicates with the PostgreSQL database, creates or deletes directories of domains or users."""importosimportrefromshutilimportrmtreefromsubprocessimportPopen,PIPEfrompyPgSQLimportPgSQL# python-pgsql - http://pypgsql.sourceforge.netimportVirtualMailManager.constants.ERRORasERRfromVirtualMailManager.AccountimportAccountfromVirtualMailManager.AliasimportAliasfromVirtualMailManager.AliasDomainimportAliasDomainfromVirtualMailManager.commonimportexec_okfromVirtualMailManager.ConfigimportConfigasCfgfromVirtualMailManager.DomainimportDomain,ace2idna,get_gidfromVirtualMailManager.EmailAddressimportEmailAddressfromVirtualMailManager.errorsimport \DomainError,NotRootError,PermissionError,VMMErrorfromVirtualMailManager.RelocatedimportRelocatedfromVirtualMailManager.TransportimportTransport_=lambdamsg:msgRE_DOMAIN_SEARCH="""^[a-z0-9-\.]+$"""RE_MBOX_NAMES="""^[\x20-\x25\x27-\x7E]*$"""TYPE_ACCOUNT=0x1TYPE_ALIAS=0x2TYPE_RELOCATED=0x4OTHER_TYPES={TYPE_ACCOUNT:(_(u'an account'),ERR.ACCOUNT_EXISTS),TYPE_ALIAS:(_(u'an alias'),ERR.ALIAS_EXISTS),TYPE_RELOCATED:(_(u'a relocated user'),ERR.RELOCATED_EXISTS),}classHandler(object):"""Wrapper class to simplify the access on all the stuff from VirtualMailManager"""__slots__=('_Cfg','_cfgFileName','_dbh','__warnings')def__init__(self,skip_some_checks=False):"""Creates a new Handler instance. ``skip_some_checks`` : bool When a derived class knows how to handle all checks this argument may be ``True``. By default it is ``False`` and all checks will be performed. Throws a NotRootError if your uid is greater 0. """self._cfgFileName=''self.__warnings=[]self._Cfg=Noneself._dbh=Noneifos.geteuid():raiseNotRootError(_(u"You are not root.\n\tGood bye!\n"),ERR.CONF_NOPERM)ifself.__chkCfgFile():self._Cfg=Cfg(self._cfgFileName)self._Cfg.load()ifnotskip_some_checks:self._Cfg.check()self._chkenv()def__findCfgFile(self):forpathin['/root','/usr/local/etc','/etc']:tmp=os.path.join(path,'vmm.cfg')ifos.path.isfile(tmp):self._cfgFileName=tmpbreakifnotlen(self._cfgFileName):raiseVMMError(_(u"No “vmm.cfg” found in: /root:/usr/local/etc:/etc"),ERR.CONF_NOFILE)def__chkCfgFile(self):"""Checks the configuration file, returns bool"""self.__findCfgFile()fstat=os.stat(self._cfgFileName)fmode=int(oct(fstat.st_mode&0777))iffmode%100andfstat.st_uid!=fstat.st_gidor \fmode%10andfstat.st_uid==fstat.st_gid:raisePermissionError(_(u"wrong permissions for '%(file)s': \%(perms)s\n`chmod 0600 %(file)s` would be great.")%{'file':self._cfgFileName,'perms':fmode},ERR.CONF_WRONGPERM)else:returnTruedef_chkenv(self):""""""basedir=self._Cfg.dget('misc.base_directory')ifnotos.path.exists(basedir):old_umask=os.umask(0006)os.makedirs(basedir,0771)os.chown(basedir,0,0)os.umask(old_umask)elifnotos.path.isdir(basedir):raiseVMMError(_(u'“%s” is not a directory.\n\(vmm.cfg: section "misc", option "base_directory")')%basedir,ERR.NO_SUCH_DIRECTORY)foropt,valinself._Cfg.items('bin'):try:exec_ok(val)exceptVMMError,e:ife.codeisERR.NO_SUCH_BINARY:raiseVMMError(_(u'“%(binary)s” doesn\'t exist.\n\(vmm.cfg: section "bin", option "%(option)s")')%{'binary':val,'option':opt},ERR.NO_SUCH_BINARY)elife.codeisERR.NOT_EXECUTABLE:raiseVMMError(_(u'“%(binary)s” is not executable.\\n(vmm.cfg: section "bin", option "%(option)s")')%{'binary':val,'option':opt},ERR.NOT_EXECUTABLE)else:raisedef__dbConnect(self):"""Creates a pyPgSQL.PgSQL.connection instance."""ifself._dbhisNoneor(isinstance(self._dbh,PgSQL.Connection)andnotself._dbh._isOpen):try:self._dbh=PgSQL.connect(database=self._Cfg.dget('database.name'),user=self._Cfg.pget('database.user'),host=self._Cfg.dget('database.host'),password=self._Cfg.pget('database.pass'),client_encoding='utf8',unicode_results=True)dbc=self._dbh.cursor()dbc.execute("SET NAMES 'UTF8'")dbc.close()exceptPgSQL.libpq.DatabaseError,e:raiseVMMError(str(e),ERR.DATABASE_ERROR)def_chk_other_address_types(self,address,exclude):"""Checks if the EmailAddress *address* is known as `TYPE_ACCOUNT`, `TYPE_ALIAS` or `TYPE_RELOCATED`, but not as the `TYPE_*` specified by *exclude*. If the *address* is known as one of the `TYPE_*`s the according `TYPE_*` constant will be returned. Otherwise 0 will be returned."""assertexcludein(TYPE_ACCOUNT,TYPE_ALIAS,TYPE_RELOCATED)and \isinstance(address,EmailAddress)ifexcludeisnotTYPE_ACCOUNT:account=Account(self._dbh,address)ifaccount:returnTYPE_ACCOUNTifexcludeisnotTYPE_ALIAS:alias=Alias(self._dbh,address)ifalias:returnTYPE_ALIASifexcludeisnotTYPE_RELOCATED:relocated=Relocated(self._dbh,address)ifrelocated:returnTYPE_RELOCATEDreturn0def_is_other_address(self,address,exclude):"""Checks if *address* is known for an Account (TYPE_ACCOUNT), Alias (TYPE_ALIAS) or Relocated (TYPE_RELOCATED), except for *exclude*. Returns `False` if the address is not known for other types. Raises a `VMMError` if the address is known. """other=self._chk_other_address_types(address,exclude)ifnotother:returnFalsemsg=_(u"There is already %(a_type)s with the address '%(address)s'.")raiseVMMError(msg%{'a_type':OTHER_TYPES[other][0],'address':address},OTHER_TYPES[other][1])def__getAccount(self,address):address=EmailAddress(address)self.__dbConnect()returnAccount(self._dbh,address)def__getAlias(self,address):address=EmailAddress(address)self.__dbConnect()returnAlias(self._dbh,address)def__getRelocated(self,address):address=EmailAddress(address)self.__dbConnect()returnRelocated(self._dbh,address)def__getDomain(self,domainname):self.__dbConnect()returnDomain(self._dbh,domainname)def__getDiskUsage(self,directory):"""Estimate file space usage for the given directory. Keyword arguments: directory -- the directory to summarize recursively disk usage for """ifself.__isdir(directory):returnPopen([self._Cfg.dget('bin.du'),"-hs",directory],stdout=PIPE).communicate()[0].split('\t')[0]else:return0def__isdir(self,directory):isdir=os.path.isdir(directory)ifnotisdir:self.__warnings.append(_('No such directory: %s')%directory)returnisdirdef__makedir(self,directory,mode=None,uid=None,gid=None):ifmodeisNone:mode=self._Cfg.dget('account.directory_mode')ifuidisNone:uid=0ifgidisNone:gid=0os.makedirs(directory,mode)os.chown(directory,uid,gid)def__domDirMake(self,domdir,gid):#TODO: clenaup!os.umask(0006)oldpwd=os.getcwd()basedir=self._Cfg.dget('misc.base_directory')domdirdirs=domdir.replace(basedir+'/','').split('/')os.chdir(basedir)ifnotos.path.isdir(domdirdirs[0]):self.__makedir(domdirdirs[0],489,0,0)os.chdir(domdirdirs[0])os.umask(0007)self.__makedir(domdirdirs[1],self._Cfg.dget('domain.directory_mode'),0,gid)os.chdir(oldpwd)def__subscribe(self,folderlist,uid,gid):"""Creates a subscriptions file with the mailboxes from `folderlist`"""fname=os.path.join(self._Cfg.dget('maildir.name'),'subscriptions')sf=open(fname,'w')sf.write('\n'.join(folderlist))sf.write('\n')sf.flush()sf.close()os.chown(fname,uid,gid)os.chmod(fname,384)def__mailDirMake(self,domdir,uid,gid):"""Creates maildirs and maildir subfolders. Keyword arguments: domdir -- the path to the domain directory uid -- user id from the account gid -- group id from the account """# obsolete -> (mailbox / maillocation)returnos.umask(0007)oldpwd=os.getcwd()os.chdir(domdir)maildir=self._Cfg.dget('maildir.name')folders=[maildir]append=folders.appendforfolderinself._Cfg.dget('maildir.folders').split(':'):folder=folder.strip()iflen(folder)andnotfolder.count('..'):ifre.match(RE_MBOX_NAMES,folder):append('%s/.%s'%(maildir,folder))else:self.__warnings.append(_('Skipped mailbox folder: %r')%folder)else:self.__warnings.append(_('Skipped mailbox folder: %r')%folder)subdirs=['cur','new','tmp']mode=self._Cfg.dget('account.directory_mode')self.__makedir('%s'%uid,mode,uid,gid)os.chdir('%s'%uid)forfolderinfolders:self.__makedir(folder,mode,uid,gid)forsubdirinsubdirs:self.__makedir(os.path.join(folder,subdir),mode,uid,gid)self.__subscribe((f.replace(maildir+'/.','')forfinfolders[1:]),uid,gid)os.chdir(oldpwd)def__userDirDelete(self,domdir,uid,gid):ifuid>0andgid>0:userdir='%s'%uidifuserdir.count('..')ordomdir.count('..'):raiseVMMError(_(u'Found ".." in home directory path.'),ERR.FOUND_DOTS_IN_PATH)ifos.path.isdir(domdir):os.chdir(domdir)ifos.path.isdir(userdir):mdstat=os.stat(userdir)if(mdstat.st_uid,mdstat.st_gid)!=(uid,gid):raiseVMMError(_(u'Detected owner/group mismatch in home directory.'),ERR.MAILDIR_PERM_MISMATCH)rmtree(userdir,ignore_errors=True)else:raiseVMMError(_(u"No such directory: %s")%os.path.join(domdir,userdir),ERR.NO_SUCH_DIRECTORY)def__domDirDelete(self,domdir,gid):ifgid>0:ifnotself.__isdir(domdir):returnbasedir=self._Cfg.dget('misc.base_directory')domdirdirs=domdir.replace(basedir+'/','').split('/')domdirparent=os.path.join(basedir,domdirdirs[0])ifbasedir.count('..')ordomdir.count('..'):raiseVMMError(_(u'Found ".." in domain directory path.'),ERR.FOUND_DOTS_IN_PATH)ifos.path.isdir(domdirparent):os.chdir(domdirparent)ifos.lstat(domdirdirs[1]).st_gid!=gid:raiseVMMError(_(u'Detected group mismatch in domain directory.'),ERR.DOMAINDIR_GROUP_MISMATCH)rmtree(domdirdirs[1],ignore_errors=True)defhasWarnings(self):"""Checks if warnings are present, returns bool."""returnbool(len(self.__warnings))defgetWarnings(self):"""Returns a list with all available warnings and resets all warnings. """ret_val=self.__warnings[:]delself.__warnings[:]returnret_valdefcfg_dget(self,option):"""Get the configured value of the *option* (section.option). When the option was not configured its default value will be returned."""returnself._Cfg.dget(option)defcfg_pget(self,option):"""Get the configured value of the *option* (section.option)."""returnself._Cfg.pget(option)defcfg_install(self):"""Installs the cfg_dget method as ``cfg_dget`` into the built-in namespace."""import__builtin__assert'cfg_dget'notin__builtin__.__dict____builtin__.__dict__['cfg_dget']=self._Cfg.dgetdefdomainAdd(self,domainname,transport=None):dom=self.__getDomain(domainname)iftransportisNone:dom.set_transport(Transport(self._dbh,transport=self._Cfg.dget('misc.transport')))else:dom.set_transport(Transport(self._dbh,transport=transport))dom.set_directory(self._Cfg.dget('misc.base_directory'))dom.save()self.__domDirMake(dom.directory,dom.gid)defdomainTransport(self,domainname,transport,force=None):ifforceisnotNoneandforce!='force':raiseDomainError(_(u"Invalid argument: “%s”")%force,ERR.INVALID_OPTION)dom=self.__getDomain(domainname)trsp=Transport(self._dbh,transport=transport)ifforceisNone:dom.update_transport(trsp)else:dom.update_transport(trsp,force=True)defdomainDelete(self,domainname,force=None):ifnotforceisNoneandforcenotin['deluser','delalias','delall']:raiseDomainError(_(u'Invalid argument: “%s”')%force,ERR.INVALID_OPTION)dom=self.__getDomain(domainname)gid=dom.giddomdir=dom.directoryifself._Cfg.dget('domain.force_deletion')orforce=='delall':dom.delete(True,True)elifforce=='deluser':dom.delete(deluser=True)elifforce=='delalias':dom.delete(delalias=True)else:dom.delete()ifself._Cfg.dget('domain.delete_directory'):self.__domDirDelete(domdir,gid)defdomainInfo(self,domainname,details=None):ifdetailsnotin[None,'accounts','aliasdomains','aliases','full','relocated']:raiseVMMError(_(u'Invalid argument: “%s”')%details,ERR.INVALID_AGUMENT)dom=self.__getDomain(domainname)dominfo=dom.get_info()ifdominfo['domainname'].startswith('xn--'):dominfo['domainname']+=' (%s)'%ace2idna(dominfo['domainname'])ifdetailsisNone:returndominfoelifdetails=='accounts':return(dominfo,dom.get_accounts())elifdetails=='aliasdomains':return(dominfo,dom.get_aliase_names())elifdetails=='aliases':return(dominfo,dom.get_aliases())elifdetails=='relocated':return(dominfo,dom.get_relocated())else:return(dominfo,dom.get_aliase_names(),dom.get_accounts(),dom.get_aliases(),dom.get_relocated())defaliasDomainAdd(self,aliasname,domainname):"""Adds an alias domain to the domain. Arguments: `aliasname` : basestring The name of the alias domain `domainname` : basestring The name of the target domain """dom=self.__getDomain(domainname)aliasDom=AliasDomain(self._dbh,aliasname)aliasDom.set_destination(dom)aliasDom.save()defaliasDomainInfo(self,aliasname):"""Returns a dict (keys: "alias" and "domain") with the names of the alias domain and its primary domain."""self.__dbConnect()aliasDom=AliasDomain(self._dbh,aliasname)returnaliasDom.info()defaliasDomainSwitch(self,aliasname,domainname):"""Modifies the target domain of an existing alias domain. Arguments: `aliasname` : basestring The name of the alias domain `domainname` : basestring The name of the new target domain """dom=self.__getDomain(domainname)aliasDom=AliasDomain(self._dbh,aliasname)aliasDom.set_destination(dom)aliasDom.switch()defaliasDomainDelete(self,aliasname):"""Deletes the given alias domain. Argument: `aliasname` : basestring The name of the alias domain """self.__dbConnect()aliasDom=AliasDomain(self._dbh,aliasname)aliasDom.delete()defdomainList(self,pattern=None):fromVirtualMailManager.Domainimportsearchlike=Falseifpatternand(pattern.startswith('%')orpattern.endswith('%')):like=Trueifnotre.match(RE_DOMAIN_SEARCH,pattern.strip('%')):raiseVMMError(_(u"The pattern '%s' contains invalid characters.")%pattern,ERR.DOMAIN_INVALID)self.__dbConnect()returnsearch(self._dbh,pattern=pattern,like=like)defuser_add(self,emailaddress,password):"""Wrapper around Account.set_password() and Account.save()."""acc=self.__getAccount(emailaddress)acc.set_password(password)acc.save()# depends on modules mailbox and maillocation# self.__mailDirMake(acc.domain_directory, acc.uid, acc.gid)defaliasAdd(self,aliasaddress,*targetaddresses):"""Creates a new `Alias` entry for the given *aliasaddress* with the given *targetaddresses*."""alias=self.__getAlias(aliasaddress)destinations=[EmailAddress(address)foraddressintargetaddresses]warnings=[]destinations=alias.add_destinations(destinations,warnings)ifwarnings:self.__warnings.append(_('Ignored destination addresses:'))self.__warnings.extend((' * %s'%wforwinwarnings))fordestinationindestinations:ifget_gid(self._dbh,destination.domainname)and \notself._chk_other_address_types(destination,TYPE_RELOCATED):self.__warnings.append(_(u"The destination account/alias '%s' doesn't exist.")%destination)defuser_delete(self,emailaddress,force=None):"""Wrapper around Account.delete(...)"""ifforcenotin(None,'delalias'):raiseVMMError(_(u"Invalid argument: '%s'")%force,ERR.INVALID_AGUMENT)acc=self.__getAccount(emailaddress)ifnotacc:raiseVMMError(_(u"The account '%s' doesn't exist.")%acc.address,ERR.NO_SUCH_ACCOUNT)uid=acc.uidgid=acc.giddom_dir=acc.domain_directoryacc_dir=acc.homeacc.delete(bool(force))ifself._Cfg.dget('account.delete_directory'):try:self.__userDirDelete(dom_dir,uid,gid)exceptVMMError,err:iferr.codein(ERR.FOUND_DOTS_IN_PATH,ERR.MAILDIR_PERM_MISMATCH,ERR.NO_SUCH_DIRECTORY):warning=_(u"""\The account has been successfully deleted from the database. But an error occurred while deleting the following directory: “%(directory)s” Reason: %(reason)s""")% \{'directory':acc_dir,'reason':err.msg}self.__warnings.append(warning)else:raisedefaliasInfo(self,aliasaddress):"""Returns an iterator object for all destinations (`EmailAddress` instances) for the `Alias` with the given *aliasaddress*."""alias=self.__getAlias(aliasaddress)ifalias:returnalias.get_destinations()ifnotself._is_other_address(alias.address,TYPE_ALIAS):raiseVMMError(_(u"The alias '%s' doesn't exist.")%alias.address,ERR.NO_SUCH_ALIAS)defaliasDelete(self,aliasaddress,targetaddress=None):"""Deletes the `Alias` *aliasaddress* with all its destinations from the database. If *targetaddress* is not ``None``, only this destination will be removed from the alias."""alias=self.__getAlias(aliasaddress)iftargetaddressisNone:alias.delete()else:alias.del_destination(EmailAddress(targetaddress))defuser_info(self,emailaddress,details=None):"""Wrapper around Account.get_info(...)"""ifdetailsnotin(None,'du','aliases','full'):raiseVMMError(_(u"Invalid argument: '%s'")%details,ERR.INVALID_AGUMENT)acc=self.__getAccount(emailaddress)ifnotacc:ifnotself._is_other_address(acc.address,TYPE_ACCOUNT):raiseVMMError(_(u"The account '%s' doesn't exist.")%acc.address,ERR.NO_SUCH_ACCOUNT)info=acc.get_info()ifself._Cfg.dget('account.disk_usage')ordetailsin('du','full'):path=os.path.join(acc.home,info['mail_location'].split('/')[-1])info['disk usage']=self.__getDiskUsage(path)ifdetailsin(None,'du'):returninfoifdetailsin('aliases','full'):return(info,acc.get_aliases())returninfodefuser_by_uid(self,uid):"""Search for an Account by its *uid*. Returns a dict (address, uid and gid) if a user could be found."""fromVirtualMailManager.Accountimportget_account_by_uidself.__dbConnect()returnget_account_by_uid(uid,self._dbh)defuser_password(self,emailaddress,password):"""Wrapper for Account.modify('password' ...)."""ifnotisinstance(password,basestring)ornotpassword:raiseVMMError(_(u"Could not accept password: '%s'")%password,ERR.INVALID_AGUMENT)acc=self.__getAccount(emailaddress)ifnotacc:raiseVMMError(_(u"The account '%s' doesn't exist.")%acc.address,ERR.NO_SUCH_ACCOUNT)acc.modify('password',password)defuser_name(self,emailaddress,name):"""Wrapper for Account.modify('name', ...)."""ifnotisinstance(name,basestring)ornotname:raiseVMMError(_(u"Could not accept name: '%s'")%name,ERR.INVALID_AGUMENT)acc=self.__getAccount(emailaddress)ifnotacc:raiseVMMError(_(u"The account '%s' doesn't exist.")%acc.address,ERR.NO_SUCH_ACCOUNT)acc.modify('name',name)defuser_transport(self,emailaddress,transport):"""Wrapper for Account.modify('transport', ...)."""ifnotisinstance(transport,basestring)ornottransport:raiseVMMError(_(u"Could not accept transport: '%s'")%transport,ERR.INVALID_AGUMENT)acc=self.__getAccount(emailaddress)ifnotacc:raiseVMMError(_(u"The account '%s' doesn't exist.")%acc.address,ERR.NO_SUCH_ACCOUNT)acc.modify('transport',transport)defuser_disable(self,emailaddress,service=None):"""Wrapper for Account.disable(service)"""ifservicenotin(None,'all','imap','pop3','smtp','sieve'):raiseVMMError(_(u"Could not accept service: '%s'")%service,ERR.INVALID_AGUMENT)acc=self.__getAccount(emailaddress)ifnotacc:raiseVMMError(_(u"The account '%s' doesn't exist.")%acc.address,ERR.NO_SUCH_ACCOUNT)acc.disable(service)defuser_enable(self,emailaddress,service=None):"""Wrapper for Account.enable(service)"""ifservicenotin(None,'all','imap','pop3','smtp','sieve'):raiseVMMError(_(u"could not accept service: '%s'")%service,ERR.INVALID_AGUMENT)acc=self.__getAccount(emailaddress)ifnotacc:raiseVMMError(_(u"The account '%s' doesn't exist.")%acc.address,ERR.NO_SUCH_ACCOUNT)acc.enable(service)defrelocatedAdd(self,emailaddress,targetaddress):"""Creates a new `Relocated` entry in the database. If there is already a relocated user with the given *emailaddress*, only the *targetaddress* for the relocated user will be updated."""relocated=self.__getRelocated(emailaddress)relocated.set_destination(EmailAddress(targetaddress))defrelocatedInfo(self,emailaddress):"""Returns the target address of the relocated user with the given *emailaddress*."""relocated=self.__getRelocated(emailaddress)ifrelocated:returnrelocated.get_info()ifnotself._is_other_address(relocated.address,TYPE_RELOCATED):raiseVMMError(_(u"The relocated user '%s' doesn't exist.")%relocated.address,ERR.NO_SUCH_RELOCATED)defrelocatedDelete(self,emailaddress):"""Deletes the relocated user with the given *emailaddress* from the database."""relocated=self.__getRelocated(emailaddress)relocated.delete()def__del__(self):ifisinstance(self._dbh,PgSQL.Connection)andself._dbh._isOpen:self._dbh.close()del_