VirtualMailManager/password.py
author martin f. krafft <madduck@madduck.net>
Tue, 10 Apr 2012 22:56:30 +0200
branchv0.6.x
changeset 514 d863a44a6353
parent 464 7d4d79ff08d0
child 568 14abdd04ddf5
permissions -rw-r--r--
Make PL/pgSQL function feed back identity for mailboxes/relocated when there are catchall destinations. Without catchall aliases, if no virtual_alias matches, the query can just return NULL and Postfix will later check mailboxes/relocated for the address to rewrite. However, since virtual aliases are handled long before mailboxes/relocated, a catchall alias would also catch mail to mailboxes and relocated addresses, which we do not want. The way to tell Postfix to keep delivering is for the virtual alias map to return the search key itself (identity function). This patch changes the postfix_virtual_alias_maps PL/pgSQL function to do exactly that, but only if there are catchall destinations defined for the domain in question — otherwise it returns NULL when no match is found.

# -*- coding: UTF-8 -*-
# Copyright (c) 2010 - 2011, Pascal Volk
# See COPYING for distribution information.
"""
    VirtualMailManager.password
    ~~~~~~~~~~~~~~~~~~~~~~~~~~~

    VirtualMailManager's password module generates password hashes from
    given passwords and creates random passwords. This module provides the
    following functions:

        hashed_password = pwhash(password[, scheme][, user])
        random_password = randompw()
        scheme, encoding = verify_scheme(scheme)
        schemes, encodings = list_schemes()
"""

from crypt import crypt
from random import SystemRandom
from subprocess import Popen, PIPE

try:
    import hashlib
except ImportError:
    from VirtualMailManager.pycompat import hashlib

from VirtualMailManager import ENCODING
from VirtualMailManager.emailaddress import EmailAddress
from VirtualMailManager.common import get_unicode, version_str
from VirtualMailManager.constants import VMM_ERROR
from VirtualMailManager.errors import VMMError

# True if the imported hashlib exposes a `compat` attribute.
# NOTE(review): presumably only the VirtualMailManager.pycompat fallback
# defines that attribute -- confirm against pycompat.
COMPAT = hasattr(hashlib, 'compat')
# Alphabet from which `_get_salt()` draws salt characters.
SALTCHARS = './0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ'
# Alphabet used by `randompw()`.  It omits 0, 1, j, l, I and O
# (presumably to avoid look-alike characters).
PASSWDCHARS = '._-+#*23456789abcdefghikmnopqrstuvwxyzABCDEFGHJKLMNPQRSTUVWXYZ'
# Encoding values for which the schemes that default to Base64 emit Base64
# output (None means that no encoding suffix was requested).
DEFAULT_B64 = (None, 'B64', 'BASE64')
# Encoding values for which the schemes that default to hex emit hex output.
DEFAULT_HEX = (None, 'HEX')
# Identifiers used in the `$id$` prefix of the salts handed to crypt().
CRYPT_ID_MD5 = 1
CRYPT_ID_BLF = '2a'
CRYPT_ID_SHA256 = 5
CRYPT_ID_SHA512 = 6
# Salt length for the plain CRYPT scheme.
CRYPT_SALT_LEN = 2
# Bounds to which the configured Blowfish rounds value is clamped.
CRYPT_BLF_ROUNDS_MIN = 4
CRYPT_BLF_ROUNDS_MAX = 31
CRYPT_BLF_SALT_LEN = 22
CRYPT_MD5_SALT_LEN = 8
# With this rounds value the `rounds=` part of a SHA-2 crypt salt is omitted.
CRYPT_SHA2_ROUNDS_DEFAULT = 5000
# Bounds to which the configured SHA-256/SHA-512 crypt rounds are clamped.
CRYPT_SHA2_ROUNDS_MIN = 1000
CRYPT_SHA2_ROUNDS_MAX = 999999999
CRYPT_SHA2_SALT_LEN = 16
# Salt length used by the salted digest schemes (SMD5, SSHA, SSHA256,
# SSHA512).
SALTED_ALGO_SALT_LEN = 4


