# ZPsycopgDA/DA.py - ZPsycopgDA Zope product: Database Connection
#
# Copyright (C) 2004-2010 Federico Di Gregorio
#
# psycopg2 is free software: you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# psycopg2 is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
# License for more details.

# Import modules needed by _psycopg to allow tools like py2exe to do
# their work without bothering about the module dependencies.

import sys
import time
import db
import re

import Acquisition
import Shared.DC.ZRDB.Connection

from db import DB
from Globals import HTMLFile
from ExtensionClass import Base
from App.Dialogs import MessageDialog
from DateTime import DateTime

# ImageFile is deprecated in Zope >= 2.9
try:
    from App.ImageFile import ImageFile
except ImportError:
    # Zope < 2.9. If PIL's installed with a .pth file, we're probably
    # hosed.
    from ImageFile import ImageFile

# import psycopg and functions/singletons needed for date/time conversions

import psycopg2
from psycopg2 import NUMBER, STRING, ROWID, DATETIME
from psycopg2.extensions import INTEGER, LONGINTEGER, FLOAT, BOOLEAN, DATE
from psycopg2.extensions import TIME, INTERVAL
from psycopg2.extensions import new_type, register_type

# NOTE: methods and classes that had no docstring in this module are
# documented with comments on purpose: Zope 2 treats a docstring as
# permission to publish an object through the web.


# add a new connection to a folder

manage_addZPsycopgConnectionForm = HTMLFile('dtml/add', globals())


def manage_addZPsycopgConnection(self, id, title, connection_string,
                                 zdatetime=None, tilevel=2,
                                 encoding='', check=None, REQUEST=None):
    """Add a DB connection to a folder."""
    # Connection.__init__ takes (id, title, connection_string, zdatetime,
    # check, tilevel, encoding) in that positional order.
    self._setObject(id, Connection(id, title, connection_string,
                                   zdatetime, check, tilevel, encoding))
    if REQUEST is not None:
        return self.manage_main(self, REQUEST)


# the connection object

class Connection(Shared.DC.ZRDB.Connection.Connection):
    """ZPsycopg Connection."""
    _isAnSQLConnection = 1

    id = 'Psycopg2_database_connection'
    database_type = 'Psycopg2'
    meta_type = title = 'Z Psycopg 2 Database Connection'
    icon = 'misc_/conn'

    def __init__(self, id, title, connection_string,
                 zdatetime, check=None, tilevel=2, encoding='UTF-8'):
        # Store the settings through edit(); when `check` is true, edit()
        # also opens the connection immediately.
        self.zdatetime = zdatetime
        self.id = str(id)
        self.edit(title, connection_string, zdatetime,
                  check=check, tilevel=tilevel, encoding=encoding)

    def factory(self):
        # Class used by connect() to build the low-level database object.
        return DB

    ## connection parameters editing ##

    def edit(self, title, connection_string,
             zdatetime, check=None, tilevel=2, encoding='UTF-8'):
        # Record the connection parameters on the instance and, when
        # `check` is true, connect right away using the new settings.
        self.title = title
        self.connection_string = connection_string
        self.zdatetime = zdatetime
        self.tilevel = tilevel
        self.encoding = encoding

        if check:
            self.connect(self.connection_string)

    manage_properties = HTMLFile('dtml/edit', globals())

    def manage_edit(self, title, connection_string,
                    zdatetime=None, check=None, tilevel=2, encoding='UTF-8',
                    REQUEST=None):
        """Edit the DB connection."""
        self.edit(title, connection_string, zdatetime,
                  check=check, tilevel=tilevel, encoding=encoding)
        if REQUEST is not None:
            msg = "Connection edited."
            return self.manage_main(self, REQUEST, manage_tabs_message=msg)

    def connect(self, s):
        # (Re)open the database connection and return self.
        # `s` is accepted but not used: the stored self.connection_string
        # is what gets passed to the factory.
        try:
            self._v_database_connection.close()
        except Exception:
            # Best effort: there may be no previous connection to close.
            pass

        # check psycopg version and raise exception if does not match
        check_psycopg_version(psycopg2.__version__)

        self._v_connected = ''
        dbf = self.factory()

        # TODO: let the psycopg exception propagate, or not?
        self._v_database_connection = dbf(
            self.connection_string, self.tilevel,
            self.get_type_casts(), self.encoding)
        self._v_database_connection.open()
        self._v_connected = DateTime()

        return self

    def get_type_casts(self):
        # Return the (datetime, date, time) typecasters handed to the DB
        # factory: the Zope DateTime ones when self.zdatetime is set, the
        # stock psycopg2 ones otherwise.
        # note that in both cases order *is* important
        if self.zdatetime:
            return ZDATETIME, ZDATE, ZTIME
        else:
            return DATETIME, DATE, TIME

    ## browsing and table/column management ##

    manage_options = Shared.DC.ZRDB.Connection.Connection.manage_options
    # + (
    #    {'label': 'Browse', 'action':'manage_browse'},)

    #manage_tables = HTMLFile('dtml/tables', globals())
    #manage_browse = HTMLFile('dtml/browse', globals())

    info = None

    def table_info(self):
        # Delegate to the open database connection.
        return self._v_database_connection.table_info()

    def __getitem__(self, name):
        # Only the 'tableNamed' key is supported: it returns the collection
        # of table browsers (built on first use by tpValues()) wrapped in
        # the acquisition context of this connection.
        if name == 'tableNamed':
            if not hasattr(self, '_v_tables'):
                self.tpValues()
            return self._v_tables.__of__(self)
        raise KeyError(name)

    def tpValues(self):
        # Build one TableBrowser per table reported by the connection and
        # return them as a list.  Tables whose description cannot be
        # processed are skipped.
        res = []
        tables = TableBrowserCollection()
        conn = self._v_database_connection
        for d in conn.tables(rdb=0):
            try:
                name = d['TABLE_NAME']
                b = TableBrowser()
                b.__name__ = name
                b._d = d
                # TableBrowser.tpValues_ calls self._c.columns(...), so _c
                # must be the database connection.  (This used to reference
                # an undefined name, which silently emptied the result.)
                b._c = conn
                try:
                    b.icon = table_icons[d['TABLE_TYPE']]
                except Exception:
                    # Unknown or missing table type: keep the default icon.
                    pass
                res.append(b)
                setattr(tables, name, b)
            except Exception:
                # Best effort: ignore entries that cannot be described.
                pass
        # __getitem__('tableNamed') reads self._v_tables right after calling
        # this method, so the collection has to be stored here.
        self._v_tables = tables
        return res


