def commit_event(self):
    """Commit hook: clear repository caches for the pending eids.

    The eids stored in the session's transaction data under 'pendingeids'
    (filled by `Repository._prepare_delete_info` when an entity is being
    deleted) are handed to `repo.clear_caches`. An empty tuple is passed
    when nothing was recorded.
    """
    pending = self.session.query_data('pendingeids', ())
    self.repo.clear_caches(pending)
63 |
63 |
def rollback_event(self):
    """Rollback hook: clear repository caches for the newly added eids.

    The eids stored in the session's transaction data under 'neweids'
    (recorded by `Repository.add_info`) are handed to `repo.clear_caches`.
    An empty tuple is passed when nothing was recorded.
    """
    inserted = self.session.query_data('neweids', ())
    self.repo.clear_caches(inserted)
82 if entity.eid in session.query_data('pendingeids', ()): |
82 if entity.eid in session.query_data('pendingeids', ()): |
83 return # entity added and deleted in the same transaction |
83 return # entity added and deleted in the same transaction |
84 session.repo.system_source.fti_unindex_entity(session, entity.eid) |
84 session.repo.system_source.fti_unindex_entity(session, entity.eid) |
85 for container in entity.fti_containers(): |
85 for container in entity.fti_containers(): |
86 session.repo.index_entity(session, container) |
86 session.repo.index_entity(session, container) |
87 |
87 |
def commit_event(self):
    """Nothing to do at commit time for this operation."""
    return None
90 |
90 |
91 def del_existing_rel_if_needed(session, eidfrom, rtype, eidto): |
91 def del_existing_rel_if_needed(session, eidfrom, rtype, eidto): |
92 """delete existing relation when adding a new one if card is 1 or ? |
92 """delete existing relation when adding a new one if card is 1 or ? |
118 if card[1] in '1?': |
118 if card[1] in '1?': |
119 session.unsafe_execute( |
119 session.unsafe_execute( |
120 'DELETE X %s Y WHERE NOT X eid %%(x)s, Y eid %%(y)s' % rtype, |
120 'DELETE X %s Y WHERE NOT X eid %%(x)s, Y eid %%(y)s' % rtype, |
121 {'x': eidfrom, 'y': eidto}, 'y') |
121 {'x': eidfrom, 'y': eidto}, 'y') |
122 |
122 |
123 |
123 |
124 class Repository(object): |
124 class Repository(object): |
125 """a repository provides access to a set of persistent storages for |
125 """a repository provides access to a set of persistent storages for |
126 entities and relations |
126 entities and relations |
127 |
127 |
128 XXX protect pyro access |
128 XXX protect pyro access |
129 """ |
129 """ |
130 |
130 |
131 def __init__(self, config, vreg=None, debug=False): |
131 def __init__(self, config, vreg=None, debug=False): |
132 self.config = config |
132 self.config = config |
133 if vreg is None: |
133 if vreg is None: |
134 vreg = CubicWebRegistry(config, debug) |
134 vreg = CubicWebRegistry(config, debug) |
135 self.vreg = vreg |
135 self.vreg = vreg |
153 # FIXME: store additional sources info in the system database ? |
153 # FIXME: store additional sources info in the system database ? |
154 # FIXME: sources should be ordered (add_entity priority) |
154 # FIXME: sources should be ordered (add_entity priority) |
155 for uri, source_config in config.sources().items(): |
155 for uri, source_config in config.sources().items(): |
156 if uri == 'admin': |
156 if uri == 'admin': |
157 # not an actual source |
157 # not an actual source |
158 continue |
158 continue |
159 source = self.get_source(uri, source_config) |
159 source = self.get_source(uri, source_config) |
160 self.sources_by_uri[uri] = source |
160 self.sources_by_uri[uri] = source |
161 self.sources.append(source) |
161 self.sources.append(source) |
162 self.system_source = self.sources_by_uri['system'] |
162 self.system_source = self.sources_by_uri['system'] |
163 # ensure system source is the first one |
163 # ensure system source is the first one |
212 # tsearch according to postgres version |
212 # tsearch according to postgres version |
213 for source in self.sources: |
213 for source in self.sources: |
214 source.init_creating() |
214 source.init_creating() |
215 # close initialization pool and reopen fresh ones for proper |
215 # close initialization pool and reopen fresh ones for proper |
216 # initialization now that we know cubes |
216 # initialization now that we know cubes |
217 self._get_pool().close(True) |
217 self._get_pool().close(True) |
218 for i in xrange(config['connections-pool-size']): |
218 for i in xrange(config['connections-pool-size']): |
219 self._available_pools.put_nowait(ConnectionsPool(self.sources)) |
219 self._available_pools.put_nowait(ConnectionsPool(self.sources)) |
220 |
220 |
221 # internals ############################################################### |
221 # internals ############################################################### |
222 |
222 |
def get_source(self, uri, source_config):
    """Build the source object registered under `uri`.

    `source_config` is modified in place: its 'uri' key is set to `uri`.
    It is then passed, together with the repository schema and the
    repository itself, to the module-level `get_source` factory, whose
    result is returned.
    """
    source_config['uri'] = uri
    schema = self.schema
    return get_source(source_config, schema, self)
