changeset 0 b97547f5f1fa
child 167 b726c12af78f
equal deleted inserted replaced
-1:000000000000 0:b97547f5f1fa
     1 """DB-API 2.0 compliant module
     3 Take a look at http://www.python.org/peps/pep-0249.html
     5 (most parts of this document are reported here in docstrings)
     7 :organization: Logilab
     8 :copyright: 2001-2008 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
     9 :contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr
    10 """
# markup/language declaration for this module's docstrings
# (reStructuredText, English)
__docformat__ = "restructuredtext en"
    13 from logging import getLogger, StreamHandler
    14 from time import time, clock
    16 from cubicweb import ConnectionError, RequestSessionMixIn, set_log_methods
    17 from cubicweb.cwvreg import CubicWebRegistry, MulCnxCubicWebRegistry
    18 from cubicweb.cwconfig import CubicWebNoAppConfiguration
# unique sentinel used as a default argument value, so that an explicit `None`
# can be told apart from "argument not given" (see Connection.load_vobjects)
_MARKER = object()
    22 class ConnectionProperties(object):
    23     def __init__(self, cnxtype=None, lang=None, close=True, log=False):
    24         self.cnxtype = cnxtype or 'pyro'
    25         self.lang = lang
    26         self.log_queries = log
    27         self.close_on_del = close
    30 def get_repository(method, database=None, config=None, vreg=None):
    31     """get a proxy object to the CubicWeb repository, using a specific RPC method.
    33     Only 'in-memory' and 'pyro' are supported for now. Either vreg or config
    34     argument should be given
    35     """
    36     assert method in ('pyro', 'inmemory')
    37     assert vreg or config
    38     if vreg and not config:
    39         config = vreg.config
    40     if method == 'inmemory':
    41         # get local access to the repository
    42         from cubicweb.server.repository import Repository
    43         return Repository(config, vreg=vreg)
    44     else: # method == 'pyro'
    45         from Pyro import core, naming, config as pyroconfig
    46         from Pyro.errors import NamingError, ProtocolError
    47         core.initClient(banner=0)
    48         pyroconfig.PYRO_NS_DEFAULTGROUP = ':' + config['pyro-ns-group']
    49         locator = naming.NameServerLocator()
    50         # resolve the Pyro object
    51         try:
    52             nshost, nsport = config['pyro-ns-host'], config['pyro-ns-port']
    53             uri = locator.getNS(nshost, nsport).resolve(database)
    54         except ProtocolError:
    55             raise ConnectionError('Could not connect to the Pyro name server '
    56                                   '(host: %s:%i)' % (nshost, nsport))
    57         except NamingError:
    58             raise ConnectionError('Could not get repository for %s '
    59                                   '(not registered in Pyro),'
    60                                   'you may have to restart your server-side '
    61                                   'application' % database)
    62         return core.getProxyForURI(uri)
    64 def repo_connect(repo, user, password, cnxprops=None):
    65     """Constructor to create a new connection to the CubicWeb repository.
    67     Returns a Connection instance.
    68     """
    69     cnxprops = cnxprops or ConnectionProperties('inmemory')
    70     cnxid = repo.connect(unicode(user), password, cnxprops=cnxprops)
    71     cnx = Connection(repo, cnxid, cnxprops)
    72     if cnxprops.cnxtype == 'inmemory':
    73         cnx.vreg = repo.vreg
    74     return cnx
    76 def connect(database=None, user=None, password=None, host=None,
    77             group=None, cnxprops=None, port=None, setvreg=True, mulcnx=True):
    78     """Constructor for creating a connection to the CubicWeb repository.
    79     Returns a Connection object.
    81     When method is 'pyro' and setvreg is True, use a special registry class
    82     (MulCnxCubicWebRegistry) made to deal with connections to differents instances
    83     in the same process unless specified otherwise by setting the mulcnx to
    84     False.
    85     """
    86     config = CubicWebNoAppConfiguration()
    87     if host:
    88         config.global_set_option('pyro-ns-host', host)
    89     if port:
    90         config.global_set_option('pyro-ns-port', port)
    91     if group:
    92         config.global_set_option('pyro-ns-group', group)
    93     cnxprops = cnxprops or ConnectionProperties()
    94     method = cnxprops.cnxtype
    95     repo = get_repository(method, database, config=config)
    96     if method == 'inmemory':
    97         vreg = repo.vreg
    98     elif setvreg:
    99         if mulcnx:
   100             vreg = MulCnxCubicWebRegistry(config)
   101         else:
   102             vreg = CubicWebRegistry(config)
   103         vreg.set_schema(repo.get_schema())
   104     else:
   105         vreg = None
   106     cnx = repo_connect(repo, user, password, cnxprops)
   107     cnx.vreg = vreg
   108     return cnx
   110 def in_memory_cnx(config, user, password):
   111     """usefull method for testing and scripting to get a dbapi.Connection
   112     object connected to an in-memory repository instance 
   113     """
   114     if isinstance(config, CubicWebRegistry):
   115         vreg = config
   116         config = None
   117     else:
   118         vreg = None
   119     # get local access to the repository
   120     repo = get_repository('inmemory', config=config, vreg=vreg)
   121     # connection to the CubicWeb repository
   122     cnxprops = ConnectionProperties('inmemory')
   123     cnx = repo_connect(repo, user, password, cnxprops=cnxprops)
   124     return repo, cnx
   127 class DBAPIRequest(RequestSessionMixIn):
   129     def __init__(self, vreg, cnx=None):
   130         super(DBAPIRequest, self).__init__(vreg)
   131         try:
   132             # no vreg or config which doesn't handle translations
   133             self.translations = vreg.config.translations
   134         except AttributeError:
   135             self.translations = {}
   136         self.set_default_language(vreg)
   137         # cache entities built during the request
   138         self._eid_cache = {}
   139         # these args are initialized after a connection is
   140         # established
   141         self.cnx = None   # connection associated to the request
   142         self._user = None # request's user, set at authentication
   143         if cnx is not None:
   144             self.set_connection(cnx)
   146     def base_url(self):
   147         return self.vreg.config['base-url']
   149     def from_controller(self):
   150         return 'view'
   152     def set_connection(self, cnx, user=None):
   153         """method called by the session handler when the user is authenticated
   154         or an anonymous connection is open
   155         """
   156         self.cnx = cnx
   157         self.cursor = cnx.cursor(self)
   158         self.set_user(user)
   160     def set_default_language(self, vreg):
   161         try:
   162             self.lang = vreg.property_value('ui.language')
   163         except: # property may not be registered
   164             self.lang = 'en'
   165         # use req.__ to translate a message without registering it to the catalog
   166         try:
   167             self._ = self.__ = self.translations[self.lang]
   168         except KeyError:
   169             # this occurs usually during test execution
   170             self._ = self.__ = unicode
   171         self.debug('request language: %s', self.lang)
   173     def decorate_rset(self, rset):
   174         rset.vreg = self.vreg
   175         rset.req = self
   176         return rset
   178     def describe(self, eid):
   179         """return a tuple (type, sourceuri, extid) for the entity with id <eid>"""
   180         return self.cnx.describe(eid)
   182     def source_defs(self):
   183         """return the definition of sources used by the repository."""
   184         return self.cnx.source_defs()
   186     # entities cache management ###############################################
   188     def entity_cache(self, eid):
   189         return self._eid_cache[eid]
   191     def set_entity_cache(self, entity):
   192         self._eid_cache[entity.eid] = entity
   194     def cached_entities(self):
   195         return self._eid_cache.values()
   197     def drop_entity_cache(self, eid=None):
   198         if eid is None:
   199             self._eid_cache = {}
   200         else:
   201             del self._eid_cache[eid]
   203     # low level session data management #######################################
   205     def session_data(self):
   206         """return a dictionnary containing session data"""
   207         return self.cnx.session_data()
   209     def get_session_data(self, key, default=None, pop=False):
   210         """return value associated to `key` in session data"""
   211         return self.cnx.get_session_data(key, default, pop)
   213     def set_session_data(self, key, value):
   214         """set value associated to `key` in session data"""
   215         return self.cnx.set_session_data(key, value)
   217     def del_session_data(self, key):
   218         """remove value associated to `key` in session data"""
   219         return self.cnx.del_session_data(key)
   221     def get_shared_data(self, key, default=None, pop=False):
   222         """return value associated to `key` in shared data"""
   223         return self.cnx.get_shared_data(key, default, pop)
   225     def set_shared_data(self, key, value, querydata=False):
   226         """set value associated to `key` in shared data
   228         if `querydata` is true, the value will be added to the repository
   229         session's query data which are cleared on commit/rollback of the current
   230         transaction, and won't be available through the connexion, only on the
   231         repository side.
   232         """
   233         return self.cnx.set_shared_data(key, value, querydata)
   235     # server session compat layer #############################################
   237     @property
   238     def user(self):
   239         if self._user is None and self.cnx:
   240             self.set_user(self.cnx.user(self))
   241         return self._user
   243     def set_user(self, user):
   244         self._user = user
   245         if user:
   246             self.set_entity_cache(user)
   248     def execute(self, *args, **kwargs):
   249         """Session interface compatibility"""
   250         return self.cursor.execute(*args, **kwargs)
