VMM/…: Provide parameters as tuple to cursor.execute().
# -*- coding: UTF-8 -*-
# Copyright (c) 2007 - 2010, Pascal Volk
# See COPYING for distribution information.

"""
    VirtualMailManager.domain
    ~~~~~~~~~~~~~~~~~~~~~~~~~

    Virtual Mail Manager's Domain class to manage e-mail domains.
"""

import os
import re
from random import choice

from VirtualMailManager.constants import \
     ACCOUNT_AND_ALIAS_PRESENT, DOMAIN_ALIAS_EXISTS, DOMAIN_EXISTS, \
     DOMAIN_INVALID, DOMAIN_TOO_LONG, NO_SUCH_DOMAIN
from VirtualMailManager.errors import DomainError as DomErr
from VirtualMailManager.pycompat import any
from VirtualMailManager.transport import Transport


# Characters used for the random one-character sub-directory of a domain
# directory (see Domain.set_directory()).
MAILDIR_CHARS = '0123456789abcdefghijklmnopqrstuvwxyz'
# One or more labels of 1-63 characters, followed by a TLD of 2-6 letters.
RE_DOMAIN = re.compile(r"^(?:[a-z0-9-]{1,63}\.){1,}[a-z]{2,6}$")
# Identity placeholder that only marks translatable strings; it is removed
# again at the end of the module.
# NOTE(review): presumably a globally installed gettext `_` is used at
# runtime -- confirm.
_ = lambda msg: msg