226 |
226 |
227 def set_schema(self, schema, resetvreg=True): |
227 def set_schema(self, schema, resetvreg=True): |
228 schema.rebuild_infered_relations() |
228 schema.rebuild_infered_relations() |
229 self.info('set schema %s %#x', schema.name, id(schema)) |
229 self.info('set schema %s %#x', schema.name, id(schema)) |
230 self.debug(', '.join(sorted(str(e) for e in schema.entities()))) |
230 self.debug(', '.join(sorted(str(e) for e in schema.entities()))) |
231 self.querier.set_schema(schema) |
231 self.querier.set_schema(schema) |
257 except BadSchemaDefinition: |
257 except BadSchemaDefinition: |
258 raise |
258 raise |
259 except Exception, ex: |
259 except Exception, ex: |
260 import traceback |
260 import traceback |
261 traceback.print_exc() |
261 traceback.print_exc() |
262 raise Exception('Is the database initialised ? (cause: %s)' % |
262 raise Exception('Is the database initialised ? (cause: %s)' % |
263 (ex.args and ex.args[0].strip() or 'unknown')), \ |
263 (ex.args and ex.args[0].strip() or 'unknown')), \ |
264 None, sys.exc_info()[-1] |
264 None, sys.exc_info()[-1] |
265 self.info('set the actual schema') |
265 self.info('set the actual schema') |
266 # XXX have to do this since CWProperty isn't in the bootstrap schema |
266 # XXX have to do this since CWProperty isn't in the bootstrap schema |
267 # it'll be redone in set_schema |
267 # it'll be redone in set_schema |
293 config.core_hooks = True |
293 config.core_hooks = True |
294 config.usergroup_hooks = True |
294 config.usergroup_hooks = True |
295 config.schema_hooks = True |
295 config.schema_hooks = True |
296 config.notification_hooks = True |
296 config.notification_hooks = True |
297 config.application_hooks = True |
297 config.application_hooks = True |
298 |
298 |
299 def start_looping_tasks(self): |
299 def start_looping_tasks(self): |
300 assert isinstance(self._looping_tasks, list), 'already started' |
300 assert isinstance(self._looping_tasks, list), 'already started' |
301 for i, (interval, func) in enumerate(self._looping_tasks): |
301 for i, (interval, func) in enumerate(self._looping_tasks): |
302 self._looping_tasks[i] = task = LoopTask(interval, func) |
302 self._looping_tasks[i] = task = LoopTask(interval, func) |
303 self.info('starting task %s with interval %.2fs', task.name, |
303 self.info('starting task %s with interval %.2fs', task.name, |
306 # ensure no tasks will be further added |
306 # ensure no tasks will be further added |
307 self._looping_tasks = tuple(self._looping_tasks) |
307 self._looping_tasks = tuple(self._looping_tasks) |
308 |
308 |
309 def looping_task(self, interval, func): |
309 def looping_task(self, interval, func): |
310 """register a function to be called every `interval` seconds. |
310 """register a function to be called every `interval` seconds. |
311 |
311 |
312 looping tasks can only be registered during repository initialization, |
312 looping tasks can only be registered during repository initialization, |
313 once done this method will fail. |
313 once done this method will fail. |
314 """ |
314 """ |
315 try: |
315 try: |
316 self._looping_tasks.append( (interval, func) ) |
316 self._looping_tasks.append( (interval, func) ) |
319 |
319 |
def threaded_task(self, func):
    """Start `func` in a separate thread.

    The thread is a `RepoThread` built from `func` and the repository's
    `_running_threads` collection.
    """
    thread = RepoThread(func, self._running_threads)
    thread.start()
324 |
324 |
325 #@locked |
325 #@locked |
326 def _get_pool(self): |
326 def _get_pool(self): |
327 try: |
327 try: |
328 return self._available_pools.get(True, timeout=5) |
328 return self._available_pools.get(True, timeout=5) |
329 except Queue.Empty: |
329 except Queue.Empty: |
330 raise Exception('no pool available after 5 secs, probably either a ' |
330 raise Exception('no pool available after 5 secs, probably either a ' |
331 'bug in code (to many uncommited/rollbacked ' |
331 'bug in code (to many uncommited/rollbacked ' |
332 'connections) or to much load on the server (in ' |
332 'connections) or to much load on the server (in ' |
333 'which case you can try to set a bigger ' |
333 'which case you can try to set a bigger ' |
334 'connections pools size)') |
334 'connections pools size)') |
335 |
335 |
336 def _free_pool(self, pool): |
336 def _free_pool(self, pool): |
337 pool.rollback() |
337 pool.rollback() |
338 self._available_pools.put_nowait(pool) |
338 self._available_pools.put_nowait(pool) |
339 |
339 |
340 def pinfo(self): |
340 def pinfo(self): |
380 nocache = self.system_source.no_cache |
380 nocache = self.system_source.no_cache |
381 self.info('sql cache usage: %s/%s (%s%%)', hits+ misses, nocache, |
381 self.info('sql cache usage: %s/%s (%s%%)', hits+ misses, nocache, |
382 ((hits + misses) * 100) / (hits + misses + nocache)) |
382 ((hits + misses) * 100) / (hits + misses + nocache)) |
383 except ZeroDivisionError: |
383 except ZeroDivisionError: |
384 pass |
384 pass |
385 |
385 |
386 def authenticate_user(self, session, login, password): |
386 def authenticate_user(self, session, login, password): |
387 """validate login / password, raise AuthenticationError on failure |
387 """validate login / password, raise AuthenticationError on failure |
388 return associated CWUser instance on success |
388 return associated CWUser instance on success |
389 """ |
389 """ |
390 for source in self.sources: |
390 for source in self.sources: |
467 'is installed. Run "cubicweb-ctl upgrade".') |
467 'is installed. Run "cubicweb-ctl upgrade".') |
468 raise ExecutionError(msg % (cube, version, fsversion)) |
468 raise ExecutionError(msg % (cube, version, fsversion)) |
469 finally: |
469 finally: |
470 session.close() |
470 session.close() |
471 return vcconf |
471 return vcconf |
472 |
472 |
473 @cached |
473 @cached |
474 def source_defs(self): |
474 def source_defs(self): |
475 sources = self.config.sources().copy() |
475 sources = self.config.sources().copy() |
476 # remove manager information |
476 # remove manager information |
477 sources.pop('admin', None) |
477 sources.pop('admin', None) |
524 'U primary_email X, U use_email X WHERE U login %(login)s', d) |
524 'U primary_email X, U use_email X WHERE U login %(login)s', d) |
525 session.commit() |
525 session.commit() |
526 finally: |
526 finally: |
527 session.close() |
527 session.close() |
528 return True |
528 return True |
529 |
529 |
530 def connect(self, login, password, cnxprops=None): |
530 def connect(self, login, password, cnxprops=None): |
531 """open a connection for a given user |
531 """open a connection for a given user |
532 |
532 |
533 base_url may be needed to send mails |
533 base_url may be needed to send mails |
534 cnxtype indicate if this is a pyro connection or a in-memory connection |
534 cnxtype indicate if this is a pyro connection or a in-memory connection |
535 |
535 |
536 raise `AuthenticationError` if the authentication failed |
536 raise `AuthenticationError` if the authentication failed |
537 raise `ConnectionError` if we can't open a connection |
537 raise `ConnectionError` if we can't open a connection |
538 """ |
538 """ |
539 # use an internal connection |
539 # use an internal connection |
540 session = self.internal_session() |
540 session = self.internal_session() |
582 # FIXME: check error to catch internal errors |
582 # FIXME: check error to catch internal errors |
583 self.exception('unexpected error') |
583 self.exception('unexpected error') |
584 raise |
584 raise |
585 finally: |
585 finally: |
586 session.reset_pool() |
586 session.reset_pool() |
587 |
587 |
588 def describe(self, sessionid, eid): |
588 def describe(self, sessionid, eid): |
589 """return a tuple (type, source, extid) for the entity with id <eid>""" |
589 """return a tuple (type, source, extid) for the entity with id <eid>""" |
590 session = self._get_session(sessionid, setpool=True) |
590 session = self._get_session(sessionid, setpool=True) |
591 try: |
591 try: |
592 return self.type_and_source_from_eid(eid, session) |
592 return self.type_and_source_from_eid(eid, session) |
def commit(self, sessionid):
    """Commit the transaction of the session identified by `sessionid`.

    The session is fetched with `setpool=True` and its `commit` method is
    called. `ValidationError` and `Unauthorized` propagate untouched; any
    other exception is logged through `self.exception` and re-raised.
    """
    self.debug('begin commit for session %s', sessionid)
    try:
        session = self._get_session(sessionid, setpool=True)
        session.commit()
    except (ValidationError, Unauthorized):
        raise
    except:
        self.exception('unexpected error')
        raise
