# Source code for pyfcstm.entry.inspect

"""Command-line model inspection for FCSTM DSL files.

This module registers the ``inspect`` CLI command. The command reads an FCSTM
DSL file, builds a :class:`pyfcstm.model.StateMachine`, runs
:func:`pyfcstm.diagnostics.inspect_model`, and emits the inspection payload as
stable JSON text.

Module map:

.. list-table::
   :header-rows: 1

   * - Entry
     - Purpose
   * - :func:`build_inspect_json`
     - Build JSON text for one input file and inspect policy.
   * - :func:`_add_inspect_subcommand`
     - Register the ``inspect`` subcommand on a Click group.

Examples::

    >>> import os
    >>> from tempfile import TemporaryDirectory
    >>> with TemporaryDirectory() as td:
    ...     path = os.path.join(td, "demo.fcstm")
    ...     with open(path, "w", encoding="utf-8") as f:
    ...         _ = f.write("state Root;")
    ...     text = build_inspect_json(path)
    >>> '"root_state_path": "Root"' in text
    True
"""

from __future__ import annotations

import json
import pathlib
from typing import Optional

import click

from .base import CONTEXT_SETTINGS, ClickErrorException
from ..diagnostics import inspect_model
from ..dsl import GrammarParseError
from ..model import load_state_machine_from_file
from ..utils import ModelValidationError
from ..verify import InspectAccessForbiddenError
from ..verify.taxonomy import (
    CALL_COUNT_SCALING_ORDER,
    COMPLEXITY_TIER_ORDER,
)


# Labels offered by the ``--max-complexity-tier`` option: every tier of the
# verify taxonomy followed by ``bmc_search``.  The extra label is accepted by
# the option parser only so that it can be rejected later with a policy error
# instead of a generic "invalid choice" message.
_INSPECT_COMPLEXITY_CHOICES = (*COMPLEXITY_TIER_ORDER, "bmc_search")

# Labels offered by the ``--max-call-count-scaling`` option: every scaling
# label of the verify taxonomy followed by the ``k_unrollings`` family, which
# is likewise accepted only to be rejected with a policy error.
_INSPECT_CALL_COUNT_CHOICES = (
    *CALL_COUNT_SCALING_ORDER,
    "k_unrollings",
    "k_unrollings_times_branching",
)


def _validate_inspect_policy(
    max_complexity_tier: str,
    max_call_count_scaling: str,
) -> None:
    """Reject unknown or forbidden inspect policy labels.

    This check is meant to run before any input model is read, so that a bad
    policy request fails the same way whether or not verification is enabled.
    The complexity tier is examined first, then the call-count scaling; the
    first problem found is the one reported.

    :param max_complexity_tier: Maximum verify complexity tier requested by
        the caller.
    :type max_complexity_tier: str
    :param max_call_count_scaling: Maximum verify call-count scaling requested
        by the caller.
    :type max_call_count_scaling: str
    :return: ``None`` when both labels are acceptable.
    :rtype: None
    :raises pyfcstm.verify.inspect_adapter.InspectAccessForbiddenError: If the
        complexity tier is ``bmc_search`` or not a known tier, or if the
        call-count scaling is unknown or is a known label that is not part of
        ``CALL_COUNT_SCALING_ORDER``.

    Examples::

        >>> _validate_inspect_policy("structural", "linear_in_transitions")
        >>> _validate_inspect_policy("bmc_search", "linear_in_transitions")
        Traceback (most recent call last):
        ...
        pyfcstm.verify.inspect_adapter.InspectAccessForbiddenError: bmc_search algorithms are not allowed in automatic inspect runs
        >>> _validate_inspect_policy("structural", "k_unrollings")
        Traceback (most recent call last):
        ...
        pyfcstm.verify.inspect_adapter.InspectAccessForbiddenError: call-count scaling 'k_unrollings' is not allowed in automatic inspect runs
    """
    # Pick the first applicable rejection reason; the branch order defines
    # which message wins when several problems are present at once.
    if max_complexity_tier == "bmc_search":
        reason = "bmc_search algorithms are not allowed in automatic inspect runs"
    elif max_complexity_tier not in COMPLEXITY_TIER_ORDER:
        reason = "unknown inspect complexity tier: {maximum!r}".format(
            maximum=max_complexity_tier
        )
    elif max_call_count_scaling not in _INSPECT_CALL_COUNT_CHOICES:
        # Not even a label the CLI knows about.
        reason = "unknown inspect call-count scaling: {maximum!r}".format(
            maximum=max_call_count_scaling
        )
    elif max_call_count_scaling not in CALL_COUNT_SCALING_ORDER:
        # A known label, but one that exists only to be rejected here.
        reason = (
            "call-count scaling {maximum!r} is not allowed in automatic inspect runs"
        ).format(maximum=max_call_count_scaling)
    else:
        return

    raise InspectAccessForbiddenError(reason)


