VirtualMailManager/Domain.py
author Pascal Volk <neverseen@users.sourceforge.net>
Fri, 26 Feb 2010 02:35:25 +0000
branchv0.6.x
changeset 216 0c8c053b451c
parent 199 0684790fff7c
child 222 d0c16e70a9fb
permissions -rw-r--r--
Moved VirtualMailManager/Exceptions to VirtualMailManager/errors. Renamed VMM*Exception classes to *Error. No longer add the attribute 'message' to VMMError if it doesn't exist, like in Python 2.4. It has been deprecated as of Python 2.6. Also removed the methods code() and msg(), the values are now accessible via the attributes 'code' and 'msg'.

# -*- coding: UTF-8 -*-
# Copyright (c) 2007 - 2010, Pascal Volk
# See COPYING for distribution information.

"""Virtual Mail Manager's Domain class to manage e-mail domains."""

from random import choice

from VirtualMailManager import check_domainname
from VirtualMailManager.constants.ERROR import \
     ACCOUNT_AND_ALIAS_PRESENT, ACCOUNT_PRESENT, ALIAS_PRESENT, \
     DOMAIN_ALIAS_EXISTS, DOMAIN_EXISTS, NO_SUCH_DOMAIN
from VirtualMailManager.errors import DomainError as DomErr
from VirtualMailManager.Transport import Transport


MAILDIR_CHARS = '0123456789abcdefghijklmnopqrstuvwxyz'


