# Source code for pyfcstm.diagnostics.inspect

"""
Structured model inspection for pyfcstm.

This module provides :func:`inspect_model`, a single entry point that
walks a :class:`pyfcstm.model.StateMachine` and produces a stable,
serialization-friendly view of its structure plus five derived
relational graphs (reachability, event emission, variable data flow,
aspect impact, action reference). The output is the foundation that
Layer 2 design-health warnings (``W_*`` / ``I_*`` codes) and downstream
LLM / evaluation tooling consume.

The view shape is the **single source of truth** for the pyfcstm /
jsfcstm contract. Adding or renaming a field here must be mirrored on
the jsfcstm side (``editors/jsfcstm/src/diagnostics/inspect.ts``) and
in ``pyfcstm/diagnostics/schema.json``.

The module exposes the following dataclasses:

* :class:`StateInfo` — per-state structural summary
* :class:`TransitionInfo` — per-transition structural summary
* :class:`VariableInfo` — per-variable structural summary plus
  participation flags used by ``W_UNREFERENCED_VAR``
* :class:`EventInfo` — per-event structural summary
* :class:`ModelMetrics` — aggregate counts and ratios
* :class:`ModelInspect` — top-level container including diagnostics

Example::

    >>> from pyfcstm.dsl import parse_with_grammar_entry
    >>> from pyfcstm.model.parse import parse_dsl_node_to_state_machine
    >>> from pyfcstm.diagnostics import inspect_model
    >>> source = '''
    ... def int counter = 0;
    ... state Root {
    ...     state Idle;
    ...     state Active;
    ...     [*] -> Idle;
    ...     Idle -> Active : if [counter > 0];
    ... }
    ... '''
    >>> ast = parse_with_grammar_entry(source, 'state_machine_dsl')
    >>> machine = parse_dsl_node_to_state_machine(ast)
    >>> report = inspect_model(machine)
    >>> report.metrics.n_states_leaf
    2
"""

from collections import deque
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple

from ..utils.validate import ModelDiagnostic

if TYPE_CHECKING:  # pragma: no cover - import-time forward refs only
    from ..model.expr import Expr
    from ..model.model import (
        OperationStatement,
        OnAspect,
        OnStage,
        StateMachine,
        Transition,
    )


@dataclass(frozen=True)
class StateInfo:
    """
    Structural summary of a single state.

    :param path: Dotted hierarchical path, e.g. ``'Root.SubSystem.Active'``.
    :type path: str
    :param name: Short name of the state (last component of ``path``).
    :type name: str
    :param parent_path: Dotted path of the parent state, or ``None`` for
        the root state.
    :type parent_path: Optional[str]
    :param is_leaf: ``True`` when this state has no substates.
    :type is_leaf: bool
    :param is_pseudo: ``True`` when the state was declared with
        ``pseudo state``.
    :type is_pseudo: bool
    :param is_composite: ``True`` when this state has substates (the
        builder sets it to ``not is_leaf``).
    :type is_composite: bool
    :param substates: Direct-child state paths, in source order.
    :type substates: Tuple[str, ...]
    :param initial_targets: One dict per ``[*] -> X`` initial transition
        declared inside this composite, with the keys ``target`` (target
        child path), ``guard`` (source text of the guard or ``None``),
        ``event`` (the event's ``name`` attribute or ``None``; note that
        the builder reads ``Event.name`` here, whereas
        :attr:`TransitionInfo.event` carries the dotted
        ``Event.path_name``) and ``is_unconditional`` (``True`` only when
        both guard and event are absent).
    :type initial_targets: Tuple[Dict[str, Any], ...]
    :param entry_actions: Action labels for ``enter`` actions on this
        state, in source order. A label is the action's name,
        ``'ref:<name>'`` for an unnamed reference, or ``'<inline>'``.
    :type entry_actions: Tuple[str, ...]
    :param during_actions: Action labels for ``during`` actions.
    :type during_actions: Tuple[str, ...]
    :param exit_actions: Action labels for ``exit`` actions.
    :type exit_actions: Tuple[str, ...]
    :param aspect_before: Aspect-action labels for ``>> during before``.
    :type aspect_before: Tuple[str, ...]
    :param aspect_after: Aspect-action labels for ``>> during after``.
    :type aspect_after: Tuple[str, ...]
    :param has_abstract_action: ``True`` if any of the actions above is
        abstract. Used by :class:`VariableInfo` confidence judgements.
    :type has_abstract_action: bool
    """

    path: str
    name: str
    parent_path: Optional[str]
    is_leaf: bool
    is_pseudo: bool
    is_composite: bool
    substates: Tuple[str, ...]
    initial_targets: Tuple[Dict[str, Any], ...]
    entry_actions: Tuple[str, ...]
    during_actions: Tuple[str, ...]
    exit_actions: Tuple[str, ...]
    aspect_before: Tuple[str, ...]
    aspect_after: Tuple[str, ...]
    has_abstract_action: bool
@dataclass(frozen=True)
class TransitionInfo:
    """
    Structural summary of a single transition.

    :param from_path: Dotted path of the source state, or the literal
        ``'[*]'`` when the transition is an initial transition (the
        marker is used for initial transitions of any composite, not
        only the root).
    :type from_path: str
    :param to_path: Dotted path of the target state, or ``'[*]'`` for an
        exit transition.
    :type to_path: str
    :param event: Qualified event name (e.g. ``'Root.SubA.E'``) or
        ``None`` if the transition has no event.
    :type event: Optional[str]
    :param event_scope: ``'local'``, ``'chain'``, ``'absolute'``, or
        ``None`` when there is no event.
    :type event_scope: Optional[str]
    :param guard: Source text of the guard expression, or ``None``.
    :type guard: Optional[str]
    :param effect: Source text of the effect block (its statements joined
        by single spaces), or ``None`` when there is no effect.
    :type effect: Optional[str]
    :param is_forced: ``True`` when the transition was expanded from a
        ``!``-prefixed forced transition.
    :type is_forced: bool
    :param forced_origin: Raw source text of the original ``!X -> Y``
        declaration when ``is_forced`` is ``True``, otherwise ``None``.
    :type forced_origin: Optional[str]
    """

    from_path: str
    to_path: str
    event: Optional[str]
    event_scope: Optional[str]
    guard: Optional[str]
    effect: Optional[str]
    is_forced: bool
    forced_origin: Optional[str]
