diff -r 659c4476c57c -r b678a1c43027 VirtualMailManager/common.py --- a/VirtualMailManager/common.py Mon Mar 24 19:22:04 2014 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,287 +0,0 @@ -# -*- coding: UTF-8 -*- -# Copyright (c) 2010 - 2014, Pascal Volk -# See COPYING for distribution information. -""" - VirtualMailManager.common - ~~~~~~~~~~~~~~~~~~~~~~~~~ - - Some common functions -""" - -import locale -import os -import re -import stat - -from VirtualMailManager import ENCODING -from VirtualMailManager.constants import INVALID_MAIL_LOCATION, \ - NOT_EXECUTABLE, NO_SUCH_BINARY, TYPE_ACCOUNT, TYPE_ALIAS, TYPE_RELOCATED -from VirtualMailManager.errors import VMMError - -VERSION_RE = re.compile(r'^(\d+)\.(\d+)\.(?:(\d+)|(alpha|beta|rc)(\d+))$') - -_version_level = dict(alpha=0xA, beta=0xB, rc=0xC) -_version_cache = {} -_ = lambda msg: msg - - -def expand_path(path): - """Expands paths, starting with ``.`` or ``~``, to an absolute path.""" - if path.startswith('.'): - return os.path.abspath(path) - if path.startswith('~'): - return os.path.expanduser(path) - return path - - -def get_unicode(string): - """Converts `string` to `unicode`, if necessary.""" - if isinstance(string, unicode): - return string - return unicode(string, ENCODING, 'replace') - - -def lisdir(path): - """Checks if `path` is a directory. Doesn't follow symbolic links. - Returns bool. - """ - try: - lstat = os.lstat(path) - except OSError: - return False - return stat.S_ISDIR(lstat.st_mode) - - -def exec_ok(binary): - """Checks if the `binary` exists and if it is executable. - - Throws a `VMMError` if the `binary` isn't a file or is not - executable. - """ - binary = expand_path(binary) - if not os.path.isfile(binary): - raise VMMError(_(u"No such file: '%s'") % get_unicode(binary), - NO_SUCH_BINARY) - if not os.access(binary, os.X_OK): - raise VMMError(_(u"File is not executable: '%s'") % - get_unicode(binary), NOT_EXECUTABLE) - return binary - - -def human_size(size): - """Converts the `size` in bytes in human readable format.""" - if not isinstance(size, (long, int)): - try: - size = long(size) - except ValueError: - raise TypeError("'size' must be a positive long or int.") - if size < 0: - raise ValueError("'size' must be a positive long or int.") - if size < 1024: - return str(size) - # TP: abbreviations of gibibyte, tebibyte kibibyte and mebibyte - prefix_multiply = ((_(u'TiB'), 1 << 40), (_(u'GiB'), 1 << 30), - (_(u'MiB'), 1 << 20), (_(u'KiB'), 1 << 10)) - for prefix, multiply in prefix_multiply: - if size >= multiply: - # TP: e.g.: '%(size)s %(prefix)s' -> '118.30 MiB' - return _(u'%(size)s %(prefix)s') % { - 'size': locale.format('%.2f', float(size) / multiply, - True).decode(ENCODING, 'replace'), - 'prefix': prefix} - - -def size_in_bytes(size): - """Converts the string `size` to a long (size in bytes). - - The string `size` can be suffixed with *b* (bytes), *k* (kilobytes), - *M* (megabytes) or *G* (gigabytes). - """ - if not isinstance(size, basestring) or not size: - raise TypeError('size must be a non empty string.') - if size[-1].upper() in ('B', 'K', 'M', 'G'): - try: - num = int(size[:-1]) - except ValueError: - raise ValueError('Not a valid integer value: %r' % size[:-1]) - unit = size[-1].upper() - if unit == 'B': - return num - elif unit == 'K': - return num << 10L - elif unit == 'M': - return num << 20L - else: - return num << 30L - else: - try: - num = int(size) - except ValueError: - raise ValueError('Not a valid size value: %r' % size) - return num - - -def validate_transport(transport, maillocation): - """Checks if the `transport` is usable for the given `maillocation`. - - Throws a `VMMError` if the chosen `transport` is unable to write - messages in the `maillocation`'s mailbox format. - - Arguments: - - `transport` : VirtualMailManager.transport.Transport - a Transport object - `maillocation` : VirtualMailManager.maillocation.MailLocation - a MailLocation object - """ - if transport.transport in ('virtual', 'virtual:') and \ - not maillocation.postfix: - raise VMMError(_(u"Invalid transport '%(transport)s' for mailbox " - u"format '%(mbfmt)s'.") % - {'transport': transport.transport, - 'mbfmt': maillocation.mbformat}, INVALID_MAIL_LOCATION) - - -def version_hex(version_string): - """Converts a Dovecot version, e.g.: '1.2.3' or '2.0.beta4', to an int. - Raises a `ValueError` if the *version_string* has the wrong™ format. - - version_hex('1.2.3') -> 270548736 - hex(version_hex('1.2.3')) -> '0x10203f00' - """ - global _version_cache - if version_string in _version_cache: - return _version_cache[version_string] - version = 0 - version_mo = VERSION_RE.match(version_string) - if not version_mo: - raise ValueError('Invalid version string: %r' % version_string) - major, minor, patch, level, serial = version_mo.groups() - major = int(major) - minor = int(minor) - if patch: - patch = int(patch) - if serial: - serial = int(serial) - - if major > 0xFF or minor > 0xFF or \ - patch and patch > 0xFF or serial and serial > 0xFF: - raise ValueError('Invalid version string: %r' % version_string) - - version += major << 28 - version += minor << 20 - if patch: - version += patch << 12 - version += _version_level.get(level, 0xF) << 8 - if serial: - version += serial - - _version_cache[version_string] = version - return version - - -def version_str(version): - """Converts a Dovecot version previously converted with version_hex back to - a string. - Raises a `TypeError` if *version* is not an int/long. - Raises a `ValueError` if *version* is an incorrect int version. - """ - global _version_cache - if version in _version_cache: - return _version_cache[version] - if not isinstance(version, (int, long)): - raise TypeError('Argument is not a int/long: %r', version) - major = (version >> 28) & 0xFF - minor = (version >> 20) & 0xFF - patch = (version >> 12) & 0xFF - level = (version >> 8) & 0x0F - serial = version & 0xFF - - levels = dict(zip(_version_level.values(), _version_level.keys())) - if level == 0xF and not serial: - version_string = '%u.%u.%u' % (major, minor, patch) - elif level in levels and not patch: - version_string = '%u.%u.%s%u' % (major, minor, levels[level], serial) - else: - raise ValueError('Invalid version: %r' % hex(version)) - - _version_cache[version] = version_string - return version_string - - -def format_domain_default(domaindata): - """Format info output when the value displayed is the domain default.""" - # TP: [domain default] indicates that a user's setting is the same as - # configured in the user's domain. - # e.g.: [ 0.84%] 42/5,000 [domain default] - return _(u'%s [domain default]') % domaindata - - -def search_addresses(dbh, typelimit=None, lpattern=None, llike=False, - dpattern=None, dlike=False): - """'Search' for addresses by *pattern* in the database. - - The search is limited by *typelimit*, a bitfield with values TYPE_ACCOUNT, - TYPE_ALIAS, TYPE_RELOCATED, or a bitwise OR thereof. If no limit is - specified, all types will be searched. - - *lpattern* may be a local part or a partial local part - starting and/or - ending with a '%' sign. When the *lpattern* starts or ends with a '%' sign - *llike* has to be `True` to perform a wildcard search. To retrieve all - available addresses use the arguments' default values. - - *dpattern* and *dlike* behave analogously for the domain part of an - address, allowing for separate pattern matching: testuser%@example.% - - The return value of this function is a tuple. The first element is a list - of domain IDs sorted alphabetically by the corresponding domain names. The - second element is a dictionary indexed by domain ID, holding lists to - associated addresses. Each address is itself actually a tuple of address, - type, and boolean indicating whether the address stems from an alias - domain. - """ - if typelimit is None: - typelimit = TYPE_ACCOUNT | TYPE_ALIAS | TYPE_RELOCATED - queries = [] - if typelimit & TYPE_ACCOUNT: - queries.append('SELECT gid, local_part, %d AS type FROM users' - % TYPE_ACCOUNT) - if typelimit & TYPE_ALIAS: - queries.append('SELECT DISTINCT gid, address as local_part, ' - '%d AS type FROM alias' % TYPE_ALIAS) - if typelimit & TYPE_RELOCATED: - queries.append('SELECT gid, address as local_part, %d AS type ' - 'FROM relocated' % TYPE_RELOCATED) - sql = "SELECT gid, local_part || '@' || domainname AS address, " - sql += 'type, NOT is_primary AS from_aliasdomain FROM (' - sql += ' UNION '.join(queries) - sql += ') a JOIN domain_name USING (gid)' - nextkw = 'WHERE' - sqlargs = [] - for like, field, pattern in ((dlike, 'domainname', dpattern), - (llike, 'local_part', lpattern)): - if like: - match = 'LIKE' - else: - if not pattern: - continue - match = '=' - sql += ' %s %s %s %%s' % (nextkw, field, match) - sqlargs.append(pattern) - nextkw = 'AND' - sql += ' ORDER BY domainname, local_part' - dbc = dbh.cursor() - dbc.execute(sql, sqlargs) - result = dbc.fetchall() - dbc.close() - - gids = [] - daddrs = {} - for gid, address, addrtype, aliasdomain in result: - if gid not in daddrs: - gids.append(gid) - daddrs[gid] = [] - daddrs[gid].append((address, addrtype, aliasdomain)) - return gids, daddrs - -del _