# -*- coding: UTF-8 -*-# Copyright (c) 2010 - 2014, Pascal Volk# See COPYING for distribution information.""" VirtualMailManager.common ~~~~~~~~~~~~~~~~~~~~~~~~~ Some common functions"""importlocaleimportosimportreimportstatfromVirtualMailManagerimportENCODINGfromVirtualMailManager.constantsimportINVALID_MAIL_LOCATION, \NOT_EXECUTABLE,NO_SUCH_BINARY,TYPE_ACCOUNT,TYPE_ALIAS,TYPE_RELOCATEDfromVirtualMailManager.errorsimportVMMErrorVERSION_RE=re.compile(r'^(\d+)\.(\d+)\.(?:(\d+)|(alpha|beta|rc)(\d+))$')_version_level=dict(alpha=0xA,beta=0xB,rc=0xC)_version_cache={}_=lambdamsg:msgdefexpand_path(path):"""Expands paths, starting with ``.`` or ``~``, to an absolute path."""ifpath.startswith('.'):returnos.path.abspath(path)ifpath.startswith('~'):returnos.path.expanduser(path)returnpathdefget_unicode(string):"""Converts `string` to `unicode`, if necessary."""ifisinstance(string,str):returnstringreturnstr(string,ENCODING,'replace')deflisdir(path):"""Checks if `path` is a directory. Doesn't follow symbolic links. Returns bool. """try:lstat=os.lstat(path)exceptOSError:returnFalsereturnstat.S_ISDIR(lstat.st_mode)defexec_ok(binary):"""Checks if the `binary` exists and if it is executable. Throws a `VMMError` if the `binary` isn't a file or is not executable. """binary=expand_path(binary)ifnotos.path.isfile(binary):raiseVMMError(_("No such file: '%s'")%get_unicode(binary),NO_SUCH_BINARY)ifnotos.access(binary,os.X_OK):raiseVMMError(_("File is not executable: '%s'")%get_unicode(binary),NOT_EXECUTABLE)returnbinarydefhuman_size(size):"""Converts the `size` in bytes in human readable format."""ifnotisinstance(size,int):try:size=int(size)exceptValueError:raiseTypeError("'size' must be a positive integer.")ifsize<0:raiseValueError("'size' must be a positive integer.")ifsize<1024:returnstr(size)# TP: abbreviations of gibibyte, tebibyte kibibyte and mebibyteprefix_multiply=((_('TiB'),1<<40),(_('GiB'),1<<30),(_('MiB'),1<<20),(_('KiB'),1<<10))forprefix,multiplyinprefix_multiply:ifsize>=multiply:# TP: e.g.: '%(size)s %(prefix)s' -> '118.30 MiB'return_('%(size)s%(prefix)s')%{'size':locale.format('%.2f',float(size)/multiply,True),'prefix':prefix}defsize_in_bytes(size):"""Converts the string `size` to an integer (size in bytes). The string `size` can be suffixed with *b* (bytes), *k* (kilobytes), *M* (megabytes) or *G* (gigabytes). """ifnotisinstance(size,str)ornotsize:raiseTypeError('size must be a non empty string.')ifsize[-1].upper()in('B','K','M','G'):try:num=int(size[:-1])exceptValueError:raiseValueError('Not a valid integer value: %r'%size[:-1])unit=size[-1].upper()ifunit=='B':returnnumelifunit=='K':returnnum<<10elifunit=='M':returnnum<<20else:returnnum<<30else:try:num=int(size)exceptValueError:raiseValueError('Not a valid size value: %r'%size)returnnumdefvalidate_transport(transport,maillocation):"""Checks if the `transport` is usable for the given `maillocation`. Throws a `VMMError` if the chosen `transport` is unable to write messages in the `maillocation`'s mailbox format. Arguments: `transport` : VirtualMailManager.transport.Transport a Transport object `maillocation` : VirtualMailManager.maillocation.MailLocation a MailLocation object """iftransport.transportin('virtual','virtual:')and \notmaillocation.postfix:raiseVMMError(_("Invalid transport '%(transport)s' for mailbox ""format '%(mbfmt)s'.")%{'transport':transport.transport,'mbfmt':maillocation.mbformat},INVALID_MAIL_LOCATION)defversion_hex(version_string):"""Converts a Dovecot version, e.g.: '1.2.3' or '2.0.beta4', to an int. Raises a `ValueError` if the *version_string* has the wrong™ format. version_hex('1.2.3') -> 270548736 hex(version_hex('1.2.3')) -> '0x10203f00' """global_version_cacheifversion_stringin_version_cache:return_version_cache[version_string]version=0version_mo=VERSION_RE.match(version_string)ifnotversion_mo:raiseValueError('Invalid version string: %r'%version_string)major,minor,patch,level,serial=version_mo.groups()major=int(major)minor=int(minor)ifpatch:patch=int(patch)ifserial:serial=int(serial)ifmajor>0xFForminor>0xFFor \patchandpatch>0xFForserialandserial>0xFF:raiseValueError('Invalid version string: %r'%version_string)version+=major<<28version+=minor<<20ifpatch:version+=patch<<12version+=_version_level.get(level,0xF)<<8ifserial:version+=serial_version_cache[version_string]=versionreturnversiondefversion_str(version):"""Converts a Dovecot version previously converted with version_hex back to a string. Raises a `TypeError` if *version* is not an integer. Raises a `ValueError` if *version* is an incorrect int version. """global_version_cacheifversionin_version_cache:return_version_cache[version]ifnotisinstance(version,int):raiseTypeError('Argument is not a integer: %r',version)major=(version>>28)&0xFFminor=(version>>20)&0xFFpatch=(version>>12)&0xFFlevel=(version>>8)&0x0Fserial=version&0xFFlevels=dict(list(zip(list(_version_level.values()),list(_version_level.keys()))))iflevel==0xFandnotserial:version_string='%u.%u.%u'%(major,minor,patch)eliflevelinlevelsandnotpatch:version_string='%u.%u.%s%u'%(major,minor,levels[level],serial)else:raiseValueError('Invalid version: %r'%hex(version))_version_cache[version]=version_stringreturnversion_stringdefformat_domain_default(domaindata):"""Format info output when the value displayed is the domain default."""# TP: [domain default] indicates that a user's setting is the same as# configured in the user's domain.# e.g.: [ 0.84%] 42/5,000 [domain default]return_('%s [domain default]')%domaindatadefsearch_addresses(dbh,typelimit=None,lpattern=None,llike=False,dpattern=None,dlike=False):"""'Search' for addresses by *pattern* in the database. The search is limited by *typelimit*, a bitfield with values TYPE_ACCOUNT, TYPE_ALIAS, TYPE_RELOCATED, or a bitwise OR thereof. If no limit is specified, all types will be searched. *lpattern* may be a local part or a partial local part - starting and/or ending with a '%' sign. When the *lpattern* starts or ends with a '%' sign *llike* has to be `True` to perform a wildcard search. To retrieve all available addresses use the arguments' default values. *dpattern* and *dlike* behave analogously for the domain part of an address, allowing for separate pattern matching: testuser%@example.% The return value of this function is a tuple. The first element is a list of domain IDs sorted alphabetically by the corresponding domain names. The second element is a dictionary indexed by domain ID, holding lists to associated addresses. Each address is itself actually a tuple of address, type, and boolean indicating whether the address stems from an alias domain. """iftypelimitisNone:typelimit=TYPE_ACCOUNT|TYPE_ALIAS|TYPE_RELOCATEDqueries=[]iftypelimit&TYPE_ACCOUNT:queries.append('SELECT gid, local_part, %d AS type FROM users'%TYPE_ACCOUNT)iftypelimit&TYPE_ALIAS:queries.append('SELECT DISTINCT gid, address as local_part, ''%d AS type FROM alias'%TYPE_ALIAS)iftypelimit&TYPE_RELOCATED:queries.append('SELECT gid, address as local_part, %d AS type ''FROM relocated'%TYPE_RELOCATED)sql="SELECT gid, local_part || '@' || domainname AS address, "sql+='type, NOT is_primary AS from_aliasdomain FROM ('sql+=' UNION '.join(queries)sql+=') a JOIN domain_name USING (gid)'nextkw='WHERE'sqlargs=[]forlike,field,patternin((dlike,'domainname',dpattern),(llike,'local_part',lpattern)):iflike:match='LIKE'else:ifnotpattern:continuematch='='sql+=' %s%s%s%%s'%(nextkw,field,match)sqlargs.append(pattern)nextkw='AND'sql+=' ORDER BY domainname, local_part'dbc=dbh.cursor()dbc.execute(sql,sqlargs)result=dbc.fetchall()dbc.close()gids=[]daddrs={}forgid,address,addrtype,aliasdomaininresult:ifgidnotindaddrs:gids.append(gid)daddrs[gid]=[]daddrs[gid].append((address,addrtype,aliasdomain))returngids,daddrsdel_