Source code for pyfcstm.diagnostics.codes

"""
Loader for the structured diagnostic code registry.

This module loads ``codes.yaml`` (the single source of truth for diagnostic
codes emitted by :mod:`pyfcstm.model`) at import time and exposes the parsed
table as :data:`CODE_REGISTRY`. Downstream consumers — including the
``research_ideas`` LLM agent loop, IDE integrations, and the future
``jsfcstm`` visualization layer — can mirror this registry to drive their
own dispatch logic without depending on exception message text.

The loader performs structural validation on import so that schema drift
in ``codes.yaml`` fails fast. Validation failures raise
:class:`CodesSchemaError` (subclass of :class:`ValueError`), so callers can
distinguish "the diagnostics package is structurally broken" from a generic
business-level ``ValueError`` further up the stack.

The module contains:

* :class:`CodeFieldSpec` - Per-field schema describing a ``refs`` payload key.
* :class:`ForLlmSpec` - Structured fix guidance attached to a code for LLM consumers.
* :class:`CodeSpec` - Full specification for one diagnostic code.
* :class:`CodesSchemaError` - Raised when ``codes.yaml`` is structurally invalid.
* :data:`CODE_REGISTRY` - Mapping ``code -> CodeSpec`` loaded at import time.
* :func:`load_codes` - Parse a YAML file path and return the registry.

.. note::
   ``_ALLOWED_REF_TYPES`` and ``_ALLOWED_SEVERITIES`` are documentation-level
   enumerations used to validate the YAML schema. They do **not** enforce
   runtime ``isinstance`` checks on emitted ``ModelDiagnostic.refs`` values
   — type-checking refs payloads at emit time is the emitter's responsibility
   (see PR-2 of issue #103). The schema's job is to give downstream tooling
   a contract to mirror, not to act as a runtime type system.

Example::

    >>> from pyfcstm.diagnostics import CODE_REGISTRY
    >>> spec = CODE_REGISTRY['E_UNDEFINED_VAR']
    >>> spec.severity
    'error'
    >>> 'var_name' in spec.refs_schema
    True
"""

import os
import sys
from dataclasses import dataclass
from types import MappingProxyType
from typing import Any, Dict, List, Mapping, Optional, Tuple

import yaml

#: Allowed values for ``severity`` in ``codes.yaml`` entries. Must stay in
#: sync with the type-token comment block at the top of ``codes.yaml``.
#:
#: ``info`` was added in Layer 2 (issue #104) to host ``I_*`` codes for
#: observations that are likely-legitimate rather than likely-defects.
#:
#: Checked by :func:`_validate_code`. Every value listed here also needs a
#: key in :data:`_SEVERITY_PREFIX`, because the validator indexes that dict
#: directly with the already-validated severity.
_ALLOWED_SEVERITIES = ('error', 'warning', 'info')

#: Mapping from severity name to the required identifier prefix for codes
#: at that severity. Used by :func:`_validate_code` to enforce that the
#: code identifier and its severity stay in sync.
_SEVERITY_PREFIX = {
    'error': 'E_',
    'warning': 'W_',
    'info': 'I_',
}

#: Allowed values for the ``capability`` field on a code. Declared by
#: Layer 2 to gate downstream consumers that may not implement every
#: analysis flavor (e.g. jsfcstm without SMT WASM).
#:
#: * ``pure_static`` — judged from AST/model only, no expression folding,
#:   no external solver, no simulation
#: * ``const_fold`` — needs the expression constant folder
#: * ``requires_solver`` — needs an SMT backend (reserved for Layer 3)
#: * ``requires_simulation`` — needs the SimulationRuntime (reserved)
#:
#: A code that omits ``capability`` is loaded as ``pure_static``.
_ALLOWED_CAPABILITIES = (
    'pure_static',
    'const_fold',
    'requires_solver',
    'requires_simulation',
)

