merge with 3.21.5
author Rémi Cardona <remi.cardona@logilab.fr>
date   Wed, 16 Dec 2015 11:23:48 +0100
changeset 10991 7ceb0971c694
parent 10981 45bc791275b4 (current diff)
parent 10990 3a98422df969 (diff)
child 10995 c449f4415d0c
merge with 3.21.5
cubicweb.spec
dataimport/pgstore.py
--- a/.hgtags	Tue Dec 15 14:12:59 2015 +0100
+++ b/.hgtags	Wed Dec 16 11:23:48 2015 +0100
@@ -526,3 +526,6 @@
 d3b92d3a7db098b25168beef9b3ee7b36263a652 3.21.4
 d3b92d3a7db098b25168beef9b3ee7b36263a652 debian/3.21.4-1
 d3b92d3a7db098b25168beef9b3ee7b36263a652 centos/3.21.4-1
+e0572a786e6b4b0965d405dd95cf5bce754005a2 3.21.5
+e0572a786e6b4b0965d405dd95cf5bce754005a2 debian/3.21.5-1
+e0572a786e6b4b0965d405dd95cf5bce754005a2 centos/3.21.5-1
--- a/cubicweb.spec	Tue Dec 15 14:12:59 2015 +0100
+++ b/cubicweb.spec	Wed Dec 16 11:23:48 2015 +0100
@@ -7,7 +7,7 @@
 %endif
 
 Name:           cubicweb
-Version:        3.21.4
+Version:        3.21.5
 Release:        logilab.1%{?dist}
 Summary:        CubicWeb is a semantic web application framework
 Source0:        http://download.logilab.org/pub/cubicweb/cubicweb-%{version}.tar.gz
--- a/dataimport/pgstore.py	Tue Dec 15 14:12:59 2015 +0100
+++ b/dataimport/pgstore.py	Wed Dec 16 11:23:48 2015 +0100
@@ -18,7 +18,6 @@
 """Postgres specific store"""
 from __future__ import print_function
 
-import threading
 import warnings
 import os.path as osp
 from io import StringIO
@@ -35,28 +34,6 @@
 from cubicweb.server.sqlutils import SQL_PREFIX
 from cubicweb.dataimport.stores import NoHookRQLObjectStore
 
-def _import_statements(sql_connect, statements, nb_threads=3,
-                       dump_output_dir=None,
-                       support_copy_from=True, encoding='utf-8'):
-    """
-    Import a bunch of sql statements, using different threads.
-    """
-    try:
-        chunksize = (len(statements) / nb_threads) + 1
-        threads = []
-        for i in range(nb_threads):
-            chunks = statements[i*chunksize:(i+1)*chunksize]
-            thread = threading.Thread(target=_execmany_thread,
-                                      args=(sql_connect, chunks,
-                                            dump_output_dir,
-                                            support_copy_from,
-                                            encoding))
-            thread.start()
-            threads.append(thread)
-        for t in threads:
-            t.join()
-    except Exception:
-        print('Error in import statements')
 
 def _execmany_thread_not_copy_from(cu, statement, data, table=None,
                                    columns=None, encoding='utf-8'):
@@ -227,7 +204,7 @@
     >>> store.flush()
     """
 
-    def __init__(self, cnx, dump_output_dir=None, nb_threads_statement=3):
+    def __init__(self, cnx, dump_output_dir=None, nb_threads_statement=1):
         """
         Initialize a SQLGenObjectStore.
 
@@ -236,19 +213,18 @@
           - cnx: connection on the cubicweb instance
           - dump_output_dir: a directory to dump failed statements
             for easier recovery. Default is None (no dump).
-          - nb_threads_statement: number of threads used
-            for SQL insertion (default is 3).
         """
         super(SQLGenObjectStore, self).__init__(cnx)
         ### hijack default source
         self.source = SQLGenSourceWrapper(
             self.source, cnx.vreg.schema,
-            dump_output_dir=dump_output_dir,
-            nb_threads_statement=nb_threads_statement)
+            dump_output_dir=dump_output_dir)
         ### XXX This is done in super().__init__(), but should be
         ### redone here to link to the correct source
         self.add_relation = self.source.add_relation
         self.indexes_etypes = {}
+        if nb_threads_statement != 1:
+            warnings.warn('[3.21] SQLGenObjectStore is no longer threaded', DeprecationWarning)
 
     def flush(self):
         """Flush data to the database"""
@@ -293,9 +269,8 @@
 class SQLGenSourceWrapper(object):
 
     def __init__(self, system_source, schema,
-                 dump_output_dir=None, nb_threads_statement=3):
+                 dump_output_dir=None):
         self.system_source = system_source
-        self._sql = threading.local()
         # Explicitly backport attributes from system source
         self._storage_handler = self.system_source._storage_handler
         self.preprocess_entity = self.system_source.preprocess_entity
@@ -310,9 +285,7 @@
         spcfrom = system_source.dbhelper.dbapi_module.support_copy_from
         self.support_copy_from = spcfrom
         self.dbencoding = system_source.dbhelper.dbencoding
-        self.nb_threads_statement = nb_threads_statement
-        # initialize thread-local data for main thread
-        self.init_thread_locals()
+        self.init_statement_lists()
         self._inlined_rtypes_cache = {}
         self._fill_inlined_rtypes_cache(schema)
         self.schema = schema
@@ -325,20 +298,20 @@
                 if rschema.inlined:
                     cache[eschema.type] = SQL_PREFIX + rschema.type
 
-    def init_thread_locals(self):
-        """initializes thread-local data"""
-        self._sql.entities = defaultdict(list)
-        self._sql.relations = {}
-        self._sql.inlined_relations = {}
+    def init_statement_lists(self):
+        self._sql_entities = defaultdict(list)
+        self._sql_relations = {}
+        self._sql_inlined_relations = {}
+        self._sql_eids = defaultdict(list)
         # keep track, for each eid of the corresponding data dict
-        self._sql.eid_insertdicts = {}
+        self._sql_eid_insertdicts = {}
 
     def flush(self):
         print('starting flush')
-        _entities_sql = self._sql.entities
-        _relations_sql = self._sql.relations
-        _inlined_relations_sql = self._sql.inlined_relations
-        _insertdicts = self._sql.eid_insertdicts
+        _entities_sql = self._sql_entities
+        _relations_sql = self._sql_relations
+        _inlined_relations_sql = self._sql_inlined_relations
+        _insertdicts = self._sql_eid_insertdicts
         try:
             # try, for each inlined_relation, to find if we're also creating
             # the host entity (i.e. the subject of the relation).
@@ -365,14 +338,14 @@
                         # UPDATE query
                         new_datalist.append(data)
                 _inlined_relations_sql[statement] = new_datalist
-            _import_statements(self.system_source.get_connection,
-                               _entities_sql.items()
-                               + _relations_sql.items()
-                               + _inlined_relations_sql.items(),
-                               dump_output_dir=self.dump_output_dir,
-                               nb_threads=self.nb_threads_statement,
-                               support_copy_from=self.support_copy_from,
-                               encoding=self.dbencoding)
+            _execmany_thread(self.system_source.get_connection,
+                             self._sql_eids.items()
+                             + _entities_sql.items()
+                             + _relations_sql.items()
+                             + _inlined_relations_sql.items(),
+                             dump_output_dir=self.dump_output_dir,
+                             support_copy_from=self.support_copy_from,
+                             encoding=self.dbencoding)
         finally:
             _entities_sql.clear()
             _relations_sql.clear()
@@ -382,7 +355,7 @@
     def add_relation(self, cnx, subject, rtype, object,
                      inlined=False, **kwargs):
         if inlined:
-            _sql = self._sql.inlined_relations
+            _sql = self._sql_inlined_relations
             data = {'cw_eid': subject, SQL_PREFIX + rtype: object}
             subjtype = kwargs.get('subjtype')
             if subjtype is None:
@@ -400,7 +373,7 @@
             statement = self.sqlgen.update(SQL_PREFIX + subjtype,
                                            data, ['cw_eid'])
         else:
-            _sql = self._sql.relations
+            _sql = self._sql_relations
             data = {'eid_from': subject, 'eid_to': object}
             statement = self.sqlgen.insert('%s_relation' % rtype, data)
         if statement in _sql:
@@ -418,17 +391,17 @@
                 if rtype not in attrs:
                     attrs[rtype] = None
             sql = self.sqlgen.insert(SQL_PREFIX + entity.cw_etype, attrs)
-            self._sql.eid_insertdicts[entity.eid] = attrs
+            self._sql_eid_insertdicts[entity.eid] = attrs
             self._append_to_entities(sql, attrs)
 
     def _append_to_entities(self, sql, attrs):
-        self._sql.entities[sql].append(attrs)
+        self._sql_entities[sql].append(attrs)
 
     def _handle_insert_entity_sql(self, cnx, sql, attrs):
         # We have to overwrite the source given in parameters
         # as here, we directly use the system source
         attrs['asource'] = self.system_source.uri
-        self._append_to_entities(sql, attrs)
+        self._sql_eids[sql].append(attrs)
 
     def _handle_is_relation_sql(self, cnx, sql, attrs):
         self._append_to_entities(sql, attrs)
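
Net effect of the pgstore changes above: SQLGenObjectStore is now
single-threaded. The _import_statements helper and its worker threads are
gone, flush() hands the pending statements to _execmany_thread directly, and
entity-metadata INSERTs are queued in a separate _sql_eids list so they run
before the other statements. A minimal usage sketch (assuming an existing
repository connection `cnx`, as exercised by the new test module below):

    from cubicweb.dataimport.pgstore import SQLGenObjectStore

    store = SQLGenObjectStore(cnx)  # no worker threads anymore
    store.prepare_insert_entity('Location', name=u'toto')
    store.flush()   # statements are executed serially, entity eids first
    store.commit()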
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/dataimport/test/test_sqlgenstore.py	Wed Dec 16 11:23:48 2015 +0100
@@ -0,0 +1,124 @@
+# -*- coding: utf-8 -*-
+# copyright 2013 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
+# contact http://www.logilab.fr -- mailto:contact@logilab.fr
+#
+# This program is free software: you can redistribute it and/or modify it under
+# the terms of the GNU Lesser General Public License as published by the Free
+# Software Foundation, either version 2.1 of the License, or (at your option)
+# any later version.
+#
+# This program is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+# details.
+#
+# You should have received a copy of the GNU Lesser General Public License along
+# with this program. If not, see <http://www.gnu.org/licenses/>.
+"""SQL object store test case"""
+
+import itertools
+
+from cubicweb.dataimport import ucsvreader
+from cubicweb.devtools import testlib, PostgresApptestConfiguration
+from cubicweb.devtools import startpgcluster, stoppgcluster
+from cubicweb.dataimport.pgstore import SQLGenObjectStore
+
+
+def setUpModule():
+    startpgcluster(__file__)
+
+
+def tearDownModule(*args):
+    stoppgcluster(__file__)
+
+
+class SQLGenImportSimpleTC(testlib.CubicWebTC):
+    configcls = PostgresApptestConfiguration
+    appid = 'data-massimport'
+
+    def cast(self, _type, value):
+        try:
+            return _type(value)
+        except ValueError:
+            return None
+
+    def push_geonames_data(self, dumpname, store):
+        # Push timezones
+        cnx = store._cnx
+        for code, gmt, dst, raw_offset in ucsvreader(open(self.datapath('timeZones.txt'), 'rb'),
+                                                     delimiter='\t'):
+            cnx.create_entity('TimeZone', code=code, gmt=float(gmt),
+                              dst=float(dst), raw_offset=float(raw_offset))
+        timezone_code = dict(cnx.execute('Any C, X WHERE X is TimeZone, X code C'))
+        cnx.commit()
+        # Push data
+        for ind, infos in enumerate(ucsvreader(open(dumpname, 'rb'),
+                                               delimiter='\t',
+                                               ignore_errors=True)):
+            if ind > 99:
+                break
+            latitude = self.cast(float, infos[4])
+            longitude = self.cast(float, infos[5])
+            population = self.cast(int, infos[14])
+            elevation = self.cast(int, infos[15])
+            gtopo = self.cast(int, infos[16])
+            feature_class = infos[6]
+            if len(infos[6]) != 1:
+                feature_class = None
+            entity = {'name': infos[1],
+                      'asciiname': infos[2],
+                      'alternatenames': infos[3],
+                      'latitude': latitude, 'longitude': longitude,
+                      'feature_class': feature_class,
+                      'alternate_country_code': infos[9],
+                      'admin_code_3': infos[12],
+                      'admin_code_4': infos[13],
+                      'population': population, 'elevation': elevation,
+                      'gtopo30': gtopo, 'timezone': timezone_code.get(infos[17]),
+                      'cwuri':  u'http://sws.geonames.org/%s/' % int(infos[0]),
+                      'geonameid': int(infos[0]),
+                      }
+            store.prepare_insert_entity('Location', **entity)
+
+    def test_autoflush_metadata(self):
+        with self.admin_access.repo_cnx() as cnx:
+            crs = cnx.system_sql('SELECT * FROM entities WHERE type=%(t)s',
+                                 {'t': 'Location'})
+            self.assertEqual(len(crs.fetchall()), 0)
+            store = SQLGenObjectStore(cnx)
+            store.prepare_insert_entity('Location', name=u'toto')
+            store.flush()
+            store.commit()
+            cnx.commit()
+        with self.admin_access.repo_cnx() as cnx:
+            crs = cnx.system_sql('SELECT * FROM entities WHERE type=%(t)s',
+                                 {'t': 'Location'})
+            self.assertEqual(len(crs.fetchall()), 1)
+
+    def test_sqlgenstore_etype_metadata(self):
+        with self.admin_access.repo_cnx() as cnx:
+            store = SQLGenObjectStore(cnx)
+            timezone_eid = store.prepare_insert_entity('TimeZone')
+            store.prepare_insert_entity('Location', timezone=timezone_eid)
+            store.flush()
+            store.commit()
+            eid, etname = cnx.execute('Any X, TN WHERE X timezone TZ, X is T, '
+                                      'T name TN')[0]
+            self.assertEqual(cnx.entity_from_eid(eid).cw_etype, etname)
+
+    def test_simple_insert(self):
+        with self.admin_access.repo_cnx() as cnx:
+            store = SQLGenObjectStore(cnx)
+            self.push_geonames_data(self.datapath('geonames.csv'), store)
+            store.flush()
+            store.commit()
+        with self.admin_access.repo_cnx() as cnx:
+            rset = cnx.execute('Any X WHERE X is Location')
+            self.assertEqual(len(rset), 100)
+            rset = cnx.execute('Any X WHERE X is Location, X timezone T')
+            self.assertEqual(len(rset), 100)
+
+
+if __name__ == '__main__':
+    from logilab.common.testlib import unittest_main
+    unittest_main()
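
The store keeps its old constructor signature for backward compatibility:
passing the removed nb_threads_statement argument only emits a
DeprecationWarning. A minimal sketch of that behaviour (again assuming a
repository connection `cnx` as in the tests above):

    import warnings

    from cubicweb.dataimport.pgstore import SQLGenObjectStore

    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter('always')
        SQLGenObjectStore(cnx, nb_threads_statement=3)  # deprecated argument
    assert any(issubclass(w.category, DeprecationWarning) for w in caught)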
--- a/debian/changelog	Tue Dec 15 14:12:59 2015 +0100
+++ b/debian/changelog	Wed Dec 16 11:23:48 2015 +0100
@@ -1,3 +1,9 @@
+cubicweb (3.21.5-1) unstable; urgency=medium
+
+  * New upstream release.
+
+ -- Rémi Cardona <remi.cardona@logilab.fr>  Tue, 15 Dec 2015 17:33:05 +0100
+
 cubicweb (3.21.4-1) unstable; urgency=medium
 
   * New upstream release.
--- a/web/data/cubicweb.preferences.js	Tue Dec 15 14:12:59 2015 +0100
+++ b/web/data/cubicweb.preferences.js	Wed Dec 16 11:23:48 2015 +0100
@@ -45,7 +45,7 @@
 function validatePrefsForm(formid) {
     clearPreviousMessages();
     _clearPreviousErrors(formid);
-    return validateForm(formid, null, submitSucces, submitFailure);
+    return validateForm(formid, null, submitSuccess, submitFailure);
 }
 
 function submitFailure(result, formid, cbargs) {
@@ -59,13 +59,13 @@
     return false; // so handleFormValidationResponse doesn't try to display error
 }
 
-function submitSucces(result, formid, cbargs) {
+function submitSuccess(result, formid, cbargs) {
     var $form = jQuery('#' + formid);
     setCurrentValues($form);
     var dom = DIV({'class': 'msg'}, _("changes applied"));
     $form.find('div.formsg').empty().append(dom);
     $form.find('input').removeClass('changed');
-    checkValues(form, true);
+    checkValues($form, true);
     return;
 }