--- a/vregistry.py Thu Feb 02 14:33:30 2012 +0100
+++ b/vregistry.py Mon Jan 23 13:25:02 2012 +0100
@@ -1,4 +1,4 @@
-# copyright 2003-2011 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
+# copyright 2003-2012 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
#
# This file is part of CubicWeb.
@@ -15,487 +15,6 @@
#
# You should have received a copy of the GNU Lesser General Public License along
# with CubicWeb. If not, see <http://www.gnu.org/licenses/>.
-"""
-* the vregistry handles various types of objects interacting
- together. The vregistry handles registration of dynamically loaded
- objects and provides a convenient api to access those objects
- according to a context
-
-* to interact with the vregistry, objects should inherit from the
- AppObject abstract class
-
-* the selection procedure has been generalized by delegating to a
- selector, which is responsible to score the appobject according to the
- current state (req, rset, row, col). At the end of the selection, if
- a appobject class has been found, an instance of this class is
- returned. The selector is instantiated at appobject registration
-"""
-
-__docformat__ = "restructuredtext en"
-
-import sys
-from os import listdir, stat
-from os.path import dirname, join, realpath, isdir, exists
-from logging import getLogger
from warnings import warn
-
-from logilab.common.deprecation import deprecated, class_moved
-from logilab.common.logging_ext import set_log_methods
-
-from cubicweb import CW_SOFTWARE_ROOT
-from cubicweb import RegistryNotFound, ObjectNotFound, NoSelectableObject
-from cubicweb.appobject import AppObject, class_regid
-
-
-def _toload_info(path, extrapath, _toload=None):
- """return a dictionary of <modname>: <modpath> and an ordered list of
- (file, module name) to load
- """
- from logilab.common.modutils import modpath_from_file
- if _toload is None:
- assert isinstance(path, list)
- _toload = {}, []
- for fileordir in path:
- if isdir(fileordir) and exists(join(fileordir, '__init__.py')):
- subfiles = [join(fileordir, fname) for fname in listdir(fileordir)]
- _toload_info(subfiles, extrapath, _toload)
- elif fileordir[-3:] == '.py':
- modpath = modpath_from_file(fileordir, extrapath)
- # omit '__init__' from package's name to avoid loading that module
- # once for each name when it is imported by some other appobject
- # module. This supposes import in modules are done as::
- #
- # from package import something
- #
- # not::
- #
- # from package.__init__ import something
- #
- # which seems quite correct.
- if modpath[-1] == '__init__':
- modpath.pop()
- modname = '.'.join(modpath)
- _toload[0][modname] = fileordir
- _toload[1].append((fileordir, modname))
- return _toload
-
-
-def classid(cls):
- """returns a unique identifier for an appobject class"""
- return '%s.%s' % (cls.__module__, cls.__name__)
-
-def class_registries(cls, registryname):
- if registryname:
- return (registryname,)
- return cls.__registries__
-
-
-class Registry(dict):
-
- def __init__(self, config):
- super(Registry, self).__init__()
- self.config = config
-
- def __getitem__(self, name):
- """return the registry (dictionary of class objects) associated to
- this name
- """
- try:
- return super(Registry, self).__getitem__(name)
- except KeyError:
- raise ObjectNotFound(name), None, sys.exc_info()[-1]
-
- def initialization_completed(self):
- for appobjects in self.itervalues():
- for appobjectcls in appobjects:
- appobjectcls.__registered__(self)
-
- def register(self, obj, oid=None, clear=False):
- """base method to add an object in the registry"""
- assert not '__abstract__' in obj.__dict__
- oid = oid or class_regid(obj)
- assert oid
- if clear:
- appobjects = self[oid] = []
- else:
- appobjects = self.setdefault(oid, [])
- assert not obj in appobjects, \
- 'object %s is already registered' % obj
- appobjects.append(obj)
-
- def register_and_replace(self, obj, replaced):
- # XXXFIXME this is a duplication of unregister()
- # remove register_and_replace in favor of unregister + register
- # or simplify by calling unregister then register here
- if not isinstance(replaced, basestring):
- replaced = classid(replaced)
- # prevent from misspelling
- assert obj is not replaced, 'replacing an object by itself: %s' % obj
- registered_objs = self.get(class_regid(obj), ())
- for index, registered in enumerate(registered_objs):
- if classid(registered) == replaced:
- del registered_objs[index]
- break
- else:
- self.warning('trying to replace an unregistered view %s by %s',
- replaced, obj)
- self.register(obj)
-
- def unregister(self, obj):
- clsid = classid(obj)
- oid = class_regid(obj)
- for registered in self.get(oid, ()):
- # use classid() to compare classes because vreg will probably
- # have its own version of the class, loaded through execfile
- if classid(registered) == clsid:
- self[oid].remove(registered)
- break
- else:
- self.warning('can\'t remove %s, no id %s in the registry',
- clsid, oid)
-
- def all_objects(self):
- """return a list containing all objects in this registry.
- """
- result = []
- for objs in self.values():
- result += objs
- return result
-
- # dynamic selection methods ################################################
-
- def object_by_id(self, oid, *args, **kwargs):
- """return object with the `oid` identifier. Only one object is expected
- to be found.
-
- raise :exc:`ObjectNotFound` if not object with id <oid> in <registry>
-
- raise :exc:`AssertionError` if there is more than one object there
- """
- objects = self[oid]
- assert len(objects) == 1, objects
- return objects[0](*args, **kwargs)
-
- def select(self, __oid, *args, **kwargs):
- """return the most specific object among those with the given oid
- according to the given context.
-
- raise :exc:`ObjectNotFound` if not object with id <oid> in <registry>
-
- raise :exc:`NoSelectableObject` if not object apply
- """
- obj = self._select_best(self[__oid], *args, **kwargs)
- if obj is None:
- raise NoSelectableObject(args, kwargs, self[__oid] )
- return obj
-
- def select_or_none(self, __oid, *args, **kwargs):
- """return the most specific object among those with the given oid
- according to the given context, or None if no object applies.
- """
- try:
- return self.select(__oid, *args, **kwargs)
- except (NoSelectableObject, ObjectNotFound):
- return None
-
- def possible_objects(self, *args, **kwargs):
- """return an iterator on possible objects in this registry for the given
- context
- """
- for appobjects in self.itervalues():
- obj = self._select_best(appobjects, *args, **kwargs)
- if obj is None:
- continue
- yield obj
-
- def _select_best(self, appobjects, *args, **kwargs):
- """return an instance of the most specific object according
- to parameters
-
- return None if not object apply (don't raise `NoSelectableObject` since
- it's costly when searching appobjects using `possible_objects`
- (e.g. searching for hooks).
- """
- score, winners = 0, None
- for appobject in appobjects:
- appobjectscore = appobject.__select__(appobject, *args, **kwargs)
- if appobjectscore > score:
- score, winners = appobjectscore, [appobject]
- elif appobjectscore > 0 and appobjectscore == score:
- winners.append(appobject)
- if winners is None:
- return None
- if len(winners) > 1:
- # log in production environement / test, error while debugging
- msg = 'select ambiguity: %s\n(args: %s, kwargs: %s)'
- if self.config.debugmode or self.config.mode == 'test':
- # raise bare exception in debug mode
- raise Exception(msg % (winners, args, kwargs.keys()))
- self.error(msg, winners, args, kwargs.keys())
- # return the result of calling the appobject
- return winners[0](*args, **kwargs)
-
- # these are overridden by set_log_methods below
- # only defining here to prevent pylint from complaining
- info = warning = error = critical = exception = debug = lambda msg,*a,**kw: None
-
-
-class VRegistry(dict):
- """class responsible to register, propose and select the various
- elements used to build the web interface. Currently, we have templates,
- views, actions and components.
- """
-
- def __init__(self, config):
- super(VRegistry, self).__init__()
- self.config = config
- # need to clean sys.path this to avoid import confusion pb (i.e. having
- # the same module loaded as 'cubicweb.web.views' subpackage and as
- # views' or 'web.views' subpackage. This is mainly for testing purpose,
- # we should'nt need this in production environment
- for webdir in (join(dirname(realpath(__file__)), 'web'),
- join(dirname(__file__), 'web')):
- if webdir in sys.path:
- sys.path.remove(webdir)
- if CW_SOFTWARE_ROOT in sys.path:
- sys.path.remove(CW_SOFTWARE_ROOT)
-
- def reset(self):
- # don't use self.clear, we want to keep existing subdictionaries
- for subdict in self.itervalues():
- subdict.clear()
- self._lastmodifs = {}
-
- def __getitem__(self, name):
- """return the registry (dictionary of class objects) associated to
- this name
- """
- try:
- return super(VRegistry, self).__getitem__(name)
- except KeyError:
- raise RegistryNotFound(name), None, sys.exc_info()[-1]
-
- # methods for explicit (un)registration ###################################
-
- # default class, when no specific class set
- REGISTRY_FACTORY = {None: Registry}
-
- def registry_class(self, regid):
- try:
- return self.REGISTRY_FACTORY[regid]
- except KeyError:
- return self.REGISTRY_FACTORY[None]
-
- def setdefault(self, regid):
- try:
- return self[regid]
- except KeyError:
- self[regid] = self.registry_class(regid)(self.config)
- return self[regid]
-
-# def clear(self, key):
-# regname, oid = key.split('.')
-# self[regname].pop(oid, None)
-
- def register_all(self, objects, modname, butclasses=()):
- """register all `objects` given. Objects which are not from the module
- `modname` or which are in `butclasses` won't be registered.
-
- Typical usage is:
-
- .. sourcecode:: python
-
- vreg.register_all(globals().values(), __name__, (ClassIWantToRegisterExplicitly,))
-
- So you get partially automatic registration, keeping manual registration
- for some object (to use
- :meth:`~cubicweb.cwvreg.CubicWebRegistry.register_and_replace` for
- instance)
- """
- for obj in objects:
- try:
- if obj.__module__ != modname or obj in butclasses:
- continue
- oid = class_regid(obj)
- except AttributeError:
- continue
- if oid and not '__abstract__' in obj.__dict__:
- self.register(obj, oid=oid)
-
- def register(self, obj, registryname=None, oid=None, clear=False):
- """register `obj` application object into `registryname` or
- `obj.__registry__` if not specified, with identifier `oid` or
- `obj.__regid__` if not specified.
-
- If `clear` is true, all objects with the same identifier will be
- previously unregistered.
- """
- assert not '__abstract__' in obj.__dict__
- try:
- vname = obj.__name__
- except AttributeError:
- # XXX may occurs?
- vname = obj.__class__.__name__
- for registryname in class_registries(obj, registryname):
- registry = self.setdefault(registryname)
- registry.register(obj, oid=oid, clear=clear)
- self.debug('register %s in %s[\'%s\']',
- vname, registryname, oid or class_regid(obj))
- self._loadedmods.setdefault(obj.__module__, {})[classid(obj)] = obj
-
- def unregister(self, obj, registryname=None):
- """unregister `obj` application object from the registry `registryname` or
- `obj.__registry__` if not specified.
- """
- for registryname in class_registries(obj, registryname):
- self[registryname].unregister(obj)
-
- def register_and_replace(self, obj, replaced, registryname=None):
- """register `obj` application object into `registryname` or
- `obj.__registry__` if not specified. If found, the `replaced` object
- will be unregistered first (else a warning will be issued as it's
- generally unexpected).
- """
- for registryname in class_registries(obj, registryname):
- self[registryname].register_and_replace(obj, replaced)
-
- # initialization methods ###################################################
-
- def init_registration(self, path, extrapath=None):
- self.reset()
- # compute list of all modules that have to be loaded
- self._toloadmods, filemods = _toload_info(path, extrapath)
- # XXX is _loadedmods still necessary ? It seems like it's useful
- # to avoid loading same module twice, especially with the
- # _load_ancestors_then_object logic but this needs to be checked
- self._loadedmods = {}
- return filemods
-
- def register_objects(self, path, extrapath=None):
- # load views from each directory in the instance's path
- filemods = self.init_registration(path, extrapath)
- for filepath, modname in filemods:
- self.load_file(filepath, modname)
- self.initialization_completed()
-
- def initialization_completed(self):
- for regname, reg in self.iteritems():
- reg.initialization_completed()
-
- def _mdate(self, filepath):
- try:
- return stat(filepath)[-2]
- except OSError:
- # this typically happens on emacs backup files (.#foo.py)
- self.warning('Unable to load %s. It is likely to be a backup file',
- filepath)
- return None
-
- def is_reload_needed(self, path):
- """return True if something module changed and the registry should be
- reloaded
- """
- lastmodifs = self._lastmodifs
- for fileordir in path:
- if isdir(fileordir) and exists(join(fileordir, '__init__.py')):
- if self.is_reload_needed([join(fileordir, fname)
- for fname in listdir(fileordir)]):
- return True
- elif fileordir[-3:] == '.py':
- mdate = self._mdate(fileordir)
- if mdate is None:
- continue # backup file, see _mdate implementation
- elif "flymake" in fileordir:
- # flymake + pylint in use, don't consider these they will corrupt the registry
- continue
- if fileordir not in lastmodifs or lastmodifs[fileordir] < mdate:
- self.info('File %s changed since last visit', fileordir)
- return True
- return False
-
- def load_file(self, filepath, modname):
- """load app objects from a python file"""
- from logilab.common.modutils import load_module_from_name
- if modname in self._loadedmods:
- return
- self._loadedmods[modname] = {}
- mdate = self._mdate(filepath)
- if mdate is None:
- return # backup file, see _mdate implementation
- elif "flymake" in filepath:
- # flymake + pylint in use, don't consider these they will corrupt the registry
- return
- # set update time before module loading, else we get some reloading
- # weirdness in case of syntax error or other error while importing the
- # module
- self._lastmodifs[filepath] = mdate
- # load the module
- module = load_module_from_name(modname)
- self.load_module(module)
-
- def load_module(self, module):
- self.info('loading %s from %s', module.__name__, module.__file__)
- if hasattr(module, 'registration_callback'):
- module.registration_callback(self)
- else:
- for objname, obj in vars(module).items():
- if objname.startswith('_'):
- continue
- self._load_ancestors_then_object(module.__name__, obj)
-
- def _load_ancestors_then_object(self, modname, appobjectcls):
- """handle automatic appobject class registration:
-
- - first ensure parent classes are already registered
-
- - class with __abstract__ == True in their local dictionnary or
- with a name starting with an underscore are not registered
-
- - appobject class needs to have __registry__ and __regid__ attributes
- set to a non empty string to be registered.
- """
- # imported classes
- objmodname = getattr(appobjectcls, '__module__', None)
- if objmodname != modname:
- if objmodname in self._toloadmods:
- self.load_file(self._toloadmods[objmodname], objmodname)
- return
- # skip non registerable object
- try:
- if not issubclass(appobjectcls, AppObject):
- return
- except TypeError:
- return
- clsid = classid(appobjectcls)
- if clsid in self._loadedmods[modname]:
- return
- self._loadedmods[modname][clsid] = appobjectcls
- for parent in appobjectcls.__bases__:
- self._load_ancestors_then_object(modname, parent)
- if (appobjectcls.__dict__.get('__abstract__')
- or appobjectcls.__name__[0] == '_'
- or not appobjectcls.__registries__
- or not class_regid(appobjectcls)):
- return
- try:
- self.register(appobjectcls)
- except Exception, ex:
- if self.config.mode in ('test', 'dev'):
- raise
- self.exception('appobject %s registration failed: %s',
- appobjectcls, ex)
- # these are overridden by set_log_methods below
- # only defining here to prevent pylint from complaining
- info = warning = error = critical = exception = debug = lambda msg,*a,**kw: None
-
-
-# init logging
-set_log_methods(VRegistry, getLogger('cubicweb.vreg'))
-set_log_methods(Registry, getLogger('cubicweb.registry'))
-
-
-# XXX bw compat functions #####################################################
-
-from cubicweb.appobject import objectify_selector, AndSelector, OrSelector, Selector
-
-Selector = class_moved(Selector)
+warn('[3.15] moved to logilab.common.registry', DeprecationWarning, stacklevel=2)
+from logilab.common.registry import *