#: Allowed values for the ``emit_tier`` field on a code. PR-A-fix I-b
#: lets the schema explicitly declare which emit pipeline produces a
#: code so downstream dispatchers can register handlers correctly:
#:
#: * ``static_pipeline`` — fires during the regular static analysis
#:   pass (``parse_dsl_node_to_state_machine`` /
#:   ``collectDocumentDiagnostics``). This is the default for legacy
#:   codes that omit the field.
#: * ``lookup_api`` — fires only when a runtime resolver method
#:   (e.g. ``State.resolve_event``) is invoked explicitly; never seen
#:   by the static pipeline or the parity tests.
#: * ``partial_static_pipeline`` — implemented in the static pipeline
#:   on only one end (typically jsfcstm); the other end intentionally
#:   does not emit. Downstream LLM consumers should not block waiting
#:   for the missing end to surface this code.
_ALLOWED_EMIT_TIERS = (
    'static_pipeline',
    'lookup_api',
    'partial_static_pipeline',
)

#: Required keys for the ``for_llm`` payload when present on a code.
#: ``summary`` is a one-line description aimed at downstream LLM consumers;
#: ``recommended_actions`` is a list of dicts describing concrete fixes;
#: ``do_not`` is a list of strings describing anti-patterns to avoid.
#:
#: Only enforced when a code actually carries a ``for_llm`` mapping; the
#: loader accepts a missing or null ``for_llm`` and stores ``None``.
_FOR_LLM_REQUIRED_KEYS = ('summary', 'recommended_actions', 'do_not')

#: Allowed values for the ``refs.<field>.type`` token in ``codes.yaml``. This
#: tuple is the **single source of truth** for the type-token vocabulary; the
#: comment block at the top of ``codes.yaml`` is documentation that must
#: mirror this tuple (test/diagnostics/test_codes_yaml.py asserts both lists
#: stay aligned).
#:
#: NOTE: these tokens are documentation-only. PR-1 does not enforce that
#: ``refs[field]`` actually carries the declared Python type at emit time;
#: the YAML schema serves as the contract surface for downstream tools and
#: as the input for human-readable spec rendering.
_ALLOWED_REF_TYPES = (
    'str',
    'str_or_null',
    'int',
    'int_or_null',
    'bool',
    'Span',
    'list[str]',
)


class CodesSchemaError(ValueError):
    """
    Signal that ``codes.yaml`` does not match the expected structure.

    The class derives from :class:`ValueError`, so a generic
    ``except ValueError`` handler keeps catching it. Tooling that needs to
    tell "the diagnostics package is broken" apart from a domain-level
    ``ValueError`` can catch this narrower type instead.
    """

@dataclass(frozen=True)
class CodeFieldSpec:
    """
    Describe one entry of :attr:`CodeSpec.refs_schema`.

    Instances are immutable (frozen dataclass).

    :param name: Key under which the value appears in
        :attr:`ModelDiagnostic.refs`.
    :type name: str
    :param type: Type token for the field; expected to be one of the tokens
        documented at the top of ``codes.yaml``.
    :type type: str
    :param required: ``True`` when the field has to be present on every
        emitted diagnostic carrying this code.
    :type required: bool
    :param description: Free-text explanation of what the field holds.
    :type description: str
    :param enum: Closed set of string values the field may take, or ``None``
        when the field is not restricted to an enumeration. Intended for
        downstream emit-test infrastructure (and any future runtime
        validator) to check ``refs[field]`` membership.
    :type enum: Optional[Tuple[str, ...]]
    """

    name: str
    type: str
    required: bool
    description: str
    enum: Optional[Tuple[str, ...]] = None

@dataclass(frozen=True)
class ForLlmSpec:
    """
    Machine-readable fix guidance that a diagnostic code carries for
    downstream LLM consumers.

    Layer 2 (issue #104) requires this payload on every emitted code —
    ``E_*``, ``W_*``, and ``I_*`` — so LLM agent loops can consume structured
    recommendations rather than pattern-matching the human-readable
    ``message``. The 14 Layer 1 ``E_*`` codes were grandfathered by PR-A and
    later backfilled by PR-A-fix I-a, so the payload is now expected on every
    catalogued code.

    :param summary: Single-line description written for LLM consumers.
    :type summary: str
    :param recommended_actions: Concrete fix suggestions, in order. Entries
        are free-form mappings; consumers should read the list as a hint,
        not as a closed schema.
    :type recommended_actions: Tuple[Mapping[str, Any], ...]
    :param do_not: Anti-pattern strings the LLM should steer clear of.
    :type do_not: Tuple[str, ...]
    """

    summary: str
    recommended_actions: Tuple[Mapping[str, Any], ...]
    do_not: Tuple[str, ...]

@dataclass(frozen=True)
class CodeSpec:
    """
    Complete, immutable description of one diagnostic code.

    :param code: Stable identifier of the code (e.g. ``'E_UNDEFINED_VAR'``).
    :type code: str
    :param severity: One of ``'error'``, ``'warning'``, ``'info'``.
    :type severity: str
    :param description: Human-readable account of when the code fires.
    :type description: str
    :param refs_schema: Mapping ``field_name -> CodeFieldSpec`` for the
        structured payload of diagnostics with this code. The loader hands
        over a :class:`types.MappingProxyType` so callers cannot mutate the
        registry by accident.
    :type refs_schema: Mapping[str, CodeFieldSpec]
    :param example_dsl: Minimal DSL snippet that triggers the code, defaults
        to ``None``.
    :type example_dsl: str, optional
    :param capability: Analysis tier the code belongs to. Defaults to
        ``'pure_static'``, which is also what grandfathered Layer 1 codes
        get when they leave the field unset.
    :type capability: str, optional
    :param for_llm: Structured guidance for downstream LLM consumers.
        PR-A-fix I-a backfilled it across all ``E_*`` codes and new codes are
        expected to ship with one; it stays ``Optional`` so the loader can
        tolerate forward-compatibility cases.
    :type for_llm: ForLlmSpec, optional
    :param emit_tier: Emit pipeline that actually produces the code.
        ``'static_pipeline'`` (default) — fires during
        ``parse_dsl_node_to_state_machine`` / the equivalent jsfcstm
        ``collectDocumentDiagnostics`` static analysis pass.
        ``'lookup_api'`` — fires only through explicit runtime resolver APIs
        (e.g. ``State.resolve_event``), never from the static pipeline.
        ``'partial_static_pipeline'`` — the static-pipeline emit exists on
        one end only (typically jsfcstm), so downstream LLM consumers should
        not block waiting for the missing end. PR-A-fix I-b made the field
        explicit so dispatchers can register handlers per emit channel.
    :type emit_tier: str, optional
    """

    code: str
    severity: str
    description: str
    refs_schema: Mapping[str, CodeFieldSpec]
    example_dsl: Optional[str] = None
    capability: str = 'pure_static'
    for_llm: Optional[ForLlmSpec] = None
    emit_tier: str = 'static_pipeline'

    def required_fields(self) -> List[str]:
        """
        List the ``refs`` keys that are marked as required.

        :return: Names of required fields, in the iteration order of
            :attr:`refs_schema`.
        :rtype: List[str]
        """
        return [
            field_name
            for field_name, field_spec in self.refs_schema.items()
            if field_spec.required
        ]

def _ctx(path: str, *bits: str) -> str:
    """
    Build a validation error message prefixed with the offending file path.

    :param path: Path of the YAML file being validated.
    :type path: str
    :param bits: Message fragments; they are joined with single spaces.
    :type bits: str
    :return: ``"codes.yaml at '<path>': <bits joined by spaces>"``.
    :rtype: str
    """
    return f"codes.yaml at {path!r}: " + " ".join(bits)


def _validate_field(path: str, code: str, field_name: str, raw: Any) -> CodeFieldSpec:
    """
    Validate one ``refs.<field>`` entry and convert it to a spec object.

    :param path: Path of the YAML file, used only for error messages.
    :type path: str
    :param code: Identifier of the code that owns the field.
    :type code: str
    :param field_name: Key of the field inside the code's ``refs`` mapping.
    :type field_name: str
    :param raw: Parsed YAML value for the field.
    :type raw: Any
    :return: The validated field specification.
    :rtype: CodeFieldSpec
    :raises CodesSchemaError: If ``raw`` is not a mapping, lacks ``type``,
        uses an unknown type token, has a non-bool ``required``, or has an
        ``enum`` that is not a non-empty list of strings.
    """
    if not isinstance(raw, dict):
        raise CodesSchemaError(_ctx(
            path,
            f"code {code!r} field {field_name!r} must be a mapping, got",
            type(raw).__name__,
        ))
    if 'type' not in raw:
        raise CodesSchemaError(_ctx(
            path,
            f"code {code!r} field {field_name!r} is missing required key 'type'.",
        ))
    field_type = raw['type']
    if field_type not in _ALLOWED_REF_TYPES:
        raise CodesSchemaError(_ctx(
            path,
            f"code {code!r} field {field_name!r} has unsupported type {field_type!r}.",
            f"Allowed: {_ALLOWED_REF_TYPES}.",
        ))
    raw_required = raw.get('required', False)
    if not isinstance(raw_required, bool):
        # Catch the YAML footgun where `required: "false"` is loaded as a
        # truthy string instead of a bool. `bool("false")` is True, which
        # would silently invert required/optional semantics.
        raise CodesSchemaError(_ctx(
            path,
            f"code {code!r} field {field_name!r} 'required' must be a YAML bool,",
            f"got {type(raw_required).__name__}: {raw_required!r}",
        ))
    # An explicit YAML null (`description:` with no value) loads as None.
    # Treat it like an omitted key instead of stringifying it to 'None'.
    raw_description = raw.get('description')
    description = '' if raw_description is None else str(raw_description)

    raw_enum = raw.get('enum')
    field_enum: Optional[Tuple[str, ...]] = None
    if raw_enum is not None:
        if not isinstance(raw_enum, list):
            raise CodesSchemaError(_ctx(
                path,
                f"code {code!r} field {field_name!r} 'enum' must be a list "
                f"when present,",
                f"got {type(raw_enum).__name__}",
            ))
        if not raw_enum:
            raise CodesSchemaError(_ctx(
                path,
                f"code {code!r} field {field_name!r} 'enum' must be non-empty "
                f"when present.",
            ))
        for value in raw_enum:
            if not isinstance(value, str):
                raise CodesSchemaError(_ctx(
                    path,
                    f"code {code!r} field {field_name!r} 'enum' members must "
                    f"be strings, got {type(value).__name__}: {value!r}",
                ))
        field_enum = tuple(raw_enum)

    return CodeFieldSpec(
        name=field_name,
        type=field_type,
        required=raw_required,
        description=description,
        enum=field_enum,
    )


def _validate_code(path: str, code: str, raw: Any) -> CodeSpec:
    """
    Validate one top-level ``codes.yaml`` entry and convert it to a spec.

    :param path: Path of the YAML file, used only for error messages.
    :type path: str
    :param code: Top-level key of the entry (the code identifier).
    :type code: str
    :param raw: Parsed YAML value of the entry.
    :type raw: Any
    :return: The validated code specification; its ``refs_schema`` is wrapped
        in :class:`types.MappingProxyType`.
    :rtype: CodeSpec
    :raises CodesSchemaError: If the entry is not a mapping, has an invalid
        ``severity``, a missing / null / blank ``description``, a non-mapping
        ``refs``, a non-string ``example_dsl``, a code prefix that does not
        match the severity, an invalid ``capability`` or ``emit_tier``, or an
        invalid ``refs`` field or ``for_llm`` payload.
    """
    if not isinstance(raw, dict):
        raise CodesSchemaError(_ctx(
            path,
            f"code {code!r} must be a mapping, got {type(raw).__name__}.",
        ))
    severity = raw.get('severity')
    if severity not in _ALLOWED_SEVERITIES:
        raise CodesSchemaError(_ctx(
            path,
            f"code {code!r} has invalid severity {severity!r}.",
            f"Allowed: {_ALLOWED_SEVERITIES}.",
        ))
    if 'description' not in raw:
        raise CodesSchemaError(_ctx(
            path,
            f"code {code!r} is missing required key 'description'.",
        ))
    # An explicit YAML null loads as None; str(None) would yield the literal
    # text 'None' and slip past the emptiness check below, so map it to ''.
    raw_description = raw['description']
    description = '' if raw_description is None else str(raw_description).strip()
    if not description:
        raise CodesSchemaError(_ctx(
            path,
            f"code {code!r} must have a non-empty 'description'.",
        ))

    # A missing key, a YAML null, or any other falsy value is treated as
    # "no refs" here.
    raw_refs = raw.get('refs') or {}
    if not isinstance(raw_refs, dict):
        raise CodesSchemaError(_ctx(
            path,
            f"code {code!r} 'refs' must be a mapping when present,",
            f"got {type(raw_refs).__name__}.",
        ))
    refs_schema: Dict[str, CodeFieldSpec] = {}
    for field_name, field_raw in raw_refs.items():
        refs_schema[field_name] = _validate_field(path, code, field_name, field_raw)

    example_dsl = raw.get('example_dsl')
    if example_dsl is not None and not isinstance(example_dsl, str):
        raise CodesSchemaError(_ctx(
            path,
            f"code {code!r} 'example_dsl' must be a string when present,",
            f"got {type(example_dsl).__name__}.",
        ))

    # Codes follow a 1-letter severity prefix convention: E_* errors,
    # W_* warnings, I_* infos. Enforce so that severity and code stay in sync.
    expected_prefix = _SEVERITY_PREFIX[severity]
    if not code.startswith(expected_prefix):
        raise CodesSchemaError(_ctx(
            path,
            f"code {code!r} has severity {severity!r} but does not start",
            f"with the expected prefix {expected_prefix!r}.",
        ))

    capability = raw.get('capability', 'pure_static')
    if capability not in _ALLOWED_CAPABILITIES:
        raise CodesSchemaError(_ctx(
            path,
            f"code {code!r} has invalid capability {capability!r}.",
            f"Allowed: {_ALLOWED_CAPABILITIES}.",
        ))

    emit_tier = raw.get('emit_tier', 'static_pipeline')
    if emit_tier not in _ALLOWED_EMIT_TIERS:
        raise CodesSchemaError(_ctx(
            path,
            f"code {code!r} has invalid emit_tier {emit_tier!r}.",
            f"Allowed: {_ALLOWED_EMIT_TIERS}.",
        ))

    for_llm = _validate_for_llm(path, code, raw.get('for_llm'))

    return CodeSpec(
        code=code,
        severity=severity,
        description=description,
        refs_schema=MappingProxyType(refs_schema),
        example_dsl=example_dsl,
        capability=capability,
        for_llm=for_llm,
        emit_tier=emit_tier,
    )


