server/sources/native.py
changeset 0 b97547f5f1fa
child 438 69b79faefa94
equal deleted inserted replaced
-1:000000000000 0:b97547f5f1fa
       
     1 """Adapters for native cubicweb sources.
       
     2 
       
     3 :organization: Logilab
       
     4 :copyright: 2001-2008 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
       
     5 :contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr
       
     6 """
       
     7 __docformat__ = "restructuredtext en"
       
     8 
       
     9 from threading import Lock
       
    10 
       
    11 from mx.DateTime import now
       
    12 
       
    13 from logilab.common.cache import Cache
       
    14 from logilab.common.configuration import REQUIRED
       
    15 from logilab.common.adbh import get_adv_func_helper
       
    16 
       
    17 from indexer import get_indexer
       
    18 
       
    19 from cubicweb import UnknownEid, AuthenticationError, Binary, server
       
    20 from cubicweb.server.utils import crypt_password
       
    21 from cubicweb.server.sqlutils import SQLAdapterMixIn
       
    22 from cubicweb.server.rqlannotation import set_qdata
       
    23 from cubicweb.server.sources import AbstractSource
       
    24 from cubicweb.server.sources.rql2sql import SQLGenerator
       
    25 
       
    26 
       
# entity types which are *not* handled by the native source; looked up by
# NativeSQLSource.support_entity
NONSYSTEM_ETYPES = set()
# relation types the native source refuses to write; looked up by
# NativeSQLSource.support_relation when `write` is true
NONSYSTEM_RELATIONS = set()
       
    29 
       
    30 class LogCursor(object):
       
    31     def __init__(self, cursor):
       
    32         self.cu = cursor
       
    33         
       
    34     def execute(self, query, args=None):
       
    35         """Execute a query.
       
    36         it's a function just so that it shows up in profiling
       
    37         """
       
    38         if server.DEBUG:
       
    39             print 'exec', query, args
       
    40         try:
       
    41             self.cu.execute(str(query), args)
       
    42         except Exception, ex:
       
    43             print "sql: %r\n args: %s\ndbms message: %r" % (
       
    44                 query, args, ex.args[0])
       
    45             raise
       
    46         
       
    47     def fetchall(self):
       
    48         return self.cu.fetchall()
       
    49         
       
    50     def fetchone(self):
       
    51         return self.cu.fetchone()
       
    52     
       
    53 def make_schema(selected, solution, table, typemap):
       
    54     """return a sql schema to store RQL query result"""
       
    55     sql = []
       
    56     varmap = {}
       
    57     for i, term in enumerate(selected):
       
    58         name = 'C%s' % i
       
    59         key = term.as_string()
       
    60         varmap[key] = '%s.%s' % (table, name)
       
    61         ttype = term.get_type(solution)
       
    62         try:
       
    63             sql.append('%s %s' % (name, typemap[ttype]))
       
    64         except KeyError:
       
    65             # assert not schema(ttype).is_final()
       
    66             sql.append('%s %s' % (name, typemap['Int']))
       
    67     return ','.join(sql), varmap
       
    68 
       
    69 def _modified_sql(table, etypes):
       
    70     # XXX protect against sql injection
       
    71     if len(etypes) > 1:
       
    72         restr = 'type IN (%s)' % ','.join("'%s'" % etype for etype in etypes)
       
    73     else:
       
    74         restr = "type='%s'" % etypes[0]
       
    75     if table == 'entities':
       
    76         attr = 'mtime'
       
    77     else:
       
    78         attr = 'dtime'
       
    79     return 'SELECT type, eid FROM %s WHERE %s AND %s > %%(time)s' % (
       
    80         table, restr, attr)
       
    81 
       
    82 
       
    83 class NativeSQLSource(SQLAdapterMixIn, AbstractSource):
       
    84     """adapter for source using the native cubicweb schema (see below)
       
    85     """
       
    86     # need default value on class since migration doesn't call init method
       
    87     has_deleted_entitites_table = True
       
    88     
       
    89     passwd_rql = "Any P WHERE X is EUser, X login %(login)s, X upassword P"
       
    90     auth_rql = "Any X WHERE X is EUser, X login %(login)s, X upassword %(pwd)s"
       
    91     _sols = ({'X': 'EUser', 'P': 'Password'},)
       
    92     
       
    93     options = (
       
    94         ('db-driver',
       
    95          {'type' : 'string',
       
    96           'default': 'postgres',
       
    97           'help': 'database driver (postgres or sqlite)',
       
    98           'group': 'native-source', 'inputlevel': 1,
       
    99           }),
       
   100         ('db-host',
       
   101          {'type' : 'string',
       
   102           'default': '',
       
   103           'help': 'database host',
       
   104           'group': 'native-source', 'inputlevel': 1,
       
   105           }),
       
   106         ('db-name',
       
   107          {'type' : 'string',
       
   108           'default': REQUIRED,
       
   109           'help': 'database name',
       
   110           'group': 'native-source', 'inputlevel': 0,
       
   111           }),
       
   112         ('db-user',
       
   113          {'type' : 'string',
       
   114           'default': 'cubicweb',
       
   115           'help': 'database user',
       
   116           'group': 'native-source', 'inputlevel': 0,
       
   117           }),
       
   118         ('db-password',
       
   119          {'type' : 'password',
       
   120           'default': '',
       
   121           'help': 'database password',
       
   122           'group': 'native-source', 'inputlevel': 0,
       
   123           }),
       
   124         ('db-encoding',
       
   125          {'type' : 'string',
       
   126           'default': 'utf8',
       
   127           'help': 'database encoding',
       
   128           'group': 'native-source', 'inputlevel': 1,
       
   129           }),
       
   130     )
       
   131     
       
   132     def __init__(self, repo, appschema, source_config, *args, **kwargs):
       
   133         SQLAdapterMixIn.__init__(self, source_config)
       
   134         AbstractSource.__init__(self, repo, appschema, source_config,
       
   135                                 *args, **kwargs)
       
   136         # sql generator
       
   137         self._rql_sqlgen = SQLGenerator(appschema, self.dbhelper,
       
   138                                         self.encoding)
       
   139         # full text index helper
       
   140         self.indexer = get_indexer(self.dbdriver, self.encoding)
       
   141         # advanced functionality helper
       
   142         self.dbhelper.fti_uid_attr = self.indexer.uid_attr
       
   143         self.dbhelper.fti_table = self.indexer.table
       
   144         self.dbhelper.fti_restriction_sql = self.indexer.restriction_sql
       
   145         self.dbhelper.fti_need_distinct_query = self.indexer.need_distinct
       
   146         # sql queries cache
       
   147         self._cache = Cache(repo.config['rql-cache-size'])
       
   148         self._temp_table_data = {}
       
   149         self._eid_creation_lock = Lock()
       
   150 
       
   151     def reset_caches(self):
       
   152         """method called during test to reset potential source caches"""
       
   153         self._cache = Cache(self.repo.config['rql-cache-size'])
       
   154     
       
   155     def clear_eid_cache(self, eid, etype):
       
   156         """clear potential caches for the given eid"""
       
   157         self._cache.pop('%s X WHERE X eid %s' % (etype, eid), None)
       
   158         self._cache.pop('Any X WHERE X eid %s' % eid, None)
       
   159         
       
   160     def sqlexec(self, session, sql, args=None):
       
   161         """execute the query and return its result"""
       
   162         cursor = session.pool[self.uri]
       
   163         self.doexec(cursor, sql, args)
       
   164         return self.process_result(cursor)
       
   165     
       
   166     def init_creating(self):
       
   167         # check full text index availibility
       
   168         pool = self.repo._get_pool()
       
   169         if not self.indexer.has_fti_table(pool['system']):
       
   170             self.error('no text index table')
       
   171             self.indexer = None
       
   172         self.repo._free_pool(pool)
       
   173 
       
   174     def init(self):
       
   175         self.init_creating() 
       
   176         pool = self.repo._get_pool()
       
   177         # XXX cubicweb < 2.42 compat
       
   178         if 'deleted_entities' in self.dbhelper.list_tables(pool['system']):
       
   179             self.has_deleted_entitites_table = True
       
   180         else:
       
   181             self.has_deleted_entitites_table = False
       
   182         self.repo._free_pool(pool)
       
   183         
       
   184     # ISource interface #######################################################
       
   185 
       
   186     def compile_rql(self, rql):
       
   187         rqlst = self.repo.querier._rqlhelper.parse(rql)
       
   188         rqlst.restricted_vars = ()
       
   189         rqlst.children[0].solutions = self._sols
       
   190         self.repo.querier.sqlgen_annotate(rqlst)
       
   191         set_qdata(rqlst, ())
       
   192         return rqlst
       
   193     
       
   194     def set_schema(self, schema):
       
   195         """set the application'schema"""
       
   196         self._cache = Cache(self.repo.config['rql-cache-size'])
       
   197         self.cache_hit, self.cache_miss, self.no_cache = 0, 0, 0
       
   198         self.schema = schema
       
   199         try:
       
   200             self._rql_sqlgen.schema = schema
       
   201         except AttributeError:
       
   202             pass # __init__
       
   203         if 'EUser' in schema: # probably an empty schema if not true...
       
   204             # rql syntax trees used to authenticate users
       
   205             self._passwd_rqlst = self.compile_rql(self.passwd_rql)
       
   206             self._auth_rqlst = self.compile_rql(self.auth_rql)
       
   207                 
       
   208     def support_entity(self, etype, write=False):
       
   209         """return true if the given entity's type is handled by this adapter
       
   210         if write is true, return true only if it's a RW support
       
   211         """
       
   212         return not etype in NONSYSTEM_ETYPES
       
   213     
       
   214     def support_relation(self, rtype, write=False):
       
   215         """return true if the given relation's type is handled by this adapter
       
   216         if write is true, return true only if it's a RW support
       
   217         """
       
   218         if write:
       
   219             return not rtype in NONSYSTEM_RELATIONS
       
   220         # due to current multi-sources implementation, the system source
       
   221         # can't claim not supporting a relation            
       
   222         return True #not rtype == 'content_for'
       
   223 
       
   224     def authenticate(self, session, login, password):
       
   225         """return EUser eid for the given login/password if this account is
       
   226         defined in this source, else raise `AuthenticationError`
       
   227 
       
   228         two queries are needed since passwords are stored crypted, so we have
       
   229         to fetch the salt first
       
   230         """
       
   231         args = {'login': login, 'pwd' : password}
       
   232         if password is not None:
       
   233             rset = self.syntax_tree_search(session, self._passwd_rqlst, args)
       
   234             try:
       
   235                 pwd = rset[0][0]
       
   236             except IndexError:
       
   237                 raise AuthenticationError('bad login')
       
   238             # passwords are stored using the bytea type, so we get a StringIO
       
   239             if pwd is not None:
       
   240                 args['pwd'] = crypt_password(password, pwd.getvalue()[:2])
       
   241         # get eid from login and (crypted) password
       
   242         rset = self.syntax_tree_search(session, self._auth_rqlst, args)
       
   243         try:
       
   244             return rset[0][0]
       
   245         except IndexError:
       
   246             raise AuthenticationError('bad password')
       
   247     
       
   248     def syntax_tree_search(self, session, union, args=None, cachekey=None, 
       
   249                            varmap=None):
       
   250         """return result from this source for a rql query (actually from
       
   251         a rql syntax tree and a solution dictionary mapping each used
       
   252         variable to a possible type). If cachekey is given, the query
       
   253         necessary to fetch the results (but not the results themselves)
       
   254         may be cached using this key.
       
   255         """
       
   256         if server.DEBUG:
       
   257             print 'RQL FOR NATIVE SOURCE', self.uri, cachekey
       
   258             if varmap:
       
   259                 print 'USING VARMAP', varmap
       
   260             print union.as_string()
       
   261             if args: print 'ARGS', args
       
   262             print 'SOLUTIONS', ','.join(str(s.solutions) for s in union.children)
       
   263         # remember number of actually selected term (sql generation may append some)
       
   264         if cachekey is None:
       
   265             self.no_cache += 1
       
   266             # generate sql query if we are able to do so (not supported types...)
       
   267             sql, query_args = self._rql_sqlgen.generate(union, args, varmap)
       
   268         else:
       
   269             # sql may be cached
       
   270             try:
       
   271                 sql, query_args = self._cache[cachekey]
       
   272                 self.cache_hit += 1
       
   273             except KeyError:
       
   274                 self.cache_miss += 1
       
   275                 sql, query_args = self._rql_sqlgen.generate(union, args, varmap)
       
   276                 self._cache[cachekey] = sql, query_args
       
   277         args = self.merge_args(args, query_args)
       
   278         cursor = session.pool[self.uri]
       
   279         assert isinstance(sql, basestring), repr(sql)
       
   280         try:
       
   281             self.doexec(cursor, sql, args)
       
   282         except (self.dbapi_module.OperationalError,
       
   283                 self.dbapi_module.InterfaceError):
       
   284             # FIXME: better detection of deconnection pb
       
   285             self.info("request failed '%s' ... retry with a new cursor", sql)
       
   286             session.pool.reconnect(self)
       
   287             cursor = session.pool[self.uri]
       
   288             self.doexec(cursor, sql, args)
       
   289         res = self.process_result(cursor)
       
   290         if server.DEBUG:
       
   291             print '------>', res
       
   292         return res
       
   293                 
       
   294     def flying_insert(self, table, session, union, args=None, varmap=None):
       
   295         """similar as .syntax_tree_search, but inserts data in the
       
   296         temporary table (on-the-fly if possible, eg for the system
       
   297         source whose the given cursor come from). If not possible,
       
   298         inserts all data by calling .executemany().
       
   299         """
       
   300         if self.uri == 'system':
       
   301             if server.DEBUG:
       
   302                 print 'FLYING RQL FOR SOURCE', self.uri
       
   303                 if varmap:
       
   304                     print 'USING VARMAP', varmap
       
   305                 print union.as_string()
       
   306                 print 'SOLUTIONS', ','.join(str(s.solutions) for s in union.children)
       
   307             # generate sql queries if we are able to do so
       
   308             sql, query_args = self._rql_sqlgen.generate(union, args, varmap)
       
   309             query = 'INSERT INTO %s %s' % (table, sql.encode(self.encoding))
       
   310             self.doexec(session.pool[self.uri], query,
       
   311                         self.merge_args(args, query_args))
       
   312 # XXX commented until it's proved to be necessary
       
   313 #             # XXX probably inefficient
       
   314 #             tempdata = self._temp_table_data.setdefault(table, set())
       
   315 #             cursor = session.pool[self.uri]
       
   316 #             cursor.execute('select * from %s' % table)
       
   317 #             for row in cursor.fetchall():
       
   318 #                 print 'data', row
       
   319 #                 tempdata.add(tuple(row))
       
   320         else:
       
   321             super(NativeSQLSource, self).flying_insert(table, session, union,
       
   322                                                        args, varmap)
       
   323         
       
   324     def _manual_insert(self, results, table, session):
       
   325         """insert given result into a temporary table on the system source"""
       
   326         #print 'manual insert', table, results
       
   327         if not results:
       
   328             return
       
   329         #cursor.execute('select * from %s'%table)
       
   330         #assert len(cursor.fetchall())== 0
       
   331         encoding = self.encoding
       
   332         # added chr to be sqlite compatible
       
   333         query_args = ['%%(%s)s' % i for i in xrange(len(results[0]))]
       
   334         query = 'INSERT INTO %s VALUES(%s)' % (table, ','.join(query_args))
       
   335         kwargs_list = []
       
   336 #        tempdata = self._temp_table_data.setdefault(table, set())
       
   337         for row in results:
       
   338             kwargs = {}
       
   339             row = tuple(row)
       
   340 # XXX commented until it's proved to be necessary
       
   341 #             if row in tempdata:
       
   342 #                 continue
       
   343 #             tempdata.add(row)
       
   344             for index, cell in enumerate(row):
       
   345                 if type(cell) is unicode:
       
   346                     cell = cell.encode(encoding)
       
   347                 elif isinstance(cell, Binary):
       
   348                     cell = self.binary(cell.getvalue())
       
   349                 kwargs[str(index)] = cell
       
   350             kwargs_list.append(kwargs)
       
   351         self.doexecmany(session.pool[self.uri], query, kwargs_list)
       
   352 
       
   353     def clean_temp_data(self, session, temptables):
       
   354         """remove temporary data, usually associated to temporary tables"""
       
   355         if temptables:
       
   356             cursor = session.pool[self.uri]
       
   357             for table in temptables:
       
   358                 try:
       
   359                     self.doexec(cursor,'DROP TABLE %s' % table)
       
   360                 except:
       
   361                     pass
       
   362                 try:
       
   363                     del self._temp_table_data[table]
       
   364                 except KeyError:
       
   365                     continue
       
   366     
       
   367     def add_entity(self, session, entity):
       
   368         """add a new entity to the source"""
       
   369         attrs = self.preprocess_entity(entity)
       
   370         sql = self.sqlgen.insert(str(entity.e_schema), attrs)
       
   371         self.doexec(session.pool[self.uri], sql, attrs)
       
   372         
       
   373     def update_entity(self, session, entity):
       
   374         """replace an entity in the source"""
       
   375         attrs = self.preprocess_entity(entity)
       
   376         sql = self.sqlgen.update(str(entity.e_schema), attrs, ['eid'])
       
   377         self.doexec(session.pool[self.uri], sql, attrs)
       
   378 
       
   379     def delete_entity(self, session, etype, eid):
       
   380         """delete an entity from the source"""
       
   381         attrs = {'eid': eid}
       
   382         sql = self.sqlgen.delete(etype, attrs)
       
   383         self.doexec(session.pool[self.uri], sql, attrs)
       
   384 
       
   385     def add_relation(self, session, subject, rtype, object):
       
   386         """add a relation to the source"""
       
   387         attrs = {'eid_from': subject, 'eid_to': object}
       
   388         sql = self.sqlgen.insert('%s_relation' % rtype, attrs)
       
   389         self.doexec(session.pool[self.uri], sql, attrs)
       
   390     
       
   391     def delete_relation(self, session, subject, rtype, object):
       
   392         """delete a relation from the source"""
       
   393         rschema = self.schema.rschema(rtype)
       
   394         if rschema.inlined:
       
   395             etype = session.describe(subject)[0]
       
   396             sql = 'UPDATE %s SET %s=NULL WHERE eid=%%(eid)s' % (etype, rtype)
       
   397             attrs = {'eid' : subject}
       
   398         else:
       
   399             attrs = {'eid_from': subject, 'eid_to': object}
       
   400             sql = self.sqlgen.delete('%s_relation' % rtype, attrs)
       
   401         self.doexec(session.pool[self.uri], sql, attrs)    
       
   402 
       
   403     def doexec(self, cursor, query, args=None):
       
   404         """Execute a query.
       
   405         it's a function just so that it shows up in profiling
       
   406         """
       
   407         #t1 = time()
       
   408         if server.DEBUG:
       
   409             print 'exec', query, args
       
   410         #import sys
       
   411         #sys.stdout.flush()
       
   412         # str(query) to avoid error if it's an unicode string
       
   413         try:
       
   414             cursor.execute(str(query), args)
       
   415         except Exception, ex:
       
   416             self.critical("sql: %r\n args: %s\ndbms message: %r",
       
   417                           query, args, ex.args[0])
       
   418             raise
       
   419         
       
   420     def doexecmany(self, cursor, query, args):
       
   421         """Execute a query.
       
   422         it's a function just so that it shows up in profiling
       
   423         """
       
   424         #t1 = time()
       
   425         if server.DEBUG:
       
   426             print 'execmany', query, 'with', len(args), 'arguments'
       
   427         #import sys
       
   428         #sys.stdout.flush()
       
   429         # str(query) to avoid error if it's an unicode string
       
   430         try:
       
   431             cursor.executemany(str(query), args)
       
   432         except:
       
   433             self.critical("sql many: %r\n args: %s", query, args)
       
   434             raise
       
   435         
       
   436     # short cut to method requiring advanced db helper usage ##################
       
   437             
       
   438     def create_index(self, session, table, column, unique=False):
       
   439         cursor = LogCursor(session.pool[self.uri])
       
   440         self.dbhelper.create_index(cursor, table, column, unique)
       
   441             
       
   442     def drop_index(self, session, table, column, unique=False):
       
   443         cursor = LogCursor(session.pool[self.uri])
       
   444         self.dbhelper.drop_index(cursor, table, column, unique)
       
   445 
       
   446     # system source interface #################################################
       
   447 
       
   448     def eid_type_source(self, session, eid):
       
   449         """return a tuple (type, source, extid) for the entity with id <eid>"""
       
   450         sql = 'SELECT type, source, extid FROM entities WHERE eid=%s' % eid
       
   451         try:
       
   452             res = session.system_sql(sql).fetchone()
       
   453         except:
       
   454             raise UnknownEid(eid)
       
   455         if res is None:
       
   456             raise UnknownEid(eid)
       
   457         return res
       
   458 
       
   459     def extid2eid(self, session, source, lid):
       
   460         """get eid from a local id. An eid is attributed if no record is found"""
       
   461         cursor = session.system_sql('SELECT eid FROM entities WHERE '
       
   462                                     'extid=%(x)s AND source=%(s)s',
       
   463                                     # str() necessary with pg 8.3
       
   464                                     {'x': str(lid), 's': source.uri})
       
   465         # XXX testing rowcount cause strange bug with sqlite, results are there
       
   466         #     but rowcount is 0
       
   467         #if cursor.rowcount > 0: 
       
   468         try:
       
   469             result = cursor.fetchone()
       
   470             if result:
       
   471                 eid = result[0]
       
   472                 return eid            
       
   473         except:
       
   474             pass
       
   475         return None
       
   476     
       
   477     def temp_table_def(self, selected, sol, table):
       
   478         return make_schema(selected, sol, table, self.dbhelper.TYPE_MAPPING)
       
   479 
       
   480     def create_temp_table(self, session, table, schema):
       
   481         # we don't want on commit drop, this may cause problem when
       
   482         # running with an ldap source, and table will be deleted manually any way
       
   483         # on commit
       
   484         sql = self.dbhelper.sql_temporary_table(table, schema, False)
       
   485         self.doexec(session.pool[self.uri], sql)
       
   486     
       
   487     def create_eid(self, session):
       
   488         self._eid_creation_lock.acquire()
       
   489         try:
       
   490             cursor = session.pool[self.uri]
       
   491             for sql in self.dbhelper.sqls_increment_sequence('entities_id_seq'):
       
   492                 self.doexec(cursor, sql)
       
   493             return cursor.fetchone()[0]
       
   494         finally:
       
   495             self._eid_creation_lock.release()
       
   496 
       
   497     def add_info(self, session, entity, source, extid=None):
       
   498         """add type and source info for an eid into the system table"""
       
   499         # begin by inserting eid/type/source/extid into the entities table
       
   500         attrs = {'type': str(entity.e_schema), 'eid': entity.eid,
       
   501                  'extid': extid, 'source': source.uri, 'mtime': now()}
       
   502         session.system_sql(self.sqlgen.insert('entities', attrs), attrs)
       
   503 
       
   504     def delete_info(self, session, eid, etype, uri, extid):
       
   505         """delete system information on deletion of an entity by transfering
       
   506         record from the entities table to the deleted_entities table
       
   507         """
       
   508         attrs = {'eid': eid}
       
   509         session.system_sql(self.sqlgen.delete('entities', attrs), attrs)
       
   510         if self.has_deleted_entitites_table:
       
   511             attrs = {'type': etype, 'eid': eid, 'extid': extid,
       
   512                      'source': uri, 'dtime': now()}
       
   513             session.system_sql(self.sqlgen.insert('deleted_entities', attrs), attrs)
       
   514         
       
   515     def fti_unindex_entity(self, session, eid):
       
   516         """remove text content for entity with the given eid from the full text
       
   517         index
       
   518         """
       
   519         try:
       
   520             self.indexer.cursor_unindex_object(eid, session.pool['system'])
       
   521         except:
       
   522             if self.indexer is not None:
       
   523                 self.exception('error while unindexing %s', eid)
       
   524         
       
   525     def fti_index_entity(self, session, entity):
       
   526         """add text content of a created/modified entity to the full text index
       
   527         """
       
   528         self.info('reindexing %r', entity.eid)
       
   529         try:
       
   530             self.indexer.cursor_reindex_object(entity.eid, entity,
       
   531                                                session.pool['system'])
       
   532         except:
       
   533             if self.indexer is not None:
       
   534                 self.exception('error while reindexing %s', entity)
       
   535         # update entities.mtime
       
   536         attrs = {'eid': entity.eid, 'mtime': now()}
       
   537         session.system_sql(self.sqlgen.update('entities', attrs, ['eid']), attrs)
       
   538         
       
   539     def modified_entities(self, session, etypes, mtime):
       
   540         """return a 2-uple:
       
   541         * list of (etype, eid) of entities of the given types which have been
       
   542           modified since the given timestamp (actually entities whose full text
       
   543           index content has changed)
       
   544         * list of (etype, eid) of entities of the given types which have been
       
   545           deleted since the given timestamp
       
   546         """
       
   547         modsql = _modified_sql('entities', etypes)
       
   548         cursor = session.system_sql(modsql, {'time': mtime})
       
   549         modentities = cursor.fetchall()
       
   550         delsql = _modified_sql('deleted_entities', etypes)
       
   551         cursor = session.system_sql(delsql, {'time': mtime})
       
   552         delentities = cursor.fetchall()
       
   553         return modentities, delentities
       
   554 
       
   555 
       
   556 def sql_schema(driver):
       
   557     helper = get_adv_func_helper(driver)
       
   558     schema = """
       
   559 /* Create the repository's system database */
       
   560 
       
   561 %s
       
   562 
       
   563 CREATE TABLE entities (
       
   564   eid INTEGER PRIMARY KEY NOT NULL,
       
   565   type VARCHAR(64) NOT NULL,
       
   566   source VARCHAR(64) NOT NULL,
       
   567   mtime TIMESTAMP NOT NULL,
       
   568   extid VARCHAR(256)
       
   569 );
       
   570 CREATE INDEX entities_type_idx ON entities(type);
       
   571 CREATE INDEX entities_mtime_idx ON entities(mtime);
       
   572 CREATE INDEX entities_extid_idx ON entities(extid);
       
   573 
       
   574 CREATE TABLE deleted_entities (
       
   575   eid INTEGER PRIMARY KEY NOT NULL,
       
   576   type VARCHAR(64) NOT NULL,
       
   577   source VARCHAR(64) NOT NULL,
       
   578   dtime TIMESTAMP NOT NULL,
       
   579   extid VARCHAR(256)
       
   580 );
       
   581 CREATE INDEX deleted_entities_type_idx ON deleted_entities(type);
       
   582 CREATE INDEX deleted_entities_dtime_idx ON deleted_entities(dtime);
       
   583 CREATE INDEX deleted_entities_extid_idx ON deleted_entities(extid);
       
   584 """ % helper.sql_create_sequence('entities_id_seq')
       
   585     return schema
       
   586 
       
   587 
       
   588 def sql_drop_schema(driver):
       
   589     helper = get_adv_func_helper(driver)
       
   590     return """
       
   591 %s
       
   592 DROP TABLE entities;
       
   593 DROP TABLE deleted_entities;
       
   594 """ % helper.sql_drop_sequence('entities_id_seq')
       
   595 
       
   596 
       
   597 def grant_schema(user, set_owner=True):
       
   598     result = ''
       
   599     if set_owner:
       
   600         result = 'ALTER TABLE entities OWNER TO %s;\n' % user
       
   601         result += 'ALTER TABLE deleted_entities OWNER TO %s;\n' % user
       
   602         result += 'ALTER TABLE entities_id_seq OWNER TO %s;\n' % user
       
   603     result += 'GRANT ALL ON entities TO %s;\n' % user
       
   604     result += 'GRANT ALL ON deleted_entities TO %s;\n' % user
       
   605     result += 'GRANT ALL ON entities_id_seq TO %s;\n' % user
       
   606     return result