"""Adapters for native cubicweb sources.
Notes:
* extid (aka external id, the primary key of an entity in the external source
from which it comes from) are stored in a varchar column encoded as a base64
string. This is because it should actually be Bytes but we want an index on
it for fast querying.
:organization: Logilab
:copyright: 2001-2010 LOGILAB S.A. (Paris, FRANCE), license is LGPL v2.
:contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr
:license: GNU Lesser General Public License, v2.1 - http://www.gnu.org/licenses
"""
__docformat__ = "restructuredtext en"
from threading import Lock
from datetime import datetime
from base64 import b64decode, b64encode
from logilab.common.cache import Cache
from logilab.common.configuration import Method
from logilab.common.adbh import get_adv_func_helper
from logilab.common.shellutils import getlogin
from indexer import get_indexer
from cubicweb import UnknownEid, AuthenticationError, Binary, server
from cubicweb.cwconfig import CubicWebNoAppConfiguration
from cubicweb.server.utils import crypt_password
from cubicweb.server.sqlutils import SQL_PREFIX, SQLAdapterMixIn
from cubicweb.server.rqlannotation import set_qdata
from cubicweb.server.sources import AbstractSource, dbg_st_search, dbg_results
from cubicweb.server.sources.rql2sql import SQLGenerator
ATTR_MAP = {}
NONSYSTEM_ETYPES = set()
NONSYSTEM_RELATIONS = set()
class LogCursor(object):
def __init__(self, cursor):
self.cu = cursor
def execute(self, query, args=None):
"""Execute a query.
it's a function just so that it shows up in profiling
"""
if server.DEBUG & server.DBG_SQL:
print 'exec', query, args
try:
self.cu.execute(str(query), args)
except Exception, ex:
print "sql: %r\n args: %s\ndbms message: %r" % (
query, args, ex.args[0])
raise
def fetchall(self):
return self.cu.fetchall()
def fetchone(self):
return self.cu.fetchone()
def make_schema(selected, solution, table, typemap):
"""return a sql schema to store RQL query result"""
sql = []
varmap = {}
for i, term in enumerate(selected):
name = 'C%s' % i
key = term.as_string()
varmap[key] = '%s.%s' % (table, name)
ttype = term.get_type(solution)
try:
sql.append('%s %s' % (name, typemap[ttype]))
except KeyError:
# assert not schema(ttype).final
sql.append('%s %s' % (name, typemap['Int']))
return ','.join(sql), varmap
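# For example, with a postgres-like type map, selecting an entity variable X
# (non-final, so it falls back to the 'Int' mapping and stores the eid) and a
# String attribute N would roughly yield (illustrative):
#
#   make_schema(selected, solution, 'TMP', typemap)
#   -> ('C0 integer,C1 text', {'X': 'TMP.C0', 'N': 'TMP.C1'})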
def _modified_sql(table, etypes):
# XXX protect against sql injection
if len(etypes) > 1:
restr = 'type IN (%s)' % ','.join("'%s'" % etype for etype in etypes)
else:
restr = "type='%s'" % etypes[0]
if table == 'entities':
attr = 'mtime'
else:
attr = 'dtime'
return 'SELECT type, eid FROM %s WHERE %s AND %s > %%(time)s' % (
table, restr, attr)
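# e.g. _modified_sql('entities', ('CWUser', 'CWGroup')) produces:
#
#   SELECT type, eid FROM entities
#   WHERE type IN ('CWUser','CWGroup') AND mtime > %(time)s
#
# while the 'deleted_entities' variant compares against dtime instead.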
class NativeSQLSource(SQLAdapterMixIn, AbstractSource):
"""adapter for source using the native cubicweb schema (see below)
"""
sqlgen_class = SQLGenerator
options = (
('db-driver',
{'type' : 'string',
'default': 'postgres',
'help': 'database driver (postgres or sqlite)',
'group': 'native-source', 'inputlevel': 1,
}),
('db-host',
{'type' : 'string',
'default': '',
'help': 'database host',
'group': 'native-source', 'inputlevel': 1,
}),
('db-port',
{'type' : 'string',
'default': '',
'help': 'database port',
'group': 'native-source', 'inputlevel': 1,
}),
('db-name',
{'type' : 'string',
'default': Method('default_instance_id'),
'help': 'database name',
'group': 'native-source', 'inputlevel': 0,
}),
('db-user',
{'type' : 'string',
'default': CubicWebNoAppConfiguration.mode == 'user' and getlogin() or 'cubicweb',
'help': 'database user',
'group': 'native-source', 'inputlevel': 0,
}),
('db-password',
{'type' : 'password',
'default': '',
'help': 'database password',
'group': 'native-source', 'inputlevel': 0,
}),
('db-encoding',
{'type' : 'string',
'default': 'utf8',
'help': 'database encoding',
'group': 'native-source', 'inputlevel': 1,
}),
)
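    # These options typically surface in the instance's sources file; a
    # minimal sketch (section name and values are illustrative):
    #
    #   [system]
    #   db-driver = postgres
    #   db-name   = myapp
    #   db-user   = cubicweb
    #   db-password = secret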
def __init__(self, repo, appschema, source_config, *args, **kwargs):
SQLAdapterMixIn.__init__(self, source_config)
self.authentifiers = [LoginPasswordAuthentifier(self)]
AbstractSource.__init__(self, repo, appschema, source_config,
*args, **kwargs)
# sql generator
self._rql_sqlgen = self.sqlgen_class(appschema, self.dbhelper,
self.encoding, ATTR_MAP.copy())
# full text index helper
self.indexer = get_indexer(self.dbdriver, self.encoding)
# advanced functionality helper
self.dbhelper.fti_uid_attr = self.indexer.uid_attr
self.dbhelper.fti_table = self.indexer.table
self.dbhelper.fti_restriction_sql = self.indexer.restriction_sql
self.dbhelper.fti_need_distinct_query = self.indexer.need_distinct
# sql queries cache
self._cache = Cache(repo.config['rql-cache-size'])
self._temp_table_data = {}
self._eid_creation_lock = Lock()
        # XXX no_sqlite_wrap trick since we have a sqlite locking problem
        # when running unittest_multisources with the wrapping below
if self.dbdriver == 'sqlite' and \
not getattr(repo.config, 'no_sqlite_wrap', False):
from cubicweb.server.sources.extlite import ConnectionWrapper
self.get_connection = lambda: ConnectionWrapper(self)
self.check_connection = lambda cnx: cnx
def pool_reset(cnx):
cnx.close()
self.pool_reset = pool_reset
@property
def _sqlcnx(self):
        # XXX: sqlite connections can only be used in the same thread, so
        # create a new one each time it's necessary. If this turns out to be
        # time consuming, find another way
return SQLAdapterMixIn.get_connection(self)
def add_authentifier(self, authentifier):
self.authentifiers.append(authentifier)
authentifier.source = self
authentifier.set_schema(self.schema)
def reset_caches(self):
"""method called during test to reset potential source caches"""
self._cache = Cache(self.repo.config['rql-cache-size'])
def clear_eid_cache(self, eid, etype):
"""clear potential caches for the given eid"""
self._cache.pop('Any X WHERE X eid %s, X is %s' % (eid, etype), None)
self._cache.pop('Any X WHERE X eid %s' % eid, None)
self._cache.pop('Any %s' % eid, None)
def sqlexec(self, session, sql, args=None):
"""execute the query and return its result"""
return self.process_result(self.doexec(session, sql, args))
def init_creating(self):
pool = self.repo._get_pool()
pool.pool_set()
        # check full text index availability
if not self.indexer.has_fti_table(pool['system']):
self.error('no text index table')
self.indexer = None
pool.pool_reset()
self.repo._free_pool(pool)
def backup(self, backupfile):
"""method called to create a backup of the source's data"""
self.close_pool_connections()
try:
self.backup_to_file(backupfile)
finally:
self.open_pool_connections()
def restore(self, backupfile, confirm, drop):
"""method called to restore a backup of source's data"""
if self.repo.config.open_connections_pools:
self.close_pool_connections()
try:
self.restore_from_file(backupfile, confirm, drop=drop)
finally:
if self.repo.config.open_connections_pools:
self.open_pool_connections()
def init(self):
self.init_creating()
def map_attribute(self, etype, attr, cb):
self._rql_sqlgen.attr_map['%s.%s' % (etype, attr)] = cb
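    # A minimal sketch (the callback protocol is defined by the SQL
    # generator, so this is illustrative only; some_callback is hypothetical):
    #
    #   source.map_attribute('CWUser', 'login', some_callback)
    #
    # registers some_callback under the 'CWUser.login' key of the SQL
    # generator's attr_map.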
# ISource interface #######################################################
def compile_rql(self, rql, sols):
rqlst = self.repo.vreg.rqlhelper.parse(rql)
rqlst.restricted_vars = ()
rqlst.children[0].solutions = sols
self.repo.querier.sqlgen_annotate(rqlst)
set_qdata(self.schema.rschema, rqlst, ())
return rqlst
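    # compile_rql is how hand-written queries are prepared for
    # syntax_tree_search; LoginPasswordAuthentifier.set_schema below does
    # essentially:
    #
    #   rqlst = source.compile_rql(
    #       "Any P WHERE X is CWUser, X login %(login)s, X upassword P",
    #       ({'X': 'CWUser', 'P': 'Password'},))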
def set_schema(self, schema):
"""set the instance'schema"""
self._cache = Cache(self.repo.config['rql-cache-size'])
self.cache_hit, self.cache_miss, self.no_cache = 0, 0, 0
self.schema = schema
try:
self._rql_sqlgen.schema = schema
except AttributeError:
pass # __init__
for authentifier in self.authentifiers:
authentifier.set_schema(self.schema)
def support_entity(self, etype, write=False):
"""return true if the given entity's type is handled by this adapter
if write is true, return true only if it's a RW support
"""
return not etype in NONSYSTEM_ETYPES
def support_relation(self, rtype, write=False):
"""return true if the given relation's type is handled by this adapter
if write is true, return true only if it's a RW support
"""
if write:
return not rtype in NONSYSTEM_RELATIONS
# due to current multi-sources implementation, the system source
# can't claim not supporting a relation
return True #not rtype == 'content_for'
def may_cross_relation(self, rtype):
return True
def authenticate(self, session, login, **kwargs):
"""return CWUser eid for the given login and other authentication
information found in kwargs, else raise `AuthenticationError`
"""
for authentifier in self.authentifiers:
try:
return authentifier.authenticate(session, login, **kwargs)
except AuthenticationError:
continue
raise AuthenticationError()
def syntax_tree_search(self, session, union, args=None, cachekey=None,
varmap=None):
"""return result from this source for a rql query (actually from
a rql syntax tree and a solution dictionary mapping each used
variable to a possible type). If cachekey is given, the query
necessary to fetch the results (but not the results themselves)
may be cached using this key.
"""
assert dbg_st_search(self.uri, union, varmap, args, cachekey)
        # remember the number of actually selected terms (sql generation may append some)
if cachekey is None:
self.no_cache += 1
# generate sql query if we are able to do so (not supported types...)
sql, query_args = self._rql_sqlgen.generate(union, args, varmap)
else:
# sql may be cached
try:
sql, query_args = self._cache[cachekey]
self.cache_hit += 1
except KeyError:
self.cache_miss += 1
sql, query_args = self._rql_sqlgen.generate(union, args, varmap)
self._cache[cachekey] = sql, query_args
args = self.merge_args(args, query_args)
assert isinstance(sql, basestring), repr(sql)
try:
cursor = self.doexec(session, sql, args)
except (self.dbapi_module.OperationalError,
self.dbapi_module.InterfaceError):
            # FIXME: better detection of disconnection problems
self.info("request failed '%s' ... retry with a new cursor", sql)
session.pool.reconnect(self)
cursor = self.doexec(session, sql, args)
results = self.process_result(cursor)
assert dbg_results(results)
return results
def flying_insert(self, table, session, union, args=None, varmap=None):
"""similar as .syntax_tree_search, but inserts data in the
temporary table (on-the-fly if possible, eg for the system
source whose the given cursor come from). If not possible,
inserts all data by calling .executemany().
"""
assert dbg_st_search(
self.uri, union, varmap, args,
prefix='ON THE FLY temp data insertion into %s from' % table)
# generate sql queries if we are able to do so
sql, query_args = self._rql_sqlgen.generate(union, args, varmap)
query = 'INSERT INTO %s %s' % (table, sql.encode(self.encoding))
self.doexec(session, query, self.merge_args(args, query_args))
def manual_insert(self, results, table, session):
"""insert given result into a temporary table on the system source"""
if server.DEBUG & server.DBG_RQL:
print ' manual insertion of', results, 'into', table
if not results:
return
query_args = ['%%(%s)s' % i for i in xrange(len(results[0]))]
query = 'INSERT INTO %s VALUES(%s)' % (table, ','.join(query_args))
kwargs_list = []
for row in results:
kwargs = {}
row = tuple(row)
for index, cell in enumerate(row):
if isinstance(cell, Binary):
cell = self.binary(cell.getvalue())
kwargs[str(index)] = cell
kwargs_list.append(kwargs)
self.doexecmany(session, query, kwargs_list)
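    # e.g. for two-column results inserted into a table TMP, the query built
    # above is:
    #
    #   INSERT INTO TMP VALUES(%(0)s,%(1)s)
    #
    # and doexecmany receives one {'0': ..., '1': ...} dict per row, with
    # Binary values wrapped for the backend driver.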
def clean_temp_data(self, session, temptables):
"""remove temporary data, usually associated to temporary tables"""
if temptables:
for table in temptables:
try:
self.doexec(session,'DROP TABLE %s' % table)
except:
pass
try:
del self._temp_table_data[table]
except KeyError:
continue
def add_entity(self, session, entity):
"""add a new entity to the source"""
attrs = self.preprocess_entity(entity)
sql = self.sqlgen.insert(SQL_PREFIX + str(entity.e_schema), attrs)
self.doexec(session, sql, attrs)
def update_entity(self, session, entity):
"""replace an entity in the source"""
attrs = self.preprocess_entity(entity)
sql = self.sqlgen.update(SQL_PREFIX + str(entity.e_schema), attrs, [SQL_PREFIX + 'eid'])
self.doexec(session, sql, attrs)
def delete_entity(self, session, etype, eid):
"""delete an entity from the source"""
attrs = {SQL_PREFIX + 'eid': eid}
sql = self.sqlgen.delete(SQL_PREFIX + etype, attrs)
self.doexec(session, sql, attrs)
def add_relation(self, session, subject, rtype, object):
"""add a relation to the source"""
attrs = {'eid_from': subject, 'eid_to': object}
sql = self.sqlgen.insert('%s_relation' % rtype, attrs)
self.doexec(session, sql, attrs)
def delete_relation(self, session, subject, rtype, object):
"""delete a relation from the source"""
rschema = self.schema.rschema(rtype)
if rschema.inlined:
table = SQL_PREFIX + session.describe(subject)[0]
column = SQL_PREFIX + rtype
sql = 'UPDATE %s SET %s=NULL WHERE %seid=%%(eid)s' % (table, column,
SQL_PREFIX)
attrs = {'eid' : subject}
else:
attrs = {'eid_from': subject, 'eid_to': object}
sql = self.sqlgen.delete('%s_relation' % rtype, attrs)
self.doexec(session, sql, attrs)
def doexec(self, session, query, args=None, rollback=True):
"""Execute a query.
it's a function just so that it shows up in profiling
"""
cursor = session.pool[self.uri]
if server.DEBUG & server.DBG_SQL:
cnx = session.pool.connection(self.uri)
# getattr to get the actual connection if cnx is a ConnectionWrapper
# instance
print 'exec', query, args, getattr(cnx, '_cnx', cnx)
try:
            # str(query) to avoid error if it's a unicode string
cursor.execute(str(query), args)
except Exception, ex:
self.critical("sql: %r\n args: %s\ndbms message: %r",
query, args, ex.args[0])
if rollback:
try:
session.pool.connection(self.uri).rollback()
                    self.critical('transaction has been rolled back')
except:
pass
raise
return cursor
def doexecmany(self, session, query, args):
"""Execute a query.
it's a function just so that it shows up in profiling
"""
if server.DEBUG & server.DBG_SQL:
print 'execmany', query, 'with', len(args), 'arguments'
cursor = session.pool[self.uri]
try:
            # str(query) to avoid error if it's a unicode string
cursor.executemany(str(query), args)
except Exception, ex:
self.critical("sql many: %r\n args: %s\ndbms message: %r",
query, args, ex.args[0])
try:
session.pool.connection(self.uri).rollback()
                self.critical('transaction has been rolled back')
except:
pass
raise
# short cut to method requiring advanced db helper usage ##################
def create_index(self, session, table, column, unique=False):
cursor = LogCursor(session.pool[self.uri])
self.dbhelper.create_index(cursor, table, column, unique)
def drop_index(self, session, table, column, unique=False):
cursor = LogCursor(session.pool[self.uri])
self.dbhelper.drop_index(cursor, table, column, unique)
# system source interface #################################################
def eid_type_source(self, session, eid):
"""return a tuple (type, source, extid) for the entity with id <eid>"""
sql = 'SELECT type, source, extid FROM entities WHERE eid=%s' % eid
try:
res = session.system_sql(sql).fetchone()
except:
assert session.pool, 'session has no pool set'
raise UnknownEid(eid)
if res is None:
raise UnknownEid(eid)
if res[-1] is not None:
if not isinstance(res, list):
res = list(res)
res[-1] = b64decode(res[-1])
return res
def extid2eid(self, session, source, extid):
"""get eid from an external id. Return None if no record found."""
assert isinstance(extid, str)
cursor = session.system_sql('SELECT eid FROM entities WHERE '
'extid=%(x)s AND source=%(s)s',
{'x': b64encode(extid), 's': source.uri})
        # XXX testing rowcount causes a strange bug with sqlite: results are
        # there but rowcount is 0
        #if cursor.rowcount > 0:
try:
result = cursor.fetchone()
if result:
return result[0]
except:
pass
return None
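    # Sketch of the lookup (extid value is illustrative): for an entity
    # recorded by add_info with extid 'uid=jdoe,ou=People', the row matched
    # here has extid = b64encode('uid=jdoe,ou=People') and source =
    # source.uri, and the entity's eid is returned (None if no match).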
def temp_table_def(self, selected, sol, table):
return make_schema(selected, sol, table, self.dbhelper.TYPE_MAPPING)
def create_temp_table(self, session, table, schema):
        # we don't want ON COMMIT DROP: it may cause problems when running
        # with an ldap source, and the table will be deleted manually on
        # commit anyway
sql = self.dbhelper.sql_temporary_table(table, schema, False)
self.doexec(session, sql)
def create_eid(self, session):
self._eid_creation_lock.acquire()
try:
for sql in self.dbhelper.sqls_increment_sequence('entities_id_seq'):
cursor = self.doexec(session, sql)
return cursor.fetchone()[0]
finally:
self._eid_creation_lock.release()
def add_info(self, session, entity, source, extid=None):
"""add type and source info for an eid into the system table"""
# begin by inserting eid/type/source/extid into the entities table
if extid is not None:
assert isinstance(extid, str)
extid = b64encode(extid)
attrs = {'type': entity.__regid__, 'eid': entity.eid, 'extid': extid,
'source': source.uri, 'mtime': datetime.now()}
session.system_sql(self.sqlgen.insert('entities', attrs), attrs)
def delete_info(self, session, eid, etype, uri, extid):
"""delete system information on deletion of an entity by transfering
record from the entities table to the deleted_entities table
"""
attrs = {'eid': eid}
session.system_sql(self.sqlgen.delete('entities', attrs), attrs)
if extid is not None:
assert isinstance(extid, str), type(extid)
extid = b64encode(extid)
attrs = {'type': etype, 'eid': eid, 'extid': extid,
'source': uri, 'dtime': datetime.now()}
session.system_sql(self.sqlgen.insert('deleted_entities', attrs), attrs)
def fti_unindex_entity(self, session, eid):
"""remove text content for entity with the given eid from the full text
index
"""
try:
self.indexer.cursor_unindex_object(eid, session.pool['system'])
except Exception: # let KeyboardInterrupt / SystemExit propagate
if self.indexer is not None:
self.exception('error while unindexing %s', eid)
def fti_index_entity(self, session, entity):
"""add text content of a created/modified entity to the full text index
"""
self.info('reindexing %r', entity.eid)
try:
self.indexer.cursor_reindex_object(entity.eid, entity,
session.pool['system'])
except Exception: # let KeyboardInterrupt / SystemExit propagate
if self.indexer is not None:
self.exception('error while reindexing %s', entity)
# update entities.mtime
attrs = {'eid': entity.eid, 'mtime': datetime.now()}
session.system_sql(self.sqlgen.update('entities', attrs, ['eid']), attrs)
def modified_entities(self, session, etypes, mtime):
"""return a 2-uple:
* list of (etype, eid) of entities of the given types which have been
modified since the given timestamp (actually entities whose full text
index content has changed)
* list of (etype, eid) of entities of the given types which have been
deleted since the given timestamp
"""
modsql = _modified_sql('entities', etypes)
cursor = session.system_sql(modsql, {'time': mtime})
modentities = cursor.fetchall()
delsql = _modified_sql('deleted_entities', etypes)
cursor = session.system_sql(delsql, {'time': mtime})
delentities = cursor.fetchall()
return modentities, delentities
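# A sketch of how modified_entities is typically consumed, e.g. by code
# synchronizing the full text index (types and timestamp are illustrative):
#
#   modified, deleted = source.modified_entities(session, ('CWUser',),
#                                                datetime(2010, 1, 1))
#   # both values are lists of (etype, eid) pairs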
def sql_schema(driver):
helper = get_adv_func_helper(driver)
tstamp_col_type = helper.TYPE_MAPPING['Datetime']
schema = """
/* Create the repository's system database */
%s
CREATE TABLE entities (
eid INTEGER PRIMARY KEY NOT NULL,
type VARCHAR(64) NOT NULL,
source VARCHAR(64) NOT NULL,
mtime %s NOT NULL,
extid VARCHAR(256)
);
CREATE INDEX entities_type_idx ON entities(type);
CREATE INDEX entities_mtime_idx ON entities(mtime);
CREATE INDEX entities_extid_idx ON entities(extid);
CREATE TABLE deleted_entities (
eid INTEGER PRIMARY KEY NOT NULL,
type VARCHAR(64) NOT NULL,
source VARCHAR(64) NOT NULL,
dtime %s NOT NULL,
extid VARCHAR(256)
);
CREATE INDEX deleted_entities_type_idx ON deleted_entities(type);
CREATE INDEX deleted_entities_dtime_idx ON deleted_entities(dtime);
CREATE INDEX deleted_entities_extid_idx ON deleted_entities(extid);
""" % (helper.sql_create_sequence('entities_id_seq'), tstamp_col_type, tstamp_col_type)
return schema
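# e.g. sql_schema('postgres') returns the script above with the postgres
# sequence creation statement substituted for the first %s and mtime/dtime
# typed using the driver's 'Datetime' mapping (illustrative; exact SQL
# depends on the adbh helper).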
def sql_drop_schema(driver):
helper = get_adv_func_helper(driver)
return """
%s
DROP TABLE entities;
DROP TABLE deleted_entities;
""" % helper.sql_drop_sequence('entities_id_seq')
def grant_schema(user, set_owner=True):
result = ''
if set_owner:
result = 'ALTER TABLE entities OWNER TO %s;\n' % user
result += 'ALTER TABLE deleted_entities OWNER TO %s;\n' % user
result += 'ALTER TABLE entities_id_seq OWNER TO %s;\n' % user
result += 'GRANT ALL ON entities TO %s;\n' % user
result += 'GRANT ALL ON deleted_entities TO %s;\n' % user
result += 'GRANT ALL ON entities_id_seq TO %s;\n' % user
return result
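# e.g. grant_schema('bob') returns the three ALTER TABLE ... OWNER TO bob
# statements followed by GRANT ALL ... TO bob for entities, deleted_entities
# and entities_id_seq.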
class BaseAuthentifier(object):
def __init__(self, source=None):
self.source = source
def set_schema(self, schema):
"""set the instance'schema"""
pass
class LoginPasswordAuthentifier(BaseAuthentifier):
passwd_rql = "Any P WHERE X is CWUser, X login %(login)s, X upassword P"
auth_rql = "Any X WHERE X is CWUser, X login %(login)s, X upassword %(pwd)s"
_sols = ({'X': 'CWUser', 'P': 'Password'},)
def set_schema(self, schema):
"""set the instance'schema"""
if 'CWUser' in schema: # probably an empty schema if not true...
# rql syntax trees used to authenticate users
self._passwd_rqlst = self.source.compile_rql(self.passwd_rql, self._sols)
self._auth_rqlst = self.source.compile_rql(self.auth_rql, self._sols)
def authenticate(self, session, login, password=None, **kwargs):
"""return CWUser eid for the given login/password if this account is
defined in this source, else raise `AuthenticationError`
two queries are needed since passwords are stored crypted, so we have
to fetch the salt first
"""
args = {'login': login, 'pwd' : password}
if password is not None:
rset = self.source.syntax_tree_search(session, self._passwd_rqlst, args)
try:
pwd = rset[0][0]
except IndexError:
raise AuthenticationError('bad login')
# passwords are stored using the Bytes type, so we get a StringIO
if pwd is not None:
args['pwd'] = Binary(crypt_password(password, pwd.getvalue()[:2]))
# get eid from login and (crypted) password
rset = self.source.syntax_tree_search(session, self._auth_rqlst, args)
try:
return rset[0][0]
except IndexError:
raise AuthenticationError('bad password')
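# A sketch of the resulting authentication flow (login/password values are
# illustrative):
#
#   eid = source.authenticate(session, u'admin', password='s3cret')
#
# the first query fetches the stored crypted password to recover its
# two-character salt, the submitted password is crypted with that salt,
# and the second query matches login + crypted password; any failure
# surfaces as AuthenticationError.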