def _validate_for_llm(path: str, code: str, raw: Any) -> Optional[ForLlmSpec]:
    """
    Validate the optional ``for_llm`` payload of a code.

    :param path: Path of the YAML file, used only for error messages.
    :type path: str
    :param code: Identifier of the code that owns the payload.
    :type code: str
    :param raw: Parsed YAML value of ``for_llm``; ``None`` when the key is
        absent or null.
    :type raw: Any
    :return: ``None`` when ``raw`` is ``None``, otherwise the validated spec
        with a stripped ``summary``, read-only copies of each action mapping,
        and ``do_not`` as a tuple.
    :rtype: Optional[ForLlmSpec]
    :raises CodesSchemaError: If ``raw`` is not a mapping, lacks a required
        key, has a blank or non-string ``summary``, or has
        ``recommended_actions`` / ``do_not`` that are not lists of mappings /
        strings respectively.
    """
    if raw is None:
        return None
    if not isinstance(raw, dict):
        raise CodesSchemaError(_ctx(
            path,
            f"code {code!r} 'for_llm' must be a mapping when present,",
            f"got {type(raw).__name__}.",
        ))
    missing = [k for k in _FOR_LLM_REQUIRED_KEYS if k not in raw]
    if missing:
        raise CodesSchemaError(_ctx(
            path,
            f"code {code!r} 'for_llm' is missing required keys {missing}.",
            f"Required: {_FOR_LLM_REQUIRED_KEYS}.",
        ))
    summary = raw['summary']
    if not isinstance(summary, str) or not summary.strip():
        raise CodesSchemaError(_ctx(
            path,
            f"code {code!r} 'for_llm.summary' must be a non-empty string.",
        ))
    actions_raw = raw['recommended_actions']
    if not isinstance(actions_raw, list):
        raise CodesSchemaError(_ctx(
            path,
            f"code {code!r} 'for_llm.recommended_actions' must be a list,",
            f"got {type(actions_raw).__name__}.",
        ))
    for i, action in enumerate(actions_raw):
        if not isinstance(action, dict):
            raise CodesSchemaError(_ctx(
                path,
                f"code {code!r} 'for_llm.recommended_actions[{i}]' must be a",
                f"mapping, got {type(action).__name__}.",
            ))
    do_not_raw = raw['do_not']
    if not isinstance(do_not_raw, list):
        raise CodesSchemaError(_ctx(
            path,
            f"code {code!r} 'for_llm.do_not' must be a list,",
            f"got {type(do_not_raw).__name__}.",
        ))
    for i, item in enumerate(do_not_raw):
        if not isinstance(item, str):
            raise CodesSchemaError(_ctx(
                path,
                f"code {code!r} 'for_llm.do_not[{i}]' must be a string,",
                f"got {type(item).__name__}.",
            ))
    return ForLlmSpec(
        summary=summary.strip(),
        # Shallow-copy each action and expose it read-only so registry
        # consumers cannot mutate the top-level keys of the parsed YAML.
        recommended_actions=tuple(MappingProxyType(dict(a)) for a in actions_raw),
        do_not=tuple(do_not_raw),
    )

