dataimport/pgstore.py
changeset 10810 0768bf2333a7
parent 10662 10942ed172de
child 10941 e3ec46b5c710
Legend: equal | deleted | inserted | replaced
Compared revisions (old, then new): 10809:359cbdf3a515 10810:0768bf2333a7
    19 from __future__ import print_function
    19 from __future__ import print_function
    20 
    20 
    21 import threading
    21 import threading
    22 import warnings
    22 import warnings
    23 import os.path as osp
    23 import os.path as osp
    24 from StringIO import StringIO
    24 from io import StringIO
    25 from time import asctime
    25 from time import asctime
    26 from datetime import date, datetime, time
    26 from datetime import date, datetime, time
    27 from collections import defaultdict
    27 from collections import defaultdict
    28 from base64 import b64encode
    28 from base64 import b64encode
    29 
    29 
    30 from six import string_types, integer_types
    30 from six import string_types, integer_types, text_type
    31 from six.moves import cPickle as pickle, range
    31 from six.moves import cPickle as pickle, range
    32 
    32 
    33 from cubicweb.utils import make_uid
    33 from cubicweb.utils import make_uid
    34 from cubicweb.server.sqlutils import SQL_PREFIX
    34 from cubicweb.server.sqlutils import SQL_PREFIX
    35 from cubicweb.dataimport.stores import NoHookRQLObjectStore
    35 from cubicweb.dataimport.stores import NoHookRQLObjectStore
    70     buf = _create_copyfrom_buffer(data, columns, encoding=encoding)
    70     buf = _create_copyfrom_buffer(data, columns, encoding=encoding)
    71     if buf is None:
    71     if buf is None:
    72         _execmany_thread_not_copy_from(cu, statement, data)
    72         _execmany_thread_not_copy_from(cu, statement, data)
    73     else:
    73     else:
    74         if columns is None:
    74         if columns is None:
    75             cu.copy_from(buf, table, null='NULL')
    75             cu.copy_from(buf, table, null=u'NULL')
    76         else:
    76         else:
    77             cu.copy_from(buf, table, null='NULL', columns=columns)
    77             cu.copy_from(buf, table, null=u'NULL', columns=columns)
    78 
    78 
    79 def _execmany_thread(sql_connect, statements, dump_output_dir=None,
    79 def _execmany_thread(sql_connect, statements, dump_output_dir=None,
    80                      support_copy_from=True, encoding='utf-8'):
    80                      support_copy_from=True, encoding='utf-8'):
    81     """
    81     """
    82     Execute sql statement. If 'INSERT INTO', try to use 'COPY FROM' command,
    82     Execute sql statement. If 'INSERT INTO', try to use 'COPY FROM' command,
   122         cu.close()
   122         cu.close()
   123 
   123 
   124 
   124 
   125 def _copyfrom_buffer_convert_None(value, **opts):
   125 def _copyfrom_buffer_convert_None(value, **opts):
   126     '''Convert None value to "NULL"'''
   126     '''Convert None value to "NULL"'''
   127     return 'NULL'
   127     return u'NULL'
   128 
   128 
   129 def _copyfrom_buffer_convert_number(value, **opts):
   129 def _copyfrom_buffer_convert_number(value, **opts):
   130     '''Convert a number into its string representation'''
   130     '''Convert a number into its string representation'''
   131     return str(value)
   131     return text_type(value)
   132 
   132 
   133 def _copyfrom_buffer_convert_string(value, **opts):
   133 def _copyfrom_buffer_convert_string(value, **opts):
   134     '''Convert string value.
   134     '''Convert string value.
   135 
       
   136     Recognized keywords:
       
   137     :encoding: resulting string encoding (default: utf-8)
       
   138     '''
   135     '''
   139     encoding = opts.get('encoding','utf-8')
       
   140     escape_chars = ((u'\\', u'\\\\'), (u'\t', u'\\t'), (u'\r', u'\\r'),
   136     escape_chars = ((u'\\', u'\\\\'), (u'\t', u'\\t'), (u'\r', u'\\r'),
   141                     (u'\n', u'\\n'))
   137                     (u'\n', u'\\n'))
   142     for char, replace in escape_chars:
   138     for char, replace in escape_chars:
   143         value = value.replace(char, replace)
   139         value = value.replace(char, replace)
   144     if isinstance(value, unicode):
       
   145         value = value.encode(encoding)
       
   146     return value
   140     return value
   147 
   141 
   148 def _copyfrom_buffer_convert_date(value, **opts):
   142 def _copyfrom_buffer_convert_date(value, **opts):
   149     '''Convert date into "YYYY-MM-DD"'''
   143     '''Convert date into "YYYY-MM-DD"'''
   150     # Do not use strftime, as it yields issue with date < 1900
   144     # Do not use strftime, as it yields issue with date < 1900
   151     # (http://bugs.python.org/issue1777412)
   145     # (http://bugs.python.org/issue1777412)
   152     return '%04d-%02d-%02d' % (value.year, value.month, value.day)
   146     return u'%04d-%02d-%02d' % (value.year, value.month, value.day)
   153 
   147 
   154 def _copyfrom_buffer_convert_datetime(value, **opts):
   148 def _copyfrom_buffer_convert_datetime(value, **opts):
   155     '''Convert date into "YYYY-MM-DD HH:MM:SS.UUUUUU"'''
   149     '''Convert date into "YYYY-MM-DD HH:MM:SS.UUUUUU"'''
   156     # Do not use strftime, as it yields issue with date < 1900
   150     # Do not use strftime, as it yields issue with date < 1900
   157     # (http://bugs.python.org/issue1777412)
   151     # (http://bugs.python.org/issue1777412)
   158     return '%s %s' % (_copyfrom_buffer_convert_date(value, **opts),
   152     return u'%s %s' % (_copyfrom_buffer_convert_date(value, **opts),
   159                       _copyfrom_buffer_convert_time(value, **opts))
   153                        _copyfrom_buffer_convert_time(value, **opts))
   160 
   154 
   161 def _copyfrom_buffer_convert_time(value, **opts):
   155 def _copyfrom_buffer_convert_time(value, **opts):
   162     '''Convert time into "HH:MM:SS.UUUUUU"'''
   156     '''Convert time into "HH:MM:SS.UUUUUU"'''
   163     return '%02d:%02d:%02d.%06d' % (value.hour, value.minute,
   157     return u'%02d:%02d:%02d.%06d' % (value.hour, value.minute,
   164                                     value.second, value.microsecond)
   158                                      value.second, value.microsecond)
   165 
   159 
   166 # (types, converter) list.
   160 # (types, converter) list.
   167 _COPYFROM_BUFFER_CONVERTERS = [
   161 _COPYFROM_BUFFER_CONVERTERS = [
   168     (type(None), _copyfrom_buffer_convert_None),
   162     (type(None), _copyfrom_buffer_convert_None),
   169     (integer_types + (float,), _copyfrom_buffer_convert_number),
   163     (integer_types + (float,), _copyfrom_buffer_convert_number),
   209                 # database point of view.
   203                 # database point of view.
   210                 value = None
   204                 value = None
   211             for types, converter in _COPYFROM_BUFFER_CONVERTERS:
   205             for types, converter in _COPYFROM_BUFFER_CONVERTERS:
   212                 if isinstance(value, types):
   206                 if isinstance(value, types):
   213                     value = converter(value, **convert_opts)
   207                     value = converter(value, **convert_opts)
       
   208                     assert isinstance(value, text_type)
   214                     break
   209                     break
   215             else:
   210             else:
   216                 raise ValueError("Unsupported value type %s" % type(value))
   211                 raise ValueError("Unsupported value type %s" % type(value))
   217             # We push the value to the new formatted row
   212             # We push the value to the new formatted row
   218             # if the value is not None and could be converted to a string.
   213             # if the value is not None and could be converted to a string.