# Import-time placeholders: `_` returns its message unchanged and `cfg_dget`
# returns None for every option.  Both names are deleted again by the `del`
# statement at the end of this module.
# NOTE(review): presumably the real `_` and `cfg_dget` are provided elsewhere
# at runtime (e.g. via builtins) -- confirm.
_ = lambda msg: msg
cfg_dget = lambda option: None
# Random number source based on random.SystemRandom.
_sys_rand = SystemRandom()
_choice = _sys_rand.choice
# Returns a string of `s_len` characters, each drawn at random from SALTCHARS.
_get_salt = lambda s_len: ''.join(_choice(SALTCHARS) for x in xrange(s_len))


def _dovecotpw(password, scheme, encoding):
    """Lets the external Dovecot password tool hash the *password*.

    The executable configured as `bin.dovecotpw` is started with the
    arguments ``-s scheme[.encoding] -p password``.  If the configured
    `misc.dovecot_version` is at least 0x20000a01, the sub-command ``pw``
    is put in front of these arguments (Dovecot 2.0: `doveadm pw`).

    Returns the output of the tool: {scheme[.encoding]}hash

    Raises a `VMMError` if the tool exits with a non-zero status or if its
    output does not start with the expected ``{scheme[.encoding]}`` prefix.
    """
    if encoding:
        scheme = scheme + '.' + encoding
    argv = [cfg_dget('bin.dovecotpw')]
    hash_args = ['-s', scheme, '-p', get_unicode(password)]
    if cfg_dget('misc.dovecot_version') >= 0x20000a01:
        argv.append('pw')
    argv.extend(hash_args)
    proc = Popen(argv, stdout=PIPE, stderr=PIPE)
    out, err = proc.communicate()
    if proc.returncode:
        raise VMMError(err.strip(), VMM_ERROR)
    result = out.strip()
    if result.startswith('{%s}' % scheme):
        return result
    raise VMMError('Unexpected result from %s: %s' %
                   (cfg_dget('bin.dovecotpw'), result), VMM_ERROR)


def _md4_new():
    """Returns an new MD4-hash object if supported by the hashlib or
    provided by PyCrypto - other `None`.
    """
    try:
        return hashlib.new('md4')
    except ValueError, err:
        if str(err) == 'unsupported hash type':
            if not COMPAT:
                try:
                    from Crypto.Hash import MD4
                    return MD4.new()
                except ImportError:
                    return None
        else:
            raise


def _sha256_new(data=''):
    """Returns a new sha256 object from the hashlib.

    Returns `None` if the PyCrypto in pycompat.hashlib is too old."""
    if not COMPAT:
        return hashlib.sha256(data)
    try:
        return hashlib.new('sha256', data)
    except ValueError, err:
        if str(err) == 'unsupported hash type':
            return None
        else:
            raise


def _format_digest(digest, scheme, encoding):
    """Formats the arguments to a string: {scheme[.encoding]}digest."""
    if not encoding:
        return '{%s}%s' % (scheme, digest)
    return '{%s.%s}%s' % (scheme, encoding, digest)


def _clear_hash(password, scheme, encoding):
    """Generates a (encoded) CLEARTEXT/PLAIN 'hash'.

    Without an *encoding* the result is passed through `get_unicode()`.
    With the encoding 'HEX' the password is hex encoded, with any other
    encoding it is Base64 encoded (without line breaks).
    """
    if not encoding:
        return get_unicode('{%s}%s' % (scheme, password))
    if encoding == 'HEX':
        encoded = password.encode('hex')
    else:
        encoded = password.encode('base64').replace('\n', '')
    return _format_digest(encoded, scheme, encoding)


