|
1 # copyright 2010-2011 LOGILAB S.A. (Paris, FRANCE), all rights reserved. |
|
2 # contact http://www.logilab.fr/ -- mailto:contact@logilab.fr |
|
3 # |
|
4 # This file is part of CubicWeb. |
|
5 # |
|
6 # CubicWeb is free software: you can redistribute it and/or modify it under the |
|
7 # terms of the GNU Lesser General Public License as published by the Free |
|
8 # Software Foundation, either version 2.1 of the License, or (at your option) |
|
9 # any later version. |
|
10 # |
|
11 # CubicWeb is distributed in the hope that it will be useful, but WITHOUT |
|
12 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS |
|
13 # FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more |
|
14 # details. |
|
15 # |
|
16 # You should have received a copy of the GNU Lesser General Public License along |
|
17 # with CubicWeb. If not, see <http://www.gnu.org/licenses/>. |
|
18 """datafeed sources: copy data from an external data stream into the system |
|
19 database |
|
20 """ |
|
21 from datetime import datetime, timedelta |
|
22 from base64 import b64decode |
|
23 |
|
24 from cubicweb import RegistryNotFound, ObjectNotFound, ValidationError |
|
25 from cubicweb.server.sources import AbstractSource |
|
26 from cubicweb.appobject import AppObject |
|
27 |
|
class DataFeedSource(AbstractSource):
    """Source copying data from an external data stream (e.g. a set of urls)
    into the system database, using a parser selected from the 'parsers'
    registry to turn the stream's content into entities.
    """
    copy_based_source = True

    options = (
        ('synchronize',
         {'type' : 'yn',
          'default': True,
          'help': ('Is the repository responsible to automatically import '
                   'content from this source? '
                   'You should say yes unless you don\'t want this behaviour '
                   'or if you use a multiple repositories setup, in which '
                   'case you should say yes on one repository, no on others.'),
          'group': 'datafeed-source', 'level': 2,
          }),
        ('synchronization-interval',
         {'type' : 'time',
          'default': '5min',
          'help': ('Interval in seconds between synchronization with the '
                   'external source (default to 5 minutes, must be >= 1 min).'),
          'group': 'datafeed-source', 'level': 2,
          }),
        ('delete-entities',
         {'type' : 'yn',
          'default': True,
          'help': ('Should already imported entities not found anymore on the '
                   'external source be deleted?'),
          'group': 'datafeed-source', 'level': 2,
          }),
        )

    def __init__(self, repo, source_config, eid=None):
        AbstractSource.__init__(self, repo, source_config, eid)
        self.update_config(None, self.check_conf_dict(eid, source_config))

    def check_config(self, source_entity):
        """check configuration of source entity, raising ValidationError when
        the synchronization interval is below the one-minute floor
        """
        typedconfig = super(DataFeedSource, self).check_config(source_entity)
        if typedconfig['synchronization-interval'] < 60:
            _ = source_entity._cw._
            msg = _('synchronization-interval must be greater than 1 minute')
            raise ValidationError(source_entity.eid, {'config': msg})
        return typedconfig

    def _entity_update(self, source_entity):
        """refresh parser id, latest retrieval date and url list from the
        CWSource entity
        """
        source_entity.complete()
        self.parser = source_entity.parser
        self.latest_retrieval = source_entity.latest_retrieval
        # one url per line, blank lines ignored
        self.urls = [url.strip() for url in source_entity.url.splitlines()
                     if url.strip()]

    def update_config(self, source_entity, typedconfig):
        """update configuration from source entity. `typedconfig` is config
        properly typed with defaults set
        """
        self.synchro_interval = timedelta(seconds=typedconfig['synchronization-interval'])
        if source_entity is not None:
            self._entity_update(source_entity)
        self.config = typedconfig

    def init(self, activated, source_entity):
        """method called by the repository once ready to handle request"""
        if activated:
            # _entity_update also sets self.parser, no need to set it again
            self._entity_update(source_entity)
        self.load_mapping(source_entity._cw)

    def _get_parser(self, session, **kwargs):
        """select and return this source's parser from the 'parsers' registry"""
        return self.repo.vreg['parsers'].select(
            self.parser, session, source=self, **kwargs)

    def load_mapping(self, session):
        self.mapping = {}
        self.mapping_idx = {}
        try:
            parser = self._get_parser(session)
        except (RegistryNotFound, ObjectNotFound):
            return # no parser yet, don't go further
        self._load_mapping(session, parser=parser)

    def add_schema_config(self, schemacfg, checkonly=False, parser=None):
        """added CWSourceSchemaConfig, modify mapping accordingly"""
        if parser is None:
            parser = self._get_parser(schemacfg._cw)
        parser.add_schema_config(schemacfg, checkonly)

    def del_schema_config(self, schemacfg, checkonly=False, parser=None):
        """deleted CWSourceSchemaConfig, modify mapping accordingly"""
        if parser is None:
            parser = self._get_parser(schemacfg._cw)
        parser.del_schema_config(schemacfg, checkonly)

    def fresh(self):
        """return True if the last synchronization is more recent than the
        configured synchronization interval (i.e. no pull is needed yet)
        """
        if self.latest_retrieval is None:
            return False
        return datetime.now() < (self.latest_retrieval + self.synchro_interval)

    def pull_data(self, session, force=False):
        """pull content from each of the source's urls through the parser,
        then optionally delete entities not seen anymore on the external
        source. Return the parser's import statistics.
        """
        if not force and self.fresh():
            return
        if self.config['delete-entities']:
            # remember which entities we already hold from this source;
            # the parser pops entries as it (re)encounters them so that
            # whatever remains afterwards may be deleted
            myuris = self.source_cwuris(session)
        else:
            myuris = None
        parser = self._get_parser(session, sourceuris=myuris)
        error = False
        self.info('pulling data for source %s', self.uri)
        for url in self.urls:
            try:
                parser.process(url)
            except IOError as exc:
                self.error('could not pull data while processing %s: %s',
                           url, exc)
                error = True
        if error:
            # don't delete anything when a url failed: entities may simply
            # not have been seen because of the failure
            self.warning("some error occurred, don't attempt to delete entities")
        elif self.config['delete-entities'] and myuris:
            byetype = {}
            for eid, etype in myuris.values():
                byetype.setdefault(etype, []).append(str(eid))
            # NOTE(review): this is an expected outcome, yet logged at
            # 'error' level — consider downgrading to 'info'; kept as-is
            self.error('delete %s entities %s', self.uri, byetype)
            for etype, eids in byetype.iteritems():
                session.execute('DELETE %s X WHERE X eid IN (%s)'
                                % (etype, ','.join(eids)))
        self.latest_retrieval = datetime.now()
        session.execute('SET X latest_retrieval %(date)s WHERE X eid %(x)s',
                        {'x': self.eid, 'date': self.latest_retrieval})
        return parser.stats

    def before_entity_insertion(self, session, lid, etype, eid, sourceparams):
        """called by the repository when an eid has been attributed for an
        entity stored here but the entity has not been inserted in the system
        table yet.

        This method must return the an Entity instance representation of this
        entity.
        """
        entity = super(DataFeedSource, self).before_entity_insertion(
            session, lid, etype, eid, sourceparams)
        entity.cw_edited['cwuri'] = unicode(lid)
        entity.cw_edited.set_defaults()
        sourceparams['parser'].before_entity_copy(entity, sourceparams)
        # avoid query to search full-text indexed attributes
        for attr in entity.e_schema.indexable_attributes():
            entity.cw_edited.setdefault(attr, u'')
        return entity

    def after_entity_insertion(self, session, lid, entity, sourceparams):
        """called by the repository after an entity stored here has been
        inserted in the system table.
        """
        if session.is_hook_category_activated('integrity'):
            entity.cw_edited.check(creation=True)
        self.repo.system_source.add_entity(session, entity)
        entity.cw_edited.saved = entity._cw_is_saved = True
        sourceparams['parser'].after_entity_copy(entity, sourceparams)

    def source_cwuris(self, session):
        """return a dict mapping extid (cwuri) of entities from this source
        to a (eid, type) tuple
        """
        sql = ('SELECT extid, eid, type FROM entities, cw_source_relation '
               'WHERE entities.eid=cw_source_relation.eid_from '
               'AND cw_source_relation.eid_to=%s' % self.eid)
        # extids are stored base64 encoded in the entities table
        return dict((b64decode(uri), (eid, type))
                    for uri, eid, type in session.system_sql(sql))
