183 self._cache.pop('%s X WHERE X eid %s' % (etype, eid), None) |
183 self._cache.pop('%s X WHERE X eid %s' % (etype, eid), None) |
184 self._cache.pop('Any X WHERE X eid %s' % eid, None) |
184 self._cache.pop('Any X WHERE X eid %s' % eid, None) |
185 |
185 |
186 def sqlexec(self, session, sql, args=None): |
186 def sqlexec(self, session, sql, args=None): |
187 """execute the query and return its result""" |
187 """execute the query and return its result""" |
188 cursor = session.pool[self.uri] |
188 return self.process_result(self.doexec(session, sql, args)) |
189 self.doexec(cursor, sql, args) |
|
190 return self.process_result(cursor) |
|
191 |
189 |
192 def init_creating(self): |
190 def init_creating(self): |
193 pool = self.repo._get_pool() |
191 pool = self.repo._get_pool() |
194 pool.pool_set() |
192 pool.pool_set() |
195 # check full text index availibility |
193 # check full text index availibility |
303 except KeyError: |
301 except KeyError: |
304 self.cache_miss += 1 |
302 self.cache_miss += 1 |
305 sql, query_args = self._rql_sqlgen.generate(union, args, varmap) |
303 sql, query_args = self._rql_sqlgen.generate(union, args, varmap) |
306 self._cache[cachekey] = sql, query_args |
304 self._cache[cachekey] = sql, query_args |
307 args = self.merge_args(args, query_args) |
305 args = self.merge_args(args, query_args) |
308 cursor = session.pool[self.uri] |
|
309 assert isinstance(sql, basestring), repr(sql) |
306 assert isinstance(sql, basestring), repr(sql) |
310 try: |
307 try: |
311 self.doexec(cursor, sql, args) |
308 cursor = self.doexec(session, sql, args) |
312 except (self.dbapi_module.OperationalError, |
309 except (self.dbapi_module.OperationalError, |
313 self.dbapi_module.InterfaceError): |
310 self.dbapi_module.InterfaceError): |
314 # FIXME: better detection of deconnection pb |
311 # FIXME: better detection of deconnection pb |
315 self.info("request failed '%s' ... retry with a new cursor", sql) |
312 self.info("request failed '%s' ... retry with a new cursor", sql) |
316 session.pool.reconnect(self) |
313 session.pool.reconnect(self) |
317 cursor = session.pool[self.uri] |
314 cursor = self.doexec(session, sql, args) |
318 self.doexec(cursor, sql, args) |
|
319 res = self.process_result(cursor) |
315 res = self.process_result(cursor) |
320 if server.DEBUG: |
316 if server.DEBUG: |
321 print '------>', res |
317 print '------>', res |
322 return res |
318 return res |
323 |
319 |
335 print union.as_string() |
331 print union.as_string() |
336 print 'SOLUTIONS', ','.join(str(s.solutions) for s in union.children) |
332 print 'SOLUTIONS', ','.join(str(s.solutions) for s in union.children) |
337 # generate sql queries if we are able to do so |
333 # generate sql queries if we are able to do so |
338 sql, query_args = self._rql_sqlgen.generate(union, args, varmap) |
334 sql, query_args = self._rql_sqlgen.generate(union, args, varmap) |
339 query = 'INSERT INTO %s %s' % (table, sql.encode(self.encoding)) |
335 query = 'INSERT INTO %s %s' % (table, sql.encode(self.encoding)) |
340 self.doexec(session.pool[self.uri], query, |
336 self.doexec(session, query, self.merge_args(args, query_args)) |
341 self.merge_args(args, query_args)) |
|
342 else: |
337 else: |
343 super(NativeSQLSource, self).flying_insert(table, session, union, |
338 super(NativeSQLSource, self).flying_insert(table, session, union, |
344 args, varmap) |
339 args, varmap) |
345 |
340 |
346 def _manual_insert(self, results, table, session): |
341 def _manual_insert(self, results, table, session): |
356 for index, cell in enumerate(row): |
351 for index, cell in enumerate(row): |
357 if isinstance(cell, Binary): |
352 if isinstance(cell, Binary): |
358 cell = self.binary(cell.getvalue()) |
353 cell = self.binary(cell.getvalue()) |
359 kwargs[str(index)] = cell |
354 kwargs[str(index)] = cell |
360 kwargs_list.append(kwargs) |
355 kwargs_list.append(kwargs) |
361 self.doexecmany(session.pool[self.uri], query, kwargs_list) |
356 self.doexecmany(session, query, kwargs_list) |
362 |
357 |
363 def clean_temp_data(self, session, temptables): |
358 def clean_temp_data(self, session, temptables): |
364 """remove temporary data, usually associated to temporary tables""" |
359 """remove temporary data, usually associated to temporary tables""" |
365 if temptables: |
360 if temptables: |
366 cursor = session.pool[self.uri] |
|
367 for table in temptables: |
361 for table in temptables: |
368 try: |
362 try: |
369 self.doexec(cursor,'DROP TABLE %s' % table) |
363 self.doexec(session,'DROP TABLE %s' % table) |
370 except: |
364 except: |
371 pass |
365 pass |
372 try: |
366 try: |
373 del self._temp_table_data[table] |
367 del self._temp_table_data[table] |
374 except KeyError: |
368 except KeyError: |
376 |
370 |
377 def add_entity(self, session, entity): |
371 def add_entity(self, session, entity): |
378 """add a new entity to the source""" |
372 """add a new entity to the source""" |
379 attrs = self.preprocess_entity(entity) |
373 attrs = self.preprocess_entity(entity) |
380 sql = self.sqlgen.insert(SQL_PREFIX + str(entity.e_schema), attrs) |
374 sql = self.sqlgen.insert(SQL_PREFIX + str(entity.e_schema), attrs) |
381 self.doexec(session.pool[self.uri], sql, attrs) |
375 self.doexec(session, sql, attrs) |
382 |
376 |
383 def update_entity(self, session, entity): |
377 def update_entity(self, session, entity): |
384 """replace an entity in the source""" |
378 """replace an entity in the source""" |
385 attrs = self.preprocess_entity(entity) |
379 attrs = self.preprocess_entity(entity) |
386 sql = self.sqlgen.update(SQL_PREFIX + str(entity.e_schema), attrs, [SQL_PREFIX + 'eid']) |
380 sql = self.sqlgen.update(SQL_PREFIX + str(entity.e_schema), attrs, [SQL_PREFIX + 'eid']) |
387 self.doexec(session.pool[self.uri], sql, attrs) |
381 self.doexec(session, sql, attrs) |
388 |
382 |
389 def delete_entity(self, session, etype, eid): |
383 def delete_entity(self, session, etype, eid): |
390 """delete an entity from the source""" |
384 """delete an entity from the source""" |
391 attrs = {SQL_PREFIX + 'eid': eid} |
385 attrs = {SQL_PREFIX + 'eid': eid} |
392 sql = self.sqlgen.delete(SQL_PREFIX + etype, attrs) |
386 sql = self.sqlgen.delete(SQL_PREFIX + etype, attrs) |
393 self.doexec(session.pool[self.uri], sql, attrs) |
387 self.doexec(session, sql, attrs) |
394 |
388 |
395 def add_relation(self, session, subject, rtype, object): |
389 def add_relation(self, session, subject, rtype, object): |
396 """add a relation to the source""" |
390 """add a relation to the source""" |
397 attrs = {'eid_from': subject, 'eid_to': object} |
391 attrs = {'eid_from': subject, 'eid_to': object} |
398 sql = self.sqlgen.insert('%s_relation' % rtype, attrs) |
392 sql = self.sqlgen.insert('%s_relation' % rtype, attrs) |
399 self.doexec(session.pool[self.uri], sql, attrs) |
393 self.doexec(session, sql, attrs) |
400 |
394 |
401 def delete_relation(self, session, subject, rtype, object): |
395 def delete_relation(self, session, subject, rtype, object): |
402 """delete a relation from the source""" |
396 """delete a relation from the source""" |
403 rschema = self.schema.rschema(rtype) |
397 rschema = self.schema.rschema(rtype) |
404 if rschema.inlined: |
398 if rschema.inlined: |
408 SQL_PREFIX) |
402 SQL_PREFIX) |
409 attrs = {'eid' : subject} |
403 attrs = {'eid' : subject} |
410 else: |
404 else: |
411 attrs = {'eid_from': subject, 'eid_to': object} |
405 attrs = {'eid_from': subject, 'eid_to': object} |
412 sql = self.sqlgen.delete('%s_relation' % rtype, attrs) |
406 sql = self.sqlgen.delete('%s_relation' % rtype, attrs) |
413 self.doexec(session.pool[self.uri], sql, attrs) |
407 self.doexec(session, sql, attrs) |
414 |
408 |
415 def doexec(self, cursor, query, args=None): |
409 def doexec(self, session, query, args=None): |
416 """Execute a query. |
410 """Execute a query. |
417 it's a function just so that it shows up in profiling |
411 it's a function just so that it shows up in profiling |
418 """ |
412 """ |
419 #t1 = time() |
|
420 if server.DEBUG: |
413 if server.DEBUG: |
421 print 'exec', query, args |
414 print 'exec', query, args |
422 #import sys |
415 cursor = session.pool[self.uri] |
423 #sys.stdout.flush() |
416 try: |
424 # str(query) to avoid error if it's an unicode string |
417 # str(query) to avoid error if it's an unicode string |
425 try: |
|
426 cursor.execute(str(query), args) |
418 cursor.execute(str(query), args) |
427 except Exception, ex: |
419 except Exception, ex: |
428 self.critical("sql: %r\n args: %s\ndbms message: %r", |
420 self.critical("sql: %r\n args: %s\ndbms message: %r", |
429 query, args, ex.args[0]) |
421 query, args, ex.args[0]) |
|
422 try: |
|
423 session.pool.connection(self.uri).rollback() |
|
424 self.critical('transaction has been rollbacked') |
|
425 except: |
|
426 pass |
430 raise |
427 raise |
431 |
428 return cursor |
432 def doexecmany(self, cursor, query, args): |
429 |
|
430 def doexecmany(self, session, query, args): |
433 """Execute a query. |
431 """Execute a query. |
434 it's a function just so that it shows up in profiling |
432 it's a function just so that it shows up in profiling |
435 """ |
433 """ |
436 #t1 = time() |
|
437 if server.DEBUG: |
434 if server.DEBUG: |
438 print 'execmany', query, 'with', len(args), 'arguments' |
435 print 'execmany', query, 'with', len(args), 'arguments' |
439 #import sys |
436 cursor = session.pool[self.uri] |
440 #sys.stdout.flush() |
437 try: |
441 # str(query) to avoid error if it's an unicode string |
438 # str(query) to avoid error if it's an unicode string |
442 try: |
|
443 cursor.executemany(str(query), args) |
439 cursor.executemany(str(query), args) |
444 except: |
440 except Exception, ex: |
445 self.critical("sql many: %r\n args: %s", query, args) |
441 self.critical("sql many: %r\n args: %s\ndbms message: %r", |
|
442 query, args, ex.args[0]) |
|
443 try: |
|
444 session.pool.connection(self.uri).rollback() |
|
445 self.critical('transaction has been rollbacked') |
|
446 except: |
|
447 pass |
446 raise |
448 raise |
447 |
449 |
448 # short cut to method requiring advanced db helper usage ################## |
450 # short cut to method requiring advanced db helper usage ################## |
449 |
451 |
450 def create_index(self, session, table, column, unique=False): |
452 def create_index(self, session, table, column, unique=False): |
496 def create_temp_table(self, session, table, schema): |
498 def create_temp_table(self, session, table, schema): |
497 # we don't want on commit drop, this may cause problem when |
499 # we don't want on commit drop, this may cause problem when |
498 # running with an ldap source, and table will be deleted manually any way |
500 # running with an ldap source, and table will be deleted manually any way |
499 # on commit |
501 # on commit |
500 sql = self.dbhelper.sql_temporary_table(table, schema, False) |
502 sql = self.dbhelper.sql_temporary_table(table, schema, False) |
501 self.doexec(session.pool[self.uri], sql) |
503 self.doexec(session, sql) |
502 |
504 |
503 def create_eid(self, session): |
505 def create_eid(self, session): |
504 self._eid_creation_lock.acquire() |
506 self._eid_creation_lock.acquire() |
505 try: |
507 try: |
506 cursor = session.pool[self.uri] |
|
507 for sql in self.dbhelper.sqls_increment_sequence('entities_id_seq'): |
508 for sql in self.dbhelper.sqls_increment_sequence('entities_id_seq'): |
508 self.doexec(cursor, sql) |
509 self.doexec(session, sql) |
509 return cursor.fetchone()[0] |
510 return cursor.fetchone()[0] |
510 finally: |
511 finally: |
511 self._eid_creation_lock.release() |
512 self._eid_creation_lock.release() |
512 |
513 |
513 def add_info(self, session, entity, source, extid=None): |
514 def add_info(self, session, entity, source, extid=None): |