def _get_crypt_blowfish_salt():
    """Generates a salt for Blowfish crypt.

    The configured `misc.crypt_blowfish_rounds` value is limited to the
    range CRYPT_BLF_ROUNDS_MIN..CRYPT_BLF_ROUNDS_MAX.
    """
    configured = cfg_dget('misc.crypt_blowfish_rounds')
    # Clamp the configured value into the supported range.
    rounds = min(max(configured, CRYPT_BLF_ROUNDS_MIN), CRYPT_BLF_ROUNDS_MAX)
    random_part = _get_salt(CRYPT_BLF_SALT_LEN)
    return '$%s$%02d$%s' % (CRYPT_ID_BLF, rounds, random_part)


def _get_crypt_sha2_salt(crypt_id):
    """Generates a salt for crypt using the SHA-256 or SHA-512 encryption
    method.
    *crypt_id* must be either `5` (SHA-256) or `6` (SHA-512).

    The number of rounds is read from `misc.crypt_sha512_rounds` or
    `misc.crypt_sha256_rounds` and limited to the range
    CRYPT_SHA2_ROUNDS_MIN..CRYPT_SHA2_ROUNDS_MAX.  The ``rounds=`` part is
    left out of the salt if the result equals CRYPT_SHA2_ROUNDS_DEFAULT.
    """
    assert crypt_id in (CRYPT_ID_SHA256, CRYPT_ID_SHA512), 'invalid crypt ' \
           'id: %r' % crypt_id
    # Compare by value: identity of equal integers is an implementation
    # detail and must not decide which option is read.
    if crypt_id == CRYPT_ID_SHA512:
        rounds = cfg_dget('misc.crypt_sha512_rounds')
    else:
        rounds = cfg_dget('misc.crypt_sha256_rounds')
    if rounds < CRYPT_SHA2_ROUNDS_MIN:
        rounds = CRYPT_SHA2_ROUNDS_MIN
    elif rounds > CRYPT_SHA2_ROUNDS_MAX:
        rounds = CRYPT_SHA2_ROUNDS_MAX
    if rounds == CRYPT_SHA2_ROUNDS_DEFAULT:
        return '$%d$%s' % (crypt_id, _get_salt(CRYPT_SHA2_SALT_LEN))
    return '$%d$rounds=%d$%s' % (crypt_id, rounds,
                                 _get_salt(CRYPT_SHA2_SALT_LEN))


def _crypt_hash(password, scheme, encoding):
    """Generates (encoded) CRYPT/MD5/{BLF,MD5,SHA{256,512}}-CRYPT hashes.

    The salt format is chosen by the *scheme*; every scheme not named
    explicitly below gets a SHA-512 crypt salt.  The BLF-CRYPT,
    SHA256-CRYPT and SHA512-CRYPT hashes are labeled as 'CRYPT' if the
    configured `misc.dovecot_version` is lower than 0x20000b06.
    """
    if scheme in ('MD5', 'MD5-CRYPT'):
        salt = '$%d$%s' % (CRYPT_ID_MD5, _get_salt(CRYPT_MD5_SALT_LEN))
    elif scheme == 'CRYPT':
        salt = _get_salt(CRYPT_SALT_LEN)
    elif scheme == 'BLF-CRYPT':
        salt = _get_crypt_blowfish_salt()
    elif scheme == 'SHA256-CRYPT':
        salt = _get_crypt_sha2_salt(CRYPT_ID_SHA256)
    else:
        salt = _get_crypt_sha2_salt(CRYPT_ID_SHA512)
    result = crypt(password, salt)
    if encoding == 'HEX':
        result = result.encode('hex')
    elif encoding:
        result = result.encode('base64').replace('\n', '')
    label = scheme
    if scheme in ('BLF-CRYPT', 'SHA256-CRYPT', 'SHA512-CRYPT'):
        # The version is looked up only for these three schemes.
        if cfg_dget('misc.dovecot_version') < 0x20000b06:
            label = 'CRYPT'
    return _format_digest(result, label, encoding)


