38 # would need a two phases commit or like, but I don't know how to do |
38 # would need a two phases commit or like, but I don't know how to do |
39 # this using the db-api... |
39 # this using the db-api... |
40 for source, cnx in self.source_cnxs.values(): |
40 for source, cnx in self.source_cnxs.values(): |
41 # let exception propagates |
41 # let exception propagates |
42 cnx.commit() |
42 cnx.commit() |
43 |
43 |
44 def rollback(self): |
44 def rollback(self): |
45 """rollback the current transaction for this user""" |
45 """rollback the current transaction for this user""" |
46 for source, cnx in self.source_cnxs.values(): |
46 for source, cnx in self.source_cnxs.values(): |
47 # catch exceptions, rollback other sources anyway |
47 # catch exceptions, rollback other sources anyway |
48 try: |
48 try: |
62 for _, cnx in self.source_cnxs.values(): |
62 for _, cnx in self.source_cnxs.values(): |
63 try: |
63 try: |
64 cnx.close() |
64 cnx.close() |
65 except: |
65 except: |
66 continue |
66 continue |
67 |
67 |
68 # internals ############################################################### |
68 # internals ############################################################### |
69 |
69 |
def pool_set(self, session):
    """Hook called when the pool is being set.

    Delegates to ``check_connections``; the ``session`` argument is accepted
    for the hook signature but is not used here.
    """
    self.check_connections()
73 |
73 |
def pool_reset(self, session):
    """Hook called when the pool is being reset.

    Every registered source gets its ``pool_reset`` method called with the
    connection this pool holds for it. ``session`` is not used here.
    """
    for pair in self.source_cnxs.values():
        owner, connection = pair
        owner.pool_reset(connection)
78 |
78 |
def __getitem__(self, uri):
    """Return the cursor for the source identified by ``uri``.

    Cursors are created on first access from the source's connection and
    then cached in ``self._cursors``, so ``pool[uri]`` keeps returning the
    same cursor.
    """
    cache = self._cursors
    try:
        return cache[uri]
    except KeyError:
        fresh = self.source_cnxs[uri][1].cursor()
        if fresh is None:
            # None possible on sources without cursor support such as ldap:
            # nothing to cache, the connection is asked again next time
            return fresh
        cache[uri] = fresh
        return fresh
89 |
89 |
def sources(self):
    """Iterate over the source objects handled by this pool.

    The system source is always yielded first, followed by every other
    registered source. Only the source objects are yielded, never the
    ``(source, connection)`` pairs stored in ``self.source_cnxs``.

    Raises KeyError (on first iteration) if no 'system' source is
    registered.
    """
    # implementation details of flying insert requires the system source
    # first
    yield self.source_cnxs['system'][0]
    for uri, (source, _) in self.source_cnxs.items():
        if uri != 'system':
            yield source
100 |
100 |
def source(self, uid):
    """Return the source object registered under the uri ``uid``.

    Raises KeyError if no source is registered under that uri.
    """
    entry = self.source_cnxs[uid]
    return entry[0]
104 |
104 |
def connection(self, uid):
    """Return the connection held for the source registered under ``uid``.

    Raises KeyError if no source is registered under that uri.
    """
    entry = self.source_cnxs[uid]
    return entry[1]
108 |
108 |
def reconnect(self, source):
    """Reopen a connection for ``source`` and forget its cached cursor.

    The pool entry for ``source.uri`` is replaced with a new connection
    obtained from ``source.get_connection()``. Any cursor cached for that
    uri belonged to the previous connection and is dropped, so the next
    ``pool[uri]`` access creates a new one.
    """
    source.info('trying to reconnect')
    self.source_cnxs[source.uri] = (source, source.get_connection())
    # cursors are created lazily (and never cached when the source has no
    # cursor support), so there may be nothing to drop: don't fail on that
    self._cursors.pop(source.uri, None)
