Converted VirtualMailManager and Postconf to new-style classes.
A few small cleanups.
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
# Copyright 2007 - 2009, VEB IT
# See COPYING for distribution information.
"""This is the vmm main script."""
from VirtualMailManager.constants.VERSION import *
import locale
import os
import sys
import gettext
from time import strftime, strptime
from VirtualMailManager.VirtualMailManager import VirtualMailManager
import VirtualMailManager.Exceptions as VMME
import VirtualMailManager.constants.EXIT as EXIT
import VirtualMailManager.constants.ERROR as ERR
def w_err(code, *args):
    """Write all *args* to stderr and terminate the program.

    Every argument is encoded with the module-level ENCODING (characters
    that cannot be encoded are replaced) and followed by a line break.
    Afterwards the interpreter exits with the status *code*.
    """
    write = sys.stderr.write
    for text in args:
        write(text.encode(ENCODING, 'replace'))
        write('\n')
    sys.exit(code)
def w_std(*args):
    """Write all *args* to stdout, one line per argument.

    Every argument is encoded with the module-level ENCODING (characters
    that cannot be encoded are replaced) and followed by a line break.
    """
    write = sys.stdout.write
    for text in args:
        write(text.encode(ENCODING, 'replace'))
        write('\n')
def usage(excode=0, errMsg=None):
    """Print the usage summary and exit with the status *excode*.

    excode -- exit status; a value > 0 sends the text to stderr (through
              w_err(), which exits), anything else sends it to stdout.
    errMsg -- optional error message, appended to the usage text as
              "Error: <errMsg>" when excode > 0; ignored otherwise.

    This function never returns: every path ends in sys.exit().
    """
    u_head = _("""\
Usage: %s SUBCOMMAND OBJECT ARGS*
short long
subcommand object args (* = optional)\n""")\
    % __prog__

    # NOTE: the short form of `relocateddelete' is `rd' -- that is what the
    # subcommand dispatcher accepts.  The help text used to say `rf'.
    u_body = """\
da domainadd domain.tld transport*
di domaininfo domain.tld details*
dt domaintransport domain.tld transport force*
dd domaindelete domain.tld delalias*|deluser*|delall*
ada aliasdomainadd aliasdomain.tld domain.tld
adi aliasdomaininfo aliasdomain.tld
ads aliasdomainswitch aliasdomain.tld domain.tld
add aliasdomaindelete aliasdomain.tld
ua useradd user@domain.tld password*
ui userinfo user@domain.tld details*
un username user@domain.tld 'Users Name'
up userpassword user@domain.tld password*
ut usertransport user@domain.tld transport
u0 userdisable user@domain.tld service*
u1 userenable user@domain.tld service*
ud userdelete user@domain.tld delalias*
aa aliasadd alias@domain.tld user@domain.tld
ai aliasinfo alias@domain.tld
ad aliasdelete alias@domain.tld user@domain.tld*
ra relocatedadd exaddr@domain.tld user@domain.tld
ri relocatedinfo exaddr@domain.tld
rd relocateddelete exaddr@domain.tld
gu getuser userid
ld listdomains pattern*
cf configure section*
h help
v version
"""
    if excode > 0:
        # Error case: w_err() writes to stderr and exits with excode.
        if errMsg is None:
            w_err(excode, u_head, u_body)
        else:
            w_err(excode, u_head, u_body, '%s: %s\n' % (_('Error'), errMsg))
    else:
        w_std(u_head, u_body)
        sys.exit(excode)