# bind DBAPIRequest to the 'cubicweb.dbapi' logger
# NOTE(review): presumably this is what provides the `debug` method called
# by DBAPIRequest.set_default_language -- confirm in cubicweb.set_log_methods
set_log_methods(DBAPIRequest, getLogger('cubicweb.dbapi'))
   255 # exceptions ##################################################################
   257 class ProgrammingError(Exception): #DatabaseError):
   258     """Exception raised for errors that are related to the database's operation
   259     and not necessarily under the control of the programmer, e.g. an unexpected
   260     disconnect occurs, the data source name is not found, a transaction could
   261     not be processed, a memory allocation error occurred during processing,
   262     etc.
   263     """
   265 # module level objects ########################################################
# DB-API level declared by this module
apilevel = '2.0'

"""Integer constant stating the level of thread safety the interface supports.
Possible values are:
                0     Threads may not share the module.
                1     Threads may share the module, but not connections.
                2     Threads may share the module and connections.
                3     Threads may share the module, connections and
                      cursors.
Sharing in the above context means that two threads may use a resource without
wrapping it using a mutex semaphore to implement resource locking. Note that
you cannot always make external resources thread safe by managing access using
a mutex: the resource may rely on global variables or other external sources
that are beyond your control.
"""
# level 1 of the table above: the module may be shared between threads,
# connections may not
threadsafety = 1

