--- a/server/sources/datafeed.py Fri Jun 06 15:56:24 2014 +0200
+++ b/server/sources/datafeed.py Wed Jun 11 16:54:31 2014 +0200
@@ -113,18 +113,18 @@
self.parser_id = source_entity.parser
self.load_mapping(source_entity._cw)
- def _get_parser(self, session, **kwargs):
+ def _get_parser(self, cnx, **kwargs):
return self.repo.vreg['parsers'].select(
- self.parser_id, session, source=self, **kwargs)
+ self.parser_id, cnx, source=self, **kwargs)
- def load_mapping(self, session):
+ def load_mapping(self, cnx):
self.mapping = {}
self.mapping_idx = {}
try:
- parser = self._get_parser(session)
+ parser = self._get_parser(cnx)
except (RegistryNotFound, ObjectNotFound):
return # no parser yet, don't go further
- self._load_mapping(session, parser=parser)
+ self._load_mapping(cnx, parser=parser)
def add_schema_config(self, schemacfg, checkonly=False, parser=None):
"""added CWSourceSchemaConfig, modify mapping accordingly"""
@@ -146,7 +146,7 @@
def update_latest_retrieval(self, cnx):
self.latest_retrieval = datetime.utcnow()
cnx.execute('SET X latest_retrieval %(date)s WHERE X eid %(x)s',
- {'x': self.eid, 'date': self.latest_retrieval})
+ {'x': self.eid, 'date': self.latest_retrieval})
cnx.commit()
def acquire_synchronization_lock(self, cnx):
@@ -165,7 +165,7 @@
def release_synchronization_lock(self, cnx):
cnx.execute('SET X in_synchronization NULL WHERE X eid %(x)s',
- {'x': self.eid})
+ {'x': self.eid})
cnx.commit()
def pull_data(self, cnx, force=False, raise_on_error=False):
@@ -225,7 +225,7 @@
error = True
return error
- def before_entity_insertion(self, session, lid, etype, eid, sourceparams):
+ def before_entity_insertion(self, cnx, lid, etype, eid, sourceparams):
"""called by the repository when an eid has been attributed for an
entity stored here but the entity has not been inserted in the system
table yet.
@@ -234,29 +234,29 @@
entity.
"""
entity = super(DataFeedSource, self).before_entity_insertion(
- session, lid, etype, eid, sourceparams)
+ cnx, lid, etype, eid, sourceparams)
entity.cw_edited['cwuri'] = lid.decode('utf-8')
entity.cw_edited.set_defaults()
sourceparams['parser'].before_entity_copy(entity, sourceparams)
return entity
- def after_entity_insertion(self, session, lid, entity, sourceparams):
+ def after_entity_insertion(self, cnx, lid, entity, sourceparams):
"""called by the repository after an entity stored here has been
inserted in the system table.
"""
- relations = preprocess_inlined_relations(session, entity)
- if session.is_hook_category_activated('integrity'):
+ relations = preprocess_inlined_relations(cnx, entity)
+ if cnx.is_hook_category_activated('integrity'):
entity.cw_edited.check(creation=True)
- self.repo.system_source.add_entity(session, entity)
+ self.repo.system_source.add_entity(cnx, entity)
entity.cw_edited.saved = entity._cw_is_saved = True
sourceparams['parser'].after_entity_copy(entity, sourceparams)
# call hooks for inlined relations
call_hooks = self.repo.hm.call_hooks
if self.should_call_hooks:
for attr, value in relations:
- call_hooks('before_add_relation', session,
+ call_hooks('before_add_relation', cnx,
eidfrom=entity.eid, rtype=attr, eidto=value)
- call_hooks('after_add_relation', session,
+ call_hooks('after_add_relation', cnx,
eidfrom=entity.eid, rtype=attr, eidto=value)
def source_cwuris(self, cnx):
@@ -266,8 +266,8 @@
return dict((b64decode(uri), (eid, type))
for uri, eid, type in cnx.system_sql(sql).fetchall())
- def init_import_log(self, session, **kwargs):
- dataimport = session.create_entity('CWDataImport', cw_import_of=self,
+ def init_import_log(self, cnx, **kwargs):
+ dataimport = cnx.create_entity('CWDataImport', cw_import_of=self,
start_timestamp=datetime.utcnow(),
**kwargs)
dataimport.init()
@@ -277,8 +277,8 @@
class DataFeedParser(AppObject):
__registry__ = 'parsers'
- def __init__(self, session, source, sourceuris=None, import_log=None, **kwargs):
- super(DataFeedParser, self).__init__(session, **kwargs)
+ def __init__(self, cnx, source, sourceuris=None, import_log=None, **kwargs):
+ super(DataFeedParser, self).__init__(cnx, **kwargs)
self.source = source
self.sourceuris = sourceuris
self.import_log = import_log
@@ -305,20 +305,20 @@
"""return an entity for the given uri. May return None if it should be
skipped
"""
- session = self._cw
+ cnx = self._cw
# if cwsource is specified and repository has a source with the same
# name, call extid2eid on that source so entity will be properly seen as
# coming from this source
source_uri = sourceparams.pop('cwsource', None)
if source_uri is not None and source_uri != 'system':
- source = session.repo.sources_by_uri.get(source_uri, self.source)
+ source = cnx.repo.sources_by_uri.get(source_uri, self.source)
else:
source = self.source
sourceparams['parser'] = self
if isinstance(uri, unicode):
uri = uri.encode('utf-8')
try:
- eid = session.repo.extid2eid(source, str(uri), etype, session,
+ eid = cnx.repo.extid2eid(source, str(uri), etype, cnx,
sourceparams=sourceparams)
except ValidationError as ex:
# XXX use critical so they are seen during tests. Should consider
@@ -333,14 +333,14 @@
# Don't give etype to entity_from_eid so we get UnknownEid if the
# entity has been removed
try:
- entity = session.entity_from_eid(-eid)
+ entity = cnx.entity_from_eid(-eid)
except UnknownEid:
return None
self.notify_updated(entity) # avoid later update from the source's data
return entity
if self.sourceuris is not None:
self.sourceuris.pop(str(uri), None)
- return session.entity_from_eid(eid, etype)
+ return cnx.entity_from_eid(eid, etype)
def process(self, url, raise_on_error=False):
"""main callback: process the url"""
@@ -371,7 +371,7 @@
"""
return True
- def handle_deletion(self, config, session, myuris):
+ def handle_deletion(self, config, cnx, myuris):
if config['delete-entities'] and myuris:
byetype = {}
for extid, (eid, etype) in myuris.iteritems():
@@ -379,10 +379,9 @@
byetype.setdefault(etype, []).append(str(eid))
for etype, eids in byetype.iteritems():
self.warning('delete %s %s entities', len(eids), etype)
- session.set_cnxset()
- session.execute('DELETE %s X WHERE X eid IN (%s)'
- % (etype, ','.join(eids)))
- session.commit()
+ cnx.execute('DELETE %s X WHERE X eid IN (%s)'
+ % (etype, ','.join(eids)))
+ cnx.commit()
def update_if_necessary(self, entity, attrs):
entity.complete(tuple(attrs))
@@ -410,13 +409,8 @@
self.import_log.record_error(str(ex))
return True
error = False
- # Check whether self._cw is a session or a connection
- if getattr(self._cw, 'commit', None) is not None:
- commit = self._cw.commit
- rollback = self._cw.rollback
- else:
- commit = self._cw.cnx.commit
- rollback = self._cw.cnx.rollback
+ commit = self._cw.commit
+ rollback = self._cw.rollback
for args in parsed:
try:
self.process_item(*args)
--- a/sobjects/test/unittest_cwxmlparser.py Fri Jun 06 15:56:24 2014 +0200
+++ b/sobjects/test/unittest_cwxmlparser.py Wed Jun 11 16:54:31 2014 +0200
@@ -132,13 +132,14 @@
REMOVE THE DATABASE TEMPLATE else it won't be considered
"""
test_db_id = 'xmlparser'
+
@classmethod
- def pre_setup_database(cls, session, config):
- myfeed = session.create_entity('CWSource', name=u'myfeed', type=u'datafeed',
+ def pre_setup_database(cls, cnx, config):
+ myfeed = cnx.create_entity('CWSource', name=u'myfeed', type=u'datafeed',
parser=u'cw.entityxml', url=BASEXML)
- myotherfeed = session.create_entity('CWSource', name=u'myotherfeed', type=u'datafeed',
- parser=u'cw.entityxml', url=OTHERXML)
- session.commit()
+ myotherfeed = cnx.create_entity('CWSource', name=u'myotherfeed', type=u'datafeed',
+ parser=u'cw.entityxml', url=OTHERXML)
+ cnx.commit()
myfeed.init_mapping([(('CWUser', 'use_email', '*'),
u'role=subject\naction=copy'),
(('CWUser', 'in_group', '*'),
@@ -153,7 +154,8 @@
(('CWUser', 'in_state', '*'),
u'role=subject\naction=link\nlinkattr=name'),
])
- session.create_entity('Tag', name=u'hop')
+ cnx.create_entity('Tag', name=u'hop')
+ cnx.commit()
def test_complete_url(self):
dfsource = self.repo.sources_by_uri['myfeed']