def get_vmm():
try:
vmm = VirtualMailManager()
return vmm
except (VMME.VMMException, VMME.VMMNotRootException, VMME.VMMPermException,
VMME.VMMConfigException), e:
w_err(e.code(), "%s: %s\n" % (_('Error'), e.msg()))
def _getOrder():
    """Return the field order used by _printInfo() for the current subcommand.

    The result is a tuple of (key, flag) pairs.  `key' is looked up in the
    info dictionary; _printInfo() prints the label in upper case when
    `flag' is 1 and in title case when it is 0.  An empty tuple is returned
    for subcommands other than domaininfo, userinfo and getuser.
    """
    order = ()
    # The name of the sieve service depends on the configured Dovecot version.
    if vmm.cfgGetInt('misc', 'dovecotvers') > 11:
        sieve_name = 'sieve'
    else:
        sieve_name = 'managesieve'
    if argv[1] in ['di', 'domaininfo']:
        order = (('domainname', 0), ('gid', 1), ('transport', 0),
                 ('domaindir', 0), ('aliasdomains', 0), ('accounts', 0),
                 ('aliases', 0), ('relocated', 0))
    elif argv[1] in ['ui', 'userinfo']:
        account = (('address', 0), ('name', 0), ('uid', 1), ('gid', 1),
                   ('transport', 0), ('maildir', 0))
        services = (('smtp', 1), ('pop3', 1), ('imap', 1), (sieve_name, 1))
        # user_info() lower-cases the details argument before handing it to
        # vmm.userInfo(); compare it the same way here so that e.g.
        # `Aliases' and `aliases' select the same field list.
        if argc == 4 and argv[3].lower() != 'aliases'\
                or vmm.cfgGetBoolean('maildir', 'diskusage'):
            order = account + (('disk usage', 0),) + services
        else:
            order = account + services
    elif argv[1] in ['gu', 'getuser']:
        order = (('uid', 1), ('gid', 1), ('address', 0))
    return order
def _printInfo(info, title):
    """Print an underlined "<title> information" heading and the fields
    of the dictionary *info*.

    The fields and their order come from _getOrder(); each label is
    padded with dots to 15 characters.  A blank line ends the output.
    """
    heading = '%s %s' % (title, _('information'))
    w_std('%s\n%s' % (heading, '-' * len(heading)))
    for key, upper in _getOrder():
        if upper:
            label = key.upper()
        else:
            label = key.title()
        w_std('\t%s: %s' % (label.ljust(15, '.'), info[key]))
    print
def _printList(alist, title):
    """Print an underlined "Available <title>" heading and the items of
    *alist*, one per line.

    When *title* is the (translated) string 'alias domains', names that
    start with 'xn--' are printed together with their vmm.ace2idna() form.
    An empty list is reported as 'None'.  A blank line ends the output.
    """
    heading = '%s %s' % (_('Available'), title)
    w_std('%s\n%s' % (heading, '-' * len(heading)))
    if len(alist) == 0:
        w_std(_('\tNone'))
    elif title == _('alias domains'):
        for dom in alist:
            if dom.startswith('xn--'):
                w_std('\t%s (%s)' % (dom, vmm.ace2idna(dom)))
            else:
                w_std('\t%s' % dom)
    else:
        for item in alist:
            w_std('\t%s' % item)
    print
def _printAliases(alias, targets):
    """Print the destinations of the alias address *alias*.

    *targets* is the sequence of destination addresses; an empty sequence
    is reported as 'None'.  A blank line ends the output.
    """
    heading = _('Alias information')
    w_std('%s\n%s' % (heading, '-' * len(heading)))
    w_std(_('\tMail for %s will be redirected to:') % alias)
    if len(targets) == 0:
        w_std(_('\tNone'))
    else:
        for destination in targets:
            w_std('\t * %s' % destination)
    print
def _printRelocated(addr_dest):
    """Print where a relocated user has moved to.

    *addr_dest* is a dictionary with the keys 'addr' (the old address)
    and 'dest' (the new destination).  A blank line ends the output.
    """
    heading = _(u'Relocated information')
    underline = '-' * len(heading)
    w_std('%s\n%s' % (heading, underline))
    w_std(_(u'\tUser »%(addr)s« has moved to »%(dest)s«') % addr_dest)
    print
