server/sources/pyrorql.py
changeset 6724 24bf6f181d0e
parent 6672 2008fd2f628c
child 6941 9ed02daa7dbb
equal deleted inserted replaced
6723:a2ccbcbb08a6 6724:24bf6f181d0e
    43 
    43 
    44 def uidtype(union, col, etype, args):
    44 def uidtype(union, col, etype, args):
    45     select, col = union.locate_subquery(col, etype, args)
    45     select, col = union.locate_subquery(col, etype, args)
    46     return getattr(select.selection[col], 'uidtype', None)
    46     return getattr(select.selection[col], 'uidtype', None)
    47 
    47 
    48 def load_mapping_file(mappingfile):
       
    49     mapping = {}
       
    50     execfile(mappingfile, mapping)
       
    51     for junk in ('__builtins__', '__doc__'):
       
    52         mapping.pop(junk, None)
       
    53     mapping.setdefault('support_relations', {})
       
    54     mapping.setdefault('dont_cross_relations', set())
       
    55     mapping.setdefault('cross_relations', set())
       
    56 
       
    57     # do some basic checks of the mapping content
       
    58     assert 'support_entities' in mapping, \
       
    59            'mapping file should at least define support_entities'
       
    60     assert isinstance(mapping['support_entities'], dict)
       
    61     assert isinstance(mapping['support_relations'], dict)
       
    62     assert isinstance(mapping['dont_cross_relations'], set)
       
    63     assert isinstance(mapping['cross_relations'], set)
       
    64     unknown = set(mapping) - set( ('support_entities', 'support_relations',
       
    65                                    'dont_cross_relations', 'cross_relations') )
       
    66     assert not unknown, 'unknown mapping attribute(s): %s' % unknown
       
    67     # relations that are necessarily not crossed
       
    68     mapping['dont_cross_relations'] |= set(('owned_by', 'created_by'))
       
    69     for rtype in ('is', 'is_instance_of', 'cw_source'):
       
    70         assert rtype not in mapping['dont_cross_relations'], \
       
    71                '%s relation should not be in dont_cross_relations' % rtype
       
    72         assert rtype not in mapping['support_relations'], \
       
    73                '%s relation should not be in support_relations' % rtype
       
    74     return mapping
       
    75 
       
    76 
    48 
    77 class ReplaceByInOperator(Exception):
    49 class ReplaceByInOperator(Exception):
    78     def __init__(self, eids):
    50     def __init__(self, eids):
    79         self.eids = eids
    51         self.eids = eids
    80 
    52 
    94          {'type' : 'string',
    66          {'type' : 'string',
    95           'default': REQUIRED,
    67           'default': REQUIRED,
    96           'help': 'identifier of the repository in the pyro name server',
    68           'help': 'identifier of the repository in the pyro name server',
    97           'group': 'pyro-source', 'level': 0,
    69           'group': 'pyro-source', 'level': 0,
    98           }),
    70           }),
    99         ('mapping-file',
       
   100          {'type' : 'string',
       
   101           'default': REQUIRED,
       
   102           'help': 'path to a python file with the schema mapping definition',
       
   103           'group': 'pyro-source', 'level': 1,
       
   104           }),
       
   105         ('cubicweb-user',
    71         ('cubicweb-user',
   106          {'type' : 'string',
    72          {'type' : 'string',
   107           'default': REQUIRED,
    73           'default': REQUIRED,
   108           'help': 'user to use for connection on the distant repository',
    74           'help': 'user to use for connection on the distant repository',
   109           'group': 'pyro-source', 'level': 0,
    75           'group': 'pyro-source', 'level': 0,
   154     PUBLIC_KEYS = AbstractSource.PUBLIC_KEYS + ('base-url',)
   120     PUBLIC_KEYS = AbstractSource.PUBLIC_KEYS + ('base-url',)
   155     _conn = None
   121     _conn = None
   156 
   122 
   157     def __init__(self, repo, source_config, *args, **kwargs):
   123     def __init__(self, repo, source_config, *args, **kwargs):
   158         AbstractSource.__init__(self, repo, source_config, *args, **kwargs)
   124         AbstractSource.__init__(self, repo, source_config, *args, **kwargs)
   159         mappingfile = source_config['mapping-file']
   125         # XXX get it through pyro if unset
   160         if not mappingfile[0] == '/':
       
   161             mappingfile = join(repo.config.apphome, mappingfile)
       
   162         try:
       
   163             mapping = load_mapping_file(mappingfile)
       
   164         except IOError:
       
   165             self.disabled = True
       
   166             self.error('cant read mapping file %s, source disabled',
       
   167                        mappingfile)
       
   168             self.support_entities = {}
       
   169             self.support_relations = {}
       
   170             self.dont_cross_relations = set()
       
   171             self.cross_relations = set()
       
   172         else:
       
   173             self.support_entities = mapping['support_entities']
       
   174             self.support_relations = mapping['support_relations']
       
   175             self.dont_cross_relations = mapping['dont_cross_relations']
       
   176             self.cross_relations = mapping['cross_relations']
       
   177         baseurl = source_config.get('base-url')
   126         baseurl = source_config.get('base-url')
   178         if baseurl and not baseurl.endswith('/'):
   127         if baseurl and not baseurl.endswith('/'):
   179             source_config['base-url'] += '/'
   128             source_config['base-url'] += '/'
   180         self.config = source_config
   129         self.config = source_config
   181         myoptions = (('%s.latest-update-time' % self.uri,
   130         myoptions = (('%s.latest-update-time' % self.uri,
   210                 timestamp = int(rset[0][0])
   159                 timestamp = int(rset[0][0])
   211             return datetime.fromtimestamp(timestamp)
   160             return datetime.fromtimestamp(timestamp)
   212         finally:
   161         finally:
   213             session.close()
   162             session.close()
   214 
   163 
   215     def init(self):
   164     def init(self, activated, session=None):
   216         """method called by the repository once ready to handle request"""
   165         """method called by the repository once ready to handle request"""
   217         interval = int(self.config.get('synchronization-interval', 5*60))
   166         self.load_mapping(session)
   218         self.repo.looping_task(interval, self.synchronize)
   167         if activated:
   219         self.repo.looping_task(self._query_cache.ttl.seconds/10,
   168             interval = int(self.config.get('synchronization-interval', 5*60))
   220                                self._query_cache.clear_expired)
   169             self.repo.looping_task(interval, self.synchronize)
       
   170             self.repo.looping_task(self._query_cache.ttl.seconds/10,
       
   171                                    self._query_cache.clear_expired)
       
   172 
       
   173     def load_mapping(self, session=None):
       
   174         self.support_entities = {}
       
   175         self.support_relations = {}
       
   176         self.dont_cross_relations = set(('owned_by', 'created_by'))
       
   177         self.cross_relations = set()
       
   178         assert self.eid is not None
       
   179         if session is None:
       
   180             _session = self.repo.internal_session()
       
   181         else:
       
   182             _session = session
       
   183         try:
       
   184             for rql, struct in [('Any ETN WHERE S cw_support ET, ET name ETN, ET is CWEType, S eid %(s)s',
       
   185                                  self.support_entities),
       
   186                                 ('Any RTN WHERE S cw_support RT, RT name RTN, RT is CWRType, S eid %(s)s',
       
   187                                  self.support_relations)]:
       
   188                 for ertype, in _session.execute(rql, {'s': self.eid}):
       
   189                     struct[ertype] = True # XXX write support
       
   190             for rql, struct in [('Any RTN WHERE S cw_may_cross RT, RT name RTN, S eid %(s)s',
       
   191                                  self.cross_relations),
       
   192                                 ('Any RTN WHERE S cw_dont_cross RT, RT name RTN, S eid %(s)s',
       
   193                                  self.dont_cross_relations)]:
       
   194                 for rtype, in _session.execute(rql, {'s': self.eid}):
       
   195                     struct.add(rtype)
       
   196         finally:
       
   197             if session is None:
       
   198                 _session.close()
       
   199         # XXX move in hooks or schema constraints
       
   200         for rtype in ('is', 'is_instance_of', 'cw_source'):
       
   201             assert rtype not in self.dont_cross_relations, \
       
   202                    '%s relation should not be in dont_cross_relations' % rtype
       
   203             assert rtype not in self.support_relations, \
       
   204                    '%s relation should not be in support_relations' % rtype
   221 
   205 
   222     def local_eid(self, cnx, extid, session):
   206     def local_eid(self, cnx, extid, session):
   223         etype, dexturi, dextid = cnx.describe(extid)
   207         etype, dexturi, dextid = cnx.describe(extid)
   224         if dexturi == 'system' or not (
   208         if dexturi == 'system' or not (
   225             dexturi in self.repo.sources_by_uri or self._skip_externals):
   209             dexturi in self.repo.sources_by_uri or self._skip_externals):
   244             # external source (hence we've no chance to synchronize...)
   228             # external source (hence we've no chance to synchronize...)
   245             return
   229             return
   246         etypes = self.support_entities.keys()
   230         etypes = self.support_entities.keys()
   247         if mtime is None:
   231         if mtime is None:
   248             mtime = self.last_update_time()
   232             mtime = self.last_update_time()
   249         updatetime, modified, deleted = extrepo.entities_modified_since(etypes,
   233         updatetime, modified, deleted = extrepo.entities_modified_since(
   250                                                                         mtime)
   234             etypes, mtime)
   251         self._query_cache.clear()
   235         self._query_cache.clear()
   252         repo = self.repo
   236         repo = self.repo
   253         session = repo.internal_session()
   237         session = repo.internal_session()
   254         source = repo.system_source
   238         source = repo.system_source
   255         try:
   239         try: