27 from six.moves.urllib.parse import urlparse |
27 from six.moves.urllib.parse import urlparse |
28 from six.moves.urllib.request import Request, build_opener, HTTPCookieProcessor |
28 from six.moves.urllib.request import Request, build_opener, HTTPCookieProcessor |
29 from six.moves.urllib.error import HTTPError |
29 from six.moves.urllib.error import HTTPError |
30 from six.moves.http_cookiejar import CookieJar |
30 from six.moves.http_cookiejar import CookieJar |
31 |
31 |
|
32 from pytz import utc |
32 from lxml import etree |
33 from lxml import etree |
33 |
34 |
34 from logilab.common.deprecation import deprecated |
35 from logilab.common.deprecation import deprecated |
35 |
36 |
36 from cubicweb import RegistryNotFound, ObjectNotFound, ValidationError, UnknownEid |
37 from cubicweb import RegistryNotFound, ObjectNotFound, ValidationError, UnknownEid |
160 parser.del_schema_config(schemacfg, checkonly) |
161 parser.del_schema_config(schemacfg, checkonly) |
161 |
162 |
def fresh(self):
    """Tell whether the last retrieval is recent enough to skip a new pull.

    Returns False when ``latest_retrieval`` is None; otherwise returns
    True while the current UTC time is still earlier than
    ``latest_retrieval + synchro_interval``.
    """
    if self.latest_retrieval is None:
        return False
    # Use a timezone-aware "now": comparing a naive datetime with an aware
    # one raises TypeError.
    # NOTE(review): assumes latest_retrieval is an aware datetime and
    # synchro_interval a timedelta -- confirm against the schema.
    expiry = self.latest_retrieval + self.synchro_interval
    return datetime.now(tz=utc) < expiry
166 |
167 |
def update_latest_retrieval(self, cnx):
    """Record the current UTC time as this source's latest retrieval date.

    Sets ``self.latest_retrieval`` to a timezone-aware datetime, writes
    the same value to the ``latest_retrieval`` attribute of the entity
    whose eid is ``self.eid`` through ``cnx.execute``, then commits.

    :param cnx: connection object providing ``execute`` and ``commit``
    """
    # Timezone-aware timestamp, so later comparisons against an aware
    # "now" do not mix naive and aware datetimes.
    self.latest_retrieval = datetime.now(tz=utc)
    cnx.execute('SET X latest_retrieval %(date)s WHERE X eid %(x)s',
                {'x': self.eid, 'date': self.latest_retrieval})
    cnx.commit()
172 |
173 |
173 def acquire_synchronization_lock(self, cnx): |
174 def acquire_synchronization_lock(self, cnx): |
174 # XXX race condition until WHERE of SET queries is executed using |
175 # XXX race condition until WHERE of SET queries is executed using |
175 # 'SELECT FOR UPDATE' |
176 # 'SELECT FOR UPDATE' |
176 now = datetime.utcnow() |
177 now = datetime.now(tz=utc) |
177 if not cnx.execute( |
178 if not cnx.execute( |
178 'SET X in_synchronization %(now)s WHERE X eid %(x)s, ' |
179 'SET X in_synchronization %(now)s WHERE X eid %(x)s, ' |
179 'X in_synchronization NULL OR X in_synchronization < %(maxdt)s', |
180 'X in_synchronization NULL OR X in_synchronization < %(maxdt)s', |
180 {'x': self.eid, 'now': now, 'maxdt': now - self.max_lock_lifetime}): |
181 {'x': self.eid, 'now': now, 'maxdt': now - self.max_lock_lifetime}): |
181 self.error('concurrent synchronization detected, skip pull') |
182 self.error('concurrent synchronization detected, skip pull') |
292 return dict((self.decode_extid(uri), (eid, type)) |
293 return dict((self.decode_extid(uri), (eid, type)) |
293 for uri, eid, type in cnx.system_sql(sql).fetchall()) |
294 for uri, eid, type in cnx.system_sql(sql).fetchall()) |
294 |
295 |
def init_import_log(self, cnx, **kwargs):
    """Create and initialise a ``CWDataImport`` entity for this source.

    The entity is created through ``cnx.create_entity`` with
    ``cw_import_of`` set to this source and ``start_timestamp`` set to the
    current timezone-aware UTC time. Extra keyword arguments are forwarded
    unchanged to ``create_entity``. ``init()`` is then called on the new
    entity before it is returned.

    :param cnx: connection object providing ``create_entity``
    :param kwargs: additional attributes passed to ``create_entity``
    :return: the created entity, after its ``init()`` call
    """
    # Timezone-aware start time, consistent with the other timestamps
    # written by this source.
    started_at = datetime.now(tz=utc)
    dataimport = cnx.create_entity('CWDataImport', cw_import_of=self,
                                   start_timestamp=started_at,
                                   **kwargs)
    dataimport.init()
    return dataimport
301 |
302 |
302 |
303 |
303 class DataFeedParser(AppObject): |
304 class DataFeedParser(AppObject): |