server/sources/pyrorql.py
changeset 257 4c7d3af7e94d
child 390 739d12586b9d
equal deleted inserted replaced
256:3dbee583526c 257:4c7d3af7e94d
       
     1 """Source to query another RQL repository using pyro
       
     2 
       
     3 :organization: Logilab
       
     4 :copyright: 2007-2008 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
       
     5 :contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr
       
     6 """
       
     7 __docformat__ = "restructuredtext en"
       
     8 
       
     9 import threading
       
    10 from os.path import join
       
    11 
       
    12 from mx.DateTime import DateTimeFromTicks
       
    13 
       
    14 from Pyro.errors import PyroError, ConnectionClosedError
       
    15 
       
    16 from logilab.common.configuration import REQUIRED
       
    17 
       
    18 from rql.nodes import Constant
       
    19 from rql.utils import rqlvar_maker
       
    20 
       
    21 from cubicweb import dbapi, server
       
    22 from cubicweb import BadConnectionId, UnknownEid, ConnectionError
       
    23 from cubicweb.cwconfig import register_persistent_options
       
    24 from cubicweb.server.sources import AbstractSource, ConnectionWrapper
       
    25 
       
    26 class ReplaceByInOperator:
       
    27     def __init__(self, eids):
       
    28         self.eids = eids
       
    29         
       
    30 class PyroRQLSource(AbstractSource):
       
    31     """External repository source, using Pyro connection"""
       
    32     
       
    33     # boolean telling if modification hooks should be called when something is
       
    34     # modified in this source
       
    35     should_call_hooks = False
       
    36     # boolean telling if the repository should connect to this source during
       
    37     # migration
       
    38     connect_for_migration = False
       
    39     
       
    40     support_entities = None
       
    41     
       
    42     options = (
       
    43         # XXX pyro-ns host/port
       
    44         ('pyro-ns-id',
       
    45          {'type' : 'string',
       
    46           'default': REQUIRED,
       
    47           'help': 'identifier of the repository in the pyro name server',
       
    48           'group': 'pyro-source', 'inputlevel': 0,
       
    49           }),
       
    50         ('mapping-file',
       
    51          {'type' : 'string',
       
    52           'default': REQUIRED,
       
    53           'help': 'path to a python file with the schema mapping definition',
       
    54           'group': 'pyro-source', 'inputlevel': 1,
       
    55           }),
       
    56         ('cubicweb-user',
       
    57          {'type' : 'string',
       
    58           'default': REQUIRED,
       
    59           'help': 'user to use for connection on the distant repository',
       
    60           'group': 'pyro-source', 'inputlevel': 0,
       
    61           }),
       
    62         ('cubicweb-password',
       
    63          {'type' : 'password',
       
    64           'default': '',
       
    65           'help': 'user to use for connection on the distant repository',
       
    66           'group': 'pyro-source', 'inputlevel': 0,
       
    67           }),
       
    68         ('base-url',
       
    69          {'type' : 'string',
       
    70           'default': '',
       
    71           'help': 'url of the web site for the distant repository, if you want '
       
    72           'to generate external link to entities from this repository',
       
    73           'group': 'pyro-source', 'inputlevel': 1,
       
    74           }),
       
    75         ('pyro-ns-host',
       
    76          {'type' : 'string',
       
    77           'default': None,
       
    78           'help': 'Pyro name server\'s host. If not set, default to the value \
       
    79 from all_in_one.conf.',
       
    80           'group': 'pyro-source', 'inputlevel': 1,
       
    81           }),
       
    82         ('pyro-ns-port',
       
    83          {'type' : 'int',
       
    84           'default': None,
       
    85           'help': 'Pyro name server\'s listening port. If not set, default to \
       
    86 the value from all_in_one.conf.',
       
    87           'group': 'pyro-source', 'inputlevel': 1,
       
    88           }),
       
    89         ('pyro-ns-group',
       
    90          {'type' : 'string',
       
    91           'default': None,
       
    92           'help': 'Pyro name server\'s group where the repository will be \
       
    93 registered. If not set, default to the value from all_in_one.conf.',
       
    94           'group': 'pyro-source', 'inputlevel': 1,
       
    95           }),
       
    96         ('synchronization-interval',
       
    97          {'type' : 'int',
       
    98           'default': 5*60,
       
    99           'help': 'interval between synchronization with the external \
       
   100 repository (default to 5 minutes).',
       
   101           'group': 'pyro-source', 'inputlevel': 2,
       
   102           }),
       
   103         
       
   104     )
       
   105 
       
   106     PUBLIC_KEYS = AbstractSource.PUBLIC_KEYS + ('base-url',)
       
   107     _conn = None
       
   108 
       
   109     def __init__(self, repo, appschema, source_config, *args, **kwargs):
       
   110         AbstractSource.__init__(self, repo, appschema, source_config,
       
   111                                 *args, **kwargs)
       
   112         mappingfile = source_config['mapping-file']
       
   113         if not mappingfile[0] == '/':
       
   114             mappingfile = join(repo.config.apphome, mappingfile)
       
   115         mapping = {}
       
   116         execfile(mappingfile, mapping)
       
   117         self.support_entities = mapping['support_entities']
       
   118         self.support_relations = mapping.get('support_relations', {})
       
   119         self.dont_cross_relations = mapping.get('dont_cross_relations', ())
       
   120         baseurl = source_config.get('base-url')
       
   121         if baseurl and not baseurl.endswith('/'):
       
   122             source_config['base-url'] += '/'
       
   123         self.config = source_config
       
   124         myoptions = (('%s.latest-update-time' % self.uri,
       
   125                       {'type' : 'int', 'sitewide': True,
       
   126                        'default': 0,
       
   127                        'help': _('timestamp of the latest source synchronization.'),
       
   128                        'group': 'sources', 
       
   129                        }),)
       
   130         register_persistent_options(myoptions)
       
   131 
       
   132     def last_update_time(self):
       
   133         pkey = u'sources.%s.latest-update-time' % self.uri
       
   134         rql = 'Any V WHERE X is EProperty, X value V, X pkey %(k)s'
       
   135         session = self.repo.internal_session()
       
   136         try:
       
   137             rset = session.execute(rql, {'k': pkey})
       
   138             if not rset:
       
   139                 # insert it
       
   140                 session.execute('INSERT EProperty X: X pkey %(k)s, X value %(v)s',
       
   141                                 {'k': pkey, 'v': u'0'})
       
   142                 session.commit()
       
   143                 timestamp = 0
       
   144             else:
       
   145                 assert len(rset) == 1
       
   146                 timestamp = int(rset[0][0])
       
   147             return DateTimeFromTicks(timestamp)
       
   148         finally:
       
   149             session.close()
       
   150 
       
   151     def init(self):
       
   152         """method called by the repository once ready to handle request"""
       
   153         interval = int(self.config.get('synchronization-interval', 5*60))
       
   154         self.repo.looping_task(interval, self.synchronize) 
       
   155 
       
   156     def synchronize(self, mtime=None):
       
   157         """synchronize content known by this repository with content in the
       
   158         external repository
       
   159         """
       
   160         self.info('synchronizing pyro source %s', self.uri)
       
   161         extrepo = self.get_connection()._repo
       
   162         etypes = self.support_entities.keys()
       
   163         if mtime is None:
       
   164             mtime = self.last_update_time()
       
   165         updatetime, modified, deleted = extrepo.entities_modified_since(etypes,
       
   166                                                                         mtime)
       
   167         repo = self.repo
       
   168         session = repo.internal_session()
       
   169         try:
       
   170             for etype, extid in modified:
       
   171                 try:
       
   172                     eid = self.extid2eid(extid, etype, session)
       
   173                     rset = session.eid_rset(eid, etype)
       
   174                     entity = rset.get_entity(0, 0)
       
   175                     entity.complete(entity.e_schema.indexable_attributes())
       
   176                     repo.index_entity(session, entity)
       
   177                 except:
       
   178                     self.exception('while updating %s with external id %s of source %s',
       
   179                                    etype, extid, self.uri)
       
   180                     continue
       
   181             for etype, extid in deleted:
       
   182                 try:
       
   183                     eid = self.extid2eid(extid, etype, session, insert=False)
       
   184                     # entity has been deleted from external repository but is not known here
       
   185                     if eid is not None:
       
   186                         repo.delete_info(session, eid)
       
   187                 except:
       
   188                     self.exception('while updating %s with external id %s of source %s',
       
   189                                    etype, extid, self.uri)
       
   190                     continue
       
   191             session.execute('SET X value %(v)s WHERE X pkey %(k)s',
       
   192                             {'k': u'sources.%s.latest-update-time' % self.uri,
       
   193                              'v': unicode(int(updatetime.ticks()))})
       
   194             session.commit()
       
   195         finally:
       
   196             session.close()
       
   197                 
       
   198     def _get_connection(self):
       
   199         """open and return a connection to the source"""
       
   200         nshost = self.config.get('pyro-ns-host') or self.repo.config['pyro-ns-host']
       
   201         nsport = self.config.get('pyro-ns-port') or self.repo.config['pyro-ns-port']
       
   202         nsgroup = self.config.get('pyro-ns-group') or self.repo.config['pyro-ns-group']
       
   203         #cnxprops = ConnectionProperties(cnxtype=self.config['cnx-type'])
       
   204         return dbapi.connect(database=self.config['pyro-ns-id'],
       
   205                              user=self.config['cubicweb-user'],
       
   206                              password=self.config['cubicweb-password'],
       
   207                              host=nshost, port=nsport, group=nsgroup,
       
   208                              setvreg=False) #cnxprops=cnxprops)
       
   209 
       
   210     def get_connection(self):
       
   211         try:
       
   212             return self._get_connection()
       
   213         except (ConnectionError, PyroError):
       
   214             self.critical("can't get connection to source %s", self.uri,
       
   215                           exc_info=1)
       
   216             return ConnectionWrapper()
       
   217         
       
   218     def check_connection(self, cnx):
       
   219         """check connection validity, return None if the connection is still valid
       
   220         else a new connection
       
   221         """
       
   222         # we have to transfer manually thread ownership. This can be done safely
       
   223         # since the pool to which belong the connection is affected to one
       
   224         # session/thread and can't be called simultaneously
       
   225         try:
       
   226             cnx._repo._transferThread(threading.currentThread())
       
   227         except AttributeError:
       
   228             # inmemory connection
       
   229             pass
       
   230         if not isinstance(cnx, ConnectionWrapper):
       
   231             try:
       
   232                 cnx.check()
       
   233                 return # ok
       
   234             except (BadConnectionId, ConnectionClosedError):
       
   235                 pass
       
   236         # try to reconnect
       
   237         return self.get_connection()
       
   238         
       
   239     
       
   240     def syntax_tree_search(self, session, union, args=None, cachekey=None,
       
   241                            varmap=None):
       
   242         """return result from this source for a rql query (actually from a rql 
       
   243         syntax tree and a solution dictionary mapping each used variable to a 
       
   244         possible type). If cachekey is given, the query necessary to fetch the
       
   245         results (but not the results themselves) may be cached using this key.
       
   246         """
       
   247         if not args is None:
       
   248             args = args.copy()
       
   249         if server.DEBUG:
       
   250             print 'RQL FOR PYRO SOURCE', self.uri
       
   251             print union.as_string()
       
   252             if args: print 'ARGS', args
       
   253             print 'SOLUTIONS', ','.join(str(s.solutions) for s in union.children)
       
   254         # get cached cursor anyway
       
   255         cu = session.pool[self.uri]
       
   256         if cu is None:
       
   257             # this is a ConnectionWrapper instance
       
   258             msg = session._("can't connect to source %s, some data may be missing")
       
   259             session.set_shared_data('sources_error', msg % self.uri)
       
   260             return []
       
   261         try:
       
   262             rql, cachekey = RQL2RQL(self).generate(session, union, args)
       
   263         except UnknownEid, ex:
       
   264             if server.DEBUG:
       
   265                 print 'unknown eid', ex, 'no results'
       
   266             return []
       
   267         if server.DEBUG:
       
   268             print 'TRANSLATED RQL', rql
       
   269         try:
       
   270             rset = cu.execute(rql, args, cachekey)
       
   271         except Exception, ex:
       
   272             self.exception(str(ex))
       
   273             msg = session._("error while querying source %s, some data may be missing")
       
   274             session.set_shared_data('sources_error', msg % self.uri)
       
   275             return []
       
   276         descr = rset.description
       
   277         if rset:
       
   278             needtranslation = []
       
   279             for i, etype in enumerate(descr[0]):
       
   280                 if (etype is None or not self.schema.eschema(etype).is_final() or
       
   281                     getattr(union.locate_subquery(i, etype, args).selection[i], 'uidtype', None)):
       
   282                     needtranslation.append(i)
       
   283             if needtranslation:
       
   284                 for rowindex, row in enumerate(rset):
       
   285                     for colindex in needtranslation:
       
   286                         if row[colindex] is not None: # optional variable
       
   287                             etype = descr[rowindex][colindex]
       
   288                             eid = self.extid2eid(row[colindex], etype, session)
       
   289                             row[colindex] = eid
       
   290             results = rset.rows
       
   291         else:
       
   292             results = []
       
   293         if server.DEBUG:
       
   294             if len(results)>10:
       
   295                 print '--------------->', results[:10], '...', len(results)
       
   296             else:
       
   297                 print '--------------->', results
       
   298         return results
       
   299 
       
   300     def _entity_relations_and_kwargs(self, session, entity):
       
   301         relations = []
       
   302         kwargs = {'x': self.eid2extid(entity.eid, session)}
       
   303         for key, val in entity.iteritems():
       
   304             relations.append('X %s %%(%s)s' % (key, key))
       
   305             kwargs[key] = val
       
   306         return relations, kwargs
       
   307     
       
   308     def add_entity(self, session, entity):
       
   309         """add a new entity to the source"""
       
   310         raise NotImplementedError()
       
   311         
       
   312     def update_entity(self, session, entity):
       
   313         """update an entity in the source"""
       
   314         relations, kwargs = self._entity_relations_and_kwargs(session, entity)
       
   315         cu = session.pool[self.uri]
       
   316         cu.execute('SET %s WHERE X eid %%(x)s' % ','.join(relations),
       
   317                    kwargs, 'x')
       
   318 
       
   319     def delete_entity(self, session, etype, eid):
       
   320         """delete an entity from the source"""
       
   321         cu = session.pool[self.uri]
       
   322         cu.execute('DELETE %s X WHERE X eid %%(x)s' % etype,
       
   323                    {'x': self.eid2extid(eid, session)}, 'x')
       
   324 
       
   325     def add_relation(self, session, subject, rtype, object):
       
   326         """add a relation to the source"""
       
   327         cu = session.pool[self.uri]
       
   328         cu.execute('SET X %s Y WHERE X eid %%(x)s, Y eid %%(y)s' % rtype,
       
   329                    {'x': self.eid2extid(subject, session),
       
   330                     'y': self.eid2extid(object, session)}, ('x', 'y'))
       
   331     
       
   332     def delete_relation(self, session, subject, rtype, object):
       
   333         """delete a relation from the source"""
       
   334         cu = session.pool[self.uri]
       
   335         cu.execute('DELETE X %s Y WHERE X eid %%(x)s, Y eid %%(y)s' % rtype,
       
   336                    {'x': self.eid2extid(subject, session),
       
   337                     'y': self.eid2extid(object, session)}, ('x', 'y'))
       
   338 
       
   339 
       
   340 class RQL2RQL(object):
       
   341     """translate a local rql query to be executed on a distant repository"""
       
   342     def __init__(self, source):
       
   343         self.source = source
       
   344 
       
   345     def _accept_children(self, node):
       
   346         res = []
       
   347         for child in node.children:
       
   348             rql = child.accept(self)
       
   349             if rql is not None:
       
   350                 res.append(rql)
       
   351         return res
       
   352         
       
   353     def generate(self, session, rqlst, args):
       
   354         self._session = session 
       
   355         self.kwargs = args
       
   356         self.cachekey = []
       
   357         self.need_translation = False
       
   358         return self.visit_union(rqlst), self.cachekey
       
   359     
       
   360     def visit_union(self, node):
       
   361         s = self._accept_children(node)
       
   362         if len(s) > 1:
       
   363             return ' UNION '.join('(%s)' % q for q in s)
       
   364         return s[0]
       
   365     
       
   366     def visit_select(self, node):
       
   367         """return the tree as an encoded rql string"""
       
   368         self._varmaker = rqlvar_maker(defined=node.defined_vars.copy())
       
   369         self._const_var = {}
       
   370         if node.distinct:
       
   371             base = 'DISTINCT Any'
       
   372         else:
       
   373             base = 'Any'
       
   374         s = ['%s %s' % (base, ','.join(v.accept(self) for v in node.selection))]
       
   375         if node.groupby:
       
   376             s.append('GROUPBY %s' % ', '.join(group.accept(self)
       
   377                                               for group in node.groupby))
       
   378         if node.orderby:
       
   379             s.append('ORDERBY %s' % ', '.join(self.visit_sortterm(term)
       
   380                                               for term in node.orderby))
       
   381         if node.limit is not None:
       
   382             s.append('LIMIT %s' % node.limit)
       
   383         if node.offset:
       
   384             s.append('OFFSET %s' % node.offset)
       
   385         restrictions = []
       
   386         if node.where is not None:
       
   387             nr = node.where.accept(self)
       
   388             if nr is not None:
       
   389                 restrictions.append(nr)
       
   390         if restrictions:
       
   391             s.append('WHERE %s' % ','.join(restrictions))
       
   392         
       
   393         if node.having:
       
   394             s.append('HAVING %s' % ', '.join(term.accept(self)
       
   395                                              for term in node.having))
       
   396         subqueries = []
       
   397         for subquery in node.with_:
       
   398             subqueries.append('%s BEING (%s)' % (','.join(ca.name for ca in subquery.aliases),
       
   399                                                  self.visit_union(subquery.query)))
       
   400         if subqueries:
       
   401             s.append('WITH %s' % (','.join(subqueries)))
       
   402         return ' '.join(s)
       
   403     
       
   404     def visit_and(self, node):
       
   405         res = self._accept_children(node)
       
   406         if res:
       
   407             return ', '.join(res)
       
   408         return
       
   409     
       
   410     def visit_or(self, node):
       
   411         res = self._accept_children(node)
       
   412         if len(res) > 1:
       
   413             return ' OR '.join('(%s)' % rql for rql in res)
       
   414         elif res:
       
   415             return res[0]
       
   416         return
       
   417     
       
   418     def visit_not(self, node):
       
   419         rql = node.children[0].accept(self)
       
   420         if rql:
       
   421             return 'NOT (%s)' % rql
       
   422         return
       
   423     
       
   424     def visit_exists(self, node):
       
   425         return 'EXISTS(%s)' % node.children[0].accept(self)
       
   426         
       
   427     def visit_relation(self, node):
       
   428         try:
       
   429             if isinstance(node.children[0], Constant):
       
   430                 # simplified rqlst, reintroduce eid relation
       
   431                 restr, lhs = self.process_eid_const(node.children[0])
       
   432             else:
       
   433                 lhs = node.children[0].accept(self)
       
   434                 restr = None
       
   435         except UnknownEid:
       
   436             # can safely skip not relation with an unsupported eid
       
   437             if node.neged(strict=True):
       
   438                 return
       
   439             # XXX what about optional relation or outer NOT EXISTS()
       
   440             raise
       
   441         if node.optional in ('left', 'both'):
       
   442             lhs += '?'
       
   443         if node.r_type == 'eid' or not self.source.schema.rschema(node.r_type).is_final():
       
   444             self.need_translation = True
       
   445             self.current_operator = node.operator()
       
   446             if isinstance(node.children[0], Constant):
       
   447                 self.current_etypes = (node.children[0].uidtype,)
       
   448             else:
       
   449                 self.current_etypes = node.children[0].variable.stinfo['possibletypes']
       
   450         try:
       
   451             rhs = node.children[1].accept(self)
       
   452         except UnknownEid:
       
   453             # can safely skip not relation with an unsupported eid
       
   454             if node.neged(strict=True):
       
   455                 return
       
   456             # XXX what about optional relation or outer NOT EXISTS()
       
   457             raise
       
   458         except ReplaceByInOperator, ex:
       
   459             rhs = 'IN (%s)' % ','.join(str(eid) for eid in ex.eids)
       
   460         self.need_translation = False
       
   461         self.current_operator = None
       
   462         if node.optional in ('right', 'both'):
       
   463             rhs += '?'
       
   464         if restr is not None:
       
   465             return '%s %s %s, %s' % (lhs, node.r_type, rhs, restr)
       
   466         return '%s %s %s' % (lhs, node.r_type, rhs)
       
   467         
       
   468     def visit_comparison(self, node):
       
   469         if node.operator in ('=', 'IS'):
       
   470             return node.children[0].accept(self)
       
   471         return '%s %s' % (node.operator.encode(),
       
   472                           node.children[0].accept(self))
       
   473             
       
   474     def visit_mathexpression(self, node):
       
   475         return '(%s %s %s)' % (node.children[0].accept(self),
       
   476                                node.operator.encode(),
       
   477                                node.children[1].accept(self))
       
   478         
       
   479     def visit_function(self, node):
       
   480         #if node.name == 'IN':
       
   481         res = []
       
   482         for child in node.children:
       
   483             try:
       
   484                 rql = child.accept(self)
       
   485             except UnknownEid, ex:
       
   486                 continue
       
   487             res.append(rql)
       
   488         if not res:
       
   489             raise ex
       
   490         return '%s(%s)' % (node.name, ', '.join(res))
       
   491         
       
   492     def visit_constant(self, node):
       
   493         if self.need_translation or node.uidtype:
       
   494             if node.type == 'Int':
       
   495                 return str(self.eid2extid(node.value))
       
   496             if node.type == 'Substitute':
       
   497                 key = node.value
       
   498                 # ensure we have not yet translated the value...
       
   499                 if not key in self._const_var:
       
   500                     self.kwargs[key] = self.eid2extid(self.kwargs[key])
       
   501                     self.cachekey.append(key)
       
   502                     self._const_var[key] = None
       
   503         return node.as_string()
       
   504 
       
   505     def visit_variableref(self, node):
       
   506         """get the sql name for a variable reference"""
       
   507         return node.name
       
   508 
       
   509     def visit_sortterm(self, node):
       
   510         if node.asc:
       
   511             return node.term.accept(self)
       
   512         return '%s DESC' % node.term.accept(self)
       
   513 
       
   514     def process_eid_const(self, const):
       
   515         value = const.eval(self.kwargs)
       
   516         try:
       
   517             return None, self._const_var[value]
       
   518         except:
       
   519             var = self._varmaker.next()
       
   520             self.need_translation = True
       
   521             restr = '%s eid %s' % (var, self.visit_constant(const))
       
   522             self.need_translation = False
       
   523             self._const_var[value] = var
       
   524             return restr, var
       
   525 
       
   526     def eid2extid(self, eid):
       
   527         try:
       
   528             return self.source.eid2extid(eid, self._session)        
       
   529         except UnknownEid:
       
   530             operator = self.current_operator
       
   531             if operator is not None and operator != '=':
       
   532                 # deal with query like X eid > 12
       
   533                 #
       
   534                 # The problem is
       
   535                 # that eid order in the external source may differ from the
       
   536                 # local source
       
   537                 #
       
   538                 # So search for all eids from this
       
   539                 # source matching the condition locally and then to replace the
       
   540                 # > 12 branch by IN (eids) (XXX we may have to insert a huge
       
   541                 # number of eids...)
       
   542                 # planner so that
       
   543                 sql = "SELECT extid FROM entities WHERE source='%s' AND type IN (%s) AND eid%s%s"
       
   544                 etypes = ','.join("'%s'" % etype for etype in self.current_etypes)
       
   545                 cu = self._session.system_sql(sql % (self.source.uri, etypes,
       
   546                                                       operator, eid))
       
   547                 # XXX buggy cu.rowcount which may be zero while there are some
       
   548                 # results
       
   549                 rows = cu.fetchall()
       
   550                 if rows:
       
   551                     raise ReplaceByInOperator((r[0] for r in rows))
       
   552             raise
       
   553