def check_psycopg_version(version):
    """
    Check that the psycopg version used is compatible with the zope adapter.

    Only the first space-separated token of `version` is parsed, as
    MAJOR.MINOR[.PATCH].  ImportError is raised when the version cannot be
    parsed, is older than 2.4, or is one of the releases known to be buggy
    (2.4.2 and 2.4.3).
    """
    try:
        m = re.match(r'\d+\.\d+(\.\d+)?', version.split(' ')[0])
        tver = tuple(map(int, m.group().split('.')))
    except Exception:
        # No match (m is None) or a non-string version end up here.
        raise ImportError("failed to parse psycopg version %s" % version)

    if tver < (2, 4):
        raise ImportError("psycopg version %s is too old" % version)

    if tver in ((2, 4, 2), (2, 4, 3)):
        raise ImportError("psycopg version %s is known to be buggy" % version)


## database connection registration data ##

classes = (Connection,)

meta_types = ({'name': 'Z Psycopg 2 Database Connection',
               'action': 'manage_addZPsycopgConnectionForm'},)

folder_methods = {
    'manage_addZPsycopgConnection': manage_addZPsycopgConnection,
    'manage_addZPsycopgConnectionForm': manage_addZPsycopgConnectionForm}

__ac_permissions__ = (
    ('Add Z Psycopg Database Connections',
     ('manage_addZPsycopgConnectionForm', 'manage_addZPsycopgConnection')),)

# add icons

misc_ = {'conn': ImageFile('icons/DBAdapterFolder_icon.gif', globals())}

for icon in ('table', 'view', 'stable', 'what', 'field', 'text', 'bin',
             'int', 'float', 'date', 'time', 'datetime'):
    misc_[icon] = ImageFile('icons/%s.gif' % icon, globals())


## zope-specific psycopg typecasters ##

# convert an ISO timestamp string from postgres to a Zope DateTime object
# ('-infinity' and 'infinity' are passed through as strings; an empty
# value yields None)
def _cast_DateTime(iso, curs):
    if iso:
        if iso in ['-infinity', 'infinity']:
            return iso
        else:
            return DateTime(iso)

# convert an ISO date string from postgres to a Zope DateTime object
# (same special cases as _cast_DateTime)
def _cast_Date(iso, curs):
    if iso:
        if iso in ['-infinity', 'infinity']:
            return iso
        else:
            return DateTime(iso)

# Convert a time string from postgres to a Zope DateTime object.
# NOTE: we set the day as today before feeding to DateTime so
# that it has the same DST settings.
def _cast_Time(iso, curs):
    if iso:
        if iso in ['-infinity', 'infinity']:
            return iso
        else:
            # Today's (year, month, day) joined with the remaining fields
            # parsed from the first eight characters (HH:MM:SS) of `iso`.
            return DateTime(time.strftime('%Y-%m-%d %H:%M:%S',
                time.localtime(time.time())[:3] +
                time.strptime(iso[:8], "%H:%M:%S")[3:]))

# NOTE: we don't cast intervals anymore because they are passed
# untouched to Zope.
def _cast_Interval(iso, curs):
    # Intervals are handed to Zope exactly as received.
    return iso

# Typecasters built with psycopg2's new_type(oids, name, cast_function).
ZDATETIME = new_type((1184, 1114), "ZDATETIME", _cast_DateTime)
ZINTERVAL = new_type((1186,), "ZINTERVAL", _cast_Interval)
ZDATE = new_type((1082,), "ZDATE", _cast_Date)
ZTIME = new_type((1083,), "ZTIME", _cast_Time)