@dataclass(frozen=True)
class VariableInfo:
    """
    Structural summary of a variable definition plus participation flags.

    The ``participates_directly`` and ``participates_indirectly`` flags
    are precomputed here so that PR-C's ``W_UNREFERENCED_VAR`` /
    ``I_UNREFERENCED_VAR_MAYBE_ABSTRACT`` rules can be expressed as a
    one-line filter against this object.

    :param name: Variable identifier.
    :type name: str
    :param type: Declared type, currently ``'int'`` or ``'float'``.
    :type type: str
    :param init_value: Source text of the initializer expression (``''``
        when it cannot be rendered).
    :type init_value: str
    :param read_in_states: State paths where the variable is read inside
        any action (``enter`` / ``during`` / ``exit`` / aspect). The
        builder also records here the ``from_path`` of every transition
        whose effect block reads the variable, so the ``'[*]'`` marker
        can appear if an initial transition's effect reads it.
    :type read_in_states: Tuple[str, ...]
    :param written_in_states: State paths where the variable is written
        inside any action.
    :type written_in_states: Tuple[str, ...]
    :param read_in_guards: Tuples ``(from_path, to_path)`` of transitions
        whose guard reads this variable.
    :type read_in_guards: Tuple[Tuple[str, str], ...]
    :param written_in_effects: Tuples ``(from_path, to_path)`` of
        transitions whose effect block writes this variable.
    :type written_in_effects: Tuple[Tuple[str, str], ...]
    :param participates_directly: ``True`` when the variable is read by
        at least one guard, transition effect, or action operation
        (i.e. ``read_in_states`` or ``read_in_guards`` is non-empty).
    :type participates_directly: bool
    :param participates_indirectly: ``True`` when the variable is not
        directly referenced but is transitively reachable through
        write-then-read data dependency across blocks. PR-A always
        stores ``False``; PR-C replaces it with the closure-based
        computation.
    :type participates_indirectly: bool
    :param abstract_actions_in_scope: Labels of the form
        ``'<state_path>:<abstract>'``, one for each state listed in
        ``read_in_states`` / ``written_in_states`` that declares an
        abstract action. Used by PR-C to split ``W_UNREFERENCED_VAR``
        (high confidence) from ``I_UNREFERENCED_VAR_MAYBE_ABSTRACT``
        (low confidence).
    :type abstract_actions_in_scope: Tuple[str, ...]
    """

    name: str
    type: str
    init_value: str
    read_in_states: Tuple[str, ...]
    written_in_states: Tuple[str, ...]
    read_in_guards: Tuple[Tuple[str, str], ...]
    written_in_effects: Tuple[Tuple[str, str], ...]
    participates_directly: bool
    participates_indirectly: bool
    abstract_actions_in_scope: Tuple[str, ...]
@dataclass(frozen=True)
class EventInfo:
    """
    Structural summary of an event declaration.

    :param qualified_name: Dotted fully qualified event name
        (e.g. ``'Root.SubA.E'``).
    :type qualified_name: str
    :param scope: ``'local'``, ``'chain'``, or ``'absolute'``.
    :type scope: str
    :param used_by: ``(from_path, to_path)`` tuples for every transition
        that references this event, in walk order (not de-duplicated).
    :type used_by: Tuple[Tuple[str, str], ...]
    """

    qualified_name: str
    scope: str
    used_by: Tuple[Tuple[str, str], ...]
@dataclass(frozen=True)
class ModelMetrics:
    """
    Aggregate model metrics.

    :param n_states_leaf: Number of leaf states excluding pseudo states.
    :type n_states_leaf: int
    :param n_states_composite: Number of composite states.
    :type n_states_composite: int
    :param n_states_pseudo: Number of pseudo states.
    :type n_states_pseudo: int
    :param max_hierarchy_depth: Maximum depth of state nesting, counted
        from the root (depth 0 = root).
    :type max_hierarchy_depth: int
    :param n_transitions_normal: Number of transitions that did not
        originate from a ``!``-forced declaration.
    :type n_transitions_normal: int
    :param n_transitions_forced: Number of transitions expanded from
        ``!``-forced declarations.
    :type n_transitions_forced: int
    :param n_events: Number of distinct qualified events used in
        transitions.
    :type n_events: int
    :param n_variables: Number of variable definitions.
    :type n_variables: int
    :param var_to_leaf_ratio: ``n_variables / n_states_leaf``, or ``0.0``
        when the model has no (non-pseudo) leaf state.
    :type var_to_leaf_ratio: float
    :param aspect_coverage: Mapping ``composite_path ->
        n_descendant_leaves`` (non-pseudo leaves only) for composite
        states that declare ``>> during`` aspects.
    :type aspect_coverage: Dict[str, int]
    :param abstract_action_inventory: Paths of the states that declare at
        least one abstract action, sorted for stable output.
    :type abstract_action_inventory: Tuple[str, ...]
    """

    # NOTE(review): earlier documentation described ``var_to_leaf_ratio`` as
    # ``n_variables / max(n_states_leaf, 1)`` and ``abstract_action_inventory``
    # as function names; the builder in this module produces the values
    # documented above. Confirm which side the jsfcstm / schema.json
    # contract expects before changing either.
    n_states_leaf: int
    n_states_composite: int
    n_states_pseudo: int
    max_hierarchy_depth: int
    n_transitions_normal: int
    n_transitions_forced: int
    n_events: int
    n_variables: int
    var_to_leaf_ratio: float
    aspect_coverage: Dict[str, int]
    abstract_action_inventory: Tuple[str, ...]
