# -*- coding: UTF-8 -*-# Copyright 2007-2008 VEB IT# See COPYING for distribution information.# $Id$"""The main class for vmm."""fromconstants.VERSIONimportVERSION__author__='Pascal Volk <p.volk@veb-it.de>'__version__=VERSION__revision__='rev '+'$Rev$'.split()[1]__date__='$Date$'.split()[1]importosimportreimportsysfromencodings.idnaimportToASCII,ToUnicodefromgetpassimportgetpassfromshutilimportrmtreefromsubprocessimportPopen,PIPEfrompyPgSQLimportPgSQL# python-pgsql - http://pypgsql.sourceforge.netimportconstants.ERRORasERRfromAccountimportAccountfromAliasimportAliasfromAliasDomainimportAliasDomainfromConfigimportConfigasCfgfromDomainimportDomainfromEmailAddressimportEmailAddressfromExceptionsimport*fromRelocatedimportRelocatedSALTCHARS='./0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ'RE_ASCII_CHARS="""^[\x20-\x7E]*$"""RE_DOMAIN="""^(?:[a-z0-9-]{1,63}\.){1,}[a-z]{2,6}$"""RE_DOMAIN_SRCH="""^[a-z0-9-\.]+$"""RE_LOCALPART="""[^\w!#$%&'\*\+-\.\/=?^_`{\|}~]"""RE_MBOX_NAMES="""^[\x20-\x25\x27-\x7E]*$"""classVirtualMailManager:"""The main class for vmm"""def__init__(self):"""Creates a new VirtualMailManager instance. Throws a VMMNotRootException if your uid is greater 0. """self.__cfgFileName='/usr/local/etc/vmm.cfg'self.__permWarnMsg=_(u"fix permissions for »%(cfgFileName)s«\n\`chmod 0600 %(cfgFileName)s` would be great.")%{'cfgFileName':self.__cfgFileName}self.__warnings=[]self.__Cfg=Noneself.__dbh=Noneifos.geteuid():raiseVMMNotRootException(_(u"You are not root.\n\tGood bye!\n"),ERR.CONF_NOPERM)ifself.__chkCfgFile():self.__Cfg=Cfg(self.__cfgFileName)self.__Cfg.load()self.__Cfg.check()self.__cfgSections=self.__Cfg.getsections()self.__scheme=self.__Cfg.get('misc','passwdscheme')ifnotsys.argv[1]in['cf','configure']:self.__chkenv()def__chkCfgFile(self):"""Checks the configuration file, returns bool"""ifnotos.path.isfile(self.__cfgFileName):raiseVMMException(_(u"The file »%s« does not exists.")%self.__cfgFileName,ERR.CONF_NOFILE)fstat=os.stat(self.__cfgFileName)fmode=int(oct(fstat.st_mode&0777))iffmode%100andfstat.st_uid!=fstat.st_gid \orfmode%10andfstat.st_uid==fstat.st_gid:raiseVMMPermException(self.__permWarnMsg,ERR.CONF_ERROR)else:returnTruedef__chkenv(self):""""""ifnotos.path.exists(self.__Cfg.get('domdir','base')):old_umask=os.umask(0006)os.makedirs(self.__Cfg.get('domdir','base'),0771)os.chown(self.__Cfg.get('domdir','base'),0,self.__Cfg.getint('misc','gid_mail'))os.umask(old_umask)elifnotos.path.isdir(self.__Cfg.get('domdir','base')):raiseVMMException(_(u'»%s« is not a directory.\n\(vmm.cfg: section "domdir", option "base")')%self.__Cfg.get('domdir','base'),ERR.NO_SUCH_DIRECTORY)foropt,valinself.__Cfg.items('bin'):ifnotos.path.exists(val):raiseVMMException(_(u'»%(binary)s« doesn\'t exists.\n\(vmm.cfg: section "bin", option "%(option)s")')%{'binary':val,'option':opt},ERR.NO_SUCH_BINARY)elifnotos.access(val,os.X_OK):raiseVMMException(_(u'»%(binary)s« is not executable.\n\(vmm.cfg: section "bin", option "%(option)s")')%{'binary':val,'option':opt},ERR.NOT_EXECUTABLE)def__dbConnect(self):"""Creates a pyPgSQL.PgSQL.connection instance."""try:self.__dbh=PgSQL.connect(database=self.__Cfg.get('database','name'),user=self.__Cfg.get('database','user'),host=self.__Cfg.get('database','host'),password=self.__Cfg.get('database','pass'),client_encoding='utf8',unicode_results=True)dbc=self.__dbh.cursor()dbc.execute("SET NAMES 'UTF8'")dbc.close()exceptPgSQL.libpq.DatabaseError,e:raiseVMMException(str(e),ERR.DATABASE_ERROR)defidn2ascii(domainname):"""Converts an idn domainname in punycode. Keyword arguments: domainname -- the domainname to convert (str) """tmp=[]forlabelindomainname.split('.'):iflen(label)==0:continuetmp.append(ToASCII(label))return'.'.join(tmp)idn2ascii=staticmethod(idn2ascii)deface2idna(domainname):"""Convertis a domainname from ACE according to IDNA Keyword arguments: domainname -- the domainname to convert (str) """tmp=[]forlabelindomainname.split('.'):iflen(label)==0:continuetmp.append(ToUnicode(label))return'.'.join(tmp)ace2idna=staticmethod(ace2idna)defchkDomainname(domainname):"""Validates the domain name of an e-mail address. Keyword arguments: domainname -- the domain name that should be validated """re.compile(RE_ASCII_CHARS)ifnotre.match(RE_ASCII_CHARS,domainname):domainname=VirtualMailManager.idn2ascii(domainname)iflen(domainname)>255:raiseVMMException(_(u'The domain name is too long.'),ERR.DOMAIN_TOO_LONG)re.compile(RE_DOMAIN)ifnotre.match(RE_DOMAIN,domainname):raiseVMMException(_(u'The domain name »%s« is invalid.')%\domainname,ERR.DOMAIN_INVALID)returndomainnamechkDomainname=staticmethod(chkDomainname)def_exists(dbh,query):dbc=dbh.cursor()dbc.execute(query)gid=dbc.fetchone()dbc.close()ifgidisNone:returnFalseelse:returnTrue_exists=staticmethod(_exists)defaccountExists(dbh,address):sql="SELECT gid FROM users WHERE gid = (SELECT gid FROM domain_name\ WHERE domainname = '%(_domainname)s') AND local_part = '%(_localpart)s'"%\address.__dict__returnVirtualMailManager._exists(dbh,sql)accountExists=staticmethod(accountExists)defaliasExists(dbh,address):sql="SELECT DISTINCT gid FROM alias WHERE gid = (SELECT gid FROM\ domain_name WHERE domainname = '%(_domainname)s') AND address =\ '%(_localpart)s'"%address.__dict__returnVirtualMailManager._exists(dbh,sql)aliasExists=staticmethod(aliasExists)defrelocatedExists(dbh,address):sql="SELECT gid FROM relocated WHERE gid = (SELECT gid FROM\ domain_name WHERE domainname = '%(_domainname)s') AND address =\ '%(_localpart)s'"%address.__dict__returnVirtualMailManager._exists(dbh,sql)relocatedExists=staticmethod(relocatedExists)def__getAccount(self,address,password=None):self.__dbConnect()address=EmailAddress(address)ifnotpasswordisNone:password=self.__pwhash(password)returnAccount(self.__dbh,address,password)def_readpass(self):mismatched=Truewhilemismatched:clear0=getpass(prompt=_('Enter new password: '))clear1=getpass(prompt=_('Retype new password: '))ifclear0!=clear1:sys.stderr.write('%s\n'%_('Sorry, passwords do not match'))continueiflen(clear0)<1orlen(clear1)<1:sys.stderr.write('%s\n'%_('Sorry, empty passwords are not permitted'))continuemismatched=Falsereturnclear0def__getAlias(self,address,destination=None):self.__dbConnect()address=EmailAddress(address)ifdestinationisnotNone:destination=EmailAddress(destination)returnAlias(self.__dbh,address,destination)def__getDomain(self,domainname,transport=None):iftransportisNone:transport=self.__Cfg.get('misc','transport')self.__dbConnect()returnDomain(self.__dbh,domainname,self.__Cfg.get('domdir','base'),transport)def__getDiskUsage(self,directory):"""Estimate file space usage for the given directory. Keyword arguments: directory -- the directory to summarize recursively disk usage for """ifself.__isdir(directory):returnPopen([self.__Cfg.get('bin','du'),"-hs",directory],stdout=PIPE).communicate()[0].split('\t')[0]else:return0def__isdir(self,directory):isdir=os.path.isdir(directory)ifnotisdir:self.__warnings.append(_('No such directory: %s')%directory)returnisdirdef__makedir(self,directory,mode=None,uid=None,gid=None):ifmodeisNone:mode=self.__Cfg.getint('maildir','mode')ifuidisNone:uid=0ifgidisNone:gid=0os.makedirs(directory,mode)os.chown(directory,uid,gid)def__domDirMake(self,domdir,gid):os.umask(0006)oldpwd=os.getcwd()basedir=self.__Cfg.get('domdir','base')domdirdirs=domdir.replace(basedir+'/','').split('/')os.chdir(basedir)ifnotos.path.isdir(domdirdirs[0]):self.__makedir(domdirdirs[0],489,0,self.__Cfg.getint('misc','gid_mail'))os.chdir(domdirdirs[0])os.umask(0007)self.__makedir(domdirdirs[1],self.__Cfg.getint('domdir','mode'),0,gid)os.chdir(oldpwd)def__subscribeFL(self,folderlist,uid,gid):fname=self.__Cfg.get('maildir','name')+'/subscriptions'sf=file(fname,'w')forfinfolderlist:sf.write(f+'\n')sf.flush()sf.close()os.chown(fname,uid,gid)os.chmod(fname,384)def__mailDirMake(self,domdir,uid,gid):"""Creates maildirs and maildir subfolders. Keyword arguments: domdir -- the path to the domain directory uid -- user id from the account gid -- group id from the account """os.umask(0007)oldpwd=os.getcwd()os.chdir(domdir)re.compile(RE_MBOX_NAMES)maildir=self.__Cfg.get('maildir','name')folders=[maildir]forfolderinself.__Cfg.get('maildir','folders').split(':'):folder=folder.strip()iflen(folder)andnotfolder.count('..')\andre.match(RE_MBOX_NAMES,folder):folders.append('%s/.%s'%(maildir,folder))subdirs=['cur','new','tmp']mode=self.__Cfg.getint('maildir','mode')self.__makedir('%s'%uid,mode,uid,gid)os.chdir('%s'%uid)forfolderinfolders:self.__makedir(folder,mode,uid,gid)forsubdirinsubdirs:self.__makedir(folder+'/'+subdir,mode,uid,gid)self.__subscribeFL([f.replace(maildir+'/.','')forfinfolders[1:]],uid,gid)os.chdir(oldpwd)def__userDirDelete(self,domdir,uid,gid):ifuid>0andgid>0:userdir='%s'%uidifuserdir.count('..')ordomdir.count('..'):raiseVMMException(_(u'Found ".." in home directory path.'),ERR.FOUND_DOTS_IN_PATH)ifos.path.isdir(domdir):os.chdir(domdir)ifos.path.isdir(userdir):mdstat=os.stat(userdir)if(mdstat.st_uid,mdstat.st_gid)!=(uid,gid):raiseVMMException(_(u'Owner/group mismatch in home directory detected.'),ERR.MAILDIR_PERM_MISMATCH)rmtree(userdir,ignore_errors=True)else:raiseVMMException(_(u"No such directory: %s")%domdir+'/'+userdir,ERR.NO_SUCH_DIRECTORY)def__domDirDelete(self,domdir,gid):ifgid>0:ifnotself.__isdir(domdir):returnbasedir='%s'%self.__Cfg.get('domdir','base')domdirdirs=domdir.replace(basedir+'/','').split('/')ifbasedir.count('..')ordomdir.count('..'):raiseVMMException(_(u'FATAL: ".." in domain directory path detected.'),ERR.FOUND_DOTS_IN_PATH)ifos.path.isdir('%s/%s'%(basedir,domdirdirs[0])):os.chdir('%s/%s'%(basedir,domdirdirs[0]))ifos.lstat(domdirdirs[1]).st_gid!=gid:raiseVMMException(_(u'FATAL: group mismatch in domain directory detected'),ERR.DOMAINDIR_GROUP_MISMATCH)rmtree(domdirdirs[1],ignore_errors=True)def__getSalt(self):fromrandomimportchoicesalt=Noneifself.__scheme=='CRYPT':salt='%s%s'%(choice(SALTCHARS),choice(SALTCHARS))elifself.__schemein['MD5','MD5-CRYPT']:salt='$1$'foriinrange(8):salt+=choice(SALTCHARS)salt+='$'returnsaltdef__pwCrypt(self,password):# for: CRYPT, MD5 and MD5-CRYPTfromcryptimportcryptreturncrypt(password,self.__getSalt())def__pwSHA1(self,password):# for: SHA/SHA1importshafrombase64importstandard_b64encodesha1=sha.new(password)returnstandard_b64encode(sha1.digest())def__pwMD5(self,password,emailaddress=None):importmd5_md5=md5.new(password)ifself.__scheme=='LDAP-MD5':frombase64importstandard_b64encodereturnstandard_b64encode(_md5.digest())elifself.__scheme=='PLAIN-MD5':return_md5.hexdigest()elifself.__scheme=='DIGEST-MD5'andemailaddressisnotNone:# use an empty realm - works better with usenames like user@dom_md5=md5.new('%s::%s'%(emailaddress,password))return_md5.hexdigest()def__pwMD4(self,password):# for: PLAIN-MD4fromCrypto.HashimportMD4_md4=MD4.new(password)return_md4.hexdigest()def__pwhash(self,password,scheme=None,user=None):ifschemeisnotNone:self.__scheme=schemeifself.__schemein['CRYPT','MD5','MD5-CRYPT']:return'{%s}%s'%(self.__scheme,self.__pwCrypt(password))elifself.__schemein['SHA','SHA1']:return'{%s}%s'%(self.__scheme,self.__pwSHA1(password))elifself.__schemein['PLAIN-MD5','LDAP-MD5','DIGEST-MD5']:return'{%s}%s'%(self.__scheme,self.__pwMD5(password,user))elifself.__scheme=='MD4':return'{%s}%s'%(self.__scheme,self.__pwMD4(password))elifself.__schemein['SMD5','SSHA','CRAM-MD5','HMAC-MD5','LANMAN','NTLM','RPA']:returnPopen([self.__Cfg.get('bin','dovecotpw'),'-s',self.__scheme,'-p',password],stdout=PIPE).communicate()[0][:-1]else:return'{%s}%s'%(self.__scheme,password)defhasWarnings(self):"""Checks if warnings are present, returns bool."""returnbool(len(self.__warnings))defgetWarnings(self):"""Returns a list with all available warnings."""returnself.__warningsdefcfgGetBoolean(self,section,option):returnself.__Cfg.getboolean(section,option)defcfgGetInt(self,section,option):returnself.__Cfg.getint(section,option)defcfgGetString(self,section,option):returnself.__Cfg.get(section,option)defsetupIsDone(self):"""Checks if vmm is configured, returns bool"""try:returnself.__Cfg.getboolean('config','done')exceptValueError,e:raiseVMMConfigException(_(u"""Configurtion error: "%s"(in section "connfig", option "done") see also: vmm.cfg(5)\n""")%str(e),ERR.CONF_ERROR)defconfigure(self,section=None):"""Starts interactive configuration. Configures in interactive mode options in the given section. If no section is given (default) all options from all sections will be prompted. Keyword arguments: section -- the section to configure (default None): 'database', 'maildir', 'bin' or 'misc' """ifsectionisNone:self.__Cfg.configure(self.__cfgSections)elifsectioninself.__cfgSections:self.__Cfg.configure([section])else:raiseVMMException(_(u"Invalid section: '%s'")%section,ERR.INVALID_SECTION)defdomainAdd(self,domainname,transport=None):dom=self.__getDomain(domainname,transport)dom.save()self.__domDirMake(dom.getDir(),dom.getID())defdomainTransport(self,domainname,transport,force=None):ifforceisnotNoneandforce!='force':raiseVMMDomainException(_(u"Invalid argument: '%s'")%force,ERR.INVALID_OPTION)dom=self.__getDomain(domainname,None)ifforceisNone:dom.updateTransport(transport)else:dom.updateTransport(transport,force=True)defdomainDelete(self,domainname,force=None):ifnotforceisNoneandforcenotin['deluser','delalias','delall']:raiseVMMDomainException(_(u"Invalid argument: »%s«")%force,ERR.INVALID_OPTION)dom=self.__getDomain(domainname)gid=dom.getID()domdir=dom.getDir()ifself.__Cfg.getboolean('misc','forcedel')orforce=='delall':dom.delete(True,True)elifforce=='deluser':dom.delete(delUser=True)elifforce=='delalias':dom.delete(delAlias=True)else:dom.delete()ifself.__Cfg.getboolean('domdir','delete'):self.__domDirDelete(domdir,gid)defdomainInfo(self,domainname,details=None):ifdetailsnotin[None,'accounts','aliasdomains','aliases','full','detailed']:raiseVMMDomainException(_(u'Invalid argument: »%s«')%details,ERR.INVALID_OPTION)ifdetails=='detailed':details='full'warning=_(u"""\The keyword »detailed« is deprecated and will be removed in a future release. Please use the keyword »full« to get full details.""")self.__warnings.append(warning)dom=self.__getDomain(domainname)dominfo=dom.getInfo()ifdominfo['domainname'].startswith('xn--'):dominfo['domainname']+=' (%s)'\%VirtualMailManager.ace2idna(dominfo['domainname'])ifdominfo['aliases']isNone:dominfo['aliases']=0ifdetailsisNone:returndominfoelifdetails=='accounts':return(dominfo,dom.getAccounts())elifdetails=='aliasdomains':return(dominfo,dom.getAliaseNames())elifdetails=='aliases':return(dominfo,dom.getAliases())else:return(dominfo,dom.getAliaseNames(),dom.getAccounts(),dom.getAliases())defaliasDomainAdd(self,aliasname,domainname):"""Adds an alias domain to the domain. Keyword arguments: aliasname -- the name of the alias domain (str) domainname -- name of the target domain (str) """dom=self.__getDomain(domainname)aliasDom=AliasDomain(self.__dbh,aliasname,dom)aliasDom.save()defaliasDomainInfo(self,aliasname):self.__dbConnect()aliasDom=AliasDomain(self.__dbh,aliasname,None)returnaliasDom.info()defaliasDomainSwitch(self,aliasname,domainname):"""Modifies the target domain of an existing alias domain. Keyword arguments: aliasname -- the name of the alias domain (str) domainname -- name of the new target domain (str) """dom=self.__getDomain(domainname)aliasDom=AliasDomain(self.__dbh,aliasname,dom)aliasDom.switch()defaliasDomainDelete(self,aliasname):"""Deletes the specified alias domain. Keyword arguments: aliasname -- the name of the alias domain (str) """self.__dbConnect()aliasDom=AliasDomain(self.__dbh,aliasname,None)aliasDom.delete()defdomainList(self,pattern=None):fromDomainimportsearchlike=FalseifpatternisnotNone:ifpattern.startswith('%')orpattern.endswith('%'):like=Trueifpattern.startswith('%')andpattern.endswith('%'):domain=pattern[1:-1]elifpattern.startswith('%'):domain=pattern[1:]elifpattern.endswith('%'):domain=pattern[:-1]re.compile(RE_DOMAIN_SRCH)ifnotre.match(RE_DOMAIN_SRCH,domain):raiseVMMException(_(u"The pattern »%s« contains invalid characters.")%pattern,ERR.DOMAIN_INVALID)self.__dbConnect()returnsearch(self.__dbh,pattern=pattern,like=like)defuserAdd(self,emailaddress,password):acc=self.__getAccount(emailaddress,password)ifpasswordisNone:password=self._readpass()acc.setPassword(self.__pwhash(password))acc.save(self.__Cfg.get('maildir','name'),self.__Cfg.getboolean('services','smtp'),self.__Cfg.getboolean('services','pop3'),self.__Cfg.getboolean('services','imap'),self.__Cfg.getboolean('services','managesieve'))self.__mailDirMake(acc.getDir('domain'),acc.getUID(),acc.getGID())defaliasAdd(self,aliasaddress,targetaddress):alias=self.__getAlias(aliasaddress,targetaddress)alias.save()defuserDelete(self,emailaddress,force=None):ifforcenotin[None,'delalias']:raiseVMMException(_(u"Invalid argument: »%s«")%force,ERR.INVALID_AGUMENT)acc=self.__getAccount(emailaddress)uid=acc.getUID()gid=acc.getGID()acc.delete(force)ifself.__Cfg.getboolean('maildir','delete'):try:self.__userDirDelete(acc.getDir('domain'),uid,gid)exceptVMMException,e:ife.code()in[ERR.FOUND_DOTS_IN_PATH,ERR.MAILDIR_PERM_MISMATCH,ERR.NO_SUCH_DIRECTORY]:warning=_(u"""\The account has been successfully deleted from the database. But an error occurred while deleting the following directory: »%(directory)s« Reason: %(raeson)s""")%{'directory':acc.getDir('home'),'raeson':e.msg()}self.__warnings.append(warning)else:raiseedefaliasInfo(self,aliasaddress):alias=self.__getAlias(aliasaddress)returnalias.getInfo()defaliasDelete(self,aliasaddress,targetaddress=None):alias=self.__getAlias(aliasaddress,targetaddress)alias.delete()defuserInfo(self,emailaddress,diskusage=False):acc=self.__getAccount(emailaddress)info=acc.getInfo()ifself.__Cfg.getboolean('maildir','diskusage')ordiskusage:info['disk usage']=self.__getDiskUsage('%(maildir)s'%info)returninfodefuserByID(self,uid):fromAccountimportgetAccountByIDself.__dbConnect()returngetAccountByID(uid,self.__dbh)defuserPassword(self,emailaddress,password):acc=self.__getAccount(emailaddress)ifacc.getUID()==0:raiseVMMException(_(u"Account doesn't exists"),ERR.NO_SUCH_ACCOUNT)ifpasswordisNone:password=self._readpass()acc.modify('password',self.__pwhash(password,user=emailaddress))defuserName(self,emailaddress,name):acc=self.__getAccount(emailaddress)acc.modify('name',name)defuserTransport(self,emailaddress,transport):acc=self.__getAccount(emailaddress)acc.modify('transport',transport)defuserDisable(self,emailaddress,service=None):acc=self.__getAccount(emailaddress)acc.disable(service)defuserEnable(self,emailaddress,service=None):acc=self.__getAccount(emailaddress)acc.enable(service)def__del__(self):ifnotself.__dbhisNoneandself.__dbh._isOpen:self.__dbh.close()