server/sources/__init__.py
changeset 0 b97547f5f1fa
child 382 03964dd370e7
equal deleted inserted replaced
-1:000000000000 0:b97547f5f1fa
       
     1 """cubicweb server sources support
       
     2 
       
     3 :organization: Logilab
       
     4 :copyright: 2001-2008 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
       
     5 :contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr
       
     6 """
       
     7 __docformat__ = "restructuredtext en"
       
     8 
       
     9 from logging import getLogger
       
    10 
       
    11 from cubicweb import set_log_methods
       
    12 
       
    13 
       
    14 class AbstractSource(object):
       
    15     """an abstract class for sources"""
       
    16 
       
    17     # boolean telling if modification hooks should be called when something is
       
    18     # modified in this source
       
    19     should_call_hooks = True
       
    20     # boolean telling if the repository should connect to this source during
       
    21     # migration
       
    22     connect_for_migration = True
       
    23     
       
    24     # mappings telling which entities and relations are available in the source
       
    25     # keys are supported entity/relation types and values are boolean indicating
       
    26     # wether the support is read-only (False) or read-write (True)
       
    27     support_entities = {}
       
    28     support_relations = {}
       
    29     # a global identifier for this source, which has to be set by the source
       
    30     # instance
       
    31     uri = None
       
    32     # a reference to the system information helper
       
    33     repo = None
       
    34     # a reference to the application'schema (may differs from the source'schema)
       
    35     schema = None
       
    36     
       
    37     def __init__(self, repo, appschema, source_config, *args, **kwargs):
       
    38         self.repo = repo
       
    39         self.uri = source_config['uri']
       
    40         set_log_methods(self, getLogger('cubicweb.sources.'+self.uri))
       
    41         self.set_schema(appschema)
       
    42         self.support_relations['identity'] = False
       
    43         
       
    44     def init_creating(self):
       
    45         """method called by the repository once ready to create a new instance"""
       
    46         pass
       
    47  
       
    48     def init(self):
       
    49         """method called by the repository once ready to handle request"""
       
    50         pass
       
    51     
       
    52     def reset_caches(self):
       
    53         """method called during test to reset potential source caches"""
       
    54         pass
       
    55     
       
    56     def clear_eid_cache(self, eid, etype):
       
    57         """clear potential caches for the given eid"""
       
    58         pass
       
    59     
       
    60     def __repr__(self):
       
    61         return '<%s source>' % self.uri
       
    62 
       
    63     def __cmp__(self, other):
       
    64         """simple comparison function to get predictable source order, with the
       
    65         system source at last
       
    66         """
       
    67         if self.uri == other.uri:
       
    68             return 0
       
    69         if self.uri == 'system':
       
    70             return 1
       
    71         if other.uri == 'system':
       
    72             return -1
       
    73         return cmp(self.uri, other.uri)
       
    74         
       
    75     def set_schema(self, schema):
       
    76         """set the application'schema"""
       
    77         self.schema = schema
       
    78         
       
    79     def support_entity(self, etype, write=False):
       
    80         """return true if the given entity's type is handled by this adapter
       
    81         if write is true, return true only if it's a RW support
       
    82         """
       
    83         try:
       
    84             wsupport = self.support_entities[etype]
       
    85         except KeyError:
       
    86             return False
       
    87         if write:
       
    88             return wsupport
       
    89         return True
       
    90     
       
    91     def support_relation(self, rtype, write=False):
       
    92         """return true if the given relation's type is handled by this adapter
       
    93         if write is true, return true only if it's a RW support
       
    94 
       
    95         current implementation return true if the relation is defined into 
       
    96         `support_relations` or if it is a final relation of a supported entity 
       
    97         type
       
    98         """
       
    99         try:
       
   100             wsupport = self.support_relations[rtype]
       
   101         except KeyError:
       
   102             rschema = self.schema.rschema(rtype)
       
   103             if not rschema.is_final() or rschema == 'has_text':
       
   104                 return False
       
   105             for etype in rschema.subjects():
       
   106                 try:
       
   107                     wsupport = self.support_entities[etype]
       
   108                     break
       
   109                 except KeyError:
       
   110                     continue
       
   111             else:
       
   112                 return False
       
   113         if write:
       
   114             return wsupport
       
   115         return True    
       
   116     
       
   117     def eid2extid(self, eid, session=None):
       
   118         return self.repo.eid2extid(self, eid, session)
       
   119 
       
   120     def extid2eid(self, value, etype, session=None, insert=True):
       
   121         return self.repo.extid2eid(self, value, etype, session, insert)
       
   122 
       
   123     PUBLIC_KEYS = ('adapter', 'uri')
       
   124     def remove_sensitive_information(self, sourcedef):
       
   125         """remove sensitive information such as login / password from source
       
   126         definition
       
   127         """
       
   128         for key in sourcedef.keys():
       
   129             if not key in self.PUBLIC_KEYS:
       
   130                 sourcedef.pop(key)
       
   131 
       
   132     def cleanup_entities_info(self, session):
       
   133         """cleanup system tables from information for entities coming from
       
   134         this source. This should be called when a source is removed to
       
   135         properly cleanup the database
       
   136         """
       
   137         # fti / entities tables cleanup
       
   138         dbhelper = session.pool.source('system').dbhelper
       
   139         # sqlite doesn't support DELETE FROM xxx USING yyy
       
   140         session.system_sql('DELETE FROM %s WHERE %s.%s IN (SELECT eid FROM '
       
   141                            'entities WHERE entities.source=%%(uri)s)'
       
   142                            % (dbhelper.fti_table, dbhelper.fti_table,
       
   143                               dbhelper.fti_uid_attr),
       
   144                            {'uri': self.uri})
       
   145         session.system_sql('DELETE FROM entities WHERE source=%(uri)s',
       
   146                            {'uri': self.uri})
       
   147 
       
   148     # abstract methods to overide (at least) in concrete source classes #######
       
   149     
       
   150     def get_connection(self):
       
   151         """open and return a connection to the source"""
       
   152         raise NotImplementedError()
       
   153     
       
   154     def check_connection(self, cnx):
       
   155         """check connection validity, return None if the connection is still valid
       
   156         else a new connection (called when the pool using the given connection is
       
   157         being attached to a session)
       
   158 
       
   159         do nothing by default
       
   160         """
       
   161         pass
       
   162     
       
   163     def pool_reset(self, cnx):
       
   164         """the pool using the given connection is being reseted from its current
       
   165         attached session
       
   166 
       
   167         do nothing by default
       
   168         """
       
   169         pass
       
   170     
       
   171     def authenticate(self, session, login, password):
       
   172         """if the source support EUser entity type, it should implements
       
   173         this method which should return EUser eid for the given login/password
       
   174         if this account is defined in this source and valid login / password is
       
   175         given. Else raise `AuthenticationError`
       
   176         """
       
   177         raise NotImplementedError()
       
   178     
       
   179     def syntax_tree_search(self, session, union,
       
   180                            args=None, cachekey=None, varmap=None, debug=0):
       
   181         """return result from this source for a rql query (actually from a rql 
       
   182         syntax tree and a solution dictionary mapping each used variable to a 
       
   183         possible type). If cachekey is given, the query necessary to fetch the
       
   184         results (but not the results themselves) may be cached using this key.
       
   185         """
       
   186         raise NotImplementedError()
       
   187                 
       
   188     def flying_insert(self, table, session, union, args=None, varmap=None):
       
   189         """similar as .syntax_tree_search, but inserts data in the temporary
       
   190         table (on-the-fly if possible, eg for the system source whose the given
       
   191         cursor come from). If not possible, inserts all data by calling
       
   192         .executemany().
       
   193         """
       
   194         res = self.syntax_tree_search(session, union, args, varmap=varmap)
       
   195         session.pool.source('system')._manual_insert(res, table, session)
       
   196 
       
   197         
       
   198     # system source don't have to implement the two methods below
       
   199     
       
   200     def before_entity_insertion(self, session, lid, etype, eid):
       
   201         """called by the repository when an eid has been attributed for an
       
   202         entity stored here but the entity has not been inserted in the system
       
   203         table yet.
       
   204         
       
   205         This method must return the an Entity instance representation of this
       
   206         entity.
       
   207         """
       
   208         entity = self.repo.vreg.etype_class(etype)(session, None)
       
   209         entity.set_eid(eid)
       
   210         return entity
       
   211     
       
   212     def after_entity_insertion(self, session, lid, entity):
       
   213         """called by the repository after an entity stored here has been
       
   214         inserted in the system table.
       
   215         """
       
   216         pass
       
   217 
       
   218     # read-only sources don't have to implement methods below
       
   219 
       
   220     def get_extid(self, entity):
       
   221         """return the external id for the given newly inserted entity"""
       
   222         raise NotImplementedError()
       
   223         
       
   224     def add_entity(self, session, entity):
       
   225         """add a new entity to the source"""
       
   226         raise NotImplementedError()
       
   227         
       
   228     def update_entity(self, session, entity):
       
   229         """update an entity in the source"""
       
   230         raise NotImplementedError()
       
   231 
       
   232     def delete_entity(self, session, etype, eid):
       
   233         """delete an entity from the source"""
       
   234         raise NotImplementedError()
       
   235 
       
   236     def add_relation(self, session, subject, rtype, object):
       
   237         """add a relation to the source"""
       
   238         raise NotImplementedError()
       
   239     
       
   240     def delete_relation(self, session, subject, rtype, object):
       
   241         """delete a relation from the source"""
       
   242         raise NotImplementedError()
       
   243 
       
   244     # system source interface #################################################
       
   245 
       
   246     def eid_type_source(self, session, eid):
       
   247         """return a tuple (type, source, extid) for the entity with id <eid>"""
       
   248         raise NotImplementedError()
       
   249     
       
   250     def create_eid(self, session):
       
   251         raise NotImplementedError()
       
   252 
       
   253     def add_info(self, session, entity, source, extid=None):
       
   254         """add type and source info for an eid into the system table"""
       
   255         raise NotImplementedError()
       
   256 
       
   257     def delete_info(self, session, eid, etype, uri, extid):
       
   258         """delete system information on deletion of an entity by transfering
       
   259         record from the entities table to the deleted_entities table
       
   260         """
       
   261         raise NotImplementedError()
       
   262         
       
   263     def fti_unindex_entity(self, session, eid):
       
   264         """remove text content for entity with the given eid from the full text
       
   265         index
       
   266         """
       
   267         raise NotImplementedError()
       
   268         
       
   269     def fti_index_entity(self, session, entity):
       
   270         """add text content of a created/modified entity to the full text index
       
   271         """
       
   272         raise NotImplementedError()
       
   273         
       
   274     def modified_entities(self, session, etypes, mtime):
       
   275         """return a 2-uple:
       
   276         * list of (etype, eid) of entities of the given types which have been
       
   277           modified since the given timestamp (actually entities whose full text
       
   278           index content has changed)
       
   279         * list of (etype, eid) of entities of the given types which have been
       
   280           deleted since the given timestamp
       
   281         """
       
   282         raise NotImplementedError()
       
   283 
       
   284     # sql system source interface #############################################
       
   285 
       
   286     def sqlexec(self, session, sql, args=None):
       
   287         """execute the query and return its result"""
       
   288         raise NotImplementedError()
       
   289     
       
   290     def temp_table_def(self, selection, solution, table, basemap):
       
   291         raise NotImplementedError()
       
   292     
       
   293     def create_index(self, session, table, column, unique=False):
       
   294         raise NotImplementedError()
       
   295             
       
   296     def drop_index(self, session, table, column, unique=False):
       
   297         raise NotImplementedError()
       
   298 
       
   299     def create_temp_table(self, session, table, schema):
       
   300         raise NotImplementedError()
       
   301 
       
   302     def clean_temp_data(self, session, temptables):
       
   303         """remove temporary data, usually associated to temporary tables"""
       
   304         pass
       
   305 
       
   306         
       
   307 class TrFunc(object):
       
   308     """lower, upper"""
       
   309     def __init__(self, trname, index, attrname=None):
       
   310         self._tr = trname.lower()
       
   311         self.index = index
       
   312         self.attrname = attrname
       
   313         
       
   314     def apply(self, resdict):
       
   315         value = resdict.get(self.attrname)
       
   316         if value is not None:
       
   317             return getattr(value, self._tr)()
       
   318         return None
       
   319 
       
   320 
       
   321 class GlobTrFunc(TrFunc):
       
   322     """count, sum, max, min, avg"""
       
   323     funcs = {
       
   324         'count': len,
       
   325         'sum': sum,
       
   326         'max': max,
       
   327         'min': min,
       
   328         # XXX avg
       
   329         }
       
   330     def apply(self, result):
       
   331         """have to 'groupby' manually. For instance, if we 'count' for index 1:
       
   332         >>> self.apply([(1, 2), (3, 4), (1, 5)])
       
   333         [(1, 7), (3, 4)]
       
   334         """
       
   335         keys, values = [], {}
       
   336         for row in result:
       
   337             key = tuple(v for i, v in enumerate(row) if i != self.index)
       
   338             value = row[self.index]
       
   339             try:
       
   340                 values[key].append(value)
       
   341             except KeyError:
       
   342                 keys.append(key)
       
   343                 values[key] = [value]
       
   344         result = []
       
   345         trfunc = self.funcs[self._tr]
       
   346         for key in keys:
       
   347             row = list(key)
       
   348             row.insert(self.index, trfunc(values[key]))
       
   349             result.append(row)
       
   350         return result
       
   351 
       
   352 
       
   353 class ConnectionWrapper(object):
       
   354     def __init__(self, cnx=None):
       
   355         self.cnx = cnx
       
   356     def commit(self):
       
   357         pass
       
   358     def rollback(self):
       
   359         pass
       
   360     def cursor(self):
       
   361         return None # no actual cursor support
       
   362 
       
   363 from cubicweb.server import SOURCE_TYPES
       
   364 
       
   365 def source_adapter(source_config):
       
   366     adapter_type = source_config['adapter'].lower()
       
   367     try:
       
   368         return SOURCE_TYPES[adapter_type]
       
   369     except KeyError:
       
   370         raise RuntimeError('Unknown adapter %r' % adapter_type)
       
   371     
       
   372 def get_source(source_config, global_schema, repo):
       
   373     """return a source adapter according to the adapter field in the
       
   374     source's configuration
       
   375     """
       
   376     return source_adapter(source_config)(repo, global_schema, source_config)