def _md4_hash(password, scheme, encoding):
    """Generates encoded PLAIN-MD4 hashes.

    The hashing is delegated to `_dovecotpw()` if `_md4_new()` could not
    provide a MD4-hash object.
    """
    hasher = _md4_new()
    if not hasher:
        return _dovecotpw(password, scheme, encoding)
    hasher.update(password)
    if encoding not in DEFAULT_HEX:
        value = hasher.digest().encode('base64').rstrip()
    else:
        value = hasher.hexdigest()
    return _format_digest(value, scheme, encoding)


def _md5_hash(password, scheme, encoding, user=None):
    """Generates DIGEST-MD5 aka PLAIN-MD5 and LDAP-MD5 hashes.

    For the scheme 'DIGEST-MD5' the *user* is hashed in front of the
    password, so it must be given.  PLAIN-MD5 and DIGEST-MD5 default to a
    hex digest, LDAP-MD5 defaults to a Base64 digest.
    """
    hasher = hashlib.md5()
    if scheme == 'DIGEST-MD5':
        #  Prior to Dovecot v1.1.12/v1.2.beta2 there was a problem with a
        #  empty auth_realms setting in dovecot.conf and user@domain.tld
        #  usernames. So we have to generate different hashes for different
        #  versions. See also:
        #       http://dovecot.org/list/dovecot-news/2009-March/000103.html
        #       http://hg.dovecot.org/dovecot-1.1/rev/2b0043ba89ae
        if cfg_dget('misc.dovecot_version') < 0x1010cf00:
            prefix = '%s::' % user
        else:
            prefix = '%s:%s:' % (user.localpart, user.domainname)
        hasher.update(prefix)
    hasher.update(password)
    # Decide about the output format: hex or Base64.
    if scheme == 'LDAP-MD5':
        want_hex = encoding == 'HEX'
    elif scheme in ('PLAIN-MD5', 'DIGEST-MD5'):
        want_hex = encoding in DEFAULT_HEX
    else:
        want_hex = False
    if want_hex:
        value = hasher.hexdigest()
    else:
        value = hasher.digest().encode('base64').rstrip()
    return _format_digest(value, scheme, encoding)


def _ntlm_hash(password, scheme, encoding):
    """Generates NTLM hashes.

    A NUL byte is appended to each character of the password before it is
    hashed with MD4.  The unmodified password is handed to `_dovecotpw()`
    if `_md4_new()` could not provide a MD4-hash object.
    """
    hasher = _md4_new()
    if not hasher:
        return _dovecotpw(password, scheme, encoding)
    widened = ''.join('%s\x00' % char for char in password)
    hasher.update(widened)
    if encoding not in DEFAULT_HEX:
        value = hasher.digest().encode('base64').rstrip()
    else:
        value = hasher.hexdigest()
    return _format_digest(value, scheme, encoding)


def _sha1_hash(password, scheme, encoding):
    """Generates SHA1 aka SHA hashes.

    The digest is Base64 encoded by default; a hex digest is used for
    every encoding that is not listed in DEFAULT_B64.
    """
    hasher = hashlib.sha1(password)
    if encoding not in DEFAULT_B64:
        value = hasher.hexdigest()
    else:
        value = hasher.digest().encode('base64').rstrip()
    return _format_digest(value, scheme, encoding)


def _sha256_hash(password, scheme, encoding):
    """Generates SHA256 hashes.

    The hashing is delegated to `_dovecotpw()` if `_sha256_new()` could
    not provide a sha256 object.
    """
    hasher = _sha256_new(password)
    if not hasher:
        return _dovecotpw(password, scheme, encoding)
    if encoding not in DEFAULT_B64:
        value = hasher.hexdigest()
    else:
        value = hasher.digest().encode('base64').rstrip()
    return _format_digest(value, scheme, encoding)


