"""
Operation parsing and execution utilities for Z3 solver integration.

This module provides functions to parse DSL operation code strings into
operation statements and execute them on Z3 variable dictionaries.

The module contains the following main components:

* :func:`parse_operations` - Parse DSL operation code string to statement tree
* :func:`execute_operations` - Execute operation statements on a Z3 variable dictionary

Example::

    >>> from pyfcstm.solver.operation import parse_operations, execute_operations
    >>> import z3
    >>>
    >>> # Parse operation statements from DSL string
    >>> ops = parse_operations("x = x + 1; y = x * 2;", allowed_vars=['x', 'y'])
    >>>
    >>> # Execute operations on Z3 variables
    >>> z3_vars = {'x': z3.Int('x'), 'y': z3.Int('y')}
    >>> z3_state = {'x': z3.IntVal(5), 'y': z3.IntVal(0)}
    >>> new_state = execute_operations(ops, z3_state)
    >>> # new_state['x'] represents the Z3 expression: 5 + 1
    >>> # new_state['y'] represents the Z3 expression: (5 + 1) * 2
"""
from collections.abc import Iterable
from dataclasses import dataclass
from typing import List, Dict, Mapping, Optional, Sequence, Tuple, Union
import z3
from .expr import expr_to_z3
from .domain import (
DomainConstraint,
DomainSource,
TranslationFailure,
merge_definedness_constraints,
translate_expr_domain,
)
from .logical import is_sat
from ..dsl.parse import parse_with_grammar_entry
from ..dsl import node as dsl_nodes
from ..model.expr import parse_expr_node_to_expr
from ..model.model import IfBlock, IfBlockBranch, Operation, OperationStatement
# A symbolic value held in an environment: a Z3 arithmetic or Boolean term.
_Z3Expr = Union[z3.ArithRef, z3.BoolRef]
# A symbolic environment mapping variable names to their current Z3 terms.
_Z3Env = Dict[str, _Z3Expr]
# Alias of DomainSource exposed under an operation-oriented name.
OperationSource = DomainSource
@dataclass(frozen=True)
class OperationFailure:
    """Immutable record of a solver-layer operation execution failure.

    :param kind: Normalized failure kind.
    :type kind: str
    :param reason: Human-readable failure reason.
    :type reason: str
    :param source: Optional source metadata, defaults to ``None``.
    :type source: Optional[DomainSource], optional
    :param translation_failure: Underlying expression translation failure
        wrapped by this failure, defaults to ``None``.
    :type translation_failure: Optional[TranslationFailure], optional

    Example::

        >>> failure = OperationFailure("value_error", "bad operation")
        >>> failure.kind
        'value_error'
    """

    kind: str
    reason: str
    source: Optional[DomainSource] = None
    translation_failure: Optional[TranslationFailure] = None
@dataclass(frozen=True)
class OperationStep:
    """Evidence for one successful assignment in a domain-aware execution.

    :param source: Optional source metadata for the assignment.
    :type source: Optional[DomainSource]
    :param before: Symbolic environment before the assignment.
    :type before: Mapping[str, Union[z3.ArithRef, z3.BoolRef]]
    :param after: Symbolic environment after the assignment.
    :type after: Mapping[str, Union[z3.ArithRef, z3.BoolRef]]
    :param path_conditions: Predicates that must hold before this assignment,
        defaults to ``()``.
    :type path_conditions: Tuple[z3.ExprRef, ...], optional
    :param definedness_constraints: Runtime-definedness constraints introduced
        by this assignment, defaults to ``()``.
    :type definedness_constraints: Tuple[DomainConstraint, ...], optional

    Example::

        >>> import z3
        >>> step = OperationStep(None, {"x": z3.Int("x")}, {"x": z3.IntVal(1)})
        >>> step.after["x"]
        1
    """

    source: Optional[DomainSource]
    before: Mapping[str, _Z3Expr]
    after: Mapping[str, _Z3Expr]
    path_conditions: Tuple[z3.ExprRef, ...] = ()
    definedness_constraints: Tuple[DomainConstraint, ...] = ()
@dataclass(frozen=True)
class OperationBranch:
    """Evidence for one source branch in a domain-aware operation execution.

    :param branch_id: Stable branch identifier within the local ``if`` block.
    :type branch_id: Optional[str]
    :param branch_kind: Branch kind: ``"if"``, ``"elif"``, or ``"else"``.
    :type branch_kind: str
    :param selector: Z3 predicate selecting this branch.
    :type selector: z3.ExprRef
    :param path_conditions: Predicates needed to reach the branch, defaults to
        ``()``.
    :type path_conditions: Tuple[z3.ExprRef, ...], optional
    :param status: Branch reachability status (``"sat"``, ``"unsat"``, or
        ``"unknown"``), defaults to ``"sat"``.
    :type status: str, optional
    :param result_env: Branch-local result environment, defaults to ``None``.
    :type result_env: Optional[Mapping[str, Union[z3.ArithRef, z3.BoolRef]]], optional
    :param definedness_constraints: Runtime-definedness constraints introduced
        by this branch, defaults to ``()``.
    :type definedness_constraints: Tuple[DomainConstraint, ...], optional
    :param failure: Branch-local operation failure, defaults to ``None``.
    :type failure: Optional[OperationFailure], optional

    Example::

        >>> import z3
        >>> branch = OperationBranch("0", "if", z3.Bool("cond"))
        >>> branch.branch_kind
        'if'
    """

    branch_id: Optional[str]
    branch_kind: str
    selector: z3.ExprRef
    path_conditions: Tuple[z3.ExprRef, ...] = ()
    status: str = "sat"
    result_env: Optional[Mapping[str, _Z3Expr]] = None
    definedness_constraints: Tuple[DomainConstraint, ...] = ()
    failure: Optional[OperationFailure] = None

    def __post_init__(self) -> None:
        """Validate branch invariants after dataclass initialization.

        :return: ``None``.
        :rtype: None
        :raises ValueError: If the branch kind, status, or unreachable-branch
            payload is inconsistent.

        Example::

            >>> import z3
            >>> OperationBranch("0", "if", z3.Bool("c")).status
            'sat'
        """
        if self.branch_kind not in ("if", "elif", "else"):
            raise ValueError(f"Unsupported operation branch kind: {self.branch_kind}")
        if self.status not in ("sat", "unsat", "unknown"):
            raise ValueError(f"Unsupported operation branch status: {self.status}")

        # Only unreachable branches have further payload restrictions.
        if self.status != "unsat":
            return

        # An unreachable branch was never executed, so it must not carry any
        # execution artefacts.
        if self.result_env is not None:
            raise ValueError(
                "Unreachable operation branches must not carry result_env."
            )
        if self.definedness_constraints:
            raise ValueError(
                "Unreachable operation branches must not carry definedness constraints."
            )
        if self.failure is not None:
            raise ValueError(
                "Unreachable operation branches must not carry failures."
            )