|
190 |
|
class DataFeedParser(AppObject):
    """base class for data-feed parsers, instances of which are selected from
    the 'parsers' registry by a DataFeedSource. Subclasses should at least
    implement `process` (and usually `before_entity_copy`).
    """
    __registry__ = 'parsers'

    def __init__(self, session, source, sourceuris=None):
        self._cw = session
        self.source = source
        # extid -> (eid, type) dict of entities already imported from the
        # source, or None; entries are popped as entities are (re)seen
        self.sourceuris = sourceuris
        # eids of entities created / updated during the pull
        self.stats = {'created': set(),
                      'updated': set()}

    def add_schema_config(self, schemacfg, checkonly=False):
        """added CWSourceSchemaConfig, modify mapping accordingly"""
        msg = schemacfg._cw._("this parser doesn't use a mapping")
        raise ValidationError(schemacfg.eid, {None: msg})

    def del_schema_config(self, schemacfg, checkonly=False):
        """deleted CWSourceSchemaConfig, modify mapping accordingly"""
        msg = schemacfg._cw._("this parser doesn't use a mapping")
        raise ValidationError(schemacfg.eid, {None: msg})

    def extid2entity(self, uri, etype, **sourceparams):
        """return an entity for the given uri, creating it in the source if
        necessary. Also mark the uri as seen by popping it from `sourceuris`.
        """
        sourceparams['parser'] = self
        eid = self.source.extid2eid(str(uri), etype, self._cw,
                                    sourceparams=sourceparams)
        if self.sourceuris is not None:
            self.sourceuris.pop(str(uri), None)
        return self._cw.entity_from_eid(eid, etype)

    def process(self, url):
        """main callback: process the url"""
        raise NotImplementedError

    def before_entity_copy(self, entity, sourceparams):
        """called before an entity is copied into the system database; hook
        point for subclasses to set the entity's attributes
        """
        raise NotImplementedError

    def after_entity_copy(self, entity, sourceparams):
        """called after an entity has been copied into the system database"""
        self.stats['created'].add(entity.eid)

    def created_during_pull(self, entity):
        """return True if the entity was created during the current pull"""
        return entity.eid in self.stats['created']

    def updated_during_pull(self, entity):
        """return True if the entity was updated during the current pull"""
        return entity.eid in self.stats['updated']

    def notify_updated(self, entity):
        """record that the entity has been updated during the current pull"""
        # set.add returns None, so the original `return` was pointless
        self.stats['updated'].add(entity.eid)