diff --git a/pyproject.toml b/pyproject.toml index fc64b82..3660460 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "superstate" -version = "1.6.2a1" +version = "1.6.2a2" description = "Robust statechart for configurable automation rules." readme = "README.md" license = {file = "LICENSE"} diff --git a/src/superstate/__init__.py b/src/superstate/__init__.py index 14d7b6e..a6b015c 100644 --- a/src/superstate/__init__.py +++ b/src/superstate/__init__.py @@ -64,7 +64,7 @@ __author_email__ = 'jpj6652@gmail.com' __title__ = 'superstate' __description__ = 'Compact statechart that can be vendored.' -__version__ = '1.6.2a1' +__version__ = '1.6.2a2' __license__ = 'MIT' __copyright__ = 'Copyright 2022 Jesse Johnson.' __all__ = ( diff --git a/src/superstate/machine.py b/src/superstate/machine.py index 1d92b67..18450db 100644 --- a/src/superstate/machine.py +++ b/src/superstate/machine.py @@ -11,6 +11,7 @@ TYPE_CHECKING, Any, Dict, + Iterator, List, Optional, # Sequence, @@ -31,10 +32,10 @@ from superstate.provider import PROVIDERS from superstate.state import ( AtomicState, - SubstateMixin, # CompoundState, ParallelState, State, + SubstateMixin, ) from superstate.types import Selection @@ -133,7 +134,8 @@ def __init__( log.info('initializing statechart') self._sessionid = UUID( - bytes=os.urandom(16), version=4 # pylint: disable=no-member + bytes=os.urandom(16), + version=4, # pylint: disable=no-member ) self.datamodel.populate() @@ -170,18 +172,9 @@ def __getattr__(self, name: str) -> Any: # do not attempt to resolve missing dunders if name.startswith('__'): raise AttributeError - # handle state check for active states if name.startswith('is_'): return name[3:] in self.active - - # handle automatic transitions - if name == '_auto_': - - def wrapper(*args: Any, **kwargs: Any) -> Optional[Any]: - return self.trigger('', *args, **kwargs) - - return wrapper raise AttributeError(f"cannot find attribute: {name}") @property @@ -202,6 +195,18 @@ def current_state(self) -> State: # TODO: rename to head or position potentially return self.__current_state + @current_state.setter + def current_state(self, state: State) -> None: + """Set the current state.""" + if hasattr(self.current_state, 'states'): + if state in list(self.current_state.states.values()): + self.__current_state = state + return + if len(self.active) > 1 and self.active[1] == state: + self.__current_state = state + else: + raise InvalidTransition('cannot transition from final state') + @property def root(self) -> SubstateMixin: """Return root state of statechart.""" @@ -342,9 +347,7 @@ def get_state(self, statepath: str) -> State: break raise InvalidState(f"state could not be found: {statepath}") - def add_state( - self, state: State, statepath: Optional[str] = None - ) -> None: + def add_state(self, state: State, statepath: Optional[str] = None) -> None: """Add state to either parent or target state.""" parent = self.get_state(statepath) if statepath else self.parent if isinstance(parent, SubstateMixin): @@ -356,13 +359,13 @@ def add_state( ) @property - def transitions(self) -> Tuple[Transition, ...]: + def transitions(self) -> Iterator[Transition]: """Return list of current transitions.""" - return ( - tuple(self.current_state.transitions) - if hasattr(self.current_state, 'transitions') - else () - ) + for state in self.active: + if hasattr(state, 'transitions'): + yield from state.transitions + else: + continue def add_transition( self, transition: Transition, statepath: Optional[str] = None @@ -375,57 +378,28 @@ def add_transition( else: raise InvalidState('cannot add transition to %s', target) - @staticmethod - def _lookup_transitions(event: str, state: State) -> List[Transition]: - return ( - state.get_transition(event) - if hasattr(state, 'get_transition') - else [] - ) + def get_transitions(self, event: str) -> tuple[Transition, ...]: + """Get each transition maching event.""" + return tuple(filter(lambda t: t.event == event, self.transitions)) - def process_transitions( - self, event: str, /, *args: Any, **kwargs: Any - ) -> Transition: - """Get transition event from active states.""" - # TODO: must use datamodel to process transitions - # child => parent => grandparent - guarded: List[Transition] = [] - for current in self.active: - transitions: List[Transition] = [] - - # search parallel states for transitions - if isinstance(current, ParallelState): - for state in current.states.values(): - transitions += self._lookup_transitions(event, state) - else: - transitions = self._lookup_transitions(event, current) - - # evaluate conditions - allowed = [ - t for t in transitions if t.evaluate(self, *args, **kwargs) - ] - if allowed: - # if len(allowed) > 1: - # raise InvalidConfig( - # 'Conflicting transitions were allowed for event', - # event - # ) - return allowed[0] - guarded += transitions - if len(guarded) != 0: - raise ConditionNotSatisfied('no transition possible from state') - raise InvalidTransition(f"transition could not be found: {event}") - - def trigger( - self, event: str, /, *args: Any, **kwargs: Any - ) -> Optional[Any]: + def trigger(self, event: str, /, *args: Any, **kwargs: Any) -> None: """Transition from event to target state.""" - # NOTE: 'on' should register an event with an event loop for callback - # trigger - # XXX: currently does not allow contional transient states - transition = self.process_transitions(event, *args, **kwargs) - if transition: - log.info('transitioning to %r', event) - result = transition(self, *args, **kwargs) - return result - raise InvalidTransition('transition %r not found', event) + # TODO: need to consider superstate transitions. + if self.current_state.type == 'final': + raise InvalidTransition('cannot transition from final state') + + transitions = self.get_transitions(event) + if not transitions: + raise InvalidTransition('no transitions match event') + allowed = [t for t in transitions if t.evaluate(self, *args, **kwargs)] + if not allowed: + raise ConditionNotSatisfied( + 'Condition is not satisfied for this transition' + ) + if len(allowed) > 1: + raise InvalidTransition( + 'More than one transition was allowed for this event' + ) + log.info('processed guard for %s', allowed[0].event) + allowed[0].execute(self, *args, **kwargs) + log.info('processed transition event %s', allowed[0].event) diff --git a/src/superstate/state.py b/src/superstate/state.py index 1cf40b4..8417259 100644 --- a/src/superstate/state.py +++ b/src/superstate/state.py @@ -72,21 +72,6 @@ class TransitionMixin: __transitions: List[Transition] - def __register_transition_callback(self, transition: Transition) -> None: - # XXX: currently mapping to class instead of instance - setattr( - self, - transition.event if transition.event != '' else '_auto_', - # pylint: disable-next=unnecessary-dunder-call - transition.callback().__get__(self, self.__class__), - ) - - def _process_transient_state(self, ctx: StateChart) -> None: - for transition in self.transitions: - if transition.event == '': - ctx._auto_() # pylint: disable=protected-access - break - @property def transitions(self) -> Tuple[Transition, ...]: """Return transitions of this state.""" @@ -96,13 +81,10 @@ def transitions(self) -> Tuple[Transition, ...]: def transitions(self, transitions: List[Transition]) -> None: """Initialize atomic state.""" self.__transitions = transitions - for transition in self.transitions: - self.__register_transition_callback(transition) def add_transition(self, transition: Transition) -> None: """Add transition to this state.""" self.__transitions.append(transition) - self.__register_transition_callback(transition) def get_transition(self, event: str) -> Tuple[Transition, ...]: """Get each transition maching event.""" @@ -184,7 +166,7 @@ class State: name: str = cast(str, Identifier()) # history: Optional['HistoryState'] # final: Optional[FinalState] - states: Dict[str, State] + # states: Dict[str, State] # transitions: Tuple[Transition, ...] # onentry: Tuple[ActionTypes, ...] # onexit: Tuple[ActionTypes, ...] @@ -493,8 +475,14 @@ def run_on_entry(self, ctx: StateChart) -> Optional[Any]: self.datamodel, 'maps' ): self.datamodel.populate() - self._process_transient_state(ctx) - return super().run_on_entry(ctx) + log.info("executing 'on_entry' state change actions for %s", self.name) + results = super().run_on_entry(ctx) + # process transient states + for transition in self.transitions: + if transition.event == '': + ctx.trigger(transition.event) + break + return results class SubstateMixin(State): diff --git a/src/superstate/transition.py b/src/superstate/transition.py index 183fc18..d39f403 100644 --- a/src/superstate/transition.py +++ b/src/superstate/transition.py @@ -31,7 +31,7 @@ class Transition: name. In all cases, the token matching is case sensitive.] """ - # __slots__ = ['event', 'target', 'action', 'cond', 'type'] + # __slots__ = ['event', 'target', 'content', 'cond', 'type'] __source: Optional[State] = None event: str = cast(str, Identifier(TRANSITION_PATTERN)) @@ -57,32 +57,6 @@ def __init__( def __repr__(self) -> str: return repr(f"Transition(event={self.event}, target={self.target})") - def __call__( - self, ctx: StateChart, *args: Any, **kwargs: Any - ) -> Optional[Any]: - """Run transition process.""" - # TODO: move change_state to process_transitions - if 'statepath' in kwargs: - superstate_path = kwargs['statepath'].split('.')[:-1] - target = ( - '.'.join(superstate_path + [self.target]) - if superstate_path != [] - else self.target - ) - else: - target = self.target - - results = None - if self.content: - results = [] - provider = ctx.datamodel.provider(ctx) - for expression in tuplize(self.content): - results.append(provider.handle(expression, *args, **kwargs)) - log.info("executed action event for %r", self.event) - ctx.change_state(target) - log.info("no action event for %r", self.event) - return results - @classmethod def create(cls, settings: Union[Transition, dict]) -> Transition: """Create transition from configuration.""" @@ -119,16 +93,47 @@ def source(self, state: State) -> None: else: raise SuperstateException('cannot change source of transition') - def callback(self) -> Callable: - """Provide callback from source state when transition is called.""" - - def event(ctx: StateChart, *args: Any, **kwargs: Any) -> None: - """Provide callback event.""" - ctx.process_transitions(self.event, *args, **kwargs) - - event.__name__ = self.event - event.__doc__ = f"Transition event: '{self.event}'" - return event + def execute( + self, ctx: StateChart, *args: Any, **kwargs: Any + ) -> Optional[List[Any]]: + """Transition the state of the statechart.""" + log.info("executing transition contents for event %r", self.event) + results: Optional[List[Any]] = None + if self.content: + results = [] + provider = ctx.datamodel.provider(ctx) + for expression in tuplize(self.content): + results.append(provider.handle(expression, *args, **kwargs)) + log.info("completed transition contents for event %r", self.event) + relpath = ctx.get_relpath(self.target) + if relpath == '.': # handle self transition + ctx.current_state.run_on_exit(ctx) + ctx.current_state.run_on_entry(ctx) + else: + macrostep = relpath.split('.')[2 if relpath.endswith('.') else 1 :] + while macrostep[0] == '': # reverse + ctx.current_state.run_on_exit(ctx) + ctx.current_state = ctx.active[1] + macrostep.pop(0) + for microstep in macrostep: # forward + try: + if ( + # isinstance(ctx.current_state, State) + hasattr(ctx.current_state, 'states') + and microstep in ctx.current_state.states + ): + state = ctx.get_state(microstep) + ctx.current_state = state + state.run_on_entry(ctx) + else: + raise InvalidState( + f"statepath not found: {self.target}" + ) + except SuperstateException as err: + log.error(err) + raise KeyError('superstate is undefined') from err + log.info('changed state to %s', self.target) + return results def evaluate(self, ctx: StateChart, *args: Any, **kwargs: Any) -> bool: """Evaluate conditionss of transition.""" diff --git a/tests/state/test_change_notification.py b/tests/state/test_change_notification.py index 4731897..ee56759 100644 --- a/tests/state/test_change_notification.py +++ b/tests/state/test_change_notification.py @@ -8,11 +8,12 @@ class Door(StateChart): """Represent a door.""" - state = { - 'initial': 'closed', - 'states': [ + state = State( + name='door', + initial='closed', + states=[ State( - 'open', + name='open', transitions=[ Transition( event='close', @@ -39,7 +40,7 @@ class Door(StateChart): ], ), State( - 'closed', + name='closed', transitions=[ Transition( event='open', @@ -67,7 +68,7 @@ class Door(StateChart): ), State('broken'), ], - } + ) def __init__(self) -> None: super().__init__() diff --git a/tests/test_individuation.py b/tests/test_individuation.py index ee25cd1..42e1a65 100644 --- a/tests/test_individuation.py +++ b/tests/test_individuation.py @@ -25,9 +25,9 @@ class Door(StateChart): ) -def test_it_responds_to_an_event() -> None: - """Test door responds to an event.""" - assert hasattr(door.current_state, 'crack') +# def test_it_responds_to_an_event() -> None: +# """Test door responds to an event.""" +# assert hasattr(door.current_state, 'crack') def test_event_changes_state_when_called() -> None: diff --git a/tests/test_version.py b/tests/test_version.py index 76ec871..0ec3c0e 100644 --- a/tests/test_version.py +++ b/tests/test_version.py @@ -4,4 +4,4 @@ def test_version() -> None: """Test project metadata version.""" - assert __version__ == '1.6.2a1' + assert __version__ == '1.6.2a2' diff --git a/tests/transition/test_transition.py b/tests/transition/test_transition.py index 3f114d2..629fbff 100644 --- a/tests/transition/test_transition.py +++ b/tests/transition/test_transition.py @@ -34,17 +34,6 @@ class MyMachine(StateChart): } -def test_its_declaration_creates_a_method_with_its_name(): - machine = MyMachine() - assert hasattr(machine.current_state, 'queue') and callable( - machine.current_state.queue - ) - assert hasattr(machine.current_state, 'cancel') and callable( - machine.current_state.cancel - ) - machine.trigger('queue') - - def test_it_changes_machine_state(): machine = MyMachine() assert machine.current_state == 'created'