# -*- coding: UTF-8 -*-# Copyright (c) 2007 - 2013, Pascal Volk# See COPYING for distribution information.""" VirtualMailManager.alias ~~~~~~~~~~~~~~~~~~~~~~~~ Virtual Mail Manager's Alias class to manage e-mail aliases."""fromVirtualMailManager.domainimportget_gidfromVirtualMailManager.emailaddressimport \EmailAddress,DestinationEmailAddressasDestAddrfromVirtualMailManager.errorsimportAliasErrorasAErrfromVirtualMailManager.ext.postconfimportPostconffromVirtualMailManager.constantsimport \ALIAS_EXCEEDS_EXPANSION_LIMIT,NO_SUCH_ALIAS,NO_SUCH_DOMAIN_=lambdamsg:msgcfg_dget=lambdaoption:NoneclassAlias(object):"""Class to manage e-mail aliases."""__slots__=('_addr','_dests','_gid','_dbh')def__init__(self,dbh,address):assertisinstance(address,EmailAddress)self._addr=addressself._dbh=dbhself._gid=get_gid(self._dbh,self._addr.domainname)ifnotself._gid:raiseAErr(_("The domain '%s' does not exist.")%self._addr.domainname,NO_SUCH_DOMAIN)self._dests=[]self._load_dests()def_load_dests(self):"""Loads all known destination addresses into the _dests list."""dbc=self._dbh.cursor()dbc.execute('SELECT destination FROM alias WHERE gid = %s AND ''address = %s ORDER BY destination',(self._gid,self._addr.localpart))dests=dbc.fetchall()ifdbc.rowcount>0:self._dests.extend(DestAddr(dest[0],self._dbh)fordestindests)dbc.close()def_check_expansion(self,count_new):"""Checks the current expansion limit of the alias."""postconf=Postconf(cfg_dget('bin.postconf'))limit=int(postconf.read('virtual_alias_expansion_limit'))dcount=len(self._dests)failed=Falseifdcount==limitordcount+count_new>limit:failed=Trueerrmsg=_("""Cannot add %(count_new)i new destination(s) to alias '%(address)s'.Currently this alias expands into %(count)i/%(limit)i recipients.%(count_new)i additional destination(s) will render this alias unusable.Hint: Increase Postfix' virtual_alias_expansion_limit""")elifdcount>limit:failed=Trueerrmsg=_("""Cannot add %(count_new)i new destination(s) to alias '%(address)s'.This alias already exceeds its expansion limit (%(count)i/%(limit)i).So its unusable, all messages addressed to this alias will be bounced.Hint: Delete some destination addresses.""")iffailed:raiseAErr(errmsg%{'address':self._addr,'count':dcount,'limit':limit,'count_new':count_new},ALIAS_EXCEEDS_EXPANSION_LIMIT)def_delete(self,destinations=None):"""Deletes the *destinations* from the alias, if ``destinations`` is not ``None``. If ``destinations`` is None, the alias with all its destination addresses will be deleted. """dbc=self._dbh.cursor()ifnotdestinations:dbc.execute('DELETE FROM alias WHERE gid = %s AND address = %s',(self._gid,self._addr.localpart))else:dbc.executemany("DELETE FROM alias WHERE gid = %d AND address = ""'%s' AND destination = %%s"%(self._gid,self._addr.localpart),((str(dest),)fordestindestinations))ifdbc.rowcount>0:self._dbh.commit()dbc.close()def__len__(self):"""Returns the number of destinations of the alias."""returnlen(self._dests)@propertydefaddress(self):"""The Alias' EmailAddress instance."""returnself._addrdefadd_destinations(self,destinations,warnings=None):"""Adds the `EmailAddress`es from *destinations* list to the destinations of the alias. Destinations, that are already assigned to the alias, will be removed from *destinations*. When done, this method will return a set with all destinations, that were saved in the database. """destinations=set(destinations)assertdestinationsand \all(isinstance(dest,EmailAddress)fordestindestinations)ifnotwarningsisNone:assertisinstance(warnings,list)ifself._addrindestinations:destinations.remove(self._addr)ifnotwarningsisNone:warnings.append(self._addr)duplicates=destinations.intersection(set(self._dests))ifduplicates:destinations.difference_update(set(self._dests))ifnotwarningsisNone:warnings.extend(duplicates)ifnotdestinations:returndestinationsself._check_expansion(len(destinations))dbc=self._dbh.cursor()dbc.executemany("INSERT INTO alias (gid, address, destination) ""VALUES (%d, '%s', %%s)"%(self._gid,self._addr.localpart),((str(destination),)fordestinationindestinations))self._dbh.commit()dbc.close()self._dests.extend(destinations)returndestinationsdefdel_destinations(self,destinations,warnings=None):"""Delete the specified `EmailAddress`es of *destinations* from the alias's destinations. """destinations=set(destinations)assertdestinationsand \all(isinstance(dest,EmailAddress)fordestindestinations)ifnotwarningsisNone:assertisinstance(warnings,list)ifself._addrindestinations:destinations.remove(self._addr)ifnotwarningsisNone:warnings.append(self._addr)ifnotself._dests:raiseAErr(_("The alias '%s' does not exist.")%self._addr,NO_SUCH_ALIAS)unknown=destinations.difference(set(self._dests))ifunknown:destinations.intersection_update(set(self._dests))ifnotwarningsisNone:warnings.extend(unknown)ifnotdestinations:raiseAErr(_("No suitable destinations left to remove from alias ""'%s'.")%self._addr,NO_SUCH_ALIAS)self._delete(destinations)fordestinationindestinations:self._dests.remove(destination)defget_destinations(self):"""Returns an iterator for all destinations of the alias."""ifnotself._dests:raiseAErr(_("The alias '%s' does not exist.")%self._addr,NO_SUCH_ALIAS)returniter(self._dests)defdelete(self):"""Deletes the alias with all its destinations."""ifnotself._dests:raiseAErr(_("The alias '%s' does not exist.")%self._addr,NO_SUCH_ALIAS)self._delete()delself._dests[:]del_,cfg_dget