626 |
626 |
627 def rollback(self, sessionid): |
627 def rollback(self, sessionid): |
628 """commit transaction for the session with the given id""" |
628 """commit transaction for the session with the given id""" |
629 self.debug('begin rollback for session %s', sessionid) |
629 self.debug('begin rollback for session %s', sessionid) |
630 try: |
630 try: |
631 self._get_session(sessionid, setpool=True).rollback() |
631 self._get_session(sessionid, setpool=True).rollback() |
643 # during `session_close` hooks |
643 # during `session_close` hooks |
644 session.commit() |
644 session.commit() |
645 session.close() |
645 session.close() |
646 del self._sessions[sessionid] |
646 del self._sessions[sessionid] |
647 self.info('closed session %s for user %s', sessionid, session.user.login) |
647 self.info('closed session %s for user %s', sessionid, session.user.login) |
648 |
648 |
649 def user_info(self, sessionid, props=None): |
649 def user_info(self, sessionid, props=None): |
650 """this method should be used by client to: |
650 """this method should be used by client to: |
651 * check session id validity |
651 * check session id validity |
652 * update user information on each user's request (i.e. groups and |
652 * update user information on each user's request (i.e. groups and |
653 custom properties) |
653 custom properties) |
657 # update session properties |
657 # update session properties |
658 for prop, value in props.items(): |
658 for prop, value in props.items(): |
659 session.change_property(prop, value) |
659 session.change_property(prop, value) |
660 user = session.user |
660 user = session.user |
661 return user.eid, user.login, user.groups, user.properties |
661 return user.eid, user.login, user.groups, user.properties |
662 |
662 |
663 # public (inter-repository) interface ##################################### |
663 # public (inter-repository) interface ##################################### |
664 |
664 |
665 def entities_modified_since(self, etypes, mtime): |
665 def entities_modified_since(self, etypes, mtime): |
666 """function designed to be called from an external repository which |
666 """function designed to be called from an external repository which |
667 is using this one as a rql source for synchronization, and return a |
667 is using this one as a rql source for synchronization, and return a |
668 3-uple containing : |
668 3-uple containing : |
669 * the local date |
669 * the local date |
703 for session in self._sessions.values(): |
703 for session in self._sessions.values(): |
704 if session.timestamp < mintime: |
704 if session.timestamp < mintime: |
705 self.close(session.id) |
705 self.close(session.id) |
706 nbclosed += 1 |
706 nbclosed += 1 |
707 return nbclosed |
707 return nbclosed |
708 |
708 |
def internal_session(self, cnxprops=None):
    """Return an `InternalSession` on this repository with its pool set.

    The session acts as a dbapi-like connection/cursor for the internal
    user, which has every right on the repository. The caller *has to*
    commit, roll back or close it (closing rolls back implicitly) once
    the job is done. Otherwise connections pools leak until none is left
    available, which freezes the repository.
    """
    internal = InternalSession(self, cnxprops)
    internal.set_pool()
    return internal