@dataclass(frozen=True)
class ModelInspect:
    """
    Top-level structured view of a state machine model.

    :param root_state_path: Dotted path of the root state.
    :type root_state_path: str
    :param states: All states walked from the root in pre-order.
    :type states: Tuple[StateInfo, ...]
    :param transitions: All transitions, including expanded forced
        transitions, in source order.
    :type transitions: Tuple[TransitionInfo, ...]
    :param variables: All ``def`` variables, in declaration order.
    :type variables: Tuple[VariableInfo, ...]
    :param events: All qualified events that appear in at least one
        transition, sorted by qualified name.
    :type events: Tuple[EventInfo, ...]
    :param metrics: Aggregate model metrics.
    :type metrics: ModelMetrics
    :param reachability_graph: Mapping state path → sorted state paths
        reachable from it (BFS closure, ignoring guards). Edges are every
        state-to-state transition (forced ones included) plus the
        initial-transition edges from a composite to its initial targets.
        A state is never listed as reachable from itself.
    :type reachability_graph: Dict[str, Tuple[str, ...]]
    :param event_emission_map: Mapping event qualified name → sorted
        source state paths of the transitions that use it (``'[*]'``
        sources are excluded).
    :type event_emission_map: Dict[str, Tuple[str, ...]]
    :param var_dataflow: Mapping variable name →
        ``{'reads': [...], 'writes': [...]}`` of state paths.
    :type var_dataflow: Dict[str, Dict[str, Tuple[str, ...]]]
    :param aspect_impact_map: Mapping composite path → sorted non-pseudo
        descendant leaf paths, for composites that declare
        ``>> during`` aspects.
    :type aspect_impact_map: Dict[str, Tuple[str, ...]]
    :param action_ref_graph: Mapping action label
        (``'state_path:function'``) → sorted labels of the actions it
        references via ``ref``; actions without a ``ref`` map to an
        empty tuple.
    :type action_ref_graph: Dict[str, Tuple[str, ...]]
    :param diagnostics: Layer 1 ``E_*`` + Layer 2 ``W_*`` / ``I_*``
        diagnostics. PR-A returns an empty tuple; PR-B / PR-C populate it.
    :type diagnostics: Tuple[ModelDiagnostic, ...]
    """

    root_state_path: str
    states: Tuple[StateInfo, ...]
    transitions: Tuple[TransitionInfo, ...]
    variables: Tuple[VariableInfo, ...]
    events: Tuple[EventInfo, ...]
    metrics: ModelMetrics
    reachability_graph: Dict[str, Tuple[str, ...]]
    event_emission_map: Dict[str, Tuple[str, ...]]
    var_dataflow: Dict[str, Dict[str, Tuple[str, ...]]]
    aspect_impact_map: Dict[str, Tuple[str, ...]]
    action_ref_graph: Dict[str, Tuple[str, ...]]
    diagnostics: Tuple[ModelDiagnostic, ...] = field(default_factory=tuple)

    def to_json(self) -> Dict[str, Any]:
        """
        Serialize this inspection report to a plain JSON-friendly dict.

        Tuples are converted to lists; frozen dataclasses to dicts.
        ``ModelDiagnostic`` instances are serialized via their public
        attributes (``code``, ``severity``, ``message``, ``span``,
        ``refs``).

        :return: A dict built only from dicts, lists and the leaf values
            stored in the report, suitable for :func:`json.dumps`.
        :rtype: Dict[str, Any]

        Example::

            >>> report.to_json()['root_state_path']
            'Root'
        """
        # The module-level serializer is resolved at call time, so it may be
        # defined further down in this module.
        return _to_json_inspect(self)
# ---------------------------------------------------------------------------
# Implementation
# ---------------------------------------------------------------------------

# Initial and exit pseudo-states share the same textual marker; which one a
# ``[*]`` stands for is decided by its position (source vs. target).
_INIT_MARK = '[*]'
_EXIT_MARK = '[*]'


def _state_path(state: Any) -> str:
    """Return the dotted path of ``state`` (``''`` when it has no path)."""
    path = getattr(state, 'path', None)
    if not path:  # pragma: no cover
        # Defensive: grammar-produced State always has a non-empty
        # ``path`` tuple. Reaching this guard means a future state
        # builder shipped a half-initialized object; keep as fail-soft
        # to avoid masking a downstream rewrite that produced ''.
        return ''
    return '.'.join(p for p in path if p is not None)


def _resolve_sibling_path(parent_state: Any, name: str) -> str:
    """Build the dotted path of the child ``name`` under ``parent_state``."""
    parent_path = _state_path(parent_state)
    return f'{parent_path}.{name}' if parent_path else name


def _transition_endpoint(parent_state: Any, marker_or_name: Any, is_source: bool) -> str:
    """Resolve ``Transition.from_state`` / ``to_state`` to a path string.

    :param parent_state: State whose ``transitions`` list owns the
        transition; string endpoints are resolved as its children.
    :param marker_or_name: Either an INIT/EXIT singleton mark or a state
        name.
    :param is_source: ``True`` for ``from_state``, ``False`` for
        ``to_state``; selects which marker constant is returned.
    :return: ``'[*]'`` for a singleton mark, otherwise a dotted path.
    """
    # ``INIT_STATE`` / ``EXIT_STATE`` are SingletonMark instances; the
    # class name is sufficient because the model layer never reuses that
    # class for anything else.
    if marker_or_name.__class__.__name__ == '_StateSingletonMark':
        return _INIT_MARK if is_source else _EXIT_MARK
    if isinstance(marker_or_name, str):
        return _resolve_sibling_path(parent_state, marker_or_name)
    # Grammar produces only INIT/EXIT singletons and string names; this
    # str() fallback exists for future AST extensions and never fires today.
    return str(marker_or_name)  # pragma: no cover


def _expr_text(expr: Optional['Expr']) -> Optional[str]:
    """Render ``expr`` back to source text, or ``None`` when absent."""
    if expr is None:
        return None
    try:
        return str(expr.to_ast_node())
    except Exception:  # pragma: no cover
        # Defensive: grammar-emitted Expr.to_ast_node always succeeds.
        # The try/except guards against future Expr subclasses whose
        # to_ast_node could fail; keep as fail-soft so inspection
        # never crashes the IDE / CLI.
        return None


