Source code for pyfcstm.entry.simulate.completer

"""
Auto-completion support for the simulation REPL.

This module provides comprehensive command and argument completion using
prompt_toolkit's completion framework. It supports context-aware completion
for all commands including their arguments and values.
"""

from typing import Iterable

from prompt_toolkit.completion import Completer, Completion
from prompt_toolkit.document import Document


[docs] class SimulationCompleter(Completer): """ Completer for simulation REPL commands and arguments. This class provides comprehensive context-aware completion for: - Command names (with partial matching) - Event names for cycle command (both full-path and short names) - Count values for cycle and history commands - Setting keys and values for setting command - Log levels for log_level setting :param runtime: The simulation runtime instance :type runtime: SimulationRuntime :ivar COMMANDS: List of available commands :vartype COMMANDS: list :ivar LOG_LEVELS: List of available log levels :vartype LOG_LEVELS: list """ COMMANDS = [ 'cycle', 'init', 'clear', 'current', 'events', 'history', 'setting', 'export', 'help', 'quit', 'exit' ] LOG_LEVELS = ['debug', 'info', 'warning', 'error', 'off']
[docs] def __init__(self, runtime): """ Initialize the completer. :param runtime: The simulation runtime instance :type runtime: SimulationRuntime """ self.runtime = runtime
[docs] def get_completions(self, document: Document, complete_event) -> Iterable[Completion]: """ Generate completions for the current input. :param document: The current document :type document: Document :param complete_event: The completion event :return: Iterator of completion suggestions :rtype: Iterable[Completion] """ text = document.text_before_cursor words = text.split() # Command completion - when no space or typing command if not words or (len(words) == 1 and ' ' not in text): prefix = text.strip() for cmd in self.COMMANDS: if cmd.startswith(prefix): yield Completion( cmd, start_position=-len(prefix), display_meta=self._get_command_help(cmd) ) return # Get command and check for argument completion command = words[0] # cycle command - complete with count or events if command == 'cycle': # If we have only "cycle " or typing first argument if len(words) == 1 or (len(words) == 2 and not text.endswith(' ')): # Suggest count numbers and events prefix = words[1] if len(words) == 2 else '' # Suggest common count values for count in ['1', '5', '10', '20', '50', '100']: if count.startswith(prefix): yield Completion( count, start_position=-len(prefix), display_meta='cycle count' ) # Also suggest events events = self._get_current_events() for event in events: if event.startswith(prefix): yield Completion( event, start_position=-len(prefix), display_meta='event name' ) else: # After count or additional events prefix = words[-1] if not text.endswith(' ') else '' events = self._get_current_events() for event in events: if event.startswith(prefix): yield Completion( event, start_position=-len(prefix), display_meta='event name' ) # init command - complete with state paths and variable assignments elif command == 'init': # First argument - state path if len(words) == 1 or (len(words) == 2 and not text.endswith(' ')): prefix = words[1] if len(words) == 2 else '' # Get all available state paths state_paths = self._get_all_state_paths() for state_path in state_paths: if state_path.startswith(prefix): # Determine if it's a leaf or composite state state = self._find_state_by_path(state_path) if state: meta = 'leaf state' if state.is_leaf_state else 'composite state' else: meta = 'state' yield Completion( state_path, start_position=-len(prefix), display_meta=meta ) # Subsequent arguments - variable assignments else: # Get the last word being typed prefix = words[-1] if not text.endswith(' ') else '' # Check if we're in the middle of typing a variable assignment if '=' in prefix: # User is typing the value part, suggest value formats var_name, value_prefix = prefix.split('=', 1) var_name = var_name.strip() # Get variable type var_type = self._get_variable_type(var_name) if var_type: # Suggest example values based on type if var_type == 'int': examples = ['0', '1', '10', '100', '0xFF', '0b1010'] else: # float examples = ['0.0', '1.0', '10.5', '1e-3'] for example in examples: if example.startswith(value_prefix): yield Completion( f"{var_name}={example}", start_position=-len(prefix), display_meta=f'{var_type} value' ) else: # User is typing variable name, suggest all variables var_names = self._get_all_variable_names() # Filter out already assigned variables assigned_vars = set() for word in words[2:]: # Skip 'init' and state_path if '=' in word: var_name = word.split('=')[0].strip() assigned_vars.add(var_name) for var_name in var_names: if var_name not in assigned_vars and var_name.startswith(prefix): var_type = self._get_variable_type(var_name) # Suggest with '=' appended for convenience yield Completion( f"{var_name}=", start_position=-len(prefix), display_meta=f'{var_type} variable' ) # history command - complete with count or 'all' elif command == 'history': if len(words) == 1 or (len(words) == 2 and not text.endswith(' ')): prefix = words[1] if len(words) == 2 else '' # Suggest 'all' keyword if 'all'.startswith(prefix): yield Completion( 'all', start_position=-len(prefix), display_meta='show all history' ) # Suggest common count values for count in ['5', '10', '20', '50', '100']: if count.startswith(prefix): yield Completion( count, start_position=-len(prefix), display_meta='history count' ) # setting command - complete with keys and values elif command == 'setting': setting_keys = ['table_max_rows', 'history_size', 'color', 'log_level'] # First argument - setting key if len(words) == 1 or (len(words) == 2 and not text.endswith(' ')): prefix = words[1] if len(words) == 2 else '' for key in setting_keys: if key.startswith(prefix): yield Completion( key, start_position=-len(prefix), display_meta=self._get_setting_help(key) ) # Second argument - setting value elif len(words) >= 2: setting_key = words[1] prefix = words[2] if len(words) == 3 and not text.endswith(' ') else '' # Suggest values based on setting key if setting_key == 'log_level': for level in self.LOG_LEVELS: if level.startswith(prefix): yield Completion( level, start_position=-len(prefix), display_meta='log level' ) elif setting_key == 'color': for value in ['on', 'off', 'true', 'false']: if value.startswith(prefix): yield Completion( value, start_position=-len(prefix), display_meta='color setting' ) elif setting_key in ['table_max_rows', 'history_size']: # Suggest common numeric values for value in ['10', '20', '50', '100', '200', '500', '1000']: if value.startswith(prefix): yield Completion( value, start_position=-len(prefix), display_meta='numeric value' ) # export command - complete with filesystem paths elif command == 'export': if len(words) == 1 or (len(words) == 2 and not text.endswith(' ')): prefix = words[1] if len(words) == 2 else '' import os from pathlib import Path # Supported export formats SUPPORTED_EXTENSIONS = {'.csv', '.json', '.yaml', '.jsonl'} # Parse the prefix to get directory and filename parts if prefix: # Expand user home directory expanded_prefix = os.path.expanduser(prefix) prefix_path = Path(expanded_prefix) # Determine the directory to search and the filename prefix if prefix.endswith(os.sep) or prefix.endswith('/') or prefix.endswith('\\'): # User typed a trailing slash, complete files in that directory search_dir = prefix_path file_prefix = '' has_dirname = True else: # Split into directory and filename parts parent = prefix_path.parent # Check if parent is different from current path (has dirname) has_dirname = str(parent) != '.' search_dir = parent if has_dirname else Path('.') file_prefix = prefix_path.name else: # No prefix, search current directory search_dir = Path('.') file_prefix = '' has_dirname = False # Collect completions with priority priority_completions = [] # Directories and supported format files other_completions = [] # Other files # Get completions from filesystem try: if search_dir.exists() and search_dir.is_dir(): for item in sorted(search_dir.iterdir()): item_name = item.name # Check if item matches the prefix if item_name.startswith(file_prefix): # Build the completion text if prefix and has_dirname: # Calculate the relative path from prefix if prefix.endswith(os.sep) or prefix.endswith('/') or prefix.endswith('\\'): completion_text = item_name else: # Replace the filename part completion_text = str(Path(prefix).parent / item_name) # Normalize path separators for current OS completion_text = completion_text.replace('\\', os.sep).replace('/', os.sep) else: completion_text = item_name # Determine priority and metadata if item.is_dir(): completion_text += os.sep meta = 'directory' is_priority = True else: # Show file extension as metadata ext = item.suffix.lower() is_priority = ext in SUPPORTED_EXTENSIONS ext_upper = item.suffix.upper() meta = f'{ext_upper[1:]} file' if ext_upper else 'file' completion = Completion( completion_text, start_position=-len(prefix), display_meta=meta ) if is_priority: priority_completions.append(completion) else: other_completions.append(completion) except (OSError, PermissionError): # Ignore filesystem errors pass # If no dirname in prefix, also suggest common filenames if not has_dirname and not any(sep in prefix for sep in ['/', '\\']): for ext in ['.csv', '.json', '.yaml', '.jsonl']: filename = f'history{ext}' if filename.startswith(prefix): # Add to priority completions if not already present completion = Completion( filename, start_position=-len(prefix), display_meta=f'{ext[1:].upper()} format' ) # Check if not duplicate if not any(c.text == filename for c in priority_completions): priority_completions.insert(0, completion) # Yield priority completions first, then others for completion in priority_completions: yield completion for completion in other_completions: yield completion
def _get_current_events(self) -> list: """ Get available events in the current state. Returns both full-path and short event names. :return: List of event names :rtype: list """ if not self.runtime.current_state: return [] current_state = self.runtime.current_state current_state_name = current_state.name events = set() # Check parent's transitions for transitions from current state if current_state.parent: parent = current_state.parent for transition in parent.transitions: # Check if this transition is from the current state if transition.from_state == current_state_name and transition.event: # Add full path event_path = '.'.join(transition.event.state_path) + '.' + transition.event.name events.add(event_path) # Add short name events.add(transition.event.name) return sorted(events) def _get_command_help(self, cmd: str) -> str: """ Get help text for a command. :param cmd: The command name :type cmd: str :return: Help text :rtype: str """ help_map = { 'cycle': 'Execute cycle(s) with optional events', 'init': 'Hot start from specific state', 'clear': 'Reset to initial state', 'current': 'Show current state and variables', 'events': 'List available events', 'history': 'Show execution history', 'setting': 'View or change settings', 'export': 'Export history to file', 'help': 'Show help', 'quit': 'Exit simulator', 'exit': 'Exit simulator', } return help_map.get(cmd, '') def _get_setting_help(self, key: str) -> str: """ Get help text for a setting key. :param key: The setting key :type key: str :return: Help text :rtype: str """ help_map = { 'table_max_rows': 'max rows in tables (default: 20)', 'history_size': 'max history entries (default: 100)', 'color': 'enable/disable colors (on/off)', 'log_level': 'logging level (debug/info/warning/error/off)', } return help_map.get(key, '') def _get_all_state_paths(self) -> list: """ Get all state paths in the state machine. Returns a list of dot-separated state paths from root to each state. :return: List of state paths :rtype: list """ paths = [] def collect_paths(state, current_path): """Recursively collect all state paths.""" paths.append(current_path) # Recursively collect substates for substate in state.substates.values(): subpath = f"{current_path}.{substate.name}" collect_paths(substate, subpath) # Start from root state if hasattr(self.runtime, 'state_machine') and self.runtime.state_machine: root = self.runtime.state_machine.root_state collect_paths(root, root.name) return sorted(paths) def _find_state_by_path(self, state_path: str): """ Find a state by its dot-separated path. :param state_path: Dot-separated state path (e.g., "Root.System.Active") :type state_path: str :return: State object or None :rtype: State or None """ if not hasattr(self.runtime, 'state_machine') or not self.runtime.state_machine: return None parts = state_path.split('.') current = self.runtime.state_machine.root_state # Verify root name matches if parts[0] != current.name: return None # Traverse the path for part in parts[1:]: if part in current.substates: current = current.substates[part] else: return None return current def _get_all_variable_names(self) -> list: """ Get all variable names defined in the state machine. :return: List of variable names :rtype: list """ if hasattr(self.runtime, 'vars'): return sorted(self.runtime.vars.keys()) return [] def _get_variable_type(self, var_name: str) -> str: """ Get the type of a variable. :param var_name: Variable name :type var_name: str :return: Variable type ('int' or 'float') or empty string if not found :rtype: str """ if not hasattr(self.runtime, 'state_machine') or not self.runtime.state_machine: return '' defines = self.runtime.state_machine.defines if var_name in defines: return defines[var_name].type return ''