43 |
43 |
def uidtype(union, col, etype, args):
    """Return the `uidtype` of a selected term, or None when it has none.

    The sub-query and the column index to inspect are obtained from
    `union.locate_subquery(col, etype, args)`; the `uidtype` attribute is
    then read from the term found at that index in the sub-query selection.

    :param union: object providing `locate_subquery(col, etype, args)`
    :param col: column index, forwarded to `locate_subquery`
    :param etype: entity type, forwarded to `locate_subquery`
    :param args: query arguments, forwarded to `locate_subquery`
    :return: the `uidtype` attribute of the selected term, None if the term
      does not define it
    """
    # locate_subquery may remap the column index, hence `col` is rebound
    select, col = union.locate_subquery(col, etype, args)
    return getattr(select.selection[col], 'uidtype', None)
47 |
47 |
def load_mapping_file(mappingfile):
    """Load and check the schema mapping defined by a python file.

    The file is executed and every name it defines becomes a key of the
    returned mapping. Only `support_entities` is mandatory;
    `support_relations`, `dont_cross_relations` and `cross_relations` get an
    empty default when the file does not define them.

    :param mappingfile: path to the python file holding the mapping
    :return: dictionary with exactly the `support_entities`,
      `support_relations`, `dont_cross_relations` and `cross_relations` keys
    :raise IOError: if the file can't be read
    :raise AssertionError: if the mapping content is invalid
    """
    mapping = {}
    # NOTE: this runs arbitrary python code, the mapping file must be trusted.
    # compile() + exec() is used instead of execfile() since the latter only
    # exists in python 2; reading bytes lets compile() honor a coding cookie.
    with open(mappingfile, 'rb') as stream:
        code = compile(stream.read(), mappingfile, 'exec')
    exec(code, mapping)
    # names added by the execution itself, not by the mapping author
    for junk in ('__builtins__', '__doc__'):
        mapping.pop(junk, None)
    mapping.setdefault('support_relations', {})
    mapping.setdefault('dont_cross_relations', set())
    mapping.setdefault('cross_relations', set())
    # do some basic checks of the mapping content. AssertionError is raised
    # explicitly so the checks are still done when running python with -O.
    if 'support_entities' not in mapping:
        raise AssertionError(
            'mapping file should at least define support_entities')
    expected_types = (('support_entities', dict),
                      ('support_relations', dict),
                      ('dont_cross_relations', set),
                      ('cross_relations', set))
    for key, expected in expected_types:
        if not isinstance(mapping[key], expected):
            raise AssertionError('%s should be a %s, got %s' % (
                key, expected.__name__, type(mapping[key]).__name__))
    unknown = set(mapping) - set(key for key, _ in expected_types)
    if unknown:
        raise AssertionError('unknown mapping attribute(s): %s' % unknown)
    # relations that are necessarily not crossed
    mapping['dont_cross_relations'] |= set(('owned_by', 'created_by'))
    for rtype in ('is', 'is_instance_of', 'cw_source'):
        if rtype in mapping['dont_cross_relations']:
            raise AssertionError(
                '%s relation should not be in dont_cross_relations' % rtype)
        if rtype in mapping['support_relations']:
            raise AssertionError(
                '%s relation should not be in support_relations' % rtype)
    return mapping
|
75 |
|
76 |
48 |
class ReplaceByInOperator(Exception):
    """Exception carrying a collection of eids.

    NOTE(review): judging by its name, this is presumably raised to ask the
    caller to substitute an `IN(...)` expression built from `eids` -- confirm
    against the places where it is raised and caught.
    """

    def __init__(self, eids):
        # eids is stored as given, no copy nor validation is done
        self.eids = eids