def build_inspect_json(
    input_code_file: str,
    *,
    enable_verify: bool = False,
    max_complexity_tier: str = "structural",
    max_call_count_scaling: str = "linear_in_transitions",
    smt_timeout_ms: Optional[int] = None,
) -> str:
    """Build stable JSON text for an inspected FCSTM model.

    The helper centralizes the CLI workflow: validate the inspect policy, read
    and parse the DSL file, build the state-machine model, run
    :func:`pyfcstm.diagnostics.inspect_model`, and serialize the JSON-friendly
    dict returned by :meth:`ModelInspect.to_json`.

    Every expected failure is converted into a
    :class:`pyfcstm.entry.base.ClickErrorException` that is explicitly chained
    to the original exception, so the root cause stays available on
    ``__cause__``.

    :param input_code_file: Path to the input FCSTM DSL file.
    :type input_code_file: str
    :param enable_verify: Whether to run inspect-eligible verify algorithms.
    :type enable_verify: bool, optional
    :param max_complexity_tier: Maximum verify complexity tier accepted by
        inspect. Forbidden tiers are rejected even when ``enable_verify`` is
        false.
    :type max_complexity_tier: str, optional
    :param max_call_count_scaling: Maximum verify call-count scaling accepted
        by inspect. Forbidden scaling labels are rejected even when
        ``enable_verify`` is false.
    :type max_call_count_scaling: str, optional
    :param smt_timeout_ms: Optional solver timeout in milliseconds for
        SMT-local verify algorithms. ``None`` leaves the solver timeout unset.
        ``0`` is forwarded unchanged to the solver layer and follows Z3
        semantics, where no finite timeout is configured.
    :type smt_timeout_ms: Optional[int], optional
    :return: Pretty-printed JSON inspection report text ending with a newline.
    :rtype: str
    :raises pyfcstm.entry.base.ClickErrorException: If the input file is
        missing, cannot be read, cannot be decoded, fails parsing or model
        validation, or requests a forbidden inspect verify policy.

    Examples::

        >>> import json
        >>> import os
        >>> from tempfile import TemporaryDirectory
        >>> with TemporaryDirectory() as td:
        ...     path = os.path.join(td, "demo.fcstm")
        ...     with open(path, "w", encoding="utf-8") as f:
        ...         _ = f.write("state Root;")
        ...     payload = json.loads(build_inspect_json(path))
        >>> payload["root_state_path"]
        'Root'
    """
    try:
        # Policy validation comes first so that a forbidden request fails
        # before the user file is touched.
        _validate_inspect_policy(
            max_complexity_tier,
            max_call_count_scaling,
        )
        machine = load_state_machine_from_file(input_code_file)
        report = inspect_model(
            machine,
            enable_verify=enable_verify,
            max_complexity_tier=max_complexity_tier,
            max_call_count_scaling=max_call_count_scaling,
            smt_timeout_ms=smt_timeout_ms,
        )
    except FileNotFoundError as err:
        # load_state_machine_from_file reads the user-provided path; a missing
        # input file should be reported as a controlled CLI error.  This
        # handler must stay ahead of the generic OSError one below, because
        # FileNotFoundError is an OSError subclass.
        raise ClickErrorException(
            f"Input DSL file not found: {input_code_file}"
        ) from err
    except UnicodeDecodeError as err:
        # load_state_machine_from_file uses auto_decode, which raises
        # UnicodeDecodeError when none of the supported encodings can decode
        # user-provided input bytes.
        raise ClickErrorException(
            f"Failed to decode input DSL file {input_code_file}: {err}"
        ) from err
    except OSError as err:
        # pathlib.Path.read_bytes inside load_state_machine_from_file raises
        # OSError subclasses for filesystem failures such as permission
        # errors, missing parent components, or directory paths.
        raise ClickErrorException(
            f"Failed to read input DSL file {input_code_file}: {err}"
        ) from err
    except GrammarParseError as err:
        # parse_state_machine_dsl raises GrammarParseError for syntax and
        # lexical failures in user-provided FCSTM text.
        raise ClickErrorException(
            f"Failed to parse input DSL file {input_code_file}: {err}"
        ) from err
    except ModelValidationError as err:
        # parse_dsl_node_to_state_machine raises ModelValidationError for
        # model-level contract violations after a syntactically valid parse.
        raise ClickErrorException(
            f"Invalid state machine model in {input_code_file}: {err}"
        ) from err
    except InspectAccessForbiddenError as err:
        # _validate_inspect_policy and inspect_model reject forbidden automatic
        # inspect verify policies such as BMC search.
        raise ClickErrorException(str(err)) from err

    # sort_keys plus a fixed indent keep the text stable across runs; the
    # trailing newline makes the output a well-formed text file.
    return (
        json.dumps(
            report.to_json(),
            ensure_ascii=False,
            indent=2,
            sort_keys=True,
        )
        + "\n"
    )