class Domain(object):
    """Class to manage e-mail domains."""
    __slots__ = ('_basedir','_domaindir','_id','_name','_transport','_dbh')
    def __init__(self, dbh, domainname, basedir=None, transport=None):
        """Creates a new Domain instance.

        Keyword arguments:
        dbh -- a pyPgSQL.PgSQL.connection
        domainname -- name of the domain (str)
        transport -- default vmm.cfg/misc/transport  (str)
        """
        self._dbh = dbh
        self._name = check_domainname(domainname)
        self._basedir = basedir
        if transport is not None:
            self._transport = Transport(self._dbh, transport=transport)
        else:
            self._transport = transport
        self._id = 0
        self._domaindir = None
        if not self._exists() and self._isAlias():
            raise DomErr(_(u"The domain “%s” is an alias domain.") %
                         self._name, DOMAIN_ALIAS_EXISTS)

    def _exists(self):
        """Checks if the domain already exists.

        If the domain exists _id will be set and returns True, otherwise False
        will be returned.
        """
        dbc = self._dbh.cursor()
        dbc.execute("SELECT gid, tid, domaindir FROM domain_data WHERE gid =\
 (SELECT gid FROM domain_name WHERE domainname = %s AND is_primary)",
                self._name)
        result = dbc.fetchone()
        dbc.close()
        if result is not None:
            self._id, self._domaindir = result[0], result[2]
            self._transport = Transport(self._dbh, tid=result[1])
            return True
        else:
            return False

    def _isAlias(self):
        """Checks if self._name is known for an alias domain."""
        dbc = self._dbh.cursor()
        dbc.execute('SELECT is_primary FROM domain_name WHERE domainname = %s',
                self._name)
        result = dbc.fetchone()
        dbc.close()
        if result is not None and not result[0]:
            return True
        else:
            return False

    def _setID(self):
        """Sets the ID of the domain."""
        dbc = self._dbh.cursor()
        dbc.execute("SELECT nextval('domain_gid')")
        self._id = dbc.fetchone()[0]
        dbc.close()

    def _prepare(self):
        self._setID()
        self._domaindir = "%s/%s/%i" % (self._basedir, choice(MAILDIR_CHARS),
                self._id)

    def _has(self, what):
        """Checks if aliases or accounts are assigned to the domain.

        If there are assigned accounts or aliases True will be returned,
        otherwise False will be returned.

        Keyword arguments:
        what -- 'alias' or 'users' (strings)
        """
        if what not in ['alias', 'users']:
            return False
        dbc = self._dbh.cursor()
        if what == 'users':
            dbc.execute("SELECT count(gid) FROM users WHERE gid=%s", self._id)
        else:
            dbc.execute("SELECT count(gid) FROM alias WHERE gid=%s", self._id)
        count = dbc.fetchone()
        dbc.close()
        if count[0] > 0:
            return True
        else:
            return False

    def _chkDelete(self, delUser, delAlias):
        """Checks dependencies for deletion.

        Keyword arguments:
        delUser -- ignore available accounts (bool)
        delAlias -- ignore available aliases (bool)
        """
        if not delUser:
            hasUser = self._has('users')
        else:
            hasUser = False
        if not delAlias:
            hasAlias = self._has('alias')
        else:
            hasAlias = False
        if hasUser and hasAlias:
            raise DomErr(_(u'There are accounts and aliases.'),
                         ACCOUNT_AND_ALIAS_PRESENT)
        elif hasUser:
            raise DomErr(_(u'There are accounts.'), ACCOUNT_PRESENT)
        elif hasAlias:
            raise DomErr(_(u'There are aliases.'), ALIAS_PRESENT)

    def save(self):
        """Stores the new domain in the database."""
        if self._id < 1:
            self._prepare()
            dbc = self._dbh.cursor()
            dbc.execute("INSERT INTO domain_data (gid, tid, domaindir)\
 VALUES (%s, %s, %s)", self._id, self._transport.getID(), self._domaindir)
            dbc.execute("INSERT INTO domain_name (domainname, gid, is_primary)\
 VALUES (%s, %s, %s)", self._name, self._id, True)
            self._dbh.commit()
            dbc.close()
        else:
            raise DomErr(_(u'The domain “%s” already exists.') % self._name,
                         DOMAIN_EXISTS)

    def delete(self, delUser=False, delAlias=False):
        """Deletes the domain.

        Keyword arguments:
        delUser -- force deletion of available accounts (bool)
        delAlias -- force deletion of available aliases (bool)
        """
        if self._id > 0:
            self._chkDelete(delUser, delAlias)
            dbc = self._dbh.cursor()
            for t in ('alias','users','relocated','domain_name','domain_data'):
                dbc.execute("DELETE FROM %s WHERE gid = %d" % (t, self._id))
            self._dbh.commit()
            dbc.close()
        else:
            raise DomErr(_(u"The domain “%s” doesn't exist.") % self._name,
                         NO_SUCH_DOMAIN)

    def updateTransport(self, transport, force=False):
        """Sets a new transport for the domain.

        Keyword arguments:
        transport -- the new transport (str)
        force -- True/False force new transport for all accounts (bool)
        """
        if self._id > 0:
            if transport == self._transport.getTransport():
                return
            trsp = Transport(self._dbh, transport=transport)
            dbc = self._dbh.cursor()
            dbc.execute("UPDATE domain_data SET tid = %s WHERE gid = %s",
                    trsp.getID(), self._id)
            if dbc.rowcount > 0:
                self._dbh.commit()
            if force:
                dbc.execute("UPDATE users SET tid = %s WHERE gid = %s",
                        trsp.getID(), self._id)
                if dbc.rowcount > 0:
                    self._dbh.commit()
            dbc.close()
        else:
            raise DomErr(_(u"The domain “%s” doesn't exist.") % self._name,
                         NO_SUCH_DOMAIN)

    def getID(self):
        """Returns the ID of the domain."""
        return self._id

    def getDir(self):
        """Returns the directory of the domain."""
        return self._domaindir

    def getTransport(self):
        """Returns domain's transport."""
        return self._transport.getTransport()

    def getTransportID(self):
        """Returns the ID from the domain's transport."""
        return self._transport.getID()

    def getInfo(self):
        """Returns a dictionary with information about the domain."""
        sql = """\
SELECT gid, domainname, transport, domaindir, aliasdomains, accounts,
       aliases, relocated
  FROM vmm_domain_info
 WHERE gid = %i""" % self._id
        dbc = self._dbh.cursor()
        dbc.execute(sql)
        info = dbc.fetchone()
        dbc.close()
        if info is None:
            raise DomErr(_(u"The domain “%s” doesn't exist.") % self._name,
                         NO_SUCH_DOMAIN)
        else:
            keys = ['gid', 'domainname', 'transport', 'domaindir',
                    'aliasdomains', 'accounts', 'aliases', 'relocated']
            return dict(zip(keys, info))

    def getAccounts(self):
        """Returns a list with all accounts from the domain."""
        dbc = self._dbh.cursor()
        dbc.execute("SELECT local_part from users where gid = %s ORDER BY\
 local_part", self._id)
        users = dbc.fetchall()
        dbc.close()
        accounts = []
        if len(users) > 0:
            addr = u'@'.join
            _dom = self._name
            accounts = [addr((account[0], _dom)) for account in users]
        return accounts

    def getAliases(self):
        """Returns a list with all aliases from the domain."""
        dbc = self._dbh.cursor()
        dbc.execute("SELECT DISTINCT address FROM alias WHERE gid = %s\
 ORDER BY address",  self._id)
        addresses = dbc.fetchall()
        dbc.close()
        aliases = []
        if len(addresses) > 0:
            addr = u'@'.join
            _dom = self._name
            aliases = [addr((alias[0], _dom)) for alias in addresses]
        return aliases

    def getRelocated(self):
        """Returns a list with all addresses from relocated users."""
        dbc = self._dbh.cursor()
        dbc.execute("SELECT address FROM relocated WHERE gid = %s\
 ORDER BY address", self._id)
        addresses = dbc.fetchall()
        dbc.close()
        relocated = []
        if len(addresses) > 0:
            addr = u'@'.join
            _dom = self._name
            relocated = [addr((address[0], _dom)) for address in addresses]
        return relocated

    def getAliaseNames(self):
        """Returns a list with all alias names from the domain."""
        dbc = self._dbh.cursor()
        dbc.execute("SELECT domainname FROM domain_name WHERE gid = %s\
 AND NOT is_primary ORDER BY domainname", self._id)
        anames = dbc.fetchall()
        dbc.close()
        aliasdomains = []
        if len(anames) > 0:
            aliasdomains = [aname[0] for aname in anames]
        return aliasdomains


