--- a/VirtualMailManager/VirtualMailManager.py Sat Sep 06 03:07:28 2008 +0000
+++ b/VirtualMailManager/VirtualMailManager.py Mon Sep 08 05:30:17 2008 +0000
@@ -1,4 +1,3 @@
-#!/usr/bin/env python
# -*- coding: UTF-8 -*-
# Copyright 2007-2008 VEB IT
# See COPYING for distribution information.
@@ -23,13 +22,15 @@
from pyPgSQL import PgSQL # python-pgsql - http://pypgsql.sourceforge.net
-from Exceptions import *
import constants.ERROR as ERR
-from Config import Config as Cfg
from Account import Account
from Alias import Alias
+from AliasDomain import AliasDomain
+from Config import Config as Cfg
from Domain import Domain
-from AliasDomain import AliasDomain
+from EmailAddress import EmailAddress
+from Exceptions import *
+from Relocated import Relocated
SALTCHARS = './0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ'
RE_ASCII_CHARS = """^[\x20-\x7E]*$"""
@@ -114,29 +115,6 @@
except PgSQL.libpq.DatabaseError, e:
raise VMMException(str(e), ERR.DATABASE_ERROR)
- def chkLocalpart(localpart):
- """Validates the local part of an e-mail address.
-
- Keyword arguments:
- localpart -- the e-mail address that should be validated (str)
- """
- if len(localpart) < 1:
- raise VMMException(_(u'No localpart specified.'),
- ERR.LOCALPART_INVALID)
- if len(localpart) > 64:
- raise VMMException(_(u'The local part »%s« is too long') %
- localpart, ERR.LOCALPART_TOO_LONG)
- ic = re.compile(RE_LOCALPART).findall(localpart)
- if len(ic):
- ichrs = ''
- for c in set(ic):
- ichrs += u"»%s« " % c
- raise VMMException(_(u"The local part »%(lpart)s« contains invalid\
- characters: %(ichrs)s") % {'lpart': localpart, 'ichrs': ichrs},
- ERR.LOCALPART_INVALID)
- return localpart
- chkLocalpart = staticmethod(chkLocalpart)
-
def idn2ascii(domainname):
"""Converts an idn domainname in punycode.
@@ -179,52 +157,70 @@
ERR.DOMAIN_TOO_LONG)
re.compile(RE_DOMAIN)
if not re.match(RE_DOMAIN, domainname):
- raise VMMException(_(u'The domain name is invalid.'),
- ERR.DOMAIN_INVALID)
+ raise VMMException(_(u'The domain name »%s« is invalid.') %\
+ domainname, ERR.DOMAIN_INVALID)
return domainname
chkDomainname = staticmethod(chkDomainname)
- def chkEmailAddress(address):
- try:
- localpart, domain = address.split('@')
- except ValueError:
- raise VMMException(_(u"Missing '@' sign in e-mail address »%s«.") %
- address, ERR.INVALID_ADDRESS)
- except AttributeError:
- raise VMMException(_(u"»%s« looks not like an e-mail address.") %
- address, ERR.INVALID_ADDRESS)
- if len(domain) > 0:
- domain = VirtualMailManager.chkDomainname(domain)
+ def _exists(dbh, query):
+ dbc = dbh.cursor()
+ dbc.execute(query)
+ gid = dbc.fetchone()
+ dbc.close()
+ if gid is None:
+ return False
else:
- raise VMMException(_(u"Missing domain name after »%s@«.") %
- localpart, ERR.DOMAIN_NO_NAME)
- localpart = VirtualMailManager.chkLocalpart(localpart)
- return '%s@%s' % (localpart, domain)
- chkEmailAddress = staticmethod(chkEmailAddress)
+ return True
+ _exists = staticmethod(_exists)
+
+ def accountExists(dbh, address):
+ sql = "SELECT gid FROM users WHERE gid = (SELECT gid FROM domain_name\
+ WHERE domainname = '%(_domainname)s') AND local_part = '%(_localpart)s'" %\
+ address.__dict__
+ return VirtualMailManager._exists(dbh, sql)
+ accountExists = staticmethod(accountExists)
+
+ def aliasExists(dbh, address):
+ sql = "SELECT DISTINCT gid FROM alias WHERE gid = (SELECT gid FROM\
+ domain_name WHERE domainname = '%(_domainname)s') AND address =\
+ '%(_localpart)s'" % address.__dict__
+ return VirtualMailManager._exists(dbh, sql)
+ aliasExists = staticmethod(aliasExists)
+
+ def relocatedExists(dbh, address):
+ sql = "SELECT gid FROM relocated WHERE gid = (SELECT gid FROM\
+ domain_name WHERE domainname = '%(_domainname)s') AND address =\
+ '%(_localpart)s'" % address.__dict__
+ return VirtualMailManager._exists(dbh, sql)
+ relocatedExists = staticmethod(relocatedExists)
def __getAccount(self, address, password=None):
self.__dbConnect()
+ address = EmailAddress(address)
if not password is None:
password = self.__pwhash(password)
return Account(self.__dbh, address, password)
def _readpass(self):
- clear0 = ''
- clear1 = '1'
- while clear0 != clear1:
- while len(clear0) < 1:
- clear0 = getpass(prompt=_('Enter new password: '))
- if len(clear0) < 1:
- sys.stderr.write('%s\n'
- % _('Sorry, empty passwords are not permitted'))
+ mismatched = True
+ while mismatched:
+ clear0 = getpass(prompt=_('Enter new password: '))
clear1 = getpass(prompt=_('Retype new password: '))
if clear0 != clear1:
- clear0 = ''
sys.stderr.write('%s\n' % _('Sorry, passwords do not match'))
+ continue
+ if len(clear0) < 1 or len(clear1) < 1:
+ sys.stderr.write('%s\n'
+ % _('Sorry, empty passwords are not permitted'))
+ continue
+ mismatched = False
return clear0
def __getAlias(self, address, destination=None):
self.__dbConnect()
+ address = EmailAddress(address)
+ if destination is not None:
+ destination = EmailAddress(destination)
return Alias(self.__dbh, address, destination)
def __getDomain(self, domainname, transport=None):