# -*- coding: UTF-8 -*-# Copyright (c) 2007 - 2011, Pascal Volk# See COPYING for distribution information.""" VirtualMailManager.account ~~~~~~~~~~~~~~~~~~~~~~~~~~ Virtual Mail Manager's Account class to manage e-mail accounts."""fromVirtualMailManager.commonimportversion_str, \format_domain_defaultfromVirtualMailManager.constantsimport \ACCOUNT_EXISTS,ACCOUNT_MISSING_PASSWORD,ALIAS_PRESENT, \INVALID_ARGUMENT,INVALID_MAIL_LOCATION,NO_SUCH_ACCOUNT, \NO_SUCH_DOMAIN,VMM_ERRORfromVirtualMailManager.domainimportDomainfromVirtualMailManager.emailaddressimportEmailAddressfromVirtualMailManager.errorsimportVMMError,AccountErrorasAErrfromVirtualMailManager.maillocationimportMailLocationfromVirtualMailManager.passwordimportpwhashfromVirtualMailManager.quotalimitimportQuotaLimitfromVirtualMailManager.transportimportTransportfromVirtualMailManager.servicesetimportServiceSet__all__=('Account','get_account_by_uid')_=lambdamsg:msgcfg_dget=lambdaoption:NoneclassAccount(object):"""Class to manage e-mail accounts."""__slots__=('_addr','_dbh','_domain','_mail','_new','_passwd','_qlimit','_services','_transport','_note','_uid')def__init__(self,dbh,address):"""Creates a new Account instance. When an account with the given *address* could be found in the database all relevant data will be loaded. Arguments: `dbh` : pyPgSQL.PgSQL.Connection A database connection for the database access. `address` : VirtualMailManager.EmailAddress.EmailAddress The e-mail address of the (new) Account. """ifnotisinstance(address,EmailAddress):raiseTypeError("Argument 'address' is not an EmailAddress")self._addr=addressself._dbh=dbhself._domain=Domain(self._dbh,self._addr.domainname)ifnotself._domain.gid:# TP: Hm, what “quotation marks” should be used?# If you are unsure have a look at:# http://en.wikipedia.org/wiki/Quotation_mark,_non-English_usageraiseAErr(_(u"The domain '%s' does not exist.")%self._addr.domainname,NO_SUCH_DOMAIN)self._uid=0self._mail=Noneself._qlimit=Noneself._services=Noneself._transport=Noneself._note=Noneself._passwd=Noneself._new=Trueself._load()def__nonzero__(self):"""Returns `True` if the Account is known, `False` if it's new."""returnnotself._newdef_load(self):"""Load 'uid', 'mid', 'qid', 'ssid', 'tid' and 'note' from the database and set _new to `False` - if the user could be found. """dbc=self._dbh.cursor()dbc.execute('SELECT uid, mid, qid, ssid, tid, note FROM users ''WHERE gid = %s AND local_part = %s',(self._domain.gid,self._addr.localpart))result=dbc.fetchone()dbc.close()ifresult:self._uid,_mid,_qid,_ssid,_tid,_note=resultdefload_helper(ctor,own,field,dbresult):cur=NoneifownisNoneelsegetattr(own,field)ifcur!=dbresult:kwargs={field:dbresult}returnNoneifdbresultisNone \elsector(self._dbh,**kwargs)self._qlimit=load_helper(QuotaLimit,self._qlimit,'qid',_qid)self._services=load_helper(ServiceSet,self._services,'ssid',_ssid)self._transport=load_helper(Transport,self._transport,'tid',_tid)self._mail=MailLocation(self._dbh,mid=_mid)self._note=_noteself._new=Falsedef_set_uid(self):"""Set the unique ID for the new Account."""assertself._uid==0dbc=self._dbh.cursor()dbc.execute("SELECT nextval('users_uid')")self._uid=dbc.fetchone()[0]dbc.close()def_prepare(self,maillocation):"""Check and set different attributes - before we store the information in the database. """ifmaillocation.dovecot_version>cfg_dget('misc.dovecot_version'):raiseAErr(_(u"The mailbox format '%(mbfmt)s' requires Dovecot "u">= v%(version)s.")%{'mbfmt':maillocation.mbformat,'version':version_str(maillocation.dovecot_version)},INVALID_MAIL_LOCATION)ifnotmaillocation.postfixand \self._transport.transport.lower()in('virtual:','virtual'):raiseAErr(_(u"Invalid transport '%(transport)s' for mailbox "u"format '%(mbfmt)s'.")%{'transport':self._transport,'mbfmt':maillocation.mbformat},INVALID_MAIL_LOCATION)self._mail=maillocationself._set_uid()def_update_tables(self,column,value):"""Update various columns in the users table. Arguments: `column` : basestring Name of the table column. Currently: qid, ssid and tid `value` : long The referenced key """ifcolumnnotin('qid','ssid','tid'):raiseValueError('Unknown column: %r'%column)dbc=self._dbh.cursor()dbc.execute('UPDATE users SET %s = %%s WHERE uid = %%s'%column,(value,self._uid))ifdbc.rowcount>0:self._dbh.commit()dbc.close()def_count_aliases(self):"""Count all alias addresses where the destination address is the address of the Account."""dbc=self._dbh.cursor()dbc.execute('SELECT COUNT(destination) FROM alias WHERE destination ''= %s',(str(self._addr),))a_count=dbc.fetchone()[0]dbc.close()returna_countdef_chk_state(self):"""Raise an AccountError if the Account is new - not yet saved in the database."""ifself._new:raiseAErr(_(u"The account '%s' does not exist.")%self._addr,NO_SUCH_ACCOUNT)@propertydefaddress(self):"""The Account's EmailAddress instance."""returnself._addr@propertydefdomain(self):"""The Domain to which the Account belongs to."""ifself._domain:returnself._domainreturnNone@propertydefgid(self):"""The Account's group ID."""ifself._domain:returnself._domain.gidreturnNone@propertydefhome(self):"""The Account's home directory."""ifnotself._new:return'%s/%s'%(self._domain.directory,self._uid)returnNone@propertydefmail_location(self):"""The Account's MailLocation."""returnself._mail@propertydefnote(self):"""The Account's note."""returnself._note@propertydefuid(self):"""The Account's unique ID."""returnself._uiddefset_password(self,password):"""Set a password for the new Account. If you want to update the password of an existing Account use Account.modify(). Argument: `password` : basestring The password for the new Account. """ifnotself._new:raiseAErr(_(u"The account '%s' already exists.")%self._addr,ACCOUNT_EXISTS)ifnotisinstance(password,basestring)ornotpassword:raiseAErr(_(u"Could not accept password: '%s'")%password,ACCOUNT_MISSING_PASSWORD)self._passwd=passworddefset_note(self,note):"""Set the account's (optional) note. Argument: `note` : basestring or None The note, or None to remove """assertnoteisNoneorisinstance(note,basestring)self._note=notedefsave(self):"""Save the new Account in the database."""ifnotself._new:raiseAErr(_(u"The account '%s' already exists.")%self._addr,ACCOUNT_EXISTS)ifnotself._passwd:raiseAErr(_(u"No password set for account: '%s'")%self._addr,ACCOUNT_MISSING_PASSWORD)self._prepare(MailLocation(self._dbh,mbfmt=cfg_dget('mailbox.format'),directory=cfg_dget('mailbox.root')))dbc=self._dbh.cursor()dbc.execute('INSERT INTO users (local_part, passwd, uid, gid, mid, ''qid, ssid, tid, note) ''VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)',(self._addr.localpart,pwhash(self._passwd,user=self._addr),self._uid,self._domain.gid,self._mail.mid,self._qlimit.qidifself._qlimitelseNone,self._services.ssidifself._serviceselseNone,self._transport.tidifself._transportelseNone,self._note))self._dbh.commit()dbc.close()self._new=Falsedefmodify(self,field,value):"""Update the Account's *field* to the new *value*. Possible values for *field* are: 'name', 'password', 'note'. Arguments: `field` : basestring The attribute name: 'name', 'password' or 'note' `value` : basestring The new value of the attribute. """iffieldnotin('name','password','note'):raiseAErr(_(u"Unknown field: '%s'")%field,INVALID_ARGUMENT)self._chk_state()dbc=self._dbh.cursor()iffield=='password':dbc.execute('UPDATE users SET passwd = %s WHERE uid = %s',(pwhash(value,user=self._addr),self._uid))else:dbc.execute('UPDATE users SET %s = %%s WHERE uid = %%s'%field,(value,self._uid))ifdbc.rowcount>0:self._dbh.commit()dbc.close()defupdate_quotalimit(self,quotalimit):"""Update the user's quota limit. Arguments: `quotalimit` : VirtualMailManager.quotalimit.QuotaLimit the new quota limit of the domain. """ifcfg_dget('misc.dovecot_version')<0x10102f00:raiseVMMError(_(u'PostgreSQL-based dictionary quota requires 'u'Dovecot >= v1.1.2.'),VMM_ERROR)self._chk_state()ifquotalimit==self._qlimit:returnself._qlimit=quotalimitifquotalimitisnotNone:assertisinstance(quotalimit,QuotaLimit)quotalimit=quotalimit.qidself._update_tables('qid',quotalimit)defupdate_serviceset(self,serviceset):"""Assign a different set of services to the Account. Argument: `serviceset` : VirtualMailManager.serviceset.ServiceSet the new service set. """self._chk_state()ifserviceset==self._services:returnself._services=servicesetifservicesetisnotNone:assertisinstance(serviceset,ServiceSet)serviceset=serviceset.ssidself._update_tables('ssid',serviceset)defupdate_transport(self,transport):"""Sets a new transport for the Account. Arguments: `transport` : VirtualMailManager.transport.Transport the new transport """self._chk_state()iftransport==self._transport:returnself._transport=transportiftransportisnotNone:assertisinstance(transport,Transport)iftransport.transport.lower()in('virtual','virtual:')and \notself._mail.postfix:raiseAErr(_(u"Invalid transport '%(transport)s' for mailbox "u"format '%(mbfmt)s'.")%{'transport':transport,'mbfmt':self._mail.mbformat},INVALID_MAIL_LOCATION)transport=transport.tidself._update_tables('tid',transport)def_get_info_transport(self):ifself._transport:returnself._transport.transportreturnformat_domain_default(self._domain.transport.transport)def_get_info_serviceset(self):ifself._services:services=self._services.servicesfmt=lambdas:selse:services=self._domain.serviceset.servicesfmt=format_domain_defaultret={}forservice,stateinservices.iteritems():# TP: A service (e.g. pop3 or imap) may be enabled/usable or# disabled/unusable for a user.ret[service]=fmt((_('disabled'),_('enabled'))[state])returnretdefget_info(self):"""Returns a dict with some information about the Account. The keys of the dict are: 'address', 'gid', 'home', 'imap' 'mail_location', 'name', 'pop3', 'sieve', 'smtp', transport', 'uid', 'uq_bytes', 'uq_messages', 'ql_bytes', 'ql_messages', and 'ql_domaindefault'. """self._chk_state()dbc=self._dbh.cursor()dbc.execute('SELECT name, CASE WHEN bytes IS NULL THEN 0 ELSE bytes ''END, CASE WHEN messages IS NULL THEN 0 ELSE messages END ''FROM users LEFT JOIN userquota USING (uid) WHERE ''users.uid = %s',(self._uid,))info=dbc.fetchone()dbc.close()ifinfo:info=dict(zip(('name','uq_bytes','uq_messages'),info))info.update(self._get_info_serviceset())info['address']=self._addrinfo['gid']=self._domain.gidinfo['home']='%s/%s'%(self._domain.directory,self._uid)info['mail_location']=self._mail.mail_locationifself._qlimit:info['ql_bytes']=self._qlimit.bytesinfo['ql_messages']=self._qlimit.messagesinfo['ql_domaindefault']=Falseelse:info['ql_bytes']=self._domain.quotalimit.bytesinfo['ql_messages']=self._domain.quotalimit.messagesinfo['ql_domaindefault']=Trueinfo['transport']=self._get_info_transport()info['note']=self._noteinfo['uid']=self._uidreturninfo# nearly impossible‽raiseAErr(_(u"Could not fetch information for account: '%s'")%self._addr,NO_SUCH_ACCOUNT)defget_aliases(self):"""Return a list with all alias e-mail addresses, whose destination is the address of the Account."""self._chk_state()dbc=self._dbh.cursor()dbc.execute("SELECT address ||'@'|| domainname FROM alias, ""domain_name WHERE destination = %s AND domain_name.gid = ""alias.gid AND domain_name.is_primary ORDER BY address",(str(self._addr),))addresses=dbc.fetchall()dbc.close()aliases=[]ifaddresses:aliases=[alias[0]foraliasinaddresses]returnaliasesdefdelete(self,force=False):"""Delete the Account from the database. Argument: `force` : bool if *force* is `True`, all aliases, which points to the Account, will be also deleted. If there are aliases and *force* is `False`, an AccountError will be raised. """ifnotisinstance(force,bool):raiseTypeError('force must be a bool')self._chk_state()dbc=self._dbh.cursor()ifforce:dbc.execute('DELETE FROM users WHERE uid = %s',(self._uid),)# delete also all aliases where the destination address is the same# as for this account.dbc.execute("DELETE FROM alias WHERE destination = %s",(str(self._addr),))self._dbh.commit()else:# check first for aliasesa_count=self._count_aliases()ifa_count>0:dbc.close()raiseAErr(_(u"There are %(count)d aliases with the "u"destination address '%(address)s'.")%{'count':a_count,'address':self._addr},ALIAS_PRESENT)dbc.execute('DELETE FROM users WHERE uid = %s',(self._uid,))self._dbh.commit()dbc.close()self._new=Trueself._uid=0self._addr=self._dbh=self._domain=self._passwd=Noneself._mail=self._qlimit=self._services=self._transport=Nonedefget_account_by_uid(uid,dbh):"""Search an Account by its UID. This function returns a dict (keys: 'address', 'gid' and 'uid'), if an Account with the given *uid* exists. Argument: `uid` : long The Account unique ID. `dbh` : pyPgSQL.PgSQL.Connection a database connection for the database access. """try:uid=long(uid)exceptValueError:raiseAErr(_(u'UID must be an int/long.'),INVALID_ARGUMENT)ifuid<1:raiseAErr(_(u'UID must be greater than 0.'),INVALID_ARGUMENT)dbc=dbh.cursor()dbc.execute("SELECT local_part||'@'|| domain_name.domainname AS address, ""uid, users.gid, note FROM users LEFT JOIN domain_name ON ""(domain_name.gid = users.gid AND is_primary) WHERE uid = %s",(uid,))info=dbc.fetchone()dbc.close()ifnotinfo:raiseAErr(_(u"There is no account with the UID: '%d'")%uid,NO_SUCH_ACCOUNT)info=dict(zip(('address','uid','gid','note'),info))returninfodel_,cfg_dget