server/sources/native.py
branch tls-sprint
changeset 1792 9eadf34fd860
parent 1398 5fe84a5f7035
child 1952 8e19c813750d
equal deleted inserted replaced
1791:c77629112437 1792:9eadf34fd860
    27 NONSYSTEM_RELATIONS = set()
    27 NONSYSTEM_RELATIONS = set()
    28 
    28 
    29 class LogCursor(object):
    29 class LogCursor(object):
    30     def __init__(self, cursor):
    30     def __init__(self, cursor):
    31         self.cu = cursor
    31         self.cu = cursor
    32         
    32 
    33     def execute(self, query, args=None):
    33     def execute(self, query, args=None):
    34         """Execute a query.
    34         """Execute a query.
    35         it's a function just so that it shows up in profiling
    35         it's a function just so that it shows up in profiling
    36         """
    36         """
    37         if server.DEBUG:
    37         if server.DEBUG:
    40             self.cu.execute(str(query), args)
    40             self.cu.execute(str(query), args)
    41         except Exception, ex:
    41         except Exception, ex:
    42             print "sql: %r\n args: %s\ndbms message: %r" % (
    42             print "sql: %r\n args: %s\ndbms message: %r" % (
    43                 query, args, ex.args[0])
    43                 query, args, ex.args[0])
    44             raise
    44             raise
    45         
    45 
    46     def fetchall(self):
    46     def fetchall(self):
    47         return self.cu.fetchall()
    47         return self.cu.fetchall()
    48         
    48 
    49     def fetchone(self):
    49     def fetchone(self):
    50         return self.cu.fetchone()
    50         return self.cu.fetchone()
    51     
    51 
    52 def make_schema(selected, solution, table, typemap):
    52 def make_schema(selected, solution, table, typemap):
    53     """return a sql schema to store RQL query result"""
    53     """return a sql schema to store RQL query result"""
    54     sql = []
    54     sql = []
    55     varmap = {}
    55     varmap = {}
    56     for i, term in enumerate(selected):
    56     for i, term in enumerate(selected):
    82 class NativeSQLSource(SQLAdapterMixIn, AbstractSource):
    82 class NativeSQLSource(SQLAdapterMixIn, AbstractSource):
    83     """adapter for source using the native cubicweb schema (see below)
    83     """adapter for source using the native cubicweb schema (see below)
    84     """
    84     """
    85     # need default value on class since migration doesn't call init method
    85     # need default value on class since migration doesn't call init method
    86     has_deleted_entitites_table = True
    86     has_deleted_entitites_table = True
    87     
    87 
    88     passwd_rql = "Any P WHERE X is CWUser, X login %(login)s, X upassword P"
    88     passwd_rql = "Any P WHERE X is CWUser, X login %(login)s, X upassword P"
    89     auth_rql = "Any X WHERE X is CWUser, X login %(login)s, X upassword %(pwd)s"
    89     auth_rql = "Any X WHERE X is CWUser, X login %(login)s, X upassword %(pwd)s"
    90     _sols = ({'X': 'CWUser', 'P': 'Password'},)
    90     _sols = ({'X': 'CWUser', 'P': 'Password'},)
    91     
    91 
    92     options = (
    92     options = (
    93         ('db-driver',
    93         ('db-driver',
    94          {'type' : 'string',
    94          {'type' : 'string',
    95           'default': 'postgres',
    95           'default': 'postgres',
    96           'help': 'database driver (postgres or sqlite)',
    96           'help': 'database driver (postgres or sqlite)',
   125           'default': 'utf8',
   125           'default': 'utf8',
   126           'help': 'database encoding',
   126           'help': 'database encoding',
   127           'group': 'native-source', 'inputlevel': 1,
   127           'group': 'native-source', 'inputlevel': 1,
   128           }),
   128           }),
   129     )
   129     )
   130     
   130 
   131     def __init__(self, repo, appschema, source_config, *args, **kwargs):
   131     def __init__(self, repo, appschema, source_config, *args, **kwargs):
   132         SQLAdapterMixIn.__init__(self, source_config)
   132         SQLAdapterMixIn.__init__(self, source_config)
   133         AbstractSource.__init__(self, repo, appschema, source_config,
   133         AbstractSource.__init__(self, repo, appschema, source_config,
   134                                 *args, **kwargs)
   134                                 *args, **kwargs)
   135         # sql generator
   135         # sql generator
   148         self._eid_creation_lock = Lock()
   148         self._eid_creation_lock = Lock()
   149 
   149 
   150     def reset_caches(self):
   150     def reset_caches(self):
   151         """method called during test to reset potential source caches"""
   151         """method called during test to reset potential source caches"""
   152         self._cache = Cache(self.repo.config['rql-cache-size'])
   152         self._cache = Cache(self.repo.config['rql-cache-size'])
   153     
   153 
   154     def clear_eid_cache(self, eid, etype):
   154     def clear_eid_cache(self, eid, etype):
   155         """clear potential caches for the given eid"""
   155         """clear potential caches for the given eid"""
   156         self._cache.pop('%s X WHERE X eid %s' % (etype, eid), None)
   156         self._cache.pop('%s X WHERE X eid %s' % (etype, eid), None)
   157         self._cache.pop('Any X WHERE X eid %s' % eid, None)
   157         self._cache.pop('Any X WHERE X eid %s' % eid, None)
   158         
   158 
   159     def sqlexec(self, session, sql, args=None):
   159     def sqlexec(self, session, sql, args=None):
   160         """execute the query and return its result"""
   160         """execute the query and return its result"""
   161         cursor = session.pool[self.uri]
   161         cursor = session.pool[self.uri]
   162         self.doexec(cursor, sql, args)
   162         self.doexec(cursor, sql, args)
   163         return self.process_result(cursor)
   163         return self.process_result(cursor)
   164     
   164 
   165     def init_creating(self):
   165     def init_creating(self):
   166         # check full text index availability
   166         # check full text index availability
   167         pool = self.repo._get_pool()
   167         pool = self.repo._get_pool()
   168         if not self.indexer.has_fti_table(pool['system']):
   168         if not self.indexer.has_fti_table(pool['system']):
   169             self.error('no text index table')
   169             self.error('no text index table')
   170             self.indexer = None
   170             self.indexer = None
   171         self.repo._free_pool(pool)
   171         self.repo._free_pool(pool)
   172 
   172 
   173     def init(self):
   173     def init(self):
   174         self.init_creating() 
   174         self.init_creating()
   175         pool = self.repo._get_pool()
   175         pool = self.repo._get_pool()
   176         # XXX cubicweb < 2.42 compat
   176         # XXX cubicweb < 2.42 compat
   177         if 'deleted_entities' in self.dbhelper.list_tables(pool['system']):
   177         if 'deleted_entities' in self.dbhelper.list_tables(pool['system']):
   178             self.has_deleted_entitites_table = True
   178             self.has_deleted_entitites_table = True
   179         else:
   179         else:
   180             self.has_deleted_entitites_table = False
   180             self.has_deleted_entitites_table = False
   181         self.repo._free_pool(pool)
   181         self.repo._free_pool(pool)
   182         
   182 
   183     # ISource interface #######################################################
   183     # ISource interface #######################################################
   184 
   184 
   185     def compile_rql(self, rql):
   185     def compile_rql(self, rql):
   186         rqlst = self.repo.querier._rqlhelper.parse(rql)
   186         rqlst = self.repo.querier._rqlhelper.parse(rql)
   187         rqlst.restricted_vars = ()
   187         rqlst.restricted_vars = ()
   188         rqlst.children[0].solutions = self._sols
   188         rqlst.children[0].solutions = self._sols
   189         self.repo.querier.sqlgen_annotate(rqlst)
   189         self.repo.querier.sqlgen_annotate(rqlst)
   190         set_qdata(self.schema.rschema, rqlst, ())
   190         set_qdata(self.schema.rschema, rqlst, ())
   191         return rqlst
   191         return rqlst
   192     
   192 
   193     def set_schema(self, schema):
   193     def set_schema(self, schema):
   194         """set the application's schema"""
   194         """set the application's schema"""
   195         self._cache = Cache(self.repo.config['rql-cache-size'])
   195         self._cache = Cache(self.repo.config['rql-cache-size'])
   196         self.cache_hit, self.cache_miss, self.no_cache = 0, 0, 0
   196         self.cache_hit, self.cache_miss, self.no_cache = 0, 0, 0
   197         self.schema = schema
   197         self.schema = schema
   201             pass # __init__
   201             pass # __init__
   202         if 'CWUser' in schema: # probably an empty schema if not true...
   202         if 'CWUser' in schema: # probably an empty schema if not true...
   203             # rql syntax trees used to authenticate users
   203             # rql syntax trees used to authenticate users
   204             self._passwd_rqlst = self.compile_rql(self.passwd_rql)
   204             self._passwd_rqlst = self.compile_rql(self.passwd_rql)
   205             self._auth_rqlst = self.compile_rql(self.auth_rql)
   205             self._auth_rqlst = self.compile_rql(self.auth_rql)
   206                 
   206 
   207     def support_entity(self, etype, write=False):
   207     def support_entity(self, etype, write=False):
   208         """return true if the given entity's type is handled by this adapter
   208         """return true if the given entity's type is handled by this adapter
   209         if write is true, return true only if it's a RW support
   209         if write is true, return true only if it's a RW support
   210         """
   210         """
   211         return not etype in NONSYSTEM_ETYPES
   211         return not etype in NONSYSTEM_ETYPES
   212     
   212 
   213     def support_relation(self, rtype, write=False):
   213     def support_relation(self, rtype, write=False):
   214         """return true if the given relation's type is handled by this adapter
   214         """return true if the given relation's type is handled by this adapter
   215         if write is true, return true only if it's a RW support
   215         if write is true, return true only if it's a RW support
   216         """
   216         """
   217         if write:
   217         if write:
   218             return not rtype in NONSYSTEM_RELATIONS
   218             return not rtype in NONSYSTEM_RELATIONS
   219         # due to current multi-sources implementation, the system source
   219         # due to current multi-sources implementation, the system source
   220         # can't claim not supporting a relation            
   220         # can't claim not supporting a relation
   221         return True #not rtype == 'content_for'
   221         return True #not rtype == 'content_for'
   222 
   222 
   223     def authenticate(self, session, login, password):
   223     def authenticate(self, session, login, password):
   224         """return CWUser eid for the given login/password if this account is
   224         """return CWUser eid for the given login/password if this account is
   225         defined in this source, else raise `AuthenticationError`
   225         defined in this source, else raise `AuthenticationError`
   241         rset = self.syntax_tree_search(session, self._auth_rqlst, args)
   241         rset = self.syntax_tree_search(session, self._auth_rqlst, args)
   242         try:
   242         try:
   243             return rset[0][0]
   243             return rset[0][0]
   244         except IndexError:
   244         except IndexError:
   245             raise AuthenticationError('bad password')
   245             raise AuthenticationError('bad password')
   246     
   246 
   247     def syntax_tree_search(self, session, union, args=None, cachekey=None, 
   247     def syntax_tree_search(self, session, union, args=None, cachekey=None,
   248                            varmap=None):
   248                            varmap=None):
   249         """return result from this source for a rql query (actually from
   249         """return result from this source for a rql query (actually from
   250         a rql syntax tree and a solution dictionary mapping each used
   250         a rql syntax tree and a solution dictionary mapping each used
   251         variable to a possible type). If cachekey is given, the query
   251         variable to a possible type). If cachekey is given, the query
   252         necessary to fetch the results (but not the results themselves)
   252         necessary to fetch the results (but not the results themselves)
   287             self.doexec(cursor, sql, args)
   287             self.doexec(cursor, sql, args)
   288         res = self.process_result(cursor)
   288         res = self.process_result(cursor)
   289         if server.DEBUG:
   289         if server.DEBUG:
   290             print '------>', res
   290             print '------>', res
   291         return res
   291         return res
   292                 
   292 
   293     def flying_insert(self, table, session, union, args=None, varmap=None):
   293     def flying_insert(self, table, session, union, args=None, varmap=None):
   294         """similar as .syntax_tree_search, but inserts data in the
   294         """similar as .syntax_tree_search, but inserts data in the
   295         temporary table (on-the-fly if possible, eg for the system
   295         temporary table (on-the-fly if possible, eg for the system
   296         source which the given cursor comes from). If not possible,
   296         source which the given cursor comes from). If not possible,
   297         inserts all data by calling .executemany().
   297         inserts all data by calling .executemany().
   317 #                 print 'data', row
   317 #                 print 'data', row
   318 #                 tempdata.add(tuple(row))
   318 #                 tempdata.add(tuple(row))
   319         else:
   319         else:
   320             super(NativeSQLSource, self).flying_insert(table, session, union,
   320             super(NativeSQLSource, self).flying_insert(table, session, union,
   321                                                        args, varmap)
   321                                                        args, varmap)
   322         
   322 
   323     def _manual_insert(self, results, table, session):
   323     def _manual_insert(self, results, table, session):
   324         """insert given result into a temporary table on the system source"""
   324         """insert given result into a temporary table on the system source"""
   325         #print 'manual insert', table, results
   325         #print 'manual insert', table, results
   326         if not results:
   326         if not results:
   327             return
   327             return
   360                     pass
   360                     pass
   361                 try:
   361                 try:
   362                     del self._temp_table_data[table]
   362                     del self._temp_table_data[table]
   363                 except KeyError:
   363                 except KeyError:
   364                     continue
   364                     continue
   365     
   365 
   366     def add_entity(self, session, entity):
   366     def add_entity(self, session, entity):
   367         """add a new entity to the source"""
   367         """add a new entity to the source"""
   368         attrs = self.preprocess_entity(entity)
   368         attrs = self.preprocess_entity(entity)
   369         sql = self.sqlgen.insert(SQL_PREFIX + str(entity.e_schema), attrs)
   369         sql = self.sqlgen.insert(SQL_PREFIX + str(entity.e_schema), attrs)
   370         self.doexec(session.pool[self.uri], sql, attrs)
   370         self.doexec(session.pool[self.uri], sql, attrs)
   371         
   371 
   372     def update_entity(self, session, entity):
   372     def update_entity(self, session, entity):
   373         """replace an entity in the source"""
   373         """replace an entity in the source"""
   374         attrs = self.preprocess_entity(entity)
   374         attrs = self.preprocess_entity(entity)
   375         sql = self.sqlgen.update(SQL_PREFIX + str(entity.e_schema), attrs, [SQL_PREFIX + 'eid'])
   375         sql = self.sqlgen.update(SQL_PREFIX + str(entity.e_schema), attrs, [SQL_PREFIX + 'eid'])
   376         self.doexec(session.pool[self.uri], sql, attrs)
   376         self.doexec(session.pool[self.uri], sql, attrs)
   384     def add_relation(self, session, subject, rtype, object):
   384     def add_relation(self, session, subject, rtype, object):
   385         """add a relation to the source"""
   385         """add a relation to the source"""
   386         attrs = {'eid_from': subject, 'eid_to': object}
   386         attrs = {'eid_from': subject, 'eid_to': object}
   387         sql = self.sqlgen.insert('%s_relation' % rtype, attrs)
   387         sql = self.sqlgen.insert('%s_relation' % rtype, attrs)
   388         self.doexec(session.pool[self.uri], sql, attrs)
   388         self.doexec(session.pool[self.uri], sql, attrs)
   389     
   389 
   390     def delete_relation(self, session, subject, rtype, object):
   390     def delete_relation(self, session, subject, rtype, object):
   391         """delete a relation from the source"""
   391         """delete a relation from the source"""
   392         rschema = self.schema.rschema(rtype)
   392         rschema = self.schema.rschema(rtype)
   393         if rschema.inlined:
   393         if rschema.inlined:
   394             table = SQL_PREFIX + session.describe(subject)[0]
   394             table = SQL_PREFIX + session.describe(subject)[0]
   397                                                                   SQL_PREFIX)
   397                                                                   SQL_PREFIX)
   398             attrs = {'eid' : subject}
   398             attrs = {'eid' : subject}
   399         else:
   399         else:
   400             attrs = {'eid_from': subject, 'eid_to': object}
   400             attrs = {'eid_from': subject, 'eid_to': object}
   401             sql = self.sqlgen.delete('%s_relation' % rtype, attrs)
   401             sql = self.sqlgen.delete('%s_relation' % rtype, attrs)
   402         self.doexec(session.pool[self.uri], sql, attrs)    
   402         self.doexec(session.pool[self.uri], sql, attrs)
   403 
   403 
   404     def doexec(self, cursor, query, args=None):
   404     def doexec(self, cursor, query, args=None):
   405         """Execute a query.
   405         """Execute a query.
   406         it's a function just so that it shows up in profiling
   406         it's a function just so that it shows up in profiling
   407         """
   407         """
   415             cursor.execute(str(query), args)
   415             cursor.execute(str(query), args)
   416         except Exception, ex:
   416         except Exception, ex:
   417             self.critical("sql: %r\n args: %s\ndbms message: %r",
   417             self.critical("sql: %r\n args: %s\ndbms message: %r",
   418                           query, args, ex.args[0])
   418                           query, args, ex.args[0])
   419             raise
   419             raise
   420         
   420 
   421     def doexecmany(self, cursor, query, args):
   421     def doexecmany(self, cursor, query, args):
   422         """Execute a query.
   422         """Execute a query.
   423         it's a function just so that it shows up in profiling
   423         it's a function just so that it shows up in profiling
   424         """
   424         """
   425         #t1 = time()
   425         #t1 = time()
   431         try:
   431         try:
   432             cursor.executemany(str(query), args)
   432             cursor.executemany(str(query), args)
   433         except:
   433         except:
   434             self.critical("sql many: %r\n args: %s", query, args)
   434             self.critical("sql many: %r\n args: %s", query, args)
   435             raise
   435             raise
   436         
   436 
   437     # short cut to method requiring advanced db helper usage ##################
   437     # short cut to method requiring advanced db helper usage ##################
   438             
   438 
   439     def create_index(self, session, table, column, unique=False):
   439     def create_index(self, session, table, column, unique=False):
   440         cursor = LogCursor(session.pool[self.uri])
   440         cursor = LogCursor(session.pool[self.uri])
   441         self.dbhelper.create_index(cursor, table, column, unique)
   441         self.dbhelper.create_index(cursor, table, column, unique)
   442             
   442 
   443     def drop_index(self, session, table, column, unique=False):
   443     def drop_index(self, session, table, column, unique=False):
   444         cursor = LogCursor(session.pool[self.uri])
   444         cursor = LogCursor(session.pool[self.uri])
   445         self.dbhelper.drop_index(cursor, table, column, unique)
   445         self.dbhelper.drop_index(cursor, table, column, unique)
   446 
   446 
   447     # system source interface #################################################
   447     # system source interface #################################################
   464                                     'extid=%(x)s AND source=%(s)s',
   464                                     'extid=%(x)s AND source=%(s)s',
   465                                     # str() necessary with pg 8.3
   465                                     # str() necessary with pg 8.3
   466                                     {'x': str(lid), 's': source.uri})
   466                                     {'x': str(lid), 's': source.uri})
   467         # XXX testing rowcount cause strange bug with sqlite, results are there
   467         # XXX testing rowcount cause strange bug with sqlite, results are there
   468         #     but rowcount is 0
   468         #     but rowcount is 0
   469         #if cursor.rowcount > 0: 
   469         #if cursor.rowcount > 0:
   470         try:
   470         try:
   471             result = cursor.fetchone()
   471             result = cursor.fetchone()
   472             if result:
   472             if result:
   473                 eid = result[0]
   473                 eid = result[0]
   474                 return eid            
   474                 return eid
   475         except:
   475         except:
   476             pass
   476             pass
   477         return None
   477         return None
   478     
   478 
   479     def temp_table_def(self, selected, sol, table):
   479     def temp_table_def(self, selected, sol, table):
   480         return make_schema(selected, sol, table, self.dbhelper.TYPE_MAPPING)
   480         return make_schema(selected, sol, table, self.dbhelper.TYPE_MAPPING)
   481 
   481 
   482     def create_temp_table(self, session, table, schema):
   482     def create_temp_table(self, session, table, schema):
   483         # we don't want on commit drop, this may cause problem when
   483         # we don't want on commit drop, this may cause problem when
   484         # running with an ldap source, and table will be deleted manually any way
   484         # running with an ldap source, and table will be deleted manually any way
   485         # on commit
   485         # on commit
   486         sql = self.dbhelper.sql_temporary_table(table, schema, False)
   486         sql = self.dbhelper.sql_temporary_table(table, schema, False)
   487         self.doexec(session.pool[self.uri], sql)
   487         self.doexec(session.pool[self.uri], sql)
   488     
   488 
   489     def create_eid(self, session):
   489     def create_eid(self, session):
   490         self._eid_creation_lock.acquire()
   490         self._eid_creation_lock.acquire()
   491         try:
   491         try:
   492             cursor = session.pool[self.uri]
   492             cursor = session.pool[self.uri]
   493             for sql in self.dbhelper.sqls_increment_sequence('entities_id_seq'):
   493             for sql in self.dbhelper.sqls_increment_sequence('entities_id_seq'):
   511         session.system_sql(self.sqlgen.delete('entities', attrs), attrs)
   511         session.system_sql(self.sqlgen.delete('entities', attrs), attrs)
   512         if self.has_deleted_entitites_table:
   512         if self.has_deleted_entitites_table:
   513             attrs = {'type': etype, 'eid': eid, 'extid': extid,
   513             attrs = {'type': etype, 'eid': eid, 'extid': extid,
   514                      'source': uri, 'dtime': datetime.now()}
   514                      'source': uri, 'dtime': datetime.now()}
   515             session.system_sql(self.sqlgen.insert('deleted_entities', attrs), attrs)
   515             session.system_sql(self.sqlgen.insert('deleted_entities', attrs), attrs)
   516         
   516 
   517     def fti_unindex_entity(self, session, eid):
   517     def fti_unindex_entity(self, session, eid):
   518         """remove text content for entity with the given eid from the full text
   518         """remove text content for entity with the given eid from the full text
   519         index
   519         index
   520         """
   520         """
   521         try:
   521         try:
   522             self.indexer.cursor_unindex_object(eid, session.pool['system'])
   522             self.indexer.cursor_unindex_object(eid, session.pool['system'])
   523         except:
   523         except:
   524             if self.indexer is not None:
   524             if self.indexer is not None:
   525                 self.exception('error while unindexing %s', eid)
   525                 self.exception('error while unindexing %s', eid)
   526         
   526 
   527     def fti_index_entity(self, session, entity):
   527     def fti_index_entity(self, session, entity):
   528         """add text content of a created/modified entity to the full text index
   528         """add text content of a created/modified entity to the full text index
   529         """
   529         """
   530         self.info('reindexing %r', entity.eid)
   530         self.info('reindexing %r', entity.eid)
   531         try:
   531         try:
   535             if self.indexer is not None:
   535             if self.indexer is not None:
   536                 self.exception('error while reindexing %s', entity)
   536                 self.exception('error while reindexing %s', entity)
   537         # update entities.mtime
   537         # update entities.mtime
   538         attrs = {'eid': entity.eid, 'mtime': datetime.now()}
   538         attrs = {'eid': entity.eid, 'mtime': datetime.now()}
   539         session.system_sql(self.sqlgen.update('entities', attrs, ['eid']), attrs)
   539         session.system_sql(self.sqlgen.update('entities', attrs, ['eid']), attrs)
   540         
   540 
   541     def modified_entities(self, session, etypes, mtime):
   541     def modified_entities(self, session, etypes, mtime):
   542         """return a 2-uple:
   542         """return a 2-uple:
   543         * list of (etype, eid) of entities of the given types which have been
   543         * list of (etype, eid) of entities of the given types which have been
   544           modified since the given timestamp (actually entities whose full text
   544           modified since the given timestamp (actually entities whose full text
   545           index content has changed)
   545           index content has changed)