common/mixins.py
author Sylvain Thénault <sylvain.thenault@logilab.fr>
Thu, 13 Aug 2009 11:54:19 +0200
changeset 2820 66b31686d92b
parent 2789 39712da6f397
child 2968 0e3460341023
permissions -rw-r--r--
rename registered to __registered__

"""mixins of entity/views organized somewhat in a graph or tree structure


:organization: Logilab
:copyright: 2001-2009 LOGILAB S.A. (Paris, FRANCE), license is LGPL v2.
:contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr
:license: GNU Lesser General Public License, v2.1 - http://www.gnu.org/licenses
"""
__docformat__ = "restructuredtext en"

from logilab.common.deprecation import deprecated
from logilab.common.decorators import cached

from cubicweb import typed_eid
from cubicweb.selectors import implements
from cubicweb.interfaces import IWorkflowable, IEmailable, ITree


class TreeMixIn(object):
    """base tree-mixin providing the tree interface

    This mixin has to be inherited explicitly and configured using the
    tree_attribute, parent_target and children_target class attributes to
    benefit from this default implementation.

    It relies on the host class for `related`, `related_rql`, `error`, `eid`,
    `id`, `e_schema`, `col` and `req`, which are all used but not defined here.
    """
    # relation type given to `related` to walk the tree; has to be overridden
    tree_attribute = None
    # XXX misnamed
    # role given to `related` to get the parent entity
    parent_target = 'subject'
    # role given to `related` to get the children entities
    children_target = 'object'

    def different_type_children(self, entities=True):
        """return children entities of different type as this entity.

        according to the `entities` parameter, return entity objects or the
        equivalent result set
        """
        res = self.related(self.tree_attribute, self.children_target,
                           entities=entities)
        if entities:
            return [e for e in res if e.e_schema != self.e_schema]
        return res.filtered_rset(lambda x: x.e_schema != self.e_schema, self.col)

    def same_type_children(self, entities=True):
        """return children entities of the same type as this entity.

        according to the `entities` parameter, return entity objects or the
        equivalent result set
        """
        res = self.related(self.tree_attribute, self.children_target,
                           entities=entities)
        if entities:
            return [e for e in res if e.e_schema == self.e_schema]
        return res.filtered_rset(lambda x: x.e_schema == self.e_schema, self.col)

    def iterchildren(self, _done=None):
        """iterate on children entities, skipping any child whose eid is
        already in `_done` (an error is logged in such a case)

        `_done` is the set of already visited eids, a new one being created if
        it isn't given.
        """
        if _done is None:
            _done = set()
        for child in self.children():
            if child.eid in _done:
                self.error('loop in %s tree', self.id.lower())
                continue
            yield child
            # the child is marked *after* having been yielded on purpose:
            # `prefixiter` shares this set and calls `child.prefixiter(_done)`
            # while we're suspended, which would yield nothing if the child
            # was already flagged as visited
            _done.add(child.eid)

    def prefixiter(self, _done=None):
        """iterate on this entity then, recursively, on its children (each
        entity comes before its own children)

        `_done` is the set of already visited eids, shared along the recursion
        so that an entity isn't yielded twice.
        """
        if _done is None:
            _done = set()
        if self.eid in _done:
            return
        yield self
        _done.add(self.eid)
        for child in self.iterchildren(_done):
            try:
                for entity in child.prefixiter(_done):
                    yield entity
            except AttributeError:
                # presumably the child doesn't provide `prefixiter` (i.e. we
                # are leaving the tree): it's silently skipped. NOTE(review):
                # this also hides any AttributeError raised while iterating
                pass

    @cached
    def path(self):
        """returns the list of eids from the root object to this object

        The walk goes from this entity to its ancestors and stops when there
        is no more parent, when a loop is detected (an error is logged) or
        when the parent isn't configured for the same tree (different
        `tree_attribute` / `parent_target`, or missing attributes).
        """
        path = []
        parent = self
        while parent:
            if parent.eid in path:
                self.error('loop in %s tree', self.id.lower())
                break
            path.append(parent.eid)
            try:
                # check we are not leaving the tree
                if (parent.tree_attribute != self.tree_attribute or
                    parent.parent_target != self.parent_target):
                    break
                parent = parent.parent()
            except AttributeError:
                break

        # eids have been collected from this entity up to the root
        path.reverse()
        return path

    def iterparents(self):
        """return an iterator on ancestors of this entity, from its direct
        parent up to the root

        The iteration stops (and an error is logged) as soon as an entity is
        reached twice, so that a loop in the tree can't make it run forever.
        """
        def _uptoroot(self):
            # eids met so far, used as a safety belt against loops in the
            # same way as in `path` and `iterchildren`
            seen = set((self.eid,))
            curr = self
            while True:
                curr = curr.parent()
                if curr is None:
                    break
                if curr.eid in seen:
                    self.error('loop in %s tree', self.id.lower())
                    break
                seen.add(curr.eid)
                yield curr
        return _uptoroot(self)

    def notification_references(self, view):
        """used to control References field of email send on notification
        for this entity. `view` is the notification view.

        Should return a list of eids which can be used to generate message ids
        of previously sent email

        This implementation returns `path()` without its last item, i.e.
        without the eid of this entity.
        """
        return self.path()[:-1]


    ## ITree interface ########################################################
    def parent(self):
        """return the parent entity if any, else None (e.g. if we are on the
        root)
        """
        try:
            return self.related(self.tree_attribute, self.parent_target,
                                entities=True)[0]
        except (KeyError, IndexError):
            return None

    def children(self, entities=True, sametype=False):
        """return children entities

        according to the `entities` parameter, return entity objects or the
        equivalent result set

        if `sametype` is true, only children of the same type as this entity
        are returned (see `same_type_children`)
        """
        if sametype:
            return self.same_type_children(entities)
        else:
            return self.related(self.tree_attribute, self.children_target,
                                entities=entities)

    def children_rql(self):
        """return what `related_rql` gives for the children relation"""
        return self.related_rql(self.tree_attribute, self.children_target)

    def __iter__(self):
        """iterating on the entity is iterating on its children"""
        return self.iterchildren()

    def is_leaf(self):
        """return true if this entity has no child"""
        return len(self.children()) == 0

    def is_root(self):
        """return true if this entity has no parent"""
        return self.parent() is None

    def root(self):
        """return the root object"""
        return self.req.entity_from_eid(self.path()[0])


