# HG changeset patch # User Aurelien Campeas # Date 1402498471 -7200 # Node ID 21278eb03bbf1396c626a688eb8b9e8aede33383 # Parent f3936f64bd98d42993287ecc17a4fca6e50c5725 [datafeed sources] finish the session -> cnx switch Related to #3933480. diff -r f3936f64bd98 -r 21278eb03bbf server/sources/datafeed.py --- a/server/sources/datafeed.py Fri Jun 06 15:56:24 2014 +0200 +++ b/server/sources/datafeed.py Wed Jun 11 16:54:31 2014 +0200 @@ -113,18 +113,18 @@ self.parser_id = source_entity.parser self.load_mapping(source_entity._cw) - def _get_parser(self, session, **kwargs): + def _get_parser(self, cnx, **kwargs): return self.repo.vreg['parsers'].select( - self.parser_id, session, source=self, **kwargs) + self.parser_id, cnx, source=self, **kwargs) - def load_mapping(self, session): + def load_mapping(self, cnx): self.mapping = {} self.mapping_idx = {} try: - parser = self._get_parser(session) + parser = self._get_parser(cnx) except (RegistryNotFound, ObjectNotFound): return # no parser yet, don't go further - self._load_mapping(session, parser=parser) + self._load_mapping(cnx, parser=parser) def add_schema_config(self, schemacfg, checkonly=False, parser=None): """added CWSourceSchemaConfig, modify mapping accordingly""" @@ -146,7 +146,7 @@ def update_latest_retrieval(self, cnx): self.latest_retrieval = datetime.utcnow() cnx.execute('SET X latest_retrieval %(date)s WHERE X eid %(x)s', - {'x': self.eid, 'date': self.latest_retrieval}) + {'x': self.eid, 'date': self.latest_retrieval}) cnx.commit() def acquire_synchronization_lock(self, cnx): @@ -165,7 +165,7 @@ def release_synchronization_lock(self, cnx): cnx.execute('SET X in_synchronization NULL WHERE X eid %(x)s', - {'x': self.eid}) + {'x': self.eid}) cnx.commit() def pull_data(self, cnx, force=False, raise_on_error=False): @@ -225,7 +225,7 @@ error = True return error - def before_entity_insertion(self, session, lid, etype, eid, sourceparams): + def before_entity_insertion(self, cnx, lid, etype, eid, sourceparams): """called by the repository when an eid has been attributed for an entity stored here but the entity has not been inserted in the system table yet. @@ -234,29 +234,29 @@ entity. """ entity = super(DataFeedSource, self).before_entity_insertion( - session, lid, etype, eid, sourceparams) + cnx, lid, etype, eid, sourceparams) entity.cw_edited['cwuri'] = lid.decode('utf-8') entity.cw_edited.set_defaults() sourceparams['parser'].before_entity_copy(entity, sourceparams) return entity - def after_entity_insertion(self, session, lid, entity, sourceparams): + def after_entity_insertion(self, cnx, lid, entity, sourceparams): """called by the repository after an entity stored here has been inserted in the system table. """ - relations = preprocess_inlined_relations(session, entity) - if session.is_hook_category_activated('integrity'): + relations = preprocess_inlined_relations(cnx, entity) + if cnx.is_hook_category_activated('integrity'): entity.cw_edited.check(creation=True) - self.repo.system_source.add_entity(session, entity) + self.repo.system_source.add_entity(cnx, entity) entity.cw_edited.saved = entity._cw_is_saved = True sourceparams['parser'].after_entity_copy(entity, sourceparams) # call hooks for inlined relations call_hooks = self.repo.hm.call_hooks if self.should_call_hooks: for attr, value in relations: - call_hooks('before_add_relation', session, + call_hooks('before_add_relation', cnx, eidfrom=entity.eid, rtype=attr, eidto=value) - call_hooks('after_add_relation', session, + call_hooks('after_add_relation', cnx, eidfrom=entity.eid, rtype=attr, eidto=value) def source_cwuris(self, cnx): @@ -266,8 +266,8 @@ return dict((b64decode(uri), (eid, type)) for uri, eid, type in cnx.system_sql(sql).fetchall()) - def init_import_log(self, session, **kwargs): - dataimport = session.create_entity('CWDataImport', cw_import_of=self, + def init_import_log(self, cnx, **kwargs): + dataimport = cnx.create_entity('CWDataImport', cw_import_of=self, start_timestamp=datetime.utcnow(), **kwargs) dataimport.init() @@ -277,8 +277,8 @@ class DataFeedParser(AppObject): __registry__ = 'parsers' - def __init__(self, session, source, sourceuris=None, import_log=None, **kwargs): - super(DataFeedParser, self).__init__(session, **kwargs) + def __init__(self, cnx, source, sourceuris=None, import_log=None, **kwargs): + super(DataFeedParser, self).__init__(cnx, **kwargs) self.source = source self.sourceuris = sourceuris self.import_log = import_log @@ -305,20 +305,20 @@ """return an entity for the given uri. May return None if it should be skipped """ - session = self._cw + cnx = self._cw # if cwsource is specified and repository has a source with the same # name, call extid2eid on that source so entity will be properly seen as # coming from this source source_uri = sourceparams.pop('cwsource', None) if source_uri is not None and source_uri != 'system': - source = session.repo.sources_by_uri.get(source_uri, self.source) + source = cnx.repo.sources_by_uri.get(source_uri, self.source) else: source = self.source sourceparams['parser'] = self if isinstance(uri, unicode): uri = uri.encode('utf-8') try: - eid = session.repo.extid2eid(source, str(uri), etype, session, + eid = cnx.repo.extid2eid(source, str(uri), etype, cnx, sourceparams=sourceparams) except ValidationError as ex: # XXX use critical so they are seen during tests. Should consider @@ -333,14 +333,14 @@ # Don't give etype to entity_from_eid so we get UnknownEid if the # entity has been removed try: - entity = session.entity_from_eid(-eid) + entity = cnx.entity_from_eid(-eid) except UnknownEid: return None self.notify_updated(entity) # avoid later update from the source's data return entity if self.sourceuris is not None: self.sourceuris.pop(str(uri), None) - return session.entity_from_eid(eid, etype) + return cnx.entity_from_eid(eid, etype) def process(self, url, raise_on_error=False): """main callback: process the url""" @@ -371,7 +371,7 @@ """ return True - def handle_deletion(self, config, session, myuris): + def handle_deletion(self, config, cnx, myuris): if config['delete-entities'] and myuris: byetype = {} for extid, (eid, etype) in myuris.iteritems(): @@ -379,10 +379,9 @@ byetype.setdefault(etype, []).append(str(eid)) for etype, eids in byetype.iteritems(): self.warning('delete %s %s entities', len(eids), etype) - session.set_cnxset() - session.execute('DELETE %s X WHERE X eid IN (%s)' - % (etype, ','.join(eids))) - session.commit() + cnx.execute('DELETE %s X WHERE X eid IN (%s)' + % (etype, ','.join(eids))) + cnx.commit() def update_if_necessary(self, entity, attrs): entity.complete(tuple(attrs)) @@ -410,13 +409,8 @@ self.import_log.record_error(str(ex)) return True error = False - # Check whether self._cw is a session or a connection - if getattr(self._cw, 'commit', None) is not None: - commit = self._cw.commit - rollback = self._cw.rollback - else: - commit = self._cw.cnx.commit - rollback = self._cw.cnx.rollback + commit = self._cw.commit + rollback = self._cw.rollback for args in parsed: try: self.process_item(*args) diff -r f3936f64bd98 -r 21278eb03bbf sobjects/ldapparser.py --- a/sobjects/ldapparser.py Fri Jun 06 15:56:24 2014 +0200 +++ b/sobjects/ldapparser.py Wed Jun 11 16:54:31 2014 +0200 @@ -92,9 +92,9 @@ for groupdict in self.group_source_entities_by_extid.itervalues(): self._process('CWGroup', groupdict) - def handle_deletion(self, config, session, myuris): + def handle_deletion(self, config, cnx, myuris): if config['delete-entities']: - super(DataFeedLDAPAdapter, self).handle_deletion(config, session, myuris) + super(DataFeedLDAPAdapter, self).handle_deletion(config, cnx, myuris) return if myuris: byetype = {} @@ -107,9 +107,9 @@ continue self.info('deactivate %s %s entities', len(eids), etype) for eid in eids: - wf = session.entity_from_eid(eid).cw_adapt_to('IWorkflowable') + wf = cnx.entity_from_eid(eid).cw_adapt_to('IWorkflowable') wf.fire_transition_if_possible('deactivate') - session.commit(free_cnxset=False) + cnx.commit() def update_if_necessary(self, entity, attrs): # disable read security to allow password selection diff -r f3936f64bd98 -r 21278eb03bbf sobjects/test/unittest_cwxmlparser.py --- a/sobjects/test/unittest_cwxmlparser.py Fri Jun 06 15:56:24 2014 +0200 +++ b/sobjects/test/unittest_cwxmlparser.py Wed Jun 11 16:54:31 2014 +0200 @@ -132,13 +132,14 @@ REMOVE THE DATABASE TEMPLATE else it won't be considered """ test_db_id = 'xmlparser' + @classmethod - def pre_setup_database(cls, session, config): - myfeed = session.create_entity('CWSource', name=u'myfeed', type=u'datafeed', + def pre_setup_database(cls, cnx, config): + myfeed = cnx.create_entity('CWSource', name=u'myfeed', type=u'datafeed', parser=u'cw.entityxml', url=BASEXML) - myotherfeed = session.create_entity('CWSource', name=u'myotherfeed', type=u'datafeed', - parser=u'cw.entityxml', url=OTHERXML) - session.commit() + myotherfeed = cnx.create_entity('CWSource', name=u'myotherfeed', type=u'datafeed', + parser=u'cw.entityxml', url=OTHERXML) + cnx.commit() myfeed.init_mapping([(('CWUser', 'use_email', '*'), u'role=subject\naction=copy'), (('CWUser', 'in_group', '*'), @@ -153,7 +154,8 @@ (('CWUser', 'in_state', '*'), u'role=subject\naction=link\nlinkattr=name'), ]) - session.create_entity('Tag', name=u'hop') + cnx.create_entity('Tag', name=u'hop') + cnx.commit() def test_complete_url(self): dfsource = self.repo.sources_by_uri['myfeed']