def _sha512_hash(password, scheme, encoding):
    """Generates SHA512 hashes.

    The hashing is delegated to `_dovecotpw()` if COMPAT is set.
    """
    if COMPAT:
        return _dovecotpw(password, scheme, encoding)
    hasher = hashlib.sha512(password)
    if encoding not in DEFAULT_B64:
        value = hasher.hexdigest()
    else:
        # The Base64 text spans multiple lines: drop the line breaks.
        value = hasher.digest().encode('base64').replace('\n', '')
    return _format_digest(value, scheme, encoding)


def _smd5_hash(password, scheme, encoding):
    """Generates SMD5 (salted PLAIN-MD5) hashes.

    The random salt is hashed after the password and is appended to the
    digest before (Base64) or after (hex) the encoding.
    """
    salt = _get_salt(SALTED_ALGO_SALT_LEN)
    hasher = hashlib.md5(password)
    hasher.update(salt)
    if encoding not in DEFAULT_B64:
        value = hasher.hexdigest() + salt.encode('hex')
    else:
        value = (hasher.digest() + salt).encode('base64').rstrip()
    return _format_digest(value, scheme, encoding)


def _ssha1_hash(password, scheme, encoding):
    """Generates SSHA (salted SHA/SHA1) hashes.

    The random salt is hashed after the password and is appended to the
    digest before (Base64) or after (hex) the encoding.
    """
    salt = _get_salt(SALTED_ALGO_SALT_LEN)
    hasher = hashlib.sha1(password)
    hasher.update(salt)
    if encoding not in DEFAULT_B64:
        value = hasher.hexdigest() + salt.encode('hex')
    else:
        value = (hasher.digest() + salt).encode('base64').rstrip()
    return _format_digest(value, scheme, encoding)


def _ssha256_hash(password, scheme, encoding):
    """Generates SSHA256 (salted SHA256) hashes.

    The hashing is delegated to `_dovecotpw()` if `_sha256_new()` could
    not provide a sha256 object.
    """
    hasher = _sha256_new(password)
    if not hasher:
        return _dovecotpw(password, scheme, encoding)
    salt = _get_salt(SALTED_ALGO_SALT_LEN)
    hasher.update(salt)
    if encoding not in DEFAULT_B64:
        value = hasher.hexdigest() + salt.encode('hex')
    else:
        value = (hasher.digest() + salt).encode('base64').rstrip()
    return _format_digest(value, scheme, encoding)


def _ssha512_hash(password, scheme, encoding):
    """Generates SSHA512 (salted SHA512) hashes.

    The hashing is delegated to `_dovecotpw()` if COMPAT is set.
    """
    if COMPAT:
        return _dovecotpw(password, scheme, encoding)
    salt = _get_salt(SALTED_ALGO_SALT_LEN)
    hasher = hashlib.sha512(password)
    hasher.update(salt)
    if encoding not in DEFAULT_B64:
        value = hasher.hexdigest() + salt.encode('hex')
    else:
        # The Base64 text spans multiple lines: drop the line breaks.
        salted = hasher.digest() + salt
        value = salted.encode('base64').replace('\n', '')
    return _format_digest(value, scheme, encoding)

