Moved VirtualMailManager/Exceptions to VirtualMailManager/errors.
Renamed VMM*Exception classes to *Error.
No longer add the attribute 'message' to VMMError if it doesn't exist, like in
Python 2.4. It has been deprecated as of Python 2.6.
Also removed the methods code() and msg(), the values are now accessible via
the attributes 'code' and 'msg'.
# -*- coding: UTF-8 -*-
# Copyright (c) 2007 - 2010, Pascal Volk
# See COPYING for distribution information.
"""Virtual Mail Manager's Domain class to manage e-mail domains."""
from random import choice
from VirtualMailManager import check_domainname
from VirtualMailManager.constants.ERROR import \
ACCOUNT_AND_ALIAS_PRESENT, ACCOUNT_PRESENT, ALIAS_PRESENT, \
DOMAIN_ALIAS_EXISTS, DOMAIN_EXISTS, NO_SUCH_DOMAIN
from VirtualMailManager.errors import DomainError as DomErr
from VirtualMailManager.Transport import Transport
MAILDIR_CHARS = '0123456789abcdefghijklmnopqrstuvwxyz'
class Domain(object):
"""Class to manage e-mail domains."""
__slots__ = ('_basedir','_domaindir','_id','_name','_transport','_dbh')
def __init__(self, dbh, domainname, basedir=None, transport=None):
"""Creates a new Domain instance.
Keyword arguments:
dbh -- a pyPgSQL.PgSQL.connection
domainname -- name of the domain (str)
transport -- default vmm.cfg/misc/transport (str)
"""
self._dbh = dbh
self._name = check_domainname(domainname)
self._basedir = basedir
if transport is not None:
self._transport = Transport(self._dbh, transport=transport)
else:
self._transport = transport
self._id = 0
self._domaindir = None
if not self._exists() and self._isAlias():
raise DomErr(_(u"The domain “%s” is an alias domain.") %
self._name, DOMAIN_ALIAS_EXISTS)
def _exists(self):
"""Checks if the domain already exists.
If the domain exists _id will be set and returns True, otherwise False
will be returned.
"""
dbc = self._dbh.cursor()
dbc.execute("SELECT gid, tid, domaindir FROM domain_data WHERE gid =\
(SELECT gid FROM domain_name WHERE domainname = %s AND is_primary)",
self._name)
result = dbc.fetchone()
dbc.close()
if result is not None:
self._id, self._domaindir = result[0], result[2]
self._transport = Transport(self._dbh, tid=result[1])
return True
else:
return False
def _isAlias(self):
"""Checks if self._name is known for an alias domain."""
dbc = self._dbh.cursor()
dbc.execute('SELECT is_primary FROM domain_name WHERE domainname = %s',
self._name)
result = dbc.fetchone()
dbc.close()
if result is not None and not result[0]:
return True
else:
return False
def _setID(self):
"""Sets the ID of the domain."""
dbc = self._dbh.cursor()
dbc.execute("SELECT nextval('domain_gid')")
self._id = dbc.fetchone()[0]
dbc.close()
def _prepare(self):
self._setID()
self._domaindir = "%s/%s/%i" % (self._basedir, choice(MAILDIR_CHARS),
self._id)
def _has(self, what):
"""Checks if aliases or accounts are assigned to the domain.
If there are assigned accounts or aliases True will be returned,
otherwise False will be returned.
Keyword arguments:
what -- 'alias' or 'users' (strings)
"""
if what not in ['alias', 'users']:
return False
dbc = self._dbh.cursor()
if what == 'users':
dbc.execute("SELECT count(gid) FROM users WHERE gid=%s", self._id)
else:
dbc.execute("SELECT count(gid) FROM alias WHERE gid=%s", self._id)
count = dbc.fetchone()
dbc.close()
if count[0] > 0:
return True
else:
return False
def _chkDelete(self, delUser, delAlias):
"""Checks dependencies for deletion.
Keyword arguments:
delUser -- ignore available accounts (bool)
delAlias -- ignore available aliases (bool)
"""
if not delUser:
hasUser = self._has('users')
else:
hasUser = False
if not delAlias:
hasAlias = self._has('alias')
else:
hasAlias = False
if hasUser and hasAlias:
raise DomErr(_(u'There are accounts and aliases.'),
ACCOUNT_AND_ALIAS_PRESENT)
elif hasUser:
raise DomErr(_(u'There are accounts.'), ACCOUNT_PRESENT)
elif hasAlias:
raise DomErr(_(u'There are aliases.'), ALIAS_PRESENT)
def save(self):
"""Stores the new domain in the database."""
if self._id < 1:
self._prepare()
dbc = self._dbh.cursor()
dbc.execute("INSERT INTO domain_data (gid, tid, domaindir)\
VALUES (%s, %s, %s)", self._id, self._transport.getID(), self._domaindir)
dbc.execute("INSERT INTO domain_name (domainname, gid, is_primary)\
VALUES (%s, %s, %s)", self._name, self._id, True)
self._dbh.commit()
dbc.close()
else:
raise DomErr(_(u'The domain “%s” already exists.') % self._name,
DOMAIN_EXISTS)
def delete(self, delUser=False, delAlias=False):
"""Deletes the domain.
Keyword arguments:
delUser -- force deletion of available accounts (bool)
delAlias -- force deletion of available aliases (bool)
"""
if self._id > 0:
self._chkDelete(delUser, delAlias)
dbc = self._dbh.cursor()
for t in ('alias','users','relocated','domain_name','domain_data'):
dbc.execute("DELETE FROM %s WHERE gid = %d" % (t, self._id))
self._dbh.commit()
dbc.close()
else:
raise DomErr(_(u"The domain “%s” doesn't exist.") % self._name,
NO_SUCH_DOMAIN)
def updateTransport(self, transport, force=False):
"""Sets a new transport for the domain.
Keyword arguments:
transport -- the new transport (str)
force -- True/False force new transport for all accounts (bool)
"""
if self._id > 0:
if transport == self._transport.getTransport():
return
trsp = Transport(self._dbh, transport=transport)
dbc = self._dbh.cursor()
dbc.execute("UPDATE domain_data SET tid = %s WHERE gid = %s",
trsp.getID(), self._id)
if dbc.rowcount > 0:
self._dbh.commit()
if force:
dbc.execute("UPDATE users SET tid = %s WHERE gid = %s",
trsp.getID(), self._id)
if dbc.rowcount > 0:
self._dbh.commit()
dbc.close()
else:
raise DomErr(_(u"The domain “%s” doesn't exist.") % self._name,
NO_SUCH_DOMAIN)
def getID(self):
"""Returns the ID of the domain."""
return self._id
def getDir(self):
"""Returns the directory of the domain."""
return self._domaindir
def getTransport(self):
"""Returns domain's transport."""
return self._transport.getTransport()
def getTransportID(self):
"""Returns the ID from the domain's transport."""
return self._transport.getID()
def getInfo(self):
"""Returns a dictionary with information about the domain."""
sql = """\
SELECT gid, domainname, transport, domaindir, aliasdomains, accounts,
aliases, relocated
FROM vmm_domain_info
WHERE gid = %i""" % self._id
dbc = self._dbh.cursor()
dbc.execute(sql)
info = dbc.fetchone()
dbc.close()
if info is None:
raise DomErr(_(u"The domain “%s” doesn't exist.") % self._name,
NO_SUCH_DOMAIN)
else:
keys = ['gid', 'domainname', 'transport', 'domaindir',
'aliasdomains', 'accounts', 'aliases', 'relocated']
return dict(zip(keys, info))
def getAccounts(self):
"""Returns a list with all accounts from the domain."""
dbc = self._dbh.cursor()
dbc.execute("SELECT local_part from users where gid = %s ORDER BY\
local_part", self._id)
users = dbc.fetchall()
dbc.close()
accounts = []
if len(users) > 0:
addr = u'@'.join
_dom = self._name
accounts = [addr((account[0], _dom)) for account in users]
return accounts
def getAliases(self):
"""Returns a list with all aliases from the domain."""
dbc = self._dbh.cursor()
dbc.execute("SELECT DISTINCT address FROM alias WHERE gid = %s\
ORDER BY address", self._id)
addresses = dbc.fetchall()
dbc.close()
aliases = []
if len(addresses) > 0:
addr = u'@'.join
_dom = self._name
aliases = [addr((alias[0], _dom)) for alias in addresses]
return aliases
def getRelocated(self):
"""Returns a list with all addresses from relocated users."""
dbc = self._dbh.cursor()
dbc.execute("SELECT address FROM relocated WHERE gid = %s\
ORDER BY address", self._id)
addresses = dbc.fetchall()
dbc.close()
relocated = []
if len(addresses) > 0:
addr = u'@'.join
_dom = self._name
relocated = [addr((address[0], _dom)) for address in addresses]
return relocated
def getAliaseNames(self):
"""Returns a list with all alias names from the domain."""
dbc = self._dbh.cursor()
dbc.execute("SELECT domainname FROM domain_name WHERE gid = %s\
AND NOT is_primary ORDER BY domainname", self._id)
anames = dbc.fetchall()
dbc.close()
aliasdomains = []
if len(anames) > 0:
aliasdomains = [aname[0] for aname in anames]
return aliasdomains
def search(dbh, pattern=None, like=False):
if pattern is not None and like is False:
pattern = check_domainname(pattern)
sql = 'SELECT gid, domainname, is_primary FROM domain_name'
if pattern is None:
pass
elif like:
sql += " WHERE domainname LIKE '%s'" % pattern
else:
sql += " WHERE domainname = '%s'" % pattern
sql += ' ORDER BY is_primary DESC, domainname'
dbc = dbh.cursor()
dbc.execute(sql)
doms = dbc.fetchall()
dbc.close()
domdict = {}
order = [dom[0] for dom in doms if dom[2]]
if len(order) == 0:
for dom in doms:
if dom[0] not in order:
order.append(dom[0])
for gid, dom, is_primary in doms:
if is_primary:
domdict[gid] = [dom]
else:
try:
domdict[gid].append(dom)
except KeyError:
domdict[gid] = [None, dom]
del doms
return order, domdict
def get_gid(dbh, domainname):
"""Returns the *GID* of the domain *domainname*.
Raises an `DomainError` if the domain does not exist.
"""
domainname = check_domainname(domainname)
dbc = dbh.cursor()
dbc.execute('SELECT gid FROM domain_name WHERE domainname=%s', domainname)
gid = dbc.fetchone()
dbc.close()
if gid:
return gid[0]
else:
raise DomErr(_(u"The domain “%s” doesn't exist.") % domainname,
NO_SUCH_DOMAIN)