114 |
114 |
115 def check_connections(self): |
115 def check_connections(self): |
116 for source, cnx in self.source_cnxs.itervalues(): |
116 for source, cnx in self.source_cnxs.itervalues(): |
117 newcnx = source.check_connection(cnx) |
117 newcnx = source.check_connection(cnx) |
131 the pool is preparing to commit. You shouldn't do anything things which |
131 the pool is preparing to commit. You shouldn't do anything things which |
132 has to be reverted if the commit fail at this point, but you can freely |
132 has to be reverted if the commit fail at this point, but you can freely |
133 do any heavy computation or raise an exception if the commit can't go. |
133 do any heavy computation or raise an exception if the commit can't go. |
134 You can add some new operation during this phase but their precommit |
134 You can add some new operation during this phase but their precommit |
135 event won't be triggered |
135 event won't be triggered |
136 |
136 |
137 commit: |
137 commit: |
138 the pool is preparing to commit. You should avoid to do to expensive |
138 the pool is preparing to commit. You should avoid to do to expensive |
139 stuff or something that may cause an exception in this event |
139 stuff or something that may cause an exception in this event |
140 |
140 |
141 revertcommit: |
141 revertcommit: |
142 if an operation failed while commited, this event is triggered for |
142 if an operation failed while commited, this event is triggered for |
143 all operations which had their commit event already to let them |
143 all operations which had their commit event already to let them |
144 revert things (including the operation which made fail the commit) |
144 revert things (including the operation which made fail the commit) |
145 |
145 |
163 self.__dict__.update(kwargs) |
163 self.__dict__.update(kwargs) |
164 self.register(session) |
164 self.register(session) |
165 # execution information |
165 # execution information |
166 self.processed = None # 'precommit', 'commit' |
166 self.processed = None # 'precommit', 'commit' |
167 self.failed = False |
167 self.failed = False |
168 |
168 |
def register(self, session):
    """Add this operation to ``session`` at the position given by
    ``insert_index()``.
    """
    add_operation = session.add_operation
    add_operation(self, self.insert_index())
171 |
171 |
def insert_index(self):
    """Return the index at which this operation should be inserted.

    This is the position of the first pending operation of the session
    which is a LateOperation or SingleLastOperation instance, or None when
    there is no such operation.
    """
    for position, pending in enumerate(self.session.pending_operations):
        if not isinstance(pending, (LateOperation, SingleLastOperation)):
            continue
        return position
    return None
180 |
180 |
def handle_event(self, event):
    """Dispatch ``event`` to the method of this operation bearing that name.

    ``event`` is the attribute name of the handler; it is looked up on the
    operation and called without argument. Raises AttributeError if the
    operation has no such attribute.
    """
    handler = getattr(self, event)
    handler()
184 |
184 |
def precommit_event(self):
    """Hook for the precommit event: the observed connections pool is
    preparing a commit.

    Does nothing by default.
    """
187 |
187 |
def revertprecommit_event(self):
    """Hook called when an error occurred while pre-committing this
    operation or a later one.

    Implementations should revert the changes made at pre-commit time, but
    must take care: if this operation is the one which failed, its
    pre-commit changes may have been only partially applied.

    Does nothing by default.
    """
194 |
194 |
def commit_event(self):
    """Hook for the commit event: the observed connections pool is
    committing.

    There is no default behaviour: this base implementation always raises
    NotImplementedError.
    """
    raise NotImplementedError()
198 |
198 |
def revertcommit_event(self):
    """Hook called when an error occurred while committing this operation
    or a later one.

    Implementations should revert the changes made at commit time, but must
    take care: if this operation is the one which failed, its commit
    changes may have been only partially applied.

    Does nothing by default.
    """
205 |
205 |
def rollback_event(self):
    """Hook called when the observed connections pool has been rolled back.

    Does nothing by default: the operation will just be removed from the
    pool operation list.
    """
212 |
212 |
213 |
213 |