"""String constant stating the type of parameter marker formatting expected by
the interface. Possible values are :
                'qmark'         Question mark style, 
                                e.g. '...WHERE name=?'
                'numeric'       Numeric, positional style, 
                                e.g. '...WHERE name=:1'
                'named'         Named style, 
                                e.g. '...WHERE name=:name'
                'format'        ANSI C printf format codes, 
                                e.g. '...WHERE name=%s'
                'pyformat'      Python extended format codes, 
                                e.g. '...WHERE name=%(name)s'
"""
# query parameters are written using the %(name)s notation
paramstyle = 'pyformat'
   304 # connection object ###########################################################
   306 class Connection(object):
   307     """DB-API 2.0 compatible Connection object for CubicWebt
   308     """
   309     # make exceptions available through the connection object
   310     ProgrammingError = ProgrammingError
   312     def __init__(self, repo, cnxid, cnxprops=None):
   313         self._repo = repo
   314         self.sessionid = cnxid
   315         self._close_on_del = getattr(cnxprops, 'close_on_del', True)
   316         self._cnxtype = getattr(cnxprops, 'cnxtype', 'pyro')
   317         self._closed = None
   318         if cnxprops and cnxprops.log_queries:
   319             self.executed_queries = []
   320             self.cursor_class = LogCursor
   321         else:
   322             self.cursor_class = Cursor
   323         self.anonymous_connection = False
   324         self.vreg = None
   325         # session's data
   326         self.data = {}
   328     def __repr__(self):
   329         if self.anonymous_connection:
   330             return '<Connection %s (anonymous)>' % self.sessionid
   331         return '<Connection %s>' % self.sessionid
   333     def request(self):
   334         return DBAPIRequest(self.vreg, self)
   336     def session_data(self):
   337         """return a dictionnary containing session data"""
   338         return self.data
   340     def get_session_data(self, key, default=None, pop=False):
   341         """return value associated to `key` in session data"""
   342         if pop:
   343             return self.data.pop(key, default)
   344         else:
   345             return self.data.get(key, default)
   347     def set_session_data(self, key, value):
   348         """set value associated to `key` in session data"""
   349         self.data[key] = value
   351     def del_session_data(self, key):
   352         """remove value associated to `key` in session data"""
   353         try:
   354             del self.data[key]
   355         except KeyError:
   356             pass    
   358     def check(self):
   359         """raise `BadSessionId` if the connection is no more valid"""
   360         try:
   361             self._repo.check_session(self.sessionid)
   362         except AttributeError:
   363             # XXX backward compat for repository running cubicweb < 2.48.3
   364             self._repo.session_data(self.sessionid)
   366     def get_shared_data(self, key, default=None, pop=False):
   367         """return value associated to `key` in shared data"""
   368         return self._repo.get_shared_data(self.sessionid, key, default, pop)
   370     def set_shared_data(self, key, value, querydata=False):
   371         """set value associated to `key` in shared data
   373         if `querydata` is true, the value will be added to the repository
   374         session's query data which are cleared on commit/rollback of the current
   375         transaction, and won't be available through the connexion, only on the
   376         repository side.
   377         """
   378         return self._repo.set_shared_data(self.sessionid, key, value, querydata)
   380     def get_schema(self):
   381         """Return the schema currently used by the repository.
   383         This is NOT part of the DB-API.
   384         """
   385         if self._closed is not None:
   386             raise ProgrammingError('Closed connection')
   387         return self._repo.get_schema()
   389     def load_vobjects(self, cubes=_MARKER, subpath=None, expand=True, force_reload=None):
   390         config = self.vreg.config
   391         if cubes is _MARKER:
   392             cubes = self._repo.get_cubes()
   393         elif cubes is None:
   394             cubes = ()
   395         else:
   396             if not isinstance(cubes, (list, tuple)):
   397                 cubes = (cubes,)
   398             if expand:
   399                 cubes = config.expand_cubes(cubes)
   400         if subpath is None:
   401             subpath = esubpath = ('entities', 'views')
   402         else:
   403             esubpath = subpath
   404         if 'views' in subpath:
   405             esubpath = list(subpath)
   406             esubpath.remove('views')
   407             esubpath.append('web/views')
   408         cubes = reversed([config.cube_dir(p) for p in cubes])
   409         vpath = config.build_vregistry_path(cubes, evobjpath=esubpath,
   410                                             tvobjpath=subpath)
   411         self.vreg.register_objects(vpath, force_reload)
   412         if self._cnxtype == 'inmemory':
   413             # should reinit hooks manager as well
   414             hm, config = self._repo.hm, self._repo.config
   415             hm.set_schema(hm.schema) # reset structure
   416             hm.register_system_hooks(config)
   417             # application specific hooks
   418             if self._repo.config.application_hooks:
   419                 hm.register_hooks(config.load_hooks(self.vreg))
   421     def source_defs(self):
   422         """Return the definition of sources used by the repository.
   424         This is NOT part of the DB-API.
   425         """
   426         if self._closed is not None:
   427             raise ProgrammingError('Closed connection')
   428         return self._repo.source_defs()
   430     def user(self, req, props=None):
   431         """return the User object associated to this connection"""
   432         # cnx validity is checked by the call to .user_info
   433         eid, login, groups, properties = self._repo.user_info(self.sessionid, props)
   434         if req is None:
   435             req = self.request()
   436         rset = req.eid_rset(eid, 'EUser')
   437         user = self.vreg.etype_class('EUser')(req, rset, row=0, groups=groups,
   438                                               properties=properties)
   439         user['login'] = login # cache login
   440         return user
   442     def __del__(self):
   443         """close the remote connection if necessary"""
   444         if self._closed is None and self._close_on_del:
   445             try:
   446                 self.close()
   447             except:
   448                 pass
   450     def describe(self, eid):
   451         return self._repo.describe(self.sessionid, eid)
   453     def close(self):
   454         """Close the connection now (rather than whenever __del__ is called).
   456         The connection will be unusable from this point forward; an Error (or
   457         subclass) exception will be raised if any operation is attempted with
   458         the connection. The same applies to all cursor objects trying to use the
   459         connection.  Note that closing a connection without committing the
   460         changes first will cause an implicit rollback to be performed.
   461         """
   462         if self._closed:
   463             raise ProgrammingError('Connection is already closed')
   464         self._repo.close(self.sessionid)
   465         self._closed = 1
   467     def commit(self):
   468         """Commit any pending transaction to the database. Note that if the
   469         database supports an auto-commit feature, this must be initially off. An
   470         interface method may be provided to turn it back on.
   472         Database modules that do not support transactions should implement this
   473         method with void functionality.
   474         """
   475         if not self._closed is None:
   476             raise ProgrammingError('Connection is already closed')
   477         self._repo.commit(self.sessionid)
   479     def rollback(self):
   480         """This method is optional since not all databases provide transaction
   481         support.
   483         In case a database does provide transactions this method causes the the
   484         database to roll back to the start of any pending transaction.  Closing
   485         a connection without committing the changes first will cause an implicit
   486         rollback to be performed.
   487         """
   488         if not self._closed is None:
   489             raise ProgrammingError('Connection is already closed')
   490         self._repo.rollback(self.sessionid)
   492     def cursor(self, req=None):
   493         """Return a new Cursor Object using the connection.  If the database
   494         does not provide a direct cursor concept, the module will have to
   495         emulate cursors using other means to the extent needed by this
   496         specification.
   497         """
   498         if self._closed is not None:
   499             raise ProgrammingError('Can\'t get cursor on closed connection')
   500         if req is None:
   501             req = self.request()
   502         return self.cursor_class(self, self._repo, req=req)
   505 # cursor object ###############################################################
   507 class Cursor(object):
   508     """These objects represent a database cursor, which is used to manage the
   509     context of a fetch operation. Cursors created from the same connection are
   510     not isolated, i.e., any changes done to the database by a cursor are
   511     immediately visible by the other cursors. Cursors created from different
   512     connections can or can not be isolated, depending on how the transaction
   513     support is implemented (see also the connection's rollback() and commit()
   514     methods.)
   515     """
   517     def __init__(self, connection, repo, req=None):
   518         """This read-only attribute return a reference to the Connection
   519         object on which the cursor was created.
   520         """
   521         self.connection = connection
   522         """optionnal issuing request instance"""
   523         self.req = req
   525         """This read/write attribute specifies the number of rows to fetch at a
   526         time with fetchmany(). It defaults to 1 meaning to fetch a single row
   527         at a time.
   529         Implementations must observe this value with respect to the fetchmany()
   530         method, but are free to interact with the database a single row at a
   531         time. It may also be used in the implementation of executemany().
   532         """
   533         self.arraysize = 1
   535         self._repo = repo
   536         self._sessid = connection.sessionid
   537         self._res = None
   538         self._closed = None
   539         self._index = 0
   542     def close(self):
   543         """Close the cursor now (rather than whenever __del__ is called).  The
   544         cursor will be unusable from this point forward; an Error (or subclass)
   545         exception will be raised if any operation is attempted with the cursor.
   546         """
   547         self._closed = True
   550     def execute(self, operation, parameters=None, eid_key=None, build_descr=True):
   551         """Prepare and execute a database operation (query or command).
   552         Parameters may be provided as sequence or mapping and will be bound to
   553         variables in the operation.  Variables are specified in a
   554         database-specific notation (see the module's paramstyle attribute for
   555         details).
   557         A reference to the operation will be retained by the cursor.  If the
   558         same operation object is passed in again, then the cursor can optimize
   559         its behavior.  This is most effective for algorithms where the same
   560         operation is used, but different parameters are bound to it (many
   561         times).
   563         For maximum efficiency when reusing an operation, it is best to use the
   564         setinputsizes() method to specify the parameter types and sizes ahead
   565         of time.  It is legal for a parameter to not match the predefined
   566         information; the implementation should compensate, possibly with a loss
   567         of efficiency.
   569         The parameters may also be specified as list of tuples to e.g. insert
   570         multiple rows in a single operation, but this kind of usage is
   571         depreciated: executemany() should be used instead.
   573         Return values are not defined by the DB-API, but this here it returns a
   574         ResultSet object.
   575         """
   576         self._res = res = self._repo.execute(self._sessid, operation,
   577                                              parameters, eid_key, build_descr)
   578         self.req.decorate_rset(res)
   579         self._index = 0
   580         return res
   583     def executemany(self, operation, seq_of_parameters):
   584         """Prepare a database operation (query or command) and then execute it
   585         against all parameter sequences or mappings found in the sequence
   586         seq_of_parameters.
   588         Modules are free to implement this method using multiple calls to the
   589         execute() method or by using array operations to have the database
   590         process the sequence as a whole in one call.
   592         Use of this method for an operation which produces one or more result
   593         sets constitutes undefined behavior, and the implementation is
   594         permitted (but not required) to raise an exception when it detects that
   595         a result set has been created by an invocation of the operation.
   597         The same comments as for execute() also apply accordingly to this
   598         method.
   600         Return values are not defined.
   601         """
   602         for parameters in seq_of_parameters:
   603             self.execute(operation, parameters)
   604             if self._res.rows is not None:
   605                 self._res = None
   606                 raise ProgrammingError('Operation returned a result set')
   609     def fetchone(self):
   610         """Fetch the next row of a query result set, returning a single
   611         sequence, or None when no more data is available.
   613         An Error (or subclass) exception is raised if the previous call to
   614         execute*() did not produce any result set or no call was issued yet.
   615         """
   616         if self._res is None:
   617             raise ProgrammingError('No result set')
   618         row = self._res.rows[self._index]
   619         self._index += 1
   620         return row
   623     def fetchmany(self, size=None):
   624         """Fetch the next set of rows of a query result, returning a sequence
   625         of sequences (e.g. a list of tuples). An empty sequence is returned
   626         when no more rows are available.
   628         The number of rows to fetch per call is specified by the parameter.  If
   629         it is not given, the cursor's arraysize determines the number of rows
   630         to be fetched. The method should try to fetch as many rows as indicated
   631         by the size parameter. If this is not possible due to the specified
   632         number of rows not being available, fewer rows may be returned.
   634         An Error (or subclass) exception is raised if the previous call to
   635         execute*() did not produce any result set or no call was issued yet.
   637         Note there are performance considerations involved with the size
   638         parameter.  For optimal performance, it is usually best to use the
   639         arraysize attribute.  If the size parameter is used, then it is best
   640         for it to retain the same value from one fetchmany() call to the next.
   641         """
   642         if self._res is None:
   643             raise ProgrammingError('No result set')
   644         if size is None:
   645             size = self.arraysize
   646         rows = self._res.rows[self._index:self._index + size]
   647         self._index += size
   648         return rows
   651     def fetchall(self):
   652         """Fetch all (remaining) rows of a query result, returning them as a
   653         sequence of sequences (e.g. a list of tuples).  Note that the cursor's
   654         arraysize attribute can affect the performance of this operation.
   656         An Error (or subclass) exception is raised if the previous call to
   657         execute*() did not produce any result set or no call was issued yet.
   658         """
   659         if self._res is None:
   660             raise ProgrammingError('No result set')
   661         if not self._res.rows:
   662             return []
   663         rows = self._res.rows[self._index:]
   664         self._index = len(self._res)
   665         return rows
   668     def setinputsizes(self, sizes):
   669         """This can be used before a call to execute*() to predefine memory
   670         areas for the operation's parameters.
   672         sizes is specified as a sequence -- one item for each input parameter.
   673         The item should be a Type Object that corresponds to the input that
   674         will be used, or it should be an integer specifying the maximum length
   675         of a string parameter.  If the item is None, then no predefined memory
   676         area will be reserved for that column (this is useful to avoid
   677         predefined areas for large inputs).
   679         This method would be used before the execute*() method is invoked.
   681         Implementations are free to have this method do nothing and users are
   682         free to not use it.
   683         """
   684         pass
   687     def setoutputsize(self, size, column=None):
   688         """Set a column buffer size for fetches of large columns (e.g. LONGs,
   689         BLOBs, etc.).  The column is specified as an index into the result
   690         sequence.  Not specifying the column will set the default size for all
   691         large columns in the cursor.
   693         This method would be used before the execute*() method is invoked.
   695         Implementations are free to have this method do nothing and users are
   696         free to not use it.
   697         """    
   698         pass
   701 class LogCursor(Cursor):
   702     """override the standard cursor to log executed queries"""
   704     def execute(self, operation, parameters=None, eid_key=None, build_descr=True):
   705         """override the standard cursor to log executed queries"""
   706         tstart, cstart = time(), clock()
   707         rset = Cursor.execute(self, operation, parameters, eid_key, build_descr)
   708         self.connection.executed_queries.append((operation, parameters,
   709                                                  time() - tstart, clock() - cstart))
   710         return rset