"""
Command processor for the simulation REPL.
This module provides command parsing and execution for the interactive
state machine simulator.
"""
from dataclasses import dataclass
from enum import Enum
from typing import List, Optional, Tuple, Dict, Any
from ...simulate import (
SimulationRuntimeDfsError,
SimulationRuntimeEventError,
SimulationRuntimeExpressionError,
)
from .display import StateDisplay
from .logging import configure_simulate_cli_logger
[docs]
class LogLevel(Enum):
"""
Log level enumeration for controlling output verbosity.
:cvar DEBUG: Show all messages including debug information
:cvar INFO: Show informational messages and above
:cvar WARNING: Show warnings and errors only
:cvar ERROR: Show errors only
:cvar OFF: Disable all logging
"""
DEBUG = "debug"
INFO = "info"
WARNING = "warning"
ERROR = "error"
OFF = "off"
[docs]
class Settings:
"""
Runtime settings for the simulator.
:ivar table_max_rows: Maximum rows to display in tables before truncating
:vartype table_max_rows: int
:ivar history_size: Maximum number of history entries to keep
:vartype history_size: int
:ivar color: Whether to use ANSI colors
:vartype color: bool
:ivar log_level: Current log level
:vartype log_level: LogLevel
"""
[docs]
def __init__(self):
"""Initialize settings with default values."""
self.table_max_rows: int = 20
self.history_size: int = 100
self.color: bool = True
self.log_level: LogLevel = LogLevel.WARNING
[docs]
def get(self, key: str) -> Any:
"""
Get a setting value.
:param key: Setting key
:type key: str
:return: Setting value
:rtype: Any
:raises KeyError: If key doesn't exist
"""
if not hasattr(self, key):
raise KeyError(f"Unknown setting: {key}")
return getattr(self, key)
[docs]
def set(self, key: str, value: Any) -> None:
"""
Set a setting value.
:param key: Setting key
:type key: str
:param value: Setting value
:type value: Any
:raises KeyError: If key doesn't exist
:raises ValueError: If value is invalid
"""
if not hasattr(self, key):
raise KeyError(f"Unknown setting: {key}")
# Validate and convert value based on type
current_value = getattr(self, key)
if isinstance(current_value, bool):
if isinstance(value, str):
if value.lower() in ('on', 'true', '1', 'yes'):
value = True
elif value.lower() in ('off', 'false', '0', 'no'):
value = False
else:
raise ValueError(f"Invalid boolean value: {value}")
elif isinstance(current_value, int):
try:
value = int(value)
if value < 0:
raise ValueError(f"Value must be non-negative: {value}")
except (ValueError, TypeError):
raise ValueError(f"Invalid integer value: {value}")
elif isinstance(current_value, LogLevel):
if isinstance(value, str):
try:
value = LogLevel(value.lower())
except ValueError:
valid_levels = [level.value for level in LogLevel]
raise ValueError(f"Invalid log level. Available: {', '.join(valid_levels)}")
setattr(self, key, value)
[docs]
def list_all(self) -> Dict[str, Any]:
"""
Get all settings as a dictionary.
:return: Dictionary of all settings
:rtype: Dict[str, Any]
"""
return {
'table_max_rows': self.table_max_rows,
'history_size': self.history_size,
'color': self.color,
'log_level': self.log_level.value if isinstance(self.log_level, LogLevel) else self.log_level,
}
[docs]
@dataclass
class CommandResult:
"""
Result of a command execution.
:param output: The output text to display to the user
:type output: str
:param should_exit: Whether the REPL should exit after this command
:type should_exit: bool
"""
output: str
should_exit: bool = False
[docs]
class CommandProcessor:
"""
Processor for handling interactive commands.
This class routes commands to their handlers and manages command execution
state including log level.
:param runtime: The simulation runtime instance
:type runtime: SimulationRuntime
:ivar display: State display formatter
:vartype display: StateDisplay
:ivar log_level: Current log level
:vartype log_level: LogLevel
"""
[docs]
def __init__(self, runtime, state_machine=None, use_color: bool = True):
"""
Initialize the command processor.
:param runtime: The simulation runtime instance
:type runtime: SimulationRuntime
:param state_machine: The state machine model (required for init command)
:type state_machine: StateMachine, optional
:param use_color: Whether to use ANSI colors, defaults to True
:type use_color: bool, optional
"""
self.runtime = runtime
self.state_machine = state_machine if state_machine is not None else runtime.state_machine
self.settings = Settings()
self.settings.color = use_color
self.display = StateDisplay(use_color=use_color, logger=runtime.logger)
configure_simulate_cli_logger(self.runtime.logger, use_color=use_color)
# Sync log level with runtime logger
self._sync_log_level()
def _sync_log_level(self):
"""
Synchronize log level setting with runtime logger.
"""
import logging
level_map = {
LogLevel.DEBUG: logging.DEBUG,
LogLevel.INFO: logging.INFO,
LogLevel.WARNING: logging.WARNING,
LogLevel.ERROR: logging.ERROR,
LogLevel.OFF: logging.CRITICAL + 10, # Higher than CRITICAL to disable all
}
self.runtime.logger.setLevel(level_map[self.settings.log_level])
[docs]
def process(self, user_input: str) -> CommandResult:
"""
Process a user command.
:param user_input: The raw user input string
:type user_input: str
:return: Command execution result
:rtype: CommandResult
"""
parts = user_input.strip().split()
if not parts:
return CommandResult("")
command = parts[0]
args = parts[1:]
if command == 'cycle':
return self._handle_cycle(args)
elif command == 'init':
return self._handle_init(args)
elif command == 'clear':
return self._handle_clear()
elif command == 'current':
return self._handle_current()
elif command == 'events':
return self._handle_events()
elif command == 'history':
return self._handle_history(args)
elif command == 'setting':
return self._handle_setting(args)
elif command == 'export':
return self._handle_export(args)
elif command == 'help':
return self._handle_help()
elif command in ['quit', 'exit']:
return CommandResult("Goodbye!", should_exit=True)
else:
return CommandResult(f"Unknown command: {command}. Type 'help' for available commands.")
def _handle_cycle(self, events: List[str]) -> CommandResult:
"""
Handle cycle command with optional count parameter.
Supports two formats:
- cycle [events...] - Execute 1 cycle with optional events
- cycle [count] [events...] - Execute count cycles with optional events
For count > 1, displays results in table format.
:param events: List containing optional count and event names
:type events: List[str]
:return: Command result with current state or table
:rtype: CommandResult
"""
try:
# Parse arguments: first arg might be count
count = 1
event_list = events
if events:
# Check if first argument looks like a number
first_arg = events[0]
if first_arg.lstrip('-').isdigit():
# First argument is a number
try:
count = int(first_arg)
if count <= 0:
return CommandResult("Error: cycle count must be a positive integer")
event_list = events[1:]
except ValueError:
return CommandResult(f"Error: invalid cycle count '{first_arg}'")
# Single cycle - use simple format
if count == 1:
if self.settings.log_level == LogLevel.DEBUG:
self.display.log(f"Executing cycle with events: {event_list if event_list else 'none'}", "debug")
self.runtime.cycle(event_list if event_list else None)
return CommandResult(self.display.format_current_state(self.runtime))
# Multiple cycles - use table format
return self._handle_multiple_cycles(count, event_list)
except SimulationRuntimeDfsError:
# SimulationRuntimeDfsError: runtime.cycle validation exceeded its
# DFS safety limits for the user's state machine.
return CommandResult(
"Cycle execution failed: State machine contains an unbounded execution chain.\n"
"This usually means there are too many automatic transitions without a stable state.\n"
"Please review your state machine definition for missing stoppable states."
)
except (SimulationRuntimeEventError, SimulationRuntimeExpressionError) as e:
# SimulationRuntimeEventError: runtime.cycle rejected a
# user-supplied event path after trying supported resolution modes.
# SimulationRuntimeExpressionError: user DSL guard/action
# expression evaluation failed, for example division by zero,
# math domain error, or numeric overflow.
return CommandResult(f"Cycle execution failed: {e}")
def _handle_init(self, args: List[str]) -> CommandResult:
"""
Handle init command to hot start from a specific state.
Syntax: init <state_path> [var1=value1 var2=value2 ...]
This command creates a new runtime instance starting from the specified
state without executing enter actions. All variables must be provided.
:param args: Command arguments [state_path, var_assignments...]
:type args: List[str]
:return: Command result with new current state
:rtype: CommandResult
"""
if not args:
return CommandResult(
"Usage: init <state_path> [var1=value1 ...]\n"
"Example: init System.Active counter=10 flag=1\n"
"Note: All variables must be provided when using init."
)
state_path = args[0]
var_assignments = args[1:]
# Parse variable assignments
initial_vars = {}
for assignment in var_assignments:
if '=' not in assignment:
return CommandResult(
f"Error: invalid variable assignment '{assignment}'. "
f"Expected format: var=value"
)
var_name, var_value_str = assignment.split('=', 1)
var_name = var_name.strip()
var_value_str = var_value_str.strip()
# Parse value
try:
var_value = self._parse_value(var_value_str)
except ValueError as e:
return CommandResult(f"Error: {e}")
initial_vars[var_name] = var_value
# Validate that all variables are provided
if initial_vars:
missing_vars = set(self.runtime.vars.keys()) - set(initial_vars.keys())
if missing_vars:
return CommandResult(
f"Error: All variables must be provided. Missing: {sorted(missing_vars)}\n"
f"Available variables: {sorted(self.runtime.vars.keys())}"
)
# Create new runtime with hot start
try:
# Import here to avoid circular dependency
from ...simulate import SimulationRuntime
new_runtime = SimulationRuntime(
self.state_machine,
initial_state=state_path,
initial_vars=initial_vars if initial_vars else None,
abstract_error_mode=self.runtime._abstract_error_mode,
history_size=self.runtime.history_size
)
# Replace runtime
self.runtime = new_runtime
# Reconfigure display with new runtime logger
self.display = StateDisplay(use_color=self.settings.color, logger=new_runtime.logger)
configure_simulate_cli_logger(new_runtime.logger, use_color=self.settings.color)
self._sync_log_level()
return CommandResult(
f"Initialized from state: {state_path}\n" +
self.display.format_current_state(self.runtime)
)
except ValueError as e:
# ValueError: SimulationRuntime hot-start validation rejected the
# user-provided state path or initial variable values.
return CommandResult(f"Initialization failed: {e}")
def _parse_value(self, value_str: str) -> float:
"""
Parse a numeric value from string.
Supports:
- Integers: 10, -5
- Hexadecimal: 0xFF, 0x10
- Binary: 0b1010, 0b11
- Floats: 3.14, -2.5
- Scientific notation: 1e-3, 2.5e2
:param value_str: String representation of the value
:type value_str: str
:return: Parsed numeric value (int or float)
:rtype: Union[int, float]
:raises ValueError: If the value cannot be parsed
"""
value_str = value_str.strip()
# Hexadecimal
if value_str.startswith(('0x', '0X')):
try:
return int(value_str, 16)
except ValueError:
raise ValueError(f"Invalid hexadecimal value: {value_str}")
# Binary
if value_str.startswith(('0b', '0B')):
try:
return int(value_str, 2)
except ValueError:
raise ValueError(f"Invalid binary value: {value_str}")
# Try integer first
try:
return int(value_str)
except ValueError:
pass
# Try float
try:
return float(value_str)
except ValueError:
raise ValueError(f"Invalid numeric value: {value_str}")
def _handle_multiple_cycles(self, count: int, event_list: List[str]) -> CommandResult:
"""
Handle multiple cycle execution with table output.
:param count: Number of cycles to execute
:type count: int
:param event_list: List of events to trigger each cycle
:type event_list: List[str]
:return: Command result with table
:rtype: CommandResult
"""
# Get starting cycle count
start_cycle = self.runtime.cycle_count
# Collect data for all cycles
table_data = []
for i in range(count):
if self.settings.log_level == LogLevel.DEBUG:
self.display.log(f"Executing cycle {i+1}/{count} with events: {event_list if event_list else 'none'}", "debug")
self.runtime.cycle(event_list if event_list else None)
# Collect state and variables
try:
state_path = '.'.join(self.runtime.current_state.path)
except (AttributeError, IndexError):
state_path = "(terminated)"
# Use actual cycle count from runtime
cycle_num = start_cycle + i + 1
row = [cycle_num, state_path]
# Add variable values
for var_name in sorted(self.runtime.vars.keys()):
row.append(self.runtime.vars[var_name])
table_data.append(row)
# Prepare table headers
headers = ['Cycle', 'State']
var_names = sorted(self.runtime.vars.keys())
headers.extend(var_names)
# Filter rows if count >= 20
if count >= 20:
display_data = table_data[:10] + table_data[-10:]
# Add separator row
separator_row = ['...'] * len(headers)
display_data = table_data[:10] + [separator_row] + table_data[-10:]
else:
display_data = table_data
# Generate table using our custom formatter
table_str = self.display.format_table(headers, display_data, var_names)
return CommandResult(table_str)
def _handle_clear(self) -> CommandResult:
"""
Handle /clear command.
:return: Command result with reset state
:rtype: CommandResult
"""
# Recreate the runtime to reset state
from ...simulate import SimulationRuntime
self.runtime = SimulationRuntime(self.runtime.state_machine)
if self.settings.log_level in [LogLevel.DEBUG, LogLevel.INFO]:
self.display.log("State machine reset to initial state", "info")
return CommandResult(self.display.format_current_state(self.runtime))
def _handle_current(self) -> CommandResult:
"""
Handle /current command.
:return: Command result with current state
:rtype: CommandResult
"""
return CommandResult(self.display.format_current_state(self.runtime))
def _handle_events(self) -> CommandResult:
"""
Handle /events command.
:return: Command result with available events
:rtype: CommandResult
"""
events = self._get_current_events()
return CommandResult(self.display.format_events(events))
def _handle_help(self) -> CommandResult:
"""
Handle help command.
:return: Command result with help text
:rtype: CommandResult
"""
help_text = """Available commands:
cycle [count] [events...] - Execute cycle(s) with optional events
count: number of cycles (default: 1)
Examples: cycle, cycle 5, cycle 3 Start
init <state> [vars...] - Hot start from specific state with variables
All variables must be provided
Examples: init System.Active counter=10 flag=1
Supports: hex (0xFF), binary (0b1010), float (3.14)
clear - Reset to initial state
current - Show current state and all variables
events - List available events in current state
history [n|all] - Show execution history (default: 10 recent entries)
setting [key] [value] - View or change settings (including log_level)
export <filename> - Export history to file (.csv, .json, .yaml, .jsonl)
help - Show this help message
quit, exit - Exit simulator
Keyboard shortcuts (interactive mode):
Tab - Auto-complete commands and events
Ctrl+R - Search command history
Ctrl+C - Cancel current input
Ctrl+D - Exit simulator
Up/Down arrows - Navigate command history"""
return CommandResult(help_text)
def _handle_history(self, args: List[str]) -> CommandResult:
"""
Handle /history command.
:param args: Command arguments (count or 'all')
:type args: List[str]
:return: Command result with history table
:rtype: CommandResult
"""
if not self.runtime.history:
return CommandResult("No execution history available.")
# Determine how many entries to show
if args and args[0].lower() == 'all':
count = len(self.runtime.history)
elif args:
try:
count = int(args[0])
if count <= 0:
return CommandResult("Error: count must be a positive integer")
except ValueError:
return CommandResult(f"Error: invalid count '{args[0]}'")
else:
count = 10 # Default
# Get the most recent entries
entries = self.runtime.history[-count:]
# Prepare table data
headers = ['Cycle', 'State']
if entries:
# Get all variable names from the first entry
var_names = sorted(entries[0]['vars'].keys())
headers.extend(var_names)
# Build table rows
table_data = []
for entry in entries:
row = [entry['cycle'], entry['state']]
for var_name in var_names:
row.append(entry['vars'].get(var_name, ''))
table_data.append(row)
# Generate table
table_str = self.display.format_table(headers, table_data, var_names)
return CommandResult(table_str)
else:
return CommandResult("No history entries to display.")
def _handle_setting(self, args: List[str]) -> CommandResult:
"""
Handle /setting command.
:param args: Command arguments (key and optional value)
:type args: List[str]
:return: Command result
:rtype: CommandResult
"""
if not args:
# Show all settings
settings = self.settings.list_all()
lines = ["Current settings:"]
for key, value in sorted(settings.items()):
lines.append(f" {key} = {value}")
return CommandResult('\n'.join(lines))
key = args[0]
if len(args) == 1:
# Show specific setting
try:
value = self.settings.get(key)
if isinstance(value, LogLevel):
value = value.value
return CommandResult(f"{key} = {value}")
except KeyError as e:
return CommandResult(f"Error: {e}")
# Set setting
value = args[1]
try:
self.settings.set(key, value)
# Apply setting changes
if key == 'color':
self.display.use_color = self.settings.color
elif key == 'log_level':
# Sync log level with runtime logger
self._sync_log_level()
elif key == 'history_size':
# Update runtime history size
self.runtime.history_size = self.settings.history_size if self.settings.history_size > 0 else None
# Trim existing history to new size
if self.runtime.history_size is not None and len(self.runtime.history) > self.runtime.history_size:
self.runtime.history = self.runtime.history[-self.runtime.history_size:]
return CommandResult(f"Setting updated: {key} = {value}")
except (KeyError, ValueError) as e:
return CommandResult(f"Error: {e}")
def _get_current_events(self) -> List[Tuple[str, Optional[str]]]:
"""
Get available events in the current state.
:return: List of (full_path, short_name) tuples
:rtype: List[Tuple[str, Optional[str]]]
"""
if self.runtime.is_ended:
return []
if not self.runtime.current_state:
return []
current_state = self.runtime.current_state
current_state_name = current_state.name
events = []
seen_events = set()
# Check parent's transitions for transitions from current state
if current_state.parent:
parent = current_state.parent
for transition in parent.transitions:
# Check if this transition is from the current state
if transition.from_state == current_state_name and transition.event:
event_path = '.'.join(transition.event.state_path) + '.' + transition.event.name
if event_path not in seen_events:
seen_events.add(event_path)
# Add full path and short name
short_name = transition.event.name
if short_name != event_path:
events.append((event_path, short_name))
else:
events.append((event_path, None))
return events
def _handle_export(self, args: List[str]) -> CommandResult:
"""
Handle export command to export history to file.
Supports multiple formats based on file extension:
- .csv: CSV format with columns: cycle, state, var1, var2, ...
- .json: JSON array of history entries
- .yaml: YAML array of history entries
- .jsonl: JSON Lines format (one JSON object per line)
:param args: Command arguments [filename]
:type args: List[str]
:return: Command result
:rtype: CommandResult
"""
if not args:
return CommandResult("Usage: export <filename>\nSupported formats: .csv, .json, .yaml, .jsonl")
filename = args[0]
# Check if history is empty
if not self.runtime.history:
return CommandResult("No history to export. Run some cycles first.")
# Determine format from extension
import os
_, ext = os.path.splitext(filename)
ext = ext.lower()
if ext not in ['.csv', '.json', '.yaml', '.jsonl']:
return CommandResult(f"Unsupported file format: {ext}\nSupported formats: .csv, .json, .yaml, .jsonl")
if ext == '.csv':
import csv
try:
self._export_csv(filename)
except (OSError, csv.Error) as e:
# OSError: open/write failed due to path, permission, or disk
# state; csv.Error: csv.writer rejected the output stream/data.
return CommandResult(f"Export failed: {e}")
elif ext == '.json':
try:
self._export_json(filename)
except (OSError, ValueError) as e:
# OSError: open/write failed due to path, permission, or disk
# state; ValueError: json serialization/write rejected payload.
return CommandResult(f"Export failed: {e}")
elif ext == '.yaml':
import yaml
try:
self._export_yaml(filename)
except (OSError, ValueError, yaml.YAMLError) as e:
# OSError: open/write failed due to path, permission, or disk
# state; ValueError/YAMLError: PyYAML rejected the payload or
# output stream.
return CommandResult(f"Export failed: {e}")
elif ext == '.jsonl':
try:
self._export_jsonl(filename)
except (OSError, ValueError) as e:
# OSError: open/write failed due to path, permission, or disk
# state; ValueError: per-entry json serialization/write failed.
return CommandResult(f"Export failed: {e}")
return CommandResult(f"History exported to {filename} ({len(self.runtime.history)} entries)")
def _export_csv(self, filename: str) -> None:
"""
Export history to CSV format.
Includes cycle, state, events (semicolon-separated), and all variables.
:param filename: Output filename
:type filename: str
"""
import csv
# Get all variable names
var_names = sorted(self.runtime.vars.keys())
with open(filename, 'w', newline='', encoding='utf-8') as f:
writer = csv.writer(f)
# Write header
header = ['cycle', 'state', 'events'] + var_names
writer.writerow(header)
# Write data
for entry in self.runtime.history:
cycle_num = entry['cycle']
state = entry['state']
events = entry.get('events', [])
vars_dict = entry['vars']
# Join events with semicolon to avoid confusion with CSV commas
events_str = ';'.join(events) if events else ''
row = [cycle_num, state, events_str]
for var_name in var_names:
row.append(vars_dict.get(var_name, ''))
writer.writerow(row)
def _export_json(self, filename: str) -> None:
"""
Export history to JSON format.
:param filename: Output filename
:type filename: str
"""
import json
with open(filename, 'w', encoding='utf-8') as f:
json.dump(self.runtime.history, f, indent=2, ensure_ascii=False)
def _export_yaml(self, filename: str) -> None:
"""
Export history to YAML format.
:param filename: Output filename
:type filename: str
"""
import yaml
with open(filename, 'w', encoding='utf-8') as f:
yaml.dump(self.runtime.history, f, default_flow_style=False, allow_unicode=True)
def _export_jsonl(self, filename: str) -> None:
"""
Export history to JSON Lines format.
:param filename: Output filename
:type filename: str
"""
import json
with open(filename, 'w', encoding='utf-8') as f:
for entry in self.runtime.history:
f.write(json.dumps(entry, ensure_ascii=False) + '\n')