def load_codes(path: str) -> Dict[str, CodeSpec]:
    """
    Read a ``codes.yaml`` file and return its validated contents.

    The file is parsed with ``yaml.safe_load``; every top-level entry is then
    checked by :func:`_validate_code`. Errors raised by the YAML parser
    itself are not caught here.

    :param path: Filesystem path to the YAML file.
    :type path: str
    :return: Mapping ``code -> CodeSpec`` in the order the codes appear in
        the parsed document.
    :rtype: Dict[str, CodeSpec]
    :raises FileNotFoundError: If ``path`` does not exist.
    :raises CodesSchemaError: If the file is empty, its top level is not a
        mapping, a top-level key is not a string, an entry fails validation
        (e.g. unknown severity / type token), or no codes are defined.
        Subclasses :class:`ValueError` for backwards compatibility with
        generic ``except ValueError`` handlers.

    Example::

        >>> import os
        >>> from pyfcstm.diagnostics.codes import load_codes
        >>> path = os.path.join(os.path.dirname(__file__), 'codes.yaml')
    """
    with open(path, 'r', encoding='utf-8') as stream:
        document = yaml.safe_load(stream)

    # An empty (or comment-only) file parses to None rather than {}.
    if document is None:
        raise CodesSchemaError(_ctx(path, "file is empty."))
    if not isinstance(document, dict):
        raise CodesSchemaError(_ctx(
            path,
            f"must contain a top-level mapping, got {type(document).__name__}.",
        ))

    specs: Dict[str, CodeSpec] = {}
    for key, body in document.items():
        if not isinstance(key, str):
            raise CodesSchemaError(_ctx(
                path,
                f"top-level key {key!r} must be a string."
            ))
        specs[key] = _validate_code(path, key, body)

    if not specs:
        raise CodesSchemaError(_ctx(path, "contains no code definitions."))
    return specs

def _resolve_codes_yaml_path() -> str:
    """
    Locate ``codes.yaml`` on disk for both regular installs and PyInstaller
    one-file bundles.

    For a normal or editable install the file sits in the same directory as
    this module (``os.path.dirname(__file__)``). In a PyInstaller one-file
    bundle, data files are unpacked under ``sys._MEIPASS``, so the file is
    looked up there as ``pyfcstm/diagnostics/codes.yaml`` when the
    package-local copy is missing.

    :return: Path of the first existing candidate; if neither exists, the
        package-local path is returned anyway so the eventual
        ``FileNotFoundError`` names the expected location.
    :rtype: str
    """
    package_dir = os.path.dirname(__file__)
    local_path = os.path.join(package_dir, 'codes.yaml')
    if os.path.isfile(local_path):
        return local_path

    bundle_root = getattr(sys, '_MEIPASS', None)
    if bundle_root is None:
        # Not running from a bundle: fall back to the expected location
        # rather than inventing a synthetic path.
        return local_path

    bundled_path = os.path.join(bundle_root, 'pyfcstm', 'diagnostics', 'codes.yaml')
    return bundled_path if os.path.isfile(bundled_path) else local_path


_CODES_YAML_PATH = _resolve_codes_yaml_path()

#: Mapping ``code -> CodeSpec`` loaded from ``codes.yaml`` at import time.
#: Wrapped in :class:`types.MappingProxyType` so downstream callers cannot
#: mutate the registry by accident.
CODE_REGISTRY: Mapping[str, CodeSpec] = MappingProxyType(load_codes(_CODES_YAML_PATH))