def _effects_text(effects: List['OperationStatement']) -> Optional[str]:
    """Render an effect block as space-joined statement text.

    :return: ``None`` when ``effects`` is empty (or nothing could be
        rendered), otherwise the statements joined by single spaces.
    """
    if not effects:
        return None
    parts: List[str] = []
    for stmt in effects:
        try:
            parts.append(str(stmt.to_ast_node()))
        except Exception:  # pragma: no cover
            # Defensive: see ``_expr_text`` — grammar-emitted stmts
            # always serialize.
            continue
    if not parts:  # pragma: no cover
        # Unreachable while ``except`` above is unreachable. Belt-and-
        # braces guard so the empty-parts case still returns None
        # cleanly rather than producing an empty-string label.
        return None
    return ' '.join(parts)


def _walk_expr_variables(expr: Optional['Expr']) -> List[str]:
    """Return variable names read by ``expr`` in left-to-right order.

    Names are not de-duplicated; a variable used twice appears twice.
    """
    if expr is None:
        return []
    seen: List[str] = []
    _walk_expr_collect(expr, seen)
    return seen


def _walk_expr_collect(expr: 'Expr', out: List[str]) -> None:
    """Append every variable name found in ``expr`` to ``out`` (in place)."""
    # Imported lazily, as in the rest of this module, so importing the
    # diagnostics package does not eagerly import the model layer.
    from ..model.expr import (
        BinaryOp,
        ConditionalOp,
        UFunc,
        UnaryOp,
        Variable,
    )

    if isinstance(expr, Variable):
        out.append(expr.name)
        return
    # Constants have no children.
    if isinstance(expr, UnaryOp):
        _walk_expr_collect(expr.x, out)
        return
    if isinstance(expr, BinaryOp):
        _walk_expr_collect(expr.x, out)
        _walk_expr_collect(expr.y, out)
        return
    if isinstance(expr, ConditionalOp):
        _walk_expr_collect(expr.cond, out)
        _walk_expr_collect(expr.if_true, out)
        _walk_expr_collect(expr.if_false, out)
        return
    if isinstance(expr, UFunc):
        _walk_expr_collect(expr.x, out)
        return


def _walk_stmt_reads_writes(
    stmt: 'OperationStatement',
    reads: List[str],
    writes: List[str],
) -> None:
    """Collect variable reads/writes across a single operation statement.

    ``reads`` and ``writes`` are extended in place. ``IfBlock`` statements
    are walked recursively: branch conditions count as reads.
    """
    from ..model.model import IfBlock, Operation

    if isinstance(stmt, Operation):
        writes.append(stmt.var_name)
        reads.extend(_walk_expr_variables(stmt.expr))
        return
    if isinstance(stmt, IfBlock):
        for branch in stmt.branches:
            if branch.condition is not None:
                reads.extend(_walk_expr_variables(branch.condition))
            for inner in branch.statements:
                _walk_stmt_reads_writes(inner, reads, writes)


def _action_label(item: Any) -> str:
    """Shared labelling rule for stage and aspect actions.

    Preference order: the action's own name, then ``'ref:<name>'`` for an
    unnamed reference to a named action, then ``'<inline>'``.
    """
    if item.name:
        return item.name
    if item.is_ref and item.ref is not None and getattr(item.ref, 'name', None):
        return f'ref:{item.ref.name}'
    return '<inline>'


def _stage_function_label(stage_item: 'OnStage') -> str:
    """Choose a stable label for an action (named, abstract, or inline)."""
    return _action_label(stage_item)


def _aspect_function_label(aspect_item: 'OnAspect') -> str:
    """Choose a stable label for an aspect action (same rule as stages)."""
    return _action_label(aspect_item)


def _is_abstract(stage_or_aspect: Any) -> bool:
    """Return ``True`` when the action carries a truthy ``is_abstract``."""
    return bool(getattr(stage_or_aspect, 'is_abstract', False))


def _qualified_event_name(transition: 'Transition', parent_state: Any) -> Optional[str]:
    """Return the transition's qualified event name, or ``None``.

    ``parent_state`` is accepted for call-site symmetry and is not used.
    """
    event = transition.event
    if event is None:
        return None
    # ``Event.path_name`` is the canonical dot-separated identifier used
    # by the runtime for matching; reuse it as the public qualified name.
    return event.path_name


def _event_scope(
    event: Any,
    parent_state: Any,
    from_state: Any,
    machine: Any,
) -> str:
    """Infer the scope label by comparing the event's owner path to context.

    The model layer does not store a scope enum on :class:`Event`, but the
    event's ``state_path`` uniquely determines which DSL operator declared
    it:

    * absolute (``/``) — owner is the root state path
    * chain (``:``) — owner is the transition's parent state path
    * local (``::``) — owner is the parent's path extended by ``from_state``

    The checks run in that order, so an event owned by the root is always
    reported as ``'absolute'`` even when the parent is the root.
    """
    if event is None:  # pragma: no cover
        # Defensive: callers (``_qualified_event_name``) already guard
        # on ``transition.event is None`` and short-circuit; reaching
        # here would mean the caller forgot the guard.
        return 'absolute'
    owner_path = tuple(event.state_path)
    root_path = tuple(machine.root_state.path) if machine is not None else ()
    parent_path = tuple(parent_state.path) if parent_state is not None else ()
    if owner_path == root_path:
        return 'absolute'
    if owner_path == parent_path:
        return 'chain'
    if isinstance(from_state, str) and owner_path == parent_path + (from_state,):
        return 'local'
    # Owner is somewhere else on the chain — conservative fallback.
    # Unreachable via real DSL (events are declared in local / chain /
    # absolute scopes — never in a sibling that the transition cannot
    # reach), but kept as a non-crashing safety net.
    return 'chain'  # pragma: no cover


