# copyright 2003-2011 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
#
# This file is part of CubicWeb.
#
# CubicWeb is free software: you can redistribute it and/or modify it under the
# terms of the GNU Lesser General Public License as published by the Free
# Software Foundation, either version 2.1 of the License, or (at your option)
# any later version.
#
# CubicWeb is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with CubicWeb. If not, see <http://www.gnu.org/licenses/>.
"""Source to query another RQL repository using pyro"""
__docformat__ = "restructuredtext en"
_ = unicode
import threading
from os.path import join
from time import mktime
from datetime import datetime
from base64 import b64decode
from Pyro.errors import PyroError, ConnectionClosedError
from logilab.common.configuration import REQUIRED
from logilab.common.optik_ext import check_yn
from yams.schema import role_name
from rql.nodes import Constant
from rql.utils import rqlvar_maker
from cubicweb import dbapi, server
from cubicweb import ValidationError, BadConnectionId, UnknownEid, ConnectionError
from cubicweb.schema import VIRTUAL_RTYPES
from cubicweb.cwconfig import register_persistent_options
from cubicweb.server.sources import (AbstractSource, ConnectionWrapper,
TimedCache, dbg_st_search, dbg_results)
from cubicweb.server.msplanner import neged_relation
def uidtype(union, col, etype, args):
select, col = union.locate_subquery(col, etype, args)
return getattr(select.selection[col], 'uidtype', None)
class ReplaceByInOperator(Exception):
def __init__(self, eids):
self.eids = eids
class PyroRQLSource(AbstractSource):
"""External repository source, using Pyro connection"""
# boolean telling if modification hooks should be called when something is
# modified in this source
should_call_hooks = False
# boolean telling if the repository should connect to this source during
# migration
connect_for_migration = False
options = (
# XXX pyro-ns host/port
('pyro-ns-id',
{'type' : 'string',
'default': REQUIRED,
'help': 'identifier of the repository in the pyro name server',
'group': 'pyro-source', 'level': 0,
}),
('cubicweb-user',
{'type' : 'string',
'default': REQUIRED,
'help': 'user to use for connection on the distant repository',
'group': 'pyro-source', 'level': 0,
}),
('cubicweb-password',
{'type' : 'password',
'default': '',
'help': 'user to use for connection on the distant repository',
'group': 'pyro-source', 'level': 0,
}),
('base-url',
{'type' : 'string',
'default': '',
'help': 'url of the web site for the distant repository, if you want '
'to generate external link to entities from this repository',
'group': 'pyro-source', 'level': 1,
}),
('skip-external-entities',
{'type' : 'yn',
'default': False,
'help': 'should entities not local to the source be considered or not',
'group': 'pyro-source', 'level': 0,
}),
('pyro-ns-host',
{'type' : 'string',
'default': None,
'help': 'Pyro name server\'s host. If not set, default to the value \
from all_in_one.conf. It may contains port information using <host>:<port> notation.',
'group': 'pyro-source', 'level': 1,
}),
('pyro-ns-group',
{'type' : 'string',
'default': None,
'help': 'Pyro name server\'s group where the repository will be \
registered. If not set, default to the value from all_in_one.conf.',
'group': 'pyro-source', 'level': 2,
}),
('synchronization-interval',
{'type' : 'time',
'default': '5min',
'help': 'interval between synchronization with the external \
repository (default to 5 minutes).',
'group': 'pyro-source', 'level': 2,
}),
)
PUBLIC_KEYS = AbstractSource.PUBLIC_KEYS + ('base-url',)
_conn = None
def __init__(self, repo, source_config, eid=None):
AbstractSource.__init__(self, repo, source_config, eid)
self.update_config(None, self.check_conf_dict(eid, source_config,
fail_if_unknown=False))
self._query_cache = TimedCache(1800)
def update_config(self, source_entity, processed_config):
"""update configuration from source entity"""
# XXX get it through pyro if unset
baseurl = processed_config.get('base-url')
if baseurl and not baseurl.endswith('/'):
processed_config['base-url'] += '/'
self.config = processed_config
self._skip_externals = processed_config['skip-external-entities']
if source_entity is not None:
self.latest_retrieval = source_entity.latest_retrieval
def reset_caches(self):
"""method called during test to reset potential source caches"""
self._query_cache = TimedCache(1800)
def init(self, activated, source_entity):
"""method called by the repository once ready to handle request"""
self.load_mapping(source_entity._cw)
if activated:
interval = self.config['synchronization-interval']
self.repo.looping_task(interval, self.synchronize)
self.repo.looping_task(self._query_cache.ttl.seconds/10,
self._query_cache.clear_expired)
self.latest_retrieval = source_entity.latest_retrieval
def load_mapping(self, session=None):
self.support_entities = {}
self.support_relations = {}
self.dont_cross_relations = set(('owned_by', 'created_by'))
self.cross_relations = set()
assert self.eid is not None
self._schemacfg_idx = {}
self._load_mapping(session)
etype_options = set(('write',))
rtype_options = set(('maycross', 'dontcross', 'write',))
def _check_options(self, schemacfg, allowedoptions):
if schemacfg.options:
options = set(w.strip() for w in schemacfg.options.split(':'))
else:
options = set()
if options - allowedoptions:
options = ', '.join(sorted(options - allowedoptions))
msg = _('unknown option(s): %s' % options)
raise ValidationError(schemacfg.eid, {role_name('options', 'subject'): msg})
return options
def add_schema_config(self, schemacfg, checkonly=False):
"""added CWSourceSchemaConfig, modify mapping accordingly"""
try:
ertype = schemacfg.schema.name
except AttributeError:
msg = schemacfg._cw._("attribute/relation can't be mapped, only "
"entity and relation types")
raise ValidationError(schemacfg.eid, {role_name('cw_for_schema', 'subject'): msg})
if schemacfg.schema.__regid__ == 'CWEType':
options = self._check_options(schemacfg, self.etype_options)
if not checkonly:
self.support_entities[ertype] = 'write' in options
else: # CWRType
if ertype in ('is', 'is_instance_of', 'cw_source') or ertype in VIRTUAL_RTYPES:
msg = schemacfg._cw._('%s relation should not be in mapped') % rtype
raise ValidationError(schemacfg.eid, {role_name('cw_for_schema', 'subject'): msg})
options = self._check_options(schemacfg, self.rtype_options)
if 'dontcross' in options:
if 'maycross' in options:
msg = schemacfg._("can't mix dontcross and maycross options")
raise ValidationError(schemacfg.eid, {role_name('options', 'subject'): msg})
if 'write' in options:
msg = schemacfg._("can't mix dontcross and write options")
raise ValidationError(schemacfg.eid, {role_name('options', 'subject'): msg})
if not checkonly:
self.dont_cross_relations.add(ertype)
elif not checkonly:
self.support_relations[ertype] = 'write' in options
if 'maycross' in options:
self.cross_relations.add(ertype)
if not checkonly:
# add to an index to ease deletion handling
self._schemacfg_idx[schemacfg.eid] = ertype
def del_schema_config(self, schemacfg, checkonly=False):
"""deleted CWSourceSchemaConfig, modify mapping accordingly"""
if checkonly:
return
try:
ertype = self._schemacfg_idx[schemacfg.eid]
if ertype[0].isupper():
del self.support_entities[ertype]
else:
if ertype in self.support_relations:
del self.support_relations[ertype]
if ertype in self.cross_relations:
self.cross_relations.remove(ertype)
else:
self.dont_cross_relations.remove(ertype)
except:
self.error('while updating mapping consequently to removal of %s',
schemacfg)
def local_eid(self, cnx, extid, session):
etype, dexturi, dextid = cnx.describe(extid)
if dexturi == 'system' or not (
dexturi in self.repo.sources_by_uri or self._skip_externals):
return self.repo.extid2eid(self, str(extid), etype, session), True
if dexturi in self.repo.sources_by_uri:
source = self.repo.sources_by_uri[dexturi]
cnx = session.pool.connection(source.uri)
eid = source.local_eid(cnx, dextid, session)[0]
return eid, False
return None, None
def synchronize(self, mtime=None):
"""synchronize content known by this repository with content in the
external repository
"""
self.info('synchronizing pyro source %s', self.uri)
cnx = self.get_connection()
try:
extrepo = cnx._repo
except AttributeError:
# fake connection wrapper returned when we can't connect to the
# external source (hence we've no chance to synchronize...)
return
etypes = self.support_entities.keys()
if mtime is None:
mtime = self.latest_retrieval
updatetime, modified, deleted = extrepo.entities_modified_since(
etypes, mtime)
self._query_cache.clear()
repo = self.repo
session = repo.internal_session()
source = repo.system_source
try:
for etype, extid in modified:
try:
eid = self.local_eid(cnx, extid, session)[0]
if eid is not None:
rset = session.eid_rset(eid, etype)
entity = rset.get_entity(0, 0)
entity.complete(entity.e_schema.indexable_attributes())
source.index_entity(session, entity)
except:
self.exception('while updating %s with external id %s of source %s',
etype, extid, self.uri)
continue
for etype, extid in deleted:
try:
eid = self.extid2eid(str(extid), etype, session,
insert=False)
# entity has been deleted from external repository but is not known here
if eid is not None:
entity = session.entity_from_eid(eid, etype)
repo.delete_info(session, entity, self.uri, extid,
scleanup=self.eid)
except:
self.exception('while updating %s with external id %s of source %s',
etype, extid, self.uri)
continue
self.latest_retrieval = updatetime
session.execute('SET X latest_retrieval %(date)s WHERE X eid %(x)s',
{'x': self.eid, 'date': self.latest_retrieval})
session.commit()
finally:
session.close()
def _get_connection(self):
"""open and return a connection to the source"""
nshost = self.config.get('pyro-ns-host') or self.repo.config['pyro-ns-host']
nsgroup = self.config.get('pyro-ns-group') or self.repo.config['pyro-ns-group']
self.info('connecting to instance :%s.%s for user %s',
nsgroup, self.config['pyro-ns-id'], self.config['cubicweb-user'])
#cnxprops = ConnectionProperties(cnxtype=self.config['cnx-type'])
return dbapi.connect(database=self.config['pyro-ns-id'],
login=self.config['cubicweb-user'],
password=self.config['cubicweb-password'],
host=nshost, group=nsgroup,
setvreg=False) #cnxprops=cnxprops)
def get_connection(self):
try:
return self._get_connection()
except (ConnectionError, PyroError):
self.critical("can't get connection to source %s", self.uri,
exc_info=1)
return ConnectionWrapper()
def check_connection(self, cnx):
"""check connection validity, return None if the connection is still valid
else a new connection
"""
# we have to transfer manually thread ownership. This can be done safely
# since the pool to which belong the connection is affected to one
# session/thread and can't be called simultaneously
try:
cnx._repo._transferThread(threading.currentThread())
except AttributeError:
# inmemory connection
pass
if not isinstance(cnx, ConnectionWrapper):
try:
cnx.check()
return # ok
except (BadConnectionId, ConnectionClosedError):
pass
# try to reconnect
return self.get_connection()
def syntax_tree_search(self, session, union, args=None, cachekey=None,
varmap=None):
assert dbg_st_search(self.uri, union, varmap, args, cachekey)
rqlkey = union.as_string(kwargs=args)
try:
results = self._query_cache[rqlkey]
except KeyError:
results = self._syntax_tree_search(session, union, args)
self._query_cache[rqlkey] = results
assert dbg_results(results)
return results
def _syntax_tree_search(self, session, union, args):
"""return result from this source for a rql query (actually from a rql
syntax tree and a solution dictionary mapping each used variable to a
possible type). If cachekey is given, the query necessary to fetch the
results (but not the results themselves) may be cached using this key.
"""
if not args is None:
args = args.copy()
# get cached cursor anyway
cu = session.pool[self.uri]
if cu is None:
# this is a ConnectionWrapper instance
msg = session._("can't connect to source %s, some data may be missing")
session.set_shared_data('sources_error', msg % self.uri)
return []
translator = RQL2RQL(self)
try:
rql = translator.generate(session, union, args)
except UnknownEid, ex:
if server.DEBUG:
print ' unknown eid', ex, 'no results'
return []
if server.DEBUG & server.DBG_RQL:
print ' translated rql', rql
try:
rset = cu.execute(rql, args)
except Exception, ex:
self.exception(str(ex))
msg = session._("error while querying source %s, some data may be missing")
session.set_shared_data('sources_error', msg % self.uri)
return []
descr = rset.description
if rset:
needtranslation = []
rows = rset.rows
for i, etype in enumerate(descr[0]):
if (etype is None or not self.schema.eschema(etype).final
or uidtype(union, i, etype, args)):
needtranslation.append(i)
if needtranslation:
cnx = session.pool.connection(self.uri)
for rowindex in xrange(rset.rowcount - 1, -1, -1):
row = rows[rowindex]
localrow = False
for colindex in needtranslation:
if row[colindex] is not None: # optional variable
eid, local = self.local_eid(cnx, row[colindex], session)
if local:
localrow = True
if eid is not None:
row[colindex] = eid
else:
# skip this row
del rows[rowindex]
del descr[rowindex]
break
else:
# skip row if it only contains eids of entities which
# are actually from a source we also know locally,
# except if some args specified (XXX should actually
# check if there are some args local to the source)
if not (translator.has_local_eid or localrow):
del rows[rowindex]
del descr[rowindex]
results = rows
else:
results = []
return results
def _entity_relations_and_kwargs(self, session, entity):
relations = []
kwargs = {'x': self.eid2extid(entity.eid, session)}
for key, val in entity.cw_attr_cache.iteritems():
relations.append('X %s %%(%s)s' % (key, key))
kwargs[key] = val
return relations, kwargs
def add_entity(self, session, entity):
"""add a new entity to the source"""
raise NotImplementedError()
def update_entity(self, session, entity):
"""update an entity in the source"""
relations, kwargs = self._entity_relations_and_kwargs(session, entity)
cu = session.pool[self.uri]
cu.execute('SET %s WHERE X eid %%(x)s' % ','.join(relations), kwargs)
self._query_cache.clear()
entity.clear_all_caches()
def delete_entity(self, session, entity):
"""delete an entity from the source"""
cu = session.pool[self.uri]
cu.execute('DELETE %s X WHERE X eid %%(x)s' % entity.__regid__,
{'x': self.eid2extid(entity.eid, session)})
self._query_cache.clear()
def add_relation(self, session, subject, rtype, object):
"""add a relation to the source"""
cu = session.pool[self.uri]
cu.execute('SET X %s Y WHERE X eid %%(x)s, Y eid %%(y)s' % rtype,
{'x': self.eid2extid(subject, session),
'y': self.eid2extid(object, session)})
self._query_cache.clear()
session.entity_from_eid(subject).clear_all_caches()
session.entity_from_eid(object).clear_all_caches()
def delete_relation(self, session, subject, rtype, object):
"""delete a relation from the source"""
cu = session.pool[self.uri]
cu.execute('DELETE X %s Y WHERE X eid %%(x)s, Y eid %%(y)s' % rtype,
{'x': self.eid2extid(subject, session),
'y': self.eid2extid(object, session)})
self._query_cache.clear()
session.entity_from_eid(subject).clear_all_caches()
session.entity_from_eid(object).clear_all_caches()
class RQL2RQL(object):
"""translate a local rql query to be executed on a distant repository"""
def __init__(self, source):
self.source = source
self.current_operator = None
def _accept_children(self, node):
res = []
for child in node.children:
rql = child.accept(self)
if rql is not None:
res.append(rql)
return res
def generate(self, session, rqlst, args):
self._session = session
self.kwargs = args
self.need_translation = False
self.has_local_eid = False
return self.visit_union(rqlst)
def visit_union(self, node):
s = self._accept_children(node)
if len(s) > 1:
return ' UNION '.join('(%s)' % q for q in s)
return s[0]
def visit_select(self, node):
"""return the tree as an encoded rql string"""
self._varmaker = rqlvar_maker(defined=node.defined_vars.copy())
self._const_var = {}
if node.distinct:
base = 'DISTINCT Any'
else:
base = 'Any'
s = ['%s %s' % (base, ','.join(v.accept(self) for v in node.selection))]
if node.groupby:
s.append('GROUPBY %s' % ', '.join(group.accept(self)
for group in node.groupby))
if node.orderby:
s.append('ORDERBY %s' % ', '.join(self.visit_sortterm(term)
for term in node.orderby))
if node.limit is not None:
s.append('LIMIT %s' % node.limit)
if node.offset:
s.append('OFFSET %s' % node.offset)
restrictions = []
if node.where is not None:
nr = node.where.accept(self)
if nr is not None:
restrictions.append(nr)
if restrictions:
s.append('WHERE %s' % ','.join(restrictions))
if node.having:
s.append('HAVING %s' % ', '.join(term.accept(self)
for term in node.having))
subqueries = []
for subquery in node.with_:
subqueries.append('%s BEING (%s)' % (','.join(ca.name for ca in subquery.aliases),
self.visit_union(subquery.query)))
if subqueries:
s.append('WITH %s' % (','.join(subqueries)))
return ' '.join(s)
def visit_and(self, node):
res = self._accept_children(node)
if res:
return ', '.join(res)
return
def visit_or(self, node):
res = self._accept_children(node)
if len(res) > 1:
return ' OR '.join('(%s)' % rql for rql in res)
elif res:
return res[0]
return
def visit_not(self, node):
rql = node.children[0].accept(self)
if rql:
return 'NOT (%s)' % rql
return
def visit_exists(self, node):
rql = node.children[0].accept(self)
if rql:
return 'EXISTS(%s)' % rql
return
def visit_relation(self, node):
try:
if isinstance(node.children[0], Constant):
# simplified rqlst, reintroduce eid relation
try:
restr, lhs = self.process_eid_const(node.children[0])
except UnknownEid:
# can safely skip not relation with an unsupported eid
if neged_relation(node):
return
raise
else:
lhs = node.children[0].accept(self)
restr = None
except UnknownEid:
# can safely skip not relation with an unsupported eid
if neged_relation(node):
return
# XXX what about optional relation or outer NOT EXISTS()
raise
if node.optional in ('left', 'both'):
lhs += '?'
if node.r_type == 'eid' or not self.source.schema.rschema(node.r_type).final:
self.need_translation = True
self.current_operator = node.operator()
if isinstance(node.children[0], Constant):
self.current_etypes = (node.children[0].uidtype,)
else:
self.current_etypes = node.children[0].variable.stinfo['possibletypes']
try:
rhs = node.children[1].accept(self)
except UnknownEid:
# can safely skip not relation with an unsupported eid
if neged_relation(node):
return
# XXX what about optional relation or outer NOT EXISTS()
raise
except ReplaceByInOperator, ex:
rhs = 'IN (%s)' % ','.join(eid for eid in ex.eids)
self.need_translation = False
self.current_operator = None
if node.optional in ('right', 'both'):
rhs += '?'
if restr is not None:
return '%s %s %s, %s' % (lhs, node.r_type, rhs, restr)
return '%s %s %s' % (lhs, node.r_type, rhs)
def visit_comparison(self, node):
if node.operator in ('=', 'IS'):
return node.children[0].accept(self)
return '%s %s' % (node.operator.encode(),
node.children[0].accept(self))
def visit_mathexpression(self, node):
return '(%s %s %s)' % (node.children[0].accept(self),
node.operator.encode(),
node.children[1].accept(self))
def visit_function(self, node):
#if node.name == 'IN':
res = []
for child in node.children:
try:
rql = child.accept(self)
except UnknownEid, ex:
continue
res.append(rql)
if not res:
raise ex
return '%s(%s)' % (node.name, ', '.join(res))
def visit_constant(self, node):
if self.need_translation or node.uidtype:
if node.type == 'Int':
self.has_local_eid = True
return str(self.eid2extid(node.value))
if node.type == 'Substitute':
key = node.value
# ensure we have not yet translated the value...
if not key in self._const_var:
self.kwargs[key] = self.eid2extid(self.kwargs[key])
self._const_var[key] = None
self.has_local_eid = True
return node.as_string()
def visit_variableref(self, node):
"""get the sql name for a variable reference"""
return node.name
def visit_sortterm(self, node):
if node.asc:
return node.term.accept(self)
return '%s DESC' % node.term.accept(self)
def process_eid_const(self, const):
value = const.eval(self.kwargs)
try:
return None, self._const_var[value]
except:
var = self._varmaker.next()
self.need_translation = True
restr = '%s eid %s' % (var, self.visit_constant(const))
self.need_translation = False
self._const_var[value] = var
return restr, var
def eid2extid(self, eid):
try:
return self.source.eid2extid(eid, self._session)
except UnknownEid:
operator = self.current_operator
if operator is not None and operator != '=':
# deal with query like "X eid > 12"
#
# The problem is that eid order in the external source may
# differ from the local source
#
# So search for all eids from this source matching the condition
# locally and then to replace the "> 12" branch by "IN (eids)"
#
# XXX we may have to insert a huge number of eids...)
sql = "SELECT extid FROM entities WHERE source='%s' AND type IN (%s) AND eid%s%s"
etypes = ','.join("'%s'" % etype for etype in self.current_etypes)
cu = self._session.system_sql(sql % (self.source.uri, etypes,
operator, eid))
# XXX buggy cu.rowcount which may be zero while there are some
# results
rows = cu.fetchall()
if rows:
raise ReplaceByInOperator((b64decode(r[0]) for r in rows))
raise