# Give the Operation class its logging helpers, bound to the
# 'cubicweb.session' logger. NOTE(review): set_log_methods is defined
# elsewhere; presumably it installs debug/info/warning/... methods on the
# class -- confirm against its definition.
set_log_methods(Operation, getLogger('cubicweb.session'))
725 |
724 |
726 def _container_add(container, value): |
725 def _container_add(container, value): |
727 {set: set.add, list: list.append}[container.__class__](container, value) |
726 {set: set.add, list: list.append}[container.__class__](container, value) |
728 |
727 |
|
728 |
|
class DataOperationMixIn(object):
    """Mix-in class to ease applying a single operation on a set of data,
    instead of creating one operation per individual modification.
    The body of the operation must then iterate over the values that have been
    stored in that single operation instance.

    You should use this instead of creating an operation for each `value`,
    since handling operations becomes costly on massive data import.

    Usage looks like:

    .. sourcecode:: python

        class MyEntityHook(Hook):
            __regid__ = 'my.entity.hook'
            __select__ = Hook.__select__ & is_instance('MyEntity')
            events = ('after_add_entity',)

            def __call__(self):
                MyOperation.get_instance(self._cw).add_data(self.entity)


        class MyOperation(DataOperationMixIn, Operation):
            def precommit_event(self):
                for bucket in self.get_data():
                    process(bucket)

    List the mix-in *before* the operation base class. Its `__init__`, which
    builds the data container, must come first in the MRO; it then forwards
    the constructor arguments to the operation class through ``super()``.

    You can modify the `containercls` class attribute, which defines the
    container class that should be instantiated to hold payloads. An instance
    is created on instantiation, and then the :meth:`add_data` method will add
    the given data to the existing container. Defaults to a `set`. Give `list`
    if you want to keep arrival ordering. Only exactly `set` and `list` are
    supported out of the box. To use another kind of container, redefine both
    :meth:`_build_container` and :meth:`add_data`.

    Additional keyword arguments given to :meth:`get_instance` are passed to
    the operation constructor when the instance is first created. Those
    parameters should not vary across calls for the same operation, since
    they are ignored once the instance exists.

    .. Note::
       For sanity reasons `get_data` resets the operation. Once the operation
       has started its treatment, a hook that wants to push additional data to
       this same operation gets a new instance. Otherwise that data would most
       likely never be processed. This implies:

       * you should **always** call `get_data` when starting treatment

       * you should **never** call `get_data` for another reason.
    """
    containercls = set

    @classproperty
    def data_key(cls):
        """Key under which the shared instance lives in
        `session.transaction_data`; one key per concrete class name."""
        return ('cw.dataops', cls.__name__)

    @classmethod
    def get_instance(cls, session, **kwargs):
        """Return the instance stored in `session.transaction_data`.

        If no instance exists yet, create one with `cls(session, **kwargs)`,
        store it under :attr:`data_key` and return it. `kwargs` are ignored
        when an instance already exists.
        """
        # no need to lock: transaction_data already comes from thread's local storage
        try:
            return session.transaction_data[cls.data_key]
        except KeyError:
            op = session.transaction_data[cls.data_key] = cls(session, **kwargs)
            return op

    def __init__(self, *args, **kwargs):
        super(DataOperationMixIn, self).__init__(*args, **kwargs)
        self._container = self._build_container()
        # becomes True once get_data() has handed out the container; any
        # further add_data()/get_data() call on this instance is refused
        self._processed = False

    def __contains__(self, value):
        """Return True if `value` is in this operation's container."""
        return value in self._container

    def _build_container(self):
        """Return a new, empty instance of :attr:`containercls`."""
        return self.containercls()

    def add_data(self, data):
        """Add `data` to this operation's container.

        :raise AssertionError: if :meth:`get_data` has already been called on
          this instance.
        """
        # Explicit raise instead of `assert`, so that the guard survives
        # `python -O`. Data added after processing would otherwise be
        # silently lost.
        if self._processed:
            raise AssertionError("""Trying to add data to a closed operation.
Iterating over operation data closed it and should be reserved to precommit /
postcommit method of the operation.""")
        _container_add(self._container, data)

    def get_data(self):
        """Close this operation and return its data container.

        The instance is popped from `session.transaction_data`, so a later
        :meth:`get_instance` call creates a fresh instance.

        :raise AssertionError: if called more than once on this instance, or
          if the object stored under :attr:`data_key` is not this instance.
        :raise KeyError: if nothing is stored under :attr:`data_key`.
        """
        if self._processed:
            raise AssertionError("""Trying to get data from a closed operation.
Iterating over operation data closed it and should be reserved to precommit /
postcommit method of the operation.""")
        # marked as processed before the pop, even if the pop raises KeyError
        self._processed = True
        op = self.session.transaction_data.pop(self.data_key)
        if op is not self:
            raise AssertionError(
                "Bad handling of operation data, found %s instead of %s for key %s" % (
                    op, self, self.data_key))
        return self._container
|
818 |
|
819 |
|
820 @deprecated('[3.10] use opcls.get_instance(session, **opkwargs).add_data(value)') |
729 def set_operation(session, datakey, value, opcls, containercls=set, **opkwargs): |
821 def set_operation(session, datakey, value, opcls, containercls=set, **opkwargs): |
730 """Function to ease applying a single operation on a set of data, avoiding |
822 """Function to ease applying a single operation on a set of data, avoiding |
731 to create as many as operation as they are individual modification. You |
823 to create as many as operation as they are individual modification. You |
732 should try to use this instead of creating on operation for each `value`, |
824 should try to use this instead of creating on operation for each `value`, |
733 since handling operations becomes coslty on massive data import. |
825 since handling operations becomes coslty on massive data import. |
859 execute = self.session.execute |
948 execute = self.session.execute |
860 for rql in self.rqls: |
949 for rql in self.rqls: |
861 execute(*rql) |
950 execute(*rql) |
862 |
951 |
863 |
952 |
class CleanupNewEidsCacheOp(DataOperationMixIn, SingleLastOperation):
    """When an insert query is rolled back, the eids of the entities added
    during that transaction must be removed from the repository's type/source
    cache.

    NOTE: the querier's rqlst/solutions cache may also have been polluted by
    queries such as Any X WHERE X eid 32 when 32 has been rolled back. However,
    generated queries are unpredictable, and analysing the whole cache would
    probably be too expensive. There is no problem when eids are passed as
    query args rather than written into the rql string.
    """
    data_key = 'neweids'

    def rollback_event(self):
        """Rollback hook: drop the eids collected by this operation from the
        repository type/source cache.
        """
        # get_data() pops `data_key` from transaction_data; a missing entry
        # surfaces as KeyError. Note that a KeyError from clear_caches is
        # swallowed as well, since it is inside the same try.
        try:
            inserted_eids = self.get_data()
            self.session.repo.clear_caches(inserted_eids)
        except KeyError:
            pass
884 |
973 |
class CleanupDeletedEidsCacheOp(DataOperationMixIn, SingleLastOperation):
    """on commit of delete query, we have to remove from repository's
    type/source cache eids of entities deleted in that transaction.
    """
    data_key = 'pendingeids'

    def postcommit_event(self):
        """the transaction has been committed, remove deleted eids from
        repository type/source cache
        """
        # get_data() pops `data_key` from transaction_data; a missing entry
        # surfaces as KeyError and is ignored (as is any KeyError raised by
        # clear_caches, since it shares the same try)
        try:
            self.session.repo.clear_caches(self.get_data())
        except KeyError:
            pass