server/sources/datafeed.py
changeset 11042 079b32f4cd0d
parent 10914 fed8bd56f223
child 11055 3c1139344621
equal deleted inserted replaced
11041:eddf49a4d12e 11042:079b32f4cd0d
    27 from six.moves.urllib.parse import urlparse
    27 from six.moves.urllib.parse import urlparse
    28 from six.moves.urllib.request import Request, build_opener, HTTPCookieProcessor
    28 from six.moves.urllib.request import Request, build_opener, HTTPCookieProcessor
    29 from six.moves.urllib.error import HTTPError
    29 from six.moves.urllib.error import HTTPError
    30 from six.moves.http_cookiejar import CookieJar
    30 from six.moves.http_cookiejar import CookieJar
    31 
    31 
       
    32 from pytz import utc
    32 from lxml import etree
    33 from lxml import etree
    33 
    34 
    34 from logilab.common.deprecation import deprecated
    35 from logilab.common.deprecation import deprecated
    35 
    36 
    36 from cubicweb import RegistryNotFound, ObjectNotFound, ValidationError, UnknownEid
    37 from cubicweb import RegistryNotFound, ObjectNotFound, ValidationError, UnknownEid
   160         parser.del_schema_config(schemacfg, checkonly)
   161         parser.del_schema_config(schemacfg, checkonly)
   161 
   162 
   162     def fresh(self):
   163     def fresh(self):
   163         if self.latest_retrieval is None:
   164         if self.latest_retrieval is None:
   164             return False
   165             return False
   165         return datetime.utcnow() < (self.latest_retrieval + self.synchro_interval)
   166         return datetime.now(tz=utc) < (self.latest_retrieval + self.synchro_interval)
   166 
   167 
   167     def update_latest_retrieval(self, cnx):
   168     def update_latest_retrieval(self, cnx):
   168         self.latest_retrieval = datetime.utcnow()
   169         self.latest_retrieval = datetime.now(tz=utc)
   169         cnx.execute('SET X latest_retrieval %(date)s WHERE X eid %(x)s',
   170         cnx.execute('SET X latest_retrieval %(date)s WHERE X eid %(x)s',
   170                     {'x': self.eid, 'date': self.latest_retrieval})
   171                     {'x': self.eid, 'date': self.latest_retrieval})
   171         cnx.commit()
   172         cnx.commit()
   172 
   173 
   173     def acquire_synchronization_lock(self, cnx):
   174     def acquire_synchronization_lock(self, cnx):
   174         # XXX race condition until WHERE of SET queries is executed using
   175         # XXX race condition until WHERE of SET queries is executed using
   175         # 'SELECT FOR UPDATE'
   176         # 'SELECT FOR UPDATE'
   176         now = datetime.utcnow()
   177         now = datetime.now(tz=utc)
   177         if not cnx.execute(
   178         if not cnx.execute(
   178             'SET X in_synchronization %(now)s WHERE X eid %(x)s, '
   179             'SET X in_synchronization %(now)s WHERE X eid %(x)s, '
   179             'X in_synchronization NULL OR X in_synchronization < %(maxdt)s',
   180             'X in_synchronization NULL OR X in_synchronization < %(maxdt)s',
   180             {'x': self.eid, 'now': now, 'maxdt': now - self.max_lock_lifetime}):
   181             {'x': self.eid, 'now': now, 'maxdt': now - self.max_lock_lifetime}):
   181             self.error('concurrent synchronization detected, skip pull')
   182             self.error('concurrent synchronization detected, skip pull')
   292         return dict((self.decode_extid(uri), (eid, type))
   293         return dict((self.decode_extid(uri), (eid, type))
   293                     for uri, eid, type in cnx.system_sql(sql).fetchall())
   294                     for uri, eid, type in cnx.system_sql(sql).fetchall())
   294 
   295 
   295     def init_import_log(self, cnx, **kwargs):
   296     def init_import_log(self, cnx, **kwargs):
   296         dataimport = cnx.create_entity('CWDataImport', cw_import_of=self,
   297         dataimport = cnx.create_entity('CWDataImport', cw_import_of=self,
   297                                            start_timestamp=datetime.utcnow(),
   298                                        start_timestamp=datetime.now(tz=utc),
   298                                            **kwargs)
   299                                        **kwargs)
   299         dataimport.init()
   300         dataimport.init()
   300         return dataimport
   301         return dataimport
   301 
   302 
   302 
   303 
   303 class DataFeedParser(AppObject):
   304 class DataFeedParser(AppObject):