server/sources/datafeed.py
changeset 7931 60068dc83457
parent 7910 e5d5609e3bf1
parent 7921 a93e2ed5877a
child 7934 2250a60a7653
equal deleted inserted replaced
7929:900f1627b171 7931:60068dc83457
    53           'default': '5min',
    53           'default': '5min',
    54           'help': ('Interval in seconds between synchronization with the '
    54           'help': ('Interval in seconds between synchronization with the '
    55                    'external source (default to 5 minutes, must be >= 1 min).'),
    55                    'external source (default to 5 minutes, must be >= 1 min).'),
    56           'group': 'datafeed-source', 'level': 2,
    56           'group': 'datafeed-source', 'level': 2,
    57           }),
    57           }),
       
    58         ('max-lock-lifetime',
       
    59          {'type' : 'time',
       
    60           'default': '1h',
       
    61           'help': ('Maximum time allowed for a synchronization to be run. '
       
    62                    'Once that time is exceeded, the synchronization will be considered '
       
    63                    'as having failed and not properly released the lock, hence '
       
    64                    'that stale lock will be ignored.'),
       
    65           'group': 'datafeed-source', 'level': 2,
       
    66           }),
    58         ('delete-entities',
    67         ('delete-entities',
    59          {'type' : 'yn',
    68          {'type' : 'yn',
    60           'default': True,
    69           'default': True,
    61           'help': ('Should already imported entities not found anymore on the '
    70           'help': ('Should already imported entities not found anymore on the '
    62                    'external source be deleted?'),
    71                    'external source be deleted?'),
    90     def update_config(self, source_entity, typedconfig):
    99     def update_config(self, source_entity, typedconfig):
    91         """update configuration from source entity. `typedconfig` is config
   100         """update configuration from source entity. `typedconfig` is config
    92         properly typed with defaults set
   101         properly typed with defaults set
    93         """
   102         """
    94         self.synchro_interval = timedelta(seconds=typedconfig['synchronization-interval'])
   103         self.synchro_interval = timedelta(seconds=typedconfig['synchronization-interval'])
       
   104         self.max_lock_lifetime = timedelta(seconds=typedconfig['max-lock-lifetime'])
    95         if source_entity is not None:
   105         if source_entity is not None:
    96             self._entity_update(source_entity)
   106             self._entity_update(source_entity)
    97         self.config = typedconfig
   107         self.config = typedconfig
    98 
   108 
    99     def init(self, activated, source_entity):
   109     def init(self, activated, source_entity):
   138                         {'x': self.eid, 'date': self.latest_retrieval})
   148                         {'x': self.eid, 'date': self.latest_retrieval})
   139 
   149 
   140     def acquire_synchronization_lock(self, session):
   150     def acquire_synchronization_lock(self, session):
   141         # XXX race condition until WHERE of SET queries is executed using
   151         # XXX race condition until WHERE of SET queries is executed using
   142         # 'SELECT FOR UPDATE'
   152         # 'SELECT FOR UPDATE'
   143         if not session.execute('SET X synchronizing TRUE WHERE X eid %(x)s, X synchronizing FALSE',
   153         now = datetime.utcnow()
   144                                {'x': self.eid}):
   154         if not session.execute('SET X in_synchronization %(now)s WHERE X eid %(x)s, X in_synchronization NULL OR X in_synchronization < %(maxdt)s',
       
   155                                {'x': self.eid,
       
   156                                 'now': now,
       
   157                                 'maxdt': now - self.max_lock_lifetime}):
   145             self.error('concurrent synchronization detected, skip pull')
   158             self.error('concurrent synchronization detected, skip pull')
   146             session.commit(free_cnxset=False)
   159             session.commit(free_cnxset=False)
   147             return False
   160             return False
   148         session.commit(free_cnxset=False)
   161         session.commit(free_cnxset=False)
   149         return True
   162         return True
   150 
   163 
   151     def release_synchronization_lock(self, session):
   164     def release_synchronization_lock(self, session):
   152         session.set_cnxset()
   165         session.set_cnxset()
   153         session.execute('SET X synchronizing FALSE WHERE X eid %(x)s',
   166         session.execute('SET X in_synchronization NULL WHERE X eid %(x)s',
   154                         {'x': self.eid})
   167                         {'x': self.eid})
   155         session.commit()
   168         session.commit()
   156 
   169 
   157     def pull_data(self, session, force=False, raise_on_error=False):
   170     def pull_data(self, session, force=False, raise_on_error=False):
   158         """Launch synchronization of the source if needed.
   171         """Launch synchronization of the source if needed.