# Source code for pyfcstm.highlight.pygments_lexer

"""
Pygments lexer implementation for FCSTM DSL syntax highlighting.

This module defines :class:`FcstmLexer`, a Pygments lexer tailored for the
FCSTM (Finite State Machine) DSL. The lexer mirrors the FCSTM surface syntax
defined by ``Grammar.g4`` and provides highlighting support for Sphinx
documentation as well as other Pygments-based tools.

The module exposes the following public component:

* :class:`FcstmLexer` - Regex-based lexer for FCSTM DSL tokens and comments

.. note::
   The lexer is designed for use with Pygments and Sphinx's ``code-block``
   directive. It does not parse or validate DSL input. In particular,
   :meth:`FcstmLexer.analyse_text` must remain a pure string/token heuristic
   and must not call the FCSTM parser/model loader, so malformed but still
   recognizably FCSTM snippets can continue to be detected.

Example::

    >>> from pygments import highlight
    >>> from pygments.formatters import HtmlFormatter
    >>> from pygments.lexers import get_lexer_by_name
    >>> code = 'state Root { import "./worker.fcstm" as Worker; }'
    >>> lexer = get_lexer_by_name("fcstm")
    >>> html = highlight(code, lexer, HtmlFormatter())

Usage in Sphinx documentation::

    .. code-block:: fcstm

        state Root {
            import "./worker.fcstm" as Worker {
                def counter -> shared_counter;
                event /Start -> Start named "Mapped Start";
            }
        }
"""

import re
from typing import List, Tuple

from pygments.lexer import RegexLexer, words, include
from pygments.token import (
    Comment, Operator, Keyword, Name, String, Number,
    Punctuation, Whitespace
)

__all__ = ['FcstmLexer']