# Maps the name of each password scheme to a tuple:
#   (function which generates the hash, minimal Dovecot version)
# The version is an integer which is compared with the configured
# `misc.dovecot_version` (0x10100a01 is reported as v1.1.alpha1 by
# `verify_scheme()`).  `_test_crypt_algorithms()` may add the schemes
# BLF-CRYPT, SHA256-CRYPT and SHA512-CRYPT to this table.
_scheme_info = {
    'CLEARTEXT': (_clear_hash, 0x10000f00),
    'CRAM-MD5': (_dovecotpw, 0x10000f00),
    'CRYPT': (_crypt_hash, 0x10000f00),
    'DIGEST-MD5': (_md5_hash, 0x10000f00),
    'HMAC-MD5': (_dovecotpw, 0x10000f00),
    'LANMAN': (_dovecotpw, 0x10000f00),
    'LDAP-MD5': (_md5_hash, 0x10000f00),
    'MD5': (_crypt_hash, 0x10000f00),
    'MD5-CRYPT': (_crypt_hash, 0x10000f00),
    'NTLM': (_ntlm_hash, 0x10000f00),
    'OTP': (_dovecotpw, 0x10100a01),
    'PLAIN': (_clear_hash, 0x10000f00),
    'PLAIN-MD4': (_md4_hash, 0x10000f00),
    'PLAIN-MD5': (_md5_hash, 0x10000f00),
    'RPA': (_dovecotpw, 0x10000f00),
    'SHA': (_sha1_hash, 0x10000f00),
    'SHA1': (_sha1_hash, 0x10000f00),
    'SHA256': (_sha256_hash, 0x10100a01),
    'SHA512': (_sha512_hash, 0x20000b03),
    'SKEY': (_dovecotpw, 0x10100a01),
    'SMD5': (_smd5_hash, 0x10000f00),
    'SSHA': (_ssha1_hash, 0x10000f00),
    'SSHA256': (_ssha256_hash, 0x10200a04),
    'SSHA512': (_ssha512_hash, 0x20000b03),
}


def list_schemes():
    """Returns the tuple (schemes, encodings).

    `schemes` is an iterator for the names of all password schemes whose
    required version is not newer than the configured
    `misc.dovecot_version`.
    `encodings` is a tuple with all usable encoding suffixes.  It is empty
    if the configured version is lower than 0x10100a01.
    """
    version = cfg_dget('misc.dovecot_version')
    schemes = (name for (name, info) in _scheme_info.iteritems()
               if info[1] <= version)
    encodings = ()
    if version >= 0x10100a01:
        encodings = ('.B64', '.BASE64', '.HEX')
    return schemes, encodings


def verify_scheme(scheme):
    """Checks if the password scheme *scheme* is known and supported by the
    configured `misc.dovecot_version`.

    The *scheme* maybe a password scheme's name (e.g.: 'PLAIN') or a scheme
    name with a encoding suffix (e.g. 'PLAIN.BASE64').  If the scheme is
    known and supported by the used Dovecot version,
    a tuple ``(scheme, encoding)`` will be returned.  Both values are
    returned in upper case.
    The `encoding` in the tuple may be `None`.

    Raises a `VMMError` if the password scheme:
      * is unknown
      * depends on a newer Dovecot version
      * has a unknown encoding suffix
    """
    assert isinstance(scheme, basestring), 'Not a str/unicode: %r' % scheme
    # Split only at the first dot: everything behind it is the encoding
    # suffix.  So a value like 'PLAIN.HEX.FOO' is rejected as unsupported
    # encoding, instead of silently ignoring the trailing part.
    scheme_encoding = scheme.upper().split('.', 1)
    scheme = scheme_encoding[0]
    if scheme not in _scheme_info:
        raise VMMError(_(u"Unsupported password scheme: '%s'") % scheme,
                       VMM_ERROR)
    if cfg_dget('misc.dovecot_version') < _scheme_info[scheme][1]:
        raise VMMError(_(u"The password scheme '%(scheme)s' requires Dovecot "
                         u">= v%(version)s.") % {'scheme': scheme,
                       'version': version_str(_scheme_info[scheme][1])},
                       VMM_ERROR)
    if len(scheme_encoding) > 1:
        if cfg_dget('misc.dovecot_version') < 0x10100a01:
            raise VMMError(_(u'Encoding suffixes for password schemes require '
                             u'Dovecot >= v1.1.alpha1.'), VMM_ERROR)
        if scheme_encoding[1] not in ('B64', 'BASE64', 'HEX'):
            raise VMMError(_(u"Unsupported password encoding: '%s'") %
                           scheme_encoding[1], VMM_ERROR)
        encoding = scheme_encoding[1]
    else:
        encoding = None
    return scheme, encoding


