* 'create_optional_types_and_functions.pgsql'
- Added to repository
- To be continued …
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
# Copyright 2007-2008 VEB IT
# See COPYING for distribution information.
# $Id$
"""The main class for vmm."""
from constants.VERSION import VERSION
__author__ = 'Pascal Volk <p.volk@veb-it.de>'
__version__ = VERSION
__revision__ = 'rev '+'$Rev$'.split()[1]
__date__ = '$Date$'.split()[1]
import os
import re
import sys
from encodings.idna import ToASCII, ToUnicode
from getpass import getpass
from shutil import rmtree
from subprocess import Popen, PIPE
from pyPgSQL import PgSQL # python-pgsql - http://pypgsql.sourceforge.net
from Exceptions import *
import constants.ERROR as ERR
from Config import Config as Cfg
from Account import Account
from Alias import Alias
from Domain import Domain
from AliasDomain import AliasDomain
SALTCHARS = './0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ'
RE_ASCII_CHARS = """^[\x20-\x7E]*$"""
RE_DOMAIN = """^(?:[a-z0-9-]{1,63}\.){1,}[a-z]{2,6}$"""
RE_DOMAIN_SRCH = """^[a-z0-9-\.]+$"""
RE_LOCALPART = """[^\w!#$%&'\*\+-\.\/=?^_`{\|}~]"""
RE_MBOX_NAMES = """^[\x20-\x25\x27-\x7E]*$"""
class VirtualMailManager:
"""The main class for vmm"""
def __init__(self):
"""Creates a new VirtualMailManager instance.
Throws a VMMNotRootException if your uid is greater 0.
"""
self.__cfgFileName = '/usr/local/etc/vmm.cfg'
self.__permWarnMsg = _(u"fix permissions for »%(cfgFileName)s«\n\
`chmod 0600 %(cfgFileName)s` would be great.") % {'cfgFileName':
self.__cfgFileName}
self.__warnings = []
self.__Cfg = None
self.__dbh = None
if os.geteuid():
raise VMMNotRootException(_(u"You are not root.\n\tGood bye!\n"),
ERR.CONF_NOPERM)
if self.__chkCfgFile():
self.__Cfg = Cfg(self.__cfgFileName)
self.__Cfg.load()
self.__Cfg.check()
self.__cfgSections = self.__Cfg.getsections()
self.__scheme = self.__Cfg.get('misc', 'passwdscheme')
if not sys.argv[1] in ['cf', 'configure']:
self.__chkenv()
def __chkCfgFile(self):
"""Checks the configuration file, returns bool"""
if not os.path.isfile(self.__cfgFileName):
raise VMMException(_(u"The file »%s« does not exists.") %
self.__cfgFileName, ERR.CONF_NOFILE)
fstat = os.stat(self.__cfgFileName)
fmode = int(oct(fstat.st_mode & 0777))
if fmode % 100 and fstat.st_uid != fstat.st_gid \
or fmode % 10 and fstat.st_uid == fstat.st_gid:
raise VMMPermException(self.__permWarnMsg, ERR.CONF_ERROR)
else:
return True
def __chkenv(self):
""""""
if not os.path.exists(self.__Cfg.get('domdir', 'base')):
old_umask = os.umask(0006)
os.makedirs(self.__Cfg.get('domdir', 'base'), 0771)
os.chown(self.__Cfg.get('domdir', 'base'), 0,
self.__Cfg.getint('misc', 'gid_mail'))
os.umask(old_umask)
elif not os.path.isdir(self.__Cfg.get('domdir', 'base')):
raise VMMException(_(u'»%s« is not a directory.\n\
(vmm.cfg: section "domdir", option "base")') %
self.__Cfg.get('domdir', 'base'), ERR.NO_SUCH_DIRECTORY)
for opt, val in self.__Cfg.items('bin'):
if not os.path.exists(val):
raise VMMException(_(u'»%(binary)s« doesn\'t exists.\n\
(vmm.cfg: section "bin", option "%(option)s")') %{'binary': val,'option': opt},
ERR.NO_SUCH_BINARY)
elif not os.access(val, os.X_OK):
raise VMMException(_(u'»%(binary)s« is not executable.\n\
(vmm.cfg: section "bin", option "%(option)s")') %{'binary': val,'option': opt},
ERR.NOT_EXECUTABLE)
def __dbConnect(self):
"""Creates a pyPgSQL.PgSQL.connection instance."""
try:
self.__dbh = PgSQL.connect(
database=self.__Cfg.get('database', 'name'),
user=self.__Cfg.get('database', 'user'),
host=self.__Cfg.get('database', 'host'),
password=self.__Cfg.get('database', 'pass'),
client_encoding='utf8', unicode_results=True)
dbc = self.__dbh.cursor()
dbc.execute("SET NAMES 'UTF8'")
dbc.close()
except PgSQL.libpq.DatabaseError, e:
raise VMMException(str(e), ERR.DATABASE_ERROR)
def chkLocalpart(localpart):
"""Validates the local part of an e-mail address.
Keyword arguments:
localpart -- the e-mail address that should be validated (str)
"""
if len(localpart) < 1:
raise VMMException(_(u'No localpart specified.'),
ERR.LOCALPART_INVALID)
if len(localpart) > 64:
raise VMMException(_(u'The local part »%s« is too long') %
localpart, ERR.LOCALPART_TOO_LONG)
ic = re.compile(RE_LOCALPART).findall(localpart)
if len(ic):
ichrs = ''
for c in set(ic):
ichrs += u"»%s« " % c
raise VMMException(_(u"The local part »%(lpart)s« contains invalid\
characters: %(ichrs)s") % {'lpart': localpart, 'ichrs': ichrs},
ERR.LOCALPART_INVALID)
return localpart
chkLocalpart = staticmethod(chkLocalpart)
def idn2ascii(domainname):
"""Converts an idn domainname in punycode.
Keyword arguments:
domainname -- the domainname to convert (str)
"""
tmp = []
for label in domainname.split('.'):
if len(label) == 0:
continue
tmp.append(ToASCII(label))
return '.'.join(tmp)
idn2ascii = staticmethod(idn2ascii)
def ace2idna(domainname):
"""Convertis a domainname from ACE according to IDNA
Keyword arguments:
domainname -- the domainname to convert (str)
"""
tmp = []
for label in domainname.split('.'):
if len(label) == 0:
continue
tmp.append(ToUnicode(label))
return '.'.join(tmp)
ace2idna = staticmethod(ace2idna)
def chkDomainname(domainname):
"""Validates the domain name of an e-mail address.
Keyword arguments:
domainname -- the domain name that should be validated
"""
re.compile(RE_ASCII_CHARS)
if not re.match(RE_ASCII_CHARS, domainname):
domainname = VirtualMailManager.idn2ascii(domainname)
if len(domainname) > 255:
raise VMMException(_(u'The domain name is too long.'),
ERR.DOMAIN_TOO_LONG)
re.compile(RE_DOMAIN)
if not re.match(RE_DOMAIN, domainname):
raise VMMException(_(u'The domain name is invalid.'),
ERR.DOMAIN_INVALID)
return domainname
chkDomainname = staticmethod(chkDomainname)
def chkEmailAddress(address):
try:
localpart, domain = address.split('@')
except ValueError:
raise VMMException(_(u"Missing '@' sign in e-mail address »%s«.") %
address, ERR.INVALID_ADDRESS)
except AttributeError:
raise VMMException(_(u"»%s« looks not like an e-mail address.") %
address, ERR.INVALID_ADDRESS)
if len(domain) > 0:
domain = VirtualMailManager.chkDomainname(domain)
else:
raise VMMException(_(u"Missing domain name after »%s@«.") %
localpart, ERR.DOMAIN_NO_NAME)
localpart = VirtualMailManager.chkLocalpart(localpart)
return '%s@%s' % (localpart, domain)
chkEmailAddress = staticmethod(chkEmailAddress)
def __getAccount(self, address, password=None):
self.__dbConnect()
if not password is None:
password = self.__pwhash(password)
return Account(self.__dbh, address, password)
def _readpass(self):
clear0 = ''
clear1 = '1'
while clear0 != clear1:
while len(clear0) < 1:
clear0 = getpass(prompt=_('Enter new password: '))
if len(clear0) < 1:
sys.stderr.write('%s\n'
% _('Sorry, empty passwords are not permitted'))
clear1 = getpass(prompt=_('Retype new password: '))
if clear0 != clear1:
clear0 = ''
sys.stderr.write('%s\n' % _('Sorry, passwords do not match'))
return clear0
def __getAlias(self, address, destination=None):
self.__dbConnect()
return Alias(self.__dbh, address, destination)
def __getDomain(self, domainname, transport=None):
if transport is None:
transport = self.__Cfg.get('misc', 'transport')
self.__dbConnect()
return Domain(self.__dbh, domainname,
self.__Cfg.get('domdir', 'base'), transport)
def __getDiskUsage(self, directory):
"""Estimate file space usage for the given directory.
Keyword arguments:
directory -- the directory to summarize recursively disk usage for
"""
if self.__isdir(directory):
return Popen([self.__Cfg.get('bin', 'du'), "-hs", directory],
stdout=PIPE).communicate()[0].split('\t')[0]
else:
return 0
def __isdir(self, directory):
isdir = os.path.isdir(directory)
if not isdir:
self.__warnings.append(_('No such directory: %s') % directory)
return isdir
def __makedir(self, directory, mode=None, uid=None, gid=None):
if mode is None:
mode = self.__Cfg.getint('maildir', 'mode')
if uid is None:
uid = 0
if gid is None:
gid = 0
os.makedirs(directory, mode)
os.chown(directory, uid, gid)
def __domDirMake(self, domdir, gid):
os.umask(0006)
oldpwd = os.getcwd()
basedir = self.__Cfg.get('domdir', 'base')
domdirdirs = domdir.replace(basedir+'/', '').split('/')
os.chdir(basedir)
if not os.path.isdir(domdirdirs[0]):
self.__makedir(domdirdirs[0], 489, 0,
self.__Cfg.getint('misc', 'gid_mail'))
os.chdir(domdirdirs[0])
os.umask(0007)
self.__makedir(domdirdirs[1], self.__Cfg.getint('domdir', 'mode'), 0,
gid)
os.chdir(oldpwd)
def __subscribeFL(self, folderlist, uid, gid):
fname = self.__Cfg.get('maildir', 'name') + '/subscriptions'
sf = file(fname, 'w')
for f in folderlist:
sf.write(f+'\n')
sf.flush()
sf.close()
os.chown(fname, uid, gid)
os.chmod(fname, 384)
def __mailDirMake(self, domdir, uid, gid):
"""Creates maildirs and maildir subfolders.
Keyword arguments:
domdir -- the path to the domain directory
uid -- user id from the account
gid -- group id from the account
"""
os.umask(0007)
oldpwd = os.getcwd()
os.chdir(domdir)
re.compile(RE_MBOX_NAMES)
maildir = self.__Cfg.get('maildir', 'name')
folders = [maildir]
for folder in self.__Cfg.get('maildir', 'folders').split(':'):
folder = folder.strip()
if len(folder) and not folder.count('..')\
and re.match(RE_MBOX_NAMES, folder):
folders.append('%s/.%s' % (maildir, folder))
subdirs = ['cur', 'new', 'tmp']
mode = self.__Cfg.getint('maildir', 'mode')
self.__makedir('%s' % uid, mode, uid, gid)
os.chdir('%s' % uid)
for folder in folders:
self.__makedir(folder, mode, uid, gid)
for subdir in subdirs:
self.__makedir(folder+'/'+subdir, mode, uid, gid)
self.__subscribeFL([f.replace(maildir+'/.', '') for f in folders[1:]],
uid, gid)
os.chdir(oldpwd)
def __userDirDelete(self, domdir, uid, gid):
if uid > 0 and gid > 0:
userdir = '%s' % uid
if userdir.count('..') or domdir.count('..'):
raise VMMException(_(u'Found ".." in home directory path.'),
ERR.FOUND_DOTS_IN_PATH)
if os.path.isdir(domdir):
os.chdir(domdir)
if os.path.isdir(userdir):
mdstat = os.stat(userdir)
if (mdstat.st_uid, mdstat.st_gid) != (uid, gid):
raise VMMException(
_(u'Owner/group mismatch in home directory detected.'),
ERR.MAILDIR_PERM_MISMATCH)
rmtree(userdir, ignore_errors=True)
else:
raise VMMException(_(u"No such directory: %s") %
domdir+'/'+userdir, ERR.NO_SUCH_DIRECTORY)
def __domDirDelete(self, domdir, gid):
if gid > 0:
if not self.__isdir(domdir):
return
basedir = '%s' % self.__Cfg.get('domdir', 'base')
domdirdirs = domdir.replace(basedir+'/', '').split('/')
if basedir.count('..') or domdir.count('..'):
raise VMMException(
_(u'FATAL: ".." in domain directory path detected.'),
ERR.FOUND_DOTS_IN_PATH)
if os.path.isdir('%s/%s' % (basedir, domdirdirs[0])):
os.chdir('%s/%s' % (basedir, domdirdirs[0]))
if os.lstat(domdirdirs[1]).st_gid != gid:
raise VMMException(
_(u'FATAL: group mismatch in domain directory detected'),
ERR.DOMAINDIR_GROUP_MISMATCH)
rmtree(domdirdirs[1], ignore_errors=True)
def __getSalt(self):
from random import choice
salt = None
if self.__scheme == 'CRYPT':
salt = '%s%s' % (choice(SALTCHARS), choice(SALTCHARS))
elif self.__scheme in ['MD5', 'MD5-CRYPT']:
salt = '$1$'
for i in range(8):
salt += choice(SALTCHARS)
salt += '$'
return salt
def __pwCrypt(self, password):
# for: CRYPT, MD5 and MD5-CRYPT
from crypt import crypt
return crypt(password, self.__getSalt())
def __pwSHA1(self, password):
# for: SHA/SHA1
import sha
from base64 import standard_b64encode
sha1 = sha.new(password)
return standard_b64encode(sha1.digest())
def __pwMD5(self, password, emailaddress=None):
import md5
_md5 = md5.new(password)
if self.__scheme == 'LDAP-MD5':
from base64 import standard_b64encode
return standard_b64encode(_md5.digest())
elif self.__scheme == 'PLAIN-MD5':
return _md5.hexdigest()
elif self.__scheme == 'DIGEST-MD5' and emailaddress is not None:
# use an empty realm - works better with usenames like user@dom
_md5 = md5.new('%s::%s' % (emailaddress, password))
return _md5.hexdigest()
def __pwMD4(self, password):
# for: PLAIN-MD4
from Crypto.Hash import MD4
_md4 = MD4.new(password)
return _md4.hexdigest()
def __pwhash(self, password, scheme=None, user=None):
if scheme is not None:
self.__scheme = scheme
if self.__scheme in ['CRYPT', 'MD5', 'MD5-CRYPT']:
return '{%s}%s' % (self.__scheme, self.__pwCrypt(password))
elif self.__scheme in ['SHA', 'SHA1']:
return '{%s}%s' % (self.__scheme, self.__pwSHA1(password))
elif self.__scheme in ['PLAIN-MD5', 'LDAP-MD5', 'DIGEST-MD5']:
return '{%s}%s' % (self.__scheme, self.__pwMD5(password, user))
elif self.__scheme == 'MD4':
return '{%s}%s' % (self.__scheme, self.__pwMD4(password))
elif self.__scheme in ['SMD5', 'SSHA', 'CRAM-MD5', 'HMAC-MD5',
'LANMAN', 'NTLM', 'RPA']:
return Popen([self.__Cfg.get('bin', 'dovecotpw'), '-s',
self.__scheme,'-p',password],stdout=PIPE).communicate()[0][:-1]
else:
return '{%s}%s' % (self.__scheme, password)
def hasWarnings(self):
"""Checks if warnings are present, returns bool."""
return bool(len(self.__warnings))
def getWarnings(self):
"""Returns a list with all available warnings."""
return self.__warnings
def cfgGetBoolean(self, section, option):
return self.__Cfg.getboolean(section, option)
def cfgGetInt(self, section, option):
return self.__Cfg.getint(section, option)
def cfgGetString(self, section, option):
return self.__Cfg.get(section, option)
def setupIsDone(self):
"""Checks if vmm is configured, returns bool"""
try:
return self.__Cfg.getboolean('config', 'done')
except ValueError, e:
raise VMMConfigException(_(u"""Configurtion error: "%s"
(in section "connfig", option "done") see also: vmm.cfg(5)\n""") % str(e),
ERR.CONF_ERROR)
def configure(self, section=None):
"""Starts interactive configuration.
Configures in interactive mode options in the given section.
If no section is given (default) all options from all sections
will be prompted.
Keyword arguments:
section -- the section to configure (default None):
'database', 'maildir', 'bin' or 'misc'
"""
if section is None:
self.__Cfg.configure(self.__cfgSections)
elif section in self.__cfgSections:
self.__Cfg.configure([section])
else:
raise VMMException(_(u"Invalid section: '%s'") % section,
ERR.INVALID_SECTION)
def domainAdd(self, domainname, transport=None):
dom = self.__getDomain(domainname, transport)
dom.save()
self.__domDirMake(dom.getDir(), dom.getID())
def domainTransport(self, domainname, transport, force=None):
if force is not None and force != 'force':
raise VMMDomainException(_(u"Invalid argument: '%s'") % force,
ERR.INVALID_OPTION)
dom = self.__getDomain(domainname, None)
if force is None:
dom.updateTransport(transport)
else:
dom.updateTransport(transport, force=True)
def domainDelete(self, domainname, force=None):
if not force is None and force not in ['deluser','delalias','delall']:
raise VMMDomainException(_(u"Invalid argument: »%s«") % force,
ERR.INVALID_OPTION)
dom = self.__getDomain(domainname)
gid = dom.getID()
domdir = dom.getDir()
if self.__Cfg.getboolean('misc', 'forcedel') or force == 'delall':
dom.delete(True, True)
elif force == 'deluser':
dom.delete(delUser=True)
elif force == 'delalias':
dom.delete(delAlias=True)
else:
dom.delete()
if self.__Cfg.getboolean('domdir', 'delete'):
self.__domDirDelete(domdir, gid)
def domainInfo(self, domainname, details=None):
if details not in [None, 'accounts', 'aliasdomains', 'aliases', 'full',
'detailed']:
raise VMMDomainException(_(u'Invalid argument: »%s«') % details,
ERR.INVALID_OPTION)
if details == 'detailed':
details = 'full'
warning = _(u"""\
The keyword »detailed« is deprecated and will be removed in a future release.
Please use the keyword »full« to get full details.""")
self.__warnings.append(warning)
dom = self.__getDomain(domainname)
dominfo = dom.getInfo()
if dominfo['domainname'].startswith('xn--'):
dominfo['domainname'] += ' (%s)'\
% VirtualMailManager.ace2idna(dominfo['domainname'])
if dominfo['aliases'] is None:
dominfo['aliases'] = 0
if details is None:
return dominfo
elif details == 'accounts':
return (dominfo, dom.getAccounts())
elif details == 'aliasdomains':
return (dominfo, dom.getAliaseNames())
elif details == 'aliases':
return (dominfo, dom.getAliases())
else:
return (dominfo, dom.getAliaseNames(), dom.getAccounts(),
dom.getAliases())
def aliasDomainAdd(self, aliasname, domainname):
"""Adds an alias domain to the domain.
Keyword arguments:
aliasname -- the name of the alias domain (str)
domainname -- name of the target domain (str)
"""
dom = self.__getDomain(domainname)
aliasDom = AliasDomain(self.__dbh, aliasname, dom)
aliasDom.save()
def aliasDomainInfo(self, aliasname):
self.__dbConnect()
aliasDom = AliasDomain(self.__dbh, aliasname, None)
return aliasDom.info()
def aliasDomainDelete(self, aliasname):
"""Deletes the specified alias domain.
Keyword arguments:
aliasname -- the name of the alias domain (str)
"""
self.__dbConnect()
aliasDom = AliasDomain(self.__dbh, aliasname, None)
aliasDom.delete()
def domainList(self, pattern=None):
from Domain import search
like = False
if pattern is not None:
if pattern.startswith('%') or pattern.endswith('%'):
like = True
if pattern.startswith('%') and pattern.endswith('%'):
domain = pattern[1:-1]
elif pattern.startswith('%'):
domain = pattern[1:]
elif pattern.endswith('%'):
domain = pattern[:-1]
re.compile(RE_DOMAIN_SRCH)
if not re.match(RE_DOMAIN_SRCH, domain):
raise VMMException(
_(u"The pattern »%s« contains invalid characters.") %
pattern, ERR.DOMAIN_INVALID)
self.__dbConnect()
return search(self.__dbh, pattern=pattern, like=like)
def userAdd(self, emailaddress, password):
acc = self.__getAccount(emailaddress, password)
if password is None:
password = self._readpass()
acc.setPassword(self.__pwhash(password))
acc.save(self.__Cfg.get('maildir', 'name'),
self.__Cfg.getboolean('services', 'smtp'),
self.__Cfg.getboolean('services', 'pop3'),
self.__Cfg.getboolean('services', 'imap'),
self.__Cfg.getboolean('services', 'managesieve'))
self.__mailDirMake(acc.getDir('domain'), acc.getUID(), acc.getGID())
def aliasAdd(self, aliasaddress, targetaddress):
alias = self.__getAlias(aliasaddress, targetaddress)
alias.save()
def userDelete(self, emailaddress, force=None):
if force not in [None, 'delalias']:
raise VMMException(_(u"Invalid argument: »%s«") % force,
ERR.INVALID_AGUMENT)
acc = self.__getAccount(emailaddress)
uid = acc.getUID()
gid = acc.getGID()
acc.delete(force)
if self.__Cfg.getboolean('maildir', 'delete'):
try:
self.__userDirDelete(acc.getDir('domain'), uid, gid)
except VMMException, e:
if e.code() in [ERR.FOUND_DOTS_IN_PATH,
ERR.MAILDIR_PERM_MISMATCH, ERR.NO_SUCH_DIRECTORY]:
warning = _(u"""\
The account has been successfully deleted from the database.
But an error occurred while deleting the following directory:
»%(directory)s«
Reason: %(raeson)s""") % {'directory': acc.getDir('home'),'raeson': e.msg()}
self.__warnings.append(warning)
else:
raise e
def aliasInfo(self, aliasaddress):
alias = self.__getAlias(aliasaddress)
return alias.getInfo()
def aliasDelete(self, aliasaddress, targetaddress=None):
alias = self.__getAlias(aliasaddress, targetaddress)
alias.delete()
def userInfo(self, emailaddress, diskusage=False):
acc = self.__getAccount(emailaddress)
info = acc.getInfo()
if self.__Cfg.getboolean('maildir', 'diskusage') or diskusage:
info['disk usage'] = self.__getDiskUsage('%(maildir)s' % info)
return info
def userByID(self, uid):
from Account import getAccountByID
self.__dbConnect()
return getAccountByID(uid, self.__dbh)
def userPassword(self, emailaddress, password):
acc = self.__getAccount(emailaddress)
if acc.getUID() == 0:
raise VMMException(_(u"Account doesn't exists"), ERR.NO_SUCH_ACCOUNT)
if password is None:
password = self._readpass()
acc.modify('password', self.__pwhash(password, user=emailaddress))
def userName(self, emailaddress, name):
acc = self.__getAccount(emailaddress)
acc.modify('name', name)
def userTransport(self, emailaddress, transport):
acc = self.__getAccount(emailaddress)
acc.modify('transport', transport)
def userDisable(self, emailaddress, service=None):
acc = self.__getAccount(emailaddress)
acc.disable(service)
def userEnable(self, emailaddress, service=None):
acc = self.__getAccount(emailaddress)
acc.enable(service)
def __del__(self):
if not self.__dbh is None and self.__dbh._isOpen:
self.__dbh.close()