def fresh(self):
    """Tell whether the data retrieved from this source is still fresh.

    Return False when no retrieval has been recorded yet
    (`latest_retrieval` is None). Otherwise return True as long as the
    current UTC time is earlier than `latest_retrieval` plus
    `synchro_interval`.
    """
    if self.latest_retrieval is None:
        # never retrieved: there is nothing that could be fresh
        return False
    # naive UTC comparison, consistent with how latest_retrieval is set
    return datetime.utcnow() < (self.latest_retrieval + self.synchro_interval)
128 |
128 |
|
def update_latest_retrieval(self, session):
    """Record the current UTC time as the latest retrieval date.

    The timestamp is stored on this object (`latest_retrieval`) and
    written through `session.execute` to the entity whose eid is
    `self.eid`.

    :param session: object whose `execute(query, args)` runs the update
    """
    self.latest_retrieval = datetime.utcnow()
    # pass the very same timestamp to the query so the in-memory and
    # stored values agree
    session.execute('SET X latest_retrieval %(date)s WHERE X eid %(x)s',
                    {'x': self.eid, 'date': self.latest_retrieval})
|
133 |
def pull_data(self, session, force=False, raise_on_error=False):
    """Pull data from this source's urls unless it is still fresh.

    :param session: session used to list known uris, build the parser,
      delete entities and record the retrieval date
    :param force: when true, pull even if `fresh()` says the data is
      still fresh
    :param raise_on_error: forwarded to `process_urls`

    Return an empty dict when nothing was pulled, else the parser's
    `stats`.
    """
    if not force and self.fresh():
        return {}
    if self.config['delete-entities']:
        myuris = self.source_cwuris(session)
    else:
        myuris = None
    parser = self._get_parser(session, sourceuris=myuris)
    if self.process_urls(parser, self.urls, raise_on_error):
        self.warning("some error occured, don't attempt to delete entities")
    elif self.config['delete-entities'] and myuris:
        # NOTE(review): presumably the parser removes from `myuris` every
        # uri it encountered, so what is left here is no longer published
        # by the source -- confirm against the parser implementation.
        byetype = {}
        for eid, etype in myuris.values():
            byetype.setdefault(etype, []).append(str(eid))
        self.error('delete %s entities %s', self.uri, byetype)
        # items() rather than iteritems(): works on Python 2 and 3
        for etype, eids in byetype.items():
            session.execute('DELETE %s X WHERE X eid IN (%s)'
                            % (etype, ','.join(eids)))
    # the retrieval date is recorded whether or not errors occurred
    self.update_latest_retrieval(session)
    return parser.stats
|
154 |
|
def process_urls(self, parser, urls, raise_on_error=False):
    """Feed each url to the parser and tell whether any error occurred.

    :param parser: object whose `process(url, raise_on_error)` returns a
      true value when processing that url failed
    :param urls: iterable of urls to process
    :param raise_on_error: when true, an IOError raised while processing
      a url is propagated instead of being logged

    Return True if at least one url could not be processed cleanly,
    else False.
    """
    error = False
    for url in urls:
        self.info('pulling data from %s', url)
        try:
            if parser.process(url, raise_on_error):
                error = True
        except IOError as exc:
            if raise_on_error:
                raise
            # best effort: log the failure and go on with the next url
            self.error('could not pull data while processing %s: %s',
                       url, exc)
            error = True
    return error
|
163 |
169 |
164 def before_entity_insertion(self, session, lid, etype, eid, sourceparams): |
170 def before_entity_insertion(self, session, lid, etype, eid, sourceparams): |
165 """called by the repository when an eid has been attributed for an |
171 """called by the repository when an eid has been attributed for an |
166 entity stored here but the entity has not been inserted in the system |
172 entity stored here but the entity has not been inserted in the system |
167 table yet. |
173 table yet. |