def search(dbh, pattern=None, like=False):
    if pattern is not None and like is False:
        pattern = check_domainname(pattern)
    sql = 'SELECT gid, domainname, is_primary FROM domain_name'
    if pattern is None:
        pass
    elif like:
        sql += " WHERE domainname LIKE '%s'" % pattern
    else:
        sql += " WHERE domainname = '%s'" % pattern
    sql += ' ORDER BY is_primary DESC, domainname'
    dbc = dbh.cursor()
    dbc.execute(sql)
    doms = dbc.fetchall()
    dbc.close()

    domdict = {}
    order = [dom[0] for dom in doms if dom[2]]
    if len(order) == 0:
        for dom in doms:
            if dom[0] not in order:
                order.append(dom[0])
    for gid, dom, is_primary in doms:
        if is_primary:
            domdict[gid] = [dom]
        else:
            try:
                domdict[gid].append(dom)
            except KeyError:
                domdict[gid] = [None, dom]
    del doms
    return order, domdict

def get_gid(dbh, domainname):
    """Returns the *GID* of the domain *domainname*.

    Raises an `DomainError` if the domain does not exist.
    """
    domainname = check_domainname(domainname)
    dbc = dbh.cursor()
    dbc.execute('SELECT gid FROM domain_name WHERE domainname=%s', domainname)
    gid = dbc.fetchone()
    dbc.close()
    if gid:
        return gid[0]
    else:
        raise DomErr(_(u"The domain “%s” doesn't exist.") % domainname,
                     NO_SUCH_DOMAIN)