--- a/server/sources/native.py Fri Feb 12 15:18:00 2010 +0100
+++ b/server/sources/native.py Wed Mar 24 10:23:31 2010 +0100
@@ -17,7 +17,9 @@
from datetime import datetime
from base64 import b64decode, b64encode
+from logilab.common.compat import any
from logilab.common.cache import Cache
+from logilab.common.decorators import cached, clear_cache
from logilab.common.configuration import Method
from logilab.common.adbh import get_adv_func_helper
from logilab.common.shellutils import getlogin
@@ -26,6 +28,7 @@
from cubicweb import UnknownEid, AuthenticationError, Binary, server
from cubicweb.cwconfig import CubicWebNoAppConfiguration
+from cubicweb.server import hook
from cubicweb.server.utils import crypt_password
from cubicweb.server.sqlutils import SQL_PREFIX, SQLAdapterMixIn
from cubicweb.server.rqlannotation import set_qdata
@@ -96,11 +99,6 @@
"""adapter for source using the native cubicweb schema (see below)
"""
sqlgen_class = SQLGenerator
-
- passwd_rql = "Any P WHERE X is CWUser, X login %(login)s, X upassword P"
- auth_rql = "Any X WHERE X is CWUser, X login %(login)s, X upassword %(pwd)s"
- _sols = ({'X': 'CWUser', 'P': 'Password'},)
-
options = (
('db-driver',
{'type' : 'string',
@@ -148,18 +146,23 @@
def __init__(self, repo, appschema, source_config, *args, **kwargs):
SQLAdapterMixIn.__init__(self, source_config)
+ self.authentifiers = [LoginPasswordAuthentifier(self)]
AbstractSource.__init__(self, repo, appschema, source_config,
*args, **kwargs)
+ # full text index helper
+ self.do_fti = not repo.config['delay-full-text-indexation']
+ if self.do_fti:
+ self.indexer = get_indexer(self.dbdriver, self.encoding)
+ # XXX should go away with logilab.db
+ self.dbhelper.fti_uid_attr = self.indexer.uid_attr
+ self.dbhelper.fti_table = self.indexer.table
+ self.dbhelper.fti_restriction_sql = self.indexer.restriction_sql
+ self.dbhelper.fti_need_distinct_query = self.indexer.need_distinct
+ else:
+ self.dbhelper.fti_need_distinct_query = False
# sql generator
self._rql_sqlgen = self.sqlgen_class(appschema, self.dbhelper,
self.encoding, ATTR_MAP.copy())
- # full text index helper
- self.indexer = get_indexer(self.dbdriver, self.encoding)
- # advanced functionality helper
- self.dbhelper.fti_uid_attr = self.indexer.uid_attr
- self.dbhelper.fti_table = self.indexer.table
- self.dbhelper.fti_restriction_sql = self.indexer.restriction_sql
- self.dbhelper.fti_need_distinct_query = self.indexer.need_distinct
# sql queries cache
self._cache = Cache(repo.config['rql-cache-size'])
self._temp_table_data = {}
@@ -182,6 +185,11 @@
# consuming, find another way
return SQLAdapterMixIn.get_connection(self)
+ def add_authentifier(self, authentifier):
+ self.authentifiers.append(authentifier)
+ authentifier.source = self
+ authentifier.set_schema(self.schema)
+
def reset_caches(self):
"""method called during test to reset potential source caches"""
self._cache = Cache(self.repo.config['rql-cache-size'])
@@ -200,17 +208,19 @@
pool = self.repo._get_pool()
pool.pool_set()
# check full text index availibility
- if not self.indexer.has_fti_table(pool['system']):
- self.error('no text index table')
- self.indexer = None
+ if self.do_fti:
+ if not self.indexer.has_fti_table(pool['system']):
+ if not self.repo.config.creating:
+ self.critical('no text index table')
+ self.do_fti = False
pool.pool_reset()
self.repo._free_pool(pool)
- def backup(self, backupfile):
+ def backup(self, backupfile, confirm):
"""method called to create a backup of the source's data"""
self.close_pool_connections()
try:
- self.backup_to_file(backupfile)
+ self.backup_to_file(backupfile, confirm)
finally:
self.open_pool_connections()
@@ -230,12 +240,15 @@
def map_attribute(self, etype, attr, cb):
self._rql_sqlgen.attr_map['%s.%s' % (etype, attr)] = cb
+ def unmap_attribute(self, etype, attr):
+ self._rql_sqlgen.attr_map.pop('%s.%s' % (etype, attr), None)
+
# ISource interface #######################################################
- def compile_rql(self, rql):
+ def compile_rql(self, rql, sols):
rqlst = self.repo.vreg.rqlhelper.parse(rql)
rqlst.restricted_vars = ()
- rqlst.children[0].solutions = self._sols
+ rqlst.children[0].solutions = sols
self.repo.querier.sqlgen_annotate(rqlst)
set_qdata(self.schema.rschema, rqlst, ())
return rqlst
@@ -249,10 +262,9 @@
self._rql_sqlgen.schema = schema
except AttributeError:
pass # __init__
- if 'CWUser' in schema: # probably an empty schema if not true...
- # rql syntax trees used to authenticate users
- self._passwd_rqlst = self.compile_rql(self.passwd_rql)
- self._auth_rqlst = self.compile_rql(self.auth_rql)
+ for authentifier in self.authentifiers:
+ authentifier.set_schema(self.schema)
+ clear_cache(self, 'need_fti_indexation')
def support_entity(self, etype, write=False):
"""return true if the given entity's type is handled by this adapter
@@ -273,30 +285,16 @@
def may_cross_relation(self, rtype):
return True
- def authenticate(self, session, login, password):
- """return CWUser eid for the given login/password if this account is
- defined in this source, else raise `AuthenticationError`
-
- two queries are needed since passwords are stored crypted, so we have
- to fetch the salt first
+ def authenticate(self, session, login, **kwargs):
+ """return CWUser eid for the given login and other authentication
+ information found in kwargs, else raise `AuthenticationError`
"""
- args = {'login': login, 'pwd' : password}
- if password is not None:
- rset = self.syntax_tree_search(session, self._passwd_rqlst, args)
+ for authentifier in self.authentifiers:
try:
- pwd = rset[0][0]
- except IndexError:
- raise AuthenticationError('bad login')
- # passwords are stored using the Bytes type, so we get a StringIO
- if pwd is not None:
- args['pwd'] = Binary(crypt_password(password, pwd.getvalue()[:2]))
- # get eid from login and (crypted) password
- # XXX why not simply compare password?
- rset = self.syntax_tree_search(session, self._auth_rqlst, args)
- try:
- return rset[0][0]
- except IndexError:
- raise AuthenticationError('bad password')
+ return authentifier.authenticate(session, login, **kwargs)
+ except AuthenticationError:
+ continue
+ raise AuthenticationError()
def syntax_tree_search(self, session, union, args=None, cachekey=None,
varmap=None):
@@ -390,7 +388,8 @@
def update_entity(self, session, entity):
"""replace an entity in the source"""
attrs = self.preprocess_entity(entity)
- sql = self.sqlgen.update(SQL_PREFIX + str(entity.e_schema), attrs, [SQL_PREFIX + 'eid'])
+ sql = self.sqlgen.update(SQL_PREFIX + str(entity.e_schema), attrs,
+ [SQL_PREFIX + 'eid'])
self.doexec(session, sql, attrs)
def delete_entity(self, session, etype, eid):
@@ -399,10 +398,16 @@
sql = self.sqlgen.delete(SQL_PREFIX + etype, attrs)
self.doexec(session, sql, attrs)
- def add_relation(self, session, subject, rtype, object):
+ def add_relation(self, session, subject, rtype, object, inlined=False):
"""add a relation to the source"""
- attrs = {'eid_from': subject, 'eid_to': object}
- sql = self.sqlgen.insert('%s_relation' % rtype, attrs)
+ if inlined is False:
+ attrs = {'eid_from': subject, 'eid_to': object}
+ sql = self.sqlgen.insert('%s_relation' % rtype, attrs)
+ else: # used by data import
+ etype = session.describe(subject)[0]
+ attrs = {SQL_PREFIX + 'eid': subject, SQL_PREFIX + rtype: object}
+ sql = self.sqlgen.update(SQL_PREFIX + etype, attrs,
+ [SQL_PREFIX + 'eid'])
self.doexec(session, sql, attrs)
def delete_relation(self, session, subject, rtype, object):
@@ -433,12 +438,16 @@
# str(query) to avoid error if it's an unicode string
cursor.execute(str(query), args)
except Exception, ex:
- self.critical("sql: %r\n args: %s\ndbms message: %r",
- query, args, ex.args[0])
+ if self.repo.config.mode != 'test':
+ # during test we get those message when trying to alter sqlite
+ # db schema
+ self.critical("sql: %r\n args: %s\ndbms message: %r",
+ query, args, ex.args[0])
if rollback:
try:
session.pool.connection(self.uri).rollback()
- self.critical('transaction has been rollbacked')
+ if self.repo.config.mode != 'test':
+ self.critical('transaction has been rollbacked')
except:
pass
raise
@@ -455,11 +464,15 @@
# str(query) to avoid error if it's an unicode string
cursor.executemany(str(query), args)
except Exception, ex:
- self.critical("sql many: %r\n args: %s\ndbms message: %r",
- query, args, ex.args[0])
+ if self.repo.config.mode != 'test':
+ # during test we get those message when trying to alter sqlite
+ # db schema
+ self.critical("sql many: %r\n args: %s\ndbms message: %r",
+ query, args, ex.args[0])
try:
session.pool.connection(self.uri).rollback()
- self.critical('transaction has been rollbacked')
+ if self.repo.config.mode != 'test':
+ self.critical('transaction has been rollbacked')
except:
pass
raise
@@ -528,15 +541,29 @@
finally:
self._eid_creation_lock.release()
- def add_info(self, session, entity, source, extid=None):
+ def add_info(self, session, entity, source, extid=None, complete=True):
"""add type and source info for an eid into the system table"""
# begin by inserting eid/type/source/extid into the entities table
if extid is not None:
assert isinstance(extid, str)
extid = b64encode(extid)
- attrs = {'type': entity.id, 'eid': entity.eid, 'extid': extid,
+ attrs = {'type': entity.__regid__, 'eid': entity.eid, 'extid': extid,
'source': source.uri, 'mtime': datetime.now()}
session.system_sql(self.sqlgen.insert('entities', attrs), attrs)
+ # now we can update the full text index
+ if self.do_fti and self.need_fti_indexation(entity.__regid__):
+ if complete:
+ entity.complete(entity.e_schema.indexable_attributes())
+ FTIndexEntityOp(session, entity=entity)
+
+ def update_info(self, session, entity, need_fti_update):
+ if self.do_fti and need_fti_update:
+ # reindex the entity only if this query is updating at least
+ # one indexable attribute
+ FTIndexEntityOp(session, entity=entity)
+ # update entities.mtime
+ attrs = {'eid': entity.eid, 'mtime': datetime.now()}
+ session.system_sql(self.sqlgen.update('entities', attrs, ['eid']), attrs)
def delete_info(self, session, eid, etype, uri, extid):
"""delete system information on deletion of an entity by transfering
@@ -551,30 +578,6 @@
'source': uri, 'dtime': datetime.now()}
session.system_sql(self.sqlgen.insert('deleted_entities', attrs), attrs)
- def fti_unindex_entity(self, session, eid):
- """remove text content for entity with the given eid from the full text
- index
- """
- try:
- self.indexer.cursor_unindex_object(eid, session.pool['system'])
- except Exception: # let KeyboardInterrupt / SystemExit propagate
- if self.indexer is not None:
- self.exception('error while unindexing %s', eid)
-
- def fti_index_entity(self, session, entity):
- """add text content of a created/modified entity to the full text index
- """
- self.info('reindexing %r', entity.eid)
- try:
- self.indexer.cursor_reindex_object(entity.eid, entity,
- session.pool['system'])
- except Exception: # let KeyboardInterrupt / SystemExit propagate
- if self.indexer is not None:
- self.exception('error while reindexing %s', entity)
- # update entities.mtime
- attrs = {'eid': entity.eid, 'mtime': datetime.now()}
- session.system_sql(self.sqlgen.update('entities', attrs, ['eid']), attrs)
-
def modified_entities(self, session, etypes, mtime):
"""return a 2-uple:
* list of (etype, eid) of entities of the given types which have been
@@ -591,6 +594,71 @@
delentities = cursor.fetchall()
return modentities, delentities
+ # full text index handling #################################################
+
+ @cached
+ def need_fti_indexation(self, etype):
+ eschema = self.schema.eschema(etype)
+ if any(eschema.indexable_attributes()):
+ return True
+ if any(eschema.fulltext_containers()):
+ return True
+ return False
+
+ def index_entity(self, session, entity):
+ """create an operation to [re]index textual content of the given entity
+ on commit
+ """
+ FTIndexEntityOp(session, entity=entity)
+
+ def fti_unindex_entity(self, session, eid):
+ """remove text content for entity with the given eid from the full text
+ index
+ """
+ try:
+ self.indexer.cursor_unindex_object(eid, session.pool['system'])
+ except Exception: # let KeyboardInterrupt / SystemExit propagate
+ self.exception('error while unindexing %s', eid)
+
+ def fti_index_entity(self, session, entity):
+ """add text content of a created/modified entity to the full text index
+ """
+ self.debug('reindexing %r', entity.eid)
+ try:
+ # use cursor_index_object, not cursor_reindex_object since
+ # unindexing done in the FTIndexEntityOp
+ self.indexer.cursor_index_object(entity.eid, entity,
+ session.pool['system'])
+ except Exception: # let KeyboardInterrupt / SystemExit propagate
+ self.exception('error while reindexing %s', entity)
+
+
+class FTIndexEntityOp(hook.LateOperation):
+ """operation to delay entity full text indexation to commit
+
+ since fti indexing may trigger discovery of other entities, it should be
+ triggered on precommit, not commit, and this should be done after other
+ precommit operation which may add relations to the entity
+ """
+
+ def precommit_event(self):
+ session = self.session
+ entity = self.entity
+ if entity.eid in session.transaction_data.get('pendingeids', ()):
+ return # entity added and deleted in the same transaction
+ alreadydone = session.transaction_data.setdefault('indexedeids', set())
+ if entity.eid in alreadydone:
+ self.debug('skipping reindexation of %s, already done', entity.eid)
+ return
+ alreadydone.add(entity.eid)
+ source = session.repo.system_source
+ for container in entity.fti_containers():
+ source.fti_unindex_entity(session, container.eid)
+ source.fti_index_entity(session, container)
+
+ def commit_event(self):
+ pass
+
def sql_schema(driver):
helper = get_adv_func_helper(driver)
@@ -644,3 +712,49 @@
result += 'GRANT ALL ON deleted_entities TO %s;\n' % user
result += 'GRANT ALL ON entities_id_seq TO %s;\n' % user
return result
+
+
+class BaseAuthentifier(object):
+
+ def __init__(self, source=None):
+ self.source = source
+
+ def set_schema(self, schema):
+ """set the instance'schema"""
+ pass
+
+class LoginPasswordAuthentifier(BaseAuthentifier):
+ passwd_rql = "Any P WHERE X is CWUser, X login %(login)s, X upassword P"
+ auth_rql = "Any X WHERE X is CWUser, X login %(login)s, X upassword %(pwd)s"
+ _sols = ({'X': 'CWUser', 'P': 'Password'},)
+
+ def set_schema(self, schema):
+ """set the instance'schema"""
+ if 'CWUser' in schema: # probably an empty schema if not true...
+ # rql syntax trees used to authenticate users
+ self._passwd_rqlst = self.source.compile_rql(self.passwd_rql, self._sols)
+ self._auth_rqlst = self.source.compile_rql(self.auth_rql, self._sols)
+
+ def authenticate(self, session, login, password=None, **kwargs):
+ """return CWUser eid for the given login/password if this account is
+ defined in this source, else raise `AuthenticationError`
+
+ two queries are needed since passwords are stored crypted, so we have
+ to fetch the salt first
+ """
+ args = {'login': login, 'pwd' : password}
+ if password is not None:
+ rset = self.source.syntax_tree_search(session, self._passwd_rqlst, args)
+ try:
+ pwd = rset[0][0]
+ except IndexError:
+ raise AuthenticationError('bad login')
+ # passwords are stored using the Bytes type, so we get a StringIO
+ if pwd is not None:
+ args['pwd'] = Binary(crypt_password(password, pwd.getvalue()[:2]))
+ # get eid from login and (crypted) password
+ rset = self.source.syntax_tree_search(session, self._auth_rqlst, args)
+ try:
+ return rset[0][0]
+ except IndexError:
+ raise AuthenticationError('bad password')