def _state_actions(
    state: Any,
) -> Tuple[Tuple[str, ...], Tuple[str, ...], Tuple[str, ...], Tuple[str, ...], Tuple[str, ...], bool]:
    """Summarize the actions declared on ``state``.

    :return: ``(enter, during, exit, aspect_before, aspect_after,
        has_abstract)`` where the first five are label tuples in source
        order and the last flags any abstract action among them.
    """
    entries = tuple(_stage_function_label(a) for a in state.on_enters)
    durings = tuple(_stage_function_label(a) for a in state.on_durings)
    exits = tuple(_stage_function_label(a) for a in state.on_exits)
    asp_before = tuple(
        _aspect_function_label(a) for a in state.on_during_aspects if a.aspect == 'before'
    )
    asp_after = tuple(
        _aspect_function_label(a) for a in state.on_during_aspects if a.aspect == 'after'
    )
    has_abstract = any(
        _is_abstract(a)
        for collection in (
            state.on_enters,
            state.on_durings,
            state.on_exits,
            state.on_during_aspects,
        )
        for a in collection
    )
    return entries, durings, exits, asp_before, asp_after, has_abstract


def _initial_targets(state: Any) -> Tuple[Dict[str, Any], ...]:
    """Collect every ``[*] -> X`` initial transition declared inside the state."""
    out: List[Dict[str, Any]] = []
    for transition in state.transitions:
        if not _is_init_source(transition.from_state):
            continue
        target_name = transition.to_state
        if isinstance(target_name, str):
            target_path = _resolve_sibling_path(state, target_name)
        else:
            target_path = _EXIT_MARK
        guard_text = _expr_text(transition.guard)
        event = transition.event
        # NOTE(review): this stores ``event.name`` while TransitionInfo uses
        # ``event.path_name``; confirm which form the contract expects.
        event_name = event.name if event is not None else None
        out.append({
            'target': target_path,
            'guard': guard_text,
            'event': event_name,
            'is_unconditional': guard_text is None and event_name is None,
        })
    return tuple(out)


def _is_init_source(from_state: Any) -> bool:
    """Return ``True`` when ``from_state`` is the ``INIT_STATE`` singleton."""
    from ..dsl.node import INIT_STATE

    return from_state is INIT_STATE


def _is_forced_transition(transition: 'Transition') -> bool:
    """Return ``True`` when the transition came from a ``!`` declaration.

    A transition counts as forced when its ``is_forced`` attribute is
    truthy or when it carries a non-``None`` ``forced_origin``.
    """
    if getattr(transition, 'is_forced', False):
        return True
    return getattr(transition, 'forced_origin', None) is not None


def _hierarchy_depth(states: Tuple['StateInfo', ...]) -> int:
    """Return the deepest nesting level (root = 0) among ``states``."""
    if not states:  # pragma: no cover
        # Defensive: ``inspect_model`` always builds at least one
        # StateInfo (the root). Empty input would mean a caller used
        # this helper outside the pipeline; keep the fail-soft 0.
        return 0
    return max(s.path.count('.') for s in states)


def _descendant_leaf_paths(states: Tuple['StateInfo', ...], ancestor_path: str) -> List[str]:
    """Return paths of non-pseudo leaf states strictly below ``ancestor_path``."""
    # The trailing dot keeps ``Root.Ab`` from matching ancestor ``Root.A``
    # and excludes the ancestor itself.
    prefix = ancestor_path + '.'
    return [
        desc.path
        for desc in states
        if desc.path.startswith(prefix) and desc.is_leaf and not desc.is_pseudo
    ]


def _build_state_infos(machine: 'StateMachine') -> Tuple['StateInfo', ...]:
    """Pre-order walk yielding one :class:`StateInfo` per state."""
    out: List['StateInfo'] = []
    for state in machine.walk_states():
        is_leaf = state.is_leaf_state
        entries, durings, exits, asp_before, asp_after, has_abstract = _state_actions(state)
        out.append(StateInfo(
            path=_state_path(state),
            name=state.name,
            parent_path=_state_path(state.parent) if state.parent is not None else None,
            is_leaf=is_leaf,
            is_pseudo=bool(getattr(state, 'is_pseudo', False)),
            is_composite=not is_leaf,
            substates=tuple(
                _resolve_sibling_path(state, sub_name) for sub_name in state.substates.keys()
            ),
            initial_targets=_initial_targets(state),
            entry_actions=entries,
            during_actions=durings,
            exit_actions=exits,
            aspect_before=asp_before,
            aspect_after=asp_after,
            has_abstract_action=has_abstract,
        ))
    return tuple(out)


def _build_transition_infos(machine: 'StateMachine') -> Tuple['TransitionInfo', ...]:
    """Yield one :class:`TransitionInfo` per transition, in walk order."""
    out: List['TransitionInfo'] = []
    for state in machine.walk_states():
        for transition in state.transitions:
            scope = None
            if transition.event is not None:
                scope = _event_scope(transition.event, state, transition.from_state, machine)
            is_forced = _is_forced_transition(transition)
            out.append(TransitionInfo(
                from_path=_transition_endpoint(state, transition.from_state, is_source=True),
                to_path=_transition_endpoint(state, transition.to_state, is_source=False),
                event=_qualified_event_name(transition, state),
                event_scope=scope,
                guard=_expr_text(transition.guard),
                effect=_effects_text(transition.effects),
                is_forced=is_forced,
                forced_origin=getattr(transition, 'forced_origin', None) if is_forced else None,
            ))
    return tuple(out)


def _collect_action_reads_writes(state: Any) -> Tuple[Dict[str, bool], Dict[str, bool]]:
    """Aggregate variable reads/writes across all action blocks of a state.

    :return: Two insertion-ordered dicts ``(reads, writes)`` whose keys are
        variable names (values are always ``True``).
    """
    reads: Dict[str, bool] = {}
    writes: Dict[str, bool] = {}
    for collection in (
        state.on_enters,
        state.on_durings,
        state.on_exits,
        state.on_during_aspects,
    ):
        for action in collection:
            if not action.operations:
                continue
            local_reads: List[str] = []
            local_writes: List[str] = []
            for stmt in action.operations:
                _walk_stmt_reads_writes(stmt, local_reads, local_writes)
            reads.update(dict.fromkeys(local_reads, True))
            writes.update(dict.fromkeys(local_writes, True))
    return reads, writes


