321 def check_doubles_not_none(buckets): |
322 def check_doubles_not_none(buckets): |
322 """Extract the keys that have more than one item in their bucket.""" |
323 """Extract the keys that have more than one item in their bucket.""" |
323 return [(k, len(v)) for k, v in buckets.items() |
324 return [(k, len(v)) for k, v in buckets.items() |
324 if k is not None and len(v) > 1] |
325 if k is not None and len(v) > 1] |
325 |
326 |
326 |
|
327 # sql generator utility functions ############################################# |
327 # sql generator utility functions ############################################# |
328 |
328 |
329 |
329 |
330 def _import_statements(sql_connect, statements, nb_threads=3, |
330 def _import_statements(sql_connect, statements, nb_threads=3, |
331 dump_output_dir=None, |
331 dump_output_dir=None, |
394 columns = None |
394 columns = None |
395 else: |
395 else: |
396 columns = list(data[0]) |
396 columns = list(data[0]) |
397 execmany_func(cu, statement, data, table, columns, encoding) |
397 execmany_func(cu, statement, data, table, columns, encoding) |
398 except Exception: |
398 except Exception: |
399 print 'unable to copy data into table %s', table |
399 print 'unable to copy data into table %s' % table |
400 # Error in import statement, save data in dump_output_dir |
400 # Error in import statement, save data in dump_output_dir |
401 if dump_output_dir is not None: |
401 if dump_output_dir is not None: |
402 pdata = {'data': data, 'statement': statement, |
402 pdata = {'data': data, 'statement': statement, |
403 'time': asctime(), 'columns': columns} |
403 'time': asctime(), 'columns': columns} |
404 filename = make_uid() |
404 filename = make_uid() |
429 # Iterate over the different columns and the different values |
429 # Iterate over the different columns and the different values |
430 # and try to convert them to a correct datatype. |
430 # and try to convert them to a correct datatype. |
431 # If an error is raised, do not continue. |
431 # If an error is raised, do not continue. |
432 formatted_row = [] |
432 formatted_row = [] |
433 for col in columns: |
433 for col in columns: |
434 value = row[col] |
434 try: |
|
435 value = row[col] |
|
436 except KeyError: |
|
437 warnings.warn(u"Column %s is not accessible in row %s" |
|
438 % (col, row), RuntimeWarning) |
|
439 # XXX 'value' set to None so that the import does not end in |
|
440 # error. |
|
441 # Instead, the extra keys are set to NULL from the |
|
442 # database point of view. |
|
443 value = None |
435 if value is None: |
444 if value is None: |
436 value = 'NULL' |
445 value = 'NULL' |
437 elif isinstance(value, (long, int, float)): |
446 elif isinstance(value, (long, int, float)): |
438 value = str(value) |
447 value = str(value) |
439 elif isinstance(value, (str, unicode)): |
448 elif isinstance(value, (str, unicode)): |
504 assert isinstance(item, dict), 'item is not a dict but a %s' % type(item) |
513 assert isinstance(item, dict), 'item is not a dict but a %s' % type(item) |
505 data = self.create_entity(etype, **item) |
514 data = self.create_entity(etype, **item) |
506 item['eid'] = data['eid'] |
515 item['eid'] = data['eid'] |
507 return item |
516 return item |
508 |
517 |
509 def relate(self, eid_from, rtype, eid_to, inlined=False): |
518 def relate(self, eid_from, rtype, eid_to, **kwargs): |
510 """Add new relation""" |
519 """Add new relation""" |
511 relation = eid_from, rtype, eid_to |
520 relation = eid_from, rtype, eid_to |
512 self.relations.add(relation) |
521 self.relations.add(relation) |
513 return relation |
522 return relation |
514 |
523 |
535 def nb_inserted_types(self): |
556 def nb_inserted_types(self): |
536 return len(self.types) |
557 return len(self.types) |
537 @property |
558 @property |
538 def nb_inserted_relations(self): |
559 def nb_inserted_relations(self): |
539 return len(self.relations) |
560 return len(self.relations) |
540 |
|
541 @deprecated("[3.7] index support will disappear") |
|
542 def build_index(self, name, type, func=None, can_be_empty=False): |
|
543 """build internal index for further search""" |
|
544 index = {} |
|
545 if func is None or not callable(func): |
|
546 func = lambda x: x['eid'] |
|
547 for eid in self.types[type]: |
|
548 index.setdefault(func(self.eids[eid]), []).append(eid) |
|
549 if not can_be_empty: |
|
550 assert index, "new index '%s' cannot be empty" % name |
|
551 self.indexes[name] = index |
|
552 |
|
553 @deprecated("[3.7] index support will disappear") |
|
554 def build_rqlindex(self, name, type, key, rql, rql_params=False, |
|
555 func=None, can_be_empty=False): |
|
556 """build an index by rql query |
|
557 |
|
558 rql should return eid in first column |
|
559 ctl.store.build_index('index_name', 'users', 'login', 'Any U WHERE U is CWUser') |
|
560 """ |
|
561 self.types[type] = [] |
|
562 rset = self.rql(rql, rql_params or {}) |
|
563 if not can_be_empty: |
|
564 assert rset, "new index type '%s' cannot be empty (0 record found)" % type |
|
565 for entity in rset.entities(): |
|
566 getattr(entity, key) # autopopulate entity with key attribute |
|
567 self.eids[entity.eid] = dict(entity) |
|
568 if entity.eid not in self.types[type]: |
|
569 self.types[type].append(entity.eid) |
|
570 |
|
571 # Build index with specified key |
|
572 func = lambda x: x[key] |
|
573 self.build_index(name, type, func, can_be_empty=can_be_empty) |
|
574 |
|
575 @deprecated("[3.7] index support will disappear") |
|
576 def fetch(self, name, key, unique=False, decorator=None): |
|
577 """index fetcher method |
|
578 |
|
579 decorator is a callable method or an iterator of callable methods (usually a lambda function) |
|
580 decorator=lambda x: x[:1] (first value is returned) |
|
581 decorator=lambda x: x.lower (lowercased value is returned) |
|
582 |
|
583 decorator is handy when you want to improve index keys but without |
|
584 changing the original field |
|
585 |
|
586 Same check functions can be reused here. |
|
587 """ |
|
588 eids = self.indexes[name].get(key, []) |
|
589 if decorator is not None: |
|
590 if not hasattr(decorator, '__iter__'): |
|
591 decorator = (decorator,) |
|
592 for f in decorator: |
|
593 eids = f(eids) |
|
594 if unique: |
|
595 assert len(eids) == 1, u'expected a single one value for key "%s" in index "%s". Got %i' % (key, name, len(eids)) |
|
596 eids = eids[0] |
|
597 return eids |
|
598 |
|
599 @deprecated("[3.7] index support will disappear") |
|
600 def find(self, type, key, value): |
|
601 for idx in self.types[type]: |
|
602 item = self.items[idx] |
|
603 if item[key] == value: |
|
604 yield item |
|
605 |
|
606 @deprecated("[3.7] checkpoint() deprecated. use commit() instead") |
|
607 def checkpoint(self): |
|
608 self.commit() |
|
609 |
|
610 |
561 |
611 class RQLObjectStore(ObjectStore): |
562 class RQLObjectStore(ObjectStore): |
612 """ObjectStore that works with an actual RQL repository (production mode)""" |
563 """ObjectStore that works with an actual RQL repository (production mode)""" |
613 _rql = None # bw compat |
564 _rql = None # bw compat |
614 |
565 |
655 if item: |
602 if item: |
656 query += ': ' + ', '.join('X %s %%(%s)s' % (k, k) |
603 query += ': ' + ', '.join('X %s %%(%s)s' % (k, k) |
657 for k in item) |
604 for k in item) |
658 return self.rql(query, item)[0][0] |
605 return self.rql(query, item)[0][0] |
659 |
606 |
660 def relate(self, eid_from, rtype, eid_to, inlined=False): |
607 def relate(self, eid_from, rtype, eid_to, **kwargs): |
661 eid_from, rtype, eid_to = super(RQLObjectStore, self).relate( |
608 eid_from, rtype, eid_to = super(RQLObjectStore, self).relate( |
662 eid_from, rtype, eid_to) |
609 eid_from, rtype, eid_to, **kwargs) |
663 self.rql('SET X %s Y WHERE X eid %%(x)s, Y eid %%(y)s' % rtype, |
610 self.rql('SET X %s Y WHERE X eid %%(x)s, Y eid %%(y)s' % rtype, |
664 {'x': int(eid_from), 'y': int(eid_to)}) |
611 {'x': int(eid_from), 'y': int(eid_to)}) |
665 |
612 |
666 def find_entities(self, *args, **kwargs): |
613 def find_entities(self, *args, **kwargs): |
667 return self.session.find_entities(*args, **kwargs) |
614 return self.session.find_entities(*args, **kwargs) |
807 self._nb_inserted_entities = 0 |
754 self._nb_inserted_entities = 0 |
808 self._nb_inserted_types = 0 |
755 self._nb_inserted_types = 0 |
809 self._nb_inserted_relations = 0 |
756 self._nb_inserted_relations = 0 |
810 self.rql = session.execute |
757 self.rql = session.execute |
811 # deactivate security |
758 # deactivate security |
812 session.set_read_security(False) |
759 session.read_security = False |
813 session.set_write_security(False) |
760 session.write_security = False |
814 |
761 |
815 def create_entity(self, etype, **kwargs): |
762 def create_entity(self, etype, **kwargs): |
816 for k, v in kwargs.iteritems(): |
763 for k, v in kwargs.iteritems(): |
817 kwargs[k] = getattr(v, 'eid', v) |
764 kwargs[k] = getattr(v, 'eid', v) |
818 entity, rels = self.metagen.base_etype_dicts(etype) |
765 entity, rels = self.metagen.base_etype_dicts(etype) |
823 self.metagen.init_entity(entity) |
770 self.metagen.init_entity(entity) |
824 entity.cw_edited.update(kwargs, skipsec=False) |
771 entity.cw_edited.update(kwargs, skipsec=False) |
825 session = self.session |
772 session = self.session |
826 self.source.add_entity(session, entity) |
773 self.source.add_entity(session, entity) |
827 self.source.add_info(session, entity, self.source, None, complete=False) |
774 self.source.add_info(session, entity, self.source, None, complete=False) |
|
775 kwargs = dict() |
|
776 if inspect.getargspec(self.add_relation).keywords: |
|
777 kwargs['subjtype'] = entity.cw_etype |
828 for rtype, targeteids in rels.iteritems(): |
778 for rtype, targeteids in rels.iteritems(): |
829 # targeteids may be a single eid or a list of eids |
779 # targeteids may be a single eid or a list of eids |
830 inlined = self.rschema(rtype).inlined |
780 inlined = self.rschema(rtype).inlined |
831 try: |
781 try: |
832 for targeteid in targeteids: |
782 for targeteid in targeteids: |
833 self.add_relation(session, entity.eid, rtype, targeteid, |
783 self.add_relation(session, entity.eid, rtype, targeteid, |
834 inlined) |
784 inlined, **kwargs) |
835 except TypeError: |
785 except TypeError: |
836 self.add_relation(session, entity.eid, rtype, targeteids, |
786 self.add_relation(session, entity.eid, rtype, targeteids, |
837 inlined) |
787 inlined, **kwargs) |
838 self._nb_inserted_entities += 1 |
788 self._nb_inserted_entities += 1 |
839 return entity |
789 return entity |
840 |
790 |
841 def relate(self, eid_from, rtype, eid_to): |
791 def relate(self, eid_from, rtype, eid_to, **kwargs): |
842 assert not rtype.startswith('reverse_') |
792 assert not rtype.startswith('reverse_') |
843 self.add_relation(self.session, eid_from, rtype, eid_to, |
793 self.add_relation(self.session, eid_from, rtype, eid_to, |
844 self.rschema(rtype).inlined) |
794 self.rschema(rtype).inlined) |
845 self._nb_inserted_relations += 1 |
795 self._nb_inserted_relations += 1 |
846 |
796 |
960 |
910 |
961 def flush(self): |
911 def flush(self): |
962 """Flush data to the database""" |
912 """Flush data to the database""" |
963 self.source.flush() |
913 self.source.flush() |
964 |
914 |
965 def relate(self, subj_eid, rtype, obj_eid, subjtype=None): |
915 def relate(self, subj_eid, rtype, obj_eid, **kwargs): |
966 if subj_eid is None or obj_eid is None: |
916 if subj_eid is None or obj_eid is None: |
967 return |
917 return |
968 # XXX Could subjtype be inferred ? |
918 # XXX Could subjtype be inferred ? |
969 self.source.add_relation(self.session, subj_eid, rtype, obj_eid, |
919 self.source.add_relation(self.session, subj_eid, rtype, obj_eid, |
970 self.rschema(rtype).inlined, subjtype) |
920 self.rschema(rtype).inlined, **kwargs) |
971 |
921 |
972 def drop_indexes(self, etype): |
922 def drop_indexes(self, etype): |
973 """Drop indexes for a given entity type""" |
923 """Drop indexes for a given entity type""" |
974 if etype not in self.indexes_etypes: |
924 if etype not in self.indexes_etypes: |
975 cu = self.session.cnxset['system'] |
925 cu = self.session.cnxset['system'] |
1079 nb_threads=self.nb_threads_statement, |
1029 nb_threads=self.nb_threads_statement, |
1080 support_copy_from=self.support_copy_from, |
1030 support_copy_from=self.support_copy_from, |
1081 encoding=self.dbencoding) |
1031 encoding=self.dbencoding) |
1082 except: |
1032 except: |
1083 print 'failed to flush' |
1033 print 'failed to flush' |
|
1034 else: |
|
1035 print 'flush done' |
1084 finally: |
1036 finally: |
1085 _entities_sql.clear() |
1037 _entities_sql.clear() |
1086 _relations_sql.clear() |
1038 _relations_sql.clear() |
1087 _insertdicts.clear() |
1039 _insertdicts.clear() |
1088 _inlined_relations_sql.clear() |
1040 _inlined_relations_sql.clear() |
1089 print 'flush done' |
|
1090 |
1041 |
1091 def add_relation(self, session, subject, rtype, object, |
1042 def add_relation(self, session, subject, rtype, object, |
1092 inlined=False, subjtype=None): |
1043 inlined=False, **kwargs): |
1093 if inlined: |
1044 if inlined: |
1094 _sql = self._sql.inlined_relations |
1045 _sql = self._sql.inlined_relations |
1095 data = {'cw_eid': subject, SQL_PREFIX + rtype: object} |
1046 data = {'cw_eid': subject, SQL_PREFIX + rtype: object} |
|
1047 subjtype = kwargs.get('subjtype') |
1096 if subjtype is None: |
1048 if subjtype is None: |
1097 # Try to infer it |
1049 # Try to infer it |
1098 targets = [t.type for t in |
1050 targets = [t.type for t in |
1099 self.schema.rschema(rtype).targets()] |
1051 self.schema.rschema(rtype).targets()] |
1100 if len(targets) == 1: |
1052 if len(targets) == 1: |
1101 subjtype = targets[0] |
1053 subjtype = targets[0] |
1102 else: |
1054 else: |
1103 raise ValueError('You should give the subject etype for ' |
1055 raise ValueError('You should give the subject etype for ' |
1104 'inlined relation %s' |
1056 'inlined relation %s' |
1105 ', as it cannot be inferred' % rtype) |
1057 ', as it cannot be inferred: ' |
|
1058 'this type is given as keyword argument ' |
|
1059 '``subjtype``'% rtype) |
1106 statement = self.sqlgen.update(SQL_PREFIX + subjtype, |
1060 statement = self.sqlgen.update(SQL_PREFIX + subjtype, |
1107 data, ['cw_eid']) |
1061 data, ['cw_eid']) |
1108 else: |
1062 else: |
1109 _sql = self._sql.relations |
1063 _sql = self._sql.relations |
1110 data = {'eid_from': subject, 'eid_to': object} |
1064 data = {'eid_from': subject, 'eid_to': object} |
1115 _sql[statement] = [data] |
1069 _sql[statement] = [data] |
1116 |
1070 |
1117 def add_entity(self, session, entity): |
1071 def add_entity(self, session, entity): |
1118 with self._storage_handler(entity, 'added'): |
1072 with self._storage_handler(entity, 'added'): |
1119 attrs = self.preprocess_entity(entity) |
1073 attrs = self.preprocess_entity(entity) |
1120 rtypes = self._inlined_rtypes_cache.get(entity.__regid__, ()) |
1074 rtypes = self._inlined_rtypes_cache.get(entity.cw_etype, ()) |
1121 if isinstance(rtypes, str): |
1075 if isinstance(rtypes, str): |
1122 rtypes = (rtypes,) |
1076 rtypes = (rtypes,) |
1123 for rtype in rtypes: |
1077 for rtype in rtypes: |
1124 if rtype not in attrs: |
1078 if rtype not in attrs: |
1125 attrs[rtype] = None |
1079 attrs[rtype] = None |
1126 sql = self.sqlgen.insert(SQL_PREFIX + entity.__regid__, attrs) |
1080 sql = self.sqlgen.insert(SQL_PREFIX + entity.cw_etype, attrs) |
1127 self._sql.eid_insertdicts[entity.eid] = attrs |
1081 self._sql.eid_insertdicts[entity.eid] = attrs |
1128 self._append_to_entities(sql, attrs) |
1082 self._append_to_entities(sql, attrs) |
1129 |
1083 |
1130 def _append_to_entities(self, sql, attrs): |
1084 def _append_to_entities(self, sql, attrs): |
1131 self._sql.entities[sql].append(attrs) |
1085 self._sql.entities[sql].append(attrs) |
1154 # begin by inserting eid/type/source/extid into the entities table |
1108 # begin by inserting eid/type/source/extid into the entities table |
1155 if extid is not None: |
1109 if extid is not None: |
1156 assert isinstance(extid, str) |
1110 assert isinstance(extid, str) |
1157 extid = b64encode(extid) |
1111 extid = b64encode(extid) |
1158 uri = 'system' if source.copy_based_source else source.uri |
1112 uri = 'system' if source.copy_based_source else source.uri |
1159 attrs = {'type': entity.__regid__, 'eid': entity.eid, 'extid': extid, |
1113 attrs = {'type': entity.cw_etype, 'eid': entity.eid, 'extid': extid, |
1160 'source': uri, 'asource': source.uri, 'mtime': datetime.utcnow()} |
1114 'source': uri, 'asource': source.uri, 'mtime': datetime.utcnow()} |
1161 self._handle_insert_entity_sql(session, self.sqlgen.insert('entities', attrs), attrs) |
1115 self._handle_insert_entity_sql(session, self.sqlgen.insert('entities', attrs), attrs) |
1162 # insert core relations: is, is_instance_of and cw_source |
1116 # insert core relations: is, is_instance_of and cw_source |
1163 try: |
1117 try: |
1164 self._handle_is_relation_sql(session, 'INSERT INTO is_relation(eid_from,eid_to) VALUES (%s,%s)', |
1118 self._handle_is_relation_sql(session, 'INSERT INTO is_relation(eid_from,eid_to) VALUES (%s,%s)', |
1173 (entity.eid, eschema_eid(session, eschema))) |
1127 (entity.eid, eschema_eid(session, eschema))) |
1174 if 'CWSource' in self.schema and source.eid is not None: # else, cw < 3.10 |
1128 if 'CWSource' in self.schema and source.eid is not None: # else, cw < 3.10 |
1175 self._handle_is_relation_sql(session, 'INSERT INTO cw_source_relation(eid_from,eid_to) VALUES (%s,%s)', |
1129 self._handle_is_relation_sql(session, 'INSERT INTO cw_source_relation(eid_from,eid_to) VALUES (%s,%s)', |
1176 (entity.eid, source.eid)) |
1130 (entity.eid, source.eid)) |
1177 # now we can update the full text index |
1131 # now we can update the full text index |
1178 if self.do_fti and self.need_fti_indexation(entity.__regid__): |
1132 if self.do_fti and self.need_fti_indexation(entity.cw_etype): |
1179 if complete: |
1133 if complete: |
1180 entity.complete(entity.e_schema.indexable_attributes()) |
1134 entity.complete(entity.e_schema.indexable_attributes()) |
1181 self.index_entity(session, entity=entity) |
1135 self.index_entity(session, entity=entity) |