server/sources/pyrorql.py
changeset 6655 75112ff0511d
parent 6652 592c88c8f018
child 6672 2008fd2f628c
equal deleted inserted replaced
6654:18d159a2d1ba 6655:75112ff0511d
   217         interval = int(self.config.get('synchronization-interval', 5*60))
   217         interval = int(self.config.get('synchronization-interval', 5*60))
   218         self.repo.looping_task(interval, self.synchronize)
   218         self.repo.looping_task(interval, self.synchronize)
   219         self.repo.looping_task(self._query_cache.ttl.seconds/10,
   219         self.repo.looping_task(self._query_cache.ttl.seconds/10,
   220                                self._query_cache.clear_expired)
   220                                self._query_cache.clear_expired)
   221 
   221 
   222     def map_entity_source(self, exturi):
   222     def local_eid(self, cnx, extid, session):
   223         return (exturi == 'system' or
   223         etype, dexturi, dextid = cnx.describe(extid)
   224                 not (exturi in self.repo.sources_by_uri or self._skip_externals))
   224         if dexturi == 'system' or not (
       
   225             dexturi in self.repo.sources_by_uri or self._skip_externals):
       
   226             return self.repo.extid2eid(self, str(extid), etype, session), True
       
   227         if dexturi in self.repo.sources_by_uri:
       
   228             source = self.repo.sources_by_uri[dexturi]
       
   229             cnx = session.pool.connection(source.uri)
       
   230             eid = source.local_eid(cnx, dextid, session)[0]
       
   231             return eid, False
       
   232         return None, None
   225 
   233 
   226     def synchronize(self, mtime=None):
   234     def synchronize(self, mtime=None):
   227         """synchronize content known by this repository with content in the
   235         """synchronize content known by this repository with content in the
   228         external repository
   236         external repository
   229         """
   237         """
   245         session = repo.internal_session()
   253         session = repo.internal_session()
   246         source = repo.system_source
   254         source = repo.system_source
   247         try:
   255         try:
   248             for etype, extid in modified:
   256             for etype, extid in modified:
   249                 try:
   257                 try:
   250                     exturi = cnx.describe(extid)[1]
   258                     eid = self.local_eid(cnx, extid, session)
   251                     if self.map_entity_source(exturi):
   259                     if eid is not None:
   252                         eid = self.extid2eid(str(extid), etype, session)
       
   253                         rset = session.eid_rset(eid, etype)
   260                         rset = session.eid_rset(eid, etype)
   254                         entity = rset.get_entity(0, 0)
   261                         entity = rset.get_entity(0, 0)
   255                         entity.complete(entity.e_schema.indexable_attributes())
   262                         entity.complete(entity.e_schema.indexable_attributes())
   256                         source.index_entity(session, entity)
   263                         source.index_entity(session, entity)
   257                 except:
   264                 except:
   345         if cu is None:
   352         if cu is None:
   346             # this is a ConnectionWrapper instance
   353             # this is a ConnectionWrapper instance
   347             msg = session._("can't connect to source %s, some data may be missing")
   354             msg = session._("can't connect to source %s, some data may be missing")
   348             session.set_shared_data('sources_error', msg % self.uri)
   355             session.set_shared_data('sources_error', msg % self.uri)
   349             return []
   356             return []
   350         try:
   357         translator = RQL2RQL(self)
   351             rql = RQL2RQL(self).generate(session, union, args)
   358         try:
       
   359             rql = translator.generate(session, union, args)
   352         except UnknownEid, ex:
   360         except UnknownEid, ex:
   353             if server.DEBUG:
   361             if server.DEBUG:
   354                 print '  unknown eid', ex, 'no results'
   362                 print '  unknown eid', ex, 'no results'
   355             return []
   363             return []
   356         if server.DEBUG & server.DBG_RQL:
   364         if server.DEBUG & server.DBG_RQL:
   372                     needtranslation.append(i)
   380                     needtranslation.append(i)
   373             if needtranslation:
   381             if needtranslation:
   374                 cnx = session.pool.connection(self.uri)
   382                 cnx = session.pool.connection(self.uri)
   375                 for rowindex in xrange(rset.rowcount - 1, -1, -1):
   383                 for rowindex in xrange(rset.rowcount - 1, -1, -1):
   376                     row = rows[rowindex]
   384                     row = rows[rowindex]
       
   385                     localrow = False
   377                     for colindex in needtranslation:
   386                     for colindex in needtranslation:
   378                         if row[colindex] is not None: # optional variable
   387                         if row[colindex] is not None: # optional variable
   379                             etype = descr[rowindex][colindex]
   388                             eid, local = self.local_eid(cnx, row[colindex], session)
   380                             exttype, exturi, extid = cnx.describe(row[colindex])
   389                             if local:
   381                             if self.map_entity_source(exturi):
   390                                 localrow = True
   382                                 eid = self.extid2eid(str(row[colindex]), etype,
   391                             if eid is not None:
   383                                                      session)
       
   384                                 row[colindex] = eid
   392                                 row[colindex] = eid
   385                             else:
   393                             else:
   386                                 # skip this row
   394                                 # skip this row
   387                                 del rows[rowindex]
   395                                 del rows[rowindex]
   388                                 del descr[rowindex]
   396                                 del descr[rowindex]
   389                                 break
   397                                 break
       
   398                     else:
       
   399                         # skip row if it only contains eids of entities which
       
   400                         # are actually from a source we also know locally,
       
   401                         # except if some args specified (XXX should actually
       
   402                         # check if there are some args local to the source)
       
   403                         if not (translator.has_local_eid or localrow):
       
   404                             del rows[rowindex]
       
   405                             del descr[rowindex]
   390             results = rows
   406             results = rows
   391         else:
   407         else:
   392             results = []
   408             results = []
   393         return results
   409         return results
   394 
   410 
   456 
   472 
   457     def generate(self, session, rqlst, args):
   473     def generate(self, session, rqlst, args):
   458         self._session = session
   474         self._session = session
   459         self.kwargs = args
   475         self.kwargs = args
   460         self.need_translation = False
   476         self.need_translation = False
       
   477         self.has_local_eid = False
   461         return self.visit_union(rqlst)
   478         return self.visit_union(rqlst)
   462 
   479 
   463     def visit_union(self, node):
   480     def visit_union(self, node):
   464         s = self._accept_children(node)
   481         s = self._accept_children(node)
   465         if len(s) > 1:
   482         if len(s) > 1:
   602         return '%s(%s)' % (node.name, ', '.join(res))
   619         return '%s(%s)' % (node.name, ', '.join(res))
   603 
   620 
   604     def visit_constant(self, node):
   621     def visit_constant(self, node):
   605         if self.need_translation or node.uidtype:
   622         if self.need_translation or node.uidtype:
   606             if node.type == 'Int':
   623             if node.type == 'Int':
       
   624                 self.has_local_eid = True
   607                 return str(self.eid2extid(node.value))
   625                 return str(self.eid2extid(node.value))
   608             if node.type == 'Substitute':
   626             if node.type == 'Substitute':
   609                 key = node.value
   627                 key = node.value
   610                 # ensure we have not yet translated the value...
   628                 # ensure we have not yet translated the value...
   611                 if not key in self._const_var:
   629                 if not key in self._const_var:
   612                     self.kwargs[key] = self.eid2extid(self.kwargs[key])
   630                     self.kwargs[key] = self.eid2extid(self.kwargs[key])
   613                     self._const_var[key] = None
   631                     self._const_var[key] = None
       
   632                     self.has_local_eid = True
   614         return node.as_string()
   633         return node.as_string()
   615 
   634 
   616     def visit_variableref(self, node):
   635     def visit_variableref(self, node):
   617         """get the sql name for a variable reference"""
   636         """get the sql name for a variable reference"""
   618         return node.name
   637         return node.name