Moved VirtualMailManager/Exceptions to VirtualMailManager/errors.
Renamed VMM*Exception classes to *Error.
No longer add the attribute 'message' to VMMError when it doesn't exist (as
is the case in Python 2.4). The attribute has been deprecated as of Python 2.6.
Also removed the methods code() and msg(); the values are now accessible via
the attributes 'code' and 'msg'.
# -*- coding: UTF-8 -*-
# Copyright (c) 2007 - 2010, Pascal Volk
# See COPYING for distribution information.

"""Virtual Mail Manager's Domain class to manage e-mail domains."""

from random import choice

from VirtualMailManager import check_domainname
from VirtualMailManager.constants.ERROR import (
    ACCOUNT_AND_ALIAS_PRESENT, ACCOUNT_PRESENT, ALIAS_PRESENT,
    DOMAIN_ALIAS_EXISTS, DOMAIN_EXISTS, NO_SUCH_DOMAIN)
from VirtualMailManager.errors import DomainError as DomErr
from VirtualMailManager.Transport import Transport


# Characters from which one is picked at random as the intermediate
# directory level of a new domain directory (see Domain._prepare).
MAILDIR_CHARS = '0123456789abcdefghijklmnopqrstuvwxyz'


class Domain(object):
    """Class to manage e-mail domains."""
    __slots__ = ('_basedir', '_domaindir', '_id', '_name', '_transport',
                 '_dbh')

    def __init__(self, dbh, domainname, basedir=None, transport=None):
        """Creates a new Domain instance.

        If the domain is already stored in the database, its ID, domain
        directory and transport are loaded from there; a `transport`
        argument is overridden by the stored one in that case.

        Raises a `DomainError` (DOMAIN_ALIAS_EXISTS) if `domainname` is
        known as an alias domain.

        Keyword arguments:
        dbh -- a pyPgSQL.PgSQL.connection
        domainname -- name of the domain (str)
        basedir -- base directory below which the directory of a new
                   domain will be created (str)
        transport -- default vmm.cfg/misc/transport (str)
        """
        self._dbh = dbh
        self._name = check_domainname(domainname)
        self._basedir = basedir
        if transport is not None:
            self._transport = Transport(self._dbh, transport=transport)
        else:
            self._transport = transport
        self._id = 0
        self._domaindir = None
        if not self._exists() and self._isAlias():
            raise DomErr(_(u"The domain “%s” is an alias domain.") %
                         self._name, DOMAIN_ALIAS_EXISTS)

    def _exists(self):
        """Checks if the domain already exists.

        If the domain exists _id will be set and returns True, otherwise
        False will be returned.  For an existing domain _domaindir and
        _transport are set from the database row as well.
        """
        dbc = self._dbh.cursor()
        dbc.execute("SELECT gid, tid, domaindir FROM domain_data WHERE gid ="
                    " (SELECT gid FROM domain_name WHERE domainname = %s AND"
                    " is_primary)", self._name)
        result = dbc.fetchone()
        dbc.close()
        if result is None:
            return False
        self._id, self._domaindir = result[0], result[2]
        self._transport = Transport(self._dbh, tid=result[1])
        return True

    def _isAlias(self):
        """Checks if self._name is known for an alias domain."""
        dbc = self._dbh.cursor()
        dbc.execute('SELECT is_primary FROM domain_name WHERE domainname = %s',
                    self._name)
        result = dbc.fetchone()
        dbc.close()
        # Known name whose row is not flagged as primary -> alias domain.
        return result is not None and not result[0]

    def _setID(self):
        """Sets the ID of the domain (next value of sequence domain_gid)."""
        dbc = self._dbh.cursor()
        dbc.execute("SELECT nextval('domain_gid')")
        self._id = dbc.fetchone()[0]
        dbc.close()

    def _prepare(self):
        """Assigns a new ID and composes the domain directory.

        The directory has the form: basedir/<random MAILDIR_CHARS char>/ID
        """
        self._setID()
        self._domaindir = "%s/%s/%i" % (self._basedir, choice(MAILDIR_CHARS),
                                        self._id)

    def _has(self, what):
        """Checks if aliases or accounts are assigned to the domain.

        If there are assigned accounts or aliases True will be returned,
        otherwise False will be returned.  Any other value for `what`
        yields False, too.

        Keyword arguments:
        what -- 'alias' or 'users' (strings)
        """
        if what not in ['alias', 'users']:
            return False
        dbc = self._dbh.cursor()
        if what == 'users':
            dbc.execute("SELECT count(gid) FROM users WHERE gid=%s", self._id)
        else:
            dbc.execute("SELECT count(gid) FROM alias WHERE gid=%s", self._id)
        count = dbc.fetchone()
        dbc.close()
        return count[0] > 0

    def _chkDelete(self, delUser, delAlias):
        """Checks dependencies for deletion.

        Raises a `DomainError` if there are accounts and/or aliases which
        should not be ignored.

        Keyword arguments:
        delUser -- ignore available accounts (bool)
        delAlias -- ignore available aliases (bool)
        """
        if not delUser:
            hasUser = self._has('users')
        else:
            hasUser = False
        if not delAlias:
            hasAlias = self._has('alias')
        else:
            hasAlias = False
        if hasUser and hasAlias:
            raise DomErr(_(u'There are accounts and aliases.'),
                         ACCOUNT_AND_ALIAS_PRESENT)
        elif hasUser:
            raise DomErr(_(u'There are accounts.'), ACCOUNT_PRESENT)
        elif hasAlias:
            raise DomErr(_(u'There are aliases.'), ALIAS_PRESENT)

    def save(self):
        """Stores the new domain in the database.

        Raises a `DomainError` (DOMAIN_EXISTS) if the domain already exists.
        """
        if self._id > 0:
            raise DomErr(_(u'The domain “%s” already exists.') % self._name,
                         DOMAIN_EXISTS)
        self._prepare()
        dbc = self._dbh.cursor()
        dbc.execute("INSERT INTO domain_data (gid, tid, domaindir)"
                    " VALUES (%s, %s, %s)", self._id,
                    self._transport.getID(), self._domaindir)
        dbc.execute("INSERT INTO domain_name (domainname, gid, is_primary)"
                    " VALUES (%s, %s, %s)", self._name, self._id, True)
        self._dbh.commit()
        dbc.close()

    def delete(self, delUser=False, delAlias=False):
        """Deletes the domain.

        Raises a `DomainError` if the domain doesn't exist (NO_SUCH_DOMAIN)
        or if there are accounts/aliases whose deletion wasn't forced.

        Keyword arguments:
        delUser -- force deletion of available accounts (bool)
        delAlias -- force deletion of available aliases (bool)
        """
        if self._id < 1:
            raise DomErr(_(u"The domain “%s” doesn't exist.") % self._name,
                         NO_SUCH_DOMAIN)
        self._chkDelete(delUser, delAlias)
        dbc = self._dbh.cursor()
        # Table names can't be passed as query parameters; they come from
        # this fixed tuple and the ID is formatted as an integer (%d).
        for t in ('alias', 'users', 'relocated', 'domain_name',
                  'domain_data'):
            dbc.execute("DELETE FROM %s WHERE gid = %d" % (t, self._id))
        self._dbh.commit()
        dbc.close()

    def updateTransport(self, transport, force=False):
        """Sets a new transport for the domain.

        Nothing is done if `transport` equals the domain's current
        transport.  Raises a `DomainError` (NO_SUCH_DOMAIN) if the domain
        doesn't exist.

        Keyword arguments:
        transport -- the new transport (str)
        force -- True/False force new transport for all accounts (bool)
        """
        if self._id < 1:
            raise DomErr(_(u"The domain “%s” doesn't exist.") % self._name,
                         NO_SUCH_DOMAIN)
        if transport == self._transport.getTransport():
            return
        trsp = Transport(self._dbh, transport=transport)
        dbc = self._dbh.cursor()
        dbc.execute("UPDATE domain_data SET tid = %s WHERE gid = %s",
                    trsp.getID(), self._id)
        if dbc.rowcount > 0:
            self._dbh.commit()
        if force:
            dbc.execute("UPDATE users SET tid = %s WHERE gid = %s",
                        trsp.getID(), self._id)
            if dbc.rowcount > 0:
                self._dbh.commit()
        dbc.close()

    def getID(self):
        """Returns the ID of the domain."""
        return self._id

    def getDir(self):
        """Returns the directory of the domain."""
        return self._domaindir

    def getTransport(self):
        """Returns domain's transport."""
        return self._transport.getTransport()

    def getTransportID(self):
        """Returns the ID from the domain's transport."""
        return self._transport.getID()

    def getInfo(self):
        """Returns a dictionary with information about the domain.

        Raises a `DomainError` (NO_SUCH_DOMAIN) if there is no information
        for the domain's ID.
        """
        keys = ['gid', 'domainname', 'transport', 'domaindir', 'aliasdomains',
                'accounts', 'aliases', 'relocated']
        dbc = self._dbh.cursor()
        # The ID is bound as a parameter, like in all other queries.
        dbc.execute("SELECT gid, domainname, transport, domaindir,"
                    " aliasdomains, accounts, aliases, relocated"
                    " FROM vmm_domain_info WHERE gid = %s", self._id)
        info = dbc.fetchone()
        dbc.close()
        if info is None:
            raise DomErr(_(u"The domain “%s” doesn't exist.") % self._name,
                         NO_SUCH_DOMAIN)
        return dict(zip(keys, info))

    def getAccounts(self):
        """Returns a list with all accounts from the domain."""
        dbc = self._dbh.cursor()
        dbc.execute("SELECT local_part from users where gid = %s ORDER BY"
                    " local_part", self._id)
        users = dbc.fetchall()
        dbc.close()
        addr = u'@'.join
        _dom = self._name
        return [addr((account[0], _dom)) for account in users]

    def getAliases(self):
        """Returns a list with all aliases from the domain."""
        dbc = self._dbh.cursor()
        dbc.execute("SELECT DISTINCT address FROM alias WHERE gid = %s"
                    " ORDER BY address", self._id)
        addresses = dbc.fetchall()
        dbc.close()
        addr = u'@'.join
        _dom = self._name
        return [addr((alias[0], _dom)) for alias in addresses]

    def getRelocated(self):
        """Returns a list with all addresses from relocated users."""
        dbc = self._dbh.cursor()
        dbc.execute("SELECT address FROM relocated WHERE gid = %s"
                    " ORDER BY address", self._id)
        addresses = dbc.fetchall()
        dbc.close()
        addr = u'@'.join
        _dom = self._name
        return [addr((address[0], _dom)) for address in addresses]

    def getAliaseNames(self):
        """Returns a list with all alias names from the domain."""
        dbc = self._dbh.cursor()
        dbc.execute("SELECT domainname FROM domain_name WHERE gid = %s"
                    " AND NOT is_primary ORDER BY domainname", self._id)
        anames = dbc.fetchall()
        dbc.close()
        return [aname[0] for aname in anames]