def _add_inspect_subcommand(cli: click.Group) -> click.Group:
    """Add the ``inspect`` subcommand to a Click CLI group.

    The registered command emits :func:`pyfcstm.diagnostics.inspect_model`
    output as JSON text. Verify integration remains disabled by default and is
    enabled only by passing ``--enable-verify``.

    :param cli: The Click group to which the subcommand should be added.
    :type cli: click.Group
    :return: The same CLI group instance with ``inspect`` registered.
    :rtype: click.Group

    Examples::

        >>> import click
        >>> app = click.Group()
        >>> app = _add_inspect_subcommand(app)
        >>> "inspect" in app.commands
        True
    """

    @cli.command(
        "inspect",
        help="Inspect a state machine DSL file and emit structured JSON.",
        context_settings=CONTEXT_SETTINGS,
    )
    @click.option(
        "-i",
        "--input-code",
        "input_code_file",
        type=str,
        required=True,
        help="Input code file of state machine DSL.",
    )
    @click.option(
        "-o",
        "--output",
        "output_file",
        type=str,
        default=None,
        help="Output JSON file, output to stdout when not assigned.",
    )
    @click.option(
        "--enable-verify",
        is_flag=True,
        help="Run inspect-eligible pyfcstm.verify algorithms.",
    )
    @click.option(
        "--max-complexity-tier",
        type=click.Choice(_INSPECT_COMPLEXITY_CHOICES, case_sensitive=True),
        default="structural",
        show_default=True,
        help=(
            "Maximum verify complexity tier accepted by inspect; bmc_search is "
            "parsed only to report a policy error."
        ),
    )
    @click.option(
        "--max-call-count-scaling",
        type=click.Choice(_INSPECT_CALL_COUNT_CHOICES, case_sensitive=True),
        default="linear_in_transitions",
        show_default=True,
        help=(
            "Maximum verify call-count scaling accepted by inspect; "
            "k_unrollings values are parsed only to report a policy error."
        ),
    )
    @click.option(
        "--smt-timeout-ms",
        type=click.IntRange(min=0),
        default=None,
        help=(
            "Optional SMT solver timeout in milliseconds; "
            "0 keeps Z3 without a finite timeout."
        ),
    )
    def inspect_command(
        input_code_file: str,
        output_file: Optional[str],
        enable_verify: bool,
        max_complexity_tier: str,
        max_call_count_scaling: str,
        smt_timeout_ms: Optional[int],
    ) -> None:
        """Inspect a state machine DSL file and emit JSON.

        :param input_code_file: Path to the file containing state machine DSL
            code.
        :type input_code_file: str
        :param output_file: Path to the output JSON file. If ``None``, output
            is written to stdout.
        :type output_file: Optional[str]
        :param enable_verify: Whether to run inspect-eligible verify
            algorithms.
        :type enable_verify: bool
        :param max_complexity_tier: Maximum verify complexity tier accepted by
            the inspect adapter.
        :type max_complexity_tier: str
        :param max_call_count_scaling: Maximum verify call-count scaling
            accepted by the inspect adapter.
        :type max_call_count_scaling: str
        :param smt_timeout_ms: Optional SMT solver timeout in milliseconds.
            ``None`` leaves the timeout unset, while ``0`` is forwarded
            unchanged to the solver layer and keeps Z3 without a finite
            timeout.
        :type smt_timeout_ms: Optional[int]
        :return: ``None``. JSON is written to a file or stdout.
        :rtype: None
        :raises pyfcstm.entry.base.ClickErrorException: If the JSON text
            cannot be written to ``output_file``.

        Examples::

            $ pyfcstm inspect -i machine.fcstm
            $ pyfcstm inspect -i machine.fcstm --enable-verify --max-complexity-tier smt_linear
        """
        inspect_json = build_inspect_json(
            input_code_file,
            enable_verify=enable_verify,
            max_complexity_tier=max_complexity_tier,
            max_call_count_scaling=max_call_count_scaling,
            smt_timeout_ms=smt_timeout_ms,
        )

        if output_file is None:
            # The JSON text already ends with a newline, so suppress the one
            # click.echo would append.
            click.echo(inspect_json, nl=False)
        else:
            try:
                pathlib.Path(output_file).write_text(inspect_json, encoding="utf-8")
            except OSError as err:
                # pathlib.Path.write_text raises OSError subclasses for
                # filesystem failures such as missing parent directories or
                # permission errors.  Chain explicitly so the root cause
                # stays attached to the CLI error.
                raise ClickErrorException(
                    f"Failed to write inspect JSON file {output_file}: {err}"
                ) from err

    return cli