# -*- coding: UTF-8 -*-# Copyright (c) 2007 - 2010, Pascal Volk# See COPYING for distribution information.""" VirtualMailManager.Alias Virtual Mail Manager's Alias class to manage e-mail aliases."""fromVirtualMailManager.Domainimportget_gidfromVirtualMailManager.EmailAddressimportEmailAddressfromVirtualMailManager.errorsimportAliasErrorasAErrfromVirtualMailManager.ext.PostconfimportPostconffromVirtualMailManager.pycompatimportallfromVirtualMailManager.constants.ERRORimport \ALIAS_EXCEEDS_EXPANSION_LIMIT,NO_SUCH_ALIAS,NO_SUCH_DOMAIN_=lambdamsg:msgcfg_dget=lambdaoption:NoneclassAlias(object):"""Class to manage e-mail aliases."""__slots__=('_addr','_dests','_gid','_dbh')def__init__(self,dbh,address):assertisinstance(address,EmailAddress)self._addr=addressself._dbh=dbhself._gid=get_gid(self._dbh,self._addr.domainname)ifnotself._gid:raiseAErr(_(u"The domain '%s' doesn't exist.")%self._addr.domainname,NO_SUCH_DOMAIN)self._dests=[]self.__load_dests()def__load_dests(self):"""Loads all known destination addresses into the _dests list."""dbc=self._dbh.cursor()dbc.execute('SELECT destination FROM alias WHERE gid = %s AND ''address = %s',self._gid,self._addr.localpart)dests=dbc.fetchall()ifdbc.rowcount>0:self._dests.extend(EmailAddress(dest[0])fordestindests)dbc.close()def__check_expansion(self,count_new):"""Checks the current expansion limit of the alias."""postconf=Postconf(cfg_dget('bin.postconf'))limit=long(postconf.read('virtual_alias_expansion_limit'))dcount=len(self._dests)failed=Falseifdcount==limitordcount+count_new>limit:failed=Trueerrmsg=_(u"""Can't add %(count_new)i new destination(s) to alias '%(address)s'.Currently this alias expands into %(count)i/%(limit)i recipients.%(count_new)i additional destination(s) will render this alias unusable.Hint: Increase Postfix' virtual_alias_expansion_limit""")elifdcount>limit:failed=Trueerrmsg=_(u"""Can't add %(count_new)i new destination(s) to alias '%(address)s'.This alias already exceeds its expansion limit (%(count)i/%(limit)i).So its unusable, all messages addressed to this alias will be bounced.Hint: Delete some destination addresses.""")iffailed:raiseAErr(errmsg%{'address':self._addr,'count':dcount,'limit':limit,'count_new':count_new},ALIAS_EXCEEDS_EXPANSION_LIMIT)def__delete(self,destination=None):"""Deletes a destination from the alias, if ``destination`` is not ``None``. If ``destination`` is None, the alias with all its destination addresses will be deleted. """dbc=self._dbh.cursor()ifnotdestination:dbc.execute('DELETE FROM alias WHERE gid = %s AND address = %s',self._gid,self._addr.localpart)else:dbc.execute('DELETE FROM alias WHERE gid = %s AND address = %s ''AND destination = %s',self._gid,self._addr.localpart,str(destination))ifdbc.rowcount>0:self._dbh.commit()dbc.close()def__len__(self):"""Returns the number of destinations of the alias."""returnlen(self._dests)@propertydefaddress(self):"""The Alias' EmailAddress instance."""returnself._addrdefadd_destinations(self,destinations,warnings=None):"""Adds the `EmailAddress`es from *destinations* list to the destinations of the alias. Destinations, that are already assigned to the alias, will be removed from *destinations*. When done, this method will return a set with all destinations, that were saved in the database. """destinations=set(destinations)assertdestinationsand \all(isinstance(dest,EmailAddress)fordestindestinations)ifnotwarningsisNone:assertisinstance(warnings,list)ifself._addrindestinations:destinations.remove(self._addr)ifnotwarningsisNone:warnings.append(self._addr)duplicates=destinations.intersection(set(self._dests))ifduplicates:destinations.difference_update(set(self._dests))ifnotwarningsisNone:warnings.extend(duplicates)ifnotdestinations:returndestinationsself.__check_expansion(len(destinations))dbc=self._dbh.cursor()dbc.executemany("INSERT INTO alias VALUES (%d, '%s', %%s)"%(self._gid,self._addr.localpart),(str(destination)fordestinationindestinations))self._dbh.commit()dbc.close()self._dests.extend(destinations)returndestinationsdefdel_destination(self,destination):"""Deletes the specified ``destination`` address from the alias."""assertisinstance(destination,EmailAddress)ifnotself._dests:raiseAErr(_(u"The alias '%s' doesn't exist.")%self._addr,NO_SUCH_ALIAS)ifnotdestinationinself._dests:raiseAErr(_(u"The address '%(addr)s' isn't a destination of "u"the alias '%(alias)s'.")%{'addr':self._addr,'alias':destination},NO_SUCH_ALIAS)self.__delete(destination)self._dests.remove(destination)defget_destinations(self):"""Returns an iterator for all destinations of the alias."""ifnotself._dests:raiseAErr(_(u"The alias '%s' doesn't exist.")%self._addr,NO_SUCH_ALIAS)returniter(self._dests)defdelete(self):"""Deletes the alias with all its destinations."""ifnotself._dests:raiseAErr(_(u"The alias '%s' doesn't exist.")%self._addr,NO_SUCH_ALIAS)self.__delete()delself._dests[:]del_,cfg_dget