def _formatDom(domain, main=True):
if domain.startswith('xn--'):
domain = '%s (%s)' % (domain, vmm.ace2idna(domain))
if main:
return '\t[+] %s' % domain
else:
return '\t[-] %s' % domain
def _printDomList(dids, domains):
    """Print the domain listing.

    dids    -- the keys of *domains* in the order they should be printed
    domains -- maps each key to a sequence whose first item is the domain
               name (or None) and whose remaining items are printed as
               '[-]' entries below it

    The heading is 'Available domains' when no further command line
    argument was given (argc < 3), 'Matching domains' otherwise.
    A blank line ends the output.
    """
    if argc < 3:
        heading = _('Available domains')
    else:
        heading = _('Matching domains')
    w_std('%s\n%s' % (heading, '-' * len(heading)))
    if len(domains):
        for domain_id in dids:
            names = domains[domain_id]
            if names[0] is not None:
                w_std(_formatDom(names[0]))
            if len(names) > 1:
                for alias in names[1:]:
                    w_std(_formatDom(alias, main=False))
    else:
        w_std(_('\tNone'))
    print
def _printAliasDomInfo(info):
    """Print which domain an alias domain belongs to.

    *info* is a dictionary with the keys 'alias' and 'domain'.  Values
    starting with 'xn--' are replaced in place (the passed dictionary is
    modified) by "<name> (<vmm.ace2idna(name)>)" before printing.
    A blank line ends the output.
    """
    heading = _('Alias domain information')
    for key in ('alias', 'domain'):
        name = info[key]
        if name.startswith('xn--'):
            info[key] = "%s (%s)" % (name, vmm.ace2idna(name))
    w_std('%s\n%s' % (heading, '-' * len(heading)))
    w_std(
        _('\tThe alias domain %(alias)s belongs to:\n\t * %(domain)s') % info)
    print
def configure():
    """Handle the 'cf'/'configure' subcommand.

    Calls vmm.configure() for the section named in argv[2].  When the
    setup has not been done yet (need_setup) or no section was given,
    vmm.configure() is called without an argument.
    """
    if not need_setup and len(argv) >= 3:
        vmm.configure(argv[2])
    else:
        vmm.configure()
def domain_add():
    """Handle the 'da'/'domainadd' subcommand.

    Passes the lower-cased domain name (argv[2]) and, if present, the
    transport (argv[3]) to vmm.domainAdd().
    """
    if argc < 3:
        # usage() exits the program, nothing below runs in this case.
        usage(EXIT.MISSING_ARGS, _(u'Missing domain name.'))
    domain = argv[2].lower()
    if argc < 4:
        vmm.domainAdd(domain)
    else:
        vmm.domainAdd(domain, argv[3])
def domain_delete():
    """Handle the 'dd'/'domaindelete' subcommand.

    Passes the lower-cased domain name (argv[2]) and, if present, the
    unmodified third argument (argv[3]) to vmm.domainDelete().
    """
    if argc < 3:
        # usage() exits the program, nothing below runs in this case.
        usage(EXIT.MISSING_ARGS, _(u'Missing domain name.'))
    domain = argv[2].lower()
    if argc < 4:
        vmm.domainDelete(domain)
    else:
        vmm.domainDelete(domain, argv[3])
def domain_info():
    """Handle the 'di'/'domaininfo' subcommand.

    Without a details argument only the domain information is printed.
    With the details 'accounts', 'aliasdomains', 'aliases' or 'relocated'
    the matching list is printed as well; any other details value prints
    all four lists.
    """
    if argc < 3:
        # usage() exits the program, nothing below runs in this case.
        usage(EXIT.MISSING_ARGS, _(u'Missing domain name.'))
    domain = argv[2].lower()
    if argc < 4:
        _printInfo(vmm.domainInfo(domain), _('Domain'))
        return
    details = argv[3].lower()
    infos = vmm.domainInfo(domain, details)
    _printInfo(infos[0], _('Domain'))
    if details == 'accounts':
        _printList(infos[1], _('accounts'))
    elif details == 'aliasdomains':
        _printList(infos[1], _('alias domains'))
    elif details == 'aliases':
        _printList(infos[1], _('aliases'))
    elif details == 'relocated':
        _printList(infos[1], _('relocated users'))
    else:
        # Any other details value: print every list.
        _printList(infos[1], _('alias domains'))
        _printList(infos[2], _('accounts'))
        _printList(infos[3], _('aliases'))
        _printList(infos[4], _('relocated users'))
