VMM: added 'Configuration' variable and set_configuration().
Handler.__init__() now exports its config via set_configuration().
# -*- coding: UTF-8 -*-# Copyright (c) 2007 - 2010, Pascal Volk# See COPYING for distribution information.""" VirtualMailManager.Handler A wrapper class. It wraps round all other classes and does some dependencies checks. Additionally it communicates with the PostgreSQL database, creates or deletes directories of domains or users."""importosimportrefromshutilimportrmtreefromsubprocessimportPopen,PIPEfrompyPgSQLimportPgSQL# python-pgsql - http://pypgsql.sourceforge.netimportVirtualMailManager.constants.ERRORasERRfromVirtualMailManagerimportENCODING,exec_ok,set_configurationfromVirtualMailManager.AccountimportAccountfromVirtualMailManager.AliasimportAliasfromVirtualMailManager.AliasDomainimportAliasDomainfromVirtualMailManager.ConfigimportConfigasCfgfromVirtualMailManager.DomainimportDomain,ace2idna,get_gidfromVirtualMailManager.EmailAddressimportEmailAddressfromVirtualMailManager.errorsimportVMMError,AliasError,DomainError, \RelocatedErrorfromVirtualMailManager.RelocatedimportRelocatedfromVirtualMailManager.TransportimportTransportfromVirtualMailManager.ext.PostconfimportPostconfSALTCHARS='./0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ'RE_DOMAIN_SEARCH="""^[a-z0-9-\.]+$"""RE_MBOX_NAMES="""^[\x20-\x25\x27-\x7E]*$"""TYPE_ACCOUNT=0x1TYPE_ALIAS=0x2TYPE_RELOCATED=0x4classHandler(object):"""Wrapper class to simplify the access on all the stuff from VirtualMailManager"""__slots__=('_Cfg','_cfgFileName','_dbh','_scheme','__warnings','_postconf')def__init__(self,skip_some_checks=False):"""Creates a new Handler instance. ``skip_some_checks`` : bool When a derived class knows how to handle all checks this argument may be ``True``. By default it is ``False`` and all checks will be performed. Throws a NotRootError if your uid is greater 0. """self._cfgFileName=''self.__warnings=[]self._Cfg=Noneself._dbh=Noneifos.geteuid():raiseNotRootError(_(u"You are not root.\n\tGood bye!\n"),ERR.CONF_NOPERM)ifself.__chkCfgFile():self._Cfg=Cfg(self._cfgFileName)self._Cfg.load()ifnotskip_some_checks:self._Cfg.check()self._chkenv()# will be moved to the new password module and Alias#self._scheme = self._Cfg.dget('misc.password_scheme')#self._postconf = Postconf(self._Cfg.dget('bin.postconf'))set_configuration(self._Cfg)def__findCfgFile(self):forpathin['/root','/usr/local/etc','/etc']:tmp=os.path.join(path,'vmm.cfg')ifos.path.isfile(tmp):self._cfgFileName=tmpbreakifnotlen(self._cfgFileName):raiseVMMError(_(u"No “vmm.cfg” found in: /root:/usr/local/etc:/etc"),ERR.CONF_NOFILE)def__chkCfgFile(self):"""Checks the configuration file, returns bool"""self.__findCfgFile()fstat=os.stat(self._cfgFileName)fmode=int(oct(fstat.st_mode&0777))iffmode%100andfstat.st_uid!=fstat.st_gidor \fmode%10andfstat.st_uid==fstat.st_gid:raisePermissionError(_(u'fix permissions (%(perms)s) for “%(file)s”\n\`chmod 0600 %(file)s` would be great.')%{'file':self._cfgFileName,'perms':fmode},ERR.CONF_WRONGPERM)else:returnTruedef_chkenv(self):""""""basedir=self._Cfg.dget('misc.base_directory')ifnotos.path.exists(basedir):old_umask=os.umask(0006)os.makedirs(basedir,0771)os.chown(basedir,0,self._Cfg.dget('misc.gid_mail'))os.umask(old_umask)elifnotos.path.isdir(basedir):raiseVMMError(_(u'“%s” is not a directory.\n\(vmm.cfg: section "misc", option "base_directory")')%basedir,ERR.NO_SUCH_DIRECTORY)foropt,valinself._Cfg.items('bin'):try:exec_ok(val)exceptVMMError,e:ife.codeisERR.NO_SUCH_BINARY:raiseVMMError(_(u'“%(binary)s” doesn\'t exist.\n\(vmm.cfg: section "bin", option "%(option)s")')%{'binary':val,'option':opt},ERR.NO_SUCH_BINARY)elife.codeisERR.NOT_EXECUTABLE:raiseVMMError(_(u'“%(binary)s” is not executable.\\n(vmm.cfg: section "bin", option "%(option)s")')%{'binary':val,'option':opt},ERR.NOT_EXECUTABLE)else:raisedef__dbConnect(self):"""Creates a pyPgSQL.PgSQL.connection instance."""ifself._dbhisNoneor(isinstance(self._dbh,PgSQL.Connection)andnotself._dbh._isOpen):try:self._dbh=PgSQL.connect(database=self._Cfg.dget('database.name'),user=self._Cfg.pget('database.user'),host=self._Cfg.dget('database.host'),password=self._Cfg.pget('database.pass'),client_encoding='utf8',unicode_results=True)dbc=self._dbh.cursor()dbc.execute("SET NAMES 'UTF8'")dbc.close()exceptPgSQL.libpq.DatabaseError,e:raiseVMMError(str(e),ERR.DATABASE_ERROR)def_chk_other_address_types(self,address,exclude):"""Checks if the EmailAddress *address* is known as `TYPE_ACCOUNT`, `TYPE_ALIAS` or `TYPE_RELOCATED`, but not as the `TYPE_*` specified by *exclude*. If the *address* is known as one of the `TYPE_*`s the according `TYPE_*` constant will be returned. Otherwise 0 will be returned."""assertexcludein(TYPE_ACCOUNT,TYPE_ALIAS,TYPE_RELOCATED)and \isinstance(address,EmailAddress)ifexcludeisnotTYPE_ACCOUNT:account=Account(self._dbh,address)ifaccount.uid>0:returnTYPE_ACCOUNTifexcludeisnotTYPE_ALIAS:alias=Alias(self._dbh,address)ifalias:returnTYPE_ALIASifexcludeisnotTYPE_RELOCATED:relocated=Relocated(self._dbh,address)ifrelocated:returnTYPE_RELOCATEDreturn0def__getAccount(self,address,password=None):address=EmailAddress(address)ifnotpasswordisNone:password=self.__pwhash(password)self.__dbConnect()returnAccount(self._dbh,address,password)def__getAlias(self,address):address=EmailAddress(address)self.__dbConnect()returnAlias(self._dbh,address)def__getRelocated(self,address):address=EmailAddress(address)self.__dbConnect()returnRelocated(self._dbh,address)def__getDomain(self,domainname):self.__dbConnect()returnDomain(self._dbh,domainname)def__getDiskUsage(self,directory):"""Estimate file space usage for the given directory. Keyword arguments: directory -- the directory to summarize recursively disk usage for """ifself.__isdir(directory):returnPopen([self._Cfg.dget('bin.du'),"-hs",directory],stdout=PIPE).communicate()[0].split('\t')[0]else:return0def__isdir(self,directory):isdir=os.path.isdir(directory)ifnotisdir:self.__warnings.append(_('No such directory: %s')%directory)returnisdirdef__makedir(self,directory,mode=None,uid=None,gid=None):ifmodeisNone:mode=self._Cfg.dget('account.directory_mode')ifuidisNone:uid=0ifgidisNone:gid=0os.makedirs(directory,mode)os.chown(directory,uid,gid)def__domDirMake(self,domdir,gid):#TODO: clenaup!os.umask(0006)oldpwd=os.getcwd()basedir=self._Cfg.dget('misc.base_directory')domdirdirs=domdir.replace(basedir+'/','').split('/')os.chdir(basedir)ifnotos.path.isdir(domdirdirs[0]):self.__makedir(domdirdirs[0],489,0,self._Cfg.dget('misc.gid_mail'))os.chdir(domdirdirs[0])os.umask(0007)self.__makedir(domdirdirs[1],self._Cfg.dget('domain.directory_mode'),0,gid)os.chdir(oldpwd)def__subscribe(self,folderlist,uid,gid):"""Creates a subscriptions file with the mailboxes from `folderlist`"""fname=os.path.join(self._Cfg.dget('maildir.name'),'subscriptions')sf=open(fname,'w')sf.write('\n'.join(folderlist))sf.write('\n')sf.flush()sf.close()os.chown(fname,uid,gid)os.chmod(fname,384)def__mailDirMake(self,domdir,uid,gid):"""Creates maildirs and maildir subfolders. Keyword arguments: domdir -- the path to the domain directory uid -- user id from the account gid -- group id from the account """os.umask(0007)oldpwd=os.getcwd()os.chdir(domdir)maildir=self._Cfg.dget('maildir.name')folders=[maildir]append=folders.appendforfolderinself._Cfg.dget('maildir.folders').split(':'):folder=folder.strip()iflen(folder)andnotfolder.count('..'):ifre.match(RE_MBOX_NAMES,folder):append('%s/.%s'%(maildir,folder))else:self.__warnings.append(_('Skipped mailbox folder: %r')%folder)else:self.__warnings.append(_('Skipped mailbox folder: %r')%folder)subdirs=['cur','new','tmp']mode=self._Cfg.dget('account.directory_mode')self.__makedir('%s'%uid,mode,uid,gid)os.chdir('%s'%uid)forfolderinfolders:self.__makedir(folder,mode,uid,gid)forsubdirinsubdirs:self.__makedir(os.path.join(folder,subdir),mode,uid,gid)self.__subscribe((f.replace(maildir+'/.','')forfinfolders[1:]),uid,gid)os.chdir(oldpwd)def__userDirDelete(self,domdir,uid,gid):ifuid>0andgid>0:userdir='%s'%uidifuserdir.count('..')ordomdir.count('..'):raiseVMMError(_(u'Found ".." in home directory path.'),ERR.FOUND_DOTS_IN_PATH)ifos.path.isdir(domdir):os.chdir(domdir)ifos.path.isdir(userdir):mdstat=os.stat(userdir)if(mdstat.st_uid,mdstat.st_gid)!=(uid,gid):raiseVMMError(_(u'Detected owner/group mismatch in home directory.'),ERR.MAILDIR_PERM_MISMATCH)rmtree(userdir,ignore_errors=True)else:raiseVMMError(_(u"No such directory: %s")%os.path.join(domdir,userdir),ERR.NO_SUCH_DIRECTORY)def__domDirDelete(self,domdir,gid):ifgid>0:ifnotself.__isdir(domdir):returnbasedir=self._Cfg.dget('misc.base_directory')domdirdirs=domdir.replace(basedir+'/','').split('/')domdirparent=os.path.join(basedir,domdirdirs[0])ifbasedir.count('..')ordomdir.count('..'):raiseVMMError(_(u'Found ".." in domain directory path.'),ERR.FOUND_DOTS_IN_PATH)ifos.path.isdir(domdirparent):os.chdir(domdirparent)ifos.lstat(domdirdirs[1]).st_gid!=gid:raiseVMMError(_(u'Detected group mismatch in domain directory.'),ERR.DOMAINDIR_GROUP_MISMATCH)rmtree(domdirdirs[1],ignore_errors=True)def__getSalt(self):fromrandomimportchoicesalt=Noneifself._scheme=='CRYPT':salt='%s%s'%(choice(SALTCHARS),choice(SALTCHARS))elifself._schemein['MD5','MD5-CRYPT']:salt='$1$%s$'%''.join([choice(SALTCHARS)forxinxrange(8)])returnsaltdef__pwCrypt(self,password):# for: CRYPT, MD5 and MD5-CRYPTfromcryptimportcryptreturncrypt(password,self.__getSalt())def__pwSHA1(self,password):# for: SHA/SHA1importshafrombase64importstandard_b64encodesha1=sha.new(password)returnstandard_b64encode(sha1.digest())def__pwMD5(self,password,emailaddress=None):importmd5_md5=md5.new(password)ifself._scheme=='LDAP-MD5':frombase64importstandard_b64encodereturnstandard_b64encode(_md5.digest())elifself._scheme=='PLAIN-MD5':return_md5.hexdigest()elifself._scheme=='DIGEST-MD5'andemailaddressisnotNone:# use an empty realm - works better with usenames like user@dom_md5=md5.new('%s::%s'%(emailaddress,password))return_md5.hexdigest()def__pwMD4(self,password):# for: PLAIN-MD4fromCrypto.HashimportMD4_md4=MD4.new(password)return_md4.hexdigest()def__pwhash(self,password,scheme=None,user=None):ifschemeisnotNone:self._scheme=schemeifself._schemein['CRYPT','MD5','MD5-CRYPT']:return'{%s}%s'%(self._scheme,self.__pwCrypt(password))elifself._schemein['SHA','SHA1']:return'{%s}%s'%(self._scheme,self.__pwSHA1(password))elifself._schemein['PLAIN-MD5','LDAP-MD5','DIGEST-MD5']:return'{%s}%s'%(self._scheme,self.__pwMD5(password,user))elifself._scheme=='MD4':return'{%s}%s'%(self._scheme,self.__pwMD4(password))elifself._schemein['SMD5','SSHA','CRAM-MD5','HMAC-MD5','LANMAN','NTLM','RPA']:cmd_args=[self._Cfg.dget('bin.dovecotpw'),'-s',self._scheme,'-p',password]ifself._Cfg.dget('misc.dovecot_version')>=20:cmd_args.insert(1,'pw')returnPopen(cmd_args,stdout=PIPE).communicate()[0][:-1]else:return'{%s}%s'%(self._scheme,password)defhasWarnings(self):"""Checks if warnings are present, returns bool."""returnbool(len(self.__warnings))defgetWarnings(self):"""Returns a list with all available warnings and resets all warnings. """ret_val=self.__warnings[:]delself.__warnings[:]returnret_valdefcfg_dget(self,option):"""Get the configured value of the *option* (section.option). When the option was not configured its default value will be returned."""returnself._Cfg.dget(option)defcfg_pget(self,option):"""Get the configured value of the *option* (section.option)."""returnself._Cfg.pget(option)defdomainAdd(self,domainname,transport=None):dom=self.__getDomain(domainname)iftransportisNone:dom.set_transport(Transport(self._dbh,transport=self._Cfg.dget('misc.transport')))else:dom.set_transport(Transport(self._dbh,transport=transport))dom.set_directory(self._Cfg.dget('misc.base_directory'))dom.save()self.__domDirMake(dom.directory,dom.gid)defdomainTransport(self,domainname,transport,force=None):ifforceisnotNoneandforce!='force':raiseDomainError(_(u"Invalid argument: “%s”")%force,ERR.INVALID_OPTION)dom=self.__getDomain(domainname)trsp=Transport(self._dbh,transport=transport)ifforceisNone:dom.update_transport(trsp)else:dom.update_transport(trsp,force=True)defdomainDelete(self,domainname,force=None):ifnotforceisNoneandforcenotin['deluser','delalias','delall']:raiseDomainError(_(u'Invalid argument: “%s”')%force,ERR.INVALID_OPTION)dom=self.__getDomain(domainname)gid=dom.giddomdir=dom.directoryifself._Cfg.dget('domain.force_deletion')orforce=='delall':dom.delete(True,True)elifforce=='deluser':dom.delete(deluser=True)elifforce=='delalias':dom.delete(delalias=True)else:dom.delete()ifself._Cfg.dget('domain.delete_directory'):self.__domDirDelete(domdir,gid)defdomainInfo(self,domainname,details=None):ifdetailsnotin[None,'accounts','aliasdomains','aliases','full','relocated']:raiseVMMError(_(u'Invalid argument: “%s”')%details,ERR.INVALID_AGUMENT)dom=self.__getDomain(domainname)dominfo=dom.get_info()ifdominfo['domainname'].startswith('xn--'):dominfo['domainname']+=' (%s)'%ace2idna(dominfo['domainname'])ifdetailsisNone:returndominfoelifdetails=='accounts':return(dominfo,dom.get_accounts())elifdetails=='aliasdomains':return(dominfo,dom.get_aliase_names())elifdetails=='aliases':return(dominfo,dom.get_aliases())elifdetails=='relocated':return(dominfo,dom.get_relocated())else:return(dominfo,dom.get_aliase_names(),dom.get_accounts(),dom.get_aliases(),dom.get_relocated())defaliasDomainAdd(self,aliasname,domainname):"""Adds an alias domain to the domain. Arguments: `aliasname` : basestring The name of the alias domain `domainname` : basestring The name of the target domain """dom=self.__getDomain(domainname)aliasDom=AliasDomain(self._dbh,aliasname)aliasDom.set_destination(dom)aliasDom.save()defaliasDomainInfo(self,aliasname):"""Returns a dict (keys: "alias" and "domain") with the names of the alias domain and its primary domain."""self.__dbConnect()aliasDom=AliasDomain(self._dbh,aliasname)returnaliasDom.info()defaliasDomainSwitch(self,aliasname,domainname):"""Modifies the target domain of an existing alias domain. Arguments: `aliasname` : basestring The name of the alias domain `domainname` : basestring The name of the new target domain """dom=self.__getDomain(domainname)aliasDom=AliasDomain(self._dbh,aliasname)aliasDom.set_destination(dom)aliasDom.switch()defaliasDomainDelete(self,aliasname):"""Deletes the given alias domain. Argument: `aliasname` : basestring The name of the alias domain """self.__dbConnect()aliasDom=AliasDomain(self._dbh,aliasname)aliasDom.delete()defdomainList(self,pattern=None):fromVirtualMailManager.Domainimportsearchlike=Falseifpatternand(pattern.startswith('%')orpattern.endswith('%')):like=Trueifnotre.match(RE_DOMAIN_SEARCH,pattern.strip('%')):raiseVMMError(_(u"The pattern '%s' contains invalid characters.")%pattern,ERR.DOMAIN_INVALID)self.__dbConnect()returnsearch(self._dbh,pattern=pattern,like=like)defuserAdd(self,emailaddress,password):ifpasswordisNoneor(isinstance(password,basestring)andnotlen(password)):raiseValueError('could not accept password: %r'%password)acc=self.__getAccount(emailaddress,self.__pwhash(password))acc.save(self._Cfg.dget('maildir.name'),self._Cfg.dget('misc.dovecot_version'),self._Cfg.dget('account.smtp'),self._Cfg.dget('account.pop3'),self._Cfg.dget('account.imap'),self._Cfg.dget('account.sieve'))self.__mailDirMake(acc.getDir('domain'),acc.getUID(),acc.getGID())defaliasAdd(self,aliasaddress,*targetaddresses):"""Creates a new `Alias` entry for the given *aliasaddress* with the given *targetaddresses*."""alias=self.__getAlias(aliasaddress)destinations=[EmailAddress(address)foraddressintargetaddresses]warnings=[]destinations=alias.add_destinations(destinations,long(self._postconf.read('virtual_alias_expansion_limit')),warnings)ifwarnings:self.__warnings.append(_('Ignored destination addresses:'))self.__warnings.extend((' * %s'%wforwinwarnings))fordestinationindestinations:gid=get_gid(self._dbh,destination.domainname)ifgidand(notHandler.accountExists(self._dbh,destination)andnotHandler.aliasExists(self._dbh,destination)):self.__warnings.append(_(u"The destination account/alias %r doesn't exist.")%str(destination))defuserDelete(self,emailaddress,force=None):ifforcenotin[None,'delalias']:raiseVMMError(_(u"Invalid argument: “%s”")%force,ERR.INVALID_AGUMENT)acc=self.__getAccount(emailaddress)uid=acc.getUID()gid=acc.getGID()acc.delete(force)ifself._Cfg.dget('account.delete_directory'):try:self.__userDirDelete(acc.getDir('domain'),uid,gid)exceptVMMError,e:ife.codein[ERR.FOUND_DOTS_IN_PATH,ERR.MAILDIR_PERM_MISMATCH,ERR.NO_SUCH_DIRECTORY]:warning=_(u"""\The account has been successfully deleted from the database. But an error occurred while deleting the following directory: “%(directory)s” Reason: %(reason)s""")% \{'directory':acc.getDir('home'),'reason':e.msg}self.__warnings.append(warning)else:raisedefaliasInfo(self,aliasaddress):"""Returns an iterator object for all destinations (`EmailAddress` instances) for the `Alias` with the given *aliasaddress*."""alias=self.__getAlias(aliasaddress)try:returnalias.get_destinations()exceptAliasError,err:iferr.code==ERR.NO_SUCH_ALIAS:other=self._chk_other_address_types(alias.address,TYPE_ALIAS)ifotherisTYPE_ACCOUNT:raiseVMMError(_(u"There is already an account with the \address '%s'.")%alias.address,ERR.ACCOUNT_EXISTS)elifotherisTYPE_RELOCATED:raiseVMMError(_(u"There is already a relocated user \with the address '%s'.")%alias.address,ERR.RELOCATED_EXISTS)else:# unknown addressraiseelse:# something other went wrongraisedefaliasDelete(self,aliasaddress,targetaddress=None):"""Deletes the `Alias` *aliasaddress* with all its destinations from the database. If *targetaddress* is not ``None``, only this destination will be removed from the alias."""alias=self.__getAlias(aliasaddress)iftargetaddressisNone:alias.delete()else:alias.del_destination(EmailAddress(targetaddress))defuserInfo(self,emailaddress,details=None):ifdetailsnotin(None,'du','aliases','full'):raiseVMMError(_(u'Invalid argument: “%s”')%details,ERR.INVALID_AGUMENT)acc=self.__getAccount(emailaddress)info=acc.getInfo(self._Cfg.dget('misc.dovecot_version'))ifself._Cfg.dget('account.disk_usage')ordetailsin('du','full'):info['disk usage']=self.__getDiskUsage('%(maildir)s'%info)ifdetailsin(None,'du'):returninfoifdetailsin('aliases','full'):return(info,acc.getAliases())returninfodefuser_by_uid(self,uid):"""Search for an Account by its *uid*. Returns a dict (address, uid and gid) if a user could be found."""fromVirtualMailManager.Accountimportget_account_by_uidself.__dbConnect()returnget_account_by_uid(uid,self._dbh)defuserPassword(self,emailaddress,password):ifpasswordisNoneor(isinstance(password,basestring)andnotlen(password)):raiseValueError('could not accept password: %r'%password)acc=self.__getAccount(emailaddress)ifacc.getUID()==0:raiseVMMError(_(u"Account doesn't exist"),ERR.NO_SUCH_ACCOUNT)acc.modify('password',self.__pwhash(password,user=emailaddress))defuserName(self,emailaddress,name):acc=self.__getAccount(emailaddress)acc.modify('name',name)defuserTransport(self,emailaddress,transport):acc=self.__getAccount(emailaddress)acc.modify('transport',transport)defuserDisable(self,emailaddress,service=None):ifservice=='managesieve':service='sieve'self.__warnings.append(_(u'\The service name “managesieve” is deprecated and will be removed\n\ in a future release.\n\ Please use the service name “sieve” instead.'))acc=self.__getAccount(emailaddress)acc.disable(self._Cfg.dget('misc.dovecot_version'),service)defuserEnable(self,emailaddress,service=None):ifservice=='managesieve':service='sieve'self.__warnings.append(_(u'\The service name “managesieve” is deprecated and will be removed\n\ in a future release.\n\ Please use the service name “sieve” instead.'))acc=self.__getAccount(emailaddress)acc.enable(self._Cfg.dget('misc.dovecot_version'),service)defrelocatedAdd(self,emailaddress,targetaddress):"""Creates a new `Relocated` entry in the database. If there is already a relocated user with the given *emailaddress*, only the *targetaddress* for the relocated user will be updated."""relocated=self.__getRelocated(emailaddress)relocated.set_destination(EmailAddress(targetaddress))defrelocatedInfo(self,emailaddress):"""Returns the target address of the relocated user with the given *emailaddress*."""relocated=self.__getRelocated(emailaddress)try:returnrelocated.get_info()exceptRelocatedError,err:iferr.code==ERR.NO_SUCH_RELOCATED:other=self._chk_other_address_types(relocated.address,TYPE_RELOCATED)ifotherisTYPE_ACCOUNT:raiseVMMError(_(u"There is already an account with the \address '%s'.")%relocated.address,ERR.ACCOUNT_EXISTS)elifotherisTYPE_ALIAS:raiseVMMError(_(u"There is already an alias with the \address '%s'.")%relocated.address,ERR.ALIAS_EXISTS)else:# unknown addressraiseelse:# something other went wrongraisedefrelocatedDelete(self,emailaddress):"""Deletes the relocated user with the given *emailaddress* from the database."""relocated=self.__getRelocated(emailaddress)relocated.delete()def__del__(self):ifisinstance(self._dbh,PgSQL.Connection)andself._dbh._isOpen:self._dbh.close()