719 |
719 |
720 def _get_session(self, sessionid, setpool=False): |
720 def _get_session(self, sessionid, setpool=False): |
721 """return the user associated to the given session identifier""" |
721 """return the user associated to the given session identifier""" |
722 try: |
722 try: |
723 session = self._sessions[sessionid] |
723 session = self._sessions[sessionid] |
724 except KeyError: |
724 except KeyError: |
729 |
729 |
730 # data sources handling ################################################### |
730 # data sources handling ################################################### |
731 # * correspondance between eid and (type, source) |
731 # * correspondance between eid and (type, source) |
732 # * correspondance between eid and local id (i.e. specific to a given source) |
732 # * correspondance between eid and local id (i.e. specific to a given source) |
733 # * searchable text indexes |
733 # * searchable text indexes |
734 |
734 |
735 def type_and_source_from_eid(self, eid, session=None): |
735 def type_and_source_from_eid(self, eid, session=None): |
736 """return a tuple (type, source, extid) for the entity with id <eid>""" |
736 """return a tuple (type, source, extid) for the entity with id <eid>""" |
737 try: |
737 try: |
738 eid = typed_eid(eid) |
738 eid = typed_eid(eid) |
739 except ValueError: |
739 except ValueError: |
769 except KeyError: |
769 except KeyError: |
770 etype = None |
770 etype = None |
771 rqlcache.pop('Any X WHERE X eid %s' % eid, None) |
771 rqlcache.pop('Any X WHERE X eid %s' % eid, None) |
772 for source in self.sources: |
772 for source in self.sources: |
773 source.clear_eid_cache(eid, etype) |
773 source.clear_eid_cache(eid, etype) |
774 |
774 |
def type_from_eid(self, eid, session=None):
    """Return the type of the entity with id `eid`.

    This is the first item of what `type_and_source_from_eid` returns.
    """
    description = self.type_and_source_from_eid(eid, session)
    return description[0]
