# HG changeset patch # User Sylvain Thénault # Date 1297271177 -3600 # Node ID ffda12be2e9fe5c6f488725a37d66f7b75989049 # Parent b172c383dbced80a83ed135984ca0548829c678f [repository] #1460066: backport datafeed cube as cubicweb source * add some attributes to CWSource to handle this kind of source (not natural to put everything in 'config' string). Adding a CWSource subclass has been attempted then rolled back because it adds pain to handle multi-sources planning and it introduces an ambiguity on a generic relation (cw_source), which may be a penalty in multiple cases * data feed sources are a new kind of source, namely 'copy based', which have no effect on the query planner * a data feed source is associated to a list of urls and a parser (appobjects in the 'parsers' registry) * entities imported by a data feed have cwuri set to their url on the distant site, their cw_source relation points to the data feed source, though their source stored in the entities table (returned by cw_metainformation) is their physical source, hence 'system' diff -r b172c383dbce -r ffda12be2e9f devtools/repotest.py --- a/devtools/repotest.py Wed Feb 09 18:06:13 2011 +0100 +++ b/devtools/repotest.py Wed Feb 09 18:06:17 2011 +0100 @@ -1,4 +1,4 @@ -# copyright 2003-2010 LOGILAB S.A. (Paris, FRANCE), all rights reserved. +# copyright 2003-2011 LOGILAB S.A. (Paris, FRANCE), all rights reserved. # contact http://www.logilab.fr/ -- mailto:contact@logilab.fr # # This file is part of CubicWeb. 
@@ -277,7 +277,8 @@ class BasePlannerTC(BaseQuerierTC): - newsources = 0 + newsources = () + def setup(self): clear_cache(self.repo, 'rel_type_sources') clear_cache(self.repo, 'rel_type_sources') @@ -293,18 +294,21 @@ do_monkey_patch() self._dumb_sessions = [] # by hi-jacked parent setup self.repo.vreg.rqlhelper.backend = 'postgres' # so FTIRANK is considered + self.newsources = [] def add_source(self, sourcecls, uri): - self.sources.append(sourcecls(self.repo, {'uri': uri})) - self.repo.sources_by_uri[uri] = self.sources[-1] - setattr(self, uri, self.sources[-1]) - self.newsources += 1 + source = sourcecls(self.repo, {'uri': uri, 'type': 'whatever'}) + if not source.copy_based_source: + self.sources.append(source) + self.newsources.append(source) + self.repo.sources_by_uri[uri] = source + setattr(self, uri, source) def tearDown(self): - while self.newsources: - source = self.sources.pop(-1) + for source in self.newsources: + if not source.copy_based_source: + self.sources.remove(source) del self.repo.sources_by_uri[source.uri] - self.newsources -= 1 undo_monkey_patch() for session in self._dumb_sessions: session._threaddata.pool = None diff -r b172c383dbce -r ffda12be2e9f entities/sources.py --- a/entities/sources.py Wed Feb 09 18:06:13 2011 +0100 +++ b/entities/sources.py Wed Feb 09 18:06:17 2011 +0100 @@ -129,8 +129,5 @@ return self.cw_schema[0] @property - def source(self): - """repository only property, not available from the web side (eg - self._cw is expected to be a server session) - """ - return self._cw.repo.sources_by_eid[self.cw_for_source[0].eid] + def cwsource(self): + return self.cw_for_source[0] diff -r b172c383dbce -r ffda12be2e9f hooks/__init__.py --- a/hooks/__init__.py Wed Feb 09 18:06:13 2011 +0100 +++ b/hooks/__init__.py Wed Feb 09 18:06:17 2011 +0100 @@ -1,4 +1,4 @@ -# copyright 2003-2010 LOGILAB S.A. (Paris, FRANCE), all rights reserved. +# copyright 2003-2011 LOGILAB S.A. (Paris, FRANCE), all rights reserved. 
# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr # # This file is part of CubicWeb. @@ -15,17 +15,17 @@ # # You should have received a copy of the GNU Lesser General Public License along # with CubicWeb. If not, see . -"""core hooks +"""core hooks registering some maintainance tasks as server startup time""" -""" __docformat__ = "restructuredtext en" from datetime import timedelta, datetime + from cubicweb.server import hook class ServerStartupHook(hook.Hook): """task to cleanup expirated auth cookie entities""" - __regid__ = 'cw_cleanup_transactions' + __regid__ = 'cw.start-looping-tasks' events = ('server_startup',) def __call__(self): @@ -47,3 +47,27 @@ finally: session.close() self.repo.looping_task(60*60*24, cleanup_old_transactions, self.repo) + def update_feeds(repo): + session = repo.internal_session() + try: + # don't iter on repo.sources which doesn't include copy based + # sources (the one we're looking for) + for source in repo.sources_by_eid.itervalues(): + if (not source.copy_based_source + or not repo.config.source_enabled(source) + or not source.config['synchronize']): + continue + try: + stats = source.pull_data(session) + if stats['created']: + source.info('added %s entities', len(stats['created'])) + if stats['updated']: + source.info('updated %s entities', len(stats['updated'])) + session.commit() + except Exception, exc: + session.exception('while trying to update feed %s', source) + session.rollback() + session.set_pool() + finally: + session.close() + self.repo.looping_task(60, update_feeds, self.repo) diff -r b172c383dbce -r ffda12be2e9f hooks/integrity.py --- a/hooks/integrity.py Wed Feb 09 18:06:13 2011 +0100 +++ b/hooks/integrity.py Wed Feb 09 18:06:17 2011 +0100 @@ -1,4 +1,4 @@ -# copyright 2003-2010 LOGILAB S.A. (Paris, FRANCE), all rights reserved. +# copyright 2003-2011 LOGILAB S.A. (Paris, FRANCE), all rights reserved. # contact http://www.logilab.fr/ -- mailto:contact@logilab.fr # # This file is part of CubicWeb. 
diff -r b172c383dbce -r ffda12be2e9f hooks/syncsources.py --- a/hooks/syncsources.py Wed Feb 09 18:06:13 2011 +0100 +++ b/hooks/syncsources.py Wed Feb 09 18:06:17 2011 +0100 @@ -1,4 +1,24 @@ +# copyright 2010-2011 LOGILAB S.A. (Paris, FRANCE), all rights reserved. +# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr +# +# This file is part of CubicWeb. +# +# CubicWeb is free software: you can redistribute it and/or modify it under the +# terms of the GNU Lesser General Public License as published by the Free +# Software Foundation, either version 2.1 of the License, or (at your option) +# any later version. +# +# CubicWeb is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more +# details. +# +# You should have received a copy of the GNU Lesser General Public License along +# with CubicWeb. If not, see . +"""hooks for repository sources synchronization""" + from yams.schema import role_name + from cubicweb import ValidationError from cubicweb.selectors import is_instance from cubicweb.server import SOURCE_TYPES, hook @@ -101,7 +121,7 @@ data = self.__data = self.get_data() for schemacfg, source in data: if source is None: - source = schemacfg.source + source = schemacfg.cwsource.repo_source if session.added_in_transaction(schemacfg.eid): if not session.deleted_in_transaction(schemacfg.eid): source.add_schema_config(schemacfg, checkonly=checkonly) diff -r b172c383dbce -r ffda12be2e9f hooks/test/unittest_hooks.py --- a/hooks/test/unittest_hooks.py Wed Feb 09 18:06:13 2011 +0100 +++ b/hooks/test/unittest_hooks.py Wed Feb 09 18:06:17 2011 +0100 @@ -1,5 +1,5 @@ # -*- coding: utf-8 -*- -# copyright 2003-2010 LOGILAB S.A. (Paris, FRANCE), all rights reserved. +# copyright 2003-2011 LOGILAB S.A. (Paris, FRANCE), all rights reserved. 
# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr # # This file is part of CubicWeb. @@ -18,12 +18,11 @@ # with CubicWeb. If not, see . """functional tests for core hooks -note: most schemahooks.py hooks are actually tested in unittest_migrations.py +Note: + syncschema.py hooks are mostly tested in server/test/unittest_migrations.py """ from __future__ import with_statement -from logilab.common.testlib import TestCase, unittest_main - from datetime import datetime from cubicweb import ValidationError, AuthenticationError, BadConnectionId @@ -31,38 +30,6 @@ class CoreHooksTC(CubicWebTC): - def test_delete_internal_entities(self): - self.assertRaises(ValidationError, self.execute, - 'DELETE CWEType X WHERE X name "CWEType"') - self.assertRaises(ValidationError, self.execute, - 'DELETE CWRType X WHERE X name "relation_type"') - self.assertRaises(ValidationError, self.execute, - 'DELETE CWGroup X WHERE X name "owners"') - - def test_delete_required_relations_subject(self): - self.execute('INSERT CWUser X: X login "toto", X upassword "hop", X in_group Y ' - 'WHERE Y name "users"') - self.commit() - self.execute('DELETE X in_group Y WHERE X login "toto", Y name "users"') - self.assertRaises(ValidationError, self.commit) - self.execute('DELETE X in_group Y WHERE X login "toto"') - self.execute('SET X in_group Y WHERE X login "toto", Y name "guests"') - self.commit() - - def test_delete_required_relations_object(self): - self.skipTest('no sample in the schema ! YAGNI ? 
Kermaat ?') - - def test_static_vocabulary_check(self): - self.assertRaises(ValidationError, - self.execute, - 'SET X composite "whatever" WHERE X from_entity FE, FE name "CWUser", X relation_type RT, RT name "in_group"') - - def test_missing_required_relations_subject_inline(self): - # missing in_group relation - self.execute('INSERT CWUser X: X login "toto", X upassword "hop"') - self.assertRaises(ValidationError, - self.commit) - def test_inlined(self): self.assertEqual(self.repo.schema['sender'].inlined, True) self.execute('INSERT EmailAddress X: X address "toto@logilab.fr", X alias "hop"') @@ -73,54 +40,6 @@ rset = self.execute('Any S WHERE X sender S, X eid %s' % eeid) self.assertEqual(len(rset), 1) - def test_composite_1(self): - self.execute('INSERT EmailAddress X: X address "toto@logilab.fr", X alias "hop"') - self.execute('INSERT EmailPart X: X content_format "text/plain", X ordernum 1, X content "this is a test"') - self.execute('INSERT Email X: X messageid "<1234>", X subject "test", X sender Y, X recipients Y, X parts P ' - 'WHERE Y is EmailAddress, P is EmailPart') - self.failUnless(self.execute('Email X WHERE X sender Y')) - self.commit() - self.execute('DELETE Email X') - rset = self.execute('Any X WHERE X is EmailPart') - self.assertEqual(len(rset), 1) - self.commit() - rset = self.execute('Any X WHERE X is EmailPart') - self.assertEqual(len(rset), 0) - - def test_composite_2(self): - self.execute('INSERT EmailAddress X: X address "toto@logilab.fr", X alias "hop"') - self.execute('INSERT EmailPart X: X content_format "text/plain", X ordernum 1, X content "this is a test"') - self.execute('INSERT Email X: X messageid "<1234>", X subject "test", X sender Y, X recipients Y, X parts P ' - 'WHERE Y is EmailAddress, P is EmailPart') - self.commit() - self.execute('DELETE Email X') - self.execute('DELETE EmailPart X') - self.commit() - rset = self.execute('Any X WHERE X is EmailPart') - self.assertEqual(len(rset), 0) - - def 
test_composite_redirection(self): - self.execute('INSERT EmailAddress X: X address "toto@logilab.fr", X alias "hop"') - self.execute('INSERT EmailPart X: X content_format "text/plain", X ordernum 1, X content "this is a test"') - self.execute('INSERT Email X: X messageid "<1234>", X subject "test", X sender Y, X recipients Y, X parts P ' - 'WHERE Y is EmailAddress, P is EmailPart') - self.execute('INSERT Email X: X messageid "<2345>", X subject "test2", X sender Y, X recipients Y ' - 'WHERE Y is EmailAddress') - self.commit() - self.execute('DELETE X parts Y WHERE X messageid "<1234>"') - self.execute('SET X parts Y WHERE X messageid "<2345>"') - self.commit() - rset = self.execute('Any X WHERE X is EmailPart') - self.assertEqual(len(rset), 1) - self.assertEqual(rset.get_entity(0, 0).reverse_parts[0].messageid, '<2345>') - - def test_unsatisfied_constraints(self): - releid = self.execute('SET U in_group G WHERE G name "owners", U login "admin"')[0][0] - with self.assertRaises(ValidationError) as cm: - self.commit() - self.assertEqual(cm.exception.errors, - {'in_group-object': u'RQLConstraint NOT O name "owners" failed'}) - def test_html_tidy_hook(self): req = self.request() entity = req.create_entity('Workflow', name=u'wf1', description_format=u'text/html', @@ -271,4 +190,5 @@ if __name__ == '__main__': + from logilab.common.testlib import unittest_main unittest_main() diff -r b172c383dbce -r ffda12be2e9f misc/migration/3.11.0_Any.py --- a/misc/migration/3.11.0_Any.py Wed Feb 09 18:06:13 2011 +0100 +++ b/misc/migration/3.11.0_Any.py Wed Feb 09 18:06:17 2011 +0100 @@ -1,7 +1,13 @@ for rtype in ('cw_support', 'cw_dont_cross', 'cw_may_cross'): drop_relation_type(rtype) + add_entity_type('CWSourceSchemaConfig') +if not 'url' in schema['CWSource'].subjrels: + add_attribute('CWSource', 'url') + add_attribute('CWSource', 'parser') + add_attribute('CWSource', 'latest_retrieval') + try: from cubicweb.server.sources.pyrorql import PyroRQLSource except ImportError: diff -r 
b172c383dbce -r ffda12be2e9f schemas/base.py --- a/schemas/base.py Wed Feb 09 18:06:13 2011 +0100 +++ b/schemas/base.py Wed Feb 09 18:06:17 2011 +0100 @@ -21,7 +21,7 @@ _ = unicode from yams.buildobjs import (EntityType, RelationType, RelationDefinition, - SubjectRelation, String, Datetime, Password) + SubjectRelation, String, Datetime, Password, Interval) from cubicweb.schema import ( RQLConstraint, WorkflowableEntityType, ERQLExpression, RRQLExpression, PUB_SYSTEM_ENTITY_PERMS, PUB_SYSTEM_REL_PERMS, PUB_SYSTEM_ATTR_PERMS) @@ -258,6 +258,13 @@ 'read': ('managers',), 'update': ('managers',), }) + # put this here and not in a subclass even if it's only for some sources + # since having subclasses on generic relation (cw_source) double the number + # of rdef in the schema, and make ms planning harder since queries solutions + # may changes when sources are specified + url = String(description=_('URLs from which content will be imported. You can put one url per line')) + parser = String(description=_('parser to use to extract entities from content retrieved at given URLs.')) + latest_retrieval = Datetime(description=_('latest synchronization time')) ENTITY_MANAGERS_PERMISSIONS = { @@ -272,6 +279,7 @@ 'delete': ('managers',), } + class CWSourceHostConfig(EntityType): __permissions__ = ENTITY_MANAGERS_PERMISSIONS __unique_together__ = [('match_host', 'cw_host_config_of')] diff -r b172c383dbce -r ffda12be2e9f server/__init__.py --- a/server/__init__.py Wed Feb 09 18:06:13 2011 +0100 +++ b/server/__init__.py Wed Feb 09 18:06:17 2011 +0100 @@ -254,7 +254,7 @@ # available sources registry SOURCE_TYPES = {'native': LazyObject('cubicweb.server.sources.native', 'NativeSQLSource'), - # XXX private sources installed by an external cube 'pyrorql': LazyObject('cubicweb.server.sources.pyrorql', 'PyroRQLSource'), 'ldapuser': LazyObject('cubicweb.server.sources.ldapuser', 'LDAPUserSource'), + 'datafeed': LazyObject('cubicweb.server.sources.datafeed', 'DataFeedSource'), } diff -r 
b172c383dbce -r ffda12be2e9f server/msplanner.py --- a/server/msplanner.py Wed Feb 09 18:06:13 2011 +0100 +++ b/server/msplanner.py Wed Feb 09 18:06:17 2011 +0100 @@ -519,6 +519,16 @@ invariant = getattr(lhs, '_q_invariant', False) # XXX NOT NOT neged = srel.neged(traverse_scope=True) or (rel and rel.neged(strict=True)) + has_copy_based_source = False + sources_ = [] + for source in sources: + if source.copy_based_source: + has_copy_based_source = True + if not self.system_source in sources_: + sources_.append(self.system_source) + else: + sources_.append(source) + sources = sources_ if neged: for source in sources: if invariant and source is self.system_source: @@ -535,7 +545,8 @@ if rel is None or (len(var.stinfo['relations']) == 2 and not var.stinfo['selected']): self._remove_source_term(self.system_source, var) - if not (len(sources) > 1 or usesys or invariant): + if not (has_copy_based_source or len(sources) > 1 + or usesys or invariant): if rel is None: srel.parent.remove(srel) else: diff -r b172c383dbce -r ffda12be2e9f server/repository.py --- a/server/repository.py Wed Feb 09 18:06:13 2011 +0100 +++ b/server/repository.py Wed Feb 09 18:06:17 2011 +0100 @@ -1,4 +1,4 @@ -# copyright 2003-2010 LOGILAB S.A. (Paris, FRANCE), all rights reserved. +# copyright 2003-2011 LOGILAB S.A. (Paris, FRANCE), all rights reserved. # contact http://www.logilab.fr/ -- mailto:contact@logilab.fr # # This file is part of CubicWeb. 
@@ -207,12 +207,6 @@ self.init_sources_from_database() if 'CWProperty' in self.schema: self.vreg.init_properties(self.properties()) - # call source's init method to complete their initialisation if - # needed (for instance looking for persistent configuration using an - # internal session, which is not possible until pools have been - # initialized) - for source in self.sources_by_uri.itervalues(): - source.init(source in self.sources) else: # call init_creating so that for instance native source can # configurate tsearch according to postgres version @@ -241,11 +235,12 @@ try: # FIXME: sources should be ordered (add_entity priority) for sourceent in session.execute( - 'Any S, SN, SA, SC WHERE S is CWSource, ' + 'Any S, SN, SA, SC WHERE S is_instance_of CWSource, ' 'S name SN, S type SA, S config SC').entities(): if sourceent.name == 'system': self.system_source.eid = sourceent.eid self.sources_by_eid[sourceent.eid] = self.system_source + self.system_source.init(True, sourceent) continue self.add_source(sourceent, add_to_pools=False) finally: @@ -262,20 +257,25 @@ self.sources_by_eid[sourceent.eid] = source self.sources_by_uri[sourceent.name] = source if self.config.source_enabled(source): - source.init(True, session=sourceent._cw) - self.sources.append(source) - self.querier.set_planner() - if add_to_pools: - for pool in self.pools: - pool.add_source(source) + # call source's init method to complete their initialisation if + # needed (for instance looking for persistent configuration using an + # internal session, which is not possible until pools have been + # initialized) + source.init(True, sourceent) + if not source.copy_based_source: + self.sources.append(source) + self.querier.set_planner() + if add_to_pools: + for pool in self.pools: + pool.add_source(source) else: - source.init(False, session=sourceent._cw) + source.init(False, sourceent) self._clear_planning_caches() def remove_source(self, uri): source = self.sources_by_uri.pop(uri) del 
self.sources_by_eid[source.eid] - if self.config.source_enabled(source): + if self.config.source_enabled(source) and not source.copy_based_source: self.sources.remove(source) self.querier.set_planner() for pool in self.pools: @@ -1015,9 +1015,11 @@ raise UnknownEid(eid) return extid - def extid2eid(self, source, extid, etype, session=None, insert=True): + def extid2eid(self, source, extid, etype, session=None, insert=True, + sourceparams=None): """get eid from a local id. An eid is attributed if no record is found""" - cachekey = (extid, source.uri) + uri = 'system' if source.copy_based_source else source.uri + cachekey = (extid, uri) try: return self._extid_cache[cachekey] except KeyError: @@ -1026,10 +1028,10 @@ if session is None: session = self.internal_session() reset_pool = True - eid = self.system_source.extid2eid(session, source, extid) + eid = self.system_source.extid2eid(session, uri, extid) if eid is not None: self._extid_cache[cachekey] = eid - self._type_source_cache[eid] = (etype, source.uri, extid) + self._type_source_cache[eid] = (etype, uri, extid) if reset_pool: session.reset_pool() return eid @@ -1047,13 +1049,14 @@ try: eid = self.system_source.create_eid(session) self._extid_cache[cachekey] = eid - self._type_source_cache[eid] = (etype, source.uri, extid) - entity = source.before_entity_insertion(session, extid, etype, eid) + self._type_source_cache[eid] = (etype, uri, extid) + entity = source.before_entity_insertion( + session, extid, etype, eid, sourceparams) if source.should_call_hooks: self.hm.call_hooks('before_add_entity', session, entity=entity) # XXX call add_info with complete=False ? 
self.add_info(session, entity, source, extid) - source.after_entity_insertion(session, extid, entity) + source.after_entity_insertion(session, extid, entity, sourceparams) if source.should_call_hooks: self.hm.call_hooks('after_add_entity', session, entity=entity) session.commit(reset_pool) @@ -1190,6 +1193,8 @@ if suri == 'system': extid = None else: + if source.copy_based_source: + suri = 'system' extid = source.get_extid(entity) self._extid_cache[(str(extid), suri)] = entity.eid self._type_source_cache[entity.eid] = (entity.__regid__, suri, extid) diff -r b172c383dbce -r ffda12be2e9f server/sources/__init__.py --- a/server/sources/__init__.py Wed Feb 09 18:06:13 2011 +0100 +++ b/server/sources/__init__.py Wed Feb 09 18:06:17 2011 +0100 @@ -79,6 +79,9 @@ class AbstractSource(object): """an abstract class for sources""" + # does the source copy data into the system source, or is it a *true* source + # (i.e. entities are not stored physically here) + copy_based_source = False # boolean telling if modification hooks should be called when something is # modified in this source @@ -204,7 +207,7 @@ """method called by the repository once ready to create a new instance""" pass - def init(self, activated, session=None): + def init(self, activated, source_entity): """method called by the repository once ready to handle request. `activated` is a boolean flag telling if the source is activated or not. """ @@ -321,7 +324,7 @@ return rtype in self.cross_relations return rtype not in self.dont_cross_relations - def before_entity_insertion(self, session, lid, etype, eid): + def before_entity_insertion(self, session, lid, etype, eid, sourceparams): """called by the repository when an eid has been attributed for an entity stored here but the entity has not been inserted in the system table yet. 
@@ -334,12 +337,30 @@ entity.cw_edited = EditedEntity(entity) return entity - def after_entity_insertion(self, session, lid, entity): + def after_entity_insertion(self, session, lid, entity, sourceparams): """called by the repository after an entity stored here has been inserted in the system table. """ pass + def _load_mapping(self, session=None, **kwargs): + if not 'CWSourceSchemaConfig' in self.schema: + self.warning('instance is not mapping ready') + return + if session is None: + _session = self.repo.internal_session() + else: + _session = session + try: + for schemacfg in _session.execute( + 'Any CFG,CFGO,S WHERE ' + 'CFG options CFGO, CFG cw_schema S, ' + 'CFG cw_for_source X, X eid %(x)s', {'x': self.eid}).entities(): + self.add_schema_config(schemacfg, **kwargs) + finally: + if session is None: + _session.close() + def add_schema_config(self, schemacfg, checkonly=False): """added CWSourceSchemaConfig, modify mapping accordingly""" msg = schemacfg._cw._("this source doesn't use a mapping") diff -r b172c383dbce -r ffda12be2e9f server/sources/datafeed.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/server/sources/datafeed.py Wed Feb 09 18:06:17 2011 +0100 @@ -0,0 +1,236 @@ +# copyright 2010-2011 LOGILAB S.A. (Paris, FRANCE), all rights reserved. +# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr +# +# This file is part of CubicWeb. +# +# CubicWeb is free software: you can redistribute it and/or modify it under the +# terms of the GNU Lesser General Public License as published by the Free +# Software Foundation, either version 2.1 of the License, or (at your option) +# any later version. +# +# CubicWeb is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more +# details. +# +# You should have received a copy of the GNU Lesser General Public License along +# with CubicWeb. 
If not, see . +"""datafeed sources: copy data from an external data stream into the system +database +""" +from datetime import datetime, timedelta +from base64 import b64decode + +from cubicweb import RegistryNotFound, ObjectNotFound, ValidationError +from cubicweb.server.sources import AbstractSource +from cubicweb.appobject import AppObject + +class DataFeedSource(AbstractSource): + copy_based_source = True + + options = ( + ('synchronize', + {'type' : 'yn', + 'default': True, + 'help': ('Is the repository responsible to automatically import ' + 'content from this source? ' + 'You should say yes unless you don\'t want this behaviour ' + 'or if you use a multiple repositories setup, in which ' + 'case you should say yes on one repository, no on others.'), + 'group': 'datafeed-source', 'level': 2, + }), + ('synchronization-interval', + {'type' : 'time', + 'default': '5min', + 'help': ('Interval in seconds between synchronization with the ' + 'external source (default to 5 minutes, must be >= 1 min).'), + 'group': 'datafeed-source', 'level': 2, + }), + ('delete-entities', + {'type' : 'yn', + 'default': True, + 'help': ('Should already imported entities not found anymore on the ' + 'external source be deleted?'), + 'group': 'datafeed-source', 'level': 2, + }), + + ) + def __init__(self, repo, source_config, eid=None): + AbstractSource.__init__(self, repo, source_config, eid) + self.update_config(None, self.check_conf_dict(eid, source_config)) + + def check_config(self, source_entity): + """check configuration of source entity""" + typedconfig = super(DataFeedSource, self).check_config(source_entity) + if typedconfig['synchronization-interval'] < 60: + _ = source_entity._cw._ + msg = _('synchronization-interval must be greater than 1 minute') + raise ValidationError(source_entity.eid, {'config': msg}) + return typedconfig + + def _entity_update(self, source_entity): + source_entity.complete() + self.parser = source_entity.parser + self.latest_retrieval = 
source_entity.latest_retrieval + self.urls = [url.strip() for url in source_entity.url.splitlines() + if url.strip()] + + def update_config(self, source_entity, typedconfig): + """update configuration from source entity. `typedconfig` is config + properly typed with defaults set + """ + self.synchro_interval = timedelta(seconds=typedconfig['synchronization-interval']) + if source_entity is not None: + self._entity_update(source_entity) + self.config = typedconfig + + def init(self, activated, source_entity): + if activated: + self._entity_update(source_entity) + self.parser = source_entity.parser + self.load_mapping(source_entity._cw) + + def _get_parser(self, session, **kwargs): + return self.repo.vreg['parsers'].select( + self.parser, session, source=self, **kwargs) + + def load_mapping(self, session): + self.mapping = {} + self.mapping_idx = {} + try: + parser = self._get_parser(session) + except (RegistryNotFound, ObjectNotFound): + return # no parser yet, don't go further + self._load_mapping(session, parser=parser) + + def add_schema_config(self, schemacfg, checkonly=False, parser=None): + """added CWSourceSchemaConfig, modify mapping accordingly""" + if parser is None: + parser = self._get_parser(schemacfg._cw) + parser.add_schema_config(schemacfg, checkonly) + + def del_schema_config(self, schemacfg, checkonly=False, parser=None): + """deleted CWSourceSchemaConfig, modify mapping accordingly""" + if parser is None: + parser = self._get_parser(schemacfg._cw) + parser.del_schema_config(schemacfg, checkonly) + + def fresh(self): + if self.latest_retrieval is None: + return False + return datetime.now() < (self.latest_retrieval + self.synchro_interval) + + def pull_data(self, session, force=False): + if not force and self.fresh(): + return + if self.config['delete-entities']: + myuris = self.source_cwuris(session) + else: + myuris = None + parser = self._get_parser(session, sourceuris=myuris) + error = False + self.info('pulling data for source %s', self.uri) + 
for url in self.urls: + try: + parser.process(url) + except IOError, exc: + self.error('could not pull data while processing %s: %s', + url, exc) + error = True + if error: + self.warning("some error occured, don't attempt to delete entities") + elif self.config['delete-entities'] and myuris: + byetype = {} + for eid, etype in myuris.values(): + byetype.setdefault(etype, []).append(str(eid)) + self.error('delete %s entities %s', self.uri, byetype) + for etype, eids in byetype.iteritems(): + session.execute('DELETE %s X WHERE X eid IN (%s)' + % (etype, ','.join(eids))) + self.latest_retrieval = datetime.now() + session.execute('SET X latest_retrieval %(date)s WHERE X eid %(x)s', + {'x': self.eid, 'date': self.latest_retrieval}) + return parser.stats + + def before_entity_insertion(self, session, lid, etype, eid, sourceparams): + """called by the repository when an eid has been attributed for an + entity stored here but the entity has not been inserted in the system + table yet. + + This method must return the an Entity instance representation of this + entity. + """ + entity = super(DataFeedSource, self).before_entity_insertion( + session, lid, etype, eid, sourceparams) + entity.cw_edited['cwuri'] = unicode(lid) + entity.cw_edited.set_defaults() + sourceparams['parser'].before_entity_copy(entity, sourceparams) + # avoid query to search full-text indexed attributes + for attr in entity.e_schema.indexable_attributes(): + entity.cw_edited.setdefault(attr, u'') + return entity + + def after_entity_insertion(self, session, lid, entity, sourceparams): + """called by the repository after an entity stored here has been + inserted in the system table. 
+ """ + if session.is_hook_category_activated('integrity'): + entity.cw_edited.check(creation=True) + self.repo.system_source.add_entity(session, entity) + entity.cw_edited.saved = entity._cw_is_saved = True + sourceparams['parser'].after_entity_copy(entity, sourceparams) + + def source_cwuris(self, session): + sql = ('SELECT extid, eid, type FROM entities, cw_source_relation ' + 'WHERE entities.eid=cw_source_relation.eid_from ' + 'AND cw_source_relation.eid_to=%s' % self.eid) + return dict((b64decode(uri), (eid, type)) + for uri, eid, type in session.system_sql(sql)) + + +class DataFeedParser(AppObject): + __registry__ = 'parsers' + + def __init__(self, session, source, sourceuris=None): + self._cw = session + self.source = source + self.sourceuris = sourceuris + self.stats = {'created': set(), + 'updated': set()} + + def add_schema_config(self, schemacfg, checkonly=False): + """added CWSourceSchemaConfig, modify mapping accordingly""" + msg = schemacfg._cw._("this parser doesn't use a mapping") + raise ValidationError(schemacfg.eid, {None: msg}) + + def del_schema_config(self, schemacfg, checkonly=False): + """deleted CWSourceSchemaConfig, modify mapping accordingly""" + msg = schemacfg._cw._("this parser doesn't use a mapping") + raise ValidationError(schemacfg.eid, {None: msg}) + + def extid2entity(self, uri, etype, **sourceparams): + sourceparams['parser'] = self + eid = self.source.extid2eid(str(uri), etype, self._cw, + sourceparams=sourceparams) + if self.sourceuris is not None: + self.sourceuris.pop(str(uri), None) + return self._cw.entity_from_eid(eid, etype) + + def process(self, url): + """main callback: process the url""" + raise NotImplementedError + + def before_entity_copy(self, entity, sourceparams): + raise NotImplementedError + + def after_entity_copy(self, entity, sourceparams): + self.stats['created'].add(entity.eid) + + def created_during_pull(self, entity): + return entity.eid in self.stats['created'] + + def updated_during_pull(self, entity): 
+ return entity.eid in self.stats['updated'] + + def notify_updated(self, entity): + return self.stats['updated'].add(entity.eid) diff -r b172c383dbce -r ffda12be2e9f server/sources/ldapuser.py --- a/server/sources/ldapuser.py Wed Feb 09 18:06:13 2011 +0100 +++ b/server/sources/ldapuser.py Wed Feb 09 18:06:17 2011 +0100 @@ -202,7 +202,7 @@ self._cache = {} self._query_cache = TimedCache(self._cache_ttl) - def init(self, activated, session=None): + def init(self, activated, source_entity): """method called by the repository once ready to handle request""" if activated: self.info('ldap init') @@ -575,7 +575,7 @@ self.debug('ldap built results %s', len(result)) return result - def before_entity_insertion(self, session, lid, etype, eid): + def before_entity_insertion(self, session, lid, etype, eid, sourceparams): """called by the repository when an eid has been attributed for an entity stored here but the entity has not been inserted in the system table yet. @@ -584,18 +584,20 @@ entity. """ self.debug('ldap before entity insertion') - entity = super(LDAPUserSource, self).before_entity_insertion(session, lid, etype, eid) + entity = super(LDAPUserSource, self).before_entity_insertion( + session, lid, etype, eid, sourceparams) res = self._search(session, lid, BASE)[0] for attr in entity.e_schema.indexable_attributes(): entity.cw_edited[attr] = res[self.user_rev_attrs[attr]] return entity - def after_entity_insertion(self, session, lid, entity): + def after_entity_insertion(self, session, lid, entity, sourceparams): """called by the repository after an entity stored here has been inserted in the system table. 
""" self.debug('ldap after entity insertion') - super(LDAPUserSource, self).after_entity_insertion(session, lid, entity) + super(LDAPUserSource, self).after_entity_insertion( + session, lid, entity, sourceparams) dn = lid for group in self.user_default_groups: session.execute('SET X in_group G WHERE X eid %(x)s, G name %(group)s', diff -r b172c383dbce -r ffda12be2e9f server/sources/native.py --- a/server/sources/native.py Wed Feb 09 18:06:13 2011 +0100 +++ b/server/sources/native.py Wed Feb 09 18:06:17 2011 +0100 @@ -1,4 +1,4 @@ -# copyright 2003-2010 LOGILAB S.A. (Paris, FRANCE), all rights reserved. +# copyright 2003-2011 LOGILAB S.A. (Paris, FRANCE), all rights reserved. # contact http://www.logilab.fr/ -- mailto:contact@logilab.fr # # This file is part of CubicWeb. @@ -372,8 +372,8 @@ if self.repo.config.open_connections_pools: self.open_pool_connections() - def init(self, activated, session=None): - self.init_creating(session and session.pool) + def init(self, activated, source_entity): + self.init_creating(source_entity._cw.pool) def shutdown(self): if self._eid_creation_cnx: @@ -803,13 +803,13 @@ res[-1] = b64decode(res[-1]) return res - def extid2eid(self, session, source, extid): + def extid2eid(self, session, source_uri, extid): """get eid from an external id. 
Return None if no record found.""" assert isinstance(extid, str) cursor = self.doexec(session, 'SELECT eid FROM entities ' 'WHERE extid=%(x)s AND source=%(s)s', - {'x': b64encode(extid), 's': source.uri}) + {'x': b64encode(extid), 's': source_uri}) # XXX testing rowcount cause strange bug with sqlite, results are there # but rowcount is 0 #if cursor.rowcount > 0: @@ -898,8 +898,9 @@ if extid is not None: assert isinstance(extid, str) extid = b64encode(extid) + uri = 'system' if source.copy_based_source else source.uri attrs = {'type': entity.__regid__, 'eid': entity.eid, 'extid': extid, - 'source': source.uri, 'mtime': datetime.now()} + 'source': uri, 'mtime': datetime.now()} self.doexec(session, self.sqlgen.insert('entities', attrs), attrs) # insert core relations: is, is_instance_of and cw_source try: diff -r b172c383dbce -r ffda12be2e9f server/sources/pyrorql.py --- a/server/sources/pyrorql.py Wed Feb 09 18:06:13 2011 +0100 +++ b/server/sources/pyrorql.py Wed Feb 09 18:06:17 2011 +0100 @@ -168,9 +168,9 @@ finally: session.close() - def init(self, activated, session=None): + def init(self, activated, source_entity): """method called by the repository once ready to handle request""" - self.load_mapping(session) + self.load_mapping(source_entity._cw) if activated: interval = self.config['synchronization-interval'] self.repo.looping_task(interval, self.synchronize) @@ -184,19 +184,7 @@ self.cross_relations = set() assert self.eid is not None self._schemacfg_idx = {} - if session is None: - _session = self.repo.internal_session() - else: - _session = session - try: - for schemacfg in _session.execute( - 'Any CFG,CFGO,SN,S WHERE ' - 'CFG options CFGO, CFG cw_schema S, S name SN, ' - 'CFG cw_for_source X, X eid %(x)s', {'x': self.eid}).entities(): - self.add_schema_config(schemacfg) - finally: - if session is None: - _session.close() + self._load_mapping(session) etype_options = set(('write',)) rtype_options = set(('maycross', 'dontcross', 'write',)) diff -r 
b172c383dbce -r ffda12be2e9f server/test/unittest_datafeed.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/server/test/unittest_datafeed.py Wed Feb 09 18:06:17 2011 +0100 @@ -0,0 +1,98 @@ +# copyright 2011 LOGILAB S.A. (Paris, FRANCE), all rights reserved. +# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr +# +# This file is part of CubicWeb. +# +# CubicWeb is free software: you can redistribute it and/or modify it under the +# terms of the GNU Lesser General Public License as published by the Free +# Software Foundation, either version 2.1 of the License, or (at your option) +# any later version. +# +# CubicWeb is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more +# details. +# +# You should have received a copy of the GNU Lesser General Public License along +# with CubicWeb. If not, see . +from __future__ import with_statement + +from datetime import timedelta + +from cubicweb.devtools.testlib import CubicWebTC +from cubicweb.server.sources import datafeed + + +class DataFeedTC(CubicWebTC): + def setup_database(self): + self.request().create_entity('CWSource', name=u'myfeed', type=u'datafeed', + parser=u'testparser', url=u'ignored', + config=u'synchronization-interval=1min') + + def test(self): + self.assertIn('myfeed', self.repo.sources_by_uri) + dfsource = self.repo.sources_by_uri['myfeed'] + self.assertNotIn(dfsource, self.repo.sources) + self.assertEqual(dfsource.latest_retrieval, None) + self.assertEqual(dfsource.synchro_interval, timedelta(seconds=60)) + self.assertFalse(dfsource.fresh()) + + class AParser(datafeed.DataFeedParser): + __regid__ = 'testparser' + def process(self, url): + entity = self.extid2entity('http://www.cubicweb.org/', 'Card', + item={'title': u'cubicweb.org', + 'content': u'the cw web site'}) + if not self.created_during_pull(entity): + 
self.notify_updated(entity) + def before_entity_copy(self, entity, sourceparams): + entity.cw_edited.update(sourceparams['item']) + + with self.temporary_appobjects(AParser): + stats = dfsource.pull_data(self.session, force=True) + self.commit() + # test import stats + self.assertEqual(sorted(stats.keys()), ['created', 'updated']) + self.assertEqual(len(stats['created']), 1) + entity = self.execute('Card X').get_entity(0, 0) + self.assertIn(entity.eid, stats['created']) + self.assertEqual(stats['updated'], set()) + # test imported entities + self.assertEqual(entity.title, 'cubicweb.org') + self.assertEqual(entity.content, 'the cw web site') + self.assertEqual(entity.cwuri, 'http://www.cubicweb.org/') + self.assertEqual(entity.cw_source[0].name, 'myfeed') + self.assertEqual(entity.cw_metainformation(), + {'type': 'Card', + 'source': {'uri': 'system', 'type': 'native'}, + 'extid': 'http://www.cubicweb.org/'} + ) + # test repo cache keys + self.assertEqual(self.repo._type_source_cache[entity.eid], + ('Card', 'system', 'http://www.cubicweb.org/')) + self.assertEqual(self.repo._extid_cache[('http://www.cubicweb.org/', 'system')], + entity.eid) + # test repull + stats = dfsource.pull_data(self.session, force=True) + self.assertEqual(stats['created'], set()) + self.assertEqual(stats['updated'], set((entity.eid,))) + # test repull with caches reseted + self.repo._type_source_cache.clear() + self.repo._extid_cache.clear() + stats = dfsource.pull_data(self.session, force=True) + self.assertEqual(stats['created'], set()) + self.assertEqual(stats['updated'], set((entity.eid,))) + self.assertEqual(self.repo._type_source_cache[entity.eid], + ('Card', 'system', 'http://www.cubicweb.org/')) + self.assertEqual(self.repo._extid_cache[('http://www.cubicweb.org/', 'system')], + entity.eid) + + self.assertEqual(dfsource.source_cwuris(self.session), + {'http://www.cubicweb.org/': (entity.eid, 'Card')} + ) + self.assertTrue(dfsource.latest_retrieval) + self.assertTrue(dfsource.fresh()) + 
+if __name__ == '__main__': + from logilab.common.testlib import unittest_main + unittest_main() diff -r b172c383dbce -r ffda12be2e9f server/test/unittest_msplanner.py --- a/server/test/unittest_msplanner.py Wed Feb 09 18:06:13 2011 +0100 +++ b/server/test/unittest_msplanner.py Wed Feb 09 18:06:17 2011 +0100 @@ -58,6 +58,10 @@ def syntax_tree_search(self, *args, **kwargs): return [] + +class FakeDataFeedSource(FakeCardSource): + copy_based_source = True + X_ALL_SOLS = sorted([{'X': 'Affaire'}, {'X': 'BaseTransition'}, {'X': 'Basket'}, {'X': 'Bookmark'}, {'X': 'CWAttribute'}, {'X': 'CWCache'}, {'X': 'CWConstraint'}, {'X': 'CWConstraintType'}, {'X': 'CWEType'}, @@ -110,6 +114,7 @@ self.schema['CWUser'].set_action_permissions('read', userreadperms) self.add_source(FakeUserROSource, 'ldap') self.add_source(FakeCardSource, 'cards') + self.add_source(FakeDataFeedSource, 'datafeed') def tearDown(self): # restore hijacked security @@ -1955,6 +1960,22 @@ ]) def test_source_specified_1_2(self): + self._test('Card X WHERE X cw_source S, S name "datafeed"', + [('OneFetchStep', [('Any X WHERE X cw_source S, S name "datafeed", X is Card', + [{'X': 'Card', 'S': 'CWSource'}])], + None, None, + [self.system],{}, []) + ]) + + def test_source_specified_1_3(self): + self._test('Any X, SN WHERE X is Card, X cw_source S, S name "datafeed", S name SN', + [('OneFetchStep', [('Any X,SN WHERE X is Card, X cw_source S, S name "datafeed", ' + 'S name SN', + [{'S': 'CWSource', 'SN': 'String', 'X': 'Card'}])], + None, None, [self.system], {}, []) + ]) + + def test_source_specified_1_4(self): sols = [] for sol in X_ALL_SOLS: sol = sol.copy() @@ -2004,6 +2025,14 @@ ]) def test_source_specified_3_2(self): + self._test('Any X,XT WHERE X is Card, X title XT, X cw_source S, S name "datafeed"', + [('OneFetchStep', + [('Any X,XT WHERE X is Card, X title XT, X cw_source S, S name "datafeed"', + [{'X': 'Card', 'XT': 'String', 'S': 'CWSource'}])], + None, None, [self.system], {}, []) + ]) + + def 
test_source_specified_3_3(self): self.skipTest('oops') self._test('Any STN WHERE X is Note, X type XT, X in_state ST, ST name STN, X cw_source S, S name "cards"', [('OneFetchStep', diff -r b172c383dbce -r ffda12be2e9f server/test/unittest_ssplanner.py --- a/server/test/unittest_ssplanner.py Wed Feb 09 18:06:13 2011 +0100 +++ b/server/test/unittest_ssplanner.py Wed Feb 09 18:06:17 2011 +0100 @@ -1,4 +1,4 @@ -# copyright 2003-2010 LOGILAB S.A. (Paris, FRANCE), all rights reserved. +# copyright 2003-2011 LOGILAB S.A. (Paris, FRANCE), all rights reserved. # contact http://www.logilab.fr/ -- mailto:contact@logilab.fr # # This file is part of CubicWeb. diff -r b172c383dbce -r ffda12be2e9f test/unittest_schema.py --- a/test/unittest_schema.py Wed Feb 09 18:06:13 2011 +0100 +++ b/test/unittest_schema.py Wed Feb 09 18:06:17 2011 +0100 @@ -195,7 +195,7 @@ 'identity', 'in_group', 'in_state', 'indexed', 'initial_state', 'inlined', 'internationalizable', 'is', 'is_instance_of', - 'label', 'last_login_time', 'login', + 'label', 'last_login_time', 'latest_retrieval', 'login', 'mainvars', 'match_host', 'modification_date', @@ -203,7 +203,7 @@ 'options', 'ordernum', 'owned_by', - 'path', 'pkey', 'prefered_form', 'prenom', 'primary_email', + 'parser', 'path', 'pkey', 'prefered_form', 'prenom', 'primary_email', 'read_permission', 'relation_type', 'relations', 'require_group', @@ -211,7 +211,7 @@ 'tags', 'timestamp', 'title', 'to_entity', 'to_state', 'transition_of', 'travaille', 'type', - 'upassword', 'update_permission', 'uri', 'use_email', + 'upassword', 'update_permission', 'url', 'uri', 'use_email', 'value',