class FcstmLexer(RegexLexer):
    """
    Lexer for FCSTM (Finite State Machine) DSL.

    This lexer provides syntax highlighting for hierarchical state machine
    definitions in the FCSTM DSL. It recognizes keywords, operators, numbers,
    strings, comments (including nested multiline comments), and identifiers.
    The implementation uses stateful regular expressions via
    :class:`pygments.lexer.RegexLexer`.

    The lexer supports:

    * Variable definitions and types (``def``, ``int``, ``float``)
    * State and import definitions (``state``, ``pseudo``, ``import``, ``as``, ``named``)
    * Transitions and lifecycle actions (``enter``, ``during``, ``exit``)
    * Aspect-oriented actions (``before``, ``after``, ``>>``)
    * Guards and effects (``if``, ``effect``)
    * Import mapping blocks (``def`` mapping, ``event`` mapping, ``$n`` / ``${n}`` templates)
    * Logical and arithmetic expressions
    * Events and scoped references (``::``)

    Example::

        >>> from pygments.lexers import get_lexer_by_name
        >>> lexer = get_lexer_by_name("fcstm")
        >>> list(lexer.get_tokens('state Root { import "./worker.fcstm" as Worker; }'))[:5]
        [(Token.Keyword.Declaration, 'state'), ...]

    .. note::
       The lexer includes a heuristic :meth:`analyse_text` method used by
       Pygments to guess if input text is likely FCSTM code.
    """

    name = 'FCSTM'
    aliases = ['fcstm', 'fcsm']
    filenames = ['*.fcstm']
    mimetypes = ['text/x-fcstm']

    # Regions blanked out before scoring (applied in order by
    # ``_strip_non_semantic_regions``): string-like literals of several
    # languages, heredocs, backtick strings and block comments.
    _ANALYSIS_MASK_PATTERNS = (
        re.compile(r'(?s)R"(?P<delim>[^ ()\\\t\r\n]{0,16})\((.*?)\)(?P=delim)"'),
        re.compile(r'(?is)(?<!\w)(?:[rubf]{0,3})"""(.*?)"""'),
        re.compile(r"(?is)(?<!\w)(?:[rubf]{0,3})'''(.*?)'''"),
        re.compile(r'(?s)\br(?P<hashes>#{0,16})"(?!")(.*?)"(?P=hashes)'),
        re.compile(r'(?ms)<<[-~]?(?P<quote>[\'"]?)(?P<label>[A-Za-z_]\w*)(?P=quote)\n.*?^\s*(?P=label)\s*$'),
        re.compile(r'(?s)%(?:q|Q)(?P<delim>[^A-Za-z0-9\s])(.*?)(?P=delim)'),
        re.compile(r'(?s)`(?:\\.|[^`])*`'),
        re.compile(r'(?s)/\*.*?\*/'),
        re.compile(r'"(?:\\.|[^"\\\n])*"'),
        re.compile(r"'(?:\\.|[^'\\\n])*'"),
    )

    # ``(pattern, penalty)`` pairs. Each pattern that is found anywhere in the
    # masked text subtracts its penalty from the score once.
    _ANALYSIS_NEGATIVE_PATTERNS = (
        (re.compile(r'(?m)^\s*@startuml\b|^\s*@enduml\b|^\s*allowmixing\s*$'), 0.45),
        (
            re.compile(
                r'(?m)^\s*(?:participant|actor|boundary|control|entity|database|annotation|object)\s+'
                r'(?:\"[^\"]+\"|[A-Za-z_][\w.]*)(?:\s+as\s+[A-Za-z_]\w*)?(?:\s*\{|$)'
            ),
            0.25,
        ),
        (re.compile(r'(?m)^\s*(?:abstract\s+class|class)\s+[A-Za-z_][\w.]*\b(?!\s*(?:=|->))'), 0.20),
        (
            re.compile(
                r'(?m)^\s*package\s+[A-Za-z_]\w*\b|^\s*func\s+\w+\s*\(|^\s*var\s+\w+\b|^\s*type\s+\w+\b'
            ),
            0.25,
        ),
        (
            re.compile(
                r'(?m)^\s*fn\s+\w+\s*\(|^\s*impl(?:\s*<[^>]+>)?\s+\w|^\s*trait\s+\w|'
                r'^\s*pub(?:\s*\([^)]*\))?\s+(?:fn|struct|enum|mod|trait|type|use|const|static|impl)\b'
            ),
            0.30,
        ),
        (
            re.compile(
                r'(?m)^\s*(?:public|private|protected)\s+(?:class|interface|enum|record|static|final|abstract|'
                r'synchronized|void|[A-Za-z_]\w*(?:<[^>]+>)?)\b|^\s*package\s+[A-Za-z_][\w.]*\s*;|(?<!/)\bjava\.util\.function\b'
            ),
            0.25,
        ),
        (
            re.compile(
                r'(?m)^\s*export\s+(?:default\b|const\b|let\b|var\b|function\b|class\b|interface\b|type\b|namespace\b|\{)|'
                r'^\s*interface\s+\w|^\s*namespace\s+\w|^\s*type\s+\w+\s*=|(?<!/)\bglobalThis\s*(?:\.|=)|'
                r'(?<!/)\bString\.raw\b|\bRecord\s*<|=>'
            ),
            0.25,
        ),
        (
            re.compile(
                r'(?m)^\s*def[^\S\n]+\w+\s*\(|^\s*from\s+\w+\s+import\b|^\s*import\s+(?!as\b)\w+\b'
            ),
            0.25,
        ),
        (
            re.compile(
                r'(?m)^\s*#include\b|\bstd::\w|\btemplate\s*<|^\s*using\s+namespace\b|^\s*using\s+[A-Za-z_]\w*\s*=|'
                r'^\s*typedef\b(?!\s*=)|^\s*struct\s+[A-Za-z_]\w*|\bnullptr\b(?!\s*=)'
            ),
            0.30,
        ),
        (re.compile(r'(?m)\bmacro_rules!'), 0.35),
        (re.compile(r'(?m)\b[A-Za-z_]\w*!\s*[\(\[{]'), 0.35),
        (
            re.compile(
                r'(?m)^(?=.*\bpseudo\b)(?=.*\bnamed\b)(?=.*\babstract\b)(?=.*\bref\b)(?=.*\beffect\b)'
                r'(?!.*\bstate\b)(?!.*->).*(?:=|,|:).*$'
            ),
            0.25,
        ),
        (
            re.compile(
                r'(?m)^\s*pseudo\s*=.*\bnamed\s*=.*\babstract\s*=.*\bref\s*=.*\beffect\s*=.*$'
            ),
            0.30,
        ),
        (re.compile(r'(?m)^\s*module\s+[A-Za-z_]\w*\b|^\s*BEGIN\s*\{|^\s*end\s*$|->\s*do\b(?!\s*[;:])'), 0.20),
        (
            re.compile(
                r'(?m)^\s*const\s+\w+\s*=|^\s*let\s+\w+\s*=|^\s*var\s+\w+\s*=|^\s*function\s+\w+\s*\(|'
                r'^\s*try\b(?:\s*\{|$)|^\s*finally\b(?!.*->)(?:\s*:|\s*\{|$)'
            ),
            0.20,
        ),
        (re.compile(r'(?m)^\s*(?:if|for|while|try|except|finally|class)\b(?!.*->).*:\s*$'), 0.15),
    )

    # Lightweight tokenizer used only by the detection heuristic. Multi-char
    # operators are listed before the single-character class so they win.
    _ANALYSIS_TOKEN_PATTERN = re.compile(
        r'\[\s*\*\s*\]|->|::|>>|<=|>=|==|!=|&&|\|\||\*\*|'
        r'[A-Za-z_][A-Za-z0-9_]*|[0-9]+(?:\.[0-9]+)?|[{}()\[\];,./:+\-*!=?<>]'
    )
    _ANALYSIS_IDENTIFIER_PATTERN = re.compile(r'^[A-Za-z_][A-Za-z0-9_]*$')
    _ANALYSIS_RESERVED_WORDS = frozenset((
        'abstract', 'after', 'as', 'before', 'def', 'during', 'effect', 'enter',
        'else', 'event', 'exit', 'float', 'if', 'import', 'int', 'named',
        'pseudo', 'ref', 'state',
    ))
    _ANALYSIS_LIFECYCLE_KEYWORDS = frozenset(('enter', 'during', 'exit'))

    tokens = {
        'root': [
            include('whitespace'),
            include('comments'),

            (r'\bimport\b', Keyword.Declaration, 'import-header'),

            # Keywords - state machine structure
            (words((
                'state', 'pseudo', 'named', 'def', 'event', 'as',
            ), suffix=r'\b'), Keyword.Declaration),

            # Keywords - lifecycle actions
            (words((
                'enter', 'during', 'exit', 'before', 'after',
            ), suffix=r'\b'), Keyword.Reserved),

            # Keywords - modifiers
            (words((
                'abstract', 'ref', 'effect',
            ), suffix=r'\b'), Keyword.Namespace),

            # Keywords - types
            (words((
                'int', 'float',
            ), suffix=r'\b'), Keyword.Type),

            # Keywords - control flow
            (words((
                'if', 'else',
            ), suffix=r'\b'), Keyword.Reserved),

            # Keywords - logical operators (word form)
            (words((
                'and', 'or', 'not',
            ), suffix=r'\b'), Operator.Word),

            # Boolean literals
            (words((
                'True', 'true', 'TRUE', 'False', 'false', 'FALSE',
            ), suffix=r'\b'), Keyword.Constant),

            # Math constants
            (words((
                'pi', 'E', 'tau',
            ), suffix=r'\b'), Name.Constant),

            # Built-in functions (from UFUNC_NAME)
            (words((
                'sin', 'cos', 'tan', 'asin', 'acos', 'atan',
                'sinh', 'cosh', 'tanh', 'asinh', 'acosh', 'atanh',
                'sqrt', 'cbrt', 'exp', 'log', 'log10', 'log2', 'log1p',
                'abs', 'ceil', 'floor', 'round', 'trunc', 'sign',
            ), suffix=r'\b'), Name.Builtin),

            # Aspect operator
            (r'>>', Operator.Word),

            # Transition arrow
            (r'->', Operator),

            # Pseudo-state markers
            (r'\[\*\]', Keyword.Pseudo),

            # Event scope operators
            (r'::', Operator),
            (r':', Punctuation),

            # Absolute path marker (in chain_id)
            (r'/', Operator),

            # Numbers - must come before operators to avoid conflicts
            # Hexadecimal
            (r'0x[0-9a-fA-F]+', Number.Hex),
            # Float with exponent
            (r'[0-9]+\.[0-9]*([eE][+-]?[0-9]+)?', Number.Float),
            (r'\.[0-9]+([eE][+-]?[0-9]+)?', Number.Float),
            (r'[0-9]+[eE][+-]?[0-9]+', Number.Float),
            # Integer
            (r'[0-9]+', Number.Integer),

            # Operators - multi-character operators must come before single-character ones
            # Power operator (must come before *)
            (r'\*\*', Operator),
            # Bit shift operators (must come before < and >)
            (r'<<', Operator),
            # Comparison operators (must come before single < and >)
            (r'<=|>=|==|!=', Operator),
            # Logical operators (must come before single !)
            (r'&&|\|\|', Operator),
            # Forced transition operator (after != and && to avoid conflicts)
            (r'!', Operator.Word),
            # Single-character operators
            (r'[+\-*/%&|^~<>]', Operator),

            # Operators - assignment and ternary
            (r'=|\?', Operator),

            # Punctuation
            (r'[{}()\[\];,.]', Punctuation),

            # Strings
            (r'"([^"\\]|\\[btnfr"\'\\]|\\[0-7]{1,3}|\\u[0-9a-fA-F]{4}|\\x[0-9a-fA-F]{2})*"', String.Double),
            (r"'([^'\\]|\\[btnfr\"'\\]|\\[0-7]{1,3}|\\u[0-9a-fA-F]{4}|\\x[0-9a-fA-F]{2})*'", String.Single),

            # Identifiers (must come after keywords)
            (r'[a-zA-Z_][a-zA-Z0-9_]*', Name),
        ],

        # Entered after ``import``; left on ``;`` or replaced by
        # 'import-block' when a mapping block opens with ``{``.
        'import-header': [
            include('whitespace'),
            include('comments'),
            (r'\b(?:as)\b', Keyword.Declaration),
            (r'\bnamed\b', Keyword.Declaration),
            (r'"([^"\\]|\\[btnfr"\'\\]|\\[0-7]{1,3}|\\u[0-9a-fA-F]{4}|\\x[0-9a-fA-F]{2})*"', String.Double),
            (r"'([^'\\]|\\[btnfr\"'\\]|\\[0-7]{1,3}|\\u[0-9a-fA-F]{4}|\\x[0-9a-fA-F]{2})*'", String.Single),
            (r'\{', Punctuation, ('#pop', 'import-block')),
            (r';', Punctuation, '#pop'),
            (r'[a-zA-Z_][a-zA-Z0-9_]*', Name),
        ],

        'import-block': [
            include('whitespace'),
            include('comments'),
            (r'\bdef\b', Keyword.Declaration, 'import-def-selector'),
            (r'\bevent\b', Keyword.Declaration),
            (r'\bnamed\b', Keyword.Declaration),
            (r'->', Operator),
            (r'\$\{[0-9]+\}|\$[0-9]+', Name.Variable),
            (r'[a-zA-Z_][a-zA-Z0-9_]*\*(?:[a-zA-Z0-9_*]*)', Name.Variable),
            (r'\*', Operator),
            (r'/', Operator),
            (r'\.', Punctuation),
            (r';', Punctuation),
            (r'\{', Punctuation),
            (r'\}', Punctuation, '#pop'),
            (r'"([^"\\]|\\[btnfr"\'\\]|\\[0-7]{1,3}|\\u[0-9a-fA-F]{4}|\\x[0-9a-fA-F]{2})*"', String.Double),
            (r"'([^'\\]|\\[btnfr\"'\\]|\\[0-7]{1,3}|\\u[0-9a-fA-F]{4}|\\x[0-9a-fA-F]{2})*'", String.Single),
            (r'[a-zA-Z_][a-zA-Z0-9_]*', Name),
        ],

        'import-def-selector': [
            include('whitespace'),
            include('comments'),
            (r'->', Operator, 'import-def-target'),
            (r'\{', Punctuation),
            (r'\}', Punctuation),
            (r',', Punctuation),
            (r'[a-zA-Z_][a-zA-Z0-9_]*\*(?:[a-zA-Z0-9_*]*)', Name.Variable),
            (r'\*[a-zA-Z0-9_][a-zA-Z0-9_*]*', Name.Variable),
            (r'[a-zA-Z_][a-zA-Z0-9_]*', Name),
            (r'\*', Operator),
        ],

        'import-def-target': [
            include('whitespace'),
            include('comments'),
            # ``;`` ends the mapping: leave both the target and selector states.
            (r';', Punctuation, ('#pop', '#pop')),
            (r'\$\{[0-9]+\}|\$[0-9]+', Name.Variable),
            (r'[a-zA-Z_][a-zA-Z0-9_]*(?:(?:\$\{[0-9]+\}|\$[0-9]+|\*)(?:[a-zA-Z0-9_]*))*', Name.Variable),
            (r'\*', Operator),
            (r'[a-zA-Z_][a-zA-Z0-9_]*', Name),
        ],

        'whitespace': [
            (r'\s+', Whitespace),
        ],

        'comments': [
            # Multiline comments (not skipped - used for documentation)
            (r'/\*', Comment.Multiline, 'comment-multiline'),
            # Single-line comments
            (r'//[^\r\n]*', Comment.Single),
            # Python-style comments
            (r'#[^\r\n]*', Comment.Single),
        ],

        'comment-multiline': [
            (r'[^*/]+', Comment.Multiline),
            (r'/\*', Comment.Multiline, '#push'),  # Nested comments
            (r'\*/', Comment.Multiline, '#pop'),
            (r'[*/]', Comment.Multiline),
        ],
    }

    @staticmethod
    def _mask_analysis_text(fragment: str) -> str:
        """Replace non-newline characters with spaces to preserve line layout."""
        return re.sub(r'[^\n]', ' ', fragment)

    @classmethod
    def _mask_disabled_preprocessor_blocks(cls, text: str) -> str:
        """
        Hide ``#if 0 ... #endif`` regions which are not active code.

        Nested ``#if``/``#ifdef``/``#ifndef`` lines inside a disabled region
        increase the nesting depth so that only the matching ``#endif``
        re-enables the text. Masked lines keep their newlines.

        :param text: Text to process
        :type text: str
        :return: Text with disabled regions blanked out
        :rtype: str
        """
        lines = text.splitlines(keepends=True)
        masked = []
        disabled_depth = 0
        for line in lines:
            stripped = line.lstrip()
            if disabled_depth:
                masked.append(cls._mask_analysis_text(line))
                if re.match(r'#if(?:n?def)?\b', stripped):
                    disabled_depth += 1
                elif re.match(r'#endif\b', stripped):
                    disabled_depth -= 1
                continue

            if re.match(r'#if\s+0\b', stripped):
                disabled_depth = 1
                masked.append(cls._mask_analysis_text(line))
            else:
                masked.append(line)
        return ''.join(masked)

    @classmethod
    def _mask_plantuml_blocks(cls, text: str) -> str:
        """
        Hide PlantUML note and legend payload blocks.

        A line starting with ``note`` or ``legend`` (case-insensitive) opens a
        block; it is closed by a line equal to ``end note`` or ``endlegend``
        respectively. Opening, closing and payload lines are all masked.

        :param text: Text to process
        :type text: str
        :return: Text with note/legend blocks blanked out
        :rtype: str
        """
        lines = text.splitlines(keepends=True)
        masked = []
        block_kind = None
        for line in lines:
            stripped = line.strip().lower()
            if block_kind is not None:
                masked.append(cls._mask_analysis_text(line))
                if (block_kind == 'note' and stripped == 'end note') or (
                    block_kind == 'legend' and stripped == 'endlegend'
                ):
                    block_kind = None
                continue

            if re.match(r'(?i)^note\b', stripped):
                block_kind = 'note'
                masked.append(cls._mask_analysis_text(line))
                continue
            if re.match(r'(?i)^legend\b', stripped):
                block_kind = 'legend'
                masked.append(cls._mask_analysis_text(line))
                continue
            masked.append(line)
        return ''.join(masked)

    @classmethod
    def _strip_non_semantic_regions(cls, text: str) -> str:
        """
        Remove comment/string-like bait so scoring only sees live code structure.

        Line endings are normalised to ``\\n`` first. Every removed region is
        replaced by spaces (newlines kept), so line-anchored patterns applied
        afterwards still see the original line layout.

        :param text: Raw input text
        :type text: str
        :return: Text with comments, strings and inactive blocks blanked out
        :rtype: str
        """

        def mask_match(match):
            return cls._mask_analysis_text(match.group(0))

        text = text.replace('\r\n', '\n').replace('\r', '\n')
        text = cls._mask_disabled_preprocessor_blocks(text)
        text = cls._mask_plantuml_blocks(text)
        # ``=begin`` ... ``=end`` block comments.
        text = re.sub(r'(?ms)^[ \t]*=begin\b.*?^[ \t]*=end\b[^\n]*(?:\n|$)', mask_match, text)
        for pattern in cls._ANALYSIS_MASK_PATTERNS:
            text = pattern.sub(mask_match, text)
        # Line comments: ``//``, ``#`` and lines whose first character is ``'``.
        text = re.sub(r'(?m)//[^\n]*', mask_match, text)
        text = re.sub(r'(?m)#[^\n]*', mask_match, text)
        text = re.sub(r"(?m)^[ \t]*'[^\n]*", mask_match, text)
        return text

    @classmethod
    def _analysis_tokenize(cls, text: str) -> List[str]:
        """Tokenize live code into a lightweight FCSTM-oriented token stream."""
        return cls._ANALYSIS_TOKEN_PATTERN.findall(text)

    @classmethod
    def _analysis_is_identifier(cls, token: str) -> bool:
        """Return whether token is a non-keyword identifier-like symbol."""
        return bool(cls._ANALYSIS_IDENTIFIER_PATTERN.match(token)) and token not in cls._ANALYSIS_RESERVED_WORDS

    @classmethod
    def _analysis_collect_state_spans(cls, tokens: List[str]) -> List[Tuple[int, int, bool, bool]]:
        """
        Collect spans for ``state`` declarations/blocks in the token stream.

        The scan is intentionally tolerant to missing alias string literals,
        because string payload is masked out during bait removal.

        :param tokens: Token stream from :meth:`_analysis_tokenize`
        :type tokens: List[str]
        :return: ``(start, end, is_block, is_decl)`` tuples, where ``end`` is
            the index of the terminating ``{`` (block) or ``;`` (declaration)
        :rtype: List[Tuple[int, int, bool, bool]]
        """
        spans = []
        index = 0
        while index < len(tokens):
            start = index
            if tokens[index] == 'pseudo':
                if index + 1 >= len(tokens) or tokens[index + 1] != 'state':
                    index += 1
                    continue
                state_index = index + 1
            else:
                state_index = index

            if (
                tokens[state_index] != 'state'
                or state_index + 1 >= len(tokens)
                or not cls._analysis_is_identifier(tokens[state_index + 1])
            ):
                index += 1
                continue

            tail_index = state_index + 2
            if tail_index < len(tokens) and tokens[tail_index] == 'named':
                tail_index += 1
                # Skip one alias token if it survived masking.
                if tail_index < len(tokens) and tokens[tail_index] not in {';', '{', '}'}:
                    tail_index += 1

            if tail_index < len(tokens) and tokens[tail_index] in {';', '{'}:
                spans.append((start, tail_index, tokens[tail_index] == '{', tokens[tail_index] == ';'))
                index = tail_index + 1
            else:
                index += 1
        return spans

    @classmethod
    def _analysis_collect_event_spans(cls, tokens: List[str]) -> List[Tuple[int, int]]:
        """
        Collect spans for ``event`` declarations.

        :param tokens: Token stream from :meth:`_analysis_tokenize`
        :type tokens: List[str]
        :return: ``(start, end)`` index pairs from ``event`` to its ``;``
        :rtype: List[Tuple[int, int]]
        """
        spans = []
        # A declaration needs at least ``event Name ;``, hence the ``[:-2]``.
        for index, token in enumerate(tokens[:-2]):
            if token != 'event' or not cls._analysis_is_identifier(tokens[index + 1]):
                continue
            tail_index = index + 2
            if tail_index < len(tokens) and tokens[tail_index] == 'named':
                tail_index += 1
                if tail_index < len(tokens) and tokens[tail_index] != ';':
                    tail_index += 1
            if tail_index < len(tokens) and tokens[tail_index] == ';':
                spans.append((index, tail_index))
        return spans

    @classmethod
    def _analysis_collect_import_spans(cls, tokens: List[str]) -> List[Tuple[int, int, bool, bool]]:
        """
        Collect spans for ``import`` declarations and mapping blocks.

        Both ``import as Alias`` (path string masked away) and
        ``import <token> as Alias`` are accepted.

        :param tokens: Token stream from :meth:`_analysis_tokenize`
        :type tokens: List[str]
        :return: ``(start, end, is_block, has_named)`` tuples
        :rtype: List[Tuple[int, int, bool, bool]]
        """
        spans = []
        for index, token in enumerate(tokens):
            if token != 'import' or index + 2 >= len(tokens):
                continue
            if tokens[index + 1] == 'as':
                alias_index = index + 2
            elif index + 3 < len(tokens) and tokens[index + 2] == 'as':
                alias_index = index + 3
            else:
                continue
            if alias_index >= len(tokens) or not cls._analysis_is_identifier(tokens[alias_index]):
                continue

            tail_index = alias_index + 1
            has_named = False
            if tail_index < len(tokens) and tokens[tail_index] == 'named':
                has_named = True
                tail_index += 1
                if tail_index < len(tokens) and tokens[tail_index] not in {';', '{', '}'}:
                    tail_index += 1
            if tail_index < len(tokens) and tokens[tail_index] in {';', '{'}:
                spans.append((index, tail_index, tokens[tail_index] == '{', has_named))
        return spans

    @classmethod
    def _analysis_collect_import_def_mapping_spans(cls, tokens: List[str]) -> List[Tuple[int, int]]:
        """
        Collect shallow spans for ``def ... -> ...;`` mappings inside import blocks.

        The ``->`` must appear before any ``;`` and within 16 tokens of
        ``def``; the closing ``;`` must appear within 24 tokens of ``def``.

        :param tokens: Token stream from :meth:`_analysis_tokenize`
        :type tokens: List[str]
        :return: ``(start, end)`` index pairs from ``def`` to its ``;``
        :rtype: List[Tuple[int, int]]
        """
        spans = []
        for index, token in enumerate(tokens):
            if token != 'def':
                continue
            arrow_index = index + 1
            while arrow_index < len(tokens) and tokens[arrow_index] != ';' and arrow_index - index <= 16:
                if tokens[arrow_index] == '->':
                    break
                arrow_index += 1
            if arrow_index >= len(tokens) or tokens[arrow_index] != '->':
                continue

            tail_index = arrow_index + 1
            while tail_index < len(tokens) and tokens[tail_index] != ';' and tail_index - index <= 24:
                tail_index += 1
            if tail_index < len(tokens) and tokens[tail_index] == ';':
                spans.append((index, tail_index))
        return spans

    @classmethod
    def _analysis_collect_import_event_mapping_spans(cls, tokens: List[str]) -> List[Tuple[int, int, bool]]:
        """
        Collect shallow spans for ``event ... -> ...;`` mappings inside import blocks.

        The ``->`` must appear before any ``;`` and within 20 tokens of
        ``event``; the closing ``;`` must appear within 28 tokens of ``event``.

        :param tokens: Token stream from :meth:`_analysis_tokenize`
        :type tokens: List[str]
        :return: ``(start, end, has_named)`` tuples; ``has_named`` is true when
            ``named`` occurs between the arrow and the ``;``
        :rtype: List[Tuple[int, int, bool]]
        """
        spans = []
        for index, token in enumerate(tokens):
            if token != 'event':
                continue
            arrow_index = index + 1
            while arrow_index < len(tokens) and tokens[arrow_index] != ';' and arrow_index - index <= 20:
                if tokens[arrow_index] == '->':
                    break
                arrow_index += 1
            if arrow_index >= len(tokens) or tokens[arrow_index] != '->':
                continue

            tail_index = arrow_index + 1
            has_named = False
            while tail_index < len(tokens) and tokens[tail_index] != ';' and tail_index - index <= 28:
                if tokens[tail_index] == 'named':
                    has_named = True
                tail_index += 1
            if tail_index < len(tokens) and tokens[tail_index] == ';':
                spans.append((index, tail_index, has_named))
        return spans

    @classmethod
    def _analysis_collect_def_spans(cls, tokens: List[str]) -> List[Tuple[int, int]]:
        """
        Collect spans for ``def int/float`` declarations.

        :param tokens: Token stream from :meth:`_analysis_tokenize`
        :type tokens: List[str]
        :return: ``(start, end)`` index pairs from ``def`` to a ``;`` found
            within 24 tokens
        :rtype: List[Tuple[int, int]]
        """
        spans = []
        for index, token in enumerate(tokens[:-2]):
            if (
                token == 'def'
                and index + 2 < len(tokens)
                and tokens[index + 1] in {'int', 'float'}
                and cls._analysis_is_identifier(tokens[index + 2])
            ):
                tail_index = index + 3
                while tail_index < len(tokens) and tokens[tail_index] != ';' and tail_index - index <= 24:
                    tail_index += 1
                if tail_index < len(tokens) and tokens[tail_index] == ';':
                    spans.append((index, tail_index))
        return spans

    @classmethod
    def _analysis_collect_lifecycle_spans(
            cls,
            tokens: List[str],
    ) -> List[Tuple[int, int, bool, bool]]:
        """
        Collect spans for ``enter``/``during``/``exit`` handler-like constructs.

        Keywords directly preceded by ``>>`` are skipped here; they are handled
        by :meth:`_analysis_collect_aspect_spans`.

        :param tokens: Token stream from :meth:`_analysis_tokenize`
        :type tokens: List[str]
        :return: ``(start, end, is_block, is_bare)`` tuples; ``is_block`` is
            true when the span ends on ``{`` and ``is_bare`` when the
            terminator immediately follows the keyword
        :rtype: List[Tuple[int, int, bool, bool]]
        """
        spans = []
        for index, token in enumerate(tokens):
            if token not in cls._ANALYSIS_LIFECYCLE_KEYWORDS:
                continue
            if index > 0 and tokens[index - 1] == '>>':
                continue

            # Optional modifiers, each skipped at most once and in this order.
            tail_index = index + 1
            if tail_index < len(tokens) and tokens[tail_index] in {'before', 'after'}:
                tail_index += 1
            if tail_index < len(tokens) and tokens[tail_index] == 'abstract':
                tail_index += 1
            if tail_index < len(tokens) and cls._analysis_is_identifier(tokens[tail_index]):
                tail_index += 1
            if tail_index < len(tokens) and tokens[tail_index] == 'ref':
                tail_index += 1

            while (
                tail_index < len(tokens)
                and tokens[tail_index] not in {';', '{', '}'}
                and tail_index - index <= 16
            ):
                tail_index += 1
            if tail_index < len(tokens) and tokens[tail_index] in {';', '{', '}'} and tail_index - index <= 16:
                spans.append((index, tail_index, tokens[tail_index] == '{', tail_index == index + 1))
        return spans

    @classmethod
    def _analysis_collect_aspect_spans(cls, tokens: List[str]) -> List[Tuple[int, int, bool]]:
        """
        Collect spans for ``>>`` aspect handlers.

        :param tokens: Token stream from :meth:`_analysis_tokenize`
        :type tokens: List[str]
        :return: ``(start, end, is_block)`` tuples starting at the ``>>`` token
        :rtype: List[Tuple[int, int, bool]]
        """
        spans = []
        for index, token in enumerate(tokens[:-1]):
            if token != '>>' or tokens[index + 1] not in cls._ANALYSIS_LIFECYCLE_KEYWORDS:
                continue

            tail_index = index + 2
            if tail_index < len(tokens) and tokens[tail_index] in {'before', 'after'}:
                tail_index += 1
            if tail_index < len(tokens) and tokens[tail_index] == 'abstract':
                tail_index += 1
            if tail_index < len(tokens) and cls._analysis_is_identifier(tokens[tail_index]):
                tail_index += 1
            if tail_index < len(tokens) and tokens[tail_index] == 'ref':
                tail_index += 1

            while (
                tail_index < len(tokens)
                and tokens[tail_index] not in {';', '{', '}'}
                and tail_index - index <= 18
            ):
                tail_index += 1
            if tail_index < len(tokens) and tokens[tail_index] in {';', '{', '}'} and tail_index - index <= 18:
                spans.append((index, tail_index, tokens[tail_index] == '{'))
        return spans

    @classmethod
    def _analysis_collect_transition_spans(cls, tokens: List[str]) -> List[Tuple[int, int, bool]]:
        """
        Collect spans for FCSTM-like transitions.

        This is intentionally shallow: it only looks for plausible source/target
        shapes and a nearby statement terminator or effect block end.

        :param tokens: Token stream from :meth:`_analysis_tokenize`
        :type tokens: List[str]
        :return: ``(start, end, is_rich)`` tuples; a transition is "rich" when
            it is forced (``!``), touches ``[*]``, or is followed by ``::``,
            ``:`` or ``effect`` before its terminator
        :rtype: List[Tuple[int, int, bool]]
        """
        spans = []
        for index, token in enumerate(tokens):
            if token != '->' or index == 0 or index + 1 >= len(tokens):
                continue

            # Classify the source side: ``! *``, ``! Name``, ``[*]`` or ``Name``.
            if tokens[index - 1] == '*' and index >= 2 and tokens[index - 2] == '!':
                start = index - 2
                forced = True
            elif cls._analysis_is_identifier(tokens[index - 1]) and index >= 2 and tokens[index - 2] == '!':
                start = index - 2
                forced = True
            elif tokens[index - 1] == '[*]':
                start = index - 1
                forced = False
            elif cls._analysis_is_identifier(tokens[index - 1]):
                start = index - 1
                forced = False
            else:
                continue

            if tokens[index + 1] not in {'[*]'} and not cls._analysis_is_identifier(tokens[index + 1]):
                continue

            rich = forced or tokens[index - 1] == '[*]' or tokens[index + 1] == '[*]'
            saw_effect_block = False
            tail_index = index + 2
            while tail_index < len(tokens) and tail_index - start <= 32:
                if tokens[tail_index] in {'::', ':', 'effect'}:
                    rich = True
                if tokens[tail_index] == 'effect' and tail_index + 1 < len(tokens) and tokens[tail_index + 1] == '{':
                    saw_effect_block = True
                if tokens[tail_index] == ';':
                    spans.append((start, tail_index, rich))
                    break
                if tokens[tail_index] == '}' and saw_effect_block:
                    spans.append((start, tail_index, rich))
                    break
                tail_index += 1
        return spans

    @classmethod
    def _analysis_has_leading_construct(cls, tokens: List[str]) -> bool:
        """
        Check whether the file starts like a top-level FCSTM declaration.

        Recognised openings are ``state``, ``pseudo state``, ``def int/float``,
        ``event`` and ``import`` declarations. The first branch whose prefix
        matches decides the result; later branches are not tried.

        :param tokens: Token stream from :meth:`_analysis_tokenize`
        :type tokens: List[str]
        :return: ``True`` if the stream opens with a recognised declaration
        :rtype: bool
        """
        if len(tokens) >= 3 and tokens[0] == 'state' and cls._analysis_is_identifier(tokens[1]):
            return tokens[2] in {';', '{'} or (
                len(tokens) >= 4 and tokens[2] == 'named' and tokens[3] in {';', '{'}
            )
        if len(tokens) >= 4 and tokens[0] == 'pseudo' and tokens[1] == 'state' and cls._analysis_is_identifier(tokens[2]):
            return tokens[3] in {';', '{'} or (
                len(tokens) >= 5 and tokens[3] == 'named' and tokens[4] in {';', '{'}
            )
        if len(tokens) >= 4 and tokens[0] == 'def' and tokens[1] in {'int', 'float'} and cls._analysis_is_identifier(tokens[2]):
            return ';' in tokens[3:]
        if len(tokens) >= 3 and tokens[0] == 'event' and cls._analysis_is_identifier(tokens[1]):
            return tokens[2] == ';' or (len(tokens) >= 4 and tokens[2] == 'named' and tokens[3] == ';')
        if tokens and tokens[0] == 'import':
            # ``analyse_text`` masks quoted strings before tokenizing, so
            # ``import "path" as Alias`` arrives as ``import as Alias``. Accept
            # that shape as well as ``import <token> as Alias``, mirroring
            # ``_analysis_collect_import_spans``. The unmasked shape is tried
            # first so previously matching inputs keep their result.
            for as_index in (2, 1):
                alias_index = as_index + 1
                if (
                    alias_index < len(tokens)
                    and tokens[as_index] == 'as'
                    and cls._analysis_is_identifier(tokens[alias_index])
                ):
                    rest = tokens[alias_index + 1:]
                    return ';' in rest or '{' in rest
        return False

    @staticmethod
    def _analysis_span_density(token_count: int, spans: List[Tuple]) -> float:
        """
        Compute coverage ratio of recognised FCSTM spans over the token stream.

        Only the first two elements of each span (inclusive start and end
        index) are used; any trailing flags are ignored. Overlapping spans are
        counted once.

        :param token_count: Total number of tokens in the stream
        :type token_count: int
        :param spans: Span tuples whose first two items are ``(start, end)``
        :type spans: List[Tuple]
        :return: Number of covered token indices divided by ``token_count``
            (with the divisor clamped to at least 1)
        :rtype: float
        """
        covered = set()
        for span in spans:
            start, end = span[:2]
            covered.update(range(start, end + 1))
        return len(covered) / max(token_count, 1)

    # NOTE: deliberately declared without ``self`` and without a decorator.
    # Pygments' lexer metaclass wraps ``analyse_text`` into a static method.
    def analyse_text(text: str) -> float:
        """
        Analyze text to determine if it is likely FCSTM code.

        This method is used by Pygments to heuristically determine whether the
        input should be lexed by :class:`FcstmLexer`. It scans for key tokens
        and constructs a confidence score in the range ``0.0`` to ``1.0``.

        The heuristic balances recall (detecting FCSTM files) with precision
        (avoiding false positives from other languages like C++, Rust, Java).
        It deliberately uses only string and token-stream operations. This
        keeps detection tolerant of incomplete or slightly broken FCSTM input
        without depending on a successful DSL parse/load round-trip.

        :param text: Text content to analyze
        :type text: str
        :return: Confidence score indicating likelihood of FCSTM syntax
        :rtype: float

        Example::

            >>> # FCSTM code - should score high
            >>> fcstm_code = '''
            ... def int counter = 0;
            ... state MyState {
            ...     enter { counter = 0; }
            ...     [*] -> Active;
            ... }
            ... '''
            >>> FcstmLexer.analyse_text(fcstm_code)
            1.0

            >>> # C++ code - should score low
            >>> cpp_code = '''
            ... class MyClass {
            ...     void enter() { counter = 0; }
            ...     std::vector<int> data;
            ... };
            ... '''
            >>> FcstmLexer.analyse_text(cpp_code)
            0.0

            >>> # Python code - should score low
            >>> python_code = '''
            ... def enter():
            ...     counter = 0
            ...     state = "active"
            ... '''
            >>> FcstmLexer.analyse_text(python_code)
            0.0

            >>> # Java code - should score low
            >>> java_code = '''
            ... public class State {
            ...     private int counter = 0;
            ...     public void enter() { counter = 0; }
            ... }
            ... '''
            >>> FcstmLexer.analyse_text(java_code)
            0.0

            >>> # Rust code - should score low
            >>> rust_code = '''
            ... struct State {
            ...     counter: i32,
            ... }
            ... impl State {
            ...     fn enter(&mut self) { self.counter = 0; }
            ... }
            ... '''
            >>> FcstmLexer.analyse_text(rust_code)
            0.0
        """
        analysis_text = FcstmLexer._strip_non_semantic_regions(text)
        tokens = FcstmLexer._analysis_tokenize(analysis_text)
        if not tokens:
            return 0.0

        state_spans = FcstmLexer._analysis_collect_state_spans(tokens)
        event_spans = FcstmLexer._analysis_collect_event_spans(tokens)
        import_spans = FcstmLexer._analysis_collect_import_spans(tokens)
        def_spans = FcstmLexer._analysis_collect_def_spans(tokens)
        import_def_mapping_spans = FcstmLexer._analysis_collect_import_def_mapping_spans(tokens)
        import_event_mapping_spans = FcstmLexer._analysis_collect_import_event_mapping_spans(tokens)
        lifecycle_spans = FcstmLexer._analysis_collect_lifecycle_spans(tokens)
        aspect_spans = FcstmLexer._analysis_collect_aspect_spans(tokens)
        transition_spans = FcstmLexer._analysis_collect_transition_spans(tokens)

        state_blocks = sum(1 for _, _, is_block, _ in state_spans if is_block)
        state_decls = sum(1 for _, _, _, is_decl in state_spans if is_decl)
        state_named = sum(
            1 for start, _, _, _ in state_spans
            if 'named' in tokens[start:min(len(tokens), start + 5)]
        )
        event_named = sum(
            1 for start, _ in event_spans
            if 'named' in tokens[start:min(len(tokens), start + 5)]
        )
        import_blocks = sum(1 for _, _, is_block, _ in import_spans if is_block)
        import_named = sum(1 for _, _, _, has_named in import_spans if has_named)
        import_event_named = sum(1 for _, _, has_named in import_event_mapping_spans if has_named)
        lifecycle_blocks = sum(1 for _, _, is_block, _ in lifecycle_spans if is_block)
        lifecycle_bare = sum(1 for _, _, _, is_bare in lifecycle_spans if is_bare)
        lifecycle_abstract = sum(
            1 for start, _, _, _ in lifecycle_spans
            if 'abstract' in tokens[start:min(len(tokens), start + 6)]
        )
        lifecycle_ref = sum(
            1 for start, _, _, _ in lifecycle_spans
            if 'ref' in tokens[start:min(len(tokens), start + 12)]
        )
        lifecycle_before_after = sum(
            1 for start, _, _, _ in lifecycle_spans
            if any(token in {'before', 'after'} for token in tokens[start:min(len(tokens), start + 5)])
        )
        rich_transitions = sum(1 for _, _, is_rich in transition_spans if is_rich)
        plain_transitions = len(transition_spans) - rich_transitions

        score = 0.0

        # Structural FCSTM signals from the token stream.
        # Each family is capped so one repeated construct cannot dominate.
        score += min(state_blocks * 0.26 + state_decls * 0.18, 0.46)
        score += min(len(event_spans) * 0.10, 0.14)
        score += min(len(import_spans) * 0.14 + import_blocks * 0.04, 0.18)
        score += min(len(def_spans) * 0.16, 0.20)
        score += min(len(import_def_mapping_spans) * 0.10 + len(import_event_mapping_spans) * 0.12, 0.28)
        score += min(
            lifecycle_blocks * 0.16
            + lifecycle_bare * 0.08
            + max(len(lifecycle_spans) - lifecycle_blocks - lifecycle_bare, 0) * 0.12,
            0.24,
        )
        score += min(len(aspect_spans) * 0.18, 0.18)
        score += min(rich_transitions * 0.24 + plain_transitions * 0.14, 0.32)
        score += min(
            (
                state_named
                + event_named
                + import_named
                + import_event_named
                + lifecycle_abstract
                + lifecycle_ref
                + lifecycle_before_after
            ) * 0.04,
            0.12,
        )

        has_leading_construct = FcstmLexer._analysis_has_leading_construct(tokens)
        if has_leading_construct:
            score += 0.45

        span_density = FcstmLexer._analysis_span_density(
            len(tokens),
            state_spans
            + event_spans
            + import_spans
            + def_spans
            + import_def_mapping_spans
            + import_event_mapping_spans
            + lifecycle_spans
            + aspect_spans
            + transition_spans,
        )

        # Penalties for constructs typical of other languages.
        for pattern, penalty in FcstmLexer._ANALYSIS_NEGATIVE_PATTERNS:
            if pattern.search(analysis_text):
                score -= penalty

        # Density bonus applies only when the text also opens like FCSTM.
        if has_leading_construct and score > 0.0:
            if score >= 0.45:
                score += 0.32 * span_density
            elif span_density >= 0.75:
                score += 0.18 * span_density

        # Ensure score stays in valid range
        return max(0.0, min(score, 1.0))