--- a/VirtualMailManager/Handler.py Wed Jul 28 01:03:56 2010 +0000
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,657 +0,0 @@
-# -*- coding: UTF-8 -*-
-# Copyright (c) 2007 - 2010, Pascal Volk
-# See COPYING for distribution information.
-
-"""
- VirtualMailManager.Handler
-
- A wrapper class. It wraps round all other classes and does some
- dependencies checks.
-
- Additionally it communicates with the PostgreSQL database, creates
- or deletes directories of domains or users.
-"""
-
-import os
-import re
-
-from shutil import rmtree
-from subprocess import Popen, PIPE
-
-from pyPgSQL import PgSQL # python-pgsql - http://pypgsql.sourceforge.net
-
-from VirtualMailManager.Account import Account
-from VirtualMailManager.Alias import Alias
-from VirtualMailManager.AliasDomain import AliasDomain
-from VirtualMailManager.common import exec_ok
-from VirtualMailManager.Config import Config as Cfg
-from VirtualMailManager.constants import \
- ACCOUNT_EXISTS, ALIAS_EXISTS, CONF_NOFILE, CONF_NOPERM, CONF_WRONGPERM, \
- DATABASE_ERROR, DOMAINDIR_GROUP_MISMATCH, DOMAIN_INVALID, \
- FOUND_DOTS_IN_PATH, INVALID_ARGUMENT, MAILDIR_PERM_MISMATCH, \
- NOT_EXECUTABLE, NO_SUCH_ACCOUNT, NO_SUCH_ALIAS, NO_SUCH_BINARY, \
- NO_SUCH_DIRECTORY, NO_SUCH_RELOCATED, RELOCATED_EXISTS
-from VirtualMailManager.Domain import Domain, get_gid
-from VirtualMailManager.EmailAddress import EmailAddress
-from VirtualMailManager.errors import \
- DomainError, NotRootError, PermissionError, VMMError
-from VirtualMailManager.mailbox import new as new_mailbox
-from VirtualMailManager.pycompat import any
-from VirtualMailManager.Relocated import Relocated
-from VirtualMailManager.Transport import Transport
-
-
-_ = lambda msg: msg
-
-CFG_FILE = 'vmm.cfg'
-CFG_PATH = '/root:/usr/local/etc:/etc'
-RE_DOMAIN_SEARCH = """^[a-z0-9-\.]+$"""
-TYPE_ACCOUNT = 0x1
-TYPE_ALIAS = 0x2
-TYPE_RELOCATED = 0x4
-OTHER_TYPES = {
- TYPE_ACCOUNT: (_(u'an account'), ACCOUNT_EXISTS),
- TYPE_ALIAS: (_(u'an alias'), ALIAS_EXISTS),
- TYPE_RELOCATED: (_(u'a relocated user'), RELOCATED_EXISTS),
-}
-
-
-class Handler(object):
- """Wrapper class to simplify the access on all the stuff from
- VirtualMailManager"""
- __slots__ = ('_cfg', '_cfg_fname', '_dbh', '_warnings')
-
- def __init__(self, skip_some_checks=False):
- """Creates a new Handler instance.
-
- ``skip_some_checks`` : bool
- When a derived class knows how to handle all checks this
- argument may be ``True``. By default it is ``False`` and
- all checks will be performed.
-
- Throws a NotRootError if your uid is greater 0.
- """
- self._cfg_fname = ''
- self._warnings = []
- self._cfg = None
- self._dbh = None
-
- if os.geteuid():
- raise NotRootError(_(u"You are not root.\n\tGood bye!\n"),
- CONF_NOPERM)
- if self._check_cfg_file():
- self._cfg = Cfg(self._cfg_fname)
- self._cfg.load()
- if not skip_some_checks:
- self._cfg.check()
- self._chkenv()
-
- def _find_cfg_file(self):
- """Search the CFG_FILE in CFG_PATH.
- Raise a VMMError when no vmm.cfg could be found.
- """
- for path in CFG_PATH.split(':'):
- tmp = os.path.join(path, CFG_FILE)
- if os.path.isfile(tmp):
- self._cfg_fname = tmp
- break
- if not self._cfg_fname:
- raise VMMError(_(u"Could not find '%(cfg_file)s' in: "
- u"'%(cfg_path)s'") % {'cfg_file': CFG_FILE,
- 'cfg_path': CFG_PATH}, CONF_NOFILE)
-
- def _check_cfg_file(self):
- """Checks the configuration file, returns bool"""
- self._find_cfg_file()
- fstat = os.stat(self._cfg_fname)
- fmode = int(oct(fstat.st_mode & 0777))
- if fmode % 100 and fstat.st_uid != fstat.st_gid or \
- fmode % 10 and fstat.st_uid == fstat.st_gid:
- raise PermissionError(_(u"wrong permissions for '%(file)s': "
- u"%(perms)s\n`chmod 0600 %(file)s` would "
- u"be great.") % {'file': self._cfg_fname,
- 'perms': fmode}, CONF_WRONGPERM)
- else:
- return True
-
- def _chkenv(self):
- """Make sure our base_directory is a directory and that all
- required executables exists and are executable.
- If not, a VMMError will be raised"""
- basedir = self._cfg.dget('misc.base_directory')
- if not os.path.exists(basedir):
- old_umask = os.umask(0006)
- os.makedirs(basedir, 0771)
- os.chown(basedir, 0, 0)
- os.umask(old_umask)
- elif not os.path.isdir(basedir):
- raise VMMError(_(u"'%(path)s' is not a directory.\n(%(cfg_file)s: "
- u"section 'misc', option 'base_directory')") %
- {'path': basedir, 'cfg_file': self._cfg_fname},
- NO_SUCH_DIRECTORY)
- for opt, val in self._cfg.items('bin'):
- try:
- exec_ok(val)
- except VMMError, err:
- if err.code is NO_SUCH_BINARY:
- raise VMMError(_(u"'%(binary)s' doesn't exist.\n"
- u"(%(cfg_file)s: section 'bin', option "
- u"'%(option)s')") % {'binary': val,
- 'cfg_file': self._cfg_fname, 'option': opt},
- err.code)
- elif err.code is NOT_EXECUTABLE:
- raise VMMError(_(u"'%(binary)s' is not executable.\n"
- u"(%(cfg_file)s: section 'bin', option "
- u"'%(option)s')") % {'binary': val,
- 'cfg_file': self._cfg_fname, 'option': opt},
- err.code)
- else:
- raise
-
- def _db_connect(self):
- """Creates a pyPgSQL.PgSQL.connection instance."""
- if self._dbh is None or (isinstance(self._dbh, PgSQL.Connection) and
- not self._dbh._isOpen):
- try:
- self._dbh = PgSQL.connect(
- database=self._cfg.dget('database.name'),
- user=self._cfg.pget('database.user'),
- host=self._cfg.dget('database.host'),
- password=self._cfg.pget('database.pass'),
- client_encoding='utf8', unicode_results=True)
- dbc = self._dbh.cursor()
- dbc.execute("SET NAMES 'UTF8'")
- dbc.close()
- except PgSQL.libpq.DatabaseError, err:
- raise VMMError(str(err), DATABASE_ERROR)
-
- def _chk_other_address_types(self, address, exclude):
- """Checks if the EmailAddress *address* is known as `TYPE_ACCOUNT`,
- `TYPE_ALIAS` or `TYPE_RELOCATED`, but not as the `TYPE_*` specified
- by *exclude*. If the *address* is known as one of the `TYPE_*`s
- the according `TYPE_*` constant will be returned. Otherwise 0 will
- be returned."""
- assert exclude in (TYPE_ACCOUNT, TYPE_ALIAS, TYPE_RELOCATED) and \
- isinstance(address, EmailAddress)
- if exclude is not TYPE_ACCOUNT:
- account = Account(self._dbh, address)
- if account:
- return TYPE_ACCOUNT
- if exclude is not TYPE_ALIAS:
- alias = Alias(self._dbh, address)
- if alias:
- return TYPE_ALIAS
- if exclude is not TYPE_RELOCATED:
- relocated = Relocated(self._dbh, address)
- if relocated:
- return TYPE_RELOCATED
- return 0
-
- def _is_other_address(self, address, exclude):
- """Checks if *address* is known for an Account (TYPE_ACCOUNT),
- Alias (TYPE_ALIAS) or Relocated (TYPE_RELOCATED), except for
- *exclude*. Returns `False` if the address is not known for other
- types.
-
- Raises a `VMMError` if the address is known.
- """
- other = self._chk_other_address_types(address, exclude)
- if not other:
- return False
- msg = _(u"There is already %(a_type)s with the address '%(address)s'.")
- raise VMMError(msg % {'a_type': OTHER_TYPES[other][0],
- 'address': address}, OTHER_TYPES[other][1])
-
- def _get_account(self, address):
- """Return an Account instances for the given address (str)."""
- address = EmailAddress(address)
- self._db_connect()
- return Account(self._dbh, address)
-
- def _get_alias(self, address):
- """Return an Alias instances for the given address (str)."""
- address = EmailAddress(address)
- self._db_connect()
- return Alias(self._dbh, address)
-
- def _get_relocated(self, address):
- """Return a Relocated instances for the given address (str)."""
- address = EmailAddress(address)
- self._db_connect()
- return Relocated(self._dbh, address)
-
- def _get_domain(self, domainname):
- """Return a Domain instances for the given domain name (str)."""
- self._db_connect()
- return Domain(self._dbh, domainname)
-
- def _get_disk_usage(self, directory):
- """Estimate file space usage for the given directory.
-
- Keyword arguments:
- directory -- the directory to summarize recursively disk usage for
- """
- if self._isdir(directory):
- return Popen([self._cfg.dget('bin.du'), "-hs", directory],
- stdout=PIPE).communicate()[0].split('\t')[0]
- else:
- return 0
-
- def _isdir(self, directory):
- """Check if `directory` is a directory. Returns bool.
- When `directory` isn't a directory, a warning will be appended to
- _warnings."""
- isdir = os.path.isdir(directory)
- if not isdir:
- self._warnings.append(_('No such directory: %s') % directory)
- return isdir
-
- def _make_domain_dir(self, domain):
- """Create a directory for the `domain` and its accounts."""
- cwd = os.getcwd()
- hashdir, domdir = domain.directory.split(os.path.sep)[-2:]
- os.chdir(self._cfg.dget('misc.base_directory'))
- if not os.path.isdir(hashdir):
- os.mkdir(hashdir, 0711)
- os.chown(hashdir, 0, 0)
- os.mkdir(os.path.join(hashdir, domdir),
- self._cfg.dget('domain.directory_mode'))
- os.chown(domain.directory, 0, domain.gid)
- os.chdir(cwd)
-
- def _make_home(self, account):
- """Create a home directory for the new Account *account*."""
- os.umask(0007)
- os.chdir(account.domain_directory)
- os.mkdir('%s' % account.uid, self._cfg.dget('account.directory_mode'))
- os.chown('%s' % account.uid, account.uid, account.gid)
-
- def _delete_home(self, domdir, uid, gid):
- """Delete a user's home directory."""
- if uid > 0 and gid > 0:
- userdir = '%s' % uid
- if userdir.count('..') or domdir.count('..'):
- raise VMMError(_(u'Found ".." in home directory path.'),
- FOUND_DOTS_IN_PATH)
- if os.path.isdir(domdir):
- os.chdir(domdir)
- if os.path.isdir(userdir):
- mdstat = os.stat(userdir)
- if (mdstat.st_uid, mdstat.st_gid) != (uid, gid):
- raise VMMError(_(u'Detected owner/group mismatch in '
- u'home directory.'),
- MAILDIR_PERM_MISMATCH)
- rmtree(userdir, ignore_errors=True)
- else:
- raise VMMError(_(u"No such directory: %s") %
- os.path.join(domdir, userdir),
- NO_SUCH_DIRECTORY)
-
- def _delete_domain_dir(self, domdir, gid):
- """Delete a domain's directory."""
- if gid > 0:
- if not self._isdir(domdir):
- return
- basedir = self._cfg.dget('misc.base_directory')
- domdirdirs = domdir.replace(basedir + '/', '').split('/')
- domdirparent = os.path.join(basedir, domdirdirs[0])
- if basedir.count('..') or domdir.count('..'):
- raise VMMError(_(u'Found ".." in domain directory path.'),
- FOUND_DOTS_IN_PATH)
- if os.path.isdir(domdirparent):
- os.chdir(domdirparent)
- if os.lstat(domdirdirs[1]).st_gid != gid:
- raise VMMError(_(u'Detected group mismatch in domain '
- u'directory.'), DOMAINDIR_GROUP_MISMATCH)
- rmtree(domdirdirs[1], ignore_errors=True)
-
- def has_warnings(self):
- """Checks if warnings are present, returns bool."""
- return bool(len(self._warnings))
-
- def get_warnings(self):
- """Returns a list with all available warnings and resets all
- warnings.
- """
- ret_val = self._warnings[:]
- del self._warnings[:]
- return ret_val
-
- def cfg_dget(self, option):
- """Get the configured value of the *option* (section.option).
- When the option was not configured its default value will be
- returned."""
- return self._cfg.dget(option)
-
- def cfg_pget(self, option):
- """Get the configured value of the *option* (section.option)."""
- return self._cfg.pget(option)
-
- def cfg_install(self):
- """Installs the cfg_dget method as ``cfg_dget`` into the built-in
- namespace."""
- import __builtin__
- assert 'cfg_dget' not in __builtin__.__dict__
- __builtin__.__dict__['cfg_dget'] = self._cfg.dget
-
- def domain_add(self, domainname, transport=None):
- """Wrapper around Domain.set_transport() and Domain.save()"""
- dom = self._get_domain(domainname)
- if transport is None:
- dom.set_transport(Transport(self._dbh,
- transport=self._cfg.dget('misc.transport')))
- else:
- dom.set_transport(Transport(self._dbh, transport=transport))
- dom.set_directory(self._cfg.dget('misc.base_directory'))
- dom.save()
- self._make_domain_dir(dom)
-
- def domain_transport(self, domainname, transport, force=None):
- """Wrapper around Domain.update_transport()"""
- if force is not None and force != 'force':
- raise DomainError(_(u"Invalid argument: '%s'") % force,
- INVALID_ARGUMENT)
- dom = self._get_domain(domainname)
- trsp = Transport(self._dbh, transport=transport)
- if force is None:
- dom.update_transport(trsp)
- else:
- dom.update_transport(trsp, force=True)
-
- def domain_delete(self, domainname, force=None):
- """Wrapper around Domain.delete()"""
- if force and force not in ('deluser', 'delalias', 'delall'):
- raise DomainError(_(u"Invalid argument: '%s'") % force,
- INVALID_ARGUMENT)
- dom = self._get_domain(domainname)
- gid = dom.gid
- domdir = dom.directory
- if self._cfg.dget('domain.force_deletion') or force == 'delall':
- dom.delete(True, True)
- elif force == 'deluser':
- dom.delete(deluser=True)
- elif force == 'delalias':
- dom.delete(delalias=True)
- else:
- dom.delete()
- if self._cfg.dget('domain.delete_directory'):
- self._delete_domain_dir(domdir, gid)
-
- def domain_info(self, domainname, details=None):
- """Wrapper around Domain.get_info(), Domain.get_accounts(),
- Domain.get_aliase_names(), Domain.get_aliases() and
- Domain.get_relocated."""
- if details not in [None, 'accounts', 'aliasdomains', 'aliases', 'full',
- 'relocated']:
- raise VMMError(_(u'Invalid argument: ā%sā') % details,
- INVALID_ARGUMENT)
- dom = self._get_domain(domainname)
- dominfo = dom.get_info()
- if dominfo['domainname'].startswith('xn--'):
- dominfo['domainname'] += ' (%s)' % \
- dominfo['domainname'].decode('idna')
- if details is None:
- return dominfo
- elif details == 'accounts':
- return (dominfo, dom.get_accounts())
- elif details == 'aliasdomains':
- return (dominfo, dom.get_aliase_names())
- elif details == 'aliases':
- return (dominfo, dom.get_aliases())
- elif details == 'relocated':
- return(dominfo, dom.get_relocated())
- else:
- return (dominfo, dom.get_aliase_names(), dom.get_accounts(),
- dom.get_aliases(), dom.get_relocated())
-
- def aliasdomain_add(self, aliasname, domainname):
- """Adds an alias domain to the domain.
-
- Arguments:
-
- `aliasname` : basestring
- The name of the alias domain
- `domainname` : basestring
- The name of the target domain
- """
- dom = self._get_domain(domainname)
- alias_dom = AliasDomain(self._dbh, aliasname)
- alias_dom.set_destination(dom)
- alias_dom.save()
-
- def aliasdomain_info(self, aliasname):
- """Returns a dict (keys: "alias" and "domain") with the names of
- the alias domain and its primary domain."""
- self._db_connect()
- alias_dom = AliasDomain(self._dbh, aliasname)
- return alias_dom.info()
-
- def aliasdomain_switch(self, aliasname, domainname):
- """Modifies the target domain of an existing alias domain.
-
- Arguments:
-
- `aliasname` : basestring
- The name of the alias domain
- `domainname` : basestring
- The name of the new target domain
- """
- dom = self._get_domain(domainname)
- alias_dom = AliasDomain(self._dbh, aliasname)
- alias_dom.set_destination(dom)
- alias_dom.switch()
-
- def aliasdomain_delete(self, aliasname):
- """Deletes the given alias domain.
-
- Argument:
-
- `aliasname` : basestring
- The name of the alias domain
- """
- self._db_connect()
- alias_dom = AliasDomain(self._dbh, aliasname)
- alias_dom.delete()
-
- def domain_list(self, pattern=None):
- """Wrapper around function search() from module Domain."""
- from VirtualMailManager.Domain import search
- like = False
- if pattern and (pattern.startswith('%') or pattern.endswith('%')):
- like = True
- if not re.match(RE_DOMAIN_SEARCH, pattern.strip('%')):
- raise VMMError(_(u"The pattern '%s' contains invalid "
- u"characters.") % pattern, DOMAIN_INVALID)
- self._db_connect()
- return search(self._dbh, pattern=pattern, like=like)
-
- def user_add(self, emailaddress, password):
- """Wrapper around Account.set_password() and Account.save()."""
- acc = self._get_account(emailaddress)
- acc.set_password(password)
- acc.save()
- oldpwd = os.getcwd()
- self._make_home(acc)
- mailbox = new_mailbox(acc)
- mailbox.create()
- folders = self._cfg.dget('mailbox.folders').split(':')
- if any(folders):
- bad = mailbox.add_boxes(folders,
- self._cfg.dget('mailbox.subscribe'))
- if bad:
- self._warnings.append(_(u"Skipped mailbox folders:") +
- '\n\t- ' + '\n\t- '.join(bad))
- os.chdir(oldpwd)
-
- def alias_add(self, aliasaddress, *targetaddresses):
- """Creates a new `Alias` entry for the given *aliasaddress* with
- the given *targetaddresses*."""
- alias = self._get_alias(aliasaddress)
- destinations = [EmailAddress(address) for address in targetaddresses]
- warnings = []
- destinations = alias.add_destinations(destinations, warnings)
- if warnings:
- self._warnings.append(_('Ignored destination addresses:'))
- self._warnings.extend((' * %s' % w for w in warnings))
- for destination in destinations:
- if get_gid(self._dbh, destination.domainname) and \
- not self._chk_other_address_types(destination, TYPE_RELOCATED):
- self._warnings.append(_(u"The destination account/alias '%s' "
- u"doesn't exist.") % destination)
-
- def user_delete(self, emailaddress, force=None):
- """Wrapper around Account.delete(...)"""
- if force not in (None, 'delalias'):
- raise VMMError(_(u"Invalid argument: '%s'") % force,
- INVALID_ARGUMENT)
- acc = self._get_account(emailaddress)
- if not acc:
- raise VMMError(_(u"The account '%s' doesn't exist.") %
- acc.address, NO_SUCH_ACCOUNT)
- uid = acc.uid
- gid = acc.gid
- dom_dir = acc.domain_directory
- acc_dir = acc.home
- acc.delete(bool(force))
- if self._cfg.dget('account.delete_directory'):
- try:
- self._delete_home(dom_dir, uid, gid)
- except VMMError, err:
- if err.code in (FOUND_DOTS_IN_PATH, MAILDIR_PERM_MISMATCH,
- NO_SUCH_DIRECTORY):
- warning = _(u"""\
-The account has been successfully deleted from the database.
- But an error occurred while deleting the following directory:
- ā%(directory)sā
- Reason: %(reason)s""") % \
- {'directory': acc_dir, 'reason': err.msg}
- self._warnings.append(warning)
- else:
- raise
-
- def alias_info(self, aliasaddress):
- """Returns an iterator object for all destinations (`EmailAddress`
- instances) for the `Alias` with the given *aliasaddress*."""
- alias = self._get_alias(aliasaddress)
- if alias:
- return alias.get_destinations()
- if not self._is_other_address(alias.address, TYPE_ALIAS):
- raise VMMError(_(u"The alias '%s' doesn't exist.") %
- alias.address, NO_SUCH_ALIAS)
-
- def alias_delete(self, aliasaddress, targetaddress=None):
- """Deletes the `Alias` *aliasaddress* with all its destinations from
- the database. If *targetaddress* is not ``None``, only this
- destination will be removed from the alias."""
- alias = self._get_alias(aliasaddress)
- if targetaddress is None:
- alias.delete()
- else:
- alias.del_destination(EmailAddress(targetaddress))
-
- def user_info(self, emailaddress, details=None):
- """Wrapper around Account.get_info(...)"""
- if details not in (None, 'du', 'aliases', 'full'):
- raise VMMError(_(u"Invalid argument: '%s'") % details,
- INVALID_ARGUMENT)
- acc = self._get_account(emailaddress)
- if not acc:
- if not self._is_other_address(acc.address, TYPE_ACCOUNT):
- raise VMMError(_(u"The account '%s' doesn't exist.") %
- acc.address, NO_SUCH_ACCOUNT)
- info = acc.get_info()
- if self._cfg.dget('account.disk_usage') or details in ('du', 'full'):
- path = os.path.join(acc.home, acc.mail_location.directory)
- info['disk usage'] = self._get_disk_usage(path)
- if details in (None, 'du'):
- return info
- if details in ('aliases', 'full'):
- return (info, acc.get_aliases())
- return info
-
- def user_by_uid(self, uid):
- """Search for an Account by its *uid*.
- Returns a dict (address, uid and gid) if a user could be found."""
- from VirtualMailManager.Account import get_account_by_uid
- self._db_connect()
- return get_account_by_uid(uid, self._dbh)
-
- def user_password(self, emailaddress, password):
- """Wrapper for Account.modify('password' ...)."""
- if not isinstance(password, basestring) or not password:
- raise VMMError(_(u"Could not accept password: '%s'") % password,
- INVALID_ARGUMENT)
- acc = self._get_account(emailaddress)
- if not acc:
- raise VMMError(_(u"The account '%s' doesn't exist.") %
- acc.address, NO_SUCH_ACCOUNT)
- acc.modify('password', password)
-
- def user_name(self, emailaddress, name):
- """Wrapper for Account.modify('name', ...)."""
- if not isinstance(name, basestring) or not name:
- raise VMMError(_(u"Could not accept name: '%s'") % name,
- INVALID_ARGUMENT)
- acc = self._get_account(emailaddress)
- if not acc:
- raise VMMError(_(u"The account '%s' doesn't exist.") %
- acc.address, NO_SUCH_ACCOUNT)
- acc.modify('name', name)
-
- def user_transport(self, emailaddress, transport):
- """Wrapper for Account.modify('transport', ...)."""
- if not isinstance(transport, basestring) or not transport:
- raise VMMError(_(u"Could not accept transport: '%s'") % transport,
- INVALID_ARGUMENT)
- acc = self._get_account(emailaddress)
- if not acc:
- raise VMMError(_(u"The account '%s' doesn't exist.") %
- acc.address, NO_SUCH_ACCOUNT)
- acc.modify('transport', transport)
-
- def user_disable(self, emailaddress, service=None):
- """Wrapper for Account.disable(service)"""
- if service not in (None, 'all', 'imap', 'pop3', 'smtp', 'sieve'):
- raise VMMError(_(u"Could not accept service: '%s'") % service,
- INVALID_ARGUMENT)
- acc = self._get_account(emailaddress)
- if not acc:
- raise VMMError(_(u"The account '%s' doesn't exist.") %
- acc.address, NO_SUCH_ACCOUNT)
- acc.disable(service)
-
- def user_enable(self, emailaddress, service=None):
- """Wrapper for Account.enable(service)"""
- if service not in (None, 'all', 'imap', 'pop3', 'smtp', 'sieve'):
- raise VMMError(_(u"Could not accept service: '%s'") % service,
- INVALID_ARGUMENT)
- acc = self._get_account(emailaddress)
- if not acc:
- raise VMMError(_(u"The account '%s' doesn't exist.") %
- acc.address, NO_SUCH_ACCOUNT)
- acc.enable(service)
-
- def relocated_add(self, emailaddress, targetaddress):
- """Creates a new `Relocated` entry in the database. If there is
- already a relocated user with the given *emailaddress*, only the
- *targetaddress* for the relocated user will be updated."""
- relocated = self._get_relocated(emailaddress)
- relocated.set_destination(EmailAddress(targetaddress))
-
- def relocated_info(self, emailaddress):
- """Returns the target address of the relocated user with the given
- *emailaddress*."""
- relocated = self._get_relocated(emailaddress)
- if relocated:
- return relocated.get_info()
- if not self._is_other_address(relocated.address, TYPE_RELOCATED):
- raise VMMError(_(u"The relocated user '%s' doesn't exist.") %
- relocated.address, NO_SUCH_RELOCATED)
-
- def relocated_delete(self, emailaddress):
- """Deletes the relocated user with the given *emailaddress* from
- the database."""
- relocated = self._get_relocated(emailaddress)
- relocated.delete()
-
-del _