def domain_transport():
    """Handle the 'dt'/'domaintransport' subcommand.

    Passes the lower-cased domain name (argv[2]), the new transport
    (argv[3]) and, if present, a further argument (argv[4]) to
    vmm.domainTransport().
    """
    # usage() exits the program, so the code after the checks only runs
    # when both required arguments are present.
    if argc < 3:
        usage(EXIT.MISSING_ARGS, _(u'Missing domain name and new transport.'))
    elif argc < 4:
        usage(EXIT.MISSING_ARGS, _(u'Missing new transport.'))
    domain = argv[2].lower()
    transport = argv[3]
    if argc < 5:
        vmm.domainTransport(domain, transport)
    else:
        vmm.domainTransport(domain, transport, argv[4])
def alias_domain_add():
    """Handle the 'ada'/'aliasdomainadd' subcommand.

    Passes the lower-cased alias domain name (argv[2]) and target domain
    name (argv[3]) to vmm.aliasDomainAdd().
    """
    # usage() exits the program when an argument is missing.
    if argc < 3:
        usage(EXIT.MISSING_ARGS,
              _(u'Missing alias domain name and target domain name.'))
    elif argc < 4:
        usage(EXIT.MISSING_ARGS, _(u'Missing target domain name.'))
    alias_domain = argv[2].lower()
    target_domain = argv[3].lower()
    vmm.aliasDomainAdd(alias_domain, target_domain)
def alias_domain_info():
    """Handle the 'adi'/'aliasdomaininfo' subcommand.

    Prints the result of vmm.aliasDomainInfo() for the lower-cased alias
    domain name (argv[2]).
    """
    if argc < 3:
        # usage() exits the program, nothing below runs in this case.
        usage(EXIT.MISSING_ARGS, _(u'Missing alias domain name.'))
    info = vmm.aliasDomainInfo(argv[2].lower())
    _printAliasDomInfo(info)
def alias_domain_switch():
    """Handle the 'ads'/'aliasdomainswitch' subcommand.

    Passes the lower-cased alias domain name (argv[2]) and target domain
    name (argv[3]) to vmm.aliasDomainSwitch().
    """
    # usage() exits the program when an argument is missing.
    if argc < 3:
        usage(EXIT.MISSING_ARGS,
              _(u'Missing alias domain name and target domain name.'))
    elif argc < 4:
        usage(EXIT.MISSING_ARGS, _(u'Missing target domain name.'))
    alias_domain = argv[2].lower()
    target_domain = argv[3].lower()
    vmm.aliasDomainSwitch(alias_domain, target_domain)
def alias_domain_delete():
    """Handle the 'add'/'aliasdomaindelete' subcommand.

    Passes the lower-cased alias domain name (argv[2]) to
    vmm.aliasDomainDelete().
    """
    if argc < 3:
        # usage() exits the program, nothing below runs in this case.
        usage(EXIT.MISSING_ARGS, _(u'Missing alias domain name.'))
    alias_domain = argv[2].lower()
    vmm.aliasDomainDelete(alias_domain)
def user_add():
    """Handle the 'ua'/'useradd' subcommand.

    Passes the lower-cased e-mail address (argv[2]) and the password
    (argv[3], or None when it was not given) to vmm.userAdd().
    """
    if argc < 3:
        # usage() exits the program, nothing below runs in this case.
        usage(EXIT.MISSING_ARGS, _(u'Missing e-mail address.'))
    password = None
    if argc >= 4:
        password = argv[3]
    vmm.userAdd(argv[2].lower(), password)