## table browsing helpers ##

# NOTE: the helpers below are documented with comments rather than
# docstrings on purpose: Zope 2 treats a docstring as permission to
# publish an object through the web.

class TableBrowserCollection(Acquisition.Implicit):
    pass


class Browser(Base):
    # Common base of the browser objects: attributes that are not found
    # the normal way are looked up as keys of the description stored in
    # self._d.
    def __getattr__(self, name):
        try:
            return self._d[name]
        except KeyError:
            raise AttributeError(name)


class values:
    # Lazily evaluated sequence: the items are produced by calling
    # self._f() the first time one of them is requested, then cached in
    # self._d.

    def len(self):
        return 1

    def __getitem__(self, i):
        try:
            return self._d[i]
        except AttributeError:
            # self._d has not been computed yet
            pass
        self._d = self._f()
        return self._d[i]


class TableBrowser(Browser, Acquisition.Implicit):
    icon = 'what'
    Description = check = ''
    info = HTMLFile('table_info', globals())
    menu = HTMLFile('table_menu', globals())

    def tpValues(self):
        # Return a lazy sequence; the columns are only fetched (through
        # tpValues_) when the sequence is first indexed.
        v = values()
        v._f = self.tpValues_
        return v

    def tpValues_(self):
        # Build one ColumnBrowser per column reported by the connection
        # (self._c) for this table.
        r = []
        tname = self.__name__
        for d in self._c.columns(tname):
            b = ColumnBrowser()
            b._d = d
            try:
                b.icon = field_icons[d['Type']]
            except Exception:
                # Unknown or missing column type: keep the default icon.
                pass
            b.TABLE_NAME = tname
            r.append(b)
        return r

    def tpId(self):
        return self._d['TABLE_NAME']

    def tpURL(self):
        return "Table/%s" % self._d['TABLE_NAME']

    def Name(self):
        return self._d['TABLE_NAME']

    def Type(self):
        return self._d['TABLE_TYPE']

    manage_designInput = HTMLFile('designInput', globals())

    def manage_buildInput(self, id, source, default, REQUEST=None):
        "Create a database method for an input form"
        # NOTE(review): vartype, boboType, isStringType and self._columns
        # are not defined anywhere in the visible part of this file, and
        # the collected args/values/names are never used or returned --
        # this method looks unfinished; confirm before relying on it.
        args = []
        values = []
        names = []
        columns = self._columns
        for i, s in enumerate(source):
            if s == 'Null':
                continue
            c = columns[i]
            d = default[i]
            t = c['Type']
            n = c['Name']
            names.append(n)
            if s == 'Argument':
                # NOTE(review): the format string had no conversion
                # specifiers for its two arguments (TypeError at runtime);
                # the DTML tag was restored to what is believed to be the
                # upstream text -- confirm against the original product.
                values.append("<dtml-sqlvar %s type=%s>'" %
                              (n, vartype(t)))
                a = '%s%s' % (n, boboType(t))
                if d:
                    a = "%s=%s" % (a, d)
                args.append(a)
            elif s == 'Property':
                # NOTE(review): same restoration as in the 'Argument' case.
                values.append("<dtml-sqlvar %s type=%s>'" %
                              (n, vartype(t)))
            else:
                if isStringType(t):
                    # Double any single quote so the default can be
                    # embedded in a quoted SQL literal.  (str.replace is
                    # used because find/join/split are never imported.)
                    d = d.replace("'", "''")
                    values.append("'%s'" % d)
                elif d:
                    values.append(str(d))
                else:
                    raise ValueError(
                        'no default was given for %s' % n)


class ColumnBrowser(Browser):
    icon = 'field'

    def check(self):
        # NOTE(review): the format string had no conversion specifiers for
        # its two arguments (TypeError at runtime); the checkbox markup was
        # restored to what is believed to be the upstream text -- confirm
        # against the original product.
        return ('\t<input type=checkbox name="%s.%s">' %
                (self.TABLE_NAME, self._d['Name']))

    def tpId(self):
        return self._d['Name']

    def tpURL(self):
        return "Column/%s" % self._d['Name']

    def Description(self):
        # Human-readable type summary; the scale is shown only when set.
        d = self._d
        if d['Scale']:
            return " %(Type)s(%(Precision)s,%(Scale)s) %(Nullable)s" % d
        else:
            return " %(Type)s(%(Precision)s) %(Nullable)s" % d


# Icon names used by the table browsers, keyed by table type.
table_icons = {
    'TABLE': 'table',
    'VIEW': 'view',
    'SYSTEM_TABLE': 'stable',
    }

# Icon names used by the column browsers, keyed by psycopg2 type name.
# NOTE(review): 'i' does not match any icon name registered in misc_
# elsewhere in this file ('int', 'float', ...) -- confirm the intended icon.
field_icons = {
    NUMBER.name: 'i',
    STRING.name: 'text',
    DATETIME.name: 'date',
    INTEGER.name: 'int',
    FLOAT.name: 'float',
    BOOLEAN.name: 'bin',
    ROWID.name: 'int'
    }