[repository] #1460066: backport datafeed cube as cubicweb source
* add some attributes to CWSource to handle this kind of source
(not natural to put everything in 'config' string). Adding a CWSource
subclass has been attempted then rolled back because it adds pain
to handling multi-source planning and it introduces an ambiguity on
a generic relation (cw_source), which may be a penalty in multiple
cases
* data feed sources are a new kind of source, namely 'copy based',
which have no effect on the query planner
* a data feed source is associated with a list of urls and a parser (appobjects
in the 'parsers' registry)
* entities imported by a data feed have cwuri set to their url on the distant
site, their cw_source relation points to the data feed source, though the
source stored in the entities table (returned by cw_metainformation) is their
physical source, hence 'system'
--- a/devtools/repotest.py Wed Feb 09 18:06:13 2011 +0100
+++ b/devtools/repotest.py Wed Feb 09 18:06:17 2011 +0100
@@ -1,4 +1,4 @@
-# copyright 2003-2010 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
+# copyright 2003-2011 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
#
# This file is part of CubicWeb.
@@ -277,7 +277,8 @@
class BasePlannerTC(BaseQuerierTC):
- newsources = 0
+ newsources = ()
+
def setup(self):
clear_cache(self.repo, 'rel_type_sources')
clear_cache(self.repo, 'rel_type_sources')
@@ -293,18 +294,21 @@
do_monkey_patch()
self._dumb_sessions = [] # by hi-jacked parent setup
self.repo.vreg.rqlhelper.backend = 'postgres' # so FTIRANK is considered
+ self.newsources = []
def add_source(self, sourcecls, uri):
- self.sources.append(sourcecls(self.repo, {'uri': uri}))
- self.repo.sources_by_uri[uri] = self.sources[-1]
- setattr(self, uri, self.sources[-1])
- self.newsources += 1
+ source = sourcecls(self.repo, {'uri': uri, 'type': 'whatever'})
+ if not source.copy_based_source:
+ self.sources.append(source)
+ self.newsources.append(source)
+ self.repo.sources_by_uri[uri] = source
+ setattr(self, uri, source)
def tearDown(self):
- while self.newsources:
- source = self.sources.pop(-1)
+ for source in self.newsources:
+ if not source.copy_based_source:
+ self.sources.remove(source)
del self.repo.sources_by_uri[source.uri]
- self.newsources -= 1
undo_monkey_patch()
for session in self._dumb_sessions:
session._threaddata.pool = None
--- a/entities/sources.py Wed Feb 09 18:06:13 2011 +0100
+++ b/entities/sources.py Wed Feb 09 18:06:17 2011 +0100
@@ -129,8 +129,5 @@
return self.cw_schema[0]
@property
- def source(self):
- """repository only property, not available from the web side (eg
- self._cw is expected to be a server session)
- """
- return self._cw.repo.sources_by_eid[self.cw_for_source[0].eid]
+ def cwsource(self):
+ return self.cw_for_source[0]
--- a/hooks/__init__.py Wed Feb 09 18:06:13 2011 +0100
+++ b/hooks/__init__.py Wed Feb 09 18:06:17 2011 +0100
@@ -1,4 +1,4 @@
-# copyright 2003-2010 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
+# copyright 2003-2011 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
#
# This file is part of CubicWeb.
@@ -15,17 +15,17 @@
#
# You should have received a copy of the GNU Lesser General Public License along
# with CubicWeb. If not, see <http://www.gnu.org/licenses/>.
-"""core hooks
+"""core hooks registering some maintainance tasks as server startup time"""
-"""
__docformat__ = "restructuredtext en"
from datetime import timedelta, datetime
+
from cubicweb.server import hook
class ServerStartupHook(hook.Hook):
"""task to cleanup expirated auth cookie entities"""
- __regid__ = 'cw_cleanup_transactions'
+ __regid__ = 'cw.start-looping-tasks'
events = ('server_startup',)
def __call__(self):
@@ -47,3 +47,27 @@
finally:
session.close()
self.repo.looping_task(60*60*24, cleanup_old_transactions, self.repo)
+ def update_feeds(repo):
+ session = repo.internal_session()
+ try:
+ # don't iter on repo.sources which doesn't include copy based
+ # sources (the one we're looking for)
+ for source in repo.sources_by_eid.itervalues():
+ if (not source.copy_based_source
+ or not repo.config.source_enabled(source)
+ or not source.config['synchronize']):
+ continue
+ try:
+ stats = source.pull_data(session)
+ if stats['created']:
+ source.info('added %s entities', len(stats['created']))
+ if stats['updated']:
+ source.info('updated %s entities', len(stats['updated']))
+ session.commit()
+ except Exception, exc:
+ session.exception('while trying to update feed %s', source)
+ session.rollback()
+ session.set_pool()
+ finally:
+ session.close()
+ self.repo.looping_task(60, update_feeds, self.repo)
--- a/hooks/integrity.py Wed Feb 09 18:06:13 2011 +0100
+++ b/hooks/integrity.py Wed Feb 09 18:06:17 2011 +0100
@@ -1,4 +1,4 @@
-# copyright 2003-2010 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
+# copyright 2003-2011 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
#
# This file is part of CubicWeb.
--- a/hooks/syncsources.py Wed Feb 09 18:06:13 2011 +0100
+++ b/hooks/syncsources.py Wed Feb 09 18:06:17 2011 +0100
@@ -1,4 +1,24 @@
+# copyright 2010-2011 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
+# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
+#
+# This file is part of CubicWeb.
+#
+# CubicWeb is free software: you can redistribute it and/or modify it under the
+# terms of the GNU Lesser General Public License as published by the Free
+# Software Foundation, either version 2.1 of the License, or (at your option)
+# any later version.
+#
+# CubicWeb is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+# details.
+#
+# You should have received a copy of the GNU Lesser General Public License along
+# with CubicWeb. If not, see <http://www.gnu.org/licenses/>.
+"""hooks for repository sources synchronization"""
+
from yams.schema import role_name
+
from cubicweb import ValidationError
from cubicweb.selectors import is_instance
from cubicweb.server import SOURCE_TYPES, hook
@@ -101,7 +121,7 @@
data = self.__data = self.get_data()
for schemacfg, source in data:
if source is None:
- source = schemacfg.source
+ source = schemacfg.cwsource.repo_source
if session.added_in_transaction(schemacfg.eid):
if not session.deleted_in_transaction(schemacfg.eid):
source.add_schema_config(schemacfg, checkonly=checkonly)
--- a/hooks/test/unittest_hooks.py Wed Feb 09 18:06:13 2011 +0100
+++ b/hooks/test/unittest_hooks.py Wed Feb 09 18:06:17 2011 +0100
@@ -1,5 +1,5 @@
# -*- coding: utf-8 -*-
-# copyright 2003-2010 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
+# copyright 2003-2011 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
#
# This file is part of CubicWeb.
@@ -18,12 +18,11 @@
# with CubicWeb. If not, see <http://www.gnu.org/licenses/>.
"""functional tests for core hooks
-note: most schemahooks.py hooks are actually tested in unittest_migrations.py
+Note:
+ syncschema.py hooks are mostly tested in server/test/unittest_migrations.py
"""
from __future__ import with_statement
-from logilab.common.testlib import TestCase, unittest_main
-
from datetime import datetime
from cubicweb import ValidationError, AuthenticationError, BadConnectionId
@@ -31,38 +30,6 @@
class CoreHooksTC(CubicWebTC):
- def test_delete_internal_entities(self):
- self.assertRaises(ValidationError, self.execute,
- 'DELETE CWEType X WHERE X name "CWEType"')
- self.assertRaises(ValidationError, self.execute,
- 'DELETE CWRType X WHERE X name "relation_type"')
- self.assertRaises(ValidationError, self.execute,
- 'DELETE CWGroup X WHERE X name "owners"')
-
- def test_delete_required_relations_subject(self):
- self.execute('INSERT CWUser X: X login "toto", X upassword "hop", X in_group Y '
- 'WHERE Y name "users"')
- self.commit()
- self.execute('DELETE X in_group Y WHERE X login "toto", Y name "users"')
- self.assertRaises(ValidationError, self.commit)
- self.execute('DELETE X in_group Y WHERE X login "toto"')
- self.execute('SET X in_group Y WHERE X login "toto", Y name "guests"')
- self.commit()
-
- def test_delete_required_relations_object(self):
- self.skipTest('no sample in the schema ! YAGNI ? Kermaat ?')
-
- def test_static_vocabulary_check(self):
- self.assertRaises(ValidationError,
- self.execute,
- 'SET X composite "whatever" WHERE X from_entity FE, FE name "CWUser", X relation_type RT, RT name "in_group"')
-
- def test_missing_required_relations_subject_inline(self):
- # missing in_group relation
- self.execute('INSERT CWUser X: X login "toto", X upassword "hop"')
- self.assertRaises(ValidationError,
- self.commit)
-
def test_inlined(self):
self.assertEqual(self.repo.schema['sender'].inlined, True)
self.execute('INSERT EmailAddress X: X address "toto@logilab.fr", X alias "hop"')
@@ -73,54 +40,6 @@
rset = self.execute('Any S WHERE X sender S, X eid %s' % eeid)
self.assertEqual(len(rset), 1)
- def test_composite_1(self):
- self.execute('INSERT EmailAddress X: X address "toto@logilab.fr", X alias "hop"')
- self.execute('INSERT EmailPart X: X content_format "text/plain", X ordernum 1, X content "this is a test"')
- self.execute('INSERT Email X: X messageid "<1234>", X subject "test", X sender Y, X recipients Y, X parts P '
- 'WHERE Y is EmailAddress, P is EmailPart')
- self.failUnless(self.execute('Email X WHERE X sender Y'))
- self.commit()
- self.execute('DELETE Email X')
- rset = self.execute('Any X WHERE X is EmailPart')
- self.assertEqual(len(rset), 1)
- self.commit()
- rset = self.execute('Any X WHERE X is EmailPart')
- self.assertEqual(len(rset), 0)
-
- def test_composite_2(self):
- self.execute('INSERT EmailAddress X: X address "toto@logilab.fr", X alias "hop"')
- self.execute('INSERT EmailPart X: X content_format "text/plain", X ordernum 1, X content "this is a test"')
- self.execute('INSERT Email X: X messageid "<1234>", X subject "test", X sender Y, X recipients Y, X parts P '
- 'WHERE Y is EmailAddress, P is EmailPart')
- self.commit()
- self.execute('DELETE Email X')
- self.execute('DELETE EmailPart X')
- self.commit()
- rset = self.execute('Any X WHERE X is EmailPart')
- self.assertEqual(len(rset), 0)
-
- def test_composite_redirection(self):
- self.execute('INSERT EmailAddress X: X address "toto@logilab.fr", X alias "hop"')
- self.execute('INSERT EmailPart X: X content_format "text/plain", X ordernum 1, X content "this is a test"')
- self.execute('INSERT Email X: X messageid "<1234>", X subject "test", X sender Y, X recipients Y, X parts P '
- 'WHERE Y is EmailAddress, P is EmailPart')
- self.execute('INSERT Email X: X messageid "<2345>", X subject "test2", X sender Y, X recipients Y '
- 'WHERE Y is EmailAddress')
- self.commit()
- self.execute('DELETE X parts Y WHERE X messageid "<1234>"')
- self.execute('SET X parts Y WHERE X messageid "<2345>"')
- self.commit()
- rset = self.execute('Any X WHERE X is EmailPart')
- self.assertEqual(len(rset), 1)
- self.assertEqual(rset.get_entity(0, 0).reverse_parts[0].messageid, '<2345>')
-
- def test_unsatisfied_constraints(self):
- releid = self.execute('SET U in_group G WHERE G name "owners", U login "admin"')[0][0]
- with self.assertRaises(ValidationError) as cm:
- self.commit()
- self.assertEqual(cm.exception.errors,
- {'in_group-object': u'RQLConstraint NOT O name "owners" failed'})
-
def test_html_tidy_hook(self):
req = self.request()
entity = req.create_entity('Workflow', name=u'wf1', description_format=u'text/html',
@@ -271,4 +190,5 @@
if __name__ == '__main__':
+ from logilab.common.testlib import unittest_main
unittest_main()
--- a/misc/migration/3.11.0_Any.py Wed Feb 09 18:06:13 2011 +0100
+++ b/misc/migration/3.11.0_Any.py Wed Feb 09 18:06:17 2011 +0100
@@ -1,7 +1,13 @@
for rtype in ('cw_support', 'cw_dont_cross', 'cw_may_cross'):
drop_relation_type(rtype)
+
add_entity_type('CWSourceSchemaConfig')
+if not 'url' in schema['CWSource'].subjrels:
+ add_attribute('CWSource', 'url')
+ add_attribute('CWSource', 'parser')
+ add_attribute('CWSource', 'latest_retrieval')
+
try:
from cubicweb.server.sources.pyrorql import PyroRQLSource
except ImportError:
--- a/schemas/base.py Wed Feb 09 18:06:13 2011 +0100
+++ b/schemas/base.py Wed Feb 09 18:06:17 2011 +0100
@@ -21,7 +21,7 @@
_ = unicode
from yams.buildobjs import (EntityType, RelationType, RelationDefinition,
- SubjectRelation, String, Datetime, Password)
+ SubjectRelation, String, Datetime, Password, Interval)
from cubicweb.schema import (
RQLConstraint, WorkflowableEntityType, ERQLExpression, RRQLExpression,
PUB_SYSTEM_ENTITY_PERMS, PUB_SYSTEM_REL_PERMS, PUB_SYSTEM_ATTR_PERMS)
@@ -258,6 +258,13 @@
'read': ('managers',),
'update': ('managers',),
})
+ # put this here and not in a subclass even if it's only for some sources
+ # since having subclasses on generic relation (cw_source) double the number
+ # of rdef in the schema, and make ms planning harder since queries solutions
+ # may changes when sources are specified
+ url = String(description=_('URLs from which content will be imported. You can put one url per line'))
+ parser = String(description=_('parser to use to extract entities from content retrieved at given URLs.'))
+ latest_retrieval = Datetime(description=_('latest synchronization time'))
ENTITY_MANAGERS_PERMISSIONS = {
@@ -272,6 +279,7 @@
'delete': ('managers',),
}
+
class CWSourceHostConfig(EntityType):
__permissions__ = ENTITY_MANAGERS_PERMISSIONS
__unique_together__ = [('match_host', 'cw_host_config_of')]
--- a/server/__init__.py Wed Feb 09 18:06:13 2011 +0100
+++ b/server/__init__.py Wed Feb 09 18:06:17 2011 +0100
@@ -254,7 +254,7 @@
# available sources registry
SOURCE_TYPES = {'native': LazyObject('cubicweb.server.sources.native', 'NativeSQLSource'),
- # XXX private sources installed by an external cube
'pyrorql': LazyObject('cubicweb.server.sources.pyrorql', 'PyroRQLSource'),
'ldapuser': LazyObject('cubicweb.server.sources.ldapuser', 'LDAPUserSource'),
+ 'datafeed': LazyObject('cubicweb.server.sources.datafeed', 'DataFeedSource'),
}
--- a/server/msplanner.py Wed Feb 09 18:06:13 2011 +0100
+++ b/server/msplanner.py Wed Feb 09 18:06:17 2011 +0100
@@ -519,6 +519,16 @@
invariant = getattr(lhs, '_q_invariant', False)
# XXX NOT NOT
neged = srel.neged(traverse_scope=True) or (rel and rel.neged(strict=True))
+ has_copy_based_source = False
+ sources_ = []
+ for source in sources:
+ if source.copy_based_source:
+ has_copy_based_source = True
+ if not self.system_source in sources_:
+ sources_.append(self.system_source)
+ else:
+ sources_.append(source)
+ sources = sources_
if neged:
for source in sources:
if invariant and source is self.system_source:
@@ -535,7 +545,8 @@
if rel is None or (len(var.stinfo['relations']) == 2 and
not var.stinfo['selected']):
self._remove_source_term(self.system_source, var)
- if not (len(sources) > 1 or usesys or invariant):
+ if not (has_copy_based_source or len(sources) > 1
+ or usesys or invariant):
if rel is None:
srel.parent.remove(srel)
else:
--- a/server/repository.py Wed Feb 09 18:06:13 2011 +0100
+++ b/server/repository.py Wed Feb 09 18:06:17 2011 +0100
@@ -1,4 +1,4 @@
-# copyright 2003-2010 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
+# copyright 2003-2011 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
#
# This file is part of CubicWeb.
@@ -207,12 +207,6 @@
self.init_sources_from_database()
if 'CWProperty' in self.schema:
self.vreg.init_properties(self.properties())
- # call source's init method to complete their initialisation if
- # needed (for instance looking for persistent configuration using an
- # internal session, which is not possible until pools have been
- # initialized)
- for source in self.sources_by_uri.itervalues():
- source.init(source in self.sources)
else:
# call init_creating so that for instance native source can
# configurate tsearch according to postgres version
@@ -241,11 +235,12 @@
try:
# FIXME: sources should be ordered (add_entity priority)
for sourceent in session.execute(
- 'Any S, SN, SA, SC WHERE S is CWSource, '
+ 'Any S, SN, SA, SC WHERE S is_instance_of CWSource, '
'S name SN, S type SA, S config SC').entities():
if sourceent.name == 'system':
self.system_source.eid = sourceent.eid
self.sources_by_eid[sourceent.eid] = self.system_source
+ self.system_source.init(True, sourceent)
continue
self.add_source(sourceent, add_to_pools=False)
finally:
@@ -262,20 +257,25 @@
self.sources_by_eid[sourceent.eid] = source
self.sources_by_uri[sourceent.name] = source
if self.config.source_enabled(source):
- source.init(True, session=sourceent._cw)
- self.sources.append(source)
- self.querier.set_planner()
- if add_to_pools:
- for pool in self.pools:
- pool.add_source(source)
+ # call source's init method to complete their initialisation if
+ # needed (for instance looking for persistent configuration using an
+ # internal session, which is not possible until pools have been
+ # initialized)
+ source.init(True, sourceent)
+ if not source.copy_based_source:
+ self.sources.append(source)
+ self.querier.set_planner()
+ if add_to_pools:
+ for pool in self.pools:
+ pool.add_source(source)
else:
- source.init(False, session=sourceent._cw)
+ source.init(False, sourceent)
self._clear_planning_caches()
def remove_source(self, uri):
source = self.sources_by_uri.pop(uri)
del self.sources_by_eid[source.eid]
- if self.config.source_enabled(source):
+ if self.config.source_enabled(source) and not source.copy_based_source:
self.sources.remove(source)
self.querier.set_planner()
for pool in self.pools:
@@ -1015,9 +1015,11 @@
raise UnknownEid(eid)
return extid
- def extid2eid(self, source, extid, etype, session=None, insert=True):
+ def extid2eid(self, source, extid, etype, session=None, insert=True,
+ sourceparams=None):
"""get eid from a local id. An eid is attributed if no record is found"""
- cachekey = (extid, source.uri)
+ uri = 'system' if source.copy_based_source else source.uri
+ cachekey = (extid, uri)
try:
return self._extid_cache[cachekey]
except KeyError:
@@ -1026,10 +1028,10 @@
if session is None:
session = self.internal_session()
reset_pool = True
- eid = self.system_source.extid2eid(session, source, extid)
+ eid = self.system_source.extid2eid(session, uri, extid)
if eid is not None:
self._extid_cache[cachekey] = eid
- self._type_source_cache[eid] = (etype, source.uri, extid)
+ self._type_source_cache[eid] = (etype, uri, extid)
if reset_pool:
session.reset_pool()
return eid
@@ -1047,13 +1049,14 @@
try:
eid = self.system_source.create_eid(session)
self._extid_cache[cachekey] = eid
- self._type_source_cache[eid] = (etype, source.uri, extid)
- entity = source.before_entity_insertion(session, extid, etype, eid)
+ self._type_source_cache[eid] = (etype, uri, extid)
+ entity = source.before_entity_insertion(
+ session, extid, etype, eid, sourceparams)
if source.should_call_hooks:
self.hm.call_hooks('before_add_entity', session, entity=entity)
# XXX call add_info with complete=False ?
self.add_info(session, entity, source, extid)
- source.after_entity_insertion(session, extid, entity)
+ source.after_entity_insertion(session, extid, entity, sourceparams)
if source.should_call_hooks:
self.hm.call_hooks('after_add_entity', session, entity=entity)
session.commit(reset_pool)
@@ -1190,6 +1193,8 @@
if suri == 'system':
extid = None
else:
+ if source.copy_based_source:
+ suri = 'system'
extid = source.get_extid(entity)
self._extid_cache[(str(extid), suri)] = entity.eid
self._type_source_cache[entity.eid] = (entity.__regid__, suri, extid)
--- a/server/sources/__init__.py Wed Feb 09 18:06:13 2011 +0100
+++ b/server/sources/__init__.py Wed Feb 09 18:06:17 2011 +0100
@@ -79,6 +79,9 @@
class AbstractSource(object):
"""an abstract class for sources"""
+ # does the source copy data into the system source, or is it a *true* source
+ # (i.e. entities are not stored physically here)
+ copy_based_source = False
# boolean telling if modification hooks should be called when something is
# modified in this source
@@ -204,7 +207,7 @@
"""method called by the repository once ready to create a new instance"""
pass
- def init(self, activated, session=None):
+ def init(self, activated, source_entity):
"""method called by the repository once ready to handle request.
`activated` is a boolean flag telling if the source is activated or not.
"""
@@ -321,7 +324,7 @@
return rtype in self.cross_relations
return rtype not in self.dont_cross_relations
- def before_entity_insertion(self, session, lid, etype, eid):
+ def before_entity_insertion(self, session, lid, etype, eid, sourceparams):
"""called by the repository when an eid has been attributed for an
entity stored here but the entity has not been inserted in the system
table yet.
@@ -334,12 +337,30 @@
entity.cw_edited = EditedEntity(entity)
return entity
- def after_entity_insertion(self, session, lid, entity):
+ def after_entity_insertion(self, session, lid, entity, sourceparams):
"""called by the repository after an entity stored here has been
inserted in the system table.
"""
pass
+ def _load_mapping(self, session=None, **kwargs):
+ if not 'CWSourceSchemaConfig' in self.schema:
+ self.warning('instance is not mapping ready')
+ return
+ if session is None:
+ _session = self.repo.internal_session()
+ else:
+ _session = session
+ try:
+ for schemacfg in _session.execute(
+ 'Any CFG,CFGO,S WHERE '
+ 'CFG options CFGO, CFG cw_schema S, '
+ 'CFG cw_for_source X, X eid %(x)s', {'x': self.eid}).entities():
+ self.add_schema_config(schemacfg, **kwargs)
+ finally:
+ if session is None:
+ _session.close()
+
def add_schema_config(self, schemacfg, checkonly=False):
"""added CWSourceSchemaConfig, modify mapping accordingly"""
msg = schemacfg._cw._("this source doesn't use a mapping")
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/server/sources/datafeed.py Wed Feb 09 18:06:17 2011 +0100
@@ -0,0 +1,236 @@
+# copyright 2010-2011 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
+# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
+#
+# This file is part of CubicWeb.
+#
+# CubicWeb is free software: you can redistribute it and/or modify it under the
+# terms of the GNU Lesser General Public License as published by the Free
+# Software Foundation, either version 2.1 of the License, or (at your option)
+# any later version.
+#
+# CubicWeb is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+# details.
+#
+# You should have received a copy of the GNU Lesser General Public License along
+# with CubicWeb. If not, see <http://www.gnu.org/licenses/>.
+"""datafeed sources: copy data from an external data stream into the system
+database
+"""
+from datetime import datetime, timedelta
+from base64 import b64decode
+
+from cubicweb import RegistryNotFound, ObjectNotFound, ValidationError
+from cubicweb.server.sources import AbstractSource
+from cubicweb.appobject import AppObject
+
+class DataFeedSource(AbstractSource):
+ copy_based_source = True
+
+ options = (
+ ('synchronize',
+ {'type' : 'yn',
+ 'default': True,
+ 'help': ('Is the repository responsible to automatically import '
+ 'content from this source? '
+ 'You should say yes unless you don\'t want this behaviour '
+ 'or if you use a multiple repositories setup, in which '
+ 'case you should say yes on one repository, no on others.'),
+ 'group': 'datafeed-source', 'level': 2,
+ }),
+ ('synchronization-interval',
+ {'type' : 'time',
+ 'default': '5min',
+ 'help': ('Interval in seconds between synchronization with the '
+ 'external source (default to 5 minutes, must be >= 1 min).'),
+ 'group': 'datafeed-source', 'level': 2,
+ }),
+ ('delete-entities',
+ {'type' : 'yn',
+ 'default': True,
+ 'help': ('Should already imported entities not found anymore on the '
+ 'external source be deleted?'),
+ 'group': 'datafeed-source', 'level': 2,
+ }),
+
+ )
+ def __init__(self, repo, source_config, eid=None):
+ AbstractSource.__init__(self, repo, source_config, eid)
+ self.update_config(None, self.check_conf_dict(eid, source_config))
+
+ def check_config(self, source_entity):
+ """check configuration of source entity"""
+ typedconfig = super(DataFeedSource, self).check_config(source_entity)
+ if typedconfig['synchronization-interval'] < 60:
+ _ = source_entity._cw._
+ msg = _('synchronization-interval must be greater than 1 minute')
+ raise ValidationError(source_entity.eid, {'config': msg})
+ return typedconfig
+
+ def _entity_update(self, source_entity):
+ source_entity.complete()
+ self.parser = source_entity.parser
+ self.latest_retrieval = source_entity.latest_retrieval
+ self.urls = [url.strip() for url in source_entity.url.splitlines()
+ if url.strip()]
+
+ def update_config(self, source_entity, typedconfig):
+ """update configuration from source entity. `typedconfig` is config
+ properly typed with defaults set
+ """
+ self.synchro_interval = timedelta(seconds=typedconfig['synchronization-interval'])
+ if source_entity is not None:
+ self._entity_update(source_entity)
+ self.config = typedconfig
+
+ def init(self, activated, source_entity):
+ if activated:
+ self._entity_update(source_entity)
+ self.parser = source_entity.parser
+ self.load_mapping(source_entity._cw)
+
+ def _get_parser(self, session, **kwargs):
+ return self.repo.vreg['parsers'].select(
+ self.parser, session, source=self, **kwargs)
+
+ def load_mapping(self, session):
+ self.mapping = {}
+ self.mapping_idx = {}
+ try:
+ parser = self._get_parser(session)
+ except (RegistryNotFound, ObjectNotFound):
+ return # no parser yet, don't go further
+ self._load_mapping(session, parser=parser)
+
+ def add_schema_config(self, schemacfg, checkonly=False, parser=None):
+ """added CWSourceSchemaConfig, modify mapping accordingly"""
+ if parser is None:
+ parser = self._get_parser(schemacfg._cw)
+ parser.add_schema_config(schemacfg, checkonly)
+
+ def del_schema_config(self, schemacfg, checkonly=False, parser=None):
+ """deleted CWSourceSchemaConfig, modify mapping accordingly"""
+ if parser is None:
+ parser = self._get_parser(schemacfg._cw)
+ parser.del_schema_config(schemacfg, checkonly)
+
+ def fresh(self):
+ if self.latest_retrieval is None:
+ return False
+ return datetime.now() < (self.latest_retrieval + self.synchro_interval)
+
+ def pull_data(self, session, force=False):
+ if not force and self.fresh():
+ return
+ if self.config['delete-entities']:
+ myuris = self.source_cwuris(session)
+ else:
+ myuris = None
+ parser = self._get_parser(session, sourceuris=myuris)
+ error = False
+ self.info('pulling data for source %s', self.uri)
+ for url in self.urls:
+ try:
+ parser.process(url)
+ except IOError, exc:
+ self.error('could not pull data while processing %s: %s',
+ url, exc)
+ error = True
+ if error:
+ self.warning("some error occured, don't attempt to delete entities")
+ elif self.config['delete-entities'] and myuris:
+ byetype = {}
+ for eid, etype in myuris.values():
+ byetype.setdefault(etype, []).append(str(eid))
+ self.error('delete %s entities %s', self.uri, byetype)
+ for etype, eids in byetype.iteritems():
+ session.execute('DELETE %s X WHERE X eid IN (%s)'
+ % (etype, ','.join(eids)))
+ self.latest_retrieval = datetime.now()
+ session.execute('SET X latest_retrieval %(date)s WHERE X eid %(x)s',
+ {'x': self.eid, 'date': self.latest_retrieval})
+ return parser.stats
+
+ def before_entity_insertion(self, session, lid, etype, eid, sourceparams):
+ """called by the repository when an eid has been attributed for an
+ entity stored here but the entity has not been inserted in the system
+ table yet.
+
+ This method must return the an Entity instance representation of this
+ entity.
+ """
+ entity = super(DataFeedSource, self).before_entity_insertion(
+ session, lid, etype, eid, sourceparams)
+ entity.cw_edited['cwuri'] = unicode(lid)
+ entity.cw_edited.set_defaults()
+ sourceparams['parser'].before_entity_copy(entity, sourceparams)
+ # avoid query to search full-text indexed attributes
+ for attr in entity.e_schema.indexable_attributes():
+ entity.cw_edited.setdefault(attr, u'')
+ return entity
+
+ def after_entity_insertion(self, session, lid, entity, sourceparams):
+ """called by the repository after an entity stored here has been
+ inserted in the system table.
+ """
+ if session.is_hook_category_activated('integrity'):
+ entity.cw_edited.check(creation=True)
+ self.repo.system_source.add_entity(session, entity)
+ entity.cw_edited.saved = entity._cw_is_saved = True
+ sourceparams['parser'].after_entity_copy(entity, sourceparams)
+
+ def source_cwuris(self, session):
+ sql = ('SELECT extid, eid, type FROM entities, cw_source_relation '
+ 'WHERE entities.eid=cw_source_relation.eid_from '
+ 'AND cw_source_relation.eid_to=%s' % self.eid)
+ return dict((b64decode(uri), (eid, type))
+ for uri, eid, type in session.system_sql(sql))
+
+
+class DataFeedParser(AppObject):
+ __registry__ = 'parsers'
+
+ def __init__(self, session, source, sourceuris=None):
+ self._cw = session
+ self.source = source
+ self.sourceuris = sourceuris
+ self.stats = {'created': set(),
+ 'updated': set()}
+
+ def add_schema_config(self, schemacfg, checkonly=False):
+ """added CWSourceSchemaConfig, modify mapping accordingly"""
+ msg = schemacfg._cw._("this parser doesn't use a mapping")
+ raise ValidationError(schemacfg.eid, {None: msg})
+
+ def del_schema_config(self, schemacfg, checkonly=False):
+ """deleted CWSourceSchemaConfig, modify mapping accordingly"""
+ msg = schemacfg._cw._("this parser doesn't use a mapping")
+ raise ValidationError(schemacfg.eid, {None: msg})
+
+ def extid2entity(self, uri, etype, **sourceparams):
+ sourceparams['parser'] = self
+ eid = self.source.extid2eid(str(uri), etype, self._cw,
+ sourceparams=sourceparams)
+ if self.sourceuris is not None:
+ self.sourceuris.pop(str(uri), None)
+ return self._cw.entity_from_eid(eid, etype)
+
+ def process(self, url):
+ """main callback: process the url"""
+ raise NotImplementedError
+
+ def before_entity_copy(self, entity, sourceparams):
+ raise NotImplementedError
+
+ def after_entity_copy(self, entity, sourceparams):
+ self.stats['created'].add(entity.eid)
+
+ def created_during_pull(self, entity):
+ return entity.eid in self.stats['created']
+
+ def updated_during_pull(self, entity):
+ return entity.eid in self.stats['updated']
+
+ def notify_updated(self, entity):
+ return self.stats['updated'].add(entity.eid)
--- a/server/sources/ldapuser.py Wed Feb 09 18:06:13 2011 +0100
+++ b/server/sources/ldapuser.py Wed Feb 09 18:06:17 2011 +0100
@@ -202,7 +202,7 @@
self._cache = {}
self._query_cache = TimedCache(self._cache_ttl)
- def init(self, activated, session=None):
+ def init(self, activated, source_entity):
"""method called by the repository once ready to handle request"""
if activated:
self.info('ldap init')
@@ -575,7 +575,7 @@
self.debug('ldap built results %s', len(result))
return result
- def before_entity_insertion(self, session, lid, etype, eid):
+ def before_entity_insertion(self, session, lid, etype, eid, sourceparams):
"""called by the repository when an eid has been attributed for an
entity stored here but the entity has not been inserted in the system
table yet.
@@ -584,18 +584,20 @@
entity.
"""
self.debug('ldap before entity insertion')
- entity = super(LDAPUserSource, self).before_entity_insertion(session, lid, etype, eid)
+ entity = super(LDAPUserSource, self).before_entity_insertion(
+ session, lid, etype, eid, sourceparams)
res = self._search(session, lid, BASE)[0]
for attr in entity.e_schema.indexable_attributes():
entity.cw_edited[attr] = res[self.user_rev_attrs[attr]]
return entity
- def after_entity_insertion(self, session, lid, entity):
+ def after_entity_insertion(self, session, lid, entity, sourceparams):
"""called by the repository after an entity stored here has been
inserted in the system table.
"""
self.debug('ldap after entity insertion')
- super(LDAPUserSource, self).after_entity_insertion(session, lid, entity)
+ super(LDAPUserSource, self).after_entity_insertion(
+ session, lid, entity, sourceparams)
dn = lid
for group in self.user_default_groups:
session.execute('SET X in_group G WHERE X eid %(x)s, G name %(group)s',
--- a/server/sources/native.py Wed Feb 09 18:06:13 2011 +0100
+++ b/server/sources/native.py Wed Feb 09 18:06:17 2011 +0100
@@ -1,4 +1,4 @@
-# copyright 2003-2010 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
+# copyright 2003-2011 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
#
# This file is part of CubicWeb.
@@ -372,8 +372,8 @@
if self.repo.config.open_connections_pools:
self.open_pool_connections()
- def init(self, activated, session=None):
- self.init_creating(session and session.pool)
+ def init(self, activated, source_entity):
+ self.init_creating(source_entity._cw.pool)
def shutdown(self):
if self._eid_creation_cnx:
@@ -803,13 +803,13 @@
res[-1] = b64decode(res[-1])
return res
- def extid2eid(self, session, source, extid):
+ def extid2eid(self, session, source_uri, extid):
"""get eid from an external id. Return None if no record found."""
assert isinstance(extid, str)
cursor = self.doexec(session,
'SELECT eid FROM entities '
'WHERE extid=%(x)s AND source=%(s)s',
- {'x': b64encode(extid), 's': source.uri})
+ {'x': b64encode(extid), 's': source_uri})
# XXX testing rowcount cause strange bug with sqlite, results are there
# but rowcount is 0
#if cursor.rowcount > 0:
@@ -898,8 +898,9 @@
if extid is not None:
assert isinstance(extid, str)
extid = b64encode(extid)
+ uri = 'system' if source.copy_based_source else source.uri
attrs = {'type': entity.__regid__, 'eid': entity.eid, 'extid': extid,
- 'source': source.uri, 'mtime': datetime.now()}
+ 'source': uri, 'mtime': datetime.now()}
self.doexec(session, self.sqlgen.insert('entities', attrs), attrs)
# insert core relations: is, is_instance_of and cw_source
try:
--- a/server/sources/pyrorql.py Wed Feb 09 18:06:13 2011 +0100
+++ b/server/sources/pyrorql.py Wed Feb 09 18:06:17 2011 +0100
@@ -168,9 +168,9 @@
finally:
session.close()
- def init(self, activated, session=None):
+ def init(self, activated, source_entity):
"""method called by the repository once ready to handle request"""
- self.load_mapping(session)
+ self.load_mapping(source_entity._cw)
if activated:
interval = self.config['synchronization-interval']
self.repo.looping_task(interval, self.synchronize)
@@ -184,19 +184,7 @@
self.cross_relations = set()
assert self.eid is not None
self._schemacfg_idx = {}
- if session is None:
- _session = self.repo.internal_session()
- else:
- _session = session
- try:
- for schemacfg in _session.execute(
- 'Any CFG,CFGO,SN,S WHERE '
- 'CFG options CFGO, CFG cw_schema S, S name SN, '
- 'CFG cw_for_source X, X eid %(x)s', {'x': self.eid}).entities():
- self.add_schema_config(schemacfg)
- finally:
- if session is None:
- _session.close()
+ self._load_mapping(session)
etype_options = set(('write',))
rtype_options = set(('maycross', 'dontcross', 'write',))
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/server/test/unittest_datafeed.py Wed Feb 09 18:06:17 2011 +0100
@@ -0,0 +1,98 @@
+# copyright 2011 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
+# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
+#
+# This file is part of CubicWeb.
+#
+# CubicWeb is free software: you can redistribute it and/or modify it under the
+# terms of the GNU Lesser General Public License as published by the Free
+# Software Foundation, either version 2.1 of the License, or (at your option)
+# any later version.
+#
+# CubicWeb is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+# details.
+#
+# You should have received a copy of the GNU Lesser General Public License along
+# with CubicWeb. If not, see <http://www.gnu.org/licenses/>.
+from __future__ import with_statement
+
+from datetime import timedelta
+
+from cubicweb.devtools.testlib import CubicWebTC
+from cubicweb.server.sources import datafeed
+
+
+class DataFeedTC(CubicWebTC):
+ def setup_database(self):
+ self.request().create_entity('CWSource', name=u'myfeed', type=u'datafeed',
+ parser=u'testparser', url=u'ignored',
+ config=u'synchronization-interval=1min')
+
+ def test(self):
+ self.assertIn('myfeed', self.repo.sources_by_uri)
+ dfsource = self.repo.sources_by_uri['myfeed']
+ self.assertNotIn(dfsource, self.repo.sources)
+ self.assertEqual(dfsource.latest_retrieval, None)
+ self.assertEqual(dfsource.synchro_interval, timedelta(seconds=60))
+ self.assertFalse(dfsource.fresh())
+
+ class AParser(datafeed.DataFeedParser):
+ __regid__ = 'testparser'
+ def process(self, url):
+ entity = self.extid2entity('http://www.cubicweb.org/', 'Card',
+ item={'title': u'cubicweb.org',
+ 'content': u'the cw web site'})
+ if not self.created_during_pull(entity):
+ self.notify_updated(entity)
+ def before_entity_copy(self, entity, sourceparams):
+ entity.cw_edited.update(sourceparams['item'])
+
+ with self.temporary_appobjects(AParser):
+ stats = dfsource.pull_data(self.session, force=True)
+ self.commit()
+ # test import stats
+ self.assertEqual(sorted(stats.keys()), ['created', 'updated'])
+ self.assertEqual(len(stats['created']), 1)
+ entity = self.execute('Card X').get_entity(0, 0)
+ self.assertIn(entity.eid, stats['created'])
+ self.assertEqual(stats['updated'], set())
+ # test imported entities
+ self.assertEqual(entity.title, 'cubicweb.org')
+ self.assertEqual(entity.content, 'the cw web site')
+ self.assertEqual(entity.cwuri, 'http://www.cubicweb.org/')
+ self.assertEqual(entity.cw_source[0].name, 'myfeed')
+ self.assertEqual(entity.cw_metainformation(),
+ {'type': 'Card',
+ 'source': {'uri': 'system', 'type': 'native'},
+ 'extid': 'http://www.cubicweb.org/'}
+ )
+ # test repo cache keys
+ self.assertEqual(self.repo._type_source_cache[entity.eid],
+ ('Card', 'system', 'http://www.cubicweb.org/'))
+ self.assertEqual(self.repo._extid_cache[('http://www.cubicweb.org/', 'system')],
+ entity.eid)
+ # test repull
+ stats = dfsource.pull_data(self.session, force=True)
+ self.assertEqual(stats['created'], set())
+ self.assertEqual(stats['updated'], set((entity.eid,)))
+        # test repull with caches reset
+ self.repo._type_source_cache.clear()
+ self.repo._extid_cache.clear()
+ stats = dfsource.pull_data(self.session, force=True)
+ self.assertEqual(stats['created'], set())
+ self.assertEqual(stats['updated'], set((entity.eid,)))
+ self.assertEqual(self.repo._type_source_cache[entity.eid],
+ ('Card', 'system', 'http://www.cubicweb.org/'))
+ self.assertEqual(self.repo._extid_cache[('http://www.cubicweb.org/', 'system')],
+ entity.eid)
+
+ self.assertEqual(dfsource.source_cwuris(self.session),
+ {'http://www.cubicweb.org/': (entity.eid, 'Card')}
+ )
+ self.assertTrue(dfsource.latest_retrieval)
+ self.assertTrue(dfsource.fresh())
+
+if __name__ == '__main__':
+ from logilab.common.testlib import unittest_main
+ unittest_main()
--- a/server/test/unittest_msplanner.py Wed Feb 09 18:06:13 2011 +0100
+++ b/server/test/unittest_msplanner.py Wed Feb 09 18:06:17 2011 +0100
@@ -58,6 +58,10 @@
def syntax_tree_search(self, *args, **kwargs):
return []
+
+class FakeDataFeedSource(FakeCardSource):
+ copy_based_source = True
+
X_ALL_SOLS = sorted([{'X': 'Affaire'}, {'X': 'BaseTransition'}, {'X': 'Basket'},
{'X': 'Bookmark'}, {'X': 'CWAttribute'}, {'X': 'CWCache'},
{'X': 'CWConstraint'}, {'X': 'CWConstraintType'}, {'X': 'CWEType'},
@@ -110,6 +114,7 @@
self.schema['CWUser'].set_action_permissions('read', userreadperms)
self.add_source(FakeUserROSource, 'ldap')
self.add_source(FakeCardSource, 'cards')
+ self.add_source(FakeDataFeedSource, 'datafeed')
def tearDown(self):
# restore hijacked security
@@ -1955,6 +1960,22 @@
])
def test_source_specified_1_2(self):
+ self._test('Card X WHERE X cw_source S, S name "datafeed"',
+ [('OneFetchStep', [('Any X WHERE X cw_source S, S name "datafeed", X is Card',
+ [{'X': 'Card', 'S': 'CWSource'}])],
+ None, None,
+ [self.system],{}, [])
+ ])
+
+ def test_source_specified_1_3(self):
+ self._test('Any X, SN WHERE X is Card, X cw_source S, S name "datafeed", S name SN',
+ [('OneFetchStep', [('Any X,SN WHERE X is Card, X cw_source S, S name "datafeed", '
+ 'S name SN',
+ [{'S': 'CWSource', 'SN': 'String', 'X': 'Card'}])],
+ None, None, [self.system], {}, [])
+ ])
+
+ def test_source_specified_1_4(self):
sols = []
for sol in X_ALL_SOLS:
sol = sol.copy()
@@ -2004,6 +2025,14 @@
])
def test_source_specified_3_2(self):
+ self._test('Any X,XT WHERE X is Card, X title XT, X cw_source S, S name "datafeed"',
+ [('OneFetchStep',
+ [('Any X,XT WHERE X is Card, X title XT, X cw_source S, S name "datafeed"',
+ [{'X': 'Card', 'XT': 'String', 'S': 'CWSource'}])],
+ None, None, [self.system], {}, [])
+ ])
+
+ def test_source_specified_3_3(self):
self.skipTest('oops')
self._test('Any STN WHERE X is Note, X type XT, X in_state ST, ST name STN, X cw_source S, S name "cards"',
[('OneFetchStep',
--- a/server/test/unittest_ssplanner.py Wed Feb 09 18:06:13 2011 +0100
+++ b/server/test/unittest_ssplanner.py Wed Feb 09 18:06:17 2011 +0100
@@ -1,4 +1,4 @@
-# copyright 2003-2010 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
+# copyright 2003-2011 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
#
# This file is part of CubicWeb.
--- a/test/unittest_schema.py Wed Feb 09 18:06:13 2011 +0100
+++ b/test/unittest_schema.py Wed Feb 09 18:06:17 2011 +0100
@@ -195,7 +195,7 @@
'identity', 'in_group', 'in_state', 'indexed',
'initial_state', 'inlined', 'internationalizable', 'is', 'is_instance_of',
- 'label', 'last_login_time', 'login',
+ 'label', 'last_login_time', 'latest_retrieval', 'login',
'mainvars', 'match_host', 'modification_date',
@@ -203,7 +203,7 @@
'options', 'ordernum', 'owned_by',
- 'path', 'pkey', 'prefered_form', 'prenom', 'primary_email',
+ 'parser', 'path', 'pkey', 'prefered_form', 'prenom', 'primary_email',
'read_permission', 'relation_type', 'relations', 'require_group',
@@ -211,7 +211,7 @@
'tags', 'timestamp', 'title', 'to_entity', 'to_state', 'transition_of', 'travaille', 'type',
- 'upassword', 'update_permission', 'uri', 'use_email',
+ 'upassword', 'update_permission', 'uri', 'url', 'use_email',
'value',