[datafeed] add max-lifetime for concurrent synchronization lock (closes #1908676) stable
authorSylvain Thénault <sylvain.thenault@logilab.fr>
Fri, 07 Oct 2011 15:55:14 +0200
branchstable
changeset 7921 a93e2ed5877a
parent 7917 436400e7f807
child 7922 d307c3817782
[datafeed] add max-lifetime for concurrent synchronization lock (closes #1908676)
__pkginfo__.py
i18n/de.po
i18n/en.po
i18n/es.po
i18n/fr.po
misc/migration/3.13.0_Any.py
misc/migration/3.13.8_Any.py
schemas/base.py
server/sources/datafeed.py
--- a/__pkginfo__.py	Fri Oct 07 11:47:42 2011 +0200
+++ b/__pkginfo__.py	Fri Oct 07 15:55:14 2011 +0200
@@ -22,7 +22,7 @@
 
 modname = distname = "cubicweb"
 
-numversion = (3, 13, 7)
+numversion = (3, 13, 8)
 version = '.'.join(str(num) for num in numversion)
 
 description = "a repository of entities / relations for knowledge management"
--- a/i18n/de.po	Fri Oct 07 11:47:42 2011 +0200
+++ b/i18n/de.po	Fri Oct 07 15:55:14 2011 +0200
@@ -1867,9 +1867,6 @@
 msgid "ctxtoolbar"
 msgstr "Werkzeugleiste"
 
-msgid "currently in synchronization"
-msgstr ""
-
 msgid "custom_workflow"
 msgstr "angepasster Workflow"
 
@@ -2753,6 +2750,13 @@
 msgid "in_state_object"
 msgstr "Zustand von"
 
+msgid "in_synchronization"
+msgstr ""
+
+msgctxt "CWSource"
+msgid "in_synchronization"
+msgstr ""
+
 msgid "incontext"
 msgstr "im Kontext"
 
@@ -3800,6 +3804,11 @@
 msgid "specifying %s is mandatory"
 msgstr ""
 
+msgid ""
+"start timestamp of the currently in synchronization, or NULL when no "
+"synchronization in progress."
+msgstr ""
+
 msgid "startup views"
 msgstr "Start-Ansichten"
 
@@ -3935,13 +3944,6 @@
 msgid "synchronization-interval must be greater than 1 minute"
 msgstr ""
 
-msgid "synchronizing"
-msgstr ""
-
-msgctxt "CWSource"
-msgid "synchronizing"
-msgstr ""
-
 msgid "table"
 msgstr "Tabelle"
 
--- a/i18n/en.po	Fri Oct 07 11:47:42 2011 +0200
+++ b/i18n/en.po	Fri Oct 07 15:55:14 2011 +0200
@@ -1822,9 +1822,6 @@
 msgid "ctxtoolbar"
 msgstr "toolbar"
 
-msgid "currently in synchronization"
-msgstr ""
-
 msgid "custom_workflow"
 msgstr "custom workflow"
 
@@ -2680,6 +2677,13 @@
 msgid "in_state_object"
 msgstr "state of"
 
+msgid "in_synchronization"
+msgstr "in synchronization"
+
+msgctxt "CWSource"
+msgid "in_synchronization"
+msgstr "in synchronization"
+
 msgid "incontext"
 msgstr "in-context"
 
@@ -3700,6 +3704,11 @@
 msgid "specifying %s is mandatory"
 msgstr ""
 
+msgid ""
+"start timestamp of the currently in synchronization, or NULL when no "
+"synchronization in progress."
+msgstr ""
+
 msgid "startup views"
 msgstr ""
 
@@ -3831,13 +3840,6 @@
 msgid "synchronization-interval must be greater than 1 minute"
 msgstr ""
 
-msgid "synchronizing"
-msgstr ""
-
-msgctxt "CWSource"
-msgid "synchronizing"
-msgstr ""
-
 msgid "table"
 msgstr ""
 
--- a/i18n/es.po	Fri Oct 07 11:47:42 2011 +0200
+++ b/i18n/es.po	Fri Oct 07 15:55:14 2011 +0200
@@ -1896,9 +1896,6 @@
 msgid "ctxtoolbar"
 msgstr "Barra de herramientas"
 
-msgid "currently in synchronization"
-msgstr ""
-
 msgid "custom_workflow"
 msgstr "Workflow específico"
 
@@ -2796,6 +2793,13 @@
 msgid "in_state_object"
 msgstr "Estado de"
 
+msgid "in_synchronization"
+msgstr ""
+
+msgctxt "CWSource"
+msgid "in_synchronization"
+msgstr ""
+
 msgid "incontext"
 msgstr "En el contexto"
 
@@ -3850,6 +3854,11 @@
 msgid "specifying %s is mandatory"
 msgstr "especificar %s es obligatorio"
 
+msgid ""
+"start timestamp of the currently in synchronization, or NULL when no "
+"synchronization in progress."
+msgstr ""
+
 msgid "startup views"
 msgstr "Vistas de inicio"
 
@@ -3985,13 +3994,6 @@
 msgid "synchronization-interval must be greater than 1 minute"
 msgstr "synchronization-interval debe ser mayor a 1 minuto"
 
-msgid "synchronizing"
-msgstr ""
-
-msgctxt "CWSource"
-msgid "synchronizing"
-msgstr ""
-
 msgid "table"
 msgstr "Tabla"
 
--- a/i18n/fr.po	Fri Oct 07 11:47:42 2011 +0200
+++ b/i18n/fr.po	Fri Oct 07 15:55:14 2011 +0200
@@ -1902,9 +1902,6 @@
 msgid "ctxtoolbar"
 msgstr "barre d'outils"
 
-msgid "currently in synchronization"
-msgstr "en cours de synchronisation"
-
 msgid "custom_workflow"
 msgstr "workflow spécifique"
 
@@ -2797,6 +2794,13 @@
 msgid "in_state_object"
 msgstr "état de"
 
+msgid "in_synchronization"
+msgstr "en cours de synchronisation"
+
+msgctxt "CWSource"
+msgid "in_synchronization"
+msgstr "en cours de synchronisation"
+
 msgid "incontext"
 msgstr "dans le contexte"
 
@@ -3855,6 +3859,12 @@
 msgid "specifying %s is mandatory"
 msgstr "spécifier %s est obligatoire"
 
+msgid ""
+"start timestamp of the currently in synchronization, or NULL when no "
+"synchronization in progress."
+msgstr ""
+"horodate de départ de la synchronisation en cours, ou NULL s'il n'y en a pas."
+
 msgid "startup views"
 msgstr "vues de départ"
 
@@ -3990,13 +4000,6 @@
 msgid "synchronization-interval must be greater than 1 minute"
 msgstr "synchronization-interval doit être supérieur à 1 minute"
 
-msgid "synchronizing"
-msgstr "synchronisation"
-
-msgctxt "CWSource"
-msgid "synchronizing"
-msgstr "synchronisation"
-
 msgid "table"
 msgstr "table"
 
--- a/misc/migration/3.13.0_Any.py	Fri Oct 07 11:47:42 2011 +0200
+++ b/misc/migration/3.13.0_Any.py	Fri Oct 07 15:55:14 2011 +0200
@@ -1,4 +1,3 @@
 sync_schema_props_perms('cw_source', syncprops=False)
-add_attribute('CWSource', 'synchronizing')
 if schema['BigInt'].eid is None:
     add_entity_type('BigInt')
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/misc/migration/3.13.8_Any.py	Fri Oct 07 15:55:14 2011 +0200
@@ -0,0 +1,2 @@
+drop_attribute('CWSource', 'synchronizing')
+add_attribute('CWSource', 'in_synchronization')
--- a/schemas/base.py	Fri Oct 07 11:47:42 2011 +0200
+++ b/schemas/base.py	Fri Oct 07 15:55:14 2011 +0200
@@ -278,8 +278,9 @@
     url = String(description=_('URLs from which content will be imported. You can put one url per line'))
     parser = String(description=_('parser to use to extract entities from content retrieved at given URLs.'))
     latest_retrieval = Datetime(description=_('latest synchronization time'))
-    synchronizing = Boolean(description=_('currently in synchronization'),
-                            default=False)
+    in_synchronization = TZDatetime(description=_('start timestamp of the currently in synchronization, or NULL when no synchronization in progress.'),
+                                    default=False)
+
 
 ENTITY_MANAGERS_PERMISSIONS = {
     'read':   ('managers',),
--- a/server/sources/datafeed.py	Fri Oct 07 11:47:42 2011 +0200
+++ b/server/sources/datafeed.py	Fri Oct 07 15:55:14 2011 +0200
@@ -55,6 +55,15 @@
                    'external source (default to 5 minutes, must be >= 1 min).'),
           'group': 'datafeed-source', 'level': 2,
           }),