778 |
778 |
def source_from_eid(self, eid, session=None):
    """Return the source object holding the entity with id `eid`.

    The source uri is the second item of what `type_and_source_from_eid`
    returns; it is looked up in `self.sources_by_uri`.
    """
    uri = self.type_and_source_from_eid(eid, session)[1]
    return self.sources_by_uri[uri]
782 |
782 |
783 def eid2extid(self, source, eid, session=None): |
783 def eid2extid(self, source, eid, session=None): |
784 """get local id from an eid""" |
784 """get local id from an eid""" |
785 etype, uri, extid = self.type_and_source_from_eid(eid, session) |
785 etype, uri, extid = self.type_and_source_from_eid(eid, session) |
786 if source.uri != uri: |
786 if source.uri != uri: |
787 # eid not from the given source |
787 # eid not from the given source |
846 session.commit(reset_pool) |
846 session.commit(reset_pool) |
847 return eid |
847 return eid |
848 except: |
848 except: |
849 session.rollback(reset_pool) |
849 session.rollback(reset_pool) |
850 raise |
850 raise |
851 |
851 |
852 def add_info(self, session, entity, source, extid=None, complete=True): |
852 def add_info(self, session, entity, source, extid=None, complete=True): |
853 """add type and source info for an eid into the system table, |
853 """add type and source info for an eid into the system table, |
854 and index the entity with the full text index |
854 and index the entity with the full text index |
855 """ |
855 """ |
856 # begin by inserting eid/type/source/extid into the entities table |
856 # begin by inserting eid/type/source/extid into the entities table |
860 session.add_query_data('neweids', entity.eid) |
860 session.add_query_data('neweids', entity.eid) |
861 # now we can update the full text index |
861 # now we can update the full text index |
862 if self.do_fti: |
862 if self.do_fti: |
863 FTIndexEntityOp(session, entity=entity) |
863 FTIndexEntityOp(session, entity=entity) |
864 CleanupEidTypeCacheOp(session) |
864 CleanupEidTypeCacheOp(session) |
865 |
865 |
def delete_info(self, session, eid):
    """Remove the repository's system information about entity `eid`.

    Runs the two deletion steps in order: `_prepare_delete_info`, then
    `_delete_info`, both with the same `session` and `eid`.
    """
    # preparation must come first: it records the eid as pending deletion
    self._prepare_delete_info(session, eid)
    self._delete_info(session, eid)