class Domain(object):
    """Class to manage e-mail domains."""
    __slots__ = ('_directory', '_gid', '_name', '_transport', '_dbh', '_new')

    def __init__(self, dbh, domainname):
        """Creates a new Domain instance.

        Loads all relevant data from the database, if the domain could be
        found.  To create a new domain call the methods set_directory() and
        set_transport() before save().

        A DomainError will be thrown when the *domainname* is the name of
        an alias domain.

        Arguments:

        `dbh` : pyPgSQL.PgSQL.Connection
          a database connection for the database access
        `domainname` : basestring
          The name of the domain
        """
        self._name = check_domainname(domainname)
        self._dbh = dbh
        self._gid = 0
        self._transport = None
        self._directory = None
        self._new = True
        self._load()

    def _load(self):
        """Load information from the database and checks if the domain name
        is the primary one.

        Raises a DomainError if Domain._name isn't the primary name of the
        domain.
        """
        dbc = self._dbh.cursor()
        dbc.execute('SELECT dd.gid, tid, domaindir, is_primary FROM '
                    'domain_data dd, domain_name dn WHERE domainname = %s AND '
                    'dn.gid = dd.gid', (self._name,))
        result = dbc.fetchone()
        dbc.close()
        if result:
            # result: (gid, tid, domaindir, is_primary)
            if not result[3]:
                raise DomErr(_(u"The domain '%s' is an alias domain.") %
                             self._name, DOMAIN_ALIAS_EXISTS)
            self._gid, self._directory = result[0], result[2]
            self._transport = Transport(self._dbh, tid=result[1])
            self._new = False

    def _set_gid(self):
        """Sets the ID of the domain - if not set yet."""
        assert self._gid == 0
        dbc = self._dbh.cursor()
        dbc.execute("SELECT nextval('domain_gid')")
        self._gid = dbc.fetchone()[0]
        dbc.close()

    def _check_for_addresses(self):
        """Checks dependencies for deletion.

        Raises a DomainError if there are accounts, aliases and/or relocated
        users.
        """
        dbc = self._dbh.cursor()
        # Fetch the three counts as the columns of a single row.  A plain
        # UNION removes duplicate rows and guarantees no row order, so equal
        # counts would collapse and could not be mapped to their tables.
        dbc.execute('SELECT '
                    '(SELECT count(gid) FROM users WHERE gid = %s), '
                    '(SELECT count(gid) FROM alias WHERE gid = %s), '
                    '(SELECT count(gid) FROM relocated WHERE gid = %s)',
                    (self._gid, self._gid, self._gid))
        result = dbc.fetchone()
        dbc.close()
        counts = tuple(result)
        if any(counts):
            keys = ('account_count', 'alias_count', 'relocated_count')
            raise DomErr(_(u'There are %(account_count)u accounts, '
                           u'%(alias_count)u aliases and %(relocated_count)u '
                           u'relocated users.') % dict(zip(keys, counts)),
                         ACCOUNT_AND_ALIAS_PRESENT)

    def _chk_state(self):
        """Throws a DomainError if the Domain is new - not saved in the
        database."""
        if self._new:
            raise DomErr(_(u"The domain '%s' doesn't exist.") % self._name,
                         NO_SUCH_DOMAIN)

    @property
    def gid(self):
        """The GID of the Domain."""
        return self._gid

    @property
    def name(self):
        """The Domain's name."""
        return self._name

    @property
    def directory(self):
        """The Domain's directory."""
        return self._directory

    def set_directory(self, basedir):
        """Set the path value of the Domain's directory, inside *basedir*.

        The path is built as: basedir/<random char of MAILDIR_CHARS>/<gid>.
        A new gid is fetched from the database for that purpose.

        Argument:

        `basedir` : basestring
          The base directory of all domains
        """
        assert self._new and self._directory is None
        self._set_gid()
        self._directory = os.path.join(basedir, choice(MAILDIR_CHARS),
                                       str(self._gid))

    @property
    def transport(self):
        """The Domain's transport."""
        return self._transport

    def set_transport(self, transport):
        """Set the transport for the new Domain.

        Argument:

        `transport` : VirtualMailManager.Transport
          The transport of the new Domain
        """
        assert self._new and isinstance(transport, Transport)
        self._transport = transport

    def save(self):
        """Stores the new domain in the database.

        Raises a DomainError if the domain already exists.  The methods
        set_directory() and set_transport() must have been called before.
        """
        if not self._new:
            raise DomErr(_(u"The domain '%s' already exists.") % self._name,
                         DOMAIN_EXISTS)
        assert self._directory is not None and self._transport is not None
        dbc = self._dbh.cursor()
        dbc.execute("INSERT INTO domain_data VALUES (%s, %s, %s)", (self._gid,
                    self._transport.tid, self._directory))
        # The new name is stored as the primary name (is_primary = True).
        dbc.execute("INSERT INTO domain_name VALUES (%s, %s, %s)", (self._name,
                    self._gid, True))
        self._dbh.commit()
        dbc.close()
        self._new = False

    def delete(self, force=False):
        """Deletes the domain.

        Afterwards the instance is reset to the state of a new, unsaved
        domain.

        Arguments:

        `force` : bool
          force the deletion of all available accounts, aliases and
          relocated users.  When *force* is `False` and there are accounts,
          aliases and/or relocated users a DomainError will be raised.
          Default `False`

        Raises a TypeError if *force* is not a bool and a DomainError if
        the domain doesn't exist.
        """
        if not isinstance(force, bool):
            raise TypeError('force must be a bool')
        self._chk_state()
        if not force:
            self._check_for_addresses()
        dbc = self._dbh.cursor()
        # Table names can't be passed as query parameters; they are taken
        # from this fixed tuple only.  The gid is passed as a parameter.
        for tbl in ('alias', 'users', 'relocated', 'domain_name',
                    'domain_data'):
            dbc.execute("DELETE FROM %s WHERE gid = %%s" % tbl, (self._gid,))
        self._dbh.commit()
        dbc.close()
        self._gid = 0
        self._directory = self._transport = None
        self._new = True

    def update_transport(self, transport, force=False):
        """Sets a new transport for the Domain.

        If *force* is `True` the new *transport* will be assigned to all
        existing accounts.  Otherwise the *transport* will be only used for
        accounts created from now on.

        Arguments:

        `transport` : VirtualMailManager.Transport
          the new transport
        `force` : bool
          enforce new transport setting for all accounts, default `False`
        """
        self._chk_state()
        assert isinstance(transport, Transport)
        if transport == self._transport:
            # Nothing to do: the transport is already in use.
            return
        dbc = self._dbh.cursor()
        dbc.execute("UPDATE domain_data SET tid = %s WHERE gid = %s",
                    (transport.tid, self._gid))
        if dbc.rowcount > 0:
            self._dbh.commit()
        if force:
            dbc.execute("UPDATE users SET tid = %s WHERE gid = %s",
                        (transport.tid, self._gid))
            if dbc.rowcount > 0:
                self._dbh.commit()
        dbc.close()
        self._transport = transport

    def get_info(self):
        """Returns a dictionary with information about the domain.

        The keys of the dictionary are: 'gid', 'domainname', 'transport',
        'domaindir', 'aliasdomains', 'accounts', 'aliases' and 'relocated'.
        """
        self._chk_state()
        dbc = self._dbh.cursor()
        dbc.execute('SELECT gid, domainname, transport, domaindir, '
                    'aliasdomains, accounts, aliases, relocated FROM '
                    'vmm_domain_info WHERE gid = %s', (self._gid,))
        info = dbc.fetchone()
        dbc.close()
        keys = ('gid', 'domainname', 'transport', 'domaindir', 'aliasdomains',
                'accounts', 'aliases', 'relocated')
        return dict(zip(keys, info))

    def get_accounts(self):
        """Returns a list with all accounts (e-mail addresses) of the
        domain, sorted by their local part."""
        self._chk_state()
        dbc = self._dbh.cursor()
        dbc.execute('SELECT local_part from users where gid = %s ORDER BY '
                    'local_part', (self._gid,))
        users = dbc.fetchall()
        dbc.close()
        accounts = []
        if users:
            addr = u'@'.join
            _dom = self._name
            accounts = [addr((account[0], _dom)) for account in users]
        return accounts

    def get_aliases(self):
        """Returns a list with all alias e-mail addresses of the domain."""
        self._chk_state()
        dbc = self._dbh.cursor()
        dbc.execute('SELECT DISTINCT address FROM alias WHERE gid = %s ORDER '
                    'BY address', (self._gid,))
        addresses = dbc.fetchall()
        dbc.close()
        aliases = []
        if addresses:
            addr = u'@'.join
            _dom = self._name
            aliases = [addr((alias[0], _dom)) for alias in addresses]
        return aliases

    def get_relocated(self):
        """Returns a list with all addresses of relocated users."""
        self._chk_state()
        dbc = self._dbh.cursor()
        dbc.execute('SELECT address FROM relocated WHERE gid = %s ORDER BY '
                    'address', (self._gid,))
        addresses = dbc.fetchall()
        dbc.close()
        relocated = []
        if addresses:
            addr = u'@'.join
            _dom = self._name
            relocated = [addr((address[0], _dom)) for address in addresses]
        return relocated

    def get_aliase_names(self):
        """Returns a list with all alias domain names of the domain."""
        self._chk_state()
        dbc = self._dbh.cursor()
        dbc.execute('SELECT domainname FROM domain_name WHERE gid = %s AND '
                    'NOT is_primary ORDER BY domainname', (self._gid,))
        anames = dbc.fetchall()
        dbc.close()
        aliasdomains = []
        if anames:
            aliasdomains = [aname[0] for aname in anames]
        return aliasdomains