class WorkflowableMixIn(object):
    """base mixin providing workflow helper methods for workflowable entities.
    This mixin will be automatically set on class supporting the 'in_state'
    relation (which implies supporting 'wf_info_for' as well)
    """
    __implements__ = (IWorkflowable,)

    @property
    def state(self):
        """name of the current state of the entity, or None (after having
        logged a warning) if the entity has no state
        """
        try:
            return self.in_state[0].name
        except IndexError:
            self.warning('entity %s has no state', self)
            return None

    @property
    def displayable_state(self):
        """name of the current state, passed through `self.req._`"""
        return self.req._(self.state)

    def wf_state(self, statename):
        """return the state entity named `statename` which is a state of this
        entity's type, or None if there is no such state
        """
        rset = self.req.execute('Any S, SN WHERE S name SN, S name %(n)s, S state_of E, E name %(e)s',
                                {'n': statename, 'e': str(self.e_schema)})
        if rset:
            return rset.get_entity(0, 0)
        return None

    def wf_transition(self, trname):
        """return the transition entity named `trname` which is a transition
        of this entity's type, or None if there is no such transition
        """
        rset = self.req.execute('Any T, TN WHERE T name TN, T name %(n)s, T transition_of E, E name %(e)s',
                                {'n': trname, 'e': str(self.e_schema)})
        if rset:
            return rset.get_entity(0, 0)
        return None

    def change_state(self, state, trcomment=None, trcommentformat=None):
        """change the entity's state according to a state defined in given
        parameters

        :param state: the target state, given either as a state name (looked
          up using `wf_state`), as an object with an `eid` attribute or as an
          eid
        :param trcomment: if given, stored as 'trcomment' request shared data
        :param trcommentformat: if given, stored as 'trcommentformat' request
          shared data
        :raise AssertionError: if `state` is a name which doesn't match any
          state of this entity's type
        """
        if isinstance(state, basestring):
            # keep the requested name apart: `state` is rebound to the lookup
            # result, which is None precisely when the message is needed
            statename = state
            state = self.wf_state(statename)
            assert state is not None, 'not a %s state: %s' % (self.id, statename)
        if hasattr(state, 'eid'):
            stateeid = state.eid
        else:
            stateeid = state
        stateeid = typed_eid(stateeid)
        if trcomment:
            self.req.set_shared_data('trcomment', trcomment)
        if trcommentformat:
            self.req.set_shared_data('trcommentformat', trcommentformat)
        self.req.execute('SET X in_state S WHERE X eid %(x)s, S eid %(s)s',
                         {'x': self.eid, 's': stateeid}, 'x')

    def can_pass_transition(self, trname):
        """return the Transition instance if the current user can pass the
        transition with the given name, else None
        """
        # NOTE(review): unlike the `state` property, an entity without state
        # isn't handled here (IndexError)
        stateeid = self.in_state[0].eid
        rset = self.req.execute('Any T,N,DS WHERE S allowed_transition T,'
                                'S eid %(x)s,T name %(trname)s,ET name %(et)s,'
                                'T name N,T destination_state DS,T transition_of ET',
                                {'x': stateeid, 'et': str(self.e_schema),
                                 'trname': trname}, 'x')
        for tr in rset.entities():
            if tr.may_be_passed(self.eid, stateeid):
                return tr
        # no matching transition: None is implicitly returned

    def latest_trinfo(self):
        """return the latest transition information for this entity"""
        # last item of the reverse 'wf_info_for' relation; IndexError is
        # raised if it's empty
        return self.reverse_wf_info_for[-1]

    # __method methods ########################################################

    def set_state(self, params=None):
        """change the entity's state according to a state defined in given
        parameters, used to be called using __method controler facility

        `params` is a dictionary (the request's form if not given or empty)
        holding the target state eid under the 'state' key, and optionally
        'trcomment' and 'trcomment_format'. The 'state' key is removed from
        the dictionary.
        """
        params = params or self.req.form
        self.change_state(typed_eid(params.pop('state')),
                          params.get('trcomment'),
                          params.get('trcomment_format'))
        self.req.set_message(self.req._('__msg state changed'))

    # specific vocabulary methods #############################################

    @deprecated('use EntityFieldsForm.subject_in_state_vocabulary')
    def subject_in_state_vocabulary(self, rschema, limit=None):
        """deprecated: delegate to the `subject_in_state_vocabulary` method of
        the 'edition' form selected for this entity
        """
        form = self.vreg.select('forms', 'edition', self.req, entity=self)
        return form.subject_in_state_vocabulary(rschema, limit)