@dataclass(frozen=True)
class OperationExecution:
    """Result of executing an operation block with domain awareness.

    The final visible environment is kept apart from the evidence gathered
    about steps, branches, and runtime-definedness side conditions. Temporary
    names created inside a block can appear in internal steps, but
    :attr:`env` exposes only names visible before execution started.

    :param env: Final visible symbolic environment.
    :type env: Mapping[str, Union[z3.ArithRef, z3.BoolRef]]
    :param visible_names: Names preserved in the public final environment.
    :type visible_names: Tuple[str, ...]
    :param expr_constraints: Solver-only value constraints, defaults to ``()``.
    :type expr_constraints: Tuple[z3.ExprRef, ...], optional
    :param definedness_constraints: Runtime-definedness constraints collected
        during execution, defaults to ``()``.
    :type definedness_constraints: Tuple[DomainConstraint, ...], optional
    :param steps: Assignment-step evidence, defaults to ``()``.
    :type steps: Tuple[OperationStep, ...], optional
    :param branches: Branch reachability evidence, defaults to ``()``.
    :type branches: Tuple[OperationBranch, ...], optional
    :param failure: Expected execution failure, defaults to ``None``.
    :type failure: Optional[OperationFailure], optional

    Example::

        >>> import z3
        >>> result = OperationExecution({"x": z3.IntVal(1)}, ("x",))
        >>> result.visible_names
        ('x',)
    """

    env: Mapping[str, _Z3Expr]
    visible_names: Tuple[str, ...]
    expr_constraints: Tuple[z3.ExprRef, ...] = ()
    definedness_constraints: Tuple[DomainConstraint, ...] = ()
    steps: Tuple[OperationStep, ...] = ()
    branches: Tuple[OperationBranch, ...] = ()
    failure: Optional[OperationFailure] = None
@dataclass
class _ExecutionResult:
    """Mutable internal carrier passed between the statement-walking helpers.

    Each helper that executes statements returns one of these so the caller
    can pick up the environment, evidence, and step counter where the callee
    left off.

    :param env: Current symbolic environment.
    :type env: Dict[str, Union[z3.ArithRef, z3.BoolRef]]
    :param definedness_constraints: Runtime-definedness constraints collected
        so far.
    :type definedness_constraints: Tuple[DomainConstraint, ...]
    :param steps: Assignment-step evidence collected so far.
    :type steps: Tuple[OperationStep, ...]
    :param branches: Branch evidence collected so far.
    :type branches: Tuple[OperationBranch, ...]
    :param failure: Expected execution failure, or ``None``.
    :type failure: Optional[OperationFailure]
    :param next_step: Next assignment-step index.
    :type next_step: int

    Example::

        >>> result = _ExecutionResult({}, (), (), (), None, 0)
        >>> result.failure is None
        True
    """

    env: _Z3Env
    definedness_constraints: Tuple[DomainConstraint, ...]
    steps: Tuple[OperationStep, ...]
    branches: Tuple[OperationBranch, ...]
    failure: Optional[OperationFailure]
    next_step: int
def parse_operations(
    code: str, allowed_vars: Optional[List[str]] = None
) -> List[OperationStatement]:
    """
    Parse DSL operation code string into a list of operation statements.

    The code string may contain assignments and ``if`` blocks; each is
    converted into the corresponding model-layer operation statement. When
    ``allowed_vars`` is given, every expression is checked so that it only
    references variables that are already available at that point in the
    block. Unknown assignment targets are treated as block-local temporary
    variables and become available to later statements.

    :param code: DSL operation code string to parse
    :type code: str
    :param allowed_vars: List of initially available variable names, or ``None`` for free mode
    :type allowed_vars: Optional[List[str]]
    :return: List of parsed operation statements
    :rtype: List[OperationStatement]
    :raises ValueError: If an expression references a variable that is not yet available
    :raises TypeError: If the parser yields a statement node of an unknown type
    :raises pyfcstm.dsl.error.GrammarParseError: If DSL parsing fails

    Example::

        >>> ops = parse_operations("x = x + 1; y = 10;", allowed_vars=['x', 'y'])
        >>> len(ops)
        2
        >>> ops[0].var_name
        'x'
        >>> # Free mode - no variable restrictions
        >>> ops = parse_operations("a = b + c;", allowed_vars=None)
        >>> # Restricted mode - raises ValueError for variables used before assignment
        >>> ops = parse_operations("y = z + 1; z = 1;", allowed_vars=['x', 'y'])
        Traceback (most recent call last):
        ...
        ValueError: Variable 'z' is not in allowed variables: ['x', 'y']
    """
    parsed_node = parse_with_grammar_entry(code, "operational_statement_set")

    def _validate_expression_variables(expr, available_names: set) -> None:
        """Validate expression variable references against currently visible names.

        :param expr: Model expression whose variable references should be
            checked.
        :type expr: pyfcstm.model.expr.Expr
        :param available_names: Names visible before evaluating ``expr``.
        :type available_names: set
        :return: ``None``.
        :rtype: None
        :raises ValueError: If ``expr`` references a variable that is not yet
            available in the current operation block.

        Example::

            >>> # This nested helper is exercised through parse_operations().
            >>> parse_operations("tmp = x + 1;", allowed_vars=["x"])[0].var_name
            'tmp'
        """
        for var in expr.list_variables():
            if var.name not in available_names:
                # The message reports the caller-supplied list, not the
                # current scope that also contains block-local temporaries.
                raise ValueError(
                    f"Variable '{var.name}' is not in allowed variables: {allowed_vars}"
                )

    def _convert_statements(
        statements: List[dsl_nodes.OperationalStatement],
        available_names: Optional[set],
    ) -> List[OperationStatement]:
        """Convert parsed DSL operation nodes into model statements.

        The conversion tracks block-local temporary variables in declaration
        order. Assignment targets become visible after their expression has
        been validated; ``if`` branch bodies inherit only the names visible
        before entering the branch.

        :param statements: Parsed DSL operation statement nodes.
        :type statements: List[pyfcstm.dsl.node.OperationalStatement]
        :param available_names: Names visible before the statement list, or
            ``None`` when reference validation is disabled. The set is copied
            and never mutated.
        :type available_names: Optional[set]
        :return: Converted model operation statements.
        :rtype: List[OperationStatement]
        :raises TypeError: If a parsed statement node type is unknown.
        :raises ValueError: If a referenced variable is not available.

        Example::

            >>> # The helper keeps branch-local temporaries from escaping.
            >>> parse_operations("if [x > 0] { tmp = x; }", ["x"])
            [IfBlock(branches=[IfBlockBranch(condition=BinaryOp(...), statements=[Operation(var_name='tmp', expr=Variable(name='x'))])])]
        """
        converted = []
        # Work on a private copy so names added here never leak to the caller.
        scope = None if available_names is None else set(available_names)
        for statement in statements:
            if isinstance(statement, dsl_nodes.OperationAssignment):
                expr = parse_expr_node_to_expr(statement.expr)
                if scope is not None:
                    # Validate first: the target is only visible afterwards.
                    _validate_expression_variables(expr, scope)
                    scope.add(statement.name)
                converted.append(Operation(var_name=statement.name, expr=expr))
            elif isinstance(statement, dsl_nodes.OperationIf):
                branches = []
                for branch in statement.branches:
                    condition = None
                    if branch.condition is not None:
                        condition = parse_expr_node_to_expr(branch.condition)
                        if scope is not None:
                            _validate_expression_variables(condition, scope)
                    # The recursive call copies ``scope``, so every branch
                    # starts from the pre-``if`` names and its temporaries
                    # stay local to that branch.
                    branches.append(
                        IfBlockBranch(
                            condition=condition,
                            statements=_convert_statements(branch.statements, scope),
                        )
                    )
                converted.append(IfBlock(branches=branches))
            else:
                raise TypeError(
                    f"Unknown operational statement node type {type(statement)!r}."
                )
        return converted

    initial_names = None if allowed_vars is None else set(allowed_vars)
    return _convert_statements(parsed_node, initial_names)
