HEX
Server: Apache/2.4.6 (CentOS) OpenSSL/1.0.2k-fips PHP/7.2.34
System: Linux atalantini.com 3.10.0-1127.13.1.el7.x86_64 #1 SMP Tue Jun 23 15:46:38 UTC 2020 x86_64
User: root (0)
PHP: 7.2.34
Disabled: NONE
Upload Files
File: //lib/python2.7/site-packages/kobo/types.py
# -*- coding: utf-8 -*-


# Public names of this module (what ``from <module> import *`` exports).
__all__ = (
    "Enum",
    "EnumItem",
    "State",
    "StateEnum",
    "DictSet",
)


class EnumItem(object):
    """Data wrapper for Enum.

    An item is identified by its name: string conversion, equality and
    hashing are all based on it. Extra keyword arguments passed to the
    constructor are kept as options readable via item[name] / item.get(name).
    """

    def __init__(self, name, help_text=None, **kwargs):
        """
        @param name: item name
        @type name:  str
        @param help_text: help text associated to the item (None is stored as "")
        @type help_text:  str
        @param kwargs: arbitrary options attached to the item
        """
        self.name = name
        self.help_text = help_text or ""
        self.options = kwargs

    def __str__(self):
        return self.name

    def __repr__(self):
        return str(self)

    def __eq__(self, obj):
        # Items compare by their string form, so an item equals its plain name.
        return str(self) == str(obj)

    def __ne__(self, obj):
        # Python 2 does not derive != from ==; keep both consistent.
        return not self == obj

    def __hash__(self):
        # Must agree with __eq__: equal objects (including the plain name
        # string) have to hash equally. Defining __eq__ alone would also make
        # instances unhashable on Python 3.
        return hash(str(self))

    def __getitem__(self, name):
        """Return option `name`; raise KeyError if it was not passed to the constructor."""
        return self.options[name]

    def get(self, name, default=None):
        """Return option `name`, or `default` if it was not passed to the constructor."""
        return self.options.get(name, default)


class Enum(object):
    """Enumerated list.

    Items keep their definition order. Each item name maps to its zero-based
    position, which serves as the item's numeric id.
    """

    def __init__(self, *args):
        """
        @param args: items to be enumerated
        @type args: str, EnumItem
        @raise ValueError: if two items have the same name
        """
        self._items = []  # EnumItem objects, in definition order
        self._order = []  # item names, parallel to self._items
        self._dict = {}   # item name -> numeric id (index into the lists above)

        for item in args:
            if not isinstance(item, EnumItem):
                item = EnumItem(str(item))
            name = str(item)
            # dict membership instead of scanning the list for every item
            if name in self._dict:
                raise ValueError("Duplicite item: %s" % item)
            self._dict[name] = len(self._order)
            self._items.append(item)
            self._order.append(name)

    def __iter__(self):
        """Iterate over item names in definition order."""
        return iter(self._order)

    def __getitem__(self, key):
        """Map id (int) or slice to name(s); map any other key (a name) to its id."""
        if type(key) in (int, slice):
            return self._order[key]
        return self._dict[key]

    def get(self, key, default=None):
        """Like self[key], but return `default` for an unknown name or id."""
        try:
            return self[key]
        except (IndexError, KeyError):
            return default

    def get_num(self, key, default=None):
        """Return numeric id of the item named `key`, or `default` if unknown."""
        try:
            return self._dict[key]
        except (IndexError, KeyError):
            return default

    def get_value(self, key, default=None):
        """Return name of the item with numeric id `key`, or `default`.

        `default` is returned for an out-of-range id as well as for a key that
        is not usable as a list index at all (e.g. a name or None).
        """
        try:
            return self._order[key]
        except (IndexError, KeyError, TypeError):
            # List indexing signals a bad key with IndexError or TypeError;
            # KeyError is kept only for backward compatibility.
            return default

    def get_mapping(self):
        """Return list of (key, value) mappings."""
        return [(self._dict[name], name) for name in self._order]

    def get_item(self, key):
        """Return the item object for `key`.

        An int is treated as a numeric id, a State instance is returned as is,
        anything else is looked up as an item name.
        """
        if type(key) is int:
            return self._items[key]
        elif isinstance(key, State):
            return key
        elif isinstance(key, StateEnum):
            # NOTE(review): this returns *self*'s current state, not the
            # state of the StateEnum passed in, and the attribute exists only
            # on StateEnum instances - confirm this is intended.
            return self._current_state
        return self._items[self._dict[key]]

    def get_item_help_text(self, key):
        """Return item's help text."""
        return self.get_item(key).help_text

    def get_item_option(self, key, option, default=None):
        """Return item's option passed as a kwarg in its constructor."""
        return self.get_item(key).get(option, default)