class EmailableMixIn(object):
    """base mixin providing the default get_email() method used by
    the massmailing view

    NOTE: The default implementation is based on the
    primary_email / use_email scheme
    """
    __implements__ = (IEmailable,)

    def get_email(self):
        """return the address of the first 'primary_email' if any, else the
        address of the first 'use_email', else None
        """
        # relations are tried by order of preference
        for relation in ('primary_email', 'use_email'):
            emails = getattr(self, relation, None)
            if emails:
                return emails[0].address
        return None

    @classmethod
    def allowed_massmail_keys(cls):
        """returns a set of allowed email substitution keys

        The default is to return the entity's attribute list but an
        entity class might override this method to allow extra keys.
        For instance, the Person class might want to return a `companyname`
        key.
        """
        keys = set()
        for rschema, _ in cls.e_schema.attribute_definitions():
            keys.add(rschema.type)
        return keys

    def as_email_context(self):
        """returns the dictionary as used by the sendmail controller to
        build email bodies.

        NOTE: the dictionary keys should match the list returned by the
        `allowed_massmail_keys` method.
        """
        context = {}
        for key in self.allowed_massmail_keys():
            context[key] = getattr(self, key)
        return context



# (relation type, role of the entity in the relation) -> mixin class.
# Presumably used by the code which automatically sets those mixins on entity
# classes supporting the relation (not visible from this module).
MI_REL_TRIGGERS = dict(
    ((rtype, 'subject'), mixin)
    for rtype, mixin in (
        ('in_state', WorkflowableMixIn),
        ('primary_email', EmailableMixIn),
        ('use_email', EmailableMixIn),
    )
)



def _done_init(done, view, row, col):
    """infinite recursion safety belt for recursive views

    Get the entity at (`row`, `col`) in `view.rset` and check it against
    `done`, the set of already handled eids (a new set is created if `done`
    is None).

    Return `(done, entity)` with the entity's eid added to `done` if the
    entity hasn't been handled yet, else `(None, msg)` where `msg` is a
    message (passed through `entity.req._`) telling a loop has been detected.
    Callers have to check the first item to know what the second one is.
    """
    seen = done
    if seen is None:
        seen = set()
    entity = view.rset.get_entity(row, col)
    eid = entity.eid
    if eid not in seen:
        seen.add(eid)
        return seen, entity
    # already handled: there is a loop in the relation
    template = entity.req._('loop in %(rel)s relation (%(eid)s)')
    return None, template % {'rel': entity.tree_attribute, 'eid': eid}