def _execute_operation_statements_symbolically(
    statements: List[OperationStatement],
    exprs: Dict[str, Union[z3.ArithRef, z3.BoolRef]],
) -> Dict[str, Union[z3.ArithRef, z3.BoolRef]]:
    """
    Run operation statements in order over a symbolic environment.

    The input environment is copied first, so the caller's mapping is left
    untouched.

    :param statements: Statements to execute in order.
    :type statements: List[OperationStatement]
    :param exprs: Current symbolic environment.
    :type exprs: Dict[str, Union[z3.ArithRef, z3.BoolRef]]
    :return: Updated symbolic environment.
    :rtype: Dict[str, Union[z3.ArithRef, z3.BoolRef]]
    :raises TypeError: If a statement is neither an assignment nor an
        ``if`` block.

    Example::

        >>> import z3
        >>> ops = parse_operations("x = x + 1;", allowed_vars=["x"])
        >>> env = _execute_operation_statements_symbolically(ops, {"x": z3.IntVal(1)})
        >>> z3.simplify(env["x"])
        2
    """
    env = dict(exprs)
    for stmt in statements:
        if isinstance(stmt, IfBlock):
            env = _execute_if_block_symbolically(stmt, env)
            continue
        if not isinstance(stmt, Operation):
            raise TypeError(f"Unknown operation statement type {type(stmt)!r}.")
        # Translate against the environment as it stands before the write.
        translated = expr_to_z3(stmt.expr, env)
        env[stmt.var_name] = translated
    return env
def _execute_if_block_symbolically(
    if_block: IfBlock,
    exprs: Dict[str, Union[z3.ArithRef, z3.BoolRef]],
) -> Dict[str, Union[z3.ArithRef, z3.BoolRef]]:
    """
    Execute an ``if`` block symbolically with branch-local merge semantics.

    Every branch starts from the same pre-``if`` environment. The final merge
    covers only names that were already visible before the block, so
    branch-local temporary variables never reach the outer environment.

    :param if_block: If-block to execute.
    :type if_block: IfBlock
    :param exprs: Current symbolic environment.
    :type exprs: Dict[str, Union[z3.ArithRef, z3.BoolRef]]
    :return: Merged symbolic environment after the if-block.
    :rtype: Dict[str, Union[z3.ArithRef, z3.BoolRef]]

    Example::

        >>> import z3
        >>> block = parse_operations("if [x > 0] { y = 1; } else { y = 2; }", ["x", "y"])[0]
        >>> env = _execute_if_block_symbolically(block, {"x": z3.Int("x"), "y": z3.Int("y")})
        >>> str(env["y"])
        'If(0 < x, 1, 2)'
    """
    snapshot = dict(exprs)
    outer_names = tuple(snapshot.keys())

    outcomes = []
    for branch in if_block.branches:
        if branch.condition is None:
            guard = None
        else:
            guard = expr_to_z3(branch.condition, snapshot)
        branch_env = _execute_operation_statements_symbolically(
            branch.statements, snapshot
        )
        outcomes.append((guard, branch_env))

    # Fold from the last branch backwards so the first branch ends up as the
    # outermost ``If`` and therefore takes priority.
    outcomes.reverse()

    merged = dict(snapshot)
    for name in outer_names:
        value = snapshot[name]
        for guard, branch_env in outcomes:
            if guard is None:
                # An unconditional (``else``) branch replaces the fallback.
                value = branch_env[name]
            else:
                value = z3.If(guard, branch_env[name], value)
        merged[name] = value
    return merged
def _as_operation_list(
    operations: Union[OperationStatement, List[OperationStatement]],
) -> List[OperationStatement]:
    """Return ``operations`` as a list, wrapping a single statement.

    :param operations: Single statement or list of statements.
    :type operations: Union[OperationStatement, List[OperationStatement]]
    :return: Operation statement list; a fresh list in both cases.
    :rtype: List[OperationStatement]

    Example::

        >>> op = parse_operations("x = 1;")[0]
        >>> _as_operation_list(op) == [op]
        True
    """
    return (
        [operations]
        if isinstance(operations, OperationStatement)
        else list(operations)
    )
def _constraint_exprs(items: Sequence[DomainConstraint]) -> Tuple[z3.ExprRef, ...]:
    """Extract the raw Z3 predicate from each domain constraint.

    :param items: Domain constraints.
    :type items: Sequence[DomainConstraint]
    :return: Raw Z3 predicates, in input order.
    :rtype: Tuple[z3.ExprRef, ...]

    Example::

        >>> import z3
        >>> _constraint_exprs((DomainConstraint(z3.Int("x") != 0),))
        (x != 0,)
    """
    predicates = []
    for domain in items:
        predicates.append(domain.constraint)
    return tuple(predicates)
def _true_expr() -> z3.BoolRef:
    """Build the Z3 Boolean constant ``True``.

    :return: Z3 Boolean true.
    :rtype: z3.BoolRef

    Example::

        >>> _true_expr()
        True
    """
    always_true = z3.BoolVal(True)
    return always_true
def _and_expr(items: Sequence[z3.ExprRef]) -> z3.ExprRef:
    """Build the conjunction of ``items`` without needless ``And`` wrappers.

    An empty input yields the constant true predicate and a single predicate
    is returned as-is; only two or more predicates are wrapped in ``And``.

    :param items: Predicates to conjoin.
    :type items: Sequence[z3.ExprRef]
    :return: Combined predicate.
    :rtype: z3.ExprRef

    Example::

        >>> import z3
        >>> _and_expr((z3.BoolVal(True), z3.BoolVal(False)))
        And(True, False)
    """
    if items:
        return items[0] if len(items) == 1 else z3.And(*items)
    return _true_expr()
def _path_guard(
    domains: Sequence[DomainConstraint],
    path_conditions: Sequence[z3.ExprRef],
) -> Tuple[DomainConstraint, ...]:
    """Make domain constraints conditional on the path that introduces them.

    Each constraint is rewritten as ``path => constraint`` and keeps its
    original source. With no path conditions the constraints are returned
    unchanged (as a tuple).

    :param domains: Runtime-definedness constraints from the current step or
        branch.
    :type domains: Sequence[DomainConstraint]
    :param path_conditions: Predicates needed to reach the step or branch.
    :type path_conditions: Sequence[z3.ExprRef]
    :return: Guarded domain constraints.
    :rtype: Tuple[DomainConstraint, ...]

    Example::

        >>> import z3
        >>> x = z3.Int("x")
        >>> guarded = _path_guard((DomainConstraint(x != 0),), (x > 1,))
        >>> guarded[0].constraint
        Implies(x > 1, x != 0)
    """
    if not domains:
        return ()
    if not path_conditions:
        return tuple(domains)

    # Build the path predicate once and reuse it for every constraint.
    reach = _and_expr(tuple(path_conditions))
    guarded = []
    for domain in domains:
        implication = z3.Implies(reach, domain.constraint)
        guarded.append(DomainConstraint(implication, source=domain.source))
    return tuple(guarded)
