--- a/VirtualMailManager/Domain.py Sat Apr 03 02:14:13 2010 +0000
+++ b/VirtualMailManager/Domain.py Sun Apr 04 08:16:46 2010 +0000
@@ -2,8 +2,13 @@
# Copyright (c) 2007 - 2010, Pascal Volk
# See COPYING for distribution information.
-"""Virtual Mail Manager's Domain class to manage e-mail domains."""
+"""
+ VirtualMailManager.Domain
+ Virtual Mail Manager's Domain class to manage e-mail domains.
+"""
+
+import os
from random import choice
from VirtualMailManager import check_domainname
@@ -20,219 +25,246 @@
class Domain(object):
"""Class to manage e-mail domains."""
- __slots__ = ('_basedir', '_domaindir', '_id', '_name', '_transport',
- '_dbh')
+ __slots__ = ('_directory', '_gid', '_name', '_transport', '_dbh', '_new')
- def __init__(self, dbh, domainname, basedir=None, transport=None):
+ def __init__(self, dbh, domainname):
"""Creates a new Domain instance.
- Keyword arguments:
- dbh -- a pyPgSQL.PgSQL.connection
- domainname -- name of the domain (str)
- transport -- default vmm.cfg/misc/transport (str)
+ Loads all relevant data from the database, if the domain could be
+ found. To create a new domain call the methods set_directory() and
+ set_transport() before save().
+
+ A DomainError will be thrown when the *domainname* is the name of
+ an alias domain.
+
+ Arguments:
+
+ `dbh` : pyPgSQL.PgSQL.Connection
+ a database connection for the database access
+ `domainname` : basestring
+ The name of the domain
"""
- self._dbh = dbh
self._name = check_domainname(domainname)
- self._basedir = basedir
- if transport is not None:
- self._transport = Transport(self._dbh, transport=transport)
- else:
- self._transport = transport
- self._id = 0
- self._domaindir = None
- if not self._exists() and self._isAlias():
- raise DomErr(_(u"The domain “%s” is an alias domain.") %
- self._name, DOMAIN_ALIAS_EXISTS)
+ self._dbh = dbh
+ self._gid = 0
+ self._transport = None
+ self._directory = None
+ self._new = True
+ self._load()
- def _exists(self):
- """Checks if the domain already exists.
+ def _load(self):
+ """Load information from the database and checks if the domain name
+ is the primary one.
- If the domain exists _id will be set and returns True, otherwise False
- will be returned.
+ Raises a DomainError if Domain._name isn't the primary name of the
+ domain.
"""
dbc = self._dbh.cursor()
- dbc.execute("SELECT gid, tid, domaindir FROM domain_data WHERE gid =\
- (SELECT gid FROM domain_name WHERE domainname = %s AND is_primary)",
- self._name)
+ dbc.execute('SELECT dd.gid, tid, domaindir, is_primary FROM \
+ domain_data dd, domain_name dn WHERE domainname = %s AND dn.gid = dd.gid',
+ self._name)
result = dbc.fetchone()
dbc.close()
- if result is not None:
- self._id, self._domaindir = result[0], result[2]
+ if result:
+ if not result[3]:
+ raise DomErr(_(u"The domain '%s' is an alias domain.") %
+ self._name, DOMAIN_ALIAS_EXISTS)
+ self._gid, self._directory = result[0], result[2]
self._transport = Transport(self._dbh, tid=result[1])
- return True
- else:
- return False
+ self._new = False
- def _isAlias(self):
- """Checks if self._name is known for an alias domain."""
- dbc = self._dbh.cursor()
- dbc.execute('SELECT is_primary FROM domain_name WHERE domainname = %s',
- self._name)
- result = dbc.fetchone()
- dbc.close()
- if result is not None and not result[0]:
- return True
- else:
- return False
-
- def _setID(self):
- """Sets the ID of the domain."""
+ def _set_gid(self):
+ """Sets the ID of the domain - if not set yet."""
+ assert self._gid == 0
dbc = self._dbh.cursor()
dbc.execute("SELECT nextval('domain_gid')")
- self._id = dbc.fetchone()[0]
+ self._gid = dbc.fetchone()[0]
dbc.close()
- def _prepare(self):
- self._setID()
- self._domaindir = "%s/%s/%i" % (self._basedir, choice(MAILDIR_CHARS),
- self._id)
-
def _has(self, what):
"""Checks if aliases or accounts are assigned to the domain.
If there are assigned accounts or aliases True will be returned,
otherwise False will be returned.
- Keyword arguments:
- what -- 'alias' or 'users' (strings)
+ Argument:
+
+ `what` : basestring
+ "alias" or "users"
"""
- if what not in ['alias', 'users']:
- return False
+ assert what in ('alias', 'users')
dbc = self._dbh.cursor()
if what == 'users':
- dbc.execute("SELECT count(gid) FROM users WHERE gid=%s", self._id)
+ dbc.execute("SELECT count(gid) FROM users WHERE gid=%s", self._gid)
else:
- dbc.execute("SELECT count(gid) FROM alias WHERE gid=%s", self._id)
+ dbc.execute("SELECT count(gid) FROM alias WHERE gid=%s", self._gid)
count = dbc.fetchone()
dbc.close()
- if count[0] > 0:
- return True
- else:
- return False
+ return count[0] > 0
- def _chkDelete(self, delUser, delAlias):
+ def _chk_delete(self, deluser, delalias):
"""Checks dependencies for deletion.
- Keyword arguments:
- delUser -- ignore available accounts (bool)
- delAlias -- ignore available aliases (bool)
+ Arguments:
+ deluser -- ignore available accounts (bool)
+ delalias -- ignore available aliases (bool)
"""
- if not delUser:
- hasUser = self._has('users')
+ if not deluser:
+ hasuser = self._has('users')
else:
- hasUser = False
- if not delAlias:
- hasAlias = self._has('alias')
+ hasuser = False
+ if not delalias:
+ hasalias = self._has('alias')
else:
- hasAlias = False
- if hasUser and hasAlias:
+ hasalias = False
+ if hasuser and hasalias:
raise DomErr(_(u'There are accounts and aliases.'),
ACCOUNT_AND_ALIAS_PRESENT)
- elif hasUser:
+ elif hasuser:
raise DomErr(_(u'There are accounts.'), ACCOUNT_PRESENT)
- elif hasAlias:
+ elif hasalias:
raise DomErr(_(u'There are aliases.'), ALIAS_PRESENT)
+ def _chk_state(self):
+ """Throws a DomainError if the Domain is new - not saved in the
+ database."""
+ if self._new:
+ raise DomErr(_(u"The domain '%s' doesn't exist.") % self._name,
+ NO_SUCH_DOMAIN)
+
+ @property
+ def gid(self):
+ """The GID of the Domain."""
+ return self._gid
+
+ @property
+ def name(self):
+ """The Domain's name."""
+ return self._name
+
+ @property
+ def directory(self):
+ """The Domain's directory."""
+ return self._directory
+
+ def set_directory(self, basedir):
+ """Set the path value of the Domain's directory, inside *basedir*.
+
+ Argument:
+
+ `basedir` : basestring
+ The base directory of all domains
+ """
+ assert self._new and self._directory is None
+ self._set_gid()
+ self._directory = os.path.join(basedir, choice(MAILDIR_CHARS),
+ str(self._gid))
+
+ @property
+ def transport(self):
+ """The Domain's transport."""
+ return self._transport
+
+ def set_transport(self, transport):
+ """Set the transport for the new Domain.
+
+ Argument:
+
+ `transport` : VirtualMailManager.Transport
+ The transport of the new Domain
+ """
+ assert self._new and isinstance(transport, Transport)
+ self._transport = transport
+
def save(self):
"""Stores the new domain in the database."""
- if self._id < 1:
- self._prepare()
- dbc = self._dbh.cursor()
- dbc.execute("INSERT INTO domain_data (gid, tid, domaindir)\
- VALUES (%s, %s, %s)", self._id, self._transport.id, self._domaindir)
- dbc.execute("INSERT INTO domain_name (domainname, gid, is_primary)\
- VALUES (%s, %s, %s)", self._name, self._id, True)
- self._dbh.commit()
- dbc.close()
- else:
- raise DomErr(_(u'The domain “%s” already exists.') % self._name,
+ if not self._new:
+ raise DomErr(_(u"The domain '%s' already exists.") % self._name,
DOMAIN_EXISTS)
+ assert self._directory is not None and self._transport is not None
+ dbc = self._dbh.cursor()
+ dbc.execute("INSERT INTO domain_data VALUES (%s, %s, %s)", self._gid,
+ self._transport.tid, self._directory)
+ dbc.execute("INSERT INTO domain_name VALUES (%s, %s, %s)", self._name,
+ self._gid, True)
+ self._dbh.commit()
+ dbc.close()
+ self._new = False
- def delete(self, delUser=False, delAlias=False):
+ def delete(self, deluser=False, delalias=False):
"""Deletes the domain.
- Keyword arguments:
- delUser -- force deletion of available accounts (bool)
- delAlias -- force deletion of available aliases (bool)
+ Arguments:
+
+ `deluser` : bool
+ force deletion of all available accounts, default `False`
+ `delalias` : bool
+ force deletion of all available aliases, default `False`
"""
- if self._id > 0:
- self._chkDelete(delUser, delAlias)
- dbc = self._dbh.cursor()
- for tbl in ('alias', 'users', 'relocated', 'domain_name',
- 'domain_data'):
- dbc.execute("DELETE FROM %s WHERE gid = %d" % (tbl, self._id))
+ self._chk_state()
+ self._chk_delete(deluser, delalias)
+ dbc = self._dbh.cursor()
+ for tbl in ('alias', 'users', 'relocated', 'domain_name',
+ 'domain_data'):
+ dbc.execute("DELETE FROM %s WHERE gid = %d" % (tbl, self._gid))
+ self._dbh.commit()
+ dbc.close()
+ self._gid = 0
+ self._directory = self._transport = None
+ self._new = True
+
+ def update_transport(self, transport, force=False):
+ """Sets a new transport for the Domain.
+
+ If *force* is `True` the new *transport* will be assigned to all
+ existing accounts. Otherwise the *transport* will be only used for
+ accounts created from now on.
+
+ Arguments:
+
+ `transport` : VirtualMailManager.Transport
+ the new transport
+ `force` : bool
+ enforce new transport setting for all accounts, default `False`
+ """
+ self._chk_state()
+ assert isinstance(transport, Transport)
+ if transport == self._transport:
+ return
+ dbc = self._dbh.cursor()
+ dbc.execute("UPDATE domain_data SET tid = %s WHERE gid = %s",
+ transport.tid, self._gid)
+ if dbc.rowcount > 0:
self._dbh.commit()
- dbc.close()
- else:
- raise DomErr(_(u"The domain “%s” doesn't exist.") % self._name,
- NO_SUCH_DOMAIN)
-
- def updateTransport(self, transport, force=False):
- """Sets a new transport for the domain.
-
- Keyword arguments:
- transport -- the new transport (str)
- force -- True/False force new transport for all accounts (bool)
- """
- if self._id > 0:
- if transport == self._transport.transport:
- return
- trsp = Transport(self._dbh, transport=transport)
- dbc = self._dbh.cursor()
- dbc.execute("UPDATE domain_data SET tid = %s WHERE gid = %s",
- trsp.id, self._id)
+ if force:
+ dbc.execute("UPDATE users SET tid = %s WHERE gid = %s",
+ transport.tid, self._gid)
if dbc.rowcount > 0:
self._dbh.commit()
- if force:
- dbc.execute("UPDATE users SET tid = %s WHERE gid = %s",
- trsp.id, self._id)
- if dbc.rowcount > 0:
- self._dbh.commit()
- dbc.close()
- else:
- raise DomErr(_(u"The domain “%s” doesn't exist.") % self._name,
- NO_SUCH_DOMAIN)
-
- def getID(self):
- """Returns the ID of the domain."""
- return self._id
+ dbc.close()
+ self._transport = transport
- def getDir(self):
- """Returns the directory of the domain."""
- return self._domaindir
-
- def getTransport(self):
- """Returns domain's transport."""
- return self._transport.transport
-
- def getTransportID(self):
- """Returns the ID from the domain's transport."""
- return self._transport.id
-
- def getInfo(self):
+ def get_info(self):
"""Returns a dictionary with information about the domain."""
- sql = """\
-SELECT gid, domainname, transport, domaindir, aliasdomains, accounts,
- aliases, relocated
+ self._chk_state()
+ sql = """SELECT gid, domainname, transport, domaindir, aliasdomains,
+ accounts, aliases, relocated
FROM vmm_domain_info
- WHERE gid = %i""" % self._id
+ WHERE gid = %i""" % self._gid
dbc = self._dbh.cursor()
dbc.execute(sql)
info = dbc.fetchone()
dbc.close()
- if info is None:
- raise DomErr(_(u"The domain “%s” doesn't exist.") % self._name,
- NO_SUCH_DOMAIN)
- else:
- keys = ['gid', 'domainname', 'transport', 'domaindir',
- 'aliasdomains', 'accounts', 'aliases', 'relocated']
- return dict(zip(keys, info))
+ keys = ('gid', 'domainname', 'transport', 'domaindir', 'aliasdomains',
+ 'accounts', 'aliases', 'relocated')
+ return dict(zip(keys, info))
- def getAccounts(self):
- """Returns a list with all accounts from the domain."""
+ def get_accounts(self):
+ """Returns a list with all accounts of the domain."""
+ self._chk_state()
dbc = self._dbh.cursor()
dbc.execute("SELECT local_part from users where gid = %s ORDER BY\
- local_part", self._id)
+ local_part", self._gid)
users = dbc.fetchall()
dbc.close()
accounts = []
@@ -242,48 +274,72 @@
accounts = [addr((account[0], _dom)) for account in users]
return accounts
- def getAliases(self):
- """Returns a list with all aliases from the domain."""
+ def get_aliases(self):
+ """Returns a list with all aliases e-mail addresses of the domain."""
+ self._chk_state()
dbc = self._dbh.cursor()
dbc.execute("SELECT DISTINCT address FROM alias WHERE gid = %s\
- ORDER BY address", self._id)
+ ORDER BY address", self._gid)
addresses = dbc.fetchall()
dbc.close()
aliases = []
- if len(addresses) > 0:
+ if addresses:
addr = u'@'.join
_dom = self._name
aliases = [addr((alias[0], _dom)) for alias in addresses]
return aliases
- def getRelocated(self):
- """Returns a list with all addresses from relocated users."""
+ def get_relocated(self):
+ """Returns a list with all addresses of relocated users."""
+ self._chk_state()
dbc = self._dbh.cursor()
dbc.execute("SELECT address FROM relocated WHERE gid = %s\
- ORDER BY address", self._id)
+ ORDER BY address", self._gid)
addresses = dbc.fetchall()
dbc.close()
relocated = []
- if len(addresses) > 0:
+ if addresses:
addr = u'@'.join
_dom = self._name
relocated = [addr((address[0], _dom)) for address in addresses]
return relocated
- def getAliaseNames(self):
- """Returns a list with all alias names from the domain."""
+ def get_aliase_names(self):
+ """Returns a list with all alias domain names of the domain."""
+ self._chk_state()
dbc = self._dbh.cursor()
dbc.execute("SELECT domainname FROM domain_name WHERE gid = %s\
- AND NOT is_primary ORDER BY domainname", self._id)
+ AND NOT is_primary ORDER BY domainname", self._gid)
anames = dbc.fetchall()
dbc.close()
aliasdomains = []
- if len(anames) > 0:
+ if anames:
aliasdomains = [aname[0] for aname in anames]
return aliasdomains
def search(dbh, pattern=None, like=False):
+ """'Search' for domains by *pattern* in the database.
+
+ *pattern* may be a domain name or a partial domain name - starting
+ and/or ending with a '%' sign. When the *pattern* starts or ends with
+ a '%' sign *like* has to be `True` to perform a wildcard search.
+ To retrieve all available domains use the arguments' default values.
+
+ This function returns a tuple with a list and a dict: (order, domains).
+ The order list contains the domains' gid, alphabetical sorted by the
+ primary domain name. The domains dict's keys are the gids of the
+ domains. The value of item is a list. The first list element contains
+ the primary domain name or `None`. The elements [1:] contains the
+ names of alias domains.
+
+ Arguments:
+
+ `pattern` : basestring
+ a (partial) domain name (starting and/or ending with a "%" sign)
+ `like` : bool
+ should be `True` when *pattern* starts/ends with a "%" sign
+ """
if pattern is not None and like is False:
pattern = check_domainname(pattern)
sql = 'SELECT gid, domainname, is_primary FROM domain_name'
@@ -301,7 +357,7 @@
domdict = {}
order = [dom[0] for dom in doms if dom[2]]
- if len(order) == 0:
+ if not order:
for dom in doms:
if dom[0] not in order:
order.append(dom[0])
@@ -321,7 +377,6 @@
"""Returns the group id of the domain *domainname*.
If the domain couldn't be found in the database 0 will be returned.
-
"""
domainname = check_domainname(domainname)
dbc = dbh.cursor()