server/sources/pyrorql.py
changeset 6958 861251f125cf
parent 6957 ffda12be2e9f
child 7293 97505b798975
child 7499 96412cfc28e2
equal deleted inserted replaced
6957:ffda12be2e9f 6958:861251f125cf
   125 
   125 
   126     def __init__(self, repo, source_config, eid=None):
   126     def __init__(self, repo, source_config, eid=None):
   127         AbstractSource.__init__(self, repo, source_config, eid)
   127         AbstractSource.__init__(self, repo, source_config, eid)
   128         self.update_config(None, self.check_conf_dict(eid, source_config,
   128         self.update_config(None, self.check_conf_dict(eid, source_config,
   129                                                       fail_if_unknown=False))
   129                                                       fail_if_unknown=False))
   130         myoptions = (('%s.latest-update-time' % self.uri,
       
   131                       {'type' : 'int', 'sitewide': True,
       
   132                        'default': 0,
       
   133                        'help': _('timestamp of the latest source synchronization.'),
       
   134                        'group': 'sources',
       
   135                        }),)
       
   136         register_persistent_options(myoptions)
       
   137         self._query_cache = TimedCache(1800)
   130         self._query_cache = TimedCache(1800)
   138 
   131 
   139     def update_config(self, source_entity, processed_config):
   132     def update_config(self, source_entity, processed_config):
   140         """update configuration from source entity"""
   133         """update configuration from source entity"""
   141         # XXX get it through pyro if unset
   134         # XXX get it through pyro if unset
   142         baseurl = processed_config.get('base-url')
   135         baseurl = processed_config.get('base-url')
   143         if baseurl and not baseurl.endswith('/'):
   136         if baseurl and not baseurl.endswith('/'):
   144             processed_config['base-url'] += '/'
   137             processed_config['base-url'] += '/'
   145         self.config = processed_config
   138         self.config = processed_config
   146         self._skip_externals = processed_config['skip-external-entities']
   139         self._skip_externals = processed_config['skip-external-entities']
       
   140         if source_entity is not None:
       
   141             self.latest_retrieval = source_entity.latest_retrieval
   147 
   142 
   148     def reset_caches(self):
   143     def reset_caches(self):
   149         """method called during test to reset potential source caches"""
   144         """method called during test to reset potential source caches"""
   150         self._query_cache = TimedCache(1800)
   145         self._query_cache = TimedCache(1800)
   151 
       
   152     def last_update_time(self):
       
   153         pkey = u'sources.%s.latest-update-time' % self.uri
       
   154         session = self.repo.internal_session()
       
   155         try:
       
   156             rset = session.execute('Any V WHERE X is CWProperty, X value V, X pkey %(k)s',
       
   157                                    {'k': pkey})
       
   158             if not rset:
       
   159                 # insert it
       
   160                 session.execute('INSERT CWProperty X: X pkey %(k)s, X value %(v)s',
       
   161                                 {'k': pkey, 'v': u'0'})
       
   162                 session.commit()
       
   163                 timestamp = 0
       
   164             else:
       
   165                 assert len(rset) == 1
       
   166                 timestamp = int(rset[0][0])
       
   167             return datetime.fromtimestamp(timestamp)
       
   168         finally:
       
   169             session.close()
       
   170 
   146 
   171     def init(self, activated, source_entity):
   147     def init(self, activated, source_entity):
   172         """method called by the repository once ready to handle request"""
   148         """method called by the repository once ready to handle request"""
   173         self.load_mapping(source_entity._cw)
   149         self.load_mapping(source_entity._cw)
   174         if activated:
   150         if activated:
   175             interval = self.config['synchronization-interval']
   151             interval = self.config['synchronization-interval']
   176             self.repo.looping_task(interval, self.synchronize)
   152             self.repo.looping_task(interval, self.synchronize)
   177             self.repo.looping_task(self._query_cache.ttl.seconds/10,
   153             self.repo.looping_task(self._query_cache.ttl.seconds/10,
   178                                    self._query_cache.clear_expired)
   154                                    self._query_cache.clear_expired)
       
   155             self.latest_retrieval = source_entity.latest_retrieval
   179 
   156 
   180     def load_mapping(self, session=None):
   157     def load_mapping(self, session=None):
   181         self.support_entities = {}
   158         self.support_entities = {}
   182         self.support_relations = {}
   159         self.support_relations = {}
   183         self.dont_cross_relations = set(('owned_by', 'created_by'))
   160         self.dont_cross_relations = set(('owned_by', 'created_by'))
   277             # fake connection wrapper returned when we can't connect to the
   254             # fake connection wrapper returned when we can't connect to the
   278             # external source (hence we've no chance to synchronize...)
   255             # external source (hence we've no chance to synchronize...)
   279             return
   256             return
   280         etypes = self.support_entities.keys()
   257         etypes = self.support_entities.keys()
   281         if mtime is None:
   258         if mtime is None:
   282             mtime = self.last_update_time()
   259             mtime = self.latest_retrieval
   283         updatetime, modified, deleted = extrepo.entities_modified_since(
   260         updatetime, modified, deleted = extrepo.entities_modified_since(
   284             etypes, mtime)
   261             etypes, mtime)
   285         self._query_cache.clear()
   262         self._query_cache.clear()
   286         repo = self.repo
   263         repo = self.repo
   287         session = repo.internal_session()
   264         session = repo.internal_session()
   310                                          scleanup=self.eid)
   287                                          scleanup=self.eid)
   311                 except:
   288                 except:
   312                     self.exception('while updating %s with external id %s of source %s',
   289                     self.exception('while updating %s with external id %s of source %s',
   313                                    etype, extid, self.uri)
   290                                    etype, extid, self.uri)
   314                     continue
   291                     continue
   315             session.execute('SET X value %(v)s WHERE X pkey %(k)s',
   292             self.latest_retrieval = updatetime
   316                             {'k': u'sources.%s.latest-update-time' % self.uri,
   293             session.execute('SET X latest_retrieval %(date)s WHERE X eid %(x)s',
   317                              'v': unicode(int(mktime(updatetime.timetuple())))})
   294                             {'x': self.eid, 'date': self.latest_retrieval})
   318             session.commit()
   295             session.commit()
   319         finally:
   296         finally:
   320             session.close()
   297             session.close()
   321 
   298 
   322     def _get_connection(self):
   299     def _get_connection(self):