def _operation_failure_from_translation(
    failure: TranslationFailure,
) -> OperationFailure:
    """Lift an expression translation failure into an operation failure.

    Kind, reason, and source are copied over, and the original failure object
    is kept on the result.

    :param failure: Expression translation failure.
    :type failure: TranslationFailure
    :return: Operation failure preserving the original failure object.
    :rtype: OperationFailure

    Example::

        >>> failure = TranslationFailure("value_error", "bad expression")
        >>> _operation_failure_from_translation(failure).translation_failure is failure
        True
    """
    kind = failure.kind
    reason = failure.reason
    source = failure.source
    return OperationFailure(kind, reason, source, failure)
def _operation_failure(
    kind: str,
    reason: str,
    source: Optional[DomainSource],
) -> OperationFailure:
    """Build an operation failure that wraps no translation failure.

    :param kind: Normalized failure kind.
    :type kind: str
    :param reason: Human-readable failure reason.
    :type reason: str
    :param source: Optional source metadata.
    :type source: Optional[DomainSource]
    :return: Operation failure.
    :rtype: OperationFailure

    Example::

        >>> _operation_failure("unsupported", "bad op", None).kind
        'unsupported'
    """
    return OperationFailure(kind, reason, source)
def _normalize_solver_status(status: str) -> str:
"""Normalize solver status strings for operation evidence.
:param status: Raw status string from a solver helper.
:type status: str
:return: ``"sat"``, ``"unsat"``, or ``"unknown"``.
:rtype: str
Example::
>>> _normalize_solver_status("timeout")
'unknown'
"""
return status if status in ("sat", "unsat") else "unknown"
def _execution_point_status(
    *,
    assumptions: Sequence[z3.ExprRef],
    path_conditions: Sequence[z3.ExprRef],
    definedness_constraints: Sequence[DomainConstraint],
    prune_unreachable: bool,
    timeout_ms: Optional[int],
) -> str:
    """Report whether the current execution point can be reached.

    With pruning disabled no solver query is made and the point counts as
    reachable. Otherwise the assumptions, path conditions, and collected
    definedness constraints are checked together for satisfiability.

    :param assumptions: Caller-known facts for the whole operation block.
    :type assumptions: Sequence[z3.ExprRef]
    :param path_conditions: Predicates needed to reach the current point.
    :type path_conditions: Sequence[z3.ExprRef]
    :param definedness_constraints: Runtime-definedness constraints collected
        before the current point.
    :type definedness_constraints: Sequence[DomainConstraint]
    :param prune_unreachable: Whether to run the reachability query.
    :type prune_unreachable: bool
    :param timeout_ms: Optional solver timeout in milliseconds.
    :type timeout_ms: Optional[int]
    :return: Normalized reachability status.
    :rtype: str

    Example::

        >>> _execution_point_status(
        ...     assumptions=(),
        ...     path_conditions=(),
        ...     definedness_constraints=(),
        ...     prune_unreachable=False,
        ...     timeout_ms=None,
        ... )
        'sat'
    """
    if prune_unreachable:
        query = (
            *assumptions,
            *path_conditions,
            *_constraint_exprs(definedness_constraints),
        )
        verdict = is_sat(query, timeout_ms=timeout_ms)
        return _normalize_solver_status(verdict.kind)
    return "sat"
def _condition_expr_for_metadata(expr, env: _Z3Env) -> Optional[z3.ExprRef]:
    """Translate a branch condition on a best-effort basis.

    The result is used only as evidence for unreachable branches, so any
    expected translation problem yields ``None`` instead of propagating.

    :param expr: Branch condition expression.
    :type expr: object
    :param env: Symbolic environment used for translation.
    :type env: Dict[str, Union[z3.ArithRef, z3.BoolRef]]
    :return: Boolean condition expression, or ``None`` if it cannot be built
        without affecting execution.
    :rtype: Optional[z3.ExprRef]

    Example::

        >>> import z3
        >>> cond = parse_operations("if [x > 0] { y = 1; }", ["x", "y"])[0].branches[0].condition
        >>> _condition_expr_for_metadata(cond, {"x": z3.Int("x"), "y": z3.Int("y")})
        0 < x
    """
    try:
        translated = expr_to_z3(expr, env)
    except (NotImplementedError, ValueError, TypeError, z3.Z3Exception):
        # NotImplementedError: expr_to_z3 raises this for supported expression
        #   nodes whose math function is intentionally unsupported by Z3.
        # ValueError: expr_to_z3 raises this for unknown variables, unknown
        #   operators, and unsupported expression object types.
        # TypeError: Python/Z3 operator overloads can reject malformed operand
        #   combinations before Z3 wraps the failure.
        # Z3Exception: Z3 rejects sort/operator-domain mismatches.
        return None
    # A non-Boolean term cannot serve as a branch selector.
    return translated if z3.is_bool(translated) else None
def _translate_operation_expr(
    expr,
    env: _Z3Env,
    *,
    assumptions: Sequence[z3.ExprRef],
    path_conditions: Sequence[z3.ExprRef],
    definedness_constraints: Sequence[DomainConstraint],
    source: Optional[DomainSource],
    prune_unreachable: bool,
    timeout_ms: Optional[int],
):
    """Translate one operation expression within the current execution context.

    The definedness constraints collected so far are appended to the path
    conditions handed to the domain-aware translator.

    :param expr: Operation expression to translate.
    :type expr: object
    :param env: Current symbolic environment.
    :type env: Dict[str, Union[z3.ArithRef, z3.BoolRef]]
    :param assumptions: Caller-known block facts.
    :type assumptions: Sequence[z3.ExprRef]
    :param path_conditions: Predicates needed to reach this expression.
    :type path_conditions: Sequence[z3.ExprRef]
    :param definedness_constraints: Runtime-definedness constraints collected
        before this expression.
    :type definedness_constraints: Sequence[DomainConstraint]
    :param source: Optional source metadata.
    :type source: Optional[DomainSource]
    :param prune_unreachable: Whether nested conditional branches should be
        pruned.
    :type prune_unreachable: bool
    :param timeout_ms: Optional solver timeout in milliseconds.
    :type timeout_ms: Optional[int]
    :return: Domain-aware expression translation.
    :rtype: pyfcstm.solver.domain.ExprDomain

    Example::

        >>> import z3
        >>> expr = parse_operations("x = 1 / y;", ["x", "y"])[0].expr
        >>> result = _translate_operation_expr(
        ...     expr,
        ...     {"x": z3.Int("x"), "y": z3.Int("y")},
        ...     assumptions=(),
        ...     path_conditions=(),
        ...     definedness_constraints=(),
        ...     source=None,
        ...     prune_unreachable=True,
        ...     timeout_ms=None,
        ... )
        >>> [str(item.constraint) for item in result.definedness_constraints]
        ['y != 0']
    """
    effective_path = (
        *path_conditions,
        *_constraint_exprs(definedness_constraints),
    )
    return translate_expr_domain(
        expr,
        env,
        assumptions=assumptions,
        path_conditions=effective_path,
        source=source,
        prune_unreachable=prune_unreachable,
        timeout_ms=timeout_ms,
    )