def _build_variable_infos(
    machine: 'StateMachine',
    states: Tuple['StateInfo', ...],
) -> Tuple['VariableInfo', ...]:
    """Build one :class:`VariableInfo` per ``def`` variable.

    Names that are not present in ``machine.defines`` are ignored.
    Reads found in a transition's effect are attributed to the
    transition's ``from_path``.
    """
    var_reads_by_state: Dict[str, List[str]] = {name: [] for name in machine.defines}
    var_writes_by_state: Dict[str, List[str]] = {name: [] for name in machine.defines}
    var_read_guards: Dict[str, List[Tuple[str, str]]] = {name: [] for name in machine.defines}
    var_written_effects: Dict[str, List[Tuple[str, str]]] = {name: [] for name in machine.defines}
    state_lookup: Dict[str, 'StateInfo'] = {s.path: s for s in states}

    for state in machine.walk_states():
        path = _state_path(state)
        reads, writes = _collect_action_reads_writes(state)
        for var_name in reads:
            if var_name in var_reads_by_state:
                var_reads_by_state[var_name].append(path)
        for var_name in writes:
            if var_name in var_writes_by_state:
                var_writes_by_state[var_name].append(path)

        for transition in state.transitions:
            from_path = _transition_endpoint(state, transition.from_state, is_source=True)
            to_path = _transition_endpoint(state, transition.to_state, is_source=False)
            edge = (from_path, to_path)
            for v in _walk_expr_variables(transition.guard):
                if v in var_read_guards:
                    var_read_guards[v].append(edge)
            # ``or ()`` tolerates a transition whose effects are ``None``.
            for stmt in transition.effects or ():
                lreads: List[str] = []
                lwrites: List[str] = []
                _walk_stmt_reads_writes(stmt, lreads, lwrites)
                # NOTE(review): for an initial transition ``from_path`` is
                # the ``'[*]'`` marker, which then lands in
                # ``read_in_states`` — confirm this is intended.
                for v in lreads:
                    if v in var_reads_by_state:
                        var_reads_by_state[v].append(from_path)
                for v in lwrites:
                    if v in var_written_effects:
                        var_written_effects[v].append(edge)

    out: List['VariableInfo'] = []
    for name, var_define in machine.defines.items():
        # ``dict.fromkeys`` gives a stable, first-occurrence-ordered dedupe.
        read_states = tuple(dict.fromkeys(var_reads_by_state[name]))
        written_states = tuple(dict.fromkeys(var_writes_by_state[name]))
        read_guards = tuple(dict.fromkeys(var_read_guards[name]))
        written_effects = tuple(dict.fromkeys(var_written_effects[name]))
        out.append(VariableInfo(
            name=name,
            type=var_define.type,
            init_value=_expr_text(var_define.init) or '',
            read_in_states=read_states,
            written_in_states=written_states,
            read_in_guards=read_guards,
            written_in_effects=written_effects,
            participates_directly=bool(read_states or read_guards),
            participates_indirectly=False,  # PR-C will fill the closure.
            abstract_actions_in_scope=_abstract_actions_in_scope(
                state_lookup, read_states, written_states
            ),
        ))
    return tuple(out)


def _abstract_actions_in_scope(
    state_lookup: Dict[str, 'StateInfo'],
    read_states: Tuple[str, ...],
    written_states: Tuple[str, ...],
) -> Tuple[str, ...]:
    """Return abstract action labels visible from any touching state.

    :return: ``'<state_path>:<abstract>'`` for every touching state that
        has an abstract action, sorted by state path.
    """
    touched = set(read_states) | set(written_states)
    if not touched:
        # Variable touches no state — declared but unused. There is no
        # "scope" to look into, return empty. PR-C decides whether this
        # counts as W_ or I_.
        return tuple()
    out: List[str] = []
    for path in sorted(touched):
        info = state_lookup.get(path)
        if info is None:  # pragma: no cover
            # Defensive: ``touched`` paths normally exist in
            # state_lookup. NOTE(review): a ``'[*]'`` entry recorded for
            # an effect on an initial transition would also land here —
            # confirm whether that can occur before relying on this
            # branch being unreachable.
            continue
        if info.has_abstract_action:
            # ``touched`` is a set, so each label is produced at most once.
            out.append(f'{info.path}:<abstract>')
    return tuple(out)


def _build_event_infos(
    machine: 'StateMachine',
    transitions: Tuple['TransitionInfo', ...],
) -> Tuple['EventInfo', ...]:
    """Group transitions by their qualified event name.

    ``transitions`` is currently unused; the model is re-walked so the
    scope can be computed from the live ``Event`` objects. When several
    transitions share an event, the scope computed for the last one wins.
    """
    event_users: Dict[str, List[Tuple[str, str]]] = {}
    event_scope: Dict[str, str] = {}
    for state in machine.walk_states():
        for transition in state.transitions:
            qn = _qualified_event_name(transition, state)
            if qn is None:
                continue
            from_path = _transition_endpoint(state, transition.from_state, is_source=True)
            to_path = _transition_endpoint(state, transition.to_state, is_source=False)
            event_users.setdefault(qn, []).append((from_path, to_path))
            event_scope[qn] = _event_scope(
                transition.event, state, transition.from_state, machine
            )
    return tuple(
        EventInfo(
            qualified_name=qn,
            scope=event_scope.get(qn, 'absolute'),
            used_by=tuple(event_users[qn]),
        )
        for qn in sorted(event_users)
    )


