server/sources/native.py
changeset 4831 c5aec27c1bf7
parent 4829 3b79a0fc91db
child 4845 dc351b96f596
equal deleted inserted replaced
4829:3b79a0fc91db 4831:c5aec27c1bf7
    19 
    19 
    20 from logilab.common.compat import any
    20 from logilab.common.compat import any
    21 from logilab.common.cache import Cache
    21 from logilab.common.cache import Cache
    22 from logilab.common.decorators import cached, clear_cache
    22 from logilab.common.decorators import cached, clear_cache
    23 from logilab.common.configuration import Method
    23 from logilab.common.configuration import Method
    24 from logilab.common.adbh import get_adv_func_helper
       
    25 from logilab.common.shellutils import getlogin
    24 from logilab.common.shellutils import getlogin
    26 
    25 from logilab.db import get_db_helper
    27 from indexer import get_indexer
       
    28 
    26 
    29 from cubicweb import UnknownEid, AuthenticationError, Binary, server
    27 from cubicweb import UnknownEid, AuthenticationError, Binary, server
    30 from cubicweb.cwconfig import CubicWebNoAppConfiguration
    28 from cubicweb.cwconfig import CubicWebNoAppConfiguration
    31 from cubicweb.server import hook
    29 from cubicweb.server import hook
    32 from cubicweb.server.utils import crypt_password
    30 from cubicweb.server.utils import crypt_password
   149         self.authentifiers = [LoginPasswordAuthentifier(self)]
   147         self.authentifiers = [LoginPasswordAuthentifier(self)]
   150         AbstractSource.__init__(self, repo, appschema, source_config,
   148         AbstractSource.__init__(self, repo, appschema, source_config,
   151                                 *args, **kwargs)
   149                                 *args, **kwargs)
   152         # sql generator
   150         # sql generator
   153         self._rql_sqlgen = self.sqlgen_class(appschema, self.dbhelper,
   151         self._rql_sqlgen = self.sqlgen_class(appschema, self.dbhelper,
   154                                              self.encoding, ATTR_MAP.copy())
   152                                              ATTR_MAP.copy())
   155         # full text index helper
   153         # full text index helper
   156         self.do_fti = not repo.config['delay-full-text-indexation']
   154         self.do_fti = not repo.config['delay-full-text-indexation']
   157         if self.do_fti:
       
   158             self.indexer = get_indexer(self.dbdriver, self.encoding)
       
   159             # XXX should go away with logilab.db
       
   160             self.dbhelper.fti_uid_attr = self.indexer.uid_attr
       
   161             self.dbhelper.fti_table = self.indexer.table
       
   162             self.dbhelper.fti_restriction_sql = self.indexer.restriction_sql
       
   163             self.dbhelper.fti_need_distinct_query = self.indexer.need_distinct
       
   164         # sql queries cache
   155         # sql queries cache
   165         self._cache = Cache(repo.config['rql-cache-size'])
   156         self._cache = Cache(repo.config['rql-cache-size'])
   166         self._temp_table_data = {}
   157         self._temp_table_data = {}
   167         self._eid_creation_lock = Lock()
   158         self._eid_creation_lock = Lock()
   168         # XXX no_sqlite_wrap trick since we've a sqlite locking pb when
   159         # XXX no_sqlite_wrap trick since we've a sqlite locking pb when
   205     def init_creating(self):
   196     def init_creating(self):
   206         pool = self.repo._get_pool()
   197         pool = self.repo._get_pool()
   207         pool.pool_set()
   198         pool.pool_set()
   208         # check full text index availability
   199         # check full text index availability
   209         if self.do_fti:
   200         if self.do_fti:
   210             if not self.indexer.has_fti_table(pool['system']):
   201             if not self.dbhelper.has_fti_table(pool['system']):
   211                 if not self.repo.config.creating:
   202                 if not self.repo.config.creating:
   212                     self.critical('no text index table')
   203                     self.critical('no text index table')
   213                 self.do_fti = False
   204                 self.do_fti = False
   214         pool.pool_reset()
   205         pool.pool_reset()
   215         self.repo._free_pool(pool)
   206         self.repo._free_pool(pool)
   319                 self._cache[cachekey] = sql, query_args
   310                 self._cache[cachekey] = sql, query_args
   320         args = self.merge_args(args, query_args)
   311         args = self.merge_args(args, query_args)
   321         assert isinstance(sql, basestring), repr(sql)
   312         assert isinstance(sql, basestring), repr(sql)
   322         try:
   313         try:
   323             cursor = self.doexec(session, sql, args)
   314             cursor = self.doexec(session, sql, args)
   324         except (self.dbapi_module.OperationalError,
   315         except (self.OperationalError, self.InterfaceError):
   325                 self.dbapi_module.InterfaceError):
       
   326             # FIXME: better detection of disconnection problems
   316             # FIXME: better detection of disconnection problems
   327             self.info("request failed '%s' ... retry with a new cursor", sql)
   317             self.info("request failed '%s' ... retry with a new cursor", sql)
   328             session.pool.reconnect(self)
   318             session.pool.reconnect(self)
   329             cursor = self.doexec(session, sql, args)
   319             cursor = self.doexec(session, sql, args)
   330         results = self.process_result(cursor)
   320         results = self.process_result(cursor)
   340         assert dbg_st_search(
   330         assert dbg_st_search(
   341             self.uri, union, varmap, args,
   331             self.uri, union, varmap, args,
   342             prefix='ON THE FLY temp data insertion into %s from' % table)
   332             prefix='ON THE FLY temp data insertion into %s from' % table)
   343         # generate sql queries if we are able to do so
   333         # generate sql queries if we are able to do so
   344         sql, query_args = self._rql_sqlgen.generate(union, args, varmap)
   334         sql, query_args = self._rql_sqlgen.generate(union, args, varmap)
   345         query = 'INSERT INTO %s %s' % (table, sql.encode(self.encoding))
   335         query = 'INSERT INTO %s %s' % (table, sql.encode(self._dbencoding))
   346         self.doexec(session, query, self.merge_args(args, query_args))
   336         self.doexec(session, query, self.merge_args(args, query_args))
   347 
   337 
   348     def manual_insert(self, results, table, session):
   338     def manual_insert(self, results, table, session):
   349         """insert given result into a temporary table on the system source"""
   339         """insert given result into a temporary table on the system source"""
   350         if server.DEBUG & server.DBG_RQL:
   340         if server.DEBUG & server.DBG_RQL:
   357         for row in results:
   347         for row in results:
   358             kwargs = {}
   348             kwargs = {}
   359             row = tuple(row)
   349             row = tuple(row)
   360             for index, cell in enumerate(row):
   350             for index, cell in enumerate(row):
   361                 if isinstance(cell, Binary):
   351                 if isinstance(cell, Binary):
   362                     cell = self.binary(cell.getvalue())
   352                     cell = self._binary(cell.getvalue())
   363                 kwargs[str(index)] = cell
   353                 kwargs[str(index)] = cell
   364             kwargs_list.append(kwargs)
   354             kwargs_list.append(kwargs)
   365         self.doexecmany(session, query, kwargs_list)
   355         self.doexecmany(session, query, kwargs_list)
   366 
   356 
   367     def clean_temp_data(self, session, temptables):
   357     def clean_temp_data(self, session, temptables):
   612     def fti_unindex_entity(self, session, eid):
   602     def fti_unindex_entity(self, session, eid):
   613         """remove text content for entity with the given eid from the full text
   603         """remove text content for entity with the given eid from the full text
   614         index
   604         index
   615         """
   605         """
   616         try:
   606         try:
   617             self.indexer.cursor_unindex_object(eid, session.pool['system'])
   607             self.dbhelper.cursor_unindex_object(eid, session.pool['system'])
   618         except Exception: # let KeyboardInterrupt / SystemExit propagate
   608         except Exception: # let KeyboardInterrupt / SystemExit propagate
   619             self.exception('error while unindexing %s', eid)
   609             self.exception('error while unindexing %s', eid)
   620 
   610 
   621     def fti_index_entity(self, session, entity):
   611     def fti_index_entity(self, session, entity):
   622         """add text content of a created/modified entity to the full text index
   612         """add text content of a created/modified entity to the full text index
   623         """
   613         """
   624         self.debug('reindexing %r', entity.eid)
   614         self.debug('reindexing %r', entity.eid)
   625         try:
   615         try:
   626             # use cursor_index_object, not cursor_reindex_object since
   616             # use cursor_index_object, not cursor_reindex_object since
   627             # unindexing done in the FTIndexEntityOp
   617             # unindexing done in the FTIndexEntityOp
   628             self.indexer.cursor_index_object(entity.eid, entity,
   618             self.dbhelper.cursor_index_object(entity.eid, entity,
   629                                              session.pool['system'])
   619                                               session.pool['system'])
   630         except Exception: # let KeyboardInterrupt / SystemExit propagate
   620         except Exception: # let KeyboardInterrupt / SystemExit propagate
   631             self.exception('error while reindexing %s', entity)
   621             self.exception('error while reindexing %s', entity)
   632 
   622 
   633 
   623 
   634 class FTIndexEntityOp(hook.LateOperation):
   624 class FTIndexEntityOp(hook.LateOperation):
   657     def commit_event(self):
   647     def commit_event(self):
   658         pass
   648         pass
   659 
   649 
   660 
   650 
   661 def sql_schema(driver):
   651 def sql_schema(driver):
   662     helper = get_adv_func_helper(driver)
   652     helper = get_db_helper(driver)
   663     tstamp_col_type = helper.TYPE_MAPPING['Datetime']
   653     tstamp_col_type = helper.TYPE_MAPPING['Datetime']
   664     schema = """
   654     schema = """
   665 /* Create the repository's system database */
   655 /* Create the repository's system database */
   666 
   656 
   667 %s
   657 %s
   690 """ % (helper.sql_create_sequence('entities_id_seq'), tstamp_col_type, tstamp_col_type)
   680 """ % (helper.sql_create_sequence('entities_id_seq'), tstamp_col_type, tstamp_col_type)
   691     return schema
   681     return schema
   692 
   682 
   693 
   683 
   694 def sql_drop_schema(driver):
   684 def sql_drop_schema(driver):
   695     helper = get_adv_func_helper(driver)
   685     helper = get_db_helper(driver)
   696     return """
   686     return """
   697 %s
   687 %s
   698 DROP TABLE entities;
   688 DROP TABLE entities;
   699 DROP TABLE deleted_entities;
   689 DROP TABLE deleted_entities;
   700 """ % helper.sql_drop_sequence('entities_id_seq')
   690 """ % helper.sql_drop_sequence('entities_id_seq')