"""
Structured model inspection for pyfcstm.
This module provides :func:`inspect_model`, a single entry point that
walks a :class:`pyfcstm.model.StateMachine` and produces a stable,
serialization-friendly view of its structure plus five derived
relational graphs (reachability, event emission, variable data flow,
aspect impact, action reference). The output is the foundation that
Layer 2 design-health warnings (``W_*`` / ``I_*`` codes) and downstream
LLM / evaluation tooling consume.
The view shape is the **single source of truth** for the pyfcstm /
jsfcstm contract. Adding or renaming a field here must be mirrored on
the jsfcstm side (``editors/jsfcstm/src/diagnostics/inspect.ts``) and
in ``pyfcstm/diagnostics/schema.json``.
The module exposes the following dataclasses:
* :class:`StateInfo` — per-state structural summary
* :class:`TransitionInfo` — per-transition structural summary
* :class:`VariableInfo` — per-variable structural summary plus
guard-affect flags used by ``W_UNREFERENCED_VAR``
* :class:`EventInfo` — per-event structural summary
* :class:`ModelMetrics` — aggregate counts and ratios
* :class:`ModelInspect` — top-level container including diagnostics
Examples::
>>> from pyfcstm.dsl import parse_with_grammar_entry
>>> from pyfcstm.model import parse_dsl_node_to_state_machine
>>> from pyfcstm.diagnostics import inspect_model
>>> source = '''
... def int counter = 0;
... state Root {
... state Idle;
... state Active;
... [*] -> Idle;
... Idle -> Active : if [counter > 0];
... }
... '''
>>> ast = parse_with_grammar_entry(source, 'state_machine_dsl')
>>> machine = parse_dsl_node_to_state_machine(ast)
>>> report = inspect_model(machine)
>>> report.metrics.n_states_leaf
2
"""
import math
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Any, Dict, List, Mapping, Optional, Sequence, Tuple
from .analyzers import (
build_use_def_graph,
collect_design_health_warnings,
collect_expr_variables,
)
from .codes import CODE_REGISTRY, CodeFieldSpec
from ..utils.validate import ModelDiagnostic, Span
if TYPE_CHECKING: # pragma: no cover - import-time forward refs only
from ..model.expr import Expr, Float, Integer
from ..model.model import (
OperationStatement,
OnAspect,
OnStage,
StateMachine,
Transition,
)
from ..verify.inspect_adapter import InspectRunResult
DEFAULT_DEEP_HIERARCHY_THRESHOLD = 6
DEFAULT_LARGE_COMPOSITE_THRESHOLD = 12
DEFAULT_VAR_TO_LEAF_RATIO_THRESHOLD = 2.0
# PR-D2 span contract guard: analyzer diagnostics should carry a real
# source span by default. Any future code that intentionally cannot point to
# one source object must be listed here and covered by
# test/diagnostics/test_inspect_span_contract.py.
KNOWN_SPANLESS_CODES = frozenset()
# Some structural verify algorithms intentionally reuse a legacy static
# diagnostic code so callers see one stable public code for the same model
# problem, regardless of whether it came from design-health analysis or the
# optional verify adapter.
VERIFY_SHARED_STATIC_CODES = frozenset({
'W_UNREACHABLE_STATE',
})
_OP_PRECEDENCE = {
'function_call': 90,
'unary+': 80,
'unary-': 80,
'!': 80,
'not': 80,
'**': 70,
'*': 60,
'/': 60,
'%': 60,
'+': 50,
'-': 50,
'<<': 40,
'>>': 40,
'&': 35,
'^': 30,
'|': 25,
'<': 20,
'>': 20,
'<=': 20,
'>=': 20,
'==': 20,
'!=': 20,
'&&': 15,
'and': 15,
'||': 10,
'or': 10,
'?:': 5,
}
_FLOAT_EPSILON = 1e-10
[docs]
@dataclass(frozen=True)
class StateInfo:
"""
Structural summary of a single state.
:param path: Dotted hierarchical path, e.g. ``'Root.SubSystem.Active'``.
:type path: str
:param name: Short name of the state (last component of ``path``).
:type name: str
:param parent_path: Dotted path of the parent state, or ``None`` for
the root state.
:type parent_path: Optional[str]
:param is_leaf: ``True`` when this state has no substates.
:type is_leaf: bool
:param is_pseudo: ``True`` when the state was declared with
``pseudo state``.
:type is_pseudo: bool
:param is_composite: ``True`` when this state has substates.
:type is_composite: bool
:param substates: Direct-child state paths, in source order.
:type substates: Tuple[str, ...]
:param initial_targets: Each item describes one ``[*] -> X`` initial
transition declared inside this composite. ``target`` is the
target child path, ``guard`` is the source text of the guard or
``None``, ``event`` is the qualified event name or ``None``,
``is_unconditional`` is ``True`` only when both guard and event
are absent.
:type initial_targets: Tuple[Mapping[str, Any], ...]
:param entry_actions: Action labels (function name or ``'<inline>'``)
for ``enter`` actions on this state, in source order.
:type entry_actions: Tuple[str, ...]
:param during_actions: Action labels for ``during`` actions.
:type during_actions: Tuple[str, ...]
:param exit_actions: Action labels for ``exit`` actions.
:type exit_actions: Tuple[str, ...]
:param aspect_before: Aspect-action labels for ``>> during before``.
:type aspect_before: Tuple[str, ...]
:param aspect_after: Aspect-action labels for ``>> during after``.
:type aspect_after: Tuple[str, ...]
:param has_abstract_action: ``True`` if any of the actions above is
abstract. Used by :class:`VariableInfo` confidence judgements.
:type has_abstract_action: bool
"""
path: str
name: str
parent_path: Optional[str]
is_leaf: bool
is_pseudo: bool
is_composite: bool
substates: Tuple[str, ...]
initial_targets: Tuple[Dict[str, Any], ...]
entry_actions: Tuple[str, ...]
during_actions: Tuple[str, ...]
exit_actions: Tuple[str, ...]
aspect_before: Tuple[str, ...]
aspect_after: Tuple[str, ...]
has_abstract_action: bool
span: Optional['Span'] = None
[docs]
@dataclass(frozen=True)
class TransitionInfo:
"""
Structural summary of a single transition.
:param from_path: Dotted path of the source state, or the literal
``'[*]'`` for an initial transition declared at the root.
:type from_path: str
:param to_path: Dotted path of the target state, or ``'[*]'`` for an
exit transition.
:type to_path: str
:param event: Qualified event name (e.g. ``'Root.SubA.E'``) or
``None`` if the transition has no event.
:type event: Optional[str]
:param event_scope: ``'local'``, ``'chain'``, ``'absolute'``, or
``None`` when there is no event.
:type event_scope: Optional[str]
:param guard: Normalized guard expression text, or ``None``.
Pyfcstm and jsfcstm share this inspect expression format so
downstream range resolution can treat ``guard_text`` as a stable
disambiguation hint.
:type guard: Optional[str]
:param effect: Source text of the effect block, or ``None``.
:type effect: Optional[str]
:param effect_self_assigns: Variable names assigned to themselves
anywhere inside the transition effect block, including nested
``if`` branches. Duplicate names are preserved so quick-fix
emitters can detect ambiguous occurrences.
:type effect_self_assigns: Tuple[str, ...]
:param effect_self_assign_spans: Source spans for the self-assign
statements listed in ``effect_self_assigns``. The order matches
``effect_self_assigns`` and uses ``None`` when a statement has no
source span, preventing later spans from shifting to earlier names.
:type effect_self_assign_spans: Tuple[Optional[pyfcstm.utils.validate.Span], ...]
:param is_forced: ``True`` when the transition was expanded from a
``!``-prefixed forced transition.
:type is_forced: bool
:param forced_origin: Raw source text of the original
``!X -> Y`` declaration when ``is_forced`` is ``True``, otherwise
``None``.
:type forced_origin: Optional[str]
:param transition_index: Zero-based index in parent-first model
transition order, including expanded forced transitions at their
declaring state before ordinary transitions and descendant-state
transitions. Downstream tooling may use this as a best-effort
source-range disambiguation hint when spans are not available.
:type transition_index: Optional[int]
"""
from_path: str
to_path: str
event: Optional[str]
event_scope: Optional[str]
guard: Optional[str]
effect: Optional[str]
effect_self_assigns: Tuple[str, ...]
is_forced: bool
forced_origin: Optional[str]
transition_index: Optional[int]
span: Optional['Span'] = None
effect_spans: Tuple['Span', ...] = field(default_factory=tuple)
effect_self_assign_spans: Tuple[Optional['Span'], ...] = field(default_factory=tuple)
[docs]
@dataclass(frozen=True)
class VariableInfo:
"""
Structural summary of a variable definition plus guard-affect flags.
The ``affects_guard_directly`` and ``affects_guard_indirectly`` flags
are precomputed here so that unreferenced-variable diagnostics can
be expressed as a one-line filter against this object.
:param name: Variable identifier.
:type name: str
:param type: Declared type, currently ``'int'`` or ``'float'``.
:type type: str
:param init_value: Source text of the initializer expression.
:type init_value: str
:param read_in_states: State paths where the variable is read inside
any action (``enter`` / ``during`` / ``exit`` / aspect).
:type read_in_states: Tuple[str, ...]
:param written_in_states: State paths where the variable is written
inside any action.
:type written_in_states: Tuple[str, ...]
:param read_in_guards: Tuples ``(from_path, to_path)`` of transitions
whose guard reads this variable.
:type read_in_guards: Tuple[Tuple[str, str], ...]
:param written_in_effects: Tuples ``(from_path, to_path)`` of
transitions whose effect block writes this variable.
:type written_in_effects: Tuple[Tuple[str, str], ...]
:param affects_guard_directly: ``True`` when the variable is read by
at least one transition guard.
:type affects_guard_directly: bool
:param affects_guard_indirectly: ``True`` when the variable reaches
a transition guard through the conservative use-def graph.
:type affects_guard_indirectly: bool
:param abstract_actions_in_scope: Function names of abstract actions
that may access this variable. FCSTM variables are global, so any
abstract action in the machine is conservatively visible here.
Downstream diagnostics can use this to distinguish high-confidence
unused variables from variables that may be touched by abstract
behavior.
:type abstract_actions_in_scope: Tuple[str, ...]
:param float_literal_assignments: Source text of float literal
assignments to this variable from lifecycle actions or transition
effects.
:type float_literal_assignments: Tuple[str, ...]
"""
name: str
type: str
init_value: str
read_in_states: Tuple[str, ...]
written_in_states: Tuple[str, ...]
read_in_guards: Tuple[Tuple[str, str], ...]
written_in_effects: Tuple[Tuple[str, str], ...]
affects_guard_directly: bool
affects_guard_indirectly: bool
abstract_actions_in_scope: Tuple[str, ...]
float_literal_assignments: Tuple[str, ...] = field(default_factory=tuple)
span: Optional['Span'] = None
float_literal_assignment_spans: Tuple[Optional['Span'], ...] = field(default_factory=tuple)
[docs]
@dataclass(frozen=True)
class EventInfo:
"""
Structural summary of an event declaration.
:param qualified_name: Dotted fully qualified event name
(e.g. ``'Root.SubA.E'``).
:type qualified_name: str
:param scope: ``'local'``, ``'chain'``, or ``'absolute'``.
:type scope: str
:param used_by: ``(from_path, to_path)`` tuples for every transition
that references this event.
:type used_by: Tuple[Tuple[str, str], ...]
:param is_declared: ``True`` when the event came from an explicit
``event`` declaration.
:type is_declared: bool
:param is_used: ``True`` when at least one transition references the
event.
:type is_used: bool
"""
qualified_name: str
scope: str
used_by: Tuple[Tuple[str, str], ...]
is_declared: bool
is_used: bool
span: Optional['Span'] = None
[docs]
@dataclass(frozen=True)
class ActionInfo:
"""Structural summary of a lifecycle action declaration."""
signature: str
state_path: str
name: Optional[str]
stage: str
aspect: Optional[str]
is_ref: bool
ref_target: Optional[str]
is_attached: bool
span: Optional['Span'] = None
[docs]
@dataclass(frozen=True)
class ForcedTransitionInfo:
"""Structural summary of a forced transition declaration."""
state_path: str
from_path: str
to_path: str
event: Optional[str]
event_scope: Optional[str]
guard: Optional[str]
original_raw: str
expansion_count: int
span: Optional['Span'] = None
[docs]
@dataclass(frozen=True)
class ModelMetrics:
"""
Aggregate model metrics.
:param n_states_leaf: Number of leaf states excluding pseudo states.
:type n_states_leaf: int
:param n_states_composite: Number of composite states.
:type n_states_composite: int
:param n_states_pseudo: Number of pseudo states.
:type n_states_pseudo: int
:param max_hierarchy_depth: Maximum depth of state nesting, counted
from the root (depth 0 = root).
:type max_hierarchy_depth: int
:param n_transitions_normal: Number of transitions that did not
originate from a ``!``-forced declaration.
:type n_transitions_normal: int
:param n_transitions_forced: Number of transitions expanded from
``!``-forced declarations.
:type n_transitions_forced: int
:param n_events: Number of distinct qualified events exposed by the
inspect surface, including explicitly declared events that no
transition uses.
:type n_events: int
:param n_variables: Number of variable definitions.
:type n_variables: int
:param var_to_leaf_ratio: ``n_variables / max(n_states_leaf, 1)``.
:type var_to_leaf_ratio: float
:param aspect_coverage: Mapping ``composite_path -> n_descendant_leaves``
for composite states that declare ``>> during`` aspects.
:type aspect_coverage: Dict[str, int]
:param abstract_action_inventory: Function names of every abstract
action across the model, sorted for stable output.
:type abstract_action_inventory: Tuple[str, ...]
"""
n_states_leaf: int
n_states_composite: int
n_states_pseudo: int
max_hierarchy_depth: int
n_transitions_normal: int
n_transitions_forced: int
n_events: int
n_variables: int
var_to_leaf_ratio: float
aspect_coverage: Dict[str, int]
abstract_action_inventory: Tuple[str, ...]
[docs]
@dataclass(frozen=True)
class ModelInspect:
"""
Top-level structured view of a state machine model.
:param root_state_path: Dotted path of the root state.
:type root_state_path: str
:param states: All states walked from the root in pre-order.
:type states: Tuple[StateInfo, ...]
:param transitions: All transitions, including expanded forced
transitions, in source order.
:type transitions: Tuple[TransitionInfo, ...]
:param variables: All ``def`` variables, in declaration order.
:type variables: Tuple[VariableInfo, ...]
:param events: All qualified events exposed by the inspect surface,
including explicitly declared events that no transition uses,
sorted by qualified name.
:type events: Tuple[EventInfo, ...]
:param metrics: Aggregate model metrics.
:type metrics: ModelMetrics
:param reachability_graph: Mapping from every state path to state paths
reachable through normal transitions and composite initial edges.
Guards are ignored; ``[*]`` entry/exit markers are not exposed.
:type reachability_graph: Dict[str, Tuple[str, ...]]
:param event_emission_map: Mapping event qualified name → list of
source state paths that can emit it.
:type event_emission_map: Dict[str, Tuple[str, ...]]
:param var_dataflow: Mapping variable name → ``{'reads': [...],
'writes': [...]}`` of state paths.
:type var_dataflow: Dict[str, Dict[str, Tuple[str, ...]]]
:param aspect_impact_map: Mapping composite path → descendant leaf
paths actually reached by its aspect actions.
:type aspect_impact_map: Dict[str, Tuple[str, ...]]
:param action_ref_graph: Mapping named-action function path → list
of ``ref`` edges out of it.
:type action_ref_graph: Dict[str, Tuple[str, ...]]
:param diagnostics: Layer 1 ``E_*`` plus design-health ``W_*`` /
``I_*`` diagnostics derived from the inspect payload.
:type diagnostics: Tuple[ModelDiagnostic, ...]
"""
root_state_path: str
states: Tuple[StateInfo, ...]
transitions: Tuple[TransitionInfo, ...]
variables: Tuple[VariableInfo, ...]
events: Tuple[EventInfo, ...]
actions: Tuple[ActionInfo, ...]
forced_transitions: Tuple[ForcedTransitionInfo, ...]
metrics: ModelMetrics
reachability_graph: Dict[str, Tuple[str, ...]]
event_emission_map: Dict[str, Tuple[str, ...]]
var_dataflow: Dict[str, Dict[str, Tuple[str, ...]]]
aspect_impact_map: Dict[str, Tuple[str, ...]]
action_ref_graph: Dict[str, Tuple[str, ...]]
diagnostics: Tuple[ModelDiagnostic, ...] = field(default_factory=tuple)
[docs]
def to_json(self) -> Dict[str, Any]:
"""
Serialize this inspection report to a plain JSON-friendly dict.
Tuples are converted to lists; frozen dataclasses to dicts.
``ModelDiagnostic`` instances are serialized via their public
attributes (``code``, ``severity``, ``message``, ``span``,
``refs``).
:return: A dict that round-trips through :func:`json.dumps`
without loss.
:rtype: Dict[str, Any]
Examples::
>>> from pyfcstm.dsl import parse_with_grammar_entry
>>> from pyfcstm.model import parse_dsl_node_to_state_machine
>>> ast = parse_with_grammar_entry('state Root;', 'state_machine_dsl')
>>> machine = parse_dsl_node_to_state_machine(ast)
>>> report = inspect_model(machine)
>>> report.to_json()['root_state_path']
'Root'
"""
return _to_json_inspect(self)
# ---------------------------------------------------------------------------
# Implementation
# ---------------------------------------------------------------------------
_INIT_MARK = '[*]'
_EXIT_MARK = '[*]'
def _state_path(state: Any) -> str:
path = getattr(state, 'path', None)
if not path: # pragma: no cover
# Defensive: grammar-produced State always has a non-empty
# ``path`` tuple. Reaching this guard means a future state
# builder shipped a half-initialized object; keep as fail-soft
# to avoid masking a downstream rewrite that produced ''.
return ''
return '.'.join(p for p in path if p is not None)
def _resolve_sibling_path(parent_state: Any, name: str) -> str:
"""Build the dotted path for a sibling-of-parent state name."""
parent_path = _state_path(parent_state)
return f'{parent_path}.{name}' if parent_path else name
def _transition_endpoint(parent_state: Any, marker_or_name: Any, is_source: bool) -> str:
"""Resolve ``Transition.from_state`` / ``to_state`` to a path string."""
# ``INIT_STATE`` / ``EXIT_STATE`` are SingletonMark instances; the
# class name is sufficient because the model layer never reuses that
# class for anything else.
if marker_or_name.__class__.__name__ == '_StateSingletonMark':
return _INIT_MARK if is_source else _EXIT_MARK
if isinstance(marker_or_name, str):
return _resolve_sibling_path(parent_state, marker_or_name)
return str(marker_or_name) # pragma: no cover -- grammar produces
# only INIT/EXIT singletons and string names; this str() fallback
# exists for future AST extensions and never fires today.
def _canonical_binary_operator(op: str) -> str:
if op == 'and':
return '&&'
if op == 'or':
return '||'
return op
def _canonical_unary_operator(op: str) -> str:
return '!' if op == 'not' else op
def _unary_precedence_key(op: str) -> str:
canonical = _canonical_unary_operator(op)
return f'unary{canonical}' if canonical in {'+', '-'} else canonical
def _expr_precedence(expr: 'Expr') -> Optional[int]:
from ..model.expr import BinaryOp, ConditionalOp, UnaryOp
if isinstance(expr, BinaryOp):
return _OP_PRECEDENCE.get(_canonical_binary_operator(expr.op))
if isinstance(expr, ConditionalOp):
return _OP_PRECEDENCE['?:']
if isinstance(expr, UnaryOp):
return _OP_PRECEDENCE.get(_unary_precedence_key(expr.op))
return None
def _integer_text(expr: 'Integer') -> str:
return str(int(expr.value))
def _float_text(expr: 'Float') -> str:
if abs(expr.value - math.pi) < _FLOAT_EPSILON:
return 'pi'
if abs(expr.value - math.e) < _FLOAT_EPSILON:
return 'E'
if abs(expr.value - math.tau) < _FLOAT_EPSILON:
return 'tau'
if float(expr.value).is_integer():
return f'{int(expr.value)}.0'
return str(expr.value)
def _expr_text(expr: Optional['Expr']) -> Optional[str]:
if expr is None:
return None
from ..model.expr import (
BinaryOp,
Boolean,
ConditionalOp,
Float,
Integer,
UFunc,
UnaryOp,
Variable,
)
if isinstance(expr, Integer):
return _integer_text(expr)
if isinstance(expr, Float):
return _float_text(expr)
if isinstance(expr, Boolean):
return 'true' if expr.value else 'false'
if isinstance(expr, Variable):
return expr.name
if isinstance(expr, UFunc):
argument = _expr_text(expr.x)
return None if argument is None else f'{expr.func}({argument})'
if isinstance(expr, UnaryOp):
op = _canonical_unary_operator(expr.op)
my_precedence = _OP_PRECEDENCE[_unary_precedence_key(expr.op)]
value = _expr_text(expr.x)
if value is None:
return None
value_precedence = _expr_precedence(expr.x)
if value_precedence is not None and value_precedence <= my_precedence:
value = f'({value})'
return f'{op}{value}'
if isinstance(expr, BinaryOp):
op = _canonical_binary_operator(expr.op)
my_precedence = _OP_PRECEDENCE[op]
left = _expr_text(expr.x)
right = _expr_text(expr.y)
if left is None or right is None:
return None
left_precedence = _expr_precedence(expr.x)
if left_precedence is not None and left_precedence < my_precedence:
left = f'({left})'
right_precedence = _expr_precedence(expr.y)
if right_precedence is not None and right_precedence <= my_precedence:
right = f'({right})'
return f'{left} {op} {right}'
if isinstance(expr, ConditionalOp):
my_precedence = _OP_PRECEDENCE['?:']
condition = _expr_text(expr.cond)
when_true = _expr_text(expr.if_true)
when_false = _expr_text(expr.if_false)
if condition is None or when_true is None or when_false is None:
return None
true_precedence = _expr_precedence(expr.if_true)
if true_precedence is not None and true_precedence <= my_precedence:
when_true = f'({when_true})'
false_precedence = _expr_precedence(expr.if_false)
if false_precedence is not None and false_precedence <= my_precedence:
when_false = f'({when_false})'
return f'({condition}) ? {when_true} : {when_false}'
try:
return str(expr.to_ast_node())
except (AttributeError, TypeError, ValueError): # pragma: no cover
# AttributeError: non-model Expr-like object; TypeError/ValueError:
# future expression implementations with invalid AST conversion.
return None
def _effects_text(effects: List['OperationStatement']) -> Optional[str]:
if not effects:
return None
parts: List[str] = []
for stmt in effects:
try:
parts.append(str(stmt.to_ast_node()))
except (AttributeError, TypeError, ValueError): # pragma: no cover
# AttributeError: non-model statement-like object; TypeError/ValueError:
# future statement implementations with invalid AST conversion.
continue
if not parts: # pragma: no cover
# Unreachable while ``except`` above is unreachable. Belt-and-
# braces guard so the empty-parts case still returns None
# cleanly rather than producing an empty-string label.
return None
return ' '.join(parts)
def _walk_expr_variables(expr: Optional['Expr']) -> List[str]:
"""Return variable names read by ``expr`` in left-to-right order."""
if expr is None:
return []
return list(collect_expr_variables(expr))
def _walk_stmt_reads_writes(
stmt: 'OperationStatement',
reads: List[str],
writes: List[str],
) -> None:
"""Collect variable reads/writes across a single operation statement."""
from ..model.model import IfBlock, Operation
if isinstance(stmt, Operation):
writes.append(stmt.var_name)
for v in _walk_expr_variables(stmt.expr):
reads.append(v)
return
if isinstance(stmt, IfBlock):
for branch in stmt.branches:
if branch.condition is not None:
for v in _walk_expr_variables(branch.condition):
reads.append(v)
for inner in branch.statements:
_walk_stmt_reads_writes(inner, reads, writes)
def _effect_self_assigns(effects: List['OperationStatement']) -> Tuple[str, ...]:
out: List[str] = []
for stmt in effects:
_walk_stmt_self_assigns(stmt, out)
return tuple(out)
def _effect_self_assign_spans(effects: List['OperationStatement']) -> Tuple[Optional['Span'], ...]:
out: List[Optional['Span']] = []
for stmt in effects:
_walk_stmt_self_assign_spans(stmt, out)
return tuple(out)
def _walk_stmt_self_assigns(stmt: 'OperationStatement', out: List[str]) -> None:
from ..model.expr import Variable
from ..model.model import IfBlock, Operation
if isinstance(stmt, Operation):
if isinstance(stmt.expr, Variable) and stmt.expr.name == stmt.var_name:
out.append(stmt.var_name)
return
if isinstance(stmt, IfBlock):
for branch in stmt.branches:
for inner in branch.statements:
_walk_stmt_self_assigns(inner, out)
def _walk_stmt_self_assign_spans(stmt: 'OperationStatement', out: List[Optional['Span']]) -> None:
from ..model.expr import Variable
from ..model.model import IfBlock, Operation
if isinstance(stmt, Operation):
if isinstance(stmt.expr, Variable) and stmt.expr.name == stmt.var_name:
out.append(getattr(stmt, '_span', None))
return
if isinstance(stmt, IfBlock):
for branch in stmt.branches:
for inner in branch.statements:
_walk_stmt_self_assign_spans(inner, out)
def _walk_stmt_float_literal_assignments(
stmt: 'OperationStatement',
out: Dict[str, List[str]],
spans: Optional[Dict[str, List[Optional[Span]]]] = None,
) -> None:
from ..model.expr import Float
from ..model.model import IfBlock, Operation
if isinstance(stmt, Operation):
if isinstance(stmt.expr, Float):
out.setdefault(stmt.var_name, []).append(_expr_text(stmt.expr) or '')
if spans is not None:
spans.setdefault(stmt.var_name, []).append(getattr(stmt, '_span', None))
return
if isinstance(stmt, IfBlock):
for branch in stmt.branches:
for inner in branch.statements:
_walk_stmt_float_literal_assignments(inner, out, spans)
def _stage_function_label(stage_item: 'OnStage') -> str:
"""Choose a stable label for an action (named, abstract, or inline)."""
if stage_item.name:
return stage_item.name
if stage_item.is_ref and stage_item.ref is not None and getattr(stage_item.ref, 'name', None):
return f'ref:{stage_item.ref.name}'
return '<inline>'
def _aspect_function_label(aspect_item: 'OnAspect') -> str:
if aspect_item.name:
return aspect_item.name
if aspect_item.is_ref and aspect_item.ref is not None and getattr(aspect_item.ref, 'name', None):
return f'ref:{aspect_item.ref.name}'
return '<inline>'
def _is_abstract(stage_or_aspect: Any) -> bool:
return bool(getattr(stage_or_aspect, 'is_abstract', False))
def _qualified_event_name(transition: 'Transition', parent_state: Any) -> Optional[str]:
event = transition.event
if event is None:
return None
# ``Event.path_name`` is the canonical dot-separated identifier used
# by the runtime for matching; reuse it as the public qualified name.
return event.path_name
def _event_scope(
event: Any,
parent_state: Any,
from_state: Any,
machine: Any,
) -> str:
"""Infer the scope label by comparing the event's owner path to context.
The model layer does not store a scope enum on :class:`Event`, but
the event's ``state_path`` uniquely determines which DSL operator
declared it:
* absolute (``/``) — owner is the root state path
* chain (``:``) — owner is the transition's parent state path
* local (``::``) — owner is the parent's path extended by
``from_state``
"""
if event is None: # pragma: no cover
# Defensive: callers (``_qualified_event_name``) already guard
# on ``transition.event is None`` and short-circuit; reaching
# here would mean the caller forgot the guard.
return 'absolute'
owner_path = tuple(event.state_path)
root_path = tuple(machine.root_state.path) if machine is not None else ()
parent_path = tuple(parent_state.path) if parent_state is not None else ()
if owner_path == root_path:
return 'absolute'
if owner_path == parent_path:
return 'chain'
if isinstance(from_state, str) and owner_path == parent_path + (from_state,):
return 'local'
# Owner is somewhere else on the chain — conservative fallback.
# Unreachable via real DSL (events are declared in local / chain /
# absolute scopes — never in a sibling that the transition cannot
# reach), but kept as a non-crashing safety net.
return 'chain' # pragma: no cover
def _state_actions(
state: Any,
) -> Tuple[Tuple[str, ...], Tuple[str, ...], Tuple[str, ...], Tuple[str, ...], Tuple[str, ...], bool]:
entries = tuple(_stage_function_label(a) for a in state.on_enters)
durings = tuple(_stage_function_label(a) for a in state.on_durings)
exits = tuple(_stage_function_label(a) for a in state.on_exits)
asp_before = tuple(
_aspect_function_label(a) for a in state.on_during_aspects if a.aspect == 'before'
)
asp_after = tuple(
_aspect_function_label(a) for a in state.on_during_aspects if a.aspect == 'after'
)
has_abstract = any(
_is_abstract(a)
for collection in (
state.on_enters,
state.on_durings,
state.on_exits,
state.on_during_aspects,
)
for a in collection
)
return entries, durings, exits, asp_before, asp_after, has_abstract
def _initial_targets(state: Any) -> Tuple[Dict[str, Any], ...]:
"""Collect every ``[*] -> X`` initial transition declared inside the state."""
out: List[Dict[str, Any]] = []
for transition in state.transitions:
if not _is_init_source(transition.from_state):
continue
target_name = transition.to_state
target_path = _resolve_sibling_path(state, target_name) if isinstance(target_name, str) else _EXIT_MARK
guard_text = _expr_text(transition.guard)
event = transition.event
event_name = event.name if event is not None else None
out.append({
'target': target_path,
'guard': guard_text,
'event': event_name,
'is_unconditional': guard_text is None and event_name is None,
})
return tuple(out)
def _is_init_source(from_state: Any) -> bool:
from ..dsl.node import INIT_STATE
return from_state is INIT_STATE
def _is_forced_transition(transition: 'Transition') -> bool:
return bool(getattr(transition, 'is_forced', False)) or hasattr(
transition, 'forced_origin'
) and getattr(transition, 'forced_origin', None) is not None
def _hierarchy_depth(states: Tuple[StateInfo, ...]) -> int:
if not states: # pragma: no cover
# Defensive: ``inspect_model`` always builds at least one
# StateInfo (the root). Empty input would mean a caller used
# this helper outside the pipeline; keep the fail-soft 0.
return 0
return max(s.path.count('.') for s in states)
def _build_state_infos(machine: 'StateMachine') -> Tuple[StateInfo, ...]:
"""Pre-order walk yielding one :class:`StateInfo` per state."""
out: List[StateInfo] = []
for state in machine.walk_states():
path = _state_path(state)
name = state.name
parent_path = _state_path(state.parent) if state.parent is not None else None
is_leaf = state.is_leaf_state
is_pseudo = bool(getattr(state, 'is_pseudo', False))
is_composite = not is_leaf
substates = tuple(
_resolve_sibling_path(state, sub_name) for sub_name in state.substates.keys()
)
entries, durings, exits, asp_before, asp_after, has_abstract = _state_actions(state)
out.append(StateInfo(
path=path,
name=name,
parent_path=parent_path,
is_leaf=is_leaf,
is_pseudo=is_pseudo,
is_composite=is_composite,
substates=substates,
initial_targets=_initial_targets(state),
entry_actions=entries,
during_actions=durings,
exit_actions=exits,
aspect_before=asp_before,
aspect_after=asp_after,
has_abstract_action=has_abstract,
span=getattr(state, '_span', None),
))
return tuple(out)
def _build_transition_infos(machine: 'StateMachine') -> Tuple[TransitionInfo, ...]:
out: List[TransitionInfo] = []
transition_index = 0
for state in machine.walk_states():
for transition in state.transitions:
from_path = _transition_endpoint(state, transition.from_state, is_source=True)
to_path = _transition_endpoint(state, transition.to_state, is_source=False)
qualified_event = _qualified_event_name(transition, state)
scope = (
getattr(transition, 'event_scope', None)
or _event_scope(transition.event, state, transition.from_state, machine)
if transition.event is not None
else None
)
is_forced = _is_forced_transition(transition)
forced_origin = getattr(transition, 'forced_origin', None) if is_forced else None
out.append(TransitionInfo(
from_path=from_path,
to_path=to_path,
event=qualified_event,
event_scope=scope,
guard=_expr_text(transition.guard),
effect=_effects_text(transition.effects),
effect_self_assigns=_effect_self_assigns(transition.effects),
is_forced=is_forced,
forced_origin=forced_origin,
transition_index=transition_index,
span=getattr(transition, '_span', None),
effect_spans=tuple(
span
for span in (getattr(effect, '_span', None) for effect in transition.effects)
if span is not None
),
effect_self_assign_spans=_effect_self_assign_spans(transition.effects),
))
transition_index += 1
return tuple(out)
def _collect_action_reads_writes(state: Any) -> Tuple[Dict[str, bool], Dict[str, bool]]:
"""Aggregate variable reads/writes across all action blocks of a state."""
reads: Dict[str, bool] = {}
writes: Dict[str, bool] = {}
for collection in (
state.on_enters,
state.on_durings,
state.on_exits,
state.on_during_aspects,
):
for action in collection:
if not action.operations:
continue
local_reads: List[str] = []
local_writes: List[str] = []
for stmt in action.operations:
_walk_stmt_reads_writes(stmt, local_reads, local_writes)
for name in local_reads:
reads[name] = True
for name in local_writes:
writes[name] = True
return reads, writes
def _build_variable_infos(
machine: 'StateMachine',
states: Tuple[StateInfo, ...],
) -> Tuple[VariableInfo, ...]:
var_reads_by_state: Dict[str, List[str]] = {name: [] for name in machine.defines}
var_writes_by_state: Dict[str, List[str]] = {name: [] for name in machine.defines}
var_read_guards: Dict[str, List[Tuple[str, str]]] = {name: [] for name in machine.defines}
var_written_effects: Dict[str, List[Tuple[str, str]]] = {name: [] for name in machine.defines}
var_float_literal_assignments: Dict[str, List[str]] = {
name: [] for name in machine.defines
}
var_float_literal_assignment_spans: Dict[str, List[Optional[Span]]] = {
name: [] for name in machine.defines
}
state_lookup: Dict[str, StateInfo] = {s.path: s for s in states}
for state in machine.walk_states():
path = _state_path(state)
reads, writes = _collect_action_reads_writes(state)
float_assigns: Dict[str, List[str]] = {}
float_assign_spans: Dict[str, List[Optional[Span]]] = {}
for collection in (
state.on_enters,
state.on_durings,
state.on_exits,
state.on_during_aspects,
):
for action in collection:
for stmt in action.operations:
_walk_stmt_float_literal_assignments(stmt, float_assigns, float_assign_spans)
for var_name in reads:
if var_name in var_reads_by_state:
var_reads_by_state[var_name].append(path)
for var_name in writes:
if var_name in var_writes_by_state:
var_writes_by_state[var_name].append(path)
for var_name, assignments in float_assigns.items():
if var_name in var_float_literal_assignments:
var_float_literal_assignments[var_name].extend(assignments)
var_float_literal_assignment_spans[var_name].extend(float_assign_spans.get(var_name, []))
for transition in state.transitions:
from_path = _transition_endpoint(state, transition.from_state, is_source=True)
to_path = _transition_endpoint(state, transition.to_state, is_source=False)
for v in _walk_expr_variables(transition.guard):
if v in var_read_guards:
var_read_guards[v].append((from_path, to_path))
for stmt in transition.effects:
lreads: List[str] = []
lwrites: List[str] = []
_walk_stmt_reads_writes(stmt, lreads, lwrites)
lfloat_assigns: Dict[str, List[str]] = {}
lfloat_assign_spans: Dict[str, List[Optional[Span]]] = {}
_walk_stmt_float_literal_assignments(stmt, lfloat_assigns, lfloat_assign_spans)
for v in lreads:
if v in var_reads_by_state:
var_reads_by_state[v].append(from_path)
for v in lwrites:
if v in var_written_effects:
var_written_effects[v].append((from_path, to_path))
for v, assignments in lfloat_assigns.items():
if v in var_float_literal_assignments:
var_float_literal_assignments[v].extend(assignments)
var_float_literal_assignment_spans[v].extend(lfloat_assign_spans.get(v, []))
# Stable, deduped sequences for the public payload.
def _dedupe_ordered(seq: List[str]) -> Tuple[str, ...]:
seen = set()
out: List[str] = []
for x in seq:
if x not in seen:
seen.add(x)
out.append(x)
return tuple(out)
def _dedupe_pairs(seq: List[Tuple[str, str]]) -> Tuple[Tuple[str, str], ...]:
seen = set()
out: List[Tuple[str, str]] = []
for x in seq:
if x not in seen:
seen.add(x)
out.append(x)
return tuple(out)
def _dedupe_float_assignment_pairs(
exprs: List[str],
spans: List[Optional[Span]],
) -> Tuple[Tuple[str, ...], Tuple[Optional[Span], ...]]:
seen = set()
out_exprs: List[str] = []
out_spans: List[Optional[Span]] = []
for index, expr in enumerate(exprs):
if expr in seen:
continue
seen.add(expr)
out_exprs.append(expr)
out_spans.append(spans[index] if index < len(spans) else None)
return tuple(out_exprs), tuple(out_spans)
use_def_graph = build_use_def_graph(machine)
direct_guard_vars = {
name for name, entries in var_read_guards.items()
if entries
}
indirect_guard_vars = set(use_def_graph.affecting_variables(direct_guard_vars))
out: List[VariableInfo] = []
for name, var_define in machine.defines.items():
read_states = _dedupe_ordered(var_reads_by_state[name])
written_states = _dedupe_ordered(var_writes_by_state[name])
read_guards = _dedupe_pairs(var_read_guards[name])
written_effects = _dedupe_pairs(var_written_effects[name])
abstract_actions = _abstract_actions_in_scope(state_lookup)
float_assignments, float_assignment_spans = _dedupe_float_assignment_pairs(
var_float_literal_assignments[name],
var_float_literal_assignment_spans[name],
)
out.append(VariableInfo(
name=name,
type=var_define.type,
init_value=_expr_text(var_define.init) or '',
read_in_states=read_states,
written_in_states=written_states,
read_in_guards=read_guards,
written_in_effects=written_effects,
affects_guard_directly=name in direct_guard_vars,
affects_guard_indirectly=(
name not in direct_guard_vars and name in indirect_guard_vars
),
abstract_actions_in_scope=abstract_actions,
float_literal_assignments=float_assignments,
span=getattr(var_define, '_span', None),
float_literal_assignment_spans=float_assignment_spans,
))
return tuple(out)
def _abstract_actions_in_scope(
state_lookup: Dict[str, StateInfo],
) -> Tuple[str, ...]:
"""Return abstract action labels that can see global variables."""
return tuple(
f'{info.path}:<abstract>'
for info in sorted(state_lookup.values(), key=lambda item: item.path)
if info.has_abstract_action
)
def _build_event_infos(machine: 'StateMachine', transitions: Tuple[TransitionInfo, ...]) -> Tuple[EventInfo, ...]:
"""Group declared and transition-used events by qualified event name."""
event_users: Dict[str, List[Tuple[str, str]]] = {}
event_scope: Dict[str, str] = {}
event_declared: Dict[str, bool] = {}
for state in machine.walk_states():
for event in state.events.values():
qn = event.path_name
event_users.setdefault(qn, [])
event_scope[qn] = _scope_from_event_origins(
getattr(event, 'origins', None) or []
)
event_declared[qn] = bool(getattr(event, 'declared', False))
for state in machine.walk_states():
for transition in state.transitions:
qn = _qualified_event_name(transition, state)
if qn is None:
continue
from_path = _transition_endpoint(state, transition.from_state, is_source=True)
to_path = _transition_endpoint(state, transition.to_state, is_source=False)
event_users.setdefault(qn, []).append((from_path, to_path))
event_scope[qn] = (
getattr(transition, 'event_scope', None)
or _event_scope(transition.event, state, transition.from_state, machine)
)
event_declared.setdefault(
qn,
bool(getattr(transition.event, 'declared', False)),
)
out: List[EventInfo] = []
for qn in sorted(set(event_users.keys()) | set(event_declared.keys())):
used_by = tuple(event_users.get(qn, []))
event_obj = _find_event_by_qualified_name(machine, qn)
out.append(EventInfo(
qualified_name=qn,
scope=event_scope.get(qn, 'absolute'),
used_by=used_by,
is_declared=event_declared.get(qn, False),
is_used=bool(used_by),
span=getattr(event_obj, '_span', None) if event_obj is not None else None,
))
return tuple(out)
def _find_event_by_qualified_name(machine: 'StateMachine', qualified_name: str):
for state in machine.walk_states():
for event in state.events.values():
if event.path_name == qualified_name:
return event
return None
def _scope_from_event_origins(origins: List[str]) -> str:
scopes = [origin for origin in origins if origin != 'declared']
if 'local' in scopes:
return 'local'
if 'absolute' in scopes:
return 'absolute'
return 'chain'
def _build_action_infos(machine: 'StateMachine') -> Tuple[ActionInfo, ...]:
out: List[ActionInfo] = []
for state in machine.walk_states():
path = _state_path(state)
for collection in (
state.on_enters,
state.on_durings,
state.on_exits,
state.on_during_aspects,
):
for action in collection:
signature = _function_signature(state, path, action)
ref_target = (
_function_signature(None, None, action.ref)
if action.is_ref and action.ref is not None
else None
)
out.append(ActionInfo(
signature=signature,
state_path=path,
name=action.name,
stage=action.stage,
aspect=action.aspect,
is_ref=action.is_ref,
ref_target=ref_target,
is_attached=True,
span=getattr(action, '_span', None),
))
return tuple(out)
def _build_forced_transition_infos(machine: 'StateMachine') -> Tuple[ForcedTransitionInfo, ...]:
out: List[ForcedTransitionInfo] = []
for item in getattr(machine, 'forced_transitions', ()):
out.append(ForcedTransitionInfo(
state_path=str(item.get('state_path', '')),
from_path=str(item.get('from_path', '')),
to_path=str(item.get('to_path', '')),
event=(
None if item.get('event') is None
else str(item.get('event'))
),
event_scope=(
None if item.get('event_scope') is None
else str(item.get('event_scope'))
),
guard=(
None if item.get('guard') is None
else str(item.get('guard'))
),
original_raw=str(item.get('original_raw', '')),
expansion_count=int(item.get('expansion_count', 0)),
span=item.get('span'),
))
return tuple(out)
def _build_metrics(
states: Tuple[StateInfo, ...],
transitions: Tuple[TransitionInfo, ...],
variables: Tuple[VariableInfo, ...],
events: Tuple[EventInfo, ...],
) -> ModelMetrics:
n_pseudo = sum(1 for s in states if s.is_pseudo)
n_leaf = sum(1 for s in states if s.is_leaf and not s.is_pseudo)
n_composite = sum(1 for s in states if s.is_composite)
n_normal = sum(1 for t in transitions if not t.is_forced)
n_forced = sum(1 for t in transitions if t.is_forced)
aspect_coverage: Dict[str, int] = {}
for s in states:
if not (s.is_composite and (s.aspect_before or s.aspect_after)):
continue
aspect_coverage[s.path] = sum(
1
for desc in states
if desc.path != s.path
and desc.path.startswith(s.path + '.')
and desc.is_leaf
and not desc.is_pseudo
)
abstract_inventory: List[str] = []
for s in states:
if s.has_abstract_action:
abstract_inventory.append(s.path)
return ModelMetrics(
n_states_leaf=n_leaf,
n_states_composite=n_composite,
n_states_pseudo=n_pseudo,
max_hierarchy_depth=_hierarchy_depth(states),
n_transitions_normal=n_normal,
n_transitions_forced=n_forced,
n_events=len(events),
n_variables=len(variables),
var_to_leaf_ratio=len(variables) / max(n_leaf, 1),
aspect_coverage=aspect_coverage,
abstract_action_inventory=tuple(sorted(abstract_inventory)),
)
def _build_reachability_graph(
states: Tuple[StateInfo, ...],
transitions: Tuple[TransitionInfo, ...],
) -> Dict[str, Tuple[str, ...]]:
"""Return the default inspect reachability graph.
The inspect graph is a guard-agnostic breadth-first closure over normal
transition endpoints plus composite ``[*]`` initial edges. It is a stable
inspect/jsfcstm contract and therefore intentionally independent from the
optional verify topology projection that only runs when callers pass
``enable_verify=True`` to :func:`inspect_model`.
:param states: Inspect state records, one per state path.
:type states: Tuple[StateInfo, ...]
:param transitions: Inspect transition records in model order.
:type transitions: Tuple[TransitionInfo, ...]
:return: Mapping from every state path to reachable state paths.
:rtype: Dict[str, Tuple[str, ...]]
Examples::
>>> from pyfcstm.dsl import parse_with_grammar_entry
>>> from pyfcstm.model import parse_dsl_node_to_state_machine
>>> source = '''
... state Root {
... state A;
... state B;
... [*] -> A;
... A -> B;
... }
... '''
>>> ast = parse_with_grammar_entry(source, 'state_machine_dsl')
>>> machine = parse_dsl_node_to_state_machine(ast)
>>> states = _build_state_infos(machine)
>>> transitions = _build_transition_infos(machine)
>>> _build_reachability_graph(states, transitions)['Root']
('Root.A', 'Root.B')
>>> _build_reachability_graph(states, transitions)['Root.A']
('Root.B',)
"""
adjacency: Dict[str, set] = {state.path: set() for state in states}
initial_edges: Dict[str, set] = {state.path: set() for state in states}
for transition in transitions:
if (
transition.from_path == _INIT_MARK
or transition.to_path == _EXIT_MARK
):
continue
if transition.from_path not in adjacency: # pragma: no cover
# Model-built transitions always point at a known source path. The
# guard keeps hand-built test doubles from crashing this helper.
continue
adjacency[transition.from_path].add(transition.to_path)
for state in states:
if not (state.is_composite and state.initial_targets):
continue
for initial_target in state.initial_targets:
target = initial_target['target']
if target != _EXIT_MARK:
initial_edges[state.path].add(target)
graph: Dict[str, Tuple[str, ...]] = {}
for state in states:
seen = set()
queue = [state.path]
while queue:
current = queue.pop(0)
next_paths = adjacency.get(current, set()) | initial_edges.get(
current,
set(),
)
for next_path in sorted(next_paths):
if next_path in seen or next_path == state.path:
continue
seen.add(next_path)
queue.append(next_path)
graph[state.path] = tuple(sorted(seen))
return graph
def _build_event_emission_map(
events: Tuple[EventInfo, ...],
) -> Dict[str, Tuple[str, ...]]:
out: Dict[str, Tuple[str, ...]] = {}
for e in events:
if not e.is_used:
continue
froms = sorted({pair[0] for pair in e.used_by if pair[0] != _INIT_MARK})
out[e.qualified_name] = tuple(froms)
return out
def _build_var_dataflow(
variables: Tuple[VariableInfo, ...],
) -> Dict[str, Dict[str, Tuple[str, ...]]]:
out: Dict[str, Dict[str, Tuple[str, ...]]] = {}
for v in variables:
out[v.name] = {
'reads': tuple(sorted(set(v.read_in_states))),
'writes': tuple(sorted(set(v.written_in_states))),
}
return out
def _build_aspect_impact_map(
states: Tuple[StateInfo, ...],
) -> Dict[str, Tuple[str, ...]]:
out: Dict[str, Tuple[str, ...]] = {}
for s in states:
if not (s.is_composite and (s.aspect_before or s.aspect_after)):
continue
descendants = tuple(sorted(
desc.path
for desc in states
if desc.path != s.path
and desc.path.startswith(s.path + '.')
and desc.is_leaf
and not desc.is_pseudo
))
out[s.path] = descendants
return out
def _build_action_ref_graph(machine: 'StateMachine') -> Dict[str, Tuple[str, ...]]:
"""Capture ``ref`` edges between named actions in the model."""
edges: Dict[str, List[str]] = {}
for state in machine.walk_states():
path = _state_path(state)
for collection in (
state.on_enters,
state.on_durings,
state.on_exits,
state.on_during_aspects,
):
for action in collection:
source_label = _function_signature(state, path, action)
if action.is_ref and action.ref is not None:
target_label = _function_signature(None, None, action.ref)
edges.setdefault(source_label, []).append(target_label)
else:
# Ensure even non-ref'd functions appear in the graph
# so downstream "no outgoing edges" lookups work.
edges.setdefault(source_label, [])
return {key: tuple(sorted(set(value))) for key, value in edges.items()}
def _function_signature(state: Any, default_path: Optional[str], action: Any) -> str:
"""Build a stable ``state_path:function`` label for a named action."""
action_path = getattr(action, 'state_path', None)
if action_path is not None:
normalized = '.'.join(p for p in action_path[:-1] if p is not None) or (
default_path or _state_path(state)
)
else: # pragma: no cover
# Defensive: grammar-emitted actions always have a non-empty
# state_path. Reaching here means a future action synthesizer
# produced an action without one; fall through to the
# default_path / state.path chain so labels stay useful.
normalized = default_path or _state_path(state) or ''
leaf = (action.name or '<inline>') if action_path is None else (action_path[-1] or '<inline>')
return f'{normalized}:{leaf}' if normalized else leaf
def _run_verify_inspect_algorithms(machine: 'StateMachine', **kwargs):
"""Run the verify inspect adapter lazily.
Keeping this import behind the ``enable_verify`` branch preserves the
default inspect path for users that only need structural diagnostics.
:param machine: State machine to verify.
:type machine: pyfcstm.model.StateMachine
:param kwargs: Inspect-adapter policy arguments.
:type kwargs: object
:return: Adapter results in registry order.
:rtype: Tuple[pyfcstm.verify.inspect_adapter.InspectRunResult, ...]
Examples::
>>> from pyfcstm.dsl import parse_with_grammar_entry
>>> from pyfcstm.model import parse_dsl_node_to_state_machine
>>> ast = parse_with_grammar_entry("state Root;", "state_machine_dsl")
>>> machine = parse_dsl_node_to_state_machine(ast)
>>> bool(_run_verify_inspect_algorithms(machine))
True
"""
from ..verify.inspect_adapter import run_inspect_algorithms
return run_inspect_algorithms(machine, **kwargs)
def _transition_summary(payload: Mapping[str, Any]) -> str:
"""Render a raw verify transition payload as a compact label.
:param payload: Raw transition payload produced by
:mod:`pyfcstm.verify.encoding`.
:type payload: Mapping[str, Any]
:return: ``parent:from->to`` transition label.
:rtype: str
Examples::
>>> _transition_summary({
... 'parent': 'Root',
... 'from_state': 'A',
... 'to_state': 'B',
... })
'Root:A->B'
"""
return '{parent}:{from_state}->{to_state}'.format(**payload)
def _verify_transition_payload(payload: Any) -> Optional[Dict[str, Any]]:
"""Return a validated raw verify transition payload.
Verify algorithms return diagnostics-layer-free dictionaries. The
inspect conversion layer must fail closed when a raw payload is malformed,
because the public ``ModelDiagnostic.refs`` schema can only validate the
outer ``dict`` type for transition refs.
:param payload: Raw transition payload to validate.
:type payload: Any
:return: A defensive copy when the payload is shaped like
``pyfcstm.verify.encoding`` transition data, otherwise ``None``.
:rtype: Optional[Dict[str, Any]]
Examples::
>>> _verify_transition_payload({
... 'parent': 'Root',
... 'from_state': 'A',
... 'to_state': 'B',
... 'event': None,
... 'guard': 'x > 0',
... 'is_forced': False,
... })['from_state']
'A'
>>> _verify_transition_payload({'parent': 'Root'}) is None
True
"""
if not isinstance(payload, Mapping):
return None
parent = payload.get('parent')
from_state = payload.get('from_state')
to_state = payload.get('to_state')
event = payload.get('event')
guard = payload.get('guard')
is_forced = payload.get('is_forced')
if not all(isinstance(item, str) for item in (parent, from_state, to_state)):
return None
if event is not None and not isinstance(event, str):
return None
if guard is not None and not isinstance(guard, str):
return None
if not isinstance(is_forced, bool):
return None
return {
'parent': parent,
'from_state': from_state,
'to_state': to_state,
'event': event,
'guard': guard,
'is_forced': is_forced,
}
def _verify_transition_summaries(payloads: Any) -> Optional[List[str]]:
"""Return summaries for a sequence of raw verify transition payloads.
The conversion fails closed: one malformed item invalidates the entire
sequence so the diagnostic cannot present partial evidence as complete.
:param payloads: Raw transition payload sequence.
:type payloads: Any
:return: Transition summaries, or ``None`` when the sequence is malformed.
:rtype: Optional[List[str]]
Examples::
>>> _verify_transition_summaries(({
... 'parent': 'Root',
... 'from_state': 'A',
... 'to_state': 'B',
... 'event': None,
... 'guard': None,
... 'is_forced': False,
... },))
['Root:A->B']
>>> _verify_transition_summaries(({'parent': 'Root'},)) is None
True
"""
if not isinstance(payloads, (list, tuple)):
return None
summaries: List[str] = []
for raw_item in payloads:
item = _verify_transition_payload(raw_item)
if item is None:
return None
summaries.append(_transition_summary(item))
return summaries
def _state_span_by_path(states: Sequence[StateInfo], state_path: Optional[str]) -> Optional[Span]:
"""Return the source span for a state path.
:param states: Inspect state payloads.
:type states: Sequence[StateInfo]
:param state_path: Dotted state path to locate.
:type state_path: Optional[str]
:return: Matching state span, or ``None`` when the path is absent.
:rtype: Optional[Span]
Examples::
>>> state = StateInfo(
... path='Root',
... name='Root',
... parent_path=None,
... is_leaf=True,
... is_pseudo=False,
... is_composite=False,
... substates=(),
... initial_targets=(),
... entry_actions=(),
... during_actions=(),
... exit_actions=(),
... aspect_before=(),
... aspect_after=(),
... has_abstract_action=False,
... span=Span(line=1, column=1),
... )
>>> _state_span_by_path((state,), 'Root').line
1
"""
if state_path is None:
return None
for state in states:
if state.path == state_path:
return state.span
return None
def _event_span_by_name(events: Sequence[EventInfo], event_name: Optional[str]) -> Optional[Span]:
"""Return the source span for a qualified event name.
:param events: Inspect event payloads.
:type events: Sequence[EventInfo]
:param event_name: Qualified event name to locate.
:type event_name: Optional[str]
:return: Matching event declaration span, or ``None``.
:rtype: Optional[Span]
Examples::
>>> event = EventInfo(
... qualified_name='Root.Tick',
... scope='chain',
... used_by=(),
... is_declared=True,
... is_used=False,
... span=Span(line=2, column=5),
... )
>>> _event_span_by_name((event,), 'Root.Tick').column
5
"""
if event_name is None:
return None
for event in events:
if event.qualified_name == event_name:
return event.span
return None
def _transition_matches_payload(info: TransitionInfo, payload: Mapping[str, Any]) -> bool:
"""Return whether inspect transition data matches raw verify payload.
:param info: Inspect transition payload.
:type info: TransitionInfo
:param payload: Raw verify transition payload.
:type payload: Mapping[str, Any]
:return: ``True`` when endpoints, event, guard, and forced flag match.
:rtype: bool
Examples::
>>> info = TransitionInfo(
... from_path='Root.A',
... to_path='Root.B',
... event=None,
... event_scope=None,
... guard='x > 0',
... effect=None,
... effect_self_assigns=(),
... is_forced=False,
... forced_origin=None,
... transition_index=0,
... )
>>> _transition_matches_payload(info, {
... 'parent': 'Root',
... 'from_state': 'A',
... 'to_state': 'B',
... 'event': None,
... 'guard': 'x > 0',
... 'is_forced': False,
... })
True
"""
transition_payload = _verify_transition_payload(payload)
if transition_payload is None:
return False
parent = transition_payload['parent']
from_state = transition_payload['from_state']
to_state = transition_payload['to_state']
from_path = '[*]' if from_state == '[*]' else (
f'{parent}.{from_state}' if parent else from_state
)
to_path = '[*]' if to_state == '[*]' else (
f'{parent}.{to_state}' if parent else to_state
)
return (
info.from_path == from_path
and info.to_path == to_path
and info.event == transition_payload['event']
and info.guard == transition_payload['guard']
and info.is_forced == transition_payload['is_forced']
)
def _transition_span_by_payload(
transitions: Sequence[TransitionInfo],
payload: Optional[Mapping[str, Any]],
) -> Optional[Span]:
"""Return the transition span for a raw verify transition payload.
:param transitions: Inspect transition payloads.
:type transitions: Sequence[TransitionInfo]
:param payload: Raw verify transition payload.
:type payload: Optional[Mapping[str, Any]]
:return: Matching transition source span, or ``None``.
:rtype: Optional[Span]
Examples::
>>> transition = TransitionInfo(
... from_path='Root.A',
... to_path='Root.B',
... event=None,
... event_scope=None,
... guard=None,
... effect=None,
... effect_self_assigns=(),
... is_forced=False,
... forced_origin=None,
... transition_index=0,
... span=Span(line=3, column=5),
... )
>>> _transition_span_by_payload((transition,), {
... 'parent': 'Root',
... 'from_state': 'A',
... 'to_state': 'B',
... 'event': None,
... 'guard': None,
... 'is_forced': False,
... }).line
3
"""
if payload is None:
return None
for transition in transitions:
if _transition_matches_payload(transition, payload):
return transition.span
return None
def _effect_span_by_payload(
transitions: Sequence[TransitionInfo],
payload: Optional[Mapping[str, Any]],
) -> Optional[Span]:
"""Return the most specific effect span for a raw transition payload.
:param transitions: Inspect transition payloads.
:type transitions: Sequence[TransitionInfo]
:param payload: Raw verify transition payload.
:type payload: Optional[Mapping[str, Any]]
:return: First effect span when present, otherwise the transition span.
:rtype: Optional[Span]
Examples::
>>> span = Span(line=4, column=10)
>>> transition = TransitionInfo(
... from_path='Root.A',
... to_path='Root.B',
... event=None,
... event_scope=None,
... guard=None,
... effect='x = x + 0',
... effect_self_assigns=(),
... is_forced=False,
... forced_origin=None,
... transition_index=0,
... effect_spans=(span,),
... )
>>> _effect_span_by_payload((transition,), {
... 'parent': 'Root',
... 'from_state': 'A',
... 'to_state': 'B',
... 'event': None,
... 'guard': None,
... 'is_forced': False,
... }).column
10
"""
if payload is None:
return None
for transition in transitions:
if _transition_matches_payload(transition, payload):
if transition.effect_spans:
return transition.effect_spans[0]
return transition.span
return None
def _action_span_by_state_and_condition(
actions: Sequence[ActionInfo],
state_path: Optional[str],
condition_source: Optional[str],
) -> Optional[Span]:
"""Return the lifecycle action span associated with a verify condition.
:param actions: Inspect action payloads.
:type actions: Sequence[ActionInfo]
:param state_path: State path reported by the raw verify diagnostic.
:type state_path: Optional[str]
:param condition_source: Condition source label, such as
``"during:0"``.
:type condition_source: Optional[str]
:return: Matching action span, or ``None``.
:rtype: Optional[Span]
Examples::
>>> action = ActionInfo(
... signature='Root.A:<inline>',
... state_path='Root.A',
... name=None,
... stage='during',
... aspect=None,
... is_ref=False,
... ref_target=None,
... is_attached=True,
... span=Span(line=5, column=9),
... )
>>> _action_span_by_state_and_condition(
... (action,),
... 'Root.A',
... 'during:0',
... ).line
5
"""
if state_path is None:
return None
if condition_source and ':' in condition_source:
stage = condition_source.split(':', 1)[0]
for action in actions:
if action.state_path == state_path and action.stage == stage:
return action.span
for action in actions:
if action.state_path == state_path and action.stage == 'during':
return action.span
return None
def _verify_smt_refs(raw: Mapping[str, Any]) -> Optional[Dict[str, Any]]:
"""Build inspect diagnostic refs from one raw SMT-local finding.
Raw verify algorithms deliberately return dictionaries so they do not
depend on the diagnostics package. This helper translates the shared raw
fields into the refs vocabulary declared in ``codes.yaml``.
:param raw: Raw diagnostic dictionary with ``code``, ``algorithm_name``,
and ``data`` keys.
:type raw: Mapping[str, Any]
:return: Refs payload candidate, or ``None`` when ``raw`` is not shaped
like a verify diagnostic.
:rtype: Optional[Dict[str, Any]]
Examples::
>>> raw = {
... 'code': 'W_DEAD_GUARD',
... 'algorithm_name': 'dead_guard',
... 'data': {
... 'transition': {
... 'parent': 'Root',
... 'from_state': 'A',
... 'to_state': 'B',
... 'event': None,
... 'guard': 'x > 0',
... 'is_forced': False,
... },
... 'verification_scope': 'smt_local',
... },
... }
>>> refs = _verify_smt_refs(raw)
>>> refs['transition_summary']
'Root:A->B'
"""
code = raw.get('code')
data = raw.get('data')
if not isinstance(code, str) or not isinstance(data, Mapping):
return None
refs: Dict[str, Any] = {
'algorithm_name': raw.get('algorithm_name'),
'verification_scope': data.get('verification_scope'),
}
transition_dict = _verify_transition_payload(data.get('transition'))
if transition_dict is not None:
refs['transition'] = transition_dict
refs['transition_summary'] = _transition_summary(transition_dict)
if code == 'W_FORCED_GUARD_UNSAT':
refs['scope'] = data.get('scope')
elif code == 'W_TRANSITION_SHADOWED':
shadowed_by = data.get('shadowed_by') or ()
shadowed_by_summaries = _verify_transition_summaries(shadowed_by)
if shadowed_by_summaries is None:
return None
refs.update({
'source_state_path': data.get('source'),
'reason': data.get('reason'),
'shadowed_by_count': len(shadowed_by_summaries),
'shadowed_by': shadowed_by_summaries,
})
elif code == 'I_ENTER_DURING_CONTRADICT':
refs.update({
'state_path': data.get('state'),
'condition': data.get('condition'),
'condition_source': data.get('condition_source'),
'branch_taken': data.get('branch_taken'),
})
elif code == 'W_COMPOSITE_INIT_INCOMPLETE':
init_transitions = data.get('init_transitions') or ()
init_transition_summaries = _verify_transition_summaries(init_transitions)
if init_transition_summaries is None:
return None
refs.update({
'composite_path': data.get('state'),
'init_transition_count': len(init_transition_summaries),
'init_transitions': init_transition_summaries,
'witness': data.get('witness'),
})
return refs
def _verify_smt_span(
code: str,
raw: Mapping[str, Any],
states: Sequence[StateInfo],
transitions: Sequence[TransitionInfo],
actions: Sequence[ActionInfo],
) -> Optional[Span]:
"""Choose a source span for an SMT-local verify diagnostic.
The span follows the semantic object declared for each verify code:
guard/transition findings point at the transition, effect findings prefer
the effect statement, lifecycle findings point at the relevant action, and
composite-init findings point at the composite state.
:param code: Diagnostic code.
:type code: str
:param raw: Raw verify diagnostic dictionary.
:type raw: Mapping[str, Any]
:param states: Inspect state payloads.
:type states: Sequence[StateInfo]
:param transitions: Inspect transition payloads.
:type transitions: Sequence[TransitionInfo]
:param actions: Inspect action payloads.
:type actions: Sequence[ActionInfo]
:return: Best-effort source span, or ``None`` when no source object can
be matched.
:rtype: Optional[Span]
Examples::
>>> transition = TransitionInfo(
... from_path='Root.A',
... to_path='Root.B',
... event=None,
... event_scope=None,
... guard='x > 0',
... effect=None,
... effect_self_assigns=(),
... is_forced=False,
... forced_origin=None,
... transition_index=0,
... span=Span(line=3, column=5),
... )
>>> raw = {'data': {'transition': {
... 'parent': 'Root',
... 'from_state': 'A',
... 'to_state': 'B',
... 'event': None,
... 'guard': 'x > 0',
... 'is_forced': False,
... }}}
>>> _verify_smt_span('W_DEAD_GUARD', raw, (), (transition,), ()).line
3
"""
data = raw.get('data')
if not isinstance(data, Mapping):
return None
transition = data.get('transition')
if code in {
'W_DEAD_GUARD',
'W_GUARD_TAUTOLOGY',
'W_FORCED_GUARD_UNSAT',
'W_TRANSITION_SHADOWED',
}:
return _transition_span_by_payload(transitions, transition)
if code in {'W_EFFECT_SMT_NO_OP', 'I_EFFECT_GUARD_CONTRADICT'}:
return _effect_span_by_payload(transitions, transition)
if code == 'I_ENTER_DURING_CONTRADICT':
return _action_span_by_state_and_condition(
actions,
data.get('state'),
data.get('condition_source'),
)
if code == 'W_COMPOSITE_INIT_INCOMPLETE':
return _state_span_by_path(states, data.get('state'))
return None
def _structural_verify_diagnostics(
result: 'InspectRunResult',
states: Sequence[StateInfo],
events: Sequence[EventInfo],
) -> List[ModelDiagnostic]:
"""Convert structural verify payloads into model diagnostics.
Structural algorithms return graph-oriented payloads rather than raw
diagnostic dictionaries. This helper maps the known structural adapter
results onto the verify-pipeline codes declared in ``codes.yaml``.
:param result: One normalized inspect-adapter result.
:type result: pyfcstm.verify.inspect_adapter.InspectRunResult
:param states: Inspect state payloads for source-span lookup.
:type states: Sequence[StateInfo]
:param events: Inspect event payloads for event-span and consumer lookup.
:type events: Sequence[EventInfo]
:return: Diagnostics converted from ``result``.
:rtype: List[ModelDiagnostic]
Examples::
>>> from pyfcstm.verify.inspect_adapter import InspectRunResult
>>> state = StateInfo(
... path='Root.A',
... name='A',
... parent_path='Root',
... is_leaf=True,
... is_pseudo=False,
... is_composite=False,
... substates=(),
... initial_targets=(),
... entry_actions=(),
... during_actions=(),
... exit_actions=(),
... aspect_before=(),
... aspect_after=(),
... has_abstract_action=False,
... span=Span(line=2, column=5),
... )
>>> result = InspectRunResult(
... algorithm_name='strongly_connected_components',
... complexity_tier='structural',
... smt_logic=None,
... verification_scope='topological_only',
... diagnostic_codes=('I_NONTRIVIAL_SCC',),
... result_kind='sat',
... diagnostics=(),
... reason=None,
... raw_result=(('Root.A',),),
... )
>>> _structural_verify_diagnostics(result, (state,), ())[0].code
'I_NONTRIVIAL_SCC'
"""
diagnostics: List[ModelDiagnostic] = []
if result.algorithm_name == 'strongly_connected_components':
for component in result.raw_result or ():
scc = list(component)
if not scc:
continue
refs = {
'algorithm_name': result.algorithm_name,
'verification_scope': result.verification_scope,
'representative_state_path': scc[0],
'scc': scc,
}
diagnostic = _make_verify_diagnostic(
'I_NONTRIVIAL_SCC',
refs,
_state_span_by_path(states, scc[0]),
)
if diagnostic is not None:
diagnostics.append(diagnostic)
elif result.algorithm_name == 'unreachable_states':
for state_path in result.raw_result or ():
refs = {'state_path': state_path}
diagnostic = _make_verify_diagnostic(
'W_UNREACHABLE_STATE',
refs,
_state_span_by_path(states, state_path),
)
if diagnostic is not None:
diagnostics.append(diagnostic)
elif result.algorithm_name == 'topological_finite':
counterexamples = getattr(result.raw_result, 'counterexamples', ())
for kind, payload in counterexamples:
representative = payload[0] if isinstance(payload, tuple) else payload
scc = list(payload) if isinstance(payload, tuple) else [payload]
refs = {
'algorithm_name': result.algorithm_name,
'verification_scope': result.verification_scope,
'representative_state_path': representative,
'counterexample_kind': kind,
'scc': scc,
}
diagnostic = _make_verify_diagnostic(
'W_TOPOLOGICAL_NOEXIT',
refs,
_state_span_by_path(states, representative),
)
if diagnostic is not None:
diagnostics.append(diagnostic)
elif result.algorithm_name == 'topological_inevitable_terminator':
path = list(getattr(result.raw_result, 'counterexample_path', ()) or ())
if path:
refs = {
'algorithm_name': result.algorithm_name,
'verification_scope': result.verification_scope,
'representative_state_path': path[0],
'counterexample_path': path,
}
diagnostic = _make_verify_diagnostic(
'I_TOPOLOGICAL_NON_TERMINATING',
refs,
_state_span_by_path(states, path[0]),
)
if diagnostic is not None:
diagnostics.append(diagnostic)
elif result.algorithm_name == 'event_emission_to_consumer_reachable':
for event_name in result.raw_result or ():
refs = {
'algorithm_name': result.algorithm_name,
'verification_scope': result.verification_scope,
'event_name': event_name,
'consumer_count': _event_consumer_count(events, event_name),
}
diagnostic = _make_verify_diagnostic(
'W_EVENT_UNREACHABLE_EMIT',
refs,
_event_span_by_name(events, event_name),
)
if diagnostic is not None:
diagnostics.append(diagnostic)
return diagnostics
def _event_consumer_count(events: Sequence[EventInfo], event_name: str) -> int:
"""Return how many inspect transitions consume an event.
:param events: Inspect event payloads.
:type events: Sequence[EventInfo]
:param event_name: Qualified event name.
:type event_name: str
:return: Number of transition consumers recorded for the event.
:rtype: int
Examples::
>>> event = EventInfo(
... qualified_name='Root.Panic',
... scope='chain',
... used_by=(('Root.Lost', 'Root.A'),),
... is_declared=True,
... is_used=True,
... )
>>> _event_consumer_count((event,), 'Root.Panic')
1
"""
for event in events:
if event.qualified_name == event_name:
return len(event.used_by)
return 0
def _type_matches_schema(value: Any, field_spec: CodeFieldSpec) -> bool:
"""Return whether a runtime value fits a ``codes.yaml`` type token.
:param value: Runtime value from a diagnostic refs payload.
:type value: Any
:param field_spec: Field schema loaded from ``codes.yaml``.
:type field_spec: CodeFieldSpec
:return: ``True`` when ``value`` fits the field type.
:rtype: bool
Examples::
>>> spec = CodeFieldSpec(
... name='count',
... type='int',
... required=True,
... description='Count value.',
... )
>>> _type_matches_schema(3, spec)
True
>>> _type_matches_schema(True, spec)
False
"""
type_token = field_spec.type
if type_token == 'str':
return isinstance(value, str)
if type_token == 'int':
return isinstance(value, int) and not isinstance(value, bool)
if type_token == 'float':
return isinstance(value, float)
if type_token == 'number':
return isinstance(value, (int, float)) and not isinstance(value, bool)
if type_token == 'bool':
return isinstance(value, bool)
if type_token == 'dict':
return isinstance(value, dict)
if type_token == 'list[str]':
return isinstance(value, list) and all(isinstance(item, str) for item in value)
if type_token == 'list[Span]':
return isinstance(value, list) and all(
item is None or hasattr(item, 'line') for item in value
)
if type_token == 'Span':
return value is None or hasattr(value, 'line')
if type_token == 'str_or_null':
return value is None or isinstance(value, str)
if type_token == 'int_or_null':
return value is None or (
isinstance(value, int) and not isinstance(value, bool)
)
return True
def _refs_match_code_schema(code: str, refs: Mapping[str, Any]) -> bool:
"""Return whether refs satisfy the declared verify-adapter schema.
The conversion layer uses this as a fail-closed guard: raw verify findings
are emitted only after their refs are declared, complete, enum-safe, and
type-compatible with ``codes.yaml``. Most accepted codes are declared as
``emit_tier: verify_pipeline``; codes in
:data:`VERIFY_SHARED_STATIC_CODES` are legacy static diagnostics that a
structural verify algorithm may also emit.
:param code: Diagnostic code to validate.
:type code: str
:param refs: Candidate refs payload.
:type refs: Mapping[str, Any]
:return: ``True`` when the code may be emitted by the verify adapter and
refs match its schema.
:rtype: bool
Examples::
>>> _refs_match_code_schema('W_DEAD_GUARD', {
... 'algorithm_name': 'dead_guard',
... 'verification_scope': 'smt_local',
... 'transition': {
... 'parent': 'Root',
... 'from_state': 'A',
... 'to_state': 'B',
... },
... 'transition_summary': 'Root:A->B',
... })
True
>>> _refs_match_code_schema('W_UNREACHABLE_STATE', {
... 'state_path': 'Root.Orphan',
... })
True
"""
spec = CODE_REGISTRY.get(code)
if spec is None:
return False
if spec.emit_tier != 'verify_pipeline' and code not in VERIFY_SHARED_STATIC_CODES:
return False
declared = set(spec.refs_schema.keys())
if set(refs.keys()) - declared:
return False
for field_name in spec.required_fields():
if field_name not in refs or refs[field_name] is None:
return False
for field_name, field_spec in spec.refs_schema.items():
if field_name not in refs:
continue
value = refs[field_name]
if field_spec.enum and value not in set(field_spec.enum):
return False
if not _type_matches_schema(value, field_spec):
return False
return True
def _make_verify_diagnostic(
code: str,
refs: Mapping[str, Any],
span: Optional[Span],
) -> Optional[ModelDiagnostic]:
"""Create one ``ModelDiagnostic`` from verified refs.
Verify diagnostics must bind back to the source object declared by the
code registry. A raw verify finding with valid refs but no source object
is dropped instead of becoming a public spanless diagnostic.
:param code: Verify-pipeline diagnostic code.
:type code: str
:param refs: Candidate refs payload.
:type refs: Mapping[str, Any]
:param span: Best-effort source span for the diagnostic.
:type span: Optional[Span]
:return: ``ModelDiagnostic`` when refs pass schema validation, otherwise
``None``.
:rtype: Optional[ModelDiagnostic]
Examples::
>>> diag = _make_verify_diagnostic('I_NONTRIVIAL_SCC', {
... 'algorithm_name': 'strongly_connected_components',
... 'verification_scope': 'topological_only',
... 'representative_state_path': 'Root.A',
... 'scc': ['Root.A'],
... }, Span(line=2, column=5))
>>> diag.code
'I_NONTRIVIAL_SCC'
"""
if not _refs_match_code_schema(code, refs):
return None
spec = CODE_REGISTRY[code]
if (
spec.span_object is not None
and code not in KNOWN_SPANLESS_CODES
and span is None
):
return None
return ModelDiagnostic(
code=code,
severity=spec.severity,
message=spec.description,
span=span,
refs=dict(refs),
)
def _verify_diagnostics_from_results(
results: Sequence['InspectRunResult'],
states: Sequence[StateInfo],
transitions: Sequence[TransitionInfo],
events: Sequence[EventInfo],
actions: Sequence[ActionInfo],
) -> Tuple[ModelDiagnostic, ...]:
"""Convert inspect-adapter results into public model diagnostics.
SMT-local raw diagnostic dictionaries are consumed directly. Structural
results are interpreted only for algorithms whose payloads have an
explicit conversion. Indeterminate results are skipped even when partial
raw diagnostics are attached, so ``unknown`` or ``timeout`` does not
become a false warning.
:param results: Normalized inspect-adapter results.
:type results: Sequence[pyfcstm.verify.inspect_adapter.InspectRunResult]
:param states: Inspect state payloads.
:type states: Sequence[StateInfo]
:param transitions: Inspect transition payloads.
:type transitions: Sequence[TransitionInfo]
:param events: Inspect event payloads.
:type events: Sequence[EventInfo]
:param actions: Inspect action payloads.
:type actions: Sequence[ActionInfo]
:return: Converted verify diagnostics.
:rtype: Tuple[ModelDiagnostic, ...]
Examples::
>>> from pyfcstm.verify.inspect_adapter import InspectRunResult
>>> result = InspectRunResult(
... algorithm_name='dead_guard',
... complexity_tier='smt_linear',
... smt_logic='QF_LIRA',
... verification_scope='smt_local',
... diagnostic_codes=('W_DEAD_GUARD',),
... result_kind='unknown',
... diagnostics=(),
... reason='solver said unknown',
... raw_result=None,
... )
>>> _verify_diagnostics_from_results((result,), (), (), (), ())
()
"""
diagnostics: List[ModelDiagnostic] = []
for result in results:
if result.result_kind in {'unknown', 'timeout', 'undecidable_skip'}:
continue
if result.diagnostics:
for raw in result.diagnostics:
if not isinstance(raw, Mapping):
continue
code = raw.get('code')
if not isinstance(code, str):
continue
refs = _verify_smt_refs(raw)
if refs is None:
continue
diagnostic = _make_verify_diagnostic(
code,
refs,
_verify_smt_span(code, raw, states, transitions, actions),
)
if diagnostic is not None:
diagnostics.append(diagnostic)
continue
diagnostics.extend(_structural_verify_diagnostics(result, states, events))
return tuple(diagnostics)
def _deduplicate_model_diagnostics(
diagnostics: Sequence[ModelDiagnostic],
) -> Tuple[ModelDiagnostic, ...]:
"""Remove semantic duplicates while preserving diagnostic order.
The inspect surface may combine legacy design-health warnings with optional
verify-pipeline diagnostics. When both paths report the same unreachable
state, callers should see one diagnostic rather than two equivalent
warnings. Other diagnostics are left untouched because repeated findings on
one state can represent distinct source occurrences.
:param diagnostics: Diagnostics in emission order.
:type diagnostics: Sequence[ModelDiagnostic]
:return: Diagnostics with duplicate state-scoped entries removed.
:rtype: Tuple[ModelDiagnostic, ...]
Examples::
>>> diag = ModelDiagnostic(
... code='W_UNREACHABLE_STATE',
... severity='warning',
... message='unreachable',
... refs={'state_path': 'Root.Orphan'},
... )
>>> len(_deduplicate_model_diagnostics((diag, diag)))
1
"""
out: List[ModelDiagnostic] = []
seen_state_scoped = set()
for diagnostic in diagnostics:
state_path = diagnostic.refs.get('state_path')
if diagnostic.code == 'W_UNREACHABLE_STATE' and isinstance(state_path, str):
key = (diagnostic.code, state_path)
if key in seen_state_scoped:
continue
seen_state_scoped.add(key)
out.append(diagnostic)
return tuple(out)
[docs]
def inspect_model(
machine: 'StateMachine',
*,
deep_hierarchy_threshold: int = DEFAULT_DEEP_HIERARCHY_THRESHOLD,
large_composite_threshold: int = DEFAULT_LARGE_COMPOSITE_THRESHOLD,
var_to_leaf_ratio_threshold: float = DEFAULT_VAR_TO_LEAF_RATIO_THRESHOLD,
enable_verify: bool = False,
max_complexity_tier: str = 'structural',
max_call_count_scaling: str = 'linear_in_transitions',
smt_timeout_ms: Optional[int] = None,
) -> ModelInspect:
"""
Build a structured inspection report for a state machine model.
The report combines the structural payload, the five derived view
graphs, and design-health diagnostics that can be computed from the
inspect surface.
:param machine: The state machine model to inspect.
:type machine: pyfcstm.model.StateMachine
:param deep_hierarchy_threshold: Maximum accepted hierarchy depth.
:type deep_hierarchy_threshold: int
:param large_composite_threshold: Maximum accepted number of direct
child states in one composite.
:type large_composite_threshold: int
:param var_to_leaf_ratio_threshold: Maximum accepted variable to
non-pseudo leaf-state ratio.
:type var_to_leaf_ratio_threshold: float
:param enable_verify: Whether to run inspect-eligible
:mod:`pyfcstm.verify` algorithms and append their diagnostics.
The default ``False`` preserves the Layer 2 inspect contract.
:type enable_verify: bool, optional
:param max_complexity_tier: Maximum verify complexity tier accepted by
the inspect adapter when ``enable_verify`` is true.
``"structural"`` keeps the default to graph-only verification.
:type max_complexity_tier: str, optional
:param max_call_count_scaling: Maximum verify call-count scaling accepted
by the inspect adapter when ``enable_verify`` is true.
:type max_call_count_scaling: str, optional
:param smt_timeout_ms: Optional solver timeout forwarded to SMT-local
verify algorithms. ``None`` preserves the raw verify default of no
configured timeout.
:type smt_timeout_ms: Optional[int], optional
:return: Structured view of the model.
:rtype: ModelInspect
Examples::
>>> from pyfcstm.dsl import parse_with_grammar_entry
>>> from pyfcstm.model import parse_dsl_node_to_state_machine
>>> source = '''
... state Root {
... state A;
... state B;
... [*] -> A;
... A -> B;
... }
... '''
>>> ast = parse_with_grammar_entry(source, 'state_machine_dsl')
>>> machine = parse_dsl_node_to_state_machine(ast)
>>> report = inspect_model(machine)
>>> report.reachability_graph['Root.A']
('Root.B',)
>>> verify_report = inspect_model(machine, enable_verify=True)
>>> len(verify_report.diagnostics) >= len(report.diagnostics)
True
"""
deep_hierarchy_threshold = _normalize_int_threshold(
'deep_hierarchy_threshold',
deep_hierarchy_threshold,
)
large_composite_threshold = _normalize_int_threshold(
'large_composite_threshold',
large_composite_threshold,
)
var_to_leaf_ratio_threshold = _normalize_float_threshold(
'var_to_leaf_ratio_threshold',
var_to_leaf_ratio_threshold,
)
states = _build_state_infos(machine)
transitions = _build_transition_infos(machine)
variables = _build_variable_infos(machine, states)
events = _build_event_infos(machine, transitions)
actions = _build_action_infos(machine)
forced_transitions = _build_forced_transition_infos(machine)
metrics = _build_metrics(states, transitions, variables, events)
reachability_graph = _build_reachability_graph(states, transitions)
root_state_path = _state_path(machine.root_state)
diagnostics = list(collect_design_health_warnings(
states,
transitions,
variables,
events,
actions,
forced_transitions,
metrics,
reachability_graph,
root_state_path=root_state_path,
deep_hierarchy_threshold=deep_hierarchy_threshold,
large_composite_threshold=large_composite_threshold,
var_to_leaf_ratio_threshold=var_to_leaf_ratio_threshold,
machine=machine,
))
if enable_verify:
verify_results = _run_verify_inspect_algorithms(
machine,
max_complexity_tier=max_complexity_tier,
max_call_count_scaling=max_call_count_scaling,
smt_timeout_ms=smt_timeout_ms,
)
diagnostics.extend(_verify_diagnostics_from_results(
verify_results,
states,
transitions,
events,
actions,
))
diagnostics = list(_deduplicate_model_diagnostics(diagnostics))
return ModelInspect(
root_state_path=root_state_path,
states=states,
transitions=transitions,
variables=variables,
events=events,
actions=actions,
forced_transitions=forced_transitions,
metrics=metrics,
reachability_graph=reachability_graph,
event_emission_map=_build_event_emission_map(events),
var_dataflow=_build_var_dataflow(variables),
aspect_impact_map=_build_aspect_impact_map(states),
action_ref_graph=_build_action_ref_graph(machine),
diagnostics=tuple(diagnostics),
)
def _normalize_int_threshold(name: str, value: int) -> int:
if isinstance(value, bool):
raise TypeError(f'{name} must be an integer threshold, got bool')
if isinstance(value, int):
return value
if isinstance(value, float):
if math.isfinite(value) and value.is_integer():
return int(value)
raise ValueError(f'{name} must be an integer threshold, got {value!r}')
raise TypeError(f'{name} must be an integer threshold, got {type(value).__name__}')
def _normalize_float_threshold(name: str, value: float) -> float:
if isinstance(value, bool):
raise TypeError(f'{name} must be a finite numeric threshold, got bool')
if isinstance(value, (int, float)):
normalized = float(value)
if math.isfinite(normalized):
return normalized
raise ValueError(f'{name} must be a finite numeric threshold, got {value!r}')
raise TypeError(f'{name} must be a finite numeric threshold, got {type(value).__name__}')
def _to_json_dataclass(obj: Any) -> Any:
if hasattr(obj, '__dataclass_fields__'):
return {
name: _to_json_dataclass(getattr(obj, name))
for name in obj.__dataclass_fields__
if name not in {
'span',
'effect_spans',
'effect_self_assign_spans',
'float_literal_assignment_spans',
}
}
if isinstance(obj, tuple):
return [_to_json_dataclass(x) for x in obj]
if isinstance(obj, list): # pragma: no cover
# Defensive: the current model emits ``Tuple[...]`` fields
# exclusively, but future payloads may introduce list-typed
# dataclass fields.
return [_to_json_dataclass(x) for x in obj]
if isinstance(obj, dict): # pragma: no cover
# Same as list: the current ModelInspect keeps every dict field
# at the top level, but nested dict payloads should still
# serialize predictably if introduced later.
return {str(k): _to_json_dataclass(v) for k, v in obj.items()}
return obj
def _to_json_inspect(report: ModelInspect) -> Dict[str, Any]:
"""Custom serializer that flattens dataclass attribute names cleanly."""
return {
'root_state_path': report.root_state_path,
'states': [_to_json_dataclass(s) for s in report.states],
'transitions': [_to_json_dataclass(t) for t in report.transitions],
'variables': [_to_json_dataclass(v) for v in report.variables],
'events': [_to_json_dataclass(e) for e in report.events],
'actions': [_to_json_dataclass(a) for a in report.actions],
'forced_transitions': [
_to_json_dataclass(f) for f in report.forced_transitions
],
'metrics': _to_json_dataclass(report.metrics),
'reachability_graph': {k: list(v) for k, v in report.reachability_graph.items()},
'event_emission_map': {k: list(v) for k, v in report.event_emission_map.items()},
'var_dataflow': {
k: {kk: list(vv) for kk, vv in inner.items()}
for k, inner in report.var_dataflow.items()
},
'aspect_impact_map': {k: list(v) for k, v in report.aspect_impact_map.items()},
'action_ref_graph': {k: list(v) for k, v in report.action_ref_graph.items()},
'diagnostics': [_diagnostic_to_json(d) for d in report.diagnostics],
}
def _diagnostic_to_json(d: ModelDiagnostic) -> Dict[str, Any]:
span = None
if d.span is not None:
span = {
'line': d.span.line,
'column': d.span.column,
'end_line': d.span.end_line,
'end_column': d.span.end_column,
}
return {
'code': d.code,
'severity': d.severity,
'message': d.message,
'span': span,
'refs': _to_json_dataclass(d.refs),
}