def check_domainname(domainname):
    """Returns the validated domain name `domainname`.

    A name that doesn't match RE_DOMAIN is IDNA encoded first and then
    checked again.

    Throws a `DomainError`, if the domain name is too long or doesn't look
    like a valid domain name (label.label.label).
    """
    if not RE_DOMAIN.match(domainname):
        domainname = domainname.encode('idna')
    if len(domainname) > 255:
        raise DomErr(_(u'The domain name is too long'), DOMAIN_TOO_LONG)
    if not RE_DOMAIN.match(domainname):
        raise DomErr(_(u"The domain name '%s' is invalid") % domainname,
                     DOMAIN_INVALID)
    return domainname


def get_gid(dbh, domainname):
    """Returns the group id of the domain *domainname*.

    If the domain couldn't be found in the database 0 will be returned.
    A `DomainError` is raised if *domainname* is not a valid domain name.
    """
    domainname = check_domainname(domainname)
    dbc = dbh.cursor()
    dbc.execute('SELECT gid FROM domain_name WHERE domainname = %s',
                (domainname,))
    gid = dbc.fetchone()
    dbc.close()
    if gid:
        return gid[0]
    return 0


def search(dbh, pattern=None, like=False):
    """'Search' for domains by *pattern* in the database.

    *pattern* may be a domain name or a partial domain name - starting
    and/or ending with a '%' sign.  When the *pattern* starts or ends with
    a '%' sign *like* has to be `True` to perform a wildcard search.
    To retrieve all available domains use the arguments' default values.

    This function returns a tuple with a list and a dict: (order, domains).
    The order list contains the domains' gid, alphabetical sorted by the
    primary domain name.  The domains dict's keys are the gids of the
    domains.  The value of item is a list.  The first list element contains
    the primary domain name or `None`.  The elements [1:] contains the
    names of alias domains.

    Arguments:

    `pattern` : basestring
      a (partial) domain name (starting and/or ending with a "%" sign)
    `like` : bool
      should be `True` when *pattern* starts/ends with a "%" sign
    """
    if pattern and not like:
        pattern = check_domainname(pattern)
    sql = 'SELECT gid, domainname, is_primary FROM domain_name'
    params = None
    if pattern:
        # The pattern is passed as a query parameter, never spliced into
        # the statement: with like=True it is not validated at all.
        if like:
            sql += ' WHERE domainname LIKE %s'
        else:
            sql += ' WHERE domainname = %s'
        params = (pattern,)
    sql += ' ORDER BY is_primary DESC, domainname'
    dbc = dbh.cursor()
    if params:
        dbc.execute(sql, params)
    else:
        dbc.execute(sql)
    result = dbc.fetchall()
    dbc.close()

    # gids of all primary names, in the order delivered by the query.
    gids = [domain[0] for domain in result if domain[2]]
    domains = {}
    for gid, domain, is_primary in result:
        if is_primary:
            if gid not in domains:
                domains[gid] = [domain]
            else:
                domains[gid].insert(0, domain)
        elif gid in gids:
            if gid in domains:
                domains[gid].append(domain)
            else:
                domains[gid] = [domain]
        else:
            # Alias name whose primary name isn't part of the result:
            # keep the first slot reserved with None.
            gids.append(gid)
            domains[gid] = [None, domain]
    return gids, domains


del _