80 |
52 |
94 {'type' : 'string', |
66 {'type' : 'string', |
95 'default': REQUIRED, |
67 'default': REQUIRED, |
96 'help': 'identifier of the repository in the pyro name server', |
68 'help': 'identifier of the repository in the pyro name server', |
97 'group': 'pyro-source', 'level': 0, |
69 'group': 'pyro-source', 'level': 0, |
98 }), |
70 }), |
99 ('mapping-file', |
|
100 {'type' : 'string', |
|
101 'default': REQUIRED, |
|
102 'help': 'path to a python file with the schema mapping definition', |
|
103 'group': 'pyro-source', 'level': 1, |
|
104 }), |
|
105 ('cubicweb-user', |
71 ('cubicweb-user', |
106 {'type' : 'string', |
72 {'type' : 'string', |
107 'default': REQUIRED, |
73 'default': REQUIRED, |
108 'help': 'user to use for connection on the distant repository', |
74 'help': 'user to use for connection on the distant repository', |
109 'group': 'pyro-source', 'level': 0, |
75 'group': 'pyro-source', 'level': 0, |
154 PUBLIC_KEYS = AbstractSource.PUBLIC_KEYS + ('base-url',) |
120 PUBLIC_KEYS = AbstractSource.PUBLIC_KEYS + ('base-url',) |
155 _conn = None |
121 _conn = None |
156 |
122 |
157 def __init__(self, repo, source_config, *args, **kwargs): |
123 def __init__(self, repo, source_config, *args, **kwargs): |
158 AbstractSource.__init__(self, repo, source_config, *args, **kwargs) |
124 AbstractSource.__init__(self, repo, source_config, *args, **kwargs) |
159 mappingfile = source_config['mapping-file'] |
125 # XXX get it through pyro if unset |
160 if not mappingfile[0] == '/': |
|
161 mappingfile = join(repo.config.apphome, mappingfile) |
|
162 try: |
|
163 mapping = load_mapping_file(mappingfile) |
|
164 except IOError: |
|
165 self.disabled = True |
|
166 self.error('cant read mapping file %s, source disabled', |
|
167 mappingfile) |
|
168 self.support_entities = {} |
|
169 self.support_relations = {} |
|
170 self.dont_cross_relations = set() |
|
171 self.cross_relations = set() |
|
172 else: |
|
173 self.support_entities = mapping['support_entities'] |
|
174 self.support_relations = mapping['support_relations'] |
|
175 self.dont_cross_relations = mapping['dont_cross_relations'] |
|
176 self.cross_relations = mapping['cross_relations'] |
|
177 baseurl = source_config.get('base-url') |
126 baseurl = source_config.get('base-url') |
178 if baseurl and not baseurl.endswith('/'): |
127 if baseurl and not baseurl.endswith('/'): |
179 source_config['base-url'] += '/' |
128 source_config['base-url'] += '/' |
180 self.config = source_config |
129 self.config = source_config |
181 myoptions = (('%s.latest-update-time' % self.uri, |
130 myoptions = (('%s.latest-update-time' % self.uri, |
210 timestamp = int(rset[0][0]) |
159 timestamp = int(rset[0][0]) |
211 return datetime.fromtimestamp(timestamp) |
160 return datetime.fromtimestamp(timestamp) |
212 finally: |
161 finally: |
213 session.close() |
162 session.close() |
214 |
163 |
def init(self, activated, session=None):
    """method called by the repository once ready to handle request

    The schema mapping is always (re)loaded through `load_mapping`. The
    periodic tasks are only registered when the source is activated.

    :param activated: when true, register the synchronization task and the
      query cache cleanup task on the repository
    :param session: optional session, forwarded as is to `load_mapping`
    """
    self.load_mapping(session)
    if activated:
        # synchronization interval defaults to 5 minutes when unset
        interval = int(self.config.get('synchronization-interval', 5*60))
        self.repo.looping_task(interval, self.synchronize)
        # expired cache entries are cleaned ten times per ttl period
        self.repo.looping_task(self._query_cache.ttl.seconds/10,
                               self._query_cache.clear_expired)
|
172 |
|
def load_mapping(self, session=None):
    """(Re)build the schema mapping of this source from the repository.

    The `support_entities`, `support_relations`, `cross_relations` and
    `dont_cross_relations` attributes are reset, then filled with the result
    of RQL queries on the `cw_support`, `cw_may_cross` and `cw_dont_cross`
    relations of the entity whose eid is `self.eid`.

    :param session: session used to execute the queries. When None, an
      internal session is opened on the repository and closed once done; a
      session given by the caller is left open.
    :raise AssertionError: if `self.eid` is unset, or if 'is',
      'is_instance_of' or 'cw_source' is found in `dont_cross_relations` or
      `support_relations`
    """
    self.support_entities = {}
    self.support_relations = {}
    # relations that are necessarily not crossed
    self.dont_cross_relations = set(('owned_by', 'created_by'))
    self.cross_relations = set()
    assert self.eid is not None
    if session is None:
        _session = self.repo.internal_session()
    else:
        _session = session
    try:
        for rql, struct in [('Any ETN WHERE S cw_support ET, ET name ETN, ET is CWEType, S eid %(s)s',
                             self.support_entities),
                            ('Any RTN WHERE S cw_support RT, RT name RTN, RT is CWRType, S eid %(s)s',
                             self.support_relations)]:
            for ertype, in _session.execute(rql, {'s': self.eid}):
                struct[ertype] = True # XXX write support
        for rql, struct in [('Any RTN WHERE S cw_may_cross RT, RT name RTN, S eid %(s)s',
                             self.cross_relations),
                            ('Any RTN WHERE S cw_dont_cross RT, RT name RTN, S eid %(s)s',
                             self.dont_cross_relations)]:
            for rtype, in _session.execute(rql, {'s': self.eid}):
                struct.add(rtype)
    finally:
        # only close the session if it has been opened here
        if session is None:
            _session.close()
    # XXX move in hooks or schema constraints
    for rtype in ('is', 'is_instance_of', 'cw_source'):
        assert rtype not in self.dont_cross_relations, \
               '%s relation should not be in dont_cross_relations' % rtype
        assert rtype not in self.support_relations, \
               '%s relation should not be in support_relations' % rtype
221 |
205 |
222 def local_eid(self, cnx, extid, session): |
206 def local_eid(self, cnx, extid, session): |
223 etype, dexturi, dextid = cnx.describe(extid) |
207 etype, dexturi, dextid = cnx.describe(extid) |
224 if dexturi == 'system' or not ( |
208 if dexturi == 'system' or not ( |
225 dexturi in self.repo.sources_by_uri or self._skip_externals): |
209 dexturi in self.repo.sources_by_uri or self._skip_externals): |
244 # external source (hence we've no chance to synchronize...) |
228 # external source (hence we've no chance to synchronize...) |
245 return |
229 return |
246 etypes = self.support_entities.keys() |
230 etypes = self.support_entities.keys() |
247 if mtime is None: |
231 if mtime is None: |
248 mtime = self.last_update_time() |
232 mtime = self.last_update_time() |
249 updatetime, modified, deleted = extrepo.entities_modified_since(etypes, |
233 updatetime, modified, deleted = extrepo.entities_modified_since( |
250 mtime) |
234 etypes, mtime) |
251 self._query_cache.clear() |
235 self._query_cache.clear() |
252 repo = self.repo |
236 repo = self.repo |
253 session = repo.internal_session() |
237 session = repo.internal_session() |
254 source = repo.system_source |
238 source = repo.system_source |
255 try: |
239 try: |