server/sources/datafeed.py
branchstable
changeset 8529 1daea1f433c9
parent 8435 5064b6e0d6f4
child 8535 268b6349baf3
child 8547 f23ac525ddd1
equal deleted inserted replaced
8528:f32c50c6b7e0 8529:1daea1f433c9
   150             return False
   150             return False
   151         return datetime.utcnow() < (self.latest_retrieval + self.synchro_interval)
   151         return datetime.utcnow() < (self.latest_retrieval + self.synchro_interval)
   152 
   152 
   153     def update_latest_retrieval(self, session):
   153     def update_latest_retrieval(self, session):
   154         self.latest_retrieval = datetime.utcnow()
   154         self.latest_retrieval = datetime.utcnow()
       
   155         session.set_cnxset()
   155         session.execute('SET X latest_retrieval %(date)s WHERE X eid %(x)s',
   156         session.execute('SET X latest_retrieval %(date)s WHERE X eid %(x)s',
   156                         {'x': self.eid, 'date': self.latest_retrieval})
   157                         {'x': self.eid, 'date': self.latest_retrieval})
       
   158         session.commit()
   157 
   159 
   158     def acquire_synchronization_lock(self, session):
   160     def acquire_synchronization_lock(self, session):
   159         # XXX race condition until WHERE of SET queries is executed using
   161         # XXX race condition until WHERE of SET queries is executed using
   160         # 'SELECT FOR UPDATE'
   162         # 'SELECT FOR UPDATE'
   161         now = datetime.utcnow()
   163         now = datetime.utcnow()
       
   164         session.set_cnxset()
   162         if not session.execute(
   165         if not session.execute(
   163             'SET X in_synchronization %(now)s WHERE X eid %(x)s, '
   166             'SET X in_synchronization %(now)s WHERE X eid %(x)s, '
   164             'X in_synchronization NULL OR X in_synchronization < %(maxdt)s',
   167             'X in_synchronization NULL OR X in_synchronization < %(maxdt)s',
   165             {'x': self.eid, 'now': now, 'maxdt': now - self.max_lock_lifetime}):
   168             {'x': self.eid, 'now': now, 'maxdt': now - self.max_lock_lifetime}):
   166             self.error('concurrent synchronization detected, skip pull')
   169             self.error('concurrent synchronization detected, skip pull')
   167             session.commit(free_cnxset=False)
   170             session.commit()
   168             return False
   171             return False
   169         session.commit(free_cnxset=False)
   172         session.commit()
   170         return True
   173         return True
   171 
   174 
   172     def release_synchronization_lock(self, session):
   175     def release_synchronization_lock(self, session):
   173         session.set_cnxset()
   176         session.set_cnxset()
   174         session.execute('SET X in_synchronization NULL WHERE X eid %(x)s',
   177         session.execute('SET X in_synchronization NULL WHERE X eid %(x)s',
   203         stats = parser.stats
   206         stats = parser.stats
   204         if stats.get('created'):
   207         if stats.get('created'):
   205             importlog.record_info('added %s entities' % len(stats['created']))
   208             importlog.record_info('added %s entities' % len(stats['created']))
   206         if stats.get('updated'):
   209         if stats.get('updated'):
   207             importlog.record_info('updated %s entities' % len(stats['updated']))
   210             importlog.record_info('updated %s entities' % len(stats['updated']))
       
   211         session.set_cnxset()
   208         importlog.write_log(session, end_timestamp=self.latest_retrieval)
   212         importlog.write_log(session, end_timestamp=self.latest_retrieval)
       
   213         session.commit()
   209         return stats
   214         return stats
   210 
   215 
   211     def process_urls(self, parser, urls, raise_on_error=False):
   216     def process_urls(self, parser, urls, raise_on_error=False):
   212         error = False
   217         error = False
   213         for url in urls:
   218         for url in urls:
   374             for extid, (eid, etype) in myuris.iteritems():
   379             for extid, (eid, etype) in myuris.iteritems():
   375                 if self.is_deleted(extid, etype, eid):
   380                 if self.is_deleted(extid, etype, eid):
   376                     byetype.setdefault(etype, []).append(str(eid))
   381                     byetype.setdefault(etype, []).append(str(eid))
   377             for etype, eids in byetype.iteritems():
   382             for etype, eids in byetype.iteritems():
   378                 self.warning('delete %s %s entities', len(eids), etype)
   383                 self.warning('delete %s %s entities', len(eids), etype)
       
   384                 session.set_cnxset()
   379                 session.execute('DELETE %s X WHERE X eid IN (%s)'
   385                 session.execute('DELETE %s X WHERE X eid IN (%s)'
   380                                 % (etype, ','.join(eids)))
   386                                 % (etype, ','.join(eids)))
       
   387                 session.commit()
   381 
   388 
   382     def update_if_necessary(self, entity, attrs):
   389     def update_if_necessary(self, entity, attrs):
   383         entity.complete(tuple(attrs))
   390         entity.complete(tuple(attrs))
   384         # check modification date and compare attribute values to only update
   391         # check modification date and compare attribute values to only update
   385         # what's actually needed
   392         # what's actually needed