def pwhash(password, scheme=None, user=None):
    """Generates a password hash from the plain text *password* string.

    The password scheme from the configuration (`misc.password_scheme`)
    is used if no *scheme* is given.  An EmailAddress instance must be
    given as *user* argument when 'DIGEST-MD5' is used as scheme.

    Unicode passwords are encoded with ENCODING.  Leading and trailing
    whitespace is stripped from the password before it is hashed.

    Raises a `TypeError` if the *password* is not a string and a
    `ValueError` if the stripped password is empty.
    """
    if not isinstance(password, basestring):
        raise TypeError('Password is not a string: %r' % password)
    if isinstance(password, unicode):
        password = password.encode(ENCODING)
    password = password.strip()
    if not password:
        raise ValueError("Could not accept empty password.")
    if scheme is None:
        scheme = cfg_dget('misc.password_scheme')
    scheme, encoding = verify_scheme(scheme)
    if scheme != 'DIGEST-MD5':
        hash_func = _scheme_info[scheme][0]
        return hash_func(password, scheme, encoding)
    # Only DIGEST-MD5 hashes depend on the user's address.
    assert isinstance(user, EmailAddress)
    return _md5_hash(password, scheme, encoding, user)


def randompw():
    """Generates a plain text random password.

    The length of the password can be configured in the ``vmm.cfg``
    (account.password_length).  A configured length lower than 8 is raised
    to 8.

    Each character is drawn independently from PASSWDCHARS.  So characters
    may occur more than once and the password may be longer than
    PASSWDCHARS itself.
    """
    pw_len = cfg_dget('account.password_length')
    if pw_len < 8:
        pw_len = 8
    # Don't use sample() here: it draws without replacement, which excludes
    # repeated characters and raises a ValueError if pw_len exceeds the
    # length of PASSWDCHARS.
    return ''.join(_choice(PASSWDCHARS) for x in xrange(pw_len))


def _test_crypt_algorithms():
    """Check for Blowfish/SHA-256/SHA-512 support in crypt.crypt().

    A fixed password is hashed with a fixed salt for each algorithm.  The
    corresponding scheme is added to `_scheme_info` only if crypt()
    returns the expected reference hash.
    """
    probe = '08/15!test~4711'
    candidates = (
        ('BLF-CRYPT', '$2a$04$0123456789abcdefABCDEF$',
         '$2a$04$0123456789abcdefABCDE.N.drYX5yIAL1LkTaaZotW3yI0hQhZru'),
        ('SHA256-CRYPT', '$5$rounds=1000$0123456789abcdef$',
         '$5$rounds=1000$0123456789abcdef$K/DksR0DT01hGc8g/kt9McEgrbFMKi'
         '9qrb1jehe7hn4'),
        ('SHA512-CRYPT', '$6$rounds=1000$0123456789abcdef$',
         '$6$rounds=1000$0123456789abcdef$ZIAd5WqfyLkpvsVCVUU1GrvqaZTqvh'
         'JoouxdSqJO71l9Ld3tVrfOatEjarhghvEYADkq//LpDnTeO90tcbtHR1'),
    )
    for name, salt, expected in candidates:
        if crypt(probe, salt) == expected:
            _scheme_info[name] = (_crypt_hash, 0x10000f00)

# Add the crypt schemes supported by this system's crypt() to _scheme_info,
# once at import time.
_test_crypt_algorithms()
# Remove the import-time placeholders and the one-shot test function from
# the module's namespace.
# NOTE(review): the functions above still refer to `_` and `cfg_dget`;
# presumably both names are provided elsewhere at runtime (e.g. via
# builtins) -- confirm.
del _, cfg_dget, _test_crypt_algorithms