class State(EnumItem):
    """Single state of a StateEnum, with its allowed transitions and callbacks."""

    def __init__(self, name, next_states, help_text=None, enter=None, leave=None, check_perms=None, methods=None, **kwargs):
        """
        @param name: state name
        @type name:  str
        @param next_states: list of allowed state transitions (None means no transitions)
        @type next_states:  [str]
        @param help_text: help text associated to the state
        @type help_text:  str
        @param enter: list of functions to be executed on entering the state, StateEnum.change_state calls them as func(current_state_name, new_state, **kwargs)
        @type enter:  [function]
        @param leave: list of functions to be executed on leaving the state, StateEnum.change_state calls them as func(current_state_name, new_state, **kwargs)
        @type leave:  [function]
        @param check_perms: list of functions deciding whether a transition is allowed, called as func(current_state_name, new_state, **kwargs); a false result rejects the transition
        @type check_perms:  [function]
        @param methods: extra objects exposed as attributes of the state (state.<key> returns methods[key])
        @type methods:  dict
        """

        EnumItem.__init__(self, name, help_text=help_text, **kwargs)
        self.next_states = next_states or []
        self.enter = enter or []
        self.leave = leave or []
        self.check_perms = check_perms or []
        self.methods = methods or {}

    def __getattr__(self, name):
        """Resolve attributes not found by normal lookup through self.methods.

        Unknown ordinary names evaluate to None (historical behaviour).
        Unknown special names (__x__) raise AttributeError so that protocol
        probes made by copy, pickle, hasattr() etc. see them as missing
        instead of receiving a None they would then try to call.
        """
        # Read "methods" without going through __getattr__ again: on an
        # instance created without __init__ (e.g. while being unpickled or
        # copied) a plain self.methods would recurse forever.
        try:
            methods = object.__getattribute__(self, "methods")
        except AttributeError:
            methods = {}

        if name in methods:
            return methods[name]
        if name.startswith("__") and name.endswith("__"):
            raise AttributeError("'%s' object has no attribute '%s'" % (type(self).__name__, name))
        return None


