125 |
125 |
126 def __init__(self, repo, source_config, eid=None): |
126 def __init__(self, repo, source_config, eid=None): |
127 AbstractSource.__init__(self, repo, source_config, eid) |
127 AbstractSource.__init__(self, repo, source_config, eid) |
128 self.update_config(None, self.check_conf_dict(eid, source_config, |
128 self.update_config(None, self.check_conf_dict(eid, source_config, |
129 fail_if_unknown=False)) |
129 fail_if_unknown=False)) |
130 myoptions = (('%s.latest-update-time' % self.uri, |
|
131 {'type' : 'int', 'sitewide': True, |
|
132 'default': 0, |
|
133 'help': _('timestamp of the latest source synchronization.'), |
|
134 'group': 'sources', |
|
135 }),) |
|
136 register_persistent_options(myoptions) |
|
137 self._query_cache = TimedCache(1800) |
130 self._query_cache = TimedCache(1800) |
138 |
131 |
139 def update_config(self, source_entity, processed_config): |
132 def update_config(self, source_entity, processed_config): |
140 """update configuration from source entity""" |
133 """update configuration from source entity""" |
141 # XXX get it through pyro if unset |
134 # XXX get it through pyro if unset |
142 baseurl = processed_config.get('base-url') |
135 baseurl = processed_config.get('base-url') |
143 if baseurl and not baseurl.endswith('/'): |
136 if baseurl and not baseurl.endswith('/'): |
144 processed_config['base-url'] += '/' |
137 processed_config['base-url'] += '/' |
145 self.config = processed_config |
138 self.config = processed_config |
146 self._skip_externals = processed_config['skip-external-entities'] |
139 self._skip_externals = processed_config['skip-external-entities'] |
|
140 if source_entity is not None: |
|
141 self.latest_retrieval = source_entity.latest_retrieval |
147 |
142 |
148 def reset_caches(self): |
143 def reset_caches(self): |
149 """method called during test to reset potential source caches""" |
144 """method called during test to reset potential source caches""" |
150 self._query_cache = TimedCache(1800) |
145 self._query_cache = TimedCache(1800) |
151 |
|
152 def last_update_time(self): |
|
153 pkey = u'sources.%s.latest-update-time' % self.uri |
|
154 session = self.repo.internal_session() |
|
155 try: |
|
156 rset = session.execute('Any V WHERE X is CWProperty, X value V, X pkey %(k)s', |
|
157 {'k': pkey}) |
|
158 if not rset: |
|
159 # insert it |
|
160 session.execute('INSERT CWProperty X: X pkey %(k)s, X value %(v)s', |
|
161 {'k': pkey, 'v': u'0'}) |
|
162 session.commit() |
|
163 timestamp = 0 |
|
164 else: |
|
165 assert len(rset) == 1 |
|
166 timestamp = int(rset[0][0]) |
|
167 return datetime.fromtimestamp(timestamp) |
|
168 finally: |
|
169 session.close() |
|
170 |
146 |
171 def init(self, activated, source_entity): |
147 def init(self, activated, source_entity): |
172 """method called by the repository once ready to handle request""" |
148 """method called by the repository once ready to handle request""" |
173 self.load_mapping(source_entity._cw) |
149 self.load_mapping(source_entity._cw) |
174 if activated: |
150 if activated: |
175 interval = self.config['synchronization-interval'] |
151 interval = self.config['synchronization-interval'] |
176 self.repo.looping_task(interval, self.synchronize) |
152 self.repo.looping_task(interval, self.synchronize) |
177 self.repo.looping_task(self._query_cache.ttl.seconds/10, |
153 self.repo.looping_task(self._query_cache.ttl.seconds/10, |
178 self._query_cache.clear_expired) |
154 self._query_cache.clear_expired) |
|
155 self.latest_retrieval = source_entity.latest_retrieval |
179 |
156 |
180 def load_mapping(self, session=None): |
157 def load_mapping(self, session=None): |
181 self.support_entities = {} |
158 self.support_entities = {} |
182 self.support_relations = {} |
159 self.support_relations = {} |
183 self.dont_cross_relations = set(('owned_by', 'created_by')) |
160 self.dont_cross_relations = set(('owned_by', 'created_by')) |