server/sources/pyrorql.py
branch tls-sprint
changeset 1791 c77629112437
parent 1398 5fe84a5f7035
child 1805 7bad2785d167
equal deleted inserted replaced
1790:a3e2b079de00 1791:c77629112437
    25 from cubicweb.server.sources import AbstractSource, ConnectionWrapper, TimedCache
    25 from cubicweb.server.sources import AbstractSource, ConnectionWrapper, TimedCache
    26 
    26 
    27 class ReplaceByInOperator(Exception):
    27 class ReplaceByInOperator(Exception):
    28     def __init__(self, eids):
    28     def __init__(self, eids):
    29         self.eids = eids
    29         self.eids = eids
    30         
    30 
    31 class PyroRQLSource(AbstractSource):
    31 class PyroRQLSource(AbstractSource):
    32     """External repository source, using Pyro connection"""
    32     """External repository source, using Pyro connection"""
    33     
    33 
    34     # boolean telling if modification hooks should be called when something is
    34     # boolean telling if modification hooks should be called when something is
    35     # modified in this source
    35     # modified in this source
    36     should_call_hooks = False
    36     should_call_hooks = False
    37     # boolean telling if the repository should connect to this source during
    37     # boolean telling if the repository should connect to this source during
    38     # migration
    38     # migration
    39     connect_for_migration = False
    39     connect_for_migration = False
    40     
    40 
    41     support_entities = None
    41     support_entities = None
    42     
    42 
    43     options = (
    43     options = (
    44         # XXX pyro-ns host/port
    44         # XXX pyro-ns host/port
    45         ('pyro-ns-id',
    45         ('pyro-ns-id',
    46          {'type' : 'string',
    46          {'type' : 'string',
    47           'default': REQUIRED,
    47           'default': REQUIRED,
    99           'default': 5*60,
    99           'default': 5*60,
   100           'help': 'interval between synchronization with the external \
   100           'help': 'interval between synchronization with the external \
   101 repository (default to 5 minutes).',
   101 repository (default to 5 minutes).',
   102           'group': 'pyro-source', 'inputlevel': 2,
   102           'group': 'pyro-source', 'inputlevel': 2,
   103           }),
   103           }),
   104         
   104 
   105     )
   105     )
   106 
   106 
   107     PUBLIC_KEYS = AbstractSource.PUBLIC_KEYS + ('base-url',)
   107     PUBLIC_KEYS = AbstractSource.PUBLIC_KEYS + ('base-url',)
   108     _conn = None
   108     _conn = None
   109 
   109 
   125         self.config = source_config
   125         self.config = source_config
   126         myoptions = (('%s.latest-update-time' % self.uri,
   126         myoptions = (('%s.latest-update-time' % self.uri,
   127                       {'type' : 'int', 'sitewide': True,
   127                       {'type' : 'int', 'sitewide': True,
   128                        'default': 0,
   128                        'default': 0,
   129                        'help': _('timestamp of the latest source synchronization.'),
   129                        'help': _('timestamp of the latest source synchronization.'),
   130                        'group': 'sources', 
   130                        'group': 'sources',
   131                        }),)
   131                        }),)
   132         register_persistent_options(myoptions)
   132         register_persistent_options(myoptions)
   133         self._query_cache = TimedCache(30)
   133         self._query_cache = TimedCache(30)
   134 
   134 
   135     def last_update_time(self):
   135     def last_update_time(self):
   152             session.close()
   152             session.close()
   153 
   153 
   154     def init(self):
   154     def init(self):
   155         """method called by the repository once ready to handle request"""
   155         """method called by the repository once ready to handle request"""
   156         interval = int(self.config.get('synchronization-interval', 5*60))
   156         interval = int(self.config.get('synchronization-interval', 5*60))
   157         self.repo.looping_task(interval, self.synchronize) 
   157         self.repo.looping_task(interval, self.synchronize)
   158         self.repo.looping_task(self._query_cache.ttl.seconds/10, self._query_cache.clear_expired) 
   158         self.repo.looping_task(self._query_cache.ttl.seconds/10, self._query_cache.clear_expired)
   159 
   159 
   160     def synchronize(self, mtime=None):
   160     def synchronize(self, mtime=None):
   161         """synchronize content known by this repository with content in the
   161         """synchronize content known by this repository with content in the
   162         external repository
   162         external repository
   163         """
   163         """
   167         etypes = self.support_entities.keys()
   167         etypes = self.support_entities.keys()
   168         if mtime is None:
   168         if mtime is None:
   169             mtime = self.last_update_time()
   169             mtime = self.last_update_time()
   170         updatetime, modified, deleted = extrepo.entities_modified_since(etypes,
   170         updatetime, modified, deleted = extrepo.entities_modified_since(etypes,
   171                                                                         mtime)
   171                                                                         mtime)
       
   172         self._query_cache.clear()
   172         repo = self.repo
   173         repo = self.repo
   173         session = repo.internal_session()
   174         session = repo.internal_session()
   174         try:
   175         try:
   175             for etype, extid in modified:
   176             for etype, extid in modified:
   176                 try:
   177                 try:
   199                             {'k': u'sources.%s.latest-update-time' % self.uri,
   200                             {'k': u'sources.%s.latest-update-time' % self.uri,
   200                              'v': unicode(int(mktime(updatetime.timetuple())))})
   201                              'v': unicode(int(mktime(updatetime.timetuple())))})
   201             session.commit()
   202             session.commit()
   202         finally:
   203         finally:
   203             session.close()
   204             session.close()
   204                 
   205 
   205     def _get_connection(self):
   206     def _get_connection(self):
   206         """open and return a connection to the source"""
   207         """open and return a connection to the source"""
   207         nshost = self.config.get('pyro-ns-host') or self.repo.config['pyro-ns-host']
   208         nshost = self.config.get('pyro-ns-host') or self.repo.config['pyro-ns-host']
   208         nsport = self.config.get('pyro-ns-port') or self.repo.config['pyro-ns-port']
   209         nsport = self.config.get('pyro-ns-port') or self.repo.config['pyro-ns-port']
   209         nsgroup = self.config.get('pyro-ns-group') or self.repo.config['pyro-ns-group']
   210         nsgroup = self.config.get('pyro-ns-group') or self.repo.config['pyro-ns-group']
   219             return self._get_connection()
   220             return self._get_connection()
   220         except (ConnectionError, PyroError):
   221         except (ConnectionError, PyroError):
   221             self.critical("can't get connection to source %s", self.uri,
   222             self.critical("can't get connection to source %s", self.uri,
   222                           exc_info=1)
   223                           exc_info=1)
   223             return ConnectionWrapper()
   224             return ConnectionWrapper()
   224         
   225 
   225     def check_connection(self, cnx):
   226     def check_connection(self, cnx):
   226         """check connection validity, return None if the connection is still valid
   227         """check connection validity, return None if the connection is still valid
   227         else a new connection
   228         else a new connection
   228         """
   229         """
   229         # we have to transfer manually thread ownership. This can be done safely
   230         # we have to transfer manually thread ownership. This can be done safely
   240                 return # ok
   241                 return # ok
   241             except (BadConnectionId, ConnectionClosedError):
   242             except (BadConnectionId, ConnectionClosedError):
   242                 pass
   243                 pass
   243         # try to reconnect
   244         # try to reconnect
   244         return self.get_connection()
   245         return self.get_connection()
   245         
   246 
   246     def syntax_tree_search(self, session, union, args=None, cachekey=None,
   247     def syntax_tree_search(self, session, union, args=None, cachekey=None,
   247                            varmap=None):
   248                            varmap=None):
   248         #assert not varmap, (varmap, union)
   249         #assert not varmap, (varmap, union)
   249         rqlkey = union.as_string(kwargs=args)
   250         rqlkey = union.as_string(kwargs=args)
   250         try:
   251         try:
   251             results = self._query_cache[rqlkey]
   252             results = self._query_cache[rqlkey]
   252         except KeyError:
   253         except KeyError:
   253             results = self._syntax_tree_search(session, union, args)
   254             results = self._syntax_tree_search(session, union, args)
   254             self._query_cache[rqlkey] = results
   255             self._query_cache[rqlkey] = results
   255         return results
   256         return results
   256     
   257 
   257     def _syntax_tree_search(self, session, union, args):
   258     def _syntax_tree_search(self, session, union, args):
   258         """return result from this source for a rql query (actually from a rql 
   259         """return result from this source for a rql query (actually from a rql
   259         syntax tree and a solution dictionary mapping each used variable to a 
   260         syntax tree and a solution dictionary mapping each used variable to a
   260         possible type). If cachekey is given, the query necessary to fetch the
   261         possible type). If cachekey is given, the query necessary to fetch the
   261         results (but not the results themselves) may be cached using this key.
   262         results (but not the results themselves) may be cached using this key.
   262         """
   263         """
   263         if not args is None:
   264         if not args is None:
   264             args = args.copy()
   265             args = args.copy()
   328         kwargs = {'x': self.eid2extid(entity.eid, session)}
   329         kwargs = {'x': self.eid2extid(entity.eid, session)}
   329         for key, val in entity.iteritems():
   330         for key, val in entity.iteritems():
   330             relations.append('X %s %%(%s)s' % (key, key))
   331             relations.append('X %s %%(%s)s' % (key, key))
   331             kwargs[key] = val
   332             kwargs[key] = val
   332         return relations, kwargs
   333         return relations, kwargs
   333     
   334 
   334     def add_entity(self, session, entity):
   335     def add_entity(self, session, entity):
   335         """add a new entity to the source"""
   336         """add a new entity to the source"""
   336         raise NotImplementedError()
   337         raise NotImplementedError()
   337         
   338 
   338     def update_entity(self, session, entity):
   339     def update_entity(self, session, entity):
   339         """update an entity in the source"""
   340         """update an entity in the source"""
   340         relations, kwargs = self._entity_relations_and_kwargs(session, entity)
   341         relations, kwargs = self._entity_relations_and_kwargs(session, entity)
   341         cu = session.pool[self.uri]
   342         cu = session.pool[self.uri]
   342         cu.execute('SET %s WHERE X eid %%(x)s' % ','.join(relations),
   343         cu.execute('SET %s WHERE X eid %%(x)s' % ','.join(relations),
   352         """add a relation to the source"""
   353         """add a relation to the source"""
   353         cu = session.pool[self.uri]
   354         cu = session.pool[self.uri]
   354         cu.execute('SET X %s Y WHERE X eid %%(x)s, Y eid %%(y)s' % rtype,
   355         cu.execute('SET X %s Y WHERE X eid %%(x)s, Y eid %%(y)s' % rtype,
   355                    {'x': self.eid2extid(subject, session),
   356                    {'x': self.eid2extid(subject, session),
   356                     'y': self.eid2extid(object, session)}, ('x', 'y'))
   357                     'y': self.eid2extid(object, session)}, ('x', 'y'))
   357     
   358 
   358     def delete_relation(self, session, subject, rtype, object):
   359     def delete_relation(self, session, subject, rtype, object):
   359         """delete a relation from the source"""
   360         """delete a relation from the source"""
   360         cu = session.pool[self.uri]
   361         cu = session.pool[self.uri]
   361         cu.execute('DELETE X %s Y WHERE X eid %%(x)s, Y eid %%(y)s' % rtype,
   362         cu.execute('DELETE X %s Y WHERE X eid %%(x)s, Y eid %%(y)s' % rtype,
   362                    {'x': self.eid2extid(subject, session),
   363                    {'x': self.eid2extid(subject, session),
   366 class RQL2RQL(object):
   367 class RQL2RQL(object):
   367     """translate a local rql query to be executed on a distant repository"""
   368     """translate a local rql query to be executed on a distant repository"""
   368     def __init__(self, source):
   369     def __init__(self, source):
   369         self.source = source
   370         self.source = source
   370         self.current_operator = None
   371         self.current_operator = None
   371         
   372 
   372     def _accept_children(self, node):
   373     def _accept_children(self, node):
   373         res = []
   374         res = []
   374         for child in node.children:
   375         for child in node.children:
   375             rql = child.accept(self)
   376             rql = child.accept(self)
   376             if rql is not None:
   377             if rql is not None:
   377                 res.append(rql)
   378                 res.append(rql)
   378         return res
   379         return res
   379         
   380 
   380     def generate(self, session, rqlst, args):
   381     def generate(self, session, rqlst, args):
   381         self._session = session 
   382         self._session = session
   382         self.kwargs = args
   383         self.kwargs = args
   383         self.cachekey = []
   384         self.cachekey = []
   384         self.need_translation = False
   385         self.need_translation = False
   385         return self.visit_union(rqlst), self.cachekey
   386         return self.visit_union(rqlst), self.cachekey
   386     
   387 
   387     def visit_union(self, node):
   388     def visit_union(self, node):
   388         s = self._accept_children(node)
   389         s = self._accept_children(node)
   389         if len(s) > 1:
   390         if len(s) > 1:
   390             return ' UNION '.join('(%s)' % q for q in s)
   391             return ' UNION '.join('(%s)' % q for q in s)
   391         return s[0]
   392         return s[0]
   392     
   393 
   393     def visit_select(self, node):
   394     def visit_select(self, node):
   394         """return the tree as an encoded rql string"""
   395         """return the tree as an encoded rql string"""
   395         self._varmaker = rqlvar_maker(defined=node.defined_vars.copy())
   396         self._varmaker = rqlvar_maker(defined=node.defined_vars.copy())
   396         self._const_var = {}
   397         self._const_var = {}
   397         if node.distinct:
   398         if node.distinct:
   414             nr = node.where.accept(self)
   415             nr = node.where.accept(self)
   415             if nr is not None:
   416             if nr is not None:
   416                 restrictions.append(nr)
   417                 restrictions.append(nr)
   417         if restrictions:
   418         if restrictions:
   418             s.append('WHERE %s' % ','.join(restrictions))
   419             s.append('WHERE %s' % ','.join(restrictions))
   419         
   420 
   420         if node.having:
   421         if node.having:
   421             s.append('HAVING %s' % ', '.join(term.accept(self)
   422             s.append('HAVING %s' % ', '.join(term.accept(self)
   422                                              for term in node.having))
   423                                              for term in node.having))
   423         subqueries = []
   424         subqueries = []
   424         for subquery in node.with_:
   425         for subquery in node.with_:
   425             subqueries.append('%s BEING (%s)' % (','.join(ca.name for ca in subquery.aliases),
   426             subqueries.append('%s BEING (%s)' % (','.join(ca.name for ca in subquery.aliases),
   426                                                  self.visit_union(subquery.query)))
   427                                                  self.visit_union(subquery.query)))
   427         if subqueries:
   428         if subqueries:
   428             s.append('WITH %s' % (','.join(subqueries)))
   429             s.append('WITH %s' % (','.join(subqueries)))
   429         return ' '.join(s)
   430         return ' '.join(s)
   430     
   431 
   431     def visit_and(self, node):
   432     def visit_and(self, node):
   432         res = self._accept_children(node)
   433         res = self._accept_children(node)
   433         if res:
   434         if res:
   434             return ', '.join(res)
   435             return ', '.join(res)
   435         return
   436         return
   436     
   437 
   437     def visit_or(self, node):
   438     def visit_or(self, node):
   438         res = self._accept_children(node)
   439         res = self._accept_children(node)
   439         if len(res) > 1:
   440         if len(res) > 1:
   440             return ' OR '.join('(%s)' % rql for rql in res)
   441             return ' OR '.join('(%s)' % rql for rql in res)
   441         elif res:
   442         elif res:
   442             return res[0]
   443             return res[0]
   443         return
   444         return
   444     
   445 
   445     def visit_not(self, node):
   446     def visit_not(self, node):
   446         rql = node.children[0].accept(self)
   447         rql = node.children[0].accept(self)
   447         if rql:
   448         if rql:
   448             return 'NOT (%s)' % rql
   449             return 'NOT (%s)' % rql
   449         return
   450         return
   450     
   451 
   451     def visit_exists(self, node):
   452     def visit_exists(self, node):
   452         return 'EXISTS(%s)' % node.children[0].accept(self)
   453         return 'EXISTS(%s)' % node.children[0].accept(self)
   453         
   454 
   454     def visit_relation(self, node):
   455     def visit_relation(self, node):
   455         try:
   456         try:
   456             if isinstance(node.children[0], Constant):
   457             if isinstance(node.children[0], Constant):
   457                 # simplified rqlst, reintroduce eid relation
   458                 # simplified rqlst, reintroduce eid relation
   458                 try:
   459                 try:
   495         if node.optional in ('right', 'both'):
   496         if node.optional in ('right', 'both'):
   496             rhs += '?'
   497             rhs += '?'
   497         if restr is not None:
   498         if restr is not None:
   498             return '%s %s %s, %s' % (lhs, node.r_type, rhs, restr)
   499             return '%s %s %s, %s' % (lhs, node.r_type, rhs, restr)
   499         return '%s %s %s' % (lhs, node.r_type, rhs)
   500         return '%s %s %s' % (lhs, node.r_type, rhs)
   500         
   501 
   501     def visit_comparison(self, node):
   502     def visit_comparison(self, node):
   502         if node.operator in ('=', 'IS'):
   503         if node.operator in ('=', 'IS'):
   503             return node.children[0].accept(self)
   504             return node.children[0].accept(self)
   504         return '%s %s' % (node.operator.encode(),
   505         return '%s %s' % (node.operator.encode(),
   505                           node.children[0].accept(self))
   506                           node.children[0].accept(self))
   506             
   507 
   507     def visit_mathexpression(self, node):
   508     def visit_mathexpression(self, node):
   508         return '(%s %s %s)' % (node.children[0].accept(self),
   509         return '(%s %s %s)' % (node.children[0].accept(self),
   509                                node.operator.encode(),
   510                                node.operator.encode(),
   510                                node.children[1].accept(self))
   511                                node.children[1].accept(self))
   511         
   512 
   512     def visit_function(self, node):
   513     def visit_function(self, node):
   513         #if node.name == 'IN':
   514         #if node.name == 'IN':
   514         res = []
   515         res = []
   515         for child in node.children:
   516         for child in node.children:
   516             try:
   517             try:
   519                 continue
   520                 continue
   520             res.append(rql)
   521             res.append(rql)
   521         if not res:
   522         if not res:
   522             raise ex
   523             raise ex
   523         return '%s(%s)' % (node.name, ', '.join(res))
   524         return '%s(%s)' % (node.name, ', '.join(res))
   524         
   525 
   525     def visit_constant(self, node):
   526     def visit_constant(self, node):
   526         if self.need_translation or node.uidtype:
   527         if self.need_translation or node.uidtype:
   527             if node.type == 'Int':
   528             if node.type == 'Int':
   528                 return str(self.eid2extid(node.value))
   529                 return str(self.eid2extid(node.value))
   529             if node.type == 'Substitute':
   530             if node.type == 'Substitute':
   556             self._const_var[value] = var
   557             self._const_var[value] = var
   557             return restr, var
   558             return restr, var
   558 
   559 
   559     def eid2extid(self, eid):
   560     def eid2extid(self, eid):
   560         try:
   561         try:
   561             return self.source.eid2extid(eid, self._session)        
   562             return self.source.eid2extid(eid, self._session)
   562         except UnknownEid:
   563         except UnknownEid:
   563             operator = self.current_operator
   564             operator = self.current_operator
   564             if operator is not None and operator != '=':
   565             if operator is not None and operator != '=':
   565                 # deal with query like X eid > 12
   566                 # deal with query like X eid > 12
   566                 #
   567                 #
   581                 # results
   582                 # results
   582                 rows = cu.fetchall()
   583                 rows = cu.fetchall()
   583                 if rows:
   584                 if rows:
   584                     raise ReplaceByInOperator((r[0] for r in rows))
   585                     raise ReplaceByInOperator((r[0] for r in rows))
   585             raise
   586             raise
   586                 
   587