class TreeViewMixIn(object):
    """a recursive tree view

    Each entity is displayed using the `item_vid` view, followed by its
    children displayed using this very view.
    """
    id = 'tree'
    item_vid = 'treeitem'
    __select__ = implements(ITree)

    def call(self, done=None, **kwargs):
        """call the parent class implementation, making sure it gets a `done`
        set (eids of already displayed entities)
        """
        visited = done
        if visited is None:
            visited = set()
        super(TreeViewMixIn, self).call(done=visited, **kwargs)

    def cell_call(self, row, col=0, vid=None, done=None, **kwargs):
        """display the entity at (`row`, `col`) then, recursively, its
        children

        `vid` is the view used for the entity itself (default to `item_vid`),
        `done` the set of eids of already displayed entities.
        """
        visited, entity = _done_init(done, self, row, col)
        if visited is None:
            # loop detected: `entity` actually holds the error message
            self.w(u'<li class="badcontent">%s</li>' % entity)
        else:
            self.open_item(entity)
            entity.view(vid or self.item_vid, w=self.w, **kwargs)
            children_rset = entity.children(entities=False)
            self.wview(self.id, children_rset, 'null', done=visited, **kwargs)
            self.close_item(entity)

    def open_item(self, entity):
        """write the opening tag of the item for `entity`"""
        css_class = entity.id.lower()
        self.w(u'<li class="%s">\n' % css_class)

    def close_item(self, entity):
        """write the closing tag of the item for `entity`"""
        self.w(u'</li>\n')


class TreePathMixIn(object):
    """a recursive path view

    Ancestors are displayed first, each one followed by `separator`, then the
    entity itself is displayed using the `item_vid` view.
    """
    id = 'path'
    item_vid = 'oneline'
    separator = u'&nbsp;&gt;&nbsp;'

    def call(self, **kwargs):
        """wrap the output of the parent class implementation into a
        "pathbar" div
        """
        self.w(u'<div class="pathbar">')
        # the actual content is produced by the next class in the mro
        super(TreePathMixIn, self).call(**kwargs)
        self.w(u'</div>')

    def cell_call(self, row, col=0, vid=None, done=None, **kwargs):
        """display the path leading to the entity at (`row`, `col`)

        `vid` is the view used for the entity itself (default to `item_vid`),
        `done` the set of eids of already displayed entities.
        """
        visited, entity = _done_init(done, self, row, col)
        if visited is None:
            # loop detected: `entity` actually holds the error message
            self.w(u'<span class="badcontent">%s</span>' % entity)
        else:
            ancestor = entity.parent()
            if ancestor:
                # recurse on the parent first so the path starts from the root
                ancestor.view(self.id, w=self.w, done=visited)
                self.w(self.separator)
            entity.view(vid or self.item_vid, w=self.w)


class ProgressMixIn(object):
    """provide default implementations for IProgress interface methods

    Classes using this mixin have to implement `progress_info` and
    `in_progress`, everything else being computed from them.
    """
    # This is an adapter isn't it ?

    @property
    def cost(self):
        """'estimated' value of the progress information"""
        info = self.progress_info()
        return info['estimated']

    @property
    def revised_cost(self):
        """'estimatedcorrected' value of the progress information, defaulting
        to `cost` when it's missing
        """
        info = self.progress_info()
        return info.get('estimatedcorrected', self.cost)

    @property
    def done(self):
        """'done' value of the progress information"""
        info = self.progress_info()
        return info['done']

    @property
    def todo(self):
        """'todo' value of the progress information"""
        info = self.progress_info()
        return info['todo']

    @cached
    def progress_info(self):
        """return the progress information dictionary (to be implemented by
        concrete classes)

        Keys read by this mixin are 'estimated', 'done' and 'todo', as well
        as the optional 'estimatedcorrected' and 'notestimated'.
        """
        raise NotImplementedError()

    def finished(self):
        """return true if the entity isn't in progress anymore"""
        if self.in_progress():
            return False
        return True

    def in_progress(self):
        """to be implemented by concrete classes"""
        raise NotImplementedError()

    def progress(self):
        """return the progression as a percentage of the revised cost"""
        try:
            percent = 100. * self.done / self.revised_cost
        except ZeroDivisionError:
            # total cost is 0 : if everything was estimated, task is completed
            if self.progress_info().get('notestimated'):
                percent = 0.
            else:
                percent = 100
        return percent