def _build_metrics(
    states: Tuple['StateInfo', ...],
    transitions: Tuple['TransitionInfo', ...],
    variables: Tuple['VariableInfo', ...],
    events: Tuple['EventInfo', ...],
) -> 'ModelMetrics':
    """Compute the aggregate :class:`ModelMetrics` from the built views."""
    n_pseudo = sum(1 for s in states if s.is_pseudo)
    n_leaf = sum(1 for s in states if s.is_leaf and not s.is_pseudo)
    n_composite = sum(1 for s in states if s.is_composite)
    n_forced = sum(1 for t in transitions if t.is_forced)
    n_normal = len(transitions) - n_forced

    aspect_coverage: Dict[str, int] = {
        s.path: len(_descendant_leaf_paths(states, s.path))
        for s in states
        if s.is_composite and (s.aspect_before or s.aspect_after)
    }
    # NOTE(review): the inventory holds state paths, not function names.
    abstract_inventory = sorted(s.path for s in states if s.has_abstract_action)

    return ModelMetrics(
        n_states_leaf=n_leaf,
        n_states_composite=n_composite,
        n_states_pseudo=n_pseudo,
        max_hierarchy_depth=_hierarchy_depth(states),
        n_transitions_normal=n_normal,
        n_transitions_forced=n_forced,
        n_events=len(events),
        n_variables=len(variables),
        # A model without leaves reports 0.0 rather than dividing by zero.
        var_to_leaf_ratio=(len(variables) / n_leaf) if n_leaf else 0.0,
        aspect_coverage=aspect_coverage,
        abstract_action_inventory=tuple(abstract_inventory),
    )


def _build_reachability_graph(
    states: Tuple['StateInfo', ...],
    transitions: Tuple['TransitionInfo', ...],
) -> Dict[str, Tuple[str, ...]]:
    """BFS reachability over state-to-state transitions, ignoring guards.

    Edges are every transition whose endpoints are both real states
    (forced transitions are not filtered out) plus one edge from each
    composite to each of its initial targets.

    :return: Mapping state path → sorted tuple of reachable state paths;
        a state never lists itself.
    """
    # Adjacency list keyed by state path. ``[*]`` markers are skipped.
    adjacency: Dict[str, set] = {s.path: set() for s in states}
    for t in transitions:
        if t.from_path == _INIT_MARK or t.to_path == _EXIT_MARK:
            # Initial / exit pseudo edges feed reachability from the
            # parent composite (the parent's "active" implies traversing
            # the initial transition); they are added below.
            continue
        if t.from_path not in adjacency:  # pragma: no cover
            # Defensive: transitions emitted by the model layer always
            # have from_path equal to a known state path (or the INIT
            # marker caught above). Kept as a safety net so a future
            # synthesizer that invents from_paths doesn't crash here.
            continue
        adjacency[t.from_path].add(t.to_path)

    for s in states:
        if s.is_composite and s.initial_targets:
            for item in s.initial_targets:
                target = item['target']
                if target != _EXIT_MARK:
                    adjacency[s.path].add(target)

    out: Dict[str, Tuple[str, ...]] = {}
    for s in states:
        origin = s.path
        seen = set()
        queue = deque([origin])
        while queue:
            cur = queue.popleft()
            # Targets that are not known states simply have no successors.
            for nxt in adjacency.get(cur, ()):
                if nxt == origin or nxt in seen:
                    continue
                seen.add(nxt)
                queue.append(nxt)
        # Sorting makes the result independent of set iteration order.
        out[origin] = tuple(sorted(seen))
    return out


def _build_event_emission_map(
    events: Tuple['EventInfo', ...],
) -> Dict[str, Tuple[str, ...]]:
    """Map each event to the sorted, distinct source paths that use it.

    ``'[*]'`` sources (initial transitions) are left out.
    """
    return {
        e.qualified_name: tuple(
            sorted({src for src, _dst in e.used_by if src != _INIT_MARK})
        )
        for e in events
    }


def _build_var_dataflow(
    variables: Tuple['VariableInfo', ...],
) -> Dict[str, Dict[str, Tuple[str, ...]]]:
    """Map each variable to its sorted ``reads`` / ``writes`` state paths."""
    return {
        v.name: {
            'reads': tuple(sorted(set(v.read_in_states))),
            'writes': tuple(sorted(set(v.written_in_states))),
        }
        for v in variables
    }


def _build_aspect_impact_map(
    states: Tuple['StateInfo', ...],
) -> Dict[str, Tuple[str, ...]]:
    """Map each aspect-declaring composite to its non-pseudo leaf descendants."""
    return {
        s.path: tuple(sorted(_descendant_leaf_paths(states, s.path)))
        for s in states
        if s.is_composite and (s.aspect_before or s.aspect_after)
    }


def _build_action_ref_graph(machine: 'StateMachine') -> Dict[str, Tuple[str, ...]]:
    """Capture ``ref`` edges between named actions in the model."""
    edges: Dict[str, List[str]] = {}
    for state in machine.walk_states():
        path = _state_path(state)
        for collection in (
            state.on_enters,
            state.on_durings,
            state.on_exits,
            state.on_during_aspects,
        ):
            for action in collection:
                # Register every action, even without a ref, so downstream
                # "no outgoing edges" lookups work.
                targets = edges.setdefault(_function_signature(state, path, action), [])
                if action.is_ref and action.ref is not None:
                    targets.append(_function_signature(None, None, action.ref))
    return {key: tuple(sorted(set(value))) for key, value in edges.items()}


def _function_signature(state: Any, default_path: Optional[str], action: Any) -> str:
    """Build a stable ``state_path:function`` label for a named action.

    :param state: Owning state, used only as a last-resort path source.
    :param default_path: Path to fall back on when the action's own
        ``state_path`` yields no owner.
    :param action: Action object; its ``state_path`` (owner path plus the
        function name as last element) is preferred over ``name``.
    :return: ``'<owner>:<function>'``, or just ``'<function>'`` when no
        owner path can be determined.
    """
    action_path = getattr(action, 'state_path', None)
    if action_path:
        owner = '.'.join(p for p in action_path[:-1] if p is not None)
        normalized = owner or default_path or _state_path(state)
        leaf = action_path[-1] or '<inline>'
    else:  # pragma: no cover
        # Defensive: grammar-emitted actions always have a non-empty
        # state_path. A missing or empty one falls through to the
        # default_path / state.path chain so labels stay useful
        # (an empty tuple previously raised IndexError on ``[-1]``).
        normalized = default_path or _state_path(state) or ''
        leaf = action.name or '<inline>'
    return f'{normalized}:{leaf}' if normalized else leaf