class StateEnum(Enum):
    """Enum of State items that tracks a current state and validates transitions."""

    def __init__(self, *args):
        """
        @param args: states to be enumerated
        @type args:  State
        @raise ValueError: if a state refers to an undefined state in its next_states
        """
        Enum.__init__(self, *args)
        self._current_state = None  # State object, or None until set_state() is called
        self._to = None             # target of a prepared (not yet committed) transition

        # check if all states and transitions are sane
        all_next_states = set()
        for state in self._items:
            all_next_states.update(state.next_states)
        for state in all_next_states:
            if state not in self._items:
                # Sort by name: items define no ordering, so a bare sorted()
                # would fail on Python 3 and hide this error.
                raise ValueError("State '%s' is not defined. Available states: %s" % (state, sorted(self._items, key=str)))

    def __str__(self):
        """Return the numeric id of the current state as a string ("" if no state is set)."""
        if self._current_state is None:
            return ""
        return str(self.get_state_id())

    def set_state(self, state):
        """Set the current state (given as id, name or State) without any transition checks."""
        self._current_state = self.get_item(state)

    def get_state(self):
        """Return the current State object, or None if no state is set."""
        return self._current_state

    def get_state_id(self):
        """Return the numeric id of the current state."""
        return self.get_num(self._current_state.name)

    def get_final_states(self, return_id_list=False):
        """Return states that have no next states, as names or (if return_id_list) as ids."""
        final_states = [state for state in self._items if not state.next_states]
        if return_id_list:
            return [self.get_num(state.name) for state in final_states]
        return [state.name for state in final_states]

    def get_next_states_mapping(self, append_current=True, user=None, **kwargs):
        """Return sorted list of (id, name) of states reachable from the current one.

        @param append_current: include the current state in the result
        @type append_current:  bool
        @param user: if not None, drop target states rejected by any of the
                     current state's check_perms functions; they are called as
                     func(current_name, new_name, transition=True, user=user, **kwargs)
        @return: [] if no current state is set
        @rtype:  [(int, str)]
        """
        # This guard has to come before the current state is dereferenced.
        if self._current_state is None:
            return []

        current_state = self._current_state.name
        states = set(self._current_state.next_states)
        if append_current:
            states.add(current_state)

        if user is not None:
            check_perms_args = dict(transition=True, user=user)
            check_perms_args.update(kwargs)

            invalid_states = set()
            for new_state in states:
                # staying in the current state needs no permission
                if new_state == current_state:
                    continue

                # check permissions: all must be valid (stops at the first failure)
                if not all(func(current_state, new_state, **check_perms_args) for func in self._current_state.check_perms):
                    invalid_states.add(new_state)

            states -= invalid_states

        return sorted([(self.get_num(name), name) for name in states])

    def change_state(self, new_state, commit = True, **kwargs):
        '''new_state is number or state_name
           if commit is set, state is changed,
           otherwise transition is just tested and prepared
           commit with new_state == None just makes prepared transition if
           there is any, otherwise raises exception

           check_perms callbacks are called with 'commit' parameter;
           leave/enter callbacks receive only the extra kwargs.
           NOTE(review): leave/enter callbacks therefore run both when a
           transition is prepared and when it is committed - confirm intended.

           returns: True if change was committed, False if not or no change
           was made (same new state, no new state)

           raises:
               ValueError:
                   - for invalid permissions
                   - invalid transitions
                   - committing not prepared transition
        '''
        # Numeric id 0 is a valid state although it is falsy, so it must be
        # resolved as well; other falsy values are left untouched.
        if new_state or (type(new_state) is int and new_state == 0):
            new_state = self.get_item(new_state)
        if new_state == self._current_state or (new_state is None and not commit):
            return False
        if new_state is None and (not commit or self._to is None):
            raise ValueError('No new state set and commit specified')
        if new_state is None:
            # commit the previously prepared transition
            new_state = self._to

        current_state = self._current_state.name
        if str(new_state) not in self._current_state.next_states:
            raise ValueError("Invalid transition '%s' -> '%s'." % (current_state, new_state))

        # check transition permissions
        # if username is needed check_perms functions, pass it through kwargs
        check_perms_args = dict(commit=commit)
        check_perms_args.update(kwargs)
        for func in self._current_state.check_perms:
            if not func(current_state, new_state, **check_perms_args):
                raise ValueError("Insufficient rights for transition '%s' -> '%s'." % (current_state, new_state))

        # run "leave" functions on current state
        for func in self._current_state.leave:
            func(current_state, new_state, **kwargs)

        # run "enter" functions on new state
        for func in new_state.enter:
            func(current_state, new_state, **kwargs)

        if commit:
            self.set_state(new_state)
            self._to = None
            return True

        # remember the tested transition so that it can be committed later
        self._to = new_state
        return False


class DictSet(dict):
    """Dictionary with set operations on keys.

    The operators return new DictSet objects; values are always taken from the
    left operand, except for `|` where values of `other` win on shared keys.
    """

    def copy(self):
        """Return a shallow copy that is a DictSet (dict.copy would return a plain dict)."""
        return DictSet(super(DictSet, self).copy())

    def __sub__(self, other):
        """Return items whose keys are not in `other` (any container supporting `in`)."""
        # items() instead of iteritems(): the latter exists on Python 2 only
        return DictSet((key, value) for key, value in self.items() if key not in other)

    def __or__(self, other):
        """Return a copy of self updated with `other`."""
        result = self.copy()
        result.update(other)
        return result

    def __and__(self, other):
        """Return items whose keys are also in `other` (any container supporting `in`)."""
        return DictSet((key, value) for key, value in self.items() if key in other)

    def add(self, key, value):
        """Store value under key unless the key already exists.

        @return: True if a new key was added, False if the key was present
                 (its value is left untouched)
        @rtype:  bool
        """
        if key in self:
            return False
        self[key] = value
        return True

    def remove(self, key):
        """Delete key; raise KeyError if it is not present."""
        del self[key]