server/sources/native.py
branch stable
changeset 2306 95da5d9f0870
parent 2072 8008e8812d76
child 2310 80fcdfbb8eed
legend: equal | deleted | inserted | replaced
2305:8f6dbe884700 2306:95da5d9f0870
   183         self._cache.pop('%s X WHERE X eid %s' % (etype, eid), None)
   183         self._cache.pop('%s X WHERE X eid %s' % (etype, eid), None)
   184         self._cache.pop('Any X WHERE X eid %s' % eid, None)
   184         self._cache.pop('Any X WHERE X eid %s' % eid, None)
   185 
   185 
   186     def sqlexec(self, session, sql, args=None):
   186     def sqlexec(self, session, sql, args=None):
   187         """execute the query and return its result"""
   187         """execute the query and return its result"""
   188         cursor = session.pool[self.uri]
   188         return self.process_result(self.doexec(session, sql, args))
   189         self.doexec(cursor, sql, args)
       
   190         return self.process_result(cursor)
       
   191 
   189 
   192     def init_creating(self):
   190     def init_creating(self):
   193         pool = self.repo._get_pool()
   191         pool = self.repo._get_pool()
   194         pool.pool_set()
   192         pool.pool_set()
   195         # check full text index availibility
   193         # check full text index availibility
   303             except KeyError:
   301             except KeyError:
   304                 self.cache_miss += 1
   302                 self.cache_miss += 1
   305                 sql, query_args = self._rql_sqlgen.generate(union, args, varmap)
   303                 sql, query_args = self._rql_sqlgen.generate(union, args, varmap)
   306                 self._cache[cachekey] = sql, query_args
   304                 self._cache[cachekey] = sql, query_args
   307         args = self.merge_args(args, query_args)
   305         args = self.merge_args(args, query_args)
   308         cursor = session.pool[self.uri]
       
   309         assert isinstance(sql, basestring), repr(sql)
   306         assert isinstance(sql, basestring), repr(sql)
   310         try:
   307         try:
   311             self.doexec(cursor, sql, args)
   308             cursor = self.doexec(session, sql, args)
   312         except (self.dbapi_module.OperationalError,
   309         except (self.dbapi_module.OperationalError,
   313                 self.dbapi_module.InterfaceError):
   310                 self.dbapi_module.InterfaceError):
   314             # FIXME: better detection of deconnection pb
   311             # FIXME: better detection of deconnection pb
   315             self.info("request failed '%s' ... retry with a new cursor", sql)
   312             self.info("request failed '%s' ... retry with a new cursor", sql)
   316             session.pool.reconnect(self)
   313             session.pool.reconnect(self)
   317             cursor = session.pool[self.uri]
   314             cursor = self.doexec(session, sql, args)
   318             self.doexec(cursor, sql, args)
       
   319         res = self.process_result(cursor)
   315         res = self.process_result(cursor)
   320         if server.DEBUG:
   316         if server.DEBUG:
   321             print '------>', res
   317             print '------>', res
   322         return res
   318         return res
   323 
   319 
   335                 print union.as_string()
   331                 print union.as_string()
   336                 print 'SOLUTIONS', ','.join(str(s.solutions) for s in union.children)
   332                 print 'SOLUTIONS', ','.join(str(s.solutions) for s in union.children)
   337             # generate sql queries if we are able to do so
   333             # generate sql queries if we are able to do so
   338             sql, query_args = self._rql_sqlgen.generate(union, args, varmap)
   334             sql, query_args = self._rql_sqlgen.generate(union, args, varmap)
   339             query = 'INSERT INTO %s %s' % (table, sql.encode(self.encoding))
   335             query = 'INSERT INTO %s %s' % (table, sql.encode(self.encoding))
   340             self.doexec(session.pool[self.uri], query,
   336             self.doexec(session, query, self.merge_args(args, query_args))
   341                         self.merge_args(args, query_args))
       
   342         else:
   337         else:
   343             super(NativeSQLSource, self).flying_insert(table, session, union,
   338             super(NativeSQLSource, self).flying_insert(table, session, union,
   344                                                        args, varmap)
   339                                                        args, varmap)
   345 
   340 
   346     def _manual_insert(self, results, table, session):
   341     def _manual_insert(self, results, table, session):
   356             for index, cell in enumerate(row):
   351             for index, cell in enumerate(row):
   357                 if isinstance(cell, Binary):
   352                 if isinstance(cell, Binary):
   358                     cell = self.binary(cell.getvalue())
   353                     cell = self.binary(cell.getvalue())
   359                 kwargs[str(index)] = cell
   354                 kwargs[str(index)] = cell
   360             kwargs_list.append(kwargs)
   355             kwargs_list.append(kwargs)
   361         self.doexecmany(session.pool[self.uri], query, kwargs_list)
   356         self.doexecmany(session, query, kwargs_list)
   362 
   357 
   363     def clean_temp_data(self, session, temptables):
   358     def clean_temp_data(self, session, temptables):
   364         """remove temporary data, usually associated to temporary tables"""
   359         """remove temporary data, usually associated to temporary tables"""
   365         if temptables:
   360         if temptables:
   366             cursor = session.pool[self.uri]
       
   367             for table in temptables:
   361             for table in temptables:
   368                 try:
   362                 try:
   369                     self.doexec(cursor,'DROP TABLE %s' % table)
   363                     self.doexec(session,'DROP TABLE %s' % table)
   370                 except:
   364                 except:
   371                     pass
   365                     pass
   372                 try:
   366                 try:
   373                     del self._temp_table_data[table]
   367                     del self._temp_table_data[table]
   374                 except KeyError:
   368                 except KeyError:
   376 
   370 
   377     def add_entity(self, session, entity):
   371     def add_entity(self, session, entity):
   378         """add a new entity to the source"""
   372         """add a new entity to the source"""
   379         attrs = self.preprocess_entity(entity)
   373         attrs = self.preprocess_entity(entity)
   380         sql = self.sqlgen.insert(SQL_PREFIX + str(entity.e_schema), attrs)
   374         sql = self.sqlgen.insert(SQL_PREFIX + str(entity.e_schema), attrs)
   381         self.doexec(session.pool[self.uri], sql, attrs)
   375         self.doexec(session, sql, attrs)
   382 
   376 
   383     def update_entity(self, session, entity):
   377     def update_entity(self, session, entity):
   384         """replace an entity in the source"""
   378         """replace an entity in the source"""
   385         attrs = self.preprocess_entity(entity)
   379         attrs = self.preprocess_entity(entity)
   386         sql = self.sqlgen.update(SQL_PREFIX + str(entity.e_schema), attrs, [SQL_PREFIX + 'eid'])
   380         sql = self.sqlgen.update(SQL_PREFIX + str(entity.e_schema), attrs, [SQL_PREFIX + 'eid'])
   387         self.doexec(session.pool[self.uri], sql, attrs)
   381         self.doexec(session, sql, attrs)
   388 
   382 
   389     def delete_entity(self, session, etype, eid):
   383     def delete_entity(self, session, etype, eid):
   390         """delete an entity from the source"""
   384         """delete an entity from the source"""
   391         attrs = {SQL_PREFIX + 'eid': eid}
   385         attrs = {SQL_PREFIX + 'eid': eid}
   392         sql = self.sqlgen.delete(SQL_PREFIX + etype, attrs)
   386         sql = self.sqlgen.delete(SQL_PREFIX + etype, attrs)
   393         self.doexec(session.pool[self.uri], sql, attrs)
   387         self.doexec(session, sql, attrs)
   394 
   388 
   395     def add_relation(self, session, subject, rtype, object):
   389     def add_relation(self, session, subject, rtype, object):
   396         """add a relation to the source"""
   390         """add a relation to the source"""
   397         attrs = {'eid_from': subject, 'eid_to': object}
   391         attrs = {'eid_from': subject, 'eid_to': object}
   398         sql = self.sqlgen.insert('%s_relation' % rtype, attrs)
   392         sql = self.sqlgen.insert('%s_relation' % rtype, attrs)
   399         self.doexec(session.pool[self.uri], sql, attrs)
   393         self.doexec(session, sql, attrs)
   400 
   394 
   401     def delete_relation(self, session, subject, rtype, object):
   395     def delete_relation(self, session, subject, rtype, object):
   402         """delete a relation from the source"""
   396         """delete a relation from the source"""
   403         rschema = self.schema.rschema(rtype)
   397         rschema = self.schema.rschema(rtype)
   404         if rschema.inlined:
   398         if rschema.inlined:
   408                                                                   SQL_PREFIX)
   402                                                                   SQL_PREFIX)
   409             attrs = {'eid' : subject}
   403             attrs = {'eid' : subject}
   410         else:
   404         else:
   411             attrs = {'eid_from': subject, 'eid_to': object}
   405             attrs = {'eid_from': subject, 'eid_to': object}
   412             sql = self.sqlgen.delete('%s_relation' % rtype, attrs)
   406             sql = self.sqlgen.delete('%s_relation' % rtype, attrs)
   413         self.doexec(session.pool[self.uri], sql, attrs)
   407         self.doexec(session, sql, attrs)
   414 
   408 
   415     def doexec(self, cursor, query, args=None):
   409     def doexec(self, session, query, args=None):
   416         """Execute a query.
   410         """Execute a query.
   417         it's a function just so that it shows up in profiling
   411         it's a function just so that it shows up in profiling
   418         """
   412         """
   419         #t1 = time()
       
   420         if server.DEBUG:
   413         if server.DEBUG:
   421             print 'exec', query, args
   414             print 'exec', query, args
   422         #import sys
   415         cursor = session.pool[self.uri]
   423         #sys.stdout.flush()
   416         try:
   424         # str(query) to avoid error if it's an unicode string
   417             # str(query) to avoid error if it's an unicode string
   425         try:
       
   426             cursor.execute(str(query), args)
   418             cursor.execute(str(query), args)
   427         except Exception, ex:
   419         except Exception, ex:
   428             self.critical("sql: %r\n args: %s\ndbms message: %r",
   420             self.critical("sql: %r\n args: %s\ndbms message: %r",
   429                           query, args, ex.args[0])
   421                           query, args, ex.args[0])
       
   422             try:
       
   423                 session.pool.connection(self.uri).rollback()
       
   424                 self.critical('transaction has been rollbacked')
       
   425             except:
       
   426                 pass
   430             raise
   427             raise
   431 
   428         return cursor
   432     def doexecmany(self, cursor, query, args):
   429 
       
   430     def doexecmany(self, session, query, args):
   433         """Execute a query.
   431         """Execute a query.
   434         it's a function just so that it shows up in profiling
   432         it's a function just so that it shows up in profiling
   435         """
   433         """
   436         #t1 = time()
       
   437         if server.DEBUG:
   434         if server.DEBUG:
   438             print 'execmany', query, 'with', len(args), 'arguments'
   435             print 'execmany', query, 'with', len(args), 'arguments'
   439         #import sys
   436         cursor = session.pool[self.uri]
   440         #sys.stdout.flush()
   437         try:
   441         # str(query) to avoid error if it's an unicode string
   438             # str(query) to avoid error if it's an unicode string
   442         try:
       
   443             cursor.executemany(str(query), args)
   439             cursor.executemany(str(query), args)
   444         except:
   440         except Exception, ex:
   445             self.critical("sql many: %r\n args: %s", query, args)
   441             self.critical("sql many: %r\n args: %s\ndbms message: %r",
       
   442                           query, args, ex.args[0])
       
   443             try:
       
   444                 session.pool.connection(self.uri).rollback()
       
   445                 self.critical('transaction has been rollbacked')
       
   446             except:
       
   447                 pass
   446             raise
   448             raise
   447 
   449 
   448     # short cut to method requiring advanced db helper usage ##################
   450     # short cut to method requiring advanced db helper usage ##################
   449 
   451 
   450     def create_index(self, session, table, column, unique=False):
   452     def create_index(self, session, table, column, unique=False):
   496     def create_temp_table(self, session, table, schema):
   498     def create_temp_table(self, session, table, schema):
   497         # we don't want on commit drop, this may cause problem when
   499         # we don't want on commit drop, this may cause problem when
   498         # running with an ldap source, and table will be deleted manually any way
   500         # running with an ldap source, and table will be deleted manually any way
   499         # on commit
   501         # on commit
   500         sql = self.dbhelper.sql_temporary_table(table, schema, False)
   502         sql = self.dbhelper.sql_temporary_table(table, schema, False)
   501         self.doexec(session.pool[self.uri], sql)
   503         self.doexec(session, sql)
   502 
   504 
   503     def create_eid(self, session):
   505     def create_eid(self, session):
   504         self._eid_creation_lock.acquire()
   506         self._eid_creation_lock.acquire()
   505         try:
   507         try:
   506             cursor = session.pool[self.uri]
       
   507             for sql in self.dbhelper.sqls_increment_sequence('entities_id_seq'):
   508             for sql in self.dbhelper.sqls_increment_sequence('entities_id_seq'):
   508                 self.doexec(cursor, sql)
   509                 self.doexec(session, sql)
   509             return cursor.fetchone()[0]
   510             return cursor.fetchone()[0]
   510         finally:
   511         finally:
   511             self._eid_creation_lock.release()
   512             self._eid_creation_lock.release()
   512 
   513 
   513     def add_info(self, session, entity, source, extid=None):
   514     def add_info(self, session, entity, source, extid=None):