def inspect_model(machine: 'StateMachine') -> ModelInspect:
    """
    Build a structured inspection report for a state machine model.

    PR-A focuses on the structural payload and the five derived view
    graphs; the ``diagnostics`` field stays empty until PR-B / PR-C add
    the ``W_*`` and ``I_*`` rules.

    :param machine: The state machine model to inspect.
    :type machine: pyfcstm.model.StateMachine
    :return: Structured view of the model.
    :rtype: ModelInspect

    Example::

        >>> report = inspect_model(machine)
        >>> sorted(report.reachability_graph['Root.Sub.A'])
        ['Root.Sub.B']
    """
    # Structural views first: the derived graphs below are computed from
    # these tuples rather than from the live model (the action-ref graph
    # is the exception and walks ``machine`` directly).
    states = _build_state_infos(machine)
    transitions = _build_transition_infos(machine)
    variables = _build_variable_infos(machine, states)
    events = _build_event_infos(machine, transitions)
    metrics = _build_metrics(states, transitions, variables, events)

    return ModelInspect(
        root_state_path=_state_path(machine.root_state),
        states=states,
        transitions=transitions,
        variables=variables,
        events=events,
        metrics=metrics,
        reachability_graph=_build_reachability_graph(states, transitions),
        event_emission_map=_build_event_emission_map(events),
        var_dataflow=_build_var_dataflow(variables),
        aspect_impact_map=_build_aspect_impact_map(states),
        action_ref_graph=_build_action_ref_graph(machine),
        diagnostics=tuple(),
    )
def _to_json_dataclass(obj: Any) -> Any:
    """
    Recursively convert a value into JSON-friendly containers.

    Objects exposing ``__dataclass_fields__`` become dicts keyed by field
    name, tuples and lists become lists, dicts get string keys, and every
    other value is returned unchanged.

    :param obj: Value to convert.
    :type obj: Any
    :return: The converted value.
    :rtype: Any
    """
    if hasattr(obj, '__dataclass_fields__'):
        payload = {}
        for field_name in obj.__dataclass_fields__:
            payload[field_name] = _to_json_dataclass(getattr(obj, field_name))
        return payload
    if isinstance(obj, tuple):
        return list(map(_to_json_dataclass, obj))
    if isinstance(obj, list):  # pragma: no cover
        # Defensive: the PR-A model emits ``Tuple[...]`` fields only.
        # PR-B / PR-C may emit list-typed fields (e.g. open-ended
        # ``related_diagnostics``); this branch keeps the recursion
        # ready for that.
        return list(map(_to_json_dataclass, obj))
    if isinstance(obj, dict):  # pragma: no cover
        # Same story as lists: PR-A keeps every dict field at the
        # ModelInspect top level (handled by ``_to_json_inspect``).
        # PR-B / PR-C may nest dict payloads inside dataclass fields.
        return {str(key): _to_json_dataclass(value) for key, value in obj.items()}
    return obj


def _to_json_inspect(report: ModelInspect) -> Dict[str, Any]:
    """
    Serialize a report into a plain dict of JSON-friendly values.

    Dataclass-valued fields go through ``_to_json_dataclass``; the graph
    mappings have their value sequences copied into lists; diagnostics go
    through ``_diagnostic_to_json``.

    :param report: The inspection report to serialize.
    :type report: ModelInspect
    :return: Mapping with one entry per report field, in field order.
    :rtype: Dict[str, Any]
    """

    def _listify(mapping: Any) -> Dict[Any, Any]:
        # Copy every value sequence of a flat mapping into a list.
        return {key: list(members) for key, members in mapping.items()}

    result: Dict[str, Any] = {}
    result['root_state_path'] = report.root_state_path
    result['states'] = [_to_json_dataclass(item) for item in report.states]
    result['transitions'] = [_to_json_dataclass(item) for item in report.transitions]
    result['variables'] = [_to_json_dataclass(item) for item in report.variables]
    result['events'] = [_to_json_dataclass(item) for item in report.events]
    result['metrics'] = _to_json_dataclass(report.metrics)
    result['reachability_graph'] = _listify(report.reachability_graph)
    result['event_emission_map'] = _listify(report.event_emission_map)
    # var_dataflow is two levels deep: name -> {role -> sequence}.
    result['var_dataflow'] = {
        name: _listify(roles) for name, roles in report.var_dataflow.items()
    }
    result['aspect_impact_map'] = _listify(report.aspect_impact_map)
    result['action_ref_graph'] = _listify(report.action_ref_graph)
    result['diagnostics'] = [_diagnostic_to_json(item) for item in report.diagnostics]
    return result


def _diagnostic_to_json(d: ModelDiagnostic) -> Dict[str, Any]:
    """
    Serialize one diagnostic into a plain dict.

    :param d: The diagnostic to serialize.
    :type d: ModelDiagnostic
    :return: Dict with ``code``, ``severity``, ``message``, ``span`` (a
        dict of line/column bounds, or ``None`` when the diagnostic has
        no span) and ``refs`` (copied into a new dict).
    :rtype: Dict[str, Any]
    """
    source_span = d.span
    if source_span is None:
        span = None
    else:  # pragma: no cover
        # Defensive: PR-A keeps ``ModelInspect.diagnostics`` empty.
        # PR-B / PR-C will emit W_*/I_* diagnostics that carry spans;
        # this serializer branch exists to be ready for them. Once a
        # fixture lands that exercises a spanned diagnostic, drop the
        # pragma.
        span = {
            'line': source_span.line,
            'column': source_span.column,
            'end_line': source_span.end_line,
            'end_column': source_span.end_column,
        }
    return {
        'code': d.code,
        'severity': d.severity,
        'message': d.message,
        'span': span,
        'refs': dict(d.refs),
    }