def user_delete():
    """Handle the 'ud'/'userdelete' subcommand.

    Passes the lower-cased e-mail address (argv[2]) and, if present, the
    lower-cased third argument (argv[3]) to vmm.userDelete().
    """
    if argc < 3:
        # usage() exits the program, nothing below runs in this case.
        usage(EXIT.MISSING_ARGS, _(u'Missing e-mail address.'))
    address = argv[2].lower()
    if argc < 4:
        vmm.userDelete(address)
    else:
        vmm.userDelete(address, argv[3].lower())
def user_info():
    """Handle the 'ui'/'userinfo' subcommand.

    Prints the account information for the lower-cased e-mail address
    (argv[2]).  With the details argument 'aliases' or 'full' the result
    of vmm.userInfo() is treated as a pair: account information plus the
    list of alias addresses, which is printed as well.
    """
    if argc < 3:
        # usage() exits the program, nothing below runs in this case.
        usage(EXIT.MISSING_ARGS, _(u'Missing e-mail address.'))
    address = argv[2].lower()
    if argc < 4:
        _printInfo(vmm.userInfo(address), 'Account')
        return
    details = argv[3].lower()
    info = vmm.userInfo(address, details)
    if details in ['aliases', 'full']:
        _printInfo(info[0], 'Account')
        _printList(info[1], _(u'alias addresses'))
    else:
        _printInfo(info, 'Account')
def user_name():
    """Handle the 'un'/'username' subcommand.

    Passes the lower-cased e-mail address (argv[2]) and the user's name
    (argv[3]) to vmm.userName().
    """
    # usage() exits the program when an argument is missing.
    if argc < 3:
        usage(EXIT.MISSING_ARGS, _(u'Missing e-mail address and users name.'))
    elif argc < 4:
        usage(EXIT.MISSING_ARGS, _('Missing users name.'))
    address = argv[2].lower()
    vmm.userName(address, argv[3])
def user_transport():
    """Handle the 'ut'/'usertransport' subcommand.

    Passes the lower-cased e-mail address (argv[2]) and the transport
    (argv[3]) to vmm.userTransport().
    """
    # usage() exits the program when an argument is missing.
    if argc < 3:
        usage(EXIT.MISSING_ARGS, _(u'Missing e-mail address and transport.'))
    elif argc < 4:
        usage(EXIT.MISSING_ARGS, _(u'Missing transport.'))
    address = argv[2].lower()
    vmm.userTransport(address, argv[3])
def user_enable():
    """Handle the 'u1'/'userenable' subcommand.

    Passes the lower-cased e-mail address (argv[2]) and, if present, the
    lower-cased service name (argv[3]) to vmm.userEnable().
    """
    if argc < 3:
        # usage() exits the program, nothing below runs in this case.
        usage(EXIT.MISSING_ARGS, _(u'Missing e-mail address.'))
    address = argv[2].lower()
    if argc < 4:
        vmm.userEnable(address)
    else:
        vmm.userEnable(address, argv[3].lower())
def user_disable():
    """Handle the 'u0'/'userdisable' subcommand.

    Passes the lower-cased e-mail address (argv[2]) and, if present, the
    lower-cased service name (argv[3]) to vmm.userDisable().
    """
    if argc < 3:
        # usage() exits the program, nothing below runs in this case.
        usage(EXIT.MISSING_ARGS, _(u'Missing e-mail address.'))
    address = argv[2].lower()
    if argc < 4:
        vmm.userDisable(address)
    else:
        vmm.userDisable(address, argv[3].lower())
def user_password():
    """Handle the 'up'/'userpassword' subcommand.

    Passes the lower-cased e-mail address (argv[2]) and the password
    (argv[3], or None when it was not given) to vmm.userPassword().
    """
    if argc < 3:
        # usage() exits the program, nothing below runs in this case.
        usage(EXIT.MISSING_ARGS, _(u'Missing e-mail address.'))
    password = None
    if argc >= 4:
        password = argv[3]
    vmm.userPassword(argv[2].lower(), password)