def search(dbh, pattern=None, like=False):
    """Searches for domains and returns the tuple (order, domdict).

    `order` is a list with the GIDs of the found primary domains.  If no
    primary domain was found, it contains the GIDs of all found rows
    (each GID once).  `domdict` maps a GID to a list of domain names: the
    first element is the primary domain name - or None if only alias
    names of that GID were found - followed by the found alias names.

    Keyword arguments:
    dbh -- a database connection
    pattern -- domain name or LIKE pattern; None finds all domains (str)
    like -- use `pattern` as SQL LIKE pattern instead of an exact domain
            name (bool)
    """
    if pattern is not None and like is False:
        pattern = check_domainname(pattern)
    sql = 'SELECT gid, domainname, is_primary FROM domain_name'
    params = ()
    # The pattern is passed as a query parameter and never interpolated
    # into the statement: a LIKE pattern isn't validated at all and would
    # otherwise allow SQL injection.
    if pattern is not None:
        if like:
            sql += ' WHERE domainname LIKE %s'
        else:
            sql += ' WHERE domainname = %s'
        params = (pattern,)
    sql += ' ORDER BY is_primary DESC, domainname'
    dbc = dbh.cursor()
    dbc.execute(sql, *params)
    doms = dbc.fetchall()
    dbc.close()

    domdict = {}
    order = [dom[0] for dom in doms if dom[2]]
    if not order:
        # Only alias domains were found: list each GID once.
        for dom in doms:
            if dom[0] not in order:
                order.append(dom[0])
    # Primary domains are delivered first (ORDER BY is_primary DESC), so an
    # alias name whose GID is still unknown has no primary in the result.
    for gid, dom, is_primary in doms:
        if is_primary:
            domdict[gid] = [dom]
        else:
            try:
                domdict[gid].append(dom)
            except KeyError:
                domdict[gid] = [None, dom]
    return order, domdict


def get_gid(dbh, domainname):
    """Returns the *GID* of the domain *domainname*.

    Raises an `DomainError` if the domain does not exist.
    """
    domainname = check_domainname(domainname)
    dbc = dbh.cursor()
    dbc.execute('SELECT gid FROM domain_name WHERE domainname=%s', domainname)
    gid = dbc.fetchone()
    dbc.close()
    if gid:
        return gid[0]
    raise DomErr(_(u"The domain “%s” doesn't exist.") % domainname,
                 NO_SUCH_DOMAIN)