869 |
869 |
def _prepare_delete_info(self, session, eid):
    """Prepare the repository for the deletion of entity `eid`.

    In order:
    * remove the entity from the system source's full text index
    * add `eid` to the session's 'pendingeids' set, creating the set if
      needed, to mark it as being deleted
    * instantiate a `CleanupEidTypeCacheOp` on the session to set up the
      cache update operation
    """
    self.system_source.fti_unindex_entity(session, eid)
    session.query_data('pendingeids', set(), setdefault=True).add(eid)
    CleanupEidTypeCacheOp(session)
880 |
880 |
881 def _delete_info(self, session, eid): |
881 def _delete_info(self, session, eid): |
882 """delete system information on deletion of an entity: |
882 """delete system information on deletion of an entity: |
883 * delete all relations on this entity |
883 * delete all relations on this entity |
884 * transfer record from the entities table to the deleted_entities table |
884 * transfer record from the entities table to the deleted_entities table |
885 """ |
885 """ |
886 etype, uri, extid = self.type_and_source_from_eid(eid, session) |
886 etype, uri, extid = self.type_and_source_from_eid(eid, session) |
887 self._clear_eid_relations(session, etype, eid) |
887 self._clear_eid_relations(session, etype, eid) |
888 self.system_source.delete_info(session, eid, etype, uri, extid) |
888 self.system_source.delete_info(session, eid, etype, uri, extid) |
889 |
889 |
890 def _clear_eid_relations(self, session, etype, eid): |
890 def _clear_eid_relations(self, session, etype, eid): |
891 """when a entity is deleted, build and execute rql query to delete all |
891 """when a entity is deleted, build and execute rql query to delete all |
892 its relations |
892 its relations |
893 """ |
893 """ |
894 rql = [] |
894 rql = [] |
915 if entity.eid in alreadydone: |
915 if entity.eid in alreadydone: |
916 self.info('skipping reindexation of %s, already done', entity.eid) |
916 self.info('skipping reindexation of %s, already done', entity.eid) |
917 return |
917 return |
918 alreadydone.add(entity.eid) |
918 alreadydone.add(entity.eid) |
919 self.system_source.fti_index_entity(session, entity) |
919 self.system_source.fti_index_entity(session, entity) |
920 |
920 |
def locate_relation_source(self, session, subject, rtype, object):
    """Return the source to use for relation `rtype` between two eids.

    When the subject and the object eids come from the very same source
    and that source reports `support_relation(rtype, 1)`, that source is
    returned. Otherwise the system source is returned, provided it
    supports the relation.

    Raises `RTypeNotSupportedBySources` when the system source has to be
    used but does not support `rtype`.
    """
    subjsource = self.source_from_eid(subject, session)
    objsource = self.source_from_eid(object, session)
    # both ends live in one source able to handle the relation: use it
    if subjsource is objsource and subjsource.support_relation(rtype, 1):
        return subjsource
    # otherwise fall back on the system source
    fallback = self.system_source
    if not fallback.support_relation(rtype, 1):
        raise RTypeNotSupportedBySources(rtype)
    return fallback