def alias_add():
    """Handle the 'aa'/'aliasadd' subcommand.

    Passes the lower-cased alias address (argv[2]) and the unmodified
    destination address (argv[3]) to vmm.aliasAdd().
    """
    # usage() exits the program when an argument is missing.
    if argc < 3:
        usage(EXIT.MISSING_ARGS, _(u'Missing alias address and destination.'))
    elif argc < 4:
        usage(EXIT.MISSING_ARGS, _(u'Missing destination address.'))
    alias = argv[2].lower()
    vmm.aliasAdd(alias, argv[3])
def alias_info():
    """Handle the 'ai'/'aliasinfo' subcommand.

    Prints the destinations returned by vmm.aliasInfo() for the
    lower-cased alias address (argv[2]).
    """
    if argc < 3:
        # usage() exits the program, nothing below runs in this case.
        usage(EXIT.MISSING_ARGS, _(u'Missing alias address'))
    alias = argv[2].lower()
    _printAliases(alias, vmm.aliasInfo(argv[2].lower()))
def alias_delete():
    """Handle the 'ad'/'aliasdelete' subcommand.

    Passes the lower-cased alias address (argv[2]) and, if present, the
    lower-cased destination address (argv[3]) to vmm.aliasDelete().
    """
    if argc < 3:
        # usage() exits the program, nothing below runs in this case.
        usage(EXIT.MISSING_ARGS, _(u'Missing alias address'))
    alias = argv[2].lower()
    if argc < 4:
        vmm.aliasDelete(alias)
    else:
        vmm.aliasDelete(alias, argv[3].lower())
def relocated_add():
    """Handle the 'ra'/'relocatedadd' subcommand.

    Passes the lower-cased relocated address (argv[2]) and the unmodified
    destination address (argv[3]) to vmm.relocatedAdd().
    """
    # usage() exits the program when an argument is missing.
    if argc < 3:
        usage(EXIT.MISSING_ARGS,
              _(u'Missing relocated address and destination.'))
    elif argc < 4:
        usage(EXIT.MISSING_ARGS, _(u'Missing destination address.'))
    relocated = argv[2].lower()
    vmm.relocatedAdd(relocated, argv[3])
def relocated_info():
    """Handle the 'ri'/'relocatedinfo' subcommand.

    Prints the destination returned by vmm.relocatedInfo() for the
    lower-cased relocated address (argv[2]).
    """
    if argc < 3:
        # usage() exits the program, nothing below runs in this case.
        usage(EXIT.MISSING_ARGS, _(u'Missing relocated address'))
    relocated = argv[2].lower()
    destination = vmm.relocatedInfo(relocated)
    _printRelocated({'addr': relocated, 'dest': destination})
def relocated_delete():
    """Handle the 'rd'/'relocateddelete' subcommand.

    Passes the lower-cased relocated address (argv[2]) to
    vmm.relocatedDelete().
    """
    if argc < 3:
        # usage() exits the program, nothing below runs in this case.
        usage(EXIT.MISSING_ARGS, _(u'Missing relocated address'))
    relocated = argv[2].lower()
    vmm.relocatedDelete(relocated)
def user_byID():
    """Handle the 'gu'/'getuser' subcommand.

    Prints the account information returned by vmm.userByID() for the
    user id given in argv[2].
    """
    if argc < 3:
        # usage() exits the program, nothing below runs in this case.
        usage(EXIT.MISSING_ARGS, _(u'Missing userid'))
    account = vmm.userByID(argv[2])
    _printInfo(account, u'Account')
def domain_list():
    """Handle the 'ld'/'listdomains' subcommand.

    Prints the result of vmm.domainList(); when a pattern (argv[2]) was
    given it is lower-cased and passed on.
    """
    if argc >= 3:
        dids, domains = vmm.domainList(argv[2].lower())
    else:
        dids, domains = vmm.domainList()
    _printDomList(dids, domains)
