Source code for pyfcstm.diagnostics.analyzers.use_def

"""Use-def graph construction for guard-affect data-flow analysis."""

from dataclasses import dataclass
from typing import TYPE_CHECKING, Dict, Iterable, List, Set, Tuple

if TYPE_CHECKING:  # pragma: no cover - import-time type hints only
    from ...model.expr import Expr
    from ...model.model import OperationStatement, StateMachine


@dataclass(frozen=True)
class UseDefGraph:
    """Immutable directed graph of variable-to-variable dependencies.

    An edge ``(source, target)`` records that the value of ``target`` may
    depend on ``source``, either because ``source`` is read by an expression
    assigned to ``target`` or because it appears in the condition of an
    operation block enclosing that assignment.
    """

    # All edges as (source, target) pairs.
    edges: Tuple[Tuple[str, str], ...]
    # Maps each target variable to the sources that feed it directly.
    dependencies_by_target: Dict[str, Tuple[str, ...]]

    def dependencies_of(self, target: str) -> Tuple[str, ...]:
        """Return the direct sources of ``target`` (empty tuple if unknown)."""
        return self.dependencies_by_target.get(target, ())

    def affecting_variables(self, direct_variables: Iterable[str]) -> Tuple[str, ...]:
        """Return every variable that transitively feeds ``direct_variables``.

        The seed variables themselves are never part of the result, even when
        a dependency cycle leads back to them. Names are returned sorted.
        """
        roots = set(direct_variables)
        visited: Set[str] = set(roots)
        pending = list(roots)
        while pending:
            current = pending.pop()
            # Only sources not encountered before need further expansion.
            fresh = set(self.dependencies_of(current)) - visited
            visited |= fresh
            pending.extend(fresh)
        # Everything visited beyond the seeds was reached through an edge.
        return tuple(sorted(visited - roots))
def build_use_def_graph(machine: 'StateMachine') -> UseDefGraph:
    """Build a conservative use-def graph from concrete actions/effects.

    Every state yielded by ``machine.walk_states()`` contributes the
    operations of its non-abstract enter/during/exit/during-aspect actions
    and the effects of its transitions. Edges and per-target dependency
    tuples are sorted, so the resulting graph is deterministic.
    """
    found: Set[Tuple[str, str]] = set()
    for state in machine.walk_states():
        lifecycle_groups = (
            state.on_enters,
            state.on_durings,
            state.on_exits,
            state.on_during_aspects,
        )
        for group in lifecycle_groups:
            for action in group:
                # Abstract actions are skipped: only concrete ones are analysed.
                if not getattr(action, 'is_abstract', False):
                    _walk_block(action.operations or [], set(), found)
        for transition in state.transitions:
            _walk_block(transition.effects or [], set(), found)

    ordered_edges = tuple(sorted(found))

    # Group sources under their target; iterating the sorted edges keeps
    # each target's source list in sorted order too.
    grouped: Dict[str, List[str]] = {}
    for source, target in ordered_edges:
        if target not in grouped:
            grouped[target] = []
        grouped[target].append(source)

    by_target: Dict[str, Tuple[str, ...]] = {}
    for target in sorted(grouped):
        by_target[target] = tuple(grouped[target])

    return UseDefGraph(edges=ordered_edges, dependencies_by_target=by_target)
def collect_expr_variables(expr: 'Expr') -> Tuple[str, ...]:
    """Return the distinct variable names read by ``expr``.

    Names appear in the order they are first encountered while walking the
    expression tree; later repeats are dropped.
    """
    names: List[str] = []
    _walk_expr_collect(expr, names)
    # dict keys keep insertion order, so this dedupes without reordering.
    return tuple(dict.fromkeys(names))
def _walk_block(
    statements: Iterable['OperationStatement'],
    enclosing_cond_vars: Set[str],
    edges: Set[Tuple[str, str]],
) -> None:
    """Add the dependency edges implied by ``statements`` to ``edges``.

    An assignment depends on every variable read by its expression and on
    every variable in ``enclosing_cond_vars``. The statements of an if-block
    branch additionally depend on the branch's own condition and on the
    conditions of all earlier branches in the same if-block.

    :raises TypeError: if a statement is neither an ``Operation`` nor an
        ``IfBlock``.
    """
    from ...model.model import IfBlock, Operation

    outer = set(enclosing_cond_vars)
    for stmt in statements:
        if isinstance(stmt, Operation):
            sources = outer.union(collect_expr_variables(stmt.expr))
            edges.update((source, stmt.var_name) for source in sources)
        elif isinstance(stmt, IfBlock):
            # Condition variables of the branches already visited.
            earlier_conds: Set[str] = set()
            for branch in stmt.branches:
                if branch.condition is None:
                    cond_vars: Set[str] = set()
                else:
                    cond_vars = set(collect_expr_variables(branch.condition))
                _walk_block(
                    branch.statements,
                    outer | earlier_conds | cond_vars,
                    edges,
                )
                earlier_conds |= cond_vars
        else:
            raise TypeError(
                f'unhandled OperationStatement subclass: {type(stmt).__name__}'
            )


def _walk_expr_collect(expr: 'Expr', out: List[str]) -> None:
    """Append the name of every variable occurrence in ``expr`` to ``out``.

    Sub-expressions are visited left to right and repeated variables are
    appended each time they occur.

    :raises TypeError: if ``expr`` is not one of the handled expression
        classes.
    """
    from ...model.expr import (
        BinaryOp,
        Boolean,
        ConditionalOp,
        Float,
        Integer,
        UFunc,
        UnaryOp,
        Variable,
    )

    if isinstance(expr, Variable):
        out.append(expr.name)
        return

    # Select the operands to descend into; literals have none.
    if isinstance(expr, UnaryOp):
        children = (expr.x,)
    elif isinstance(expr, BinaryOp):
        children = (expr.x, expr.y)
    elif isinstance(expr, ConditionalOp):
        children = (expr.cond, expr.if_true, expr.if_false)
    elif isinstance(expr, UFunc):
        children = (expr.x,)
    elif isinstance(expr, (Integer, Float, Boolean)):
        children = ()
    else:
        raise TypeError(f'unhandled Expr subclass: {type(expr).__name__}')

    for child in children:
        _walk_expr_collect(child, out)