931 |
931 |
def locate_etype_source(self, etype):
    """Return the first source reporting `support_entity(etype, 1)`.

    Sources are tried in the order of `self.sources`.

    Raises `ETypeNotSupportedBySources` when no source matches.
    """
    for candidate in self.sources:
        if candidate.support_entity(etype, 1):
            return candidate
    raise ETypeNotSupportedBySources(etype)
938 |
938 |
939 def glob_add_entity(self, session, entity): |
939 def glob_add_entity(self, session, entity): |
940 """add an entity to the repository |
940 """add an entity to the repository |
941 |
941 |
942 the entity eid should originaly be None and a unique eid is assigned to |
942 the entity eid should originaly be None and a unique eid is assigned to |
943 the entity instance |
943 the entity instance |
944 """ |
944 """ |
945 entity = entity.pre_add_hook() |
945 entity = entity.pre_add_hook() |
946 eschema = entity.e_schema |
946 eschema = entity.e_schema |
979 self.hm.call_hooks('before_add_relation', attr, session, |
979 self.hm.call_hooks('before_add_relation', attr, session, |
980 entity.eid, attr, value) |
980 entity.eid, attr, value) |
981 self.hm.call_hooks('after_add_relation', attr, session, |
981 self.hm.call_hooks('after_add_relation', attr, session, |
982 entity.eid, attr, value) |
982 entity.eid, attr, value) |
983 return entity.eid |
983 return entity.eid |
984 |
984 |
985 def glob_update_entity(self, session, entity): |
985 def glob_update_entity(self, session, entity): |
986 """replace an entity in the repository |
986 """replace an entity in the repository |
987 the type and the eid of an entity must not be changed |
987 the type and the eid of an entity must not be changed |
988 """ |
988 """ |
989 #print 'update', entity |
989 #print 'update', entity |
1049 self._delete_info(session, eid) |
1049 self._delete_info(session, eid) |
1050 source.delete_entity(session, etype, eid) |
1050 source.delete_entity(session, etype, eid) |
1051 if source.should_call_hooks: |
1051 if source.should_call_hooks: |
1052 self.hm.call_hooks('after_delete_entity', etype, session, eid) |
1052 self.hm.call_hooks('after_delete_entity', etype, session, eid) |
1053 # don't clear cache here this is done in a hook on commit |
1053 # don't clear cache here this is done in a hook on commit |
1054 |
1054 |
1055 def glob_add_relation(self, session, subject, rtype, object): |
1055 def glob_add_relation(self, session, subject, rtype, object): |
1056 """add a relation to the repository""" |
1056 """add a relation to the repository""" |
1057 assert subject is not None |
1057 assert subject is not None |
1058 assert rtype |
1058 assert rtype |
1059 assert object is not None |
1059 assert object is not None |
1087 self.hm.call_hooks('after_delete_relation', rtype, session, |
1087 self.hm.call_hooks('after_delete_relation', rtype, session, |
1088 subject, rtype, object) |
1088 subject, rtype, object) |
1089 |
1089 |
1090 |
1090 |
1091 # pyro handling ########################################################### |
1091 # pyro handling ########################################################### |
1092 |
1092 |
1093 def pyro_register(self, host=''): |
1093 def pyro_register(self, host=''): |
1094 """register the repository as a pyro object""" |
1094 """register the repository as a pyro object""" |
1095 from Pyro import core |
1095 from Pyro import core |
1096 port = self.config['pyro-port'] |
1096 port = self.config['pyro-port'] |
1097 nshost, nsgroup = self.config['pyro-ns-host'], self.config['pyro-ns-group'] |
1097 nshost, nsgroup = self.config['pyro-ns-host'], self.config['pyro-ns-group'] |
1106 daemon.connect(impl, '%s.%s' % (nsgroup, nsid)) |
1106 daemon.connect(impl, '%s.%s' % (nsgroup, nsid)) |
1107 msg = 'repository registered as a pyro object using group %s and id %s' |
1107 msg = 'repository registered as a pyro object using group %s and id %s' |
1108 self.info(msg, nsgroup, nsid) |
1108 self.info(msg, nsgroup, nsid) |
1109 self.pyro_registered = True |
1109 self.pyro_registered = True |
1110 return daemon |
1110 return daemon |
1111 |
1111 |
1112 def pyro_nameserver(self, host=None, group=None): |
1112 def pyro_nameserver(self, host=None, group=None): |
1113 """locate and bind the the name server to the daemon""" |
1113 """locate and bind the the name server to the daemon""" |
1114 from Pyro import naming, errors |
1114 from Pyro import naming, errors |
1115 # locate the name server |
1115 # locate the name server |
1116 nameserver = naming.NameServerLocator().getNS(host) |
1116 nameserver = naming.NameServerLocator().getNS(host) |
1121 except errors.NamingError: |
1121 except errors.NamingError: |
1122 pass |
1122 pass |
1123 return nameserver |
1123 return nameserver |
1124 |
1124 |
1125 # multi-sources planner helpers ########################################### |
1125 # multi-sources planner helpers ########################################### |
1126 |
1126 |
@cached
def rel_type_sources(self, rtype):
    """Return the list of sources concerned by relation type `rtype`.

    A source is kept when it supports the relation or when `rtype` is
    listed in its `dont_cross_relations`. The order of `self.sources` is
    preserved. The result is memoized by the `cached` decorator.
    """
    concerned = []
    for source in self.sources:
        if source.support_relation(rtype) or rtype in source.dont_cross_relations:
            concerned.append(source)
    return concerned