def show_warnings():
    """Print all warnings collected by the vmm object, if there are any."""
    if not vmm.hasWarnings():
        return
    w_std(_(u'Warnings:'))
    for warning in vmm.getWarnings():
        w_std(" * %s" % warning)
def show_version():
    """Print the program name and version, the release date formatted
    with the locale's date format, the Python version, the system name
    from os.uname() and a short warranty notice.
    """
    release_date = strftime(locale.nl_langinfo(locale.D_FMT),
                            strptime(__date__, '%Y-%m-%d'))
    python_version = sys.version.split()[0]
    system_name = os.uname()[0]
    w_std('%s, %s %s (%s %s)\nPython %s %s %s\n\n%s %s' % (
        __prog__, _('version'), __version__, _('from'), release_date,
        python_version, _(u'on'), system_name, __prog__,
        'is free software and comes with ABSOLUTELY NO WARRANTY.'))
#def main():
if __name__ == '__main__':
__prog__ = os.path.basename(sys.argv[0])
locale.setlocale(locale.LC_ALL, '')
ENCODING = locale.nl_langinfo(locale.CODESET)
gettext.install(__prog__, '/usr/local/share/locale', unicode=1)
argv = [unicode(arg, ENCODING) for arg in sys.argv]
argc = len(sys.argv)
if argc < 2:
usage(EXIT.MISSING_ARGS)
vmm = get_vmm()
try:
need_setup = not vmm.setupIsDone()
if argv[1] in ['cf', 'configure'] or need_setup:
configure()
elif argv[1] in ['da', 'domainadd']:
domain_add()
elif argv[1] in ['di', 'domaininfo']:
domain_info()
elif argv[1] in ['dt', 'domaintransport']:
domain_transport()
elif argv[1] in ['dd', 'domaindelete']:
domain_delete()
elif argv[1] in ['ada', 'aliasdomainadd']:
alias_domain_add()
elif argv[1] in ['adi', 'aliasdomaininfo']:
alias_domain_info()
elif argv[1] in ['ads', 'aliasdomainswitch']:
alias_domain_switch()
elif argv[1] in ['add', 'aliasdomaindelete']:
alias_domain_delete()
elif argv[1] in ['ua', 'useradd']:
user_add()
elif argv[1] in ['ui', 'userinfo']:
user_info()
elif argv[1] in ['un', 'username']:
user_name()
elif argv[1] in ['up', 'userpassword']:
user_password()
elif argv[1] in ['ut', 'usertransport']:
user_transport()
elif argv[1] in ['u0', 'userdisable']:
user_disable()
elif argv[1] in ['u1', 'userenable']:
user_enable()
elif argv[1] in ['ud', 'userdelete']:
user_delete()
elif argv[1] in ['aa', 'aliasadd']:
alias_add()
elif argv[1] in ['ai', 'aliasinfo']:
alias_info()
elif argv[1] in ['ad', 'aliasdelete']:
alias_delete()
elif argv[1] in ['ra', 'relocatedadd']:
relocated_add()
elif argv[1] in ['ri', 'relocatedinfo']:
relocated_info()
elif argv[1] in ['rd', 'relocateddelete']:
relocated_delete()
elif argv[1] in ['gu', 'getuser']:
user_byID()
elif argv[1] in ['ld', 'listdomains']:
domain_list()
elif argv[1] in ['h', 'help']:
usage()
elif argv[1] in ['v', 'version']:
show_version()
else:
usage(EXIT.UNKNOWN_COMMAND,
u"%s: »%s«" % (_('Unknown subcommand'), argv[1]))
show_warnings()
except (EOFError, KeyboardInterrupt):
w_err(EXIT.USER_INTERRUPT, '\n%s!\n' % _(u'Ouch'))
except (VMME.VMMConfigException, VMME.VMMException), e:
if e.code() != ERR.DATABASE_ERROR:
w_err(e.code(), "%s: %s" % (_(u'Error'), e.msg()))
else:
w_err(e.code(), "%s" % unicode(e.msg(), ENCODING, 'replace'))