diff -r 000000000000 -r b97547f5f1fa server/sources/native.py
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/server/sources/native.py	Wed Nov 05 15:52:50 2008 +0100
@@ -0,0 +1,606 @@
+"""Adapters for native cubicweb sources.
+
+:organization: Logilab
+:copyright: 2001-2008 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
+:contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr
+"""
+__docformat__ = "restructuredtext en"
+
+from threading import Lock
+
+from mx.DateTime import now
+
+from logilab.common.cache import Cache
+from logilab.common.configuration import REQUIRED
+from logilab.common.adbh import get_adv_func_helper
+
+from indexer import get_indexer
+
+from cubicweb import UnknownEid, AuthenticationError, Binary, server
+from cubicweb.server.utils import crypt_password
+from cubicweb.server.sqlutils import SQLAdapterMixIn
+from cubicweb.server.rqlannotation import set_qdata
+from cubicweb.server.sources import AbstractSource
+from cubicweb.server.sources.rql2sql import SQLGenerator
+
+
+NONSYSTEM_ETYPES = set()
+NONSYSTEM_RELATIONS = set()
+
+class LogCursor(object):
+    def __init__(self, cursor):
+        self.cu = cursor
+
+    def execute(self, query, args=None):
+        """Execute a query.
+        It's a separate method just so that it shows up in profiling.
+        """
+        if server.DEBUG:
+            print 'exec', query, args
+        try:
+            self.cu.execute(str(query), args)
+        except Exception, ex:
+            print "sql: %r\n args: %s\ndbms message: %r" % (
+                query, args, ex.args[0])
+            raise
+
+    def fetchall(self):
+        return self.cu.fetchall()
+
+    def fetchone(self):
+        return self.cu.fetchone()
+
+def make_schema(selected, solution, table, typemap):
+    """return a sql schema to store RQL query result"""
+    sql = []
+    varmap = {}
+    for i, term in enumerate(selected):
+        name = 'C%s' % i
+        key = term.as_string()
+        varmap[key] = '%s.%s' % (table, name)
+        ttype = term.get_type(solution)
+        try:
+            sql.append('%s %s' % (name, typemap[ttype]))
+        except KeyError:
+            # assert not schema(ttype).is_final()
+            sql.append('%s %s' % (name, typemap['Int']))
+    return ','.join(sql), varmap
+
+def _modified_sql(table, etypes):
+    # XXX protect against sql injection
+    if len(etypes) > 1:
+        restr = 'type IN (%s)' % ','.join("'%s'" % etype for etype in etypes)
+    else:
+        restr = "type='%s'" % etypes[0]
+    if table == 'entities':
+        attr = 'mtime'
+    else:
+        attr = 'dtime'
+    return 'SELECT type, eid FROM %s WHERE %s AND %s > %%(time)s' % (
+        table, restr, attr)
+
+
+class NativeSQLSource(SQLAdapterMixIn, AbstractSource):
+    """adapter for a source using the native cubicweb schema (see below)
+    """
+    # need a default value on the class since migration doesn't call the init method
+    has_deleted_entities_table = True
+
+    passwd_rql = "Any P WHERE X is EUser, X login %(login)s, X upassword P"
+    auth_rql = "Any X WHERE X is EUser, X login %(login)s, X upassword %(pwd)s"
+    _sols = ({'X': 'EUser', 'P': 'Password'},)
+
+    options = (
+        ('db-driver',
+         {'type' : 'string',
+          'default': 'postgres',
+          'help': 'database driver (postgres or sqlite)',
+          'group': 'native-source', 'inputlevel': 1,
+          }),
+        ('db-host',
+         {'type' : 'string',
+          'default': '',
+          'help': 'database host',
+          'group': 'native-source', 'inputlevel': 1,
+          }),
+        ('db-name',
+         {'type' : 'string',
+          'default': REQUIRED,
+          'help': 'database name',
+          'group': 'native-source', 'inputlevel': 0,
+          }),
+        ('db-user',
+         {'type' : 'string',
+          'default': 'cubicweb',
+          'help': 'database user',
+          'group': 'native-source', 'inputlevel': 0,
+          }),
+        ('db-password',
+         {'type' : 'password',
+          'default': '',
+          'help': 'database password',
+          'group': 'native-source', 'inputlevel': 0,
+          }),
+        ('db-encoding',
+         {'type' : 'string',
+          'default': 'utf8',
+          'help': 'database encoding',
+          'group': 'native-source', 'inputlevel': 1,
+          }),
+    )
+
+    def __init__(self, repo, appschema, source_config, *args, **kwargs):
+        SQLAdapterMixIn.__init__(self, source_config)
+        AbstractSource.__init__(self, repo, appschema, source_config,
+                                *args, **kwargs)
+        # sql generator
+        self._rql_sqlgen = SQLGenerator(appschema, self.dbhelper,
+                                        self.encoding)
+        # full text index helper
+        self.indexer = get_indexer(self.dbdriver, self.encoding)
+        # advanced functionality helper
+        self.dbhelper.fti_uid_attr = self.indexer.uid_attr
+        self.dbhelper.fti_table = self.indexer.table
+        self.dbhelper.fti_restriction_sql = self.indexer.restriction_sql
+        self.dbhelper.fti_need_distinct_query = self.indexer.need_distinct
+        # sql queries cache
+        self._cache = Cache(repo.config['rql-cache-size'])
+        self._temp_table_data = {}
+        self._eid_creation_lock = Lock()
+
+    def reset_caches(self):
+        """method called during tests to reset potential source caches"""
+        self._cache = Cache(self.repo.config['rql-cache-size'])
+
+    def clear_eid_cache(self, eid, etype):
+        """clear potential caches for the given eid"""
+        self._cache.pop('%s X WHERE X eid %s' % (etype, eid), None)
+        self._cache.pop('Any X WHERE X eid %s' % eid, None)
+
+    def sqlexec(self, session, sql, args=None):
+        """execute the query and return its result"""
+        cursor = session.pool[self.uri]
+        self.doexec(cursor, sql, args)
+        return self.process_result(cursor)
+
+    def init_creating(self):
+        # check full text index availability
+        pool = self.repo._get_pool()
+        if not self.indexer.has_fti_table(pool['system']):
+            self.error('no text index table')
+            self.indexer = None
+        self.repo._free_pool(pool)
+
+    def init(self):
+        self.init_creating()
+        pool = self.repo._get_pool()
+        # XXX cubicweb < 2.42 compat
+        if 'deleted_entities' in self.dbhelper.list_tables(pool['system']):
+            self.has_deleted_entities_table = True
+        else:
+            self.has_deleted_entities_table = False
+        self.repo._free_pool(pool)
+
+    # ISource interface #######################################################
+
+    def compile_rql(self, rql):
+        rqlst = self.repo.querier._rqlhelper.parse(rql)
+        rqlst.restricted_vars = ()
+        rqlst.children[0].solutions = self._sols
+        self.repo.querier.sqlgen_annotate(rqlst)
+        set_qdata(rqlst, ())
+        return rqlst
+
+    def set_schema(self, schema):
+        """set the application's schema"""
+        self._cache = Cache(self.repo.config['rql-cache-size'])
+        self.cache_hit, self.cache_miss, self.no_cache = 0, 0, 0
+        self.schema = schema
+        try:
+            self._rql_sqlgen.schema = schema
+        except AttributeError:
+            pass # __init__
+        if 'EUser' in schema: # probably an empty schema if not true...
+            # rql syntax trees used to authenticate users
+            self._passwd_rqlst = self.compile_rql(self.passwd_rql)
+            self._auth_rqlst = self.compile_rql(self.auth_rql)
+
+    def support_entity(self, etype, write=False):
+        """return true if the given entity type is handled by this adapter;
+        if write is true, return true only if it's read-write support
+        """
+        return not etype in NONSYSTEM_ETYPES
+
+    def support_relation(self, rtype, write=False):
+        """return true if the given relation type is handled by this adapter;
+        if write is true, return true only if it's read-write support
+        """
+        if write:
+            return not rtype in NONSYSTEM_RELATIONS
+        # due to the current multi-sources implementation, the system source
+        # can't claim not supporting a relation
+        return True #not rtype == 'content_for'
+
+    def authenticate(self, session, login, password):
+        """return the EUser eid for the given login/password if this account is
+        defined in this source, else raise `AuthenticationError`
+
+        two queries are needed since passwords are stored crypted, so we have
+        to fetch the salt first
+        """
+        args = {'login': login, 'pwd' : password}
+        if password is not None:
+            rset = self.syntax_tree_search(session, self._passwd_rqlst, args)
+            try:
+                pwd = rset[0][0]
+            except IndexError:
+                raise AuthenticationError('bad login')
+            # passwords are stored using the bytea type, so we get a StringIO
+            if pwd is not None:
+                args['pwd'] = crypt_password(password, pwd.getvalue()[:2])
+        # get eid from login and (crypted) password
+        rset = self.syntax_tree_search(session, self._auth_rqlst, args)
+        try:
+            return rset[0][0]
+        except IndexError:
+            raise AuthenticationError('bad password')
+
+    def syntax_tree_search(self, session, union, args=None, cachekey=None,
+                           varmap=None):
+        """return result from this source for a rql query (actually from
+        a rql syntax tree and a solution dictionary mapping each used
+        variable to a possible type). If cachekey is given, the query
+        necessary to fetch the results (but not the results themselves)
+        may be cached using this key.
+        """
+        if server.DEBUG:
+            print 'RQL FOR NATIVE SOURCE', self.uri, cachekey
+            if varmap:
+                print 'USING VARMAP', varmap
+            print union.as_string()
+            if args: print 'ARGS', args
+            print 'SOLUTIONS', ','.join(str(s.solutions) for s in union.children)
+        # remember the number of actually selected terms (sql generation may append some)
+        if cachekey is None:
+            self.no_cache += 1
+            # generate sql query if we are able to do so (not supported types...)
+            sql, query_args = self._rql_sqlgen.generate(union, args, varmap)
+        else:
+            # sql may be cached
+            try:
+                sql, query_args = self._cache[cachekey]
+                self.cache_hit += 1
+            except KeyError:
+                self.cache_miss += 1
+                sql, query_args = self._rql_sqlgen.generate(union, args, varmap)
+                self._cache[cachekey] = sql, query_args
+        args = self.merge_args(args, query_args)
+        cursor = session.pool[self.uri]
+        assert isinstance(sql, basestring), repr(sql)
+        try:
+            self.doexec(cursor, sql, args)
+        except (self.dbapi_module.OperationalError,
+                self.dbapi_module.InterfaceError):
+            # FIXME: better detection of disconnection problems
+            self.info("request failed '%s' ... retry with a new cursor", sql)
+            session.pool.reconnect(self)
+            cursor = session.pool[self.uri]
+            self.doexec(cursor, sql, args)
+        res = self.process_result(cursor)
+        if server.DEBUG:
+            print '------>', res
+        return res
+
+    def flying_insert(self, table, session, union, args=None, varmap=None):
+        """similar to .syntax_tree_search, but inserts data into the
+        temporary table (on the fly if possible, e.g. for the system
+        source, from which the given cursor comes). If not possible,
+        inserts all data by calling .executemany().
+        """
+        if self.uri == 'system':
+            if server.DEBUG:
+                print 'FLYING RQL FOR SOURCE', self.uri
+                if varmap:
+                    print 'USING VARMAP', varmap
+                print union.as_string()
+                print 'SOLUTIONS', ','.join(str(s.solutions) for s in union.children)
+            # generate sql queries if we are able to do so
+            sql, query_args = self._rql_sqlgen.generate(union, args, varmap)
+            query = 'INSERT INTO %s %s' % (table, sql.encode(self.encoding))
+            self.doexec(session.pool[self.uri], query,
+                        self.merge_args(args, query_args))
+# XXX commented until it's proved to be necessary
+#             # XXX probably inefficient
+#             tempdata = self._temp_table_data.setdefault(table, set())
+#             cursor = session.pool[self.uri]
+#             cursor.execute('select * from %s' % table)
+#             for row in cursor.fetchall():
+#                 print 'data', row
+#                 tempdata.add(tuple(row))
+        else:
+            super(NativeSQLSource, self).flying_insert(table, session, union,
+                                                       args, varmap)
+
+    def _manual_insert(self, results, table, session):
+        """insert given results into a temporary table on the system source"""
+        #print 'manual insert', table, results
+        if not results:
+            return
+        #cursor.execute('select * from %s'%table)
+        #assert len(cursor.fetchall())== 0
+        encoding = self.encoding
+        # added chr to be sqlite compatible
+        query_args = ['%%(%s)s' % i for i in xrange(len(results[0]))]
+        query = 'INSERT INTO %s VALUES(%s)' % (table, ','.join(query_args))
+        kwargs_list = []
+#        tempdata = self._temp_table_data.setdefault(table, set())
+        for row in results:
+            kwargs = {}
+            row = tuple(row)
+# XXX commented until it's proved to be necessary
+#             if row in tempdata:
+#                 continue
+#             tempdata.add(row)
+            for index, cell in enumerate(row):
+                if type(cell) is unicode:
+                    cell = cell.encode(encoding)
+                elif isinstance(cell, Binary):
+                    cell = self.binary(cell.getvalue())
+                kwargs[str(index)] = cell
+            kwargs_list.append(kwargs)
+        self.doexecmany(session.pool[self.uri], query, kwargs_list)
+
+    def clean_temp_data(self, session, temptables):
+        """remove temporary data, usually associated to temporary tables"""
+        if temptables:
+            cursor = session.pool[self.uri]
+            for table in temptables:
+                try:
+                    self.doexec(cursor, 'DROP TABLE %s' % table)
+                except:
+                    pass
+                try:
+                    del self._temp_table_data[table]
+                except KeyError:
+                    continue
+
+    def add_entity(self, session, entity):
+        """add a new entity to the source"""
+        attrs = self.preprocess_entity(entity)
+        sql = self.sqlgen.insert(str(entity.e_schema), attrs)
+        self.doexec(session.pool[self.uri], sql, attrs)
+
+    def update_entity(self, session, entity):
+        """replace an entity in the source"""
+        attrs = self.preprocess_entity(entity)
+        sql = self.sqlgen.update(str(entity.e_schema), attrs, ['eid'])
+        self.doexec(session.pool[self.uri], sql, attrs)
+
+    def delete_entity(self, session, etype, eid):
+        """delete an entity from the source"""
+        attrs = {'eid': eid}
+        sql = self.sqlgen.delete(etype, attrs)
+        self.doexec(session.pool[self.uri], sql, attrs)
+
+    def add_relation(self, session, subject, rtype, object):
+        """add a relation to the source"""
+        attrs = {'eid_from': subject, 'eid_to': object}
+        sql = self.sqlgen.insert('%s_relation' % rtype, attrs)
+        self.doexec(session.pool[self.uri], sql, attrs)
+
+    def delete_relation(self, session, subject, rtype, object):
+        """delete a relation from the source"""
+        rschema = self.schema.rschema(rtype)
+        if rschema.inlined:
+            etype = session.describe(subject)[0]
+            sql = 'UPDATE %s SET %s=NULL WHERE eid=%%(eid)s' % (etype, rtype)
+            attrs = {'eid' : subject}
+        else:
+            attrs = {'eid_from': subject, 'eid_to': object}
+            sql = self.sqlgen.delete('%s_relation' % rtype, attrs)
+        self.doexec(session.pool[self.uri], sql, attrs)
+
+    def doexec(self, cursor, query, args=None):
+        """Execute a query.
+        It's a separate method just so that it shows up in profiling.
+        """
+        #t1 = time()
+        if server.DEBUG:
+            print 'exec', query, args
+            #import sys
+            #sys.stdout.flush()
+        # str(query) to avoid error if it's a unicode string
+        try:
+            cursor.execute(str(query), args)
+        except Exception, ex:
+            self.critical("sql: %r\n args: %s\ndbms message: %r",
+                          query, args, ex.args[0])
+            raise
+
+    def doexecmany(self, cursor, query, args):
+        """Execute a query.
+        It's a separate method just so that it shows up in profiling.
+        """
+        #t1 = time()
+        if server.DEBUG:
+            print 'execmany', query, 'with', len(args), 'arguments'
+            #import sys
+            #sys.stdout.flush()
+        # str(query) to avoid error if it's a unicode string
+        try:
+            cursor.executemany(str(query), args)
+        except:
+            self.critical("sql many: %r\n args: %s", query, args)
+            raise
+
+    # shortcuts to methods requiring advanced db helper usage #################
+
+    def create_index(self, session, table, column, unique=False):
+        cursor = LogCursor(session.pool[self.uri])
+        self.dbhelper.create_index(cursor, table, column, unique)
+
+    def drop_index(self, session, table, column, unique=False):
+        cursor = LogCursor(session.pool[self.uri])
+        self.dbhelper.drop_index(cursor, table, column, unique)
+
+    # system source interface #################################################
+
+    def eid_type_source(self, session, eid):
+        """return a tuple (type, source, extid) for the entity with the given eid"""
+        sql = 'SELECT type, source, extid FROM entities WHERE eid=%s' % eid
+        try:
+            res = session.system_sql(sql).fetchone()
+        except:
+            raise UnknownEid(eid)
+        if res is None:
+            raise UnknownEid(eid)
+        return res
+
+    def extid2eid(self, session, source, lid):
+        """get eid from a local id; return None if no record is found"""
+        cursor = session.system_sql('SELECT eid FROM entities WHERE '
+                                    'extid=%(x)s AND source=%(s)s',
+                                    # str() necessary with pg 8.3
+                                    {'x': str(lid), 's': source.uri})
+        # XXX testing rowcount causes a strange bug with sqlite: results are
+        # there but rowcount is 0
+        #if cursor.rowcount > 0:
+        try:
+            result = cursor.fetchone()
+            if result:
+                eid = result[0]
+                return eid
+        except:
+            pass
+        return None
+
+    def temp_table_def(self, selected, sol, table):
+        return make_schema(selected, sol, table, self.dbhelper.TYPE_MAPPING)
+
+    def create_temp_table(self, session, table, schema):
+        # we don't want ON COMMIT DROP: it may cause problems when running
+        # with an ldap source, and the table will be deleted manually anyway
+        # on commit
+        sql = self.dbhelper.sql_temporary_table(table, schema, False)
+        self.doexec(session.pool[self.uri], sql)
+
+    def create_eid(self, session):
+        self._eid_creation_lock.acquire()
+        try:
+            cursor = session.pool[self.uri]
+            for sql in self.dbhelper.sqls_increment_sequence('entities_id_seq'):
+                self.doexec(cursor, sql)
+            return cursor.fetchone()[0]
+        finally:
+            self._eid_creation_lock.release()
+
+    def add_info(self, session, entity, source, extid=None):
+        """add type and source info for an eid into the system table"""
+        # begin by inserting eid/type/source/extid into the entities table
+        attrs = {'type': str(entity.e_schema), 'eid': entity.eid,
+                 'extid': extid, 'source': source.uri, 'mtime': now()}
+        session.system_sql(self.sqlgen.insert('entities', attrs), attrs)
+
+    def delete_info(self, session, eid, etype, uri, extid):
+        """delete system information on deletion of an entity by transferring
+        the record from the entities table to the deleted_entities table
+        """
+        attrs = {'eid': eid}
+        session.system_sql(self.sqlgen.delete('entities', attrs), attrs)
+        if self.has_deleted_entities_table:
+            attrs = {'type': etype, 'eid': eid, 'extid': extid,
+                     'source': uri, 'dtime': now()}
+            session.system_sql(self.sqlgen.insert('deleted_entities', attrs), attrs)
+
+    def fti_unindex_entity(self, session, eid):
+        """remove text content for the entity with the given eid from the full
+        text index
+        """
+        try:
+            self.indexer.cursor_unindex_object(eid, session.pool['system'])
+        except:
+            if self.indexer is not None:
+                self.exception('error while unindexing %s', eid)
+
+    def fti_index_entity(self, session, entity):
+        """add text content of a created/modified entity to the full text index
+        """
+        self.info('reindexing %r', entity.eid)
+        try:
+            self.indexer.cursor_reindex_object(entity.eid, entity,
+                                               session.pool['system'])
+        except:
+            if self.indexer is not None:
+                self.exception('error while reindexing %s', entity)
+        # update entities.mtime
+        attrs = {'eid': entity.eid, 'mtime': now()}
+        session.system_sql(self.sqlgen.update('entities', attrs, ['eid']), attrs)
+
+    def modified_entities(self, session, etypes, mtime):
+        """return a 2-tuple:
+        * list of (etype, eid) of entities of the given types which have been
+          modified since the given timestamp (actually entities whose full text
+          index content has changed)
+        * list of (etype, eid) of entities of the given types which have been
+          deleted since the given timestamp
+        """
+        modsql = _modified_sql('entities', etypes)
+        cursor = session.system_sql(modsql, {'time': mtime})
+        modentities = cursor.fetchall()
+        delsql = _modified_sql('deleted_entities', etypes)
+        cursor = session.system_sql(delsql, {'time': mtime})
+        delentities = cursor.fetchall()
+        return modentities, delentities
+
+
+def sql_schema(driver):
+    helper = get_adv_func_helper(driver)
+    schema = """
+/* Create the repository's system database */
+
+%s
+
+CREATE TABLE entities (
+  eid INTEGER PRIMARY KEY NOT NULL,
+  type VARCHAR(64) NOT NULL,
+  source VARCHAR(64) NOT NULL,
+  mtime TIMESTAMP NOT NULL,
+  extid VARCHAR(256)
+);
+CREATE INDEX entities_type_idx ON entities(type);
+CREATE INDEX entities_mtime_idx ON entities(mtime);
+CREATE INDEX entities_extid_idx ON entities(extid);
+
+CREATE TABLE deleted_entities (
+  eid INTEGER PRIMARY KEY NOT NULL,
+  type VARCHAR(64) NOT NULL,
+  source VARCHAR(64) NOT NULL,
+  dtime TIMESTAMP NOT NULL,
+  extid VARCHAR(256)
+);
+CREATE INDEX deleted_entities_type_idx ON deleted_entities(type);
+CREATE INDEX deleted_entities_dtime_idx ON deleted_entities(dtime);
+CREATE INDEX deleted_entities_extid_idx ON deleted_entities(extid);
+""" % helper.sql_create_sequence('entities_id_seq')
+    return schema
+
+
+def sql_drop_schema(driver):
+    helper = get_adv_func_helper(driver)
+    return """
+%s
+DROP TABLE entities;
+DROP TABLE deleted_entities;
+""" % helper.sql_drop_sequence('entities_id_seq')
+
+
+def grant_schema(user, set_owner=True):
+    result = ''
+    if set_owner:
+        result = 'ALTER TABLE entities OWNER TO %s;\n' % user
+        result += 'ALTER TABLE deleted_entities OWNER TO %s;\n' % user
+        result += 'ALTER TABLE entities_id_seq OWNER TO %s;\n' % user
+    result += 'GRANT ALL ON entities TO %s;\n' % user
+    result += 'GRANT ALL ON deleted_entities TO %s;\n' % user
+    result += 'GRANT ALL ON entities_id_seq TO %s;\n' % user
+    return result
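
Note for reviewers, not part of the patch: a minimal sketch of what make_schema
produces for a temporary-table definition, assuming the module is importable as
cubicweb.server.sources.native. The _Term class and the typemap dict below are
hypothetical stand-ins for real RQL terms and the db helper's TYPE_MAPPING.

    # _Term mimics the small part of the RQL term API that make_schema
    # relies on: as_string() and get_type(solution).
    from cubicweb.server.sources.native import make_schema

    class _Term(object):
        def __init__(self, name, ttype):
            self.name, self.ttype = name, ttype
        def as_string(self):
            return self.name
        def get_type(self, solution):
            return self.ttype

    # simplified stand-in for the db helper's TYPE_MAPPING
    typemap = {'String': 'text', 'Int': 'integer'}
    # each selected term becomes a column C0, C1, ... typed via typemap;
    # varmap maps the term's string form to its column in the temp table
    sql, varmap = make_schema([_Term('X', 'String'), _Term('N', 'Int')],
                              {}, 'T00', typemap)
    print sql     # -> C0 text,C1 integer
    print varmap  # -> {'X': 'T00.C0', 'N': 'T00.C1'}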
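
Likewise, the FTI synchronisation SQL built by _modified_sql, printed here for
two hypothetical entity types (the output follows directly from the string
template in the patch; note the XXX about sql injection still applies):

    from cubicweb.server.sources.native import _modified_sql

    print _modified_sql('entities', ('Card', 'Blog'))
    # -> SELECT type, eid FROM entities WHERE type IN ('Card','Blog') AND mtime > %(time)s
    print _modified_sql('deleted_entities', ('Card',))
    # -> SELECT type, eid FROM deleted_entities WHERE type='Card' AND dtime > %(time)s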
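
Finally, the statements emitted by grant_schema for a hypothetical database
user; sql_schema and sql_drop_schema delegate the sequence DDL to the driver's
advanced-function helper, so only the table grants are shown here:

    from cubicweb.server.sources.native import grant_schema

    print grant_schema('cubicweb')
    # ALTER TABLE entities OWNER TO cubicweb;
    # ALTER TABLE deleted_entities OWNER TO cubicweb;
    # ALTER TABLE entities_id_seq OWNER TO cubicweb;
    # GRANT ALL ON entities TO cubicweb;
    # GRANT ALL ON deleted_entities TO cubicweb;
    # GRANT ALL ON entities_id_seq TO cubicweb;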