Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "superstate"
version = "1.6.2a1"
version = "1.6.2a2"
description = "Robust statechart for configurable automation rules."
readme = "README.md"
license = {file = "LICENSE"}
Expand Down
2 changes: 1 addition & 1 deletion src/superstate/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@
__author_email__ = 'jpj6652@gmail.com'
__title__ = 'superstate'
__description__ = 'Compact statechart that can be vendored.'
__version__ = '1.6.2a1'
__version__ = '1.6.2a2'
__license__ = 'MIT'
__copyright__ = 'Copyright 2022 Jesse Johnson.'
__all__ = (
Expand Down
118 changes: 46 additions & 72 deletions src/superstate/machine.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
TYPE_CHECKING,
Any,
Dict,
Iterator,
List,
Optional,
# Sequence,
Expand All @@ -31,10 +32,10 @@
from superstate.provider import PROVIDERS
from superstate.state import (
AtomicState,
SubstateMixin,
# CompoundState,
ParallelState,
State,
SubstateMixin,
)
from superstate.types import Selection

Expand Down Expand Up @@ -133,7 +134,8 @@ def __init__(
log.info('initializing statechart')

self._sessionid = UUID(
bytes=os.urandom(16), version=4 # pylint: disable=no-member
bytes=os.urandom(16),
version=4, # pylint: disable=no-member
)

self.datamodel.populate()
Expand Down Expand Up @@ -170,18 +172,9 @@ def __getattr__(self, name: str) -> Any:
# do not attempt to resolve missing dunders
if name.startswith('__'):
raise AttributeError

# handle state check for active states
if name.startswith('is_'):
return name[3:] in self.active

# handle automatic transitions
if name == '_auto_':

def wrapper(*args: Any, **kwargs: Any) -> Optional[Any]:
return self.trigger('', *args, **kwargs)

return wrapper
raise AttributeError(f"cannot find attribute: {name}")

@property
Expand All @@ -202,6 +195,18 @@ def current_state(self) -> State:
# TODO: rename to head or position potentially
return self.__current_state

@current_state.setter
def current_state(self, state: State) -> None:
"""Set the current state."""
if hasattr(self.current_state, 'states'):
if state in list(self.current_state.states.values()):
self.__current_state = state
return
if len(self.active) > 1 and self.active[1] == state:
self.__current_state = state
else:
raise InvalidTransition('cannot transition from final state')

@property
def root(self) -> SubstateMixin:
"""Return root state of statechart."""
Expand Down Expand Up @@ -342,9 +347,7 @@ def get_state(self, statepath: str) -> State:
break
raise InvalidState(f"state could not be found: {statepath}")

def add_state(
self, state: State, statepath: Optional[str] = None
) -> None:
def add_state(self, state: State, statepath: Optional[str] = None) -> None:
"""Add state to either parent or target state."""
parent = self.get_state(statepath) if statepath else self.parent
if isinstance(parent, SubstateMixin):
Expand All @@ -356,13 +359,13 @@ def add_state(
)

@property
def transitions(self) -> Tuple[Transition, ...]:
def transitions(self) -> Iterator[Transition]:
"""Return list of current transitions."""
return (
tuple(self.current_state.transitions)
if hasattr(self.current_state, 'transitions')
else ()
)
for state in self.active:
if hasattr(state, 'transitions'):
yield from state.transitions
else:
continue

def add_transition(
self, transition: Transition, statepath: Optional[str] = None
Expand All @@ -375,57 +378,28 @@ def add_transition(
else:
raise InvalidState('cannot add transition to %s', target)

@staticmethod
def _lookup_transitions(event: str, state: State) -> List[Transition]:
return (
state.get_transition(event)
if hasattr(state, 'get_transition')
else []
)
def get_transitions(self, event: str) -> tuple[Transition, ...]:
"""Get each transition maching event."""
return tuple(filter(lambda t: t.event == event, self.transitions))

def process_transitions(
self, event: str, /, *args: Any, **kwargs: Any
) -> Transition:
"""Get transition event from active states."""
# TODO: must use datamodel to process transitions
# child => parent => grandparent
guarded: List[Transition] = []
for current in self.active:
transitions: List[Transition] = []

# search parallel states for transitions
if isinstance(current, ParallelState):
for state in current.states.values():
transitions += self._lookup_transitions(event, state)
else:
transitions = self._lookup_transitions(event, current)

# evaluate conditions
allowed = [
t for t in transitions if t.evaluate(self, *args, **kwargs)
]
if allowed:
# if len(allowed) > 1:
# raise InvalidConfig(
# 'Conflicting transitions were allowed for event',
# event
# )
return allowed[0]
guarded += transitions
if len(guarded) != 0:
raise ConditionNotSatisfied('no transition possible from state')
raise InvalidTransition(f"transition could not be found: {event}")

def trigger(
self, event: str, /, *args: Any, **kwargs: Any
) -> Optional[Any]:
def trigger(self, event: str, /, *args: Any, **kwargs: Any) -> None:
"""Transition from event to target state."""
# NOTE: 'on' should register an event with an event loop for callback
# trigger
# XXX: currently does not allow contional transient states
transition = self.process_transitions(event, *args, **kwargs)
if transition:
log.info('transitioning to %r', event)
result = transition(self, *args, **kwargs)
return result
raise InvalidTransition('transition %r not found', event)
# TODO: need to consider superstate transitions.
if self.current_state.type == 'final':
raise InvalidTransition('cannot transition from final state')

transitions = self.get_transitions(event)
if not transitions:
raise InvalidTransition('no transitions match event')
allowed = [t for t in transitions if t.evaluate(self, *args, **kwargs)]
if not allowed:
raise ConditionNotSatisfied(
'Condition is not satisfied for this transition'
)
if len(allowed) > 1:
raise InvalidTransition(
'More than one transition was allowed for this event'
)
log.info('processed guard for %s', allowed[0].event)
allowed[0].execute(self, *args, **kwargs)
log.info('processed transition event %s', allowed[0].event)
30 changes: 9 additions & 21 deletions src/superstate/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,21 +72,6 @@ class TransitionMixin:

__transitions: List[Transition]

def __register_transition_callback(self, transition: Transition) -> None:
# XXX: currently mapping to class instead of instance
setattr(
self,
transition.event if transition.event != '' else '_auto_',
# pylint: disable-next=unnecessary-dunder-call
transition.callback().__get__(self, self.__class__),
)

def _process_transient_state(self, ctx: StateChart) -> None:
for transition in self.transitions:
if transition.event == '':
ctx._auto_() # pylint: disable=protected-access
break

@property
def transitions(self) -> Tuple[Transition, ...]:
"""Return transitions of this state."""
Expand All @@ -96,13 +81,10 @@ def transitions(self) -> Tuple[Transition, ...]:
def transitions(self, transitions: List[Transition]) -> None:
"""Initialize atomic state."""
self.__transitions = transitions
for transition in self.transitions:
self.__register_transition_callback(transition)

def add_transition(self, transition: Transition) -> None:
"""Add transition to this state."""
self.__transitions.append(transition)
self.__register_transition_callback(transition)

def get_transition(self, event: str) -> Tuple[Transition, ...]:
"""Get each transition maching event."""
Expand Down Expand Up @@ -184,7 +166,7 @@ class State:
name: str = cast(str, Identifier())
# history: Optional['HistoryState']
# final: Optional[FinalState]
states: Dict[str, State]
# states: Dict[str, State]
# transitions: Tuple[Transition, ...]
# onentry: Tuple[ActionTypes, ...]
# onexit: Tuple[ActionTypes, ...]
Expand Down Expand Up @@ -493,8 +475,14 @@ def run_on_entry(self, ctx: StateChart) -> Optional[Any]:
self.datamodel, 'maps'
):
self.datamodel.populate()
self._process_transient_state(ctx)
return super().run_on_entry(ctx)
log.info("executing 'on_entry' state change actions for %s", self.name)
results = super().run_on_entry(ctx)
# process transient states
for transition in self.transitions:
if transition.event == '':
ctx.trigger(transition.event)
break
return results


class SubstateMixin(State):
Expand Down
79 changes: 42 additions & 37 deletions src/superstate/transition.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class Transition:
name. In all cases, the token matching is case sensitive.]
"""

# __slots__ = ['event', 'target', 'action', 'cond', 'type']
# __slots__ = ['event', 'target', 'content', 'cond', 'type']

__source: Optional[State] = None
event: str = cast(str, Identifier(TRANSITION_PATTERN))
Expand All @@ -57,32 +57,6 @@ def __init__(
def __repr__(self) -> str:
return repr(f"Transition(event={self.event}, target={self.target})")

def __call__(
self, ctx: StateChart, *args: Any, **kwargs: Any
) -> Optional[Any]:
"""Run transition process."""
# TODO: move change_state to process_transitions
if 'statepath' in kwargs:
superstate_path = kwargs['statepath'].split('.')[:-1]
target = (
'.'.join(superstate_path + [self.target])
if superstate_path != []
else self.target
)
else:
target = self.target

results = None
if self.content:
results = []
provider = ctx.datamodel.provider(ctx)
for expression in tuplize(self.content):
results.append(provider.handle(expression, *args, **kwargs))
log.info("executed action event for %r", self.event)
ctx.change_state(target)
log.info("no action event for %r", self.event)
return results

@classmethod
def create(cls, settings: Union[Transition, dict]) -> Transition:
"""Create transition from configuration."""
Expand Down Expand Up @@ -119,16 +93,47 @@ def source(self, state: State) -> None:
else:
raise SuperstateException('cannot change source of transition')

def callback(self) -> Callable:
"""Provide callback from source state when transition is called."""

def event(ctx: StateChart, *args: Any, **kwargs: Any) -> None:
"""Provide callback event."""
ctx.process_transitions(self.event, *args, **kwargs)

event.__name__ = self.event
event.__doc__ = f"Transition event: '{self.event}'"
return event
def execute(
self, ctx: StateChart, *args: Any, **kwargs: Any
) -> Optional[List[Any]]:
"""Transition the state of the statechart."""
log.info("executing transition contents for event %r", self.event)
results: Optional[List[Any]] = None
if self.content:
results = []
provider = ctx.datamodel.provider(ctx)
for expression in tuplize(self.content):
results.append(provider.handle(expression, *args, **kwargs))
log.info("completed transition contents for event %r", self.event)
relpath = ctx.get_relpath(self.target)
if relpath == '.': # handle self transition
ctx.current_state.run_on_exit(ctx)
ctx.current_state.run_on_entry(ctx)
else:
macrostep = relpath.split('.')[2 if relpath.endswith('.') else 1 :]
while macrostep[0] == '': # reverse
ctx.current_state.run_on_exit(ctx)
ctx.current_state = ctx.active[1]
macrostep.pop(0)
for microstep in macrostep: # forward
try:
if (
# isinstance(ctx.current_state, State)
hasattr(ctx.current_state, 'states')
and microstep in ctx.current_state.states
):
state = ctx.get_state(microstep)
ctx.current_state = state
state.run_on_entry(ctx)
else:
raise InvalidState(
f"statepath not found: {self.target}"
)
except SuperstateException as err:
log.error(err)
raise KeyError('superstate is undefined') from err
log.info('changed state to %s', self.target)
return results

def evaluate(self, ctx: StateChart, *args: Any, **kwargs: Any) -> bool:
"""Evaluate conditionss of transition."""
Expand Down
13 changes: 7 additions & 6 deletions tests/state/test_change_notification.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,12 @@
class Door(StateChart):
"""Represent a door."""

state = {
'initial': 'closed',
'states': [
state = State(
name='door',
initial='closed',
states=[
State(
'open',
name='open',
transitions=[
Transition(
event='close',
Expand All @@ -39,7 +40,7 @@ class Door(StateChart):
],
),
State(
'closed',
name='closed',
transitions=[
Transition(
event='open',
Expand Down Expand Up @@ -67,7 +68,7 @@ class Door(StateChart):
),
State('broken'),
],
}
)

def __init__(self) -> None:
super().__init__()
Expand Down
Loading
Loading