server/sources/datafeed.py
branchstable
changeset 7921 a93e2ed5877a
parent 7731 48e78934a4e2
child 7931 60068dc83457
child 7933 b25dda2214a2
equal deleted inserted replaced
7917:436400e7f807 7921:a93e2ed5877a
    53           'default': '5min',
    53           'default': '5min',
    54           'help': ('Interval in seconds between synchronization with the '
    54           'help': ('Interval in seconds between synchronization with the '
    55                    'external source (default to 5 minutes, must be >= 1 min).'),
    55                    'external source (default to 5 minutes, must be >= 1 min).'),
    56           'group': 'datafeed-source', 'level': 2,
    56           'group': 'datafeed-source', 'level': 2,
    57           }),
    57           }),
       
    58         ('max-lock-lifetime',
       
    59          {'type' : 'time',
       
    60           'default': '1h',
       
    61           'help': ('Maximum time allowed for a synchronization to be run. '
       
    62                    'Exceeded that time, the synchronization will be considered '
       
    63                    'as having failed and not properly released the lock, hence '
       
    64                    'it won\'t be considered'),
       
    65           'group': 'datafeed-source', 'level': 2,
       
    66           }),
    58         ('delete-entities',
    67         ('delete-entities',
    59          {'type' : 'yn',
    68          {'type' : 'yn',
    60           'default': True,
    69           'default': True,
    61           'help': ('Should already imported entities not found anymore on the '
    70           'help': ('Should already imported entities not found anymore on the '
    62                    'external source be deleted?'),
    71                    'external source be deleted?'),
    88     def update_config(self, source_entity, typedconfig):
    97     def update_config(self, source_entity, typedconfig):
    89         """update configuration from source entity. `typedconfig` is config
    98         """update configuration from source entity. `typedconfig` is config
    90         properly typed with defaults set
    99         properly typed with defaults set
    91         """
   100         """
    92         self.synchro_interval = timedelta(seconds=typedconfig['synchronization-interval'])
   101         self.synchro_interval = timedelta(seconds=typedconfig['synchronization-interval'])
       
   102         self.max_lock_lifetime = timedelta(seconds=typedconfig['max-lock-lifetime'])
    93         if source_entity is not None:
   103         if source_entity is not None:
    94             self._entity_update(source_entity)
   104             self._entity_update(source_entity)
    95         self.config = typedconfig
   105         self.config = typedconfig
    96 
   106 
    97     def init(self, activated, source_entity):
   107     def init(self, activated, source_entity):
   136                         {'x': self.eid, 'date': self.latest_retrieval})
   146                         {'x': self.eid, 'date': self.latest_retrieval})
   137 
   147 
   138     def acquire_synchronization_lock(self, session):
   148     def acquire_synchronization_lock(self, session):
   139         # XXX race condition until WHERE of SET queries is executed using
   149         # XXX race condition until WHERE of SET queries is executed using
   140         # 'SELECT FOR UPDATE'
   150         # 'SELECT FOR UPDATE'
   141         if not session.execute('SET X synchronizing TRUE WHERE X eid %(x)s, X synchronizing FALSE',
   151         now = datetime.utcnow()
   142                                {'x': self.eid}):
   152         if not session.execute('SET X in_synchronizaton %(now)s WHERE X eid %(x)s, X synchronizing NULL OR X synchronizing < %(maxdt)s',
       
   153                                {'x': self.eid,
       
   154                                 'now': now,
       
   155                                 'maxdt': now - self.max_lock_lifetime}):
   143             self.error('concurrent synchronization detected, skip pull')
   156             self.error('concurrent synchronization detected, skip pull')
   144             session.commit(free_cnxset=False)
   157             session.commit(free_cnxset=False)
   145             return False
   158             return False
   146         session.commit(free_cnxset=False)
   159         session.commit(free_cnxset=False)
   147         return True
   160         return True
   148 
   161 
   149     def release_synchronization_lock(self, session):
   162     def release_synchronization_lock(self, session):
   150         session.set_cnxset()
   163         session.set_cnxset()
   151         session.execute('SET X synchronizing FALSE WHERE X eid %(x)s',
   164         session.execute('SET X synchronizing None WHERE X eid %(x)s',
   152                         {'x': self.eid})
   165                         {'x': self.eid})
   153         session.commit()
   166         session.commit()
   154 
   167 
   155     def pull_data(self, session, force=False, raise_on_error=False):
   168     def pull_data(self, session, force=False, raise_on_error=False):
   156         """Launch synchronization of the source if needed.
   169         """Launch synchronization of the source if needed.