"""Topological verification algorithms for :mod:`pyfcstm.verify`.
This module implements the group-1 verification algorithms that work only on
the hierarchical state topology. The algorithms project a
:class:`pyfcstm.model.StateMachine` into a leaf-level macro graph and then run
structural graph queries over that projection. They intentionally ignore guard
satisfiability, event availability, transition ordering, and runtime variable
evolution; positive reachability results are therefore over-approximations of
runtime behavior, while unreachable results describe structural topology facts.
The module contains:
* :class:`LeafLevelGraph` - Immutable container for the projected leaf graph.
* :class:`FinitenessReport` - Report object for
:func:`topological_finite`.
* :class:`InevitabilityReport` - Report object for
:func:`topological_inevitable_terminator`.
* :func:`build_leaf_level_macro_graph` - Build the guard-agnostic projection.
* :func:`topological_reachable_set` - Compute reachable leaf states.
* :func:`unreachable_states` - Find structurally unreachable leaf states.
* :func:`strongly_connected_components` - Find cyclic leaf-level regions.
* :func:`topological_finite` - Detect reachable no-exit regions.
* :func:`topological_inevitable_terminator` - Detect non-terminating topology.
* :func:`event_emission_to_consumer_reachable` - Detect events whose consumers
are all unreachable.
.. note::
The graph projection is independent from the diagnostics package.
Diagnostic integration is expected to wrap these functions and translate
their plain Python return values into diagnostic payloads.
Example::
>>> from pyfcstm.dsl import parse_with_grammar_entry
>>> from pyfcstm.model import parse_dsl_node_to_state_machine
>>> ast = parse_with_grammar_entry("state Root;", "state_machine_dsl")
>>> machine = parse_dsl_node_to_state_machine(ast)
>>> graph = build_leaf_level_macro_graph(machine)
>>> graph.nodes
('Root',)
>>> graph.edges[EXIT_ROOT_SINK]
()
"""
from __future__ import annotations
from collections import deque
from dataclasses import dataclass
from types import MappingProxyType
from typing import (
TYPE_CHECKING,
Deque,
Dict,
Iterable,
List,
Mapping,
Optional,
Sequence,
Set,
Tuple,
)
try:
from typing import Literal
except ImportError: # pragma: no cover - Python < 3.8 compatibility
from typing_extensions import Literal
from ..dsl import EXIT_STATE, INIT_STATE
if TYPE_CHECKING: # pragma: no cover - imports used only by static type checkers
from ..model import State, StateMachine
EXIT_ROOT_SINK = "⊥_root"
"""Synthetic sink used for transitions that exit the root state."""
[docs]
@dataclass(frozen=True)
class LeafLevelGraph:
"""
Leaf-level macro graph projected from a hierarchical state machine.
Each node is a dotted path for a leaf state in the source model. Edges
describe guard-agnostic macro steps between leaf states after expanding
composite targets through their initial transitions and bubbling leaf exits
through parent transitions. The synthetic :data:`EXIT_ROOT_SINK` key is
present in :attr:`edges` so callers can query root termination uniformly.
:param nodes: Dotted paths of leaf states in stable sorted order.
:type nodes: Tuple[str, ...]
:param edges: Mapping from each leaf path, and optionally
:data:`EXIT_ROOT_SINK`, to sorted successor paths. The mapping is copied
into a read-only proxy during initialization.
:type edges: Mapping[str, Tuple[str, ...]]
Example::
>>> graph = LeafLevelGraph(
... nodes=("Root.Idle",),
... edges={"Root.Idle": (EXIT_ROOT_SINK,), EXIT_ROOT_SINK: ()},
... )
>>> graph.nodes
('Root.Idle',)
>>> graph.edges["Root.Idle"]
('⊥_root',)
>>> graph.edges["Root.Idle"] = ("Root.Other",)
Traceback (most recent call last):
...
TypeError: 'mappingproxy' object does not support item assignment
"""
nodes: Tuple[str, ...]
edges: Mapping[str, Tuple[str, ...]]
[docs]
def __post_init__(self) -> None:
"""
Freeze the edge mapping after dataclass initialization.
:return: ``None``.
:rtype: None
Example::
>>> graph = LeafLevelGraph(nodes=("Root.A",), edges={"Root.A": ()})
>>> type(graph.edges).__name__
'mappingproxy'
"""
object.__setattr__(self, "edges", MappingProxyType(dict(self.edges)))
[docs]
@dataclass(frozen=True)
class FinitenessReport:
"""
Result for :func:`topological_finite`.
The report is true when every root-reachable leaf can eventually reach the
synthetic root-exit sink in the topology graph. Counterexamples are plain
tuples so later diagnostic layers can choose their own payload shape without
coupling the topology package to the diagnostics package.
:param finite: ``True`` when every root-reachable leaf can reach the root
exit sink in the topological graph.
:type finite: bool
:param counterexamples: Counterexamples as ``('trap_cycle', scc_tuple)`` or
``('deadlock', state_path)`` entries.
:type counterexamples: Tuple[Tuple[Literal['trap_cycle', 'deadlock'], object], ...]
Example::
>>> report = FinitenessReport(finite=False, counterexamples=(
... ("deadlock", "Root.Stuck"),
... ))
>>> report.finite
False
>>> report.counterexamples[0][0]
'deadlock'
"""
finite: bool
counterexamples: Tuple[Tuple[Literal["trap_cycle", "deadlock"], object], ...]
[docs]
@dataclass(frozen=True)
class InevitabilityReport:
"""
Result for :func:`topological_inevitable_terminator`.
The report is true only when the topology has no root-reachable cycle or
deadlock that could keep execution away from the root terminator forever.
A false report contains one representative path or SCC; it is not intended
to enumerate every possible non-terminating region.
:param inevitable: ``True`` when no root-reachable topological path can stay
in a cycle or deadlock forever.
:type inevitable: bool
:param counterexample_path: Representative non-terminating SCC or deadlock
path, or ``None`` when :attr:`inevitable` is true.
:type counterexample_path: Optional[Tuple[str, ...]]
Example::
>>> report = InevitabilityReport(
... inevitable=False,
... counterexample_path=("Root.Loop",),
... )
>>> report.inevitable
False
>>> report.counterexample_path
('Root.Loop',)
"""
inevitable: bool
counterexample_path: Optional[Tuple[str, ...]]
def _state_path(state: State) -> str:
"""Return the dotted path for a model state.
:param state: State whose root-to-node path should be rendered.
:type state: State
:return: Dotted state path.
:rtype: str
Example::
>>> from pyfcstm.dsl import parse_with_grammar_entry
>>> from pyfcstm.model import parse_dsl_node_to_state_machine
>>> ast = parse_with_grammar_entry("state Root;", "state_machine_dsl")
>>> machine = parse_dsl_node_to_state_machine(ast)
>>> _state_path(machine.root_state)
'Root'
"""
return ".".join(state.path)
def _leaf_states(machine: StateMachine) -> Tuple[State, ...]:
"""Return every leaf state in model traversal order.
:param machine: State machine to inspect.
:type machine: StateMachine
:return: Leaf states yielded by :meth:`StateMachine.walk_states`.
:rtype: Tuple[State, ...]
Example::
>>> from pyfcstm.dsl import parse_with_grammar_entry
>>> from pyfcstm.model import parse_dsl_node_to_state_machine
>>> source = '''
... state Root {
... state A;
... state B;
... [*] -> A;
... }
... '''
>>> ast = parse_with_grammar_entry(source, "state_machine_dsl")
>>> machine = parse_dsl_node_to_state_machine(ast)
>>> tuple(_state_path(state) for state in _leaf_states(machine))
('Root.A', 'Root.B')
"""
return tuple(state for state in machine.walk_states() if state.is_leaf_state)
def _dedupe_sorted(items: Iterable[str]) -> Tuple[str, ...]:
"""Return unique strings in deterministic sorted order.
:param items: String items that may contain duplicates.
:type items: Iterable[str]
:return: Sorted tuple with duplicates removed.
:rtype: Tuple[str, ...]
Example::
>>> _dedupe_sorted(["b", "a", "b"])
('a', 'b')
"""
return tuple(sorted(set(items)))
def _successors(edges: Mapping[str, Tuple[str, ...]], node: str) -> Tuple[str, ...]:
"""Return graph successors for ``node`` with a total empty default.
:param edges: Mapping from node path to successor node paths.
:type edges: Mapping[str, Tuple[str, ...]]
:param node: Node to query.
:type node: str
:return: Successor tuple, or ``()`` when ``node`` is absent.
:rtype: Tuple[str, ...]
Example::
>>> _successors({"A": ("B",)}, "A")
('B',)
>>> _successors({"A": ("B",)}, "missing")
()
"""
return edges.get(node, tuple())
def _project_target(parent_state: State, target: object) -> Tuple[str, ...]:
"""Project a transition target into leaf-level graph successors.
:param parent_state: State that owns the transition target.
:type parent_state: State
:param target: Transition target, either a substate name or
:data:`EXIT_STATE`.
:type target: object
:return: Leaf-level successor paths reached by taking the target.
:rtype: Tuple[str, ...]
:raises TypeError: If ``target`` is not an FCSTM transition endpoint shape.
Example::
>>> from pyfcstm.dsl import parse_with_grammar_entry
>>> from pyfcstm.model import parse_dsl_node_to_state_machine
>>> source = '''
... state Root {
... state A;
... state B;
... [*] -> A;
... }
... '''
>>> machine = parse_dsl_node_to_state_machine(
... parse_with_grammar_entry(source, "state_machine_dsl")
... )
>>> _project_target(machine.root_state, "B")
('Root.B',)
"""
if target is EXIT_STATE:
if parent_state.is_root_state:
return (EXIT_ROOT_SINK,)
parent_parent = parent_state.parent
assert parent_parent is not None
projected: List[str] = []
for transition in parent_state.transitions_from:
if transition.to_state is EXIT_STATE:
projected.extend(_project_target(parent_parent, EXIT_STATE))
else:
projected.extend(_project_target(parent_parent, transition.to_state))
return _dedupe_sorted(projected)
if isinstance(target, str):
target_state = parent_state.substates[target]
return _initial_leaf_targets(target_state)
raise TypeError( # pragma: no cover
# Grammar-produced model transitions use only string state names and
# INIT/EXIT singletons. This guard exposes future model extensions
# loudly instead of silently treating an unknown endpoint as safe.
f"Unsupported transition target: {target!r}."
)
def _initial_leaf_targets(state: State) -> Tuple[str, ...]:
"""Project entering ``state`` to the leaves reached by initial descent.
:param state: State being entered by a transition or root initialization.
:type state: State
:return: Leaf paths reached after following initial transitions.
:rtype: Tuple[str, ...]
Example::
>>> from pyfcstm.dsl import parse_with_grammar_entry
>>> from pyfcstm.model import parse_dsl_node_to_state_machine
>>> ast = parse_with_grammar_entry("state Root { state A; [*] -> A; }", "state_machine_dsl")
>>> machine = parse_dsl_node_to_state_machine(ast)
>>> _initial_leaf_targets(machine.root_state)
('Root.A',)
"""
if state.is_leaf_state:
return (_state_path(state),)
projected: List[str] = []
for transition in state.init_transitions:
projected.extend(_project_target(state, transition.to_state))
return _dedupe_sorted(projected)
[docs]
def build_leaf_level_macro_graph(machine: StateMachine) -> LeafLevelGraph:
"""
Build the guard-agnostic leaf-level macro graph for ``machine``.
The projection follows normal leaf-to-leaf transitions, expands composite
targets through their initial transitions, and bubbles ``Leaf -> [*]``
transitions through parent outgoing transitions. Parent-level transitions
whose source is a composite state are therefore considered only after a
descendant leaf explicitly exits to that parent; they are not copied onto
every active descendant leaf. If a non-root parent has no outgoing
transition, the bubble is an off-cliff exit and contributes no edge. A root
leaf is treated as root-exit-capable by adding an edge to
:data:`EXIT_ROOT_SINK`.
:param machine: State machine to project.
:type machine: StateMachine
:return: Leaf-level macro graph.
:rtype: LeafLevelGraph
:raises TypeError: If a transition endpoint uses a model object type that
is not produced by the FCSTM grammar pipeline.
Example::
>>> from pyfcstm.dsl import parse_with_grammar_entry
>>> from pyfcstm.model import parse_dsl_node_to_state_machine
>>> source = '''
... state Root {
... state Idle;
... [*] -> Idle;
... Idle -> [*];
... }
... '''
>>> ast = parse_with_grammar_entry(source, "state_machine_dsl")
>>> machine = parse_dsl_node_to_state_machine(ast)
>>> graph = build_leaf_level_macro_graph(machine)
>>> graph.edges["Root.Idle"]
('⊥_root',)
"""
leaves = _leaf_states(machine)
nodes = tuple(sorted(_state_path(state) for state in leaves))
edge_sets: Dict[str, Set[str]] = {node: set() for node in nodes}
for parent_state in machine.walk_states():
for transition in parent_state.transitions:
if transition.from_state is INIT_STATE:
continue
if not isinstance(transition.from_state, str):
raise TypeError( # pragma: no cover
# Grammar-produced non-init transition sources are strings.
# A future model endpoint type should be made explicit here.
f"Unsupported transition source: {transition.from_state!r}."
)
source_state = parent_state.substates[transition.from_state]
if not source_state.is_leaf_state:
continue
source_path = _state_path(source_state)
edge_sets.setdefault(source_path, set()).update(
_project_target(parent_state, transition.to_state)
)
if machine.root_state.is_leaf_state:
edge_sets.setdefault(_state_path(machine.root_state), set()).add(EXIT_ROOT_SINK)
edges = {node: tuple(sorted(targets)) for node, targets in edge_sets.items()}
edges.setdefault(EXIT_ROOT_SINK, tuple())
return LeafLevelGraph(nodes=nodes, edges=edges)
def _closure_from(
edges: Mapping[str, Tuple[str, ...]],
starts: Iterable[str],
) -> Set[str]:
"""Return the directed reachability closure from start nodes.
The traversal is breadth-first and treats missing edge entries as nodes
with no outgoing successors, matching the helper's use on projected graph
views and reverse graph views.
:param edges: Mapping from node path to successor node paths.
:type edges: Mapping[str, Tuple[str, ...]]
:param starts: Initial nodes for the closure.
:type starts: Iterable[str]
:return: Set containing all reachable nodes, including the starts.
:rtype: Set[str]
Example::
>>> sorted(_closure_from({"A": ("B",), "B": ("C",)}, ["A"]))
['A', 'B', 'C']
"""
seen: Set[str] = set()
queue: Deque[str] = deque(starts)
while queue:
node = queue.popleft()
if node in seen:
continue
seen.add(node)
for successor in _successors(edges, node):
if successor not in seen:
queue.append(successor)
return seen
[docs]
def topological_reachable_set(machine: StateMachine) -> Dict[str, Tuple[str, ...]]:
"""
Compute guard-agnostic reachable leaf states for every model state.
Composite states are first projected through their initial descent and then
closed over the leaf-level macro graph. Leaf-state results omit the leaf
itself, matching the existing inspect reachability convention for cycles.
The returned mapping is keyed by dotted state paths for all states, not only
leaves.
:param machine: State machine to inspect.
:type machine: StateMachine
:return: Mapping from every state path to sorted reachable leaf paths.
:rtype: Dict[str, Tuple[str, ...]]
Example::
>>> from pyfcstm.dsl import parse_with_grammar_entry
>>> from pyfcstm.model import parse_dsl_node_to_state_machine
>>> source = '''
... state Root {
... state A;
... state B;
... [*] -> A;
... A -> B;
... }
... '''
>>> ast = parse_with_grammar_entry(source, "state_machine_dsl")
>>> machine = parse_dsl_node_to_state_machine(ast)
>>> topological_reachable_set(machine)["Root.A"]
('Root.B',)
"""
graph = build_leaf_level_macro_graph(machine)
result: Dict[str, Tuple[str, ...]] = {}
for state in machine.walk_states():
state_path = _state_path(state)
starts = _initial_leaf_targets(state)
reachable = _closure_from(graph.edges, starts)
reachable.discard(EXIT_ROOT_SINK)
if state.is_leaf_state:
reachable.discard(state_path)
result[state_path] = tuple(sorted(reachable))
return result
[docs]
def unreachable_states(machine: StateMachine) -> Tuple[str, ...]:
"""
Return non-pseudo leaf states unreachable from the root topology.
The result uses topology reachability from the root initial descent and
filters out pseudo leaf states. Composite states are not returned directly;
their unreachable leaf descendants are reported instead.
:param machine: State machine to inspect.
:type machine: StateMachine
:return: Sorted unreachable leaf state paths.
:rtype: Tuple[str, ...]
Example::
>>> from pyfcstm.dsl import parse_with_grammar_entry
>>> from pyfcstm.model import parse_dsl_node_to_state_machine
>>> source = '''
... state Root {
... state A;
... state Lost;
... [*] -> A;
... }
... '''
>>> ast = parse_with_grammar_entry(source, "state_machine_dsl")
>>> machine = parse_dsl_node_to_state_machine(ast)
>>> unreachable_states(machine)
('Root.Lost',)
"""
root_path = _state_path(machine.root_state)
reachable = set(topological_reachable_set(machine).get(root_path, tuple()))
reachable.add(root_path)
return tuple(
sorted(
_state_path(state)
for state in machine.walk_states()
if state.is_leaf_state
and not state.is_pseudo
and _state_path(state) not in reachable
)
)
def _scc_components(
nodes: Sequence[str], edges: Mapping[str, Tuple[str, ...]]
) -> Tuple[Tuple[str, ...], ...]:
"""Return strongly connected components using an iterative Kosaraju pass.
Components are sorted internally and the returned tuple is sorted for
deterministic tests and diagnostics. The synthetic root-exit sink is not
included unless the caller includes it in ``nodes``.
:param nodes: Graph nodes to decompose.
:type nodes: Sequence[str]
:param edges: Directed graph edges.
:type edges: Mapping[str, Tuple[str, ...]]
:return: Sorted strongly connected components.
:rtype: Tuple[Tuple[str, ...], ...]
Example::
>>> _scc_components(("A", "B", "C"), {"A": ("B",), "B": ("A",)})
(('A', 'B'), ('C',))
"""
node_set = set(nodes)
ordered_nodes = sorted(node_set)
visited: Set[str] = set()
finish_order: List[str] = []
for start in ordered_nodes:
if start in visited:
continue
visit_stack: List[Tuple[str, bool]] = [(start, False)]
while visit_stack:
node, expanded = visit_stack.pop()
if expanded:
finish_order.append(node)
continue
if node in visited:
continue
visited.add(node)
visit_stack.append((node, True))
successors = [
successor
for successor in edges.get(node, tuple())
if successor in node_set and successor not in visited
]
for successor in reversed(sorted(successors)):
visit_stack.append((successor, False))
reverse_edges: Dict[str, List[str]] = {node: [] for node in ordered_nodes}
for source in ordered_nodes:
for target in edges.get(source, tuple()):
if target in node_set:
reverse_edges[target].append(source)
components: List[Tuple[str, ...]] = []
assigned: Set[str] = set()
for start in reversed(finish_order):
if start in assigned:
continue
component: List[str] = []
collect_stack = [start]
assigned.add(start)
while collect_stack:
node = collect_stack.pop()
component.append(node)
for predecessor in reversed(sorted(reverse_edges.get(node, []))):
if predecessor not in assigned:
assigned.add(predecessor)
collect_stack.append(predecessor)
components.append(tuple(sorted(component)))
return tuple(sorted(components))
def _is_cyclic_component(
component: Tuple[str, ...],
edges: Mapping[str, Tuple[str, ...]],
) -> bool:
"""Return whether an SCC represents a non-trivial cycle.
A component with multiple nodes is cyclic by definition. A singleton is
cyclic only when it has a self-loop.
:param component: Strongly connected component to classify.
:type component: Tuple[str, ...]
:param edges: Directed graph edges.
:type edges: Mapping[str, Tuple[str, ...]]
:return: ``True`` if the component contains a cycle.
:rtype: bool
Example::
>>> _is_cyclic_component(("A", "B"), {})
True
>>> _is_cyclic_component(("A",), {"A": ("A",)})
True
>>> _is_cyclic_component(("A",), {"A": ("B",)})
False
"""
if len(component) > 1:
return True
if len(component) == 0: # pragma: no cover
# SCC decomposition only emits non-empty components. Keep this guard so
# the helper remains total if it is reused with hand-built component
# data.
return False
node = component[0]
return node in edges.get(node, tuple())
[docs]
def strongly_connected_components(machine: StateMachine) -> Tuple[Tuple[str, ...], ...]:
"""
Return non-trivial SCCs in the leaf-level topology graph.
A non-trivial SCC is either a component with more than one leaf or a single
leaf with a self-loop. The implementation is iterative so deeply nested or
wide generated models are not limited by Python's recursion depth.
:param machine: State machine to inspect.
:type machine: StateMachine
:return: Sorted non-trivial SCCs.
:rtype: Tuple[Tuple[str, ...], ...]
Example::
>>> from pyfcstm.dsl import parse_with_grammar_entry
>>> from pyfcstm.model import parse_dsl_node_to_state_machine
>>> source = '''
... state Root {
... state A;
... state B;
... [*] -> A;
... A -> B;
... B -> A;
... }
... '''
>>> ast = parse_with_grammar_entry(source, "state_machine_dsl")
>>> machine = parse_dsl_node_to_state_machine(ast)
>>> strongly_connected_components(machine)
(('Root.A', 'Root.B'),)
"""
graph = build_leaf_level_macro_graph(machine)
return tuple(
component
for component in _scc_components(graph.nodes, graph.edges)
if _is_cyclic_component(component, graph.edges)
)
def _root_reachable_leaf_paths(
machine: StateMachine,
graph: LeafLevelGraph,
) -> Set[str]:
"""Return leaf paths reachable from the root initial descent.
:param machine: State machine that produced ``graph``.
:type machine: StateMachine
:param graph: Leaf-level macro graph for ``machine``.
:type graph: LeafLevelGraph
:return: Root-reachable leaf paths, excluding the root-exit sink.
:rtype: Set[str]
Example::
>>> from pyfcstm.dsl import parse_with_grammar_entry
>>> from pyfcstm.model import parse_dsl_node_to_state_machine
>>> source = '''
... state Root {
... state A;
... [*] -> A;
... }
... '''
>>> ast = parse_with_grammar_entry(source, "state_machine_dsl")
>>> machine = parse_dsl_node_to_state_machine(ast)
>>> graph = build_leaf_level_macro_graph(machine)
>>> sorted(_root_reachable_leaf_paths(machine, graph))
['Root.A']
"""
reachable = _closure_from(graph.edges, _initial_leaf_targets(machine.root_state))
reachable.discard(EXIT_ROOT_SINK)
return reachable
def _nodes_that_can_reach_sink(graph: LeafLevelGraph) -> Set[str]:
"""Return graph nodes that can reach the synthetic root-exit sink.
:param graph: Leaf-level macro graph.
:type graph: LeafLevelGraph
:return: Node paths with at least one path to :data:`EXIT_ROOT_SINK`.
:rtype: Set[str]
Example::
>>> graph = LeafLevelGraph(
... nodes=("A", "B"),
... edges={"A": ("B",), "B": (EXIT_ROOT_SINK,), EXIT_ROOT_SINK: ()},
... )
>>> sorted(_nodes_that_can_reach_sink(graph))
['A', 'B']
"""
reverse_edges: Dict[str, Set[str]] = {node: set() for node in graph.nodes}
reverse_edges[EXIT_ROOT_SINK] = set()
for source, targets in graph.edges.items():
for target in targets:
reverse_edges.setdefault(target, set()).add(source)
can_reach = _closure_from(
{node: tuple(sorted(targets)) for node, targets in reverse_edges.items()},
(EXIT_ROOT_SINK,),
)
can_reach.discard(EXIT_ROOT_SINK)
return can_reach
[docs]
def topological_finite(machine: StateMachine) -> FinitenessReport:
"""
Check whether all root-reachable leaves can reach the root exit sink.
This check accepts models where every reachable leaf has at least one
topological route to termination, even if runtime guards could block that
route. It reports closed trap cycles that cannot reach the root sink and
deadlock leaves with no outgoing projected edge.
:param machine: State machine to inspect.
:type machine: StateMachine
:return: Finiteness report with trap-cycle and deadlock counterexamples.
:rtype: FinitenessReport
Example::
>>> from pyfcstm.dsl import parse_with_grammar_entry
>>> from pyfcstm.model import parse_dsl_node_to_state_machine
>>> source = '''
... state Root {
... state A;
... state B;
... [*] -> A;
... A -> B;
... }
... '''
>>> ast = parse_with_grammar_entry(source, "state_machine_dsl")
>>> machine = parse_dsl_node_to_state_machine(ast)
>>> report = topological_finite(machine)
>>> report.finite
False
>>> report.counterexamples
(('deadlock', 'Root.B'),)
"""
graph = build_leaf_level_macro_graph(machine)
reachable = _root_reachable_leaf_paths(machine, graph)
can_reach_sink = _nodes_that_can_reach_sink(graph)
counterexamples: List[Tuple[Literal["trap_cycle", "deadlock"], object]] = []
for component in strongly_connected_components(machine):
if not reachable.intersection(component):
continue
if not any(node in can_reach_sink for node in component):
counterexamples.append(("trap_cycle", component))
for node in sorted(reachable):
if node in can_reach_sink:
continue
if len(graph.edges.get(node, tuple())) == 0:
counterexamples.append(("deadlock", node))
return FinitenessReport(
finite=not counterexamples,
counterexamples=tuple(counterexamples),
)
[docs]
def topological_inevitable_terminator(machine: StateMachine) -> InevitabilityReport:
"""
Check whether every topological path must eventually terminate.
This is an intentionally structural check: any root-reachable deadlock or
cycle is enough to produce a non-inevitability counterexample, even when
another outgoing edge could exit at runtime.
:param machine: State machine to inspect.
:type machine: StateMachine
:return: Inevitability report.
:rtype: InevitabilityReport
Example::
>>> from pyfcstm.dsl import parse_with_grammar_entry
>>> from pyfcstm.model import parse_dsl_node_to_state_machine
>>> source = '''
... state Root {
... state A;
... [*] -> A;
... A -> A;
... A -> [*];
... }
... '''
>>> ast = parse_with_grammar_entry(source, "state_machine_dsl")
>>> machine = parse_dsl_node_to_state_machine(ast)
>>> report = topological_inevitable_terminator(machine)
>>> report.inevitable
False
>>> report.counterexample_path
('Root.A',)
"""
graph = build_leaf_level_macro_graph(machine)
reachable = _root_reachable_leaf_paths(machine, graph)
for component in strongly_connected_components(machine):
if reachable.intersection(component):
return InevitabilityReport(
inevitable=False,
counterexample_path=component,
)
for node in sorted(reachable):
if len(graph.edges.get(node, tuple())) == 0:
return InevitabilityReport(
inevitable=False,
counterexample_path=(node,),
)
return InevitabilityReport(inevitable=True, counterexample_path=None)
def _root_reachable_initial_state_paths(
machine: StateMachine,
graph: LeafLevelGraph,
) -> Set[str]:
"""Return composite paths whose initial transitions are root-reachable.
The result contains the root state and any composite state whose initial
leaf targets intersect the root-reachable leaf set.
:param machine: State machine that produced ``graph``.
:type machine: StateMachine
:param graph: Leaf-level macro graph for ``machine``.
:type graph: LeafLevelGraph
:return: Dotted paths of root-reachable initial-transition owners.
:rtype: Set[str]
Example::
>>> from pyfcstm.dsl import parse_with_grammar_entry
>>> from pyfcstm.model import parse_dsl_node_to_state_machine
>>> ast = parse_with_grammar_entry("state Root;", "state_machine_dsl")
>>> machine = parse_dsl_node_to_state_machine(ast)
>>> graph = build_leaf_level_macro_graph(machine)
>>> sorted(_root_reachable_initial_state_paths(machine, graph))
['Root']
"""
reachable_leaves = _root_reachable_leaf_paths(machine, graph)
reachable_paths = {_state_path(machine.root_state)}
for state in machine.walk_states():
if state.is_leaf_state:
continue
if reachable_leaves.intersection(_initial_leaf_targets(state)):
reachable_paths.add(_state_path(state))
return reachable_paths
def _root_reachable_boundary_state_paths(
machine: StateMachine,
graph: LeafLevelGraph,
) -> Set[str]:
"""Return composite boundary states reachable through leaf exits.
Parent-level outgoing transitions are considered selectable only when a
reachable descendant leaf explicitly exits to that composite boundary. This
helper computes those boundary paths for event-consumer reachability.
:param machine: State machine that produced ``graph``.
:type machine: StateMachine
:param graph: Leaf-level macro graph for ``machine``.
:type graph: LeafLevelGraph
:return: Composite state paths reachable as leaf-exit boundaries.
:rtype: Set[str]
Example::
>>> from pyfcstm.dsl import parse_with_grammar_entry
>>> from pyfcstm.model import parse_dsl_node_to_state_machine
>>> source = '''
... state Root {
... state Parent {
... state A;
... [*] -> A;
... A -> [*];
... }
... state Done;
... [*] -> Parent;
... Parent -> Done;
... }
... '''
>>> ast = parse_with_grammar_entry(source, "state_machine_dsl")
>>> machine = parse_dsl_node_to_state_machine(ast)
>>> graph = build_leaf_level_macro_graph(machine)
>>> sorted(_root_reachable_boundary_state_paths(machine, graph))
['Root.Parent']
"""
reachable_leaves = _root_reachable_leaf_paths(machine, graph)
boundary_paths: Set[str] = set()
queue: Deque[State] = deque()
def add_boundary(state: State) -> None:
"""Record a boundary state once and queue it for upward bubbling.
:param state: Composite boundary state reached by a descendant exit.
:type state: State
:return: ``None``.
:rtype: None
Example::
>>> # This nested helper is exercised through
>>> # _root_reachable_boundary_state_paths().
>>> True
True
"""
state_path = _state_path(state)
if state_path in boundary_paths:
return
boundary_paths.add(state_path)
queue.append(state)
for leaf in _leaf_states(machine):
if _state_path(leaf) not in reachable_leaves:
continue
if not any(transition.to_state is EXIT_STATE for transition in leaf.transitions_from):
continue
parent = leaf.parent
if parent is not None and not parent.is_root_state:
add_boundary(parent)
while queue:
state = queue.popleft()
for transition in state.transitions_from:
if transition.to_state is not EXIT_STATE:
continue
parent = state.parent
if parent is not None and not parent.is_root_state:
add_boundary(parent)
return boundary_paths
def _event_consumer_reachability(
machine: StateMachine,
graph: LeafLevelGraph,
) -> Dict[str, List[bool]]:
"""Return reachability booleans for every used event consumer.
Each event maps to one boolean per transition that consumes the event.
Initial-transition consumers are checked against reachable composite entry
points, leaf-source consumers against reachable active leaves, and
composite-source consumers against reachable leaf-exit boundaries.
:param machine: State machine that produced ``graph``.
:type machine: StateMachine
:param graph: Leaf-level macro graph for ``machine``.
:type graph: LeafLevelGraph
:return: Mapping from qualified event name to consumer-source reachability
flags.
:rtype: Dict[str, List[bool]]
Example::
>>> from pyfcstm.dsl import parse_with_grammar_entry
>>> from pyfcstm.model import parse_dsl_node_to_state_machine
>>> source = '''
... state Root {
... event Go;
... state A;
... [*] -> A;
... A -> A : Go;
... }
... '''
>>> ast = parse_with_grammar_entry(source, "state_machine_dsl")
>>> machine = parse_dsl_node_to_state_machine(ast)
>>> graph = build_leaf_level_macro_graph(machine)
>>> _event_consumer_reachability(machine, graph)
{'Root.Go': [True]}
"""
active_leaves = _root_reachable_leaf_paths(machine, graph)
init_sources = _root_reachable_initial_state_paths(machine, graph)
boundary_sources = _root_reachable_boundary_state_paths(machine, graph)
consumers: Dict[str, List[bool]] = {}
for parent_state in machine.walk_states():
parent_path = _state_path(parent_state)
for transition in parent_state.transitions:
event = transition.event
if event is None:
continue
event_name = event.path_name
if transition.from_state is INIT_STATE:
reachable = parent_path in init_sources
elif isinstance(transition.from_state, str):
source_state = parent_state.substates[transition.from_state]
source_path = _state_path(source_state)
if source_state.is_leaf_state:
reachable = source_path in active_leaves
else:
reachable = source_path in boundary_sources
else:
raise TypeError( # pragma: no cover
# Grammar-produced non-init transition sources are strings.
# A future model endpoint type should be made explicit here.
f"Unsupported transition source: {transition.from_state!r}."
)
consumers.setdefault(event_name, []).append(reachable)
return consumers
[docs]
def event_emission_to_consumer_reachable(machine: StateMachine) -> Tuple[str, ...]:
"""
Return events whose consumer source states are all unreachable.
Unused declared events are ignored here; they remain the responsibility of
the existing design-health unused-event check.
A transition is treated as an event consumer at the point where its source
can be selected. Leaf-source consumers are reachable when their leaf is
root-reachable. Initial-transition consumers are reachable when the owning
composite can be entered. Composite-source consumers are reachable only when
a root-reachable descendant leaf can explicitly exit to the composite
boundary, because parent-level transitions are considered after
``Leaf -> [*]`` bubble semantics rather than while any descendant leaf is
merely active. If every consumer source for a used event is outside these
root-reachable topology points, the event's qualified name is returned.
:param machine: State machine to inspect.
:type machine: StateMachine
:return: Sorted qualified event names whose consumers are all unreachable.
:rtype: Tuple[str, ...]
Example::
>>> from pyfcstm.dsl import parse_with_grammar_entry
>>> from pyfcstm.model import parse_dsl_node_to_state_machine
>>> source = '''
... state Root {
... event Panic;
... state A;
... state Lost;
... [*] -> A;
... Lost -> A : Panic;
... }
... '''
>>> ast = parse_with_grammar_entry(source, "state_machine_dsl")
>>> machine = parse_dsl_node_to_state_machine(ast)
>>> event_emission_to_consumer_reachable(machine)
('Root.Panic',)
"""
graph = build_leaf_level_macro_graph(machine)
unreachable_events = []
for event_name, source_reachability in _event_consumer_reachability(
machine, graph
).items():
if source_reachability and not any(source_reachability):
unreachable_events.append(event_name)
return tuple(sorted(unreachable_events))
__all__ = [
"EXIT_ROOT_SINK",
"FinitenessReport",
"InevitabilityReport",
"LeafLevelGraph",
"build_leaf_level_macro_graph",
"event_emission_to_consumer_reachable",
"strongly_connected_components",
"topological_finite",
"topological_inevitable_terminator",
"topological_reachable_set",
"unreachable_states",
]