1132 |
1132 |
@cached
def can_cross_relation(self, rtype):
    """Return the list of sources in which `rtype` is a crossed relation.

    A source is kept when it supports the relation and `rtype` is listed
    in its `cross_relations`. The order of `self.sources` is preserved.
    The result is memoized by the `cached` decorator.
    """
    crossing = []
    for source in self.sources:
        if source.support_relation(rtype) and rtype in source.cross_relations:
            crossing.append(source)
    return crossing
1138 |
1138 |
@cached
def is_multi_sources_relation(self, rtype):
    """Tell whether a source other than the system source supports `rtype`.

    Returns True as soon as one source of `self.sources` that is not
    `self.system_source` reports `support_relation(rtype)`, else False.
    The result is memoized by the `cached` decorator.
    """
    system = self.system_source
    # test the predicate itself, not the truthiness of the source objects:
    # a source evaluating to false must still be taken into account
    return any(source is not system and source.support_relation(rtype)
               for source in self.sources)
1144 |
1144 |
1145 |
1145 |
1146 def pyro_unregister(config): |
1146 def pyro_unregister(config): |
1147 """unregister the repository from the pyro name server""" |
1147 """unregister the repository from the pyro name server""" |
1148 nshost, nsgroup = config['pyro-ns-host'], config['pyro-ns-group'] |
1148 nshost, nsgroup = config['pyro-ns-host'], config['pyro-ns-group'] |
1149 appid = config['pyro-id'] or config.appid |
1149 appid = config['pyro-id'] or config.appid |