+        ('max-lock-lifetime',
+         {'type' : 'time',
+          'default': '1h',
+          'help': ('Maximum time allowed for a synchronization to be run. '
+                   'Exceeded that time, the synchronization will be considered '
+                   'as having failed and not properly released the lock, hence '
+                   'it won\'t be considered'),
+          'group': 'datafeed-source', 'level': 2,
+          }),
         ('delete-entities',
          {'type' : 'yn',
           'default': True,
@@ -90,6 +99,7 @@
         properly typed with defaults set
         """
         self.synchro_interval = timedelta(seconds=typedconfig['synchronization-interval'])
+        self.max_lock_lifetime = timedelta(seconds=typedconfig['max-lock-lifetime'])
         if source_entity is not None:
             self._entity_update(source_entity)
         self.config = typedconfig
@@ -138,8 +148,11 @@
     def acquire_synchronization_lock(self, session):
         # XXX race condition until WHERE of SET queries is executed using
         # 'SELECT FOR UPDATE'
-        if not session.execute('SET X synchronizing TRUE WHERE X eid %(x)s, X synchronizing FALSE',
-                               {'x': self.eid}):
+        now = datetime.utcnow()
+        if not session.execute('SET X in_synchronizaton %(now)s WHERE X eid %(x)s, X synchronizing NULL OR X synchronizing < %(maxdt)s',
+                               {'x': self.eid,
+                                'now': now,
+                                'maxdt': now - self.max_lock_lifetime}):
             self.error('concurrent synchronization detected, skip pull')
             session.commit(free_cnxset=False)
             return False
@@ -148,7 +161,7 @@
 
     def release_synchronization_lock(self, session):
         session.set_cnxset()
-        session.execute('SET X synchronizing FALSE WHERE X eid %(x)s',
+        session.execute('SET X synchronizing None WHERE X eid %(x)s',
                         {'x': self.eid})
         session.commit()