def _branch_status(
    *,
    assumptions: Sequence[z3.ExprRef],
    path_conditions: Sequence[z3.ExprRef],
    definedness_constraints: Sequence[DomainConstraint],
    selector: z3.ExprRef,
    prune_unreachable: bool,
    timeout_ms: Optional[int],
) -> str:
    """Report whether a candidate operation branch can be taken.

    With pruning disabled no solver query is made and the branch counts as
    reachable. Otherwise the selector is checked for satisfiability together
    with the assumptions, path conditions, and definedness constraints.

    :param assumptions: Caller-known block facts.
    :type assumptions: Sequence[z3.ExprRef]
    :param path_conditions: Predicates needed to reach the enclosing branch
        point.
    :type path_conditions: Sequence[z3.ExprRef]
    :param definedness_constraints: Runtime-definedness constraints collected
        before this branch.
    :type definedness_constraints: Sequence[DomainConstraint]
    :param selector: Branch selector predicate.
    :type selector: z3.ExprRef
    :param prune_unreachable: Whether to run the reachability query.
    :type prune_unreachable: bool
    :param timeout_ms: Optional solver timeout in milliseconds.
    :type timeout_ms: Optional[int]
    :return: Normalized branch reachability status.
    :rtype: str

    Example::

        >>> import z3
        >>> _branch_status(
        ...     assumptions=(),
        ...     path_conditions=(),
        ...     definedness_constraints=(),
        ...     selector=z3.BoolVal(False),
        ...     prune_unreachable=True,
        ...     timeout_ms=None,
        ... )
        'unsat'
    """
    if prune_unreachable:
        query = (
            *assumptions,
            *path_conditions,
            *_constraint_exprs(definedness_constraints),
            selector,
        )
        verdict = is_sat(query, timeout_ms=timeout_ms)
        return _normalize_solver_status(verdict.kind)
    return "sat"
def _merge_branch_env(
    base_env: _Z3Env,
    merge_names: Sequence[str],
    branch_results: Sequence[Tuple[z3.ExprRef, Mapping[str, _Z3Expr]]],
) -> _Z3Env:
    """Combine branch environments into one using nested Z3 ``If`` terms.

    For every merged name the pre-``if`` value is the innermost fallback and
    earlier branches wrap later ones. Names outside ``merge_names`` keep
    their ``base_env`` value.

    :param base_env: Environment before entering the ``if`` block.
    :type base_env: Dict[str, Union[z3.ArithRef, z3.BoolRef]]
    :param merge_names: Names that are visible outside the ``if`` block.
    :type merge_names: Sequence[str]
    :param branch_results: Branch selectors and their result environments.
    :type branch_results: Sequence[Tuple[z3.ExprRef, Mapping[str, Union[z3.ArithRef, z3.BoolRef]]]]
    :return: Merged symbolic environment.
    :rtype: Dict[str, Union[z3.ArithRef, z3.BoolRef]]

    Example::

        >>> import z3
        >>> merged = _merge_branch_env(
        ...     {"x": z3.Int("x")},
        ...     ("x",),
        ...     ((z3.Bool("take"), {"x": z3.IntVal(1)}),),
        ... )
        >>> str(merged["x"])
        'If(take, 1, x)'
    """
    # Fold from the last branch backwards so the first branch becomes the
    # outermost ``If``.
    last_to_first = tuple(reversed(branch_results))
    combined = dict(base_env)
    for name in merge_names:
        value = base_env[name]
        for selector, branch_env in last_to_first:
            value = z3.If(selector, branch_env[name], value)
        combined[name] = value
    return combined
def _execute_operation_statements_domain(
    statements: Sequence[OperationStatement],
    env: _Z3Env,
    *,
    visible_names: Tuple[str, ...],
    assumptions: Sequence[z3.ExprRef],
    path_conditions: Sequence[z3.ExprRef],
    definedness_constraints: Sequence[DomainConstraint],
    source: Optional[DomainSource],
    prune_unreachable: bool,
    timeout_ms: Optional[int],
    step_start: int,
) -> _ExecutionResult:
    """Walk a statement sequence while collecting path and domain evidence.

    Assignments are translated one at a time and recorded as steps; ``if``
    blocks are delegated to :func:`_execute_if_block_domain`. Execution stops
    early, without a failure, once the current point is proved unreachable,
    and stops with a failure on the first translation error or unknown
    statement type.

    :param statements: Operation statements to execute in order.
    :type statements: Sequence[OperationStatement]
    :param env: Starting symbolic environment.
    :type env: Dict[str, Union[z3.ArithRef, z3.BoolRef]]
    :param visible_names: Names visible at the public block boundary.
    :type visible_names: Tuple[str, ...]
    :param assumptions: Caller-known block facts.
    :type assumptions: Sequence[z3.ExprRef]
    :param path_conditions: Predicates needed to reach the statement sequence.
    :type path_conditions: Sequence[z3.ExprRef]
    :param definedness_constraints: Runtime-definedness constraints collected
        before the sequence starts.
    :type definedness_constraints: Sequence[DomainConstraint]
    :param source: Optional source metadata copied into step-level metadata.
    :type source: Optional[DomainSource]
    :param prune_unreachable: Whether to stop translating paths proved
        unreachable.
    :type prune_unreachable: bool
    :param timeout_ms: Optional solver timeout in milliseconds.
    :type timeout_ms: Optional[int]
    :param step_start: First assignment-step index for this sequence.
    :type step_start: int
    :return: Internal execution carrier.
    :rtype: _ExecutionResult

    Example::

        >>> import z3
        >>> ops = parse_operations("x = x + 1;", ["x"])
        >>> result = _execute_operation_statements_domain(
        ...     ops,
        ...     {"x": z3.IntVal(1)},
        ...     visible_names=("x",),
        ...     assumptions=(),
        ...     path_conditions=(),
        ...     definedness_constraints=(),
        ...     source=None,
        ...     prune_unreachable=True,
        ...     timeout_ms=None,
        ...     step_start=0,
        ... )
        >>> z3.simplify(result.env["x"])
        2
    """
    live_env = dict(env)
    live_domains: Tuple[DomainConstraint, ...] = tuple(definedness_constraints)
    recorded_steps: List[OperationStep] = []
    recorded_branches: List[OperationBranch] = []
    cursor = step_start

    def _carrier(failure, domains) -> _ExecutionResult:
        # Snapshot the walker state as it is at the moment of the call.
        return _ExecutionResult(
            env=live_env,
            definedness_constraints=domains,
            steps=tuple(recorded_steps),
            branches=tuple(recorded_branches),
            failure=failure,
            next_step=cursor,
        )

    for statement in statements:
        reachability = _execution_point_status(
            assumptions=assumptions,
            path_conditions=path_conditions,
            definedness_constraints=live_domains,
            prune_unreachable=prune_unreachable,
            timeout_ms=timeout_ms,
        )
        if reachability == "unsat":
            # Nothing after an unreachable point needs translating.
            break

        if isinstance(statement, Operation):
            env_before = dict(live_env)
            if source is None:
                step_source = DomainSource(step=cursor)
            else:
                # Same metadata as the block source, but with this step index.
                step_source = DomainSource(
                    label=source.label,
                    step=cursor,
                    snapshot=source.snapshot,
                    prefix_id=source.prefix_id,
                )
            translated = _translate_operation_expr(
                statement.expr,
                live_env,
                assumptions=assumptions,
                path_conditions=path_conditions,
                definedness_constraints=live_domains,
                source=step_source,
                prune_unreachable=prune_unreachable,
                timeout_ms=timeout_ms,
            )
            step_domains = _path_guard(
                translated.definedness_constraints,
                path_conditions,
            )
            widened_domains = (*live_domains, *step_domains)
            if translated.failure is not None:
                # The failing step's constraints are still reported, but the
                # environment and step list stay as they were before it.
                return _carrier(
                    _operation_failure_from_translation(translated.failure),
                    widened_domains,
                )

            # Rebind to a fresh dict so earlier snapshots are not mutated.
            live_env = dict(live_env)
            live_env[statement.var_name] = translated.z3_expr
            recorded_steps.append(
                OperationStep(
                    source=step_source,
                    before=env_before,
                    after=dict(live_env),
                    path_conditions=tuple(path_conditions),
                    definedness_constraints=step_domains,
                )
            )
            live_domains = widened_domains
            cursor += 1
        elif isinstance(statement, IfBlock):
            nested = _execute_if_block_domain(
                statement,
                live_env,
                visible_names=visible_names,
                assumptions=assumptions,
                path_conditions=path_conditions,
                definedness_constraints=live_domains,
                source=source,
                prune_unreachable=prune_unreachable,
                timeout_ms=timeout_ms,
                step_start=cursor,
            )
            # Adopt the nested state before checking for failure so that the
            # evidence gathered so far is reported either way.
            recorded_branches.extend(nested.branches)
            recorded_steps.extend(nested.steps)
            live_env = nested.env
            live_domains = nested.definedness_constraints
            cursor = nested.next_step
            if nested.failure is not None:
                return _carrier(nested.failure, live_domains)
        else:
            return _carrier(
                _operation_failure(
                    "unsupported_statement",
                    f"Unknown operation statement type {type(statement).__name__}.",
                    source,
                ),
                live_domains,
            )

    return _carrier(None, live_domains)
def _execute_if_block_domain(
    if_block: IfBlock,
    env: _Z3Env,
    *,
    visible_names: Tuple[str, ...],
    assumptions: Sequence[z3.ExprRef],
    path_conditions: Sequence[z3.ExprRef],
    definedness_constraints: Sequence[DomainConstraint],
    source: Optional[DomainSource],
    prune_unreachable: bool,
    timeout_ms: Optional[int],
    step_start: int,
) -> _ExecutionResult:
    """Execute an ``if`` block with path-sensitive branch pruning.

    Branches are visited in declaration order. Every branch body is executed
    from the same copy of ``env`` taken on entry, never from the result of a
    sibling branch. The selector of a branch is the conjunction of the negated
    conditions of all earlier branches and, for ``if``/``elif`` branches, the
    branch's own condition. A branch whose status is ``"unsat"`` is recorded
    as evidence but its body is not executed.

    Execution stops at the first failure: a condition that fails to
    translate, a condition that does not translate to a Boolean Z3 expression
    (failure kind ``non_bool_condition``), or a failure reported by a branch
    body. In that case the partial evidence collected so far is returned with
    ``failure`` set.

    :param if_block: If-block to execute.
    :type if_block: IfBlock
    :param env: Environment before the ``if`` block.
    :type env: Dict[str, Union[z3.ArithRef, z3.BoolRef]]
    :param visible_names: Names visible at the public block boundary; only
        forwarded to the nested statement executor here.
    :type visible_names: Tuple[str, ...]
    :param assumptions: Caller-known block facts.
    :type assumptions: Sequence[z3.ExprRef]
    :param path_conditions: Predicates needed to reach the ``if`` block.
    :type path_conditions: Sequence[z3.ExprRef]
    :param definedness_constraints: Runtime-definedness constraints collected
        before the ``if`` block.
    :type definedness_constraints: Sequence[DomainConstraint]
    :param source: Optional source metadata copied into nested step metadata.
    :type source: Optional[DomainSource]
    :param prune_unreachable: Whether to skip branches proved unreachable.
    :type prune_unreachable: bool
    :param timeout_ms: Optional solver timeout in milliseconds.
    :type timeout_ms: Optional[int]
    :param step_start: First assignment-step index inside the block.
    :type step_start: int
    :return: Internal execution carrier holding the merged environment, the
        incoming constraints followed by those collected inside the block,
        the recorded steps and branches, the first failure (or ``None``), and
        the next free step index.
    :rtype: _ExecutionResult

    Example::

        >>> import z3
        >>> block = parse_operations("if [x > 0] { y = 1; } else { y = 2; }", ["x", "y"])[0]
        >>> result = _execute_if_block_domain(
        ...     block,
        ...     {"x": z3.Int("x"), "y": z3.Int("y")},
        ...     visible_names=("x", "y"),
        ...     assumptions=(),
        ...     path_conditions=(),
        ...     definedness_constraints=(),
        ...     source=None,
        ...     prune_unreachable=True,
        ...     timeout_ms=None,
        ...     step_start=0,
        ... )
        >>> [branch.branch_kind for branch in result.branches]
        ['if', 'else']
    """
    # Snapshot of the pre-``if`` environment; each branch body starts from it.
    base_env = dict(env)
    # Only names that existed before the block are kept in branch results.
    merge_names = tuple(base_env.keys())
    # Negated conditions of the branches already visited ("none of the earlier
    # branches was taken").
    prefix_selectors: List[z3.ExprRef] = []
    # (selector, result_env) pairs of executed branches, consumed by
    # ``_merge_branch_env``.
    branch_results = []
    branches: List[OperationBranch] = []
    steps: List[OperationStep] = []
    # Incoming constraints followed by everything collected inside the block.
    all_domains: List[DomainConstraint] = list(definedness_constraints)
    step_index = step_start
    for index, branch in enumerate(if_block.branches):
        # A branch without a condition is the ``else``; otherwise the first
        # branch is the ``if`` and later ones are ``elif``.
        branch_kind = (
            "else" if branch.condition is None else ("if" if index == 0 else "elif")
        )
        branch_id = str(index)
        # Conditions under which this branch's condition gets evaluated at all.
        arrival_conditions = (*path_conditions, *prefix_selectors)
        # Constraints contributed by this branch's own condition.
        local_domains: List[DomainConstraint] = []
        if branch.condition is None:
            # ``else``: taken exactly when every earlier condition was false.
            condition_expr = None
            selector = _and_expr(tuple(prefix_selectors))
        else:
            if prefix_selectors:
                # Before translating the condition, check whether falling
                # through all earlier branches is possible at all.
                # NOTE(review): ``_branch_status`` is defined elsewhere;
                # presumably it runs the reachability check - confirm.
                arrival_status = _branch_status(
                    assumptions=assumptions,
                    path_conditions=path_conditions,
                    definedness_constraints=tuple(all_domains),
                    selector=_and_expr(tuple(prefix_selectors)),
                    prune_unreachable=prune_unreachable,
                    timeout_ms=timeout_ms,
                )
                if arrival_status == "unsat":
                    # The condition is never evaluated on this path, so it is
                    # not translated through ``_translate_operation_expr`` and
                    # contributes no definedness constraints. The helper below
                    # may return ``None``, which is handled explicitly.
                    condition_expr = _condition_expr_for_metadata(
                        branch.condition, base_env
                    )
                    selector = (
                        _and_expr((*prefix_selectors, condition_expr))
                        if condition_expr is not None
                        else _and_expr(tuple(prefix_selectors))
                    )
                    branches.append(
                        OperationBranch(
                            branch_id=branch_id,
                            branch_kind=branch_kind,
                            selector=selector,
                            path_conditions=(*path_conditions, selector),
                            status="unsat",
                            result_env=None,
                            definedness_constraints=(),
                            failure=None,
                        )
                    )
                    # Keep the prefix complete for the branches that follow.
                    if condition_expr is not None:
                        prefix_selectors.append(z3.Not(condition_expr))
                    continue
            condition_result = _translate_operation_expr(
                branch.condition,
                base_env,
                assumptions=assumptions,
                path_conditions=arrival_conditions,
                definedness_constraints=all_domains,
                source=source,
                prune_unreachable=prune_unreachable,
                timeout_ms=timeout_ms,
            )
            condition_domains = _path_guard(
                condition_result.definedness_constraints,
                arrival_conditions,
            )
            # Recorded before the failure checks so that an early return still
            # carries the constraints produced by the condition.
            all_domains.extend(condition_domains)
            local_domains.extend(condition_domains)
            if condition_result.failure is not None:
                return _ExecutionResult(
                    env=_merge_branch_env(base_env, merge_names, branch_results),
                    definedness_constraints=tuple(all_domains),
                    steps=tuple(steps),
                    branches=tuple(branches),
                    failure=_operation_failure_from_translation(
                        condition_result.failure
                    ),
                    next_step=step_index,
                )
            if not z3.is_bool(condition_result.z3_expr):
                return _ExecutionResult(
                    env=_merge_branch_env(base_env, merge_names, branch_results),
                    definedness_constraints=tuple(all_domains),
                    steps=tuple(steps),
                    branches=tuple(branches),
                    failure=_operation_failure(
                        "non_bool_condition",
                        "Operation if condition did not translate to a Boolean Z3 expression.",
                        source,
                    ),
                    next_step=step_index,
                )
            condition_expr = condition_result.z3_expr
            # First branch: the condition alone; later branches: earlier
            # conditions false AND this condition true.
            selector = (
                condition_expr
                if not prefix_selectors
                else z3.And(*prefix_selectors, condition_expr)
            )
        status = _branch_status(
            assumptions=assumptions,
            path_conditions=path_conditions,
            definedness_constraints=tuple(all_domains),
            selector=selector,
            prune_unreachable=prune_unreachable,
            timeout_ms=timeout_ms,
        )
        if status == "unsat":
            # Record the branch as evidence only; its body is not executed.
            branches.append(
                OperationBranch(
                    branch_id=branch_id,
                    branch_kind=branch_kind,
                    selector=selector,
                    path_conditions=(*path_conditions, selector),
                    status=status,
                    result_env=None,
                    definedness_constraints=(),
                    failure=None,
                )
            )
        else:
            branch_result = _execute_operation_statements_domain(
                branch.statements,
                base_env,
                visible_names=visible_names,
                assumptions=assumptions,
                path_conditions=(*path_conditions, selector),
                definedness_constraints=tuple(all_domains),
                source=source,
                prune_unreachable=prune_unreachable,
                timeout_ms=timeout_ms,
                step_start=step_index,
            )
            steps.extend(branch_result.steps)
            nested_branches = branch_result.branches
            # Keep only what the body added.
            # NOTE(review): assumes the nested executor returns the constraints
            # it was given as an unchanged prefix of its result - confirm.
            branch_body_definedness = branch_result.definedness_constraints[
                len(all_domains) :
            ]
            branch_definedness = (*local_domains, *branch_body_definedness)
            # Names first bound inside the branch body are dropped here.
            result_env = {
                name: branch_result.env[name]
                for name in merge_names
                if name in branch_result.env
            }
            branch_obj = OperationBranch(
                branch_id=branch_id,
                branch_kind=branch_kind,
                selector=selector,
                path_conditions=(*path_conditions, selector),
                status=status,
                result_env=result_env,
                definedness_constraints=branch_definedness,
                failure=branch_result.failure,
            )
            # The branch itself comes first, followed by branches nested in it.
            branches.append(branch_obj)
            branches.extend(nested_branches)
            all_domains.extend(branch_body_definedness)
            branch_results.append((selector, result_env))
            step_index = branch_result.next_step
            if branch_result.failure is not None:
                # Stop at the first failing branch; later branches are not
                # visited.
                return _ExecutionResult(
                    env=_merge_branch_env(base_env, merge_names, branch_results),
                    definedness_constraints=tuple(all_domains),
                    steps=tuple(steps),
                    branches=tuple(branches),
                    failure=branch_result.failure,
                    next_step=step_index,
                )
        # Later branches are only reached when this condition was false.
        if branch.condition is not None:
            prefix_selectors.append(z3.Not(condition_expr))
    merged_env = _merge_branch_env(base_env, merge_names, branch_results)
    return _ExecutionResult(
        env=merged_env,
        definedness_constraints=tuple(all_domains),
        steps=tuple(steps),
        branches=tuple(branches),
        failure=None,
        next_step=step_index,
    )
[docs]
def execute_operations_domain(
    operations: Union[OperationStatement, List[OperationStatement]],
    var_exprs: Dict[str, _Z3Expr],
    *,
    assumptions: Sequence[z3.ExprRef] = (),
    path_conditions: Sequence[z3.ExprRef] = (),
    source: Optional[DomainSource] = None,
    prune_unreachable: bool = True,
    timeout_ms: Optional[int] = None,
) -> OperationExecution:
    """Run operation statements and collect runtime-definedness evidence.

    This is the public entry point of the path-sensitive executor. It keeps
    the symbolic assignment behavior of the legacy executor and additionally
    reports the steps, branches and definedness constraints gathered on the
    way. The returned environment is restricted to the names present in
    ``var_exprs`` when the call started; block-local temporaries are not
    exposed. The caller's ``var_exprs`` dictionary is copied before execution.

    :param operations: Single operation statement or statement list.
    :type operations: Union[OperationStatement, List[OperationStatement]]
    :param var_exprs: Starting symbolic environment.
    :type var_exprs: Dict[str, Union[z3.ArithRef, z3.BoolRef]]
    :param assumptions: Caller-known facts added to reachability checks,
        defaults to ``()``.
    :type assumptions: Sequence[z3.ExprRef], optional
    :param path_conditions: Predicates needed to reach this block, defaults
        to ``()``.
    :type path_conditions: Sequence[z3.ExprRef], optional
    :param source: Optional source metadata copied into step evidence,
        defaults to ``None``.
    :type source: Optional[DomainSource], optional
    :param prune_unreachable: Whether to skip paths proved unreachable,
        defaults to ``True``.
    :type prune_unreachable: bool, optional
    :param timeout_ms: Optional solver timeout in milliseconds, defaults to
        ``None``.
    :type timeout_ms: Optional[int], optional
    :return: Domain-aware operation execution result.
    :rtype: OperationExecution

    Example::

        >>> import z3
        >>> ops = parse_operations("x = x + 1;", allowed_vars=["x"])
        >>> result = execute_operations_domain(ops, {"x": z3.Int("x")})
        >>> result.failure is None
        True
        >>> len(result.steps)
        1
    """
    statements = _as_operation_list(operations)
    # Names visible at the public boundary, in the caller's key order.
    boundary_names = tuple(var_exprs.keys())
    outcome = _execute_operation_statements_domain(
        statements,
        dict(var_exprs),
        visible_names=boundary_names,
        assumptions=tuple(assumptions),
        path_conditions=tuple(path_conditions),
        definedness_constraints=(),
        source=source,
        prune_unreachable=prune_unreachable,
        timeout_ms=timeout_ms,
        step_start=0,
    )
    final_env = outcome.env
    return OperationExecution(
        # Project the internal environment back onto the caller's names.
        env={name: final_env[name] for name in boundary_names},
        visible_names=boundary_names,
        expr_constraints=(),
        definedness_constraints=outcome.definedness_constraints,
        steps=outcome.steps,
        branches=outcome.branches,
        failure=outcome.failure,
    )
[docs]
def merge_operation_definedness(*items) -> Tuple[DomainConstraint, ...]:
    """Collect runtime-definedness constraints from mixed inputs, in order.

    Each positional argument may be a domain constraint, an operation
    execution, an operation step, an operation branch, or an iterable of
    those objects. Iterables are flattened one level deep only: their members
    must themselves be constraints or operation objects. ``str`` and ``bytes``
    values are never treated as iterables.

    :param items: Domain constraints, operation execution objects, operation
        steps, operation branches, or iterables of those objects.
    :type items: object
    :return: Flattened runtime-definedness constraints.
    :rtype: Tuple[DomainConstraint, ...]
    :raises TypeError: If any item is not a supported operation-domain shape.

    Example::

        >>> import z3
        >>> item = DomainConstraint(z3.Int("x") != 0)
        >>> merge_operation_definedness(item) == (item,)
        True
    """
    carrier_types = (OperationExecution, OperationStep, OperationBranch)
    member_types = (DomainConstraint,) + carrier_types

    def _unsupported(value) -> TypeError:
        # Single place that builds the rejection error for both nesting levels.
        return TypeError(
            "Unsupported operation definedness item: {type_name}".format(
                type_name=type(value).__name__,
            )
        )

    collected: List[DomainConstraint] = []
    for entry in items:
        if isinstance(entry, DomainConstraint):
            collected.append(entry)
            continue
        if isinstance(entry, carrier_types):
            collected.extend(entry.definedness_constraints)
            continue
        if isinstance(entry, (str, bytes)) or not isinstance(entry, Iterable):
            raise _unsupported(entry)
        for member in entry:
            if not isinstance(member, member_types):
                raise _unsupported(member)
            # Members are merged on their own first, then merged again below.
            collected.extend(merge_operation_definedness(member))
    return merge_definedness_constraints(collected)
[docs]
def execute_operations(
    operations: Union[OperationStatement, List[OperationStatement]],
    var_exprs: Dict[str, Union[z3.ArithRef, z3.BoolRef]],
) -> Dict[str, Union[z3.ArithRef, z3.BoolRef]]:
    """
    Symbolically execute operation statements over Z3 variable expressions.

    Accepts either one operation statement or a list of them and runs them in
    order against ``var_exprs``, so a later statement sees the results of the
    earlier ones. An assignment replaces the symbolic expression bound to its
    target. An ``if`` block evaluates every branch from the same pre-``if``
    environment and merges back only the names that were visible before the
    block was entered.

    Nothing is evaluated to a concrete value: the result maps each variable to
    the Z3 expression built up by the statements. Only the names present in
    ``var_exprs`` at call time appear in the returned dictionary.

    :param operations: Single statement or list of statements to execute
    :type operations: Union[OperationStatement, List[OperationStatement]]
    :param var_exprs: Dictionary mapping variable names to current Z3 expressions
    :type var_exprs: Dict[str, Union[z3.ArithRef, z3.BoolRef]]
    :return: New variable expression dictionary with updated global Z3 expressions
    :rtype: Dict[str, Union[z3.ArithRef, z3.BoolRef]]
    :raises ValueError: If a variable referenced in an expression is not available in the current block scope

    Example::

        >>> import z3
        >>> from pyfcstm.model.model import Operation
        >>> from pyfcstm.model.expr import Variable, Integer, BinaryOp
        >>>
        >>> # Create Z3 symbolic variables
        >>> x = z3.Int('x')
        >>> y = z3.Int('y')
        >>> var_exprs = {'x': x, 'y': y}
        >>>
        >>> # Create operations: x = x + 2; y = y + x;
        >>> op1 = Operation(var_name='x', expr=BinaryOp(x=Variable('x'), op='+', y=Integer(2)))
        >>> op2 = Operation(var_name='y', expr=BinaryOp(x=Variable('y'), op='+', y=Variable('x')))
        >>>
        >>> # Execute operations symbolically
        >>> new_exprs = execute_operations([op1, op2], var_exprs)
        >>> # new_exprs['x'] is the Z3 expression: x + 2
        >>> # new_exprs['y'] is the Z3 expression: y + (x + 2)
        >>>
        >>> # Can verify with solver
        >>> solver = z3.Solver()
        >>> solver.add(x == 5, y == 10)
        >>> solver.add(new_exprs['x'] == 7)  # 5 + 2
        >>> solver.add(new_exprs['y'] == 17)  # 10 + 7
        >>> solver.check()
        sat
    """
    statements = _as_operation_list(operations)
    # Capture the caller-visible names before execution starts.
    visible_names = tuple(var_exprs)
    final_exprs = _execute_operation_statements_symbolically(statements, var_exprs)
    return {name: final_exprs[name] for name in visible_names}
# Public API of this module: the names exported by ``from ... import *``.
__all__ = [
    "OperationBranch",
    "OperationExecution",
    "OperationFailure",
    "OperationSource",
    "OperationStep",
    "execute_operations",
    "execute_operations_domain",
    "merge_operation_definedness",
    "parse_operations",
]