"""Redundancy and overlap design-health diagnostics."""
from collections import Counter
from typing import TYPE_CHECKING, Dict, Iterable, List, Optional, Tuple
from ...utils.validate import ModelDiagnostic
if TYPE_CHECKING: # pragma: no cover
from ..inspect import EventInfo, StateInfo, TransitionInfo
[docs]
def collect_redundancy_warnings(
transitions: Iterable['TransitionInfo'],
events: Iterable['EventInfo'],
states: Iterable['StateInfo'] = (),
) -> List[ModelDiagnostic]:
transitions = list(transitions)
states = list(states)
diagnostics: List[ModelDiagnostic] = []
diagnostics.extend(_redundant_transition_warnings(transitions))
diagnostics.extend(_self_transition_nop_warnings(transitions, states))
diagnostics.extend(_effect_self_assign_warnings(transitions))
diagnostics.extend(_forced_overrides_normal_warnings(transitions))
diagnostics.extend(_shadowed_event_warnings(events))
return diagnostics
def _transition_trigger_key(t: 'TransitionInfo') -> Tuple[str, str, object, object]:
return t.from_path, t.to_path, t.event, t.guard
def _transition_behavior_key(t: 'TransitionInfo') -> Tuple[str, str, object, object, object]:
return t.from_path, t.to_path, t.event, t.guard, t.effect
def _redundant_transition_warnings(
transitions: Iterable['TransitionInfo'],
) -> List[ModelDiagnostic]:
groups: Dict[Tuple[str, str, object, object, object], List['TransitionInfo']] = {}
for t in transitions:
if t.from_path == '[*]':
continue
groups.setdefault(_transition_behavior_key(t), []).append(t)
diagnostics: List[ModelDiagnostic] = []
for key, items in groups.items():
if len(items) < 2:
continue
from_path, to_path, _, _, _ = key
diagnostics.append(ModelDiagnostic(
code='W_REDUNDANT_TRANSITION',
span=items[0].span,
severity='warning',
message=(
f'Transition {from_path!r} -> {to_path!r} is duplicated '
'with the same event, guard, and effect.'
),
refs={
'from_path': from_path,
'to_path': to_path,
'duplicate_spans': [item.span for item in items],
'transition_index': items[0].transition_index,
},
))
return diagnostics
def _self_transition_nop_warnings(
transitions: Iterable['TransitionInfo'],
states: Iterable['StateInfo'],
) -> List[ModelDiagnostic]:
states_by_path = {state.path: state for state in states}
diagnostics: List[ModelDiagnostic] = []
for t in transitions:
if t.from_path != t.to_path:
continue
if t.event is not None or t.guard is not None or t.effect is not None:
continue
if not _is_lifecycle_free_leaf(t.from_path, states_by_path):
continue
diagnostics.append(ModelDiagnostic(
code='W_SELF_TRANSITION_NOP',
span=t.span,
severity='warning',
message=(
f'Self transition on {t.from_path!r} has no trigger, '
'guard, effect, or re-entry lifecycle behavior.'
),
refs={
'state_path': t.from_path,
'from_path': t.from_path,
'to_path': t.to_path,
'transition_span': t.span,
'transition_index': t.transition_index,
},
))
return diagnostics
def _is_lifecycle_free_leaf(
state_path: str,
states_by_path: Dict[str, 'StateInfo'],
) -> bool:
state = states_by_path.get(state_path)
if state is None or not state.is_leaf or state.is_pseudo:
return False
if (
state.entry_actions
or state.during_actions
or state.exit_actions
or state.aspect_before
or state.aspect_after
):
return False
parts = state_path.split('.')
for index in range(1, len(parts)):
ancestor = states_by_path.get('.'.join(parts[:index]))
if ancestor is not None and (ancestor.aspect_before or ancestor.aspect_after):
return False
return True
def _effect_self_assign_warnings(transitions: Iterable['TransitionInfo']) -> List[ModelDiagnostic]:
counts = Counter(
(t.from_path, var_name)
for t in transitions
for var_name in getattr(t, 'effect_self_assigns', ())
)
diagnostics: List[ModelDiagnostic] = []
for t in transitions:
effect_self_assigns = getattr(t, 'effect_self_assigns', ())
effect_self_assign_spans = getattr(t, 'effect_self_assign_spans', ())
for index, var_name in enumerate(effect_self_assigns):
refs = {
'state_path': t.from_path,
'transition_span': t.span,
'var_name': var_name,
'transition_index': t.transition_index,
}
if t.from_path != '[*]' and counts[(t.from_path, var_name)] == 1:
refs['effect_self_assign_anchor'] = var_name
diagnostics.append(ModelDiagnostic(
code='W_EFFECT_SELF_ASSIGN',
span=(
effect_self_assign_spans[index]
if index < len(effect_self_assign_spans)
and effect_self_assign_spans[index] is not None
else t.span
),
severity='warning',
message=f'Transition effect assigns {var_name!r} to itself.',
refs=refs,
))
return diagnostics
def _forced_overrides_normal_warnings(transitions: Iterable['TransitionInfo']) -> List[ModelDiagnostic]:
normal_by_key = {
_transition_trigger_key(t): t
for t in transitions
if not t.is_forced
}
diagnostics: List[ModelDiagnostic] = []
for t in transitions:
normal_transition = normal_by_key.get(_transition_trigger_key(t))
if not t.is_forced or normal_transition is None:
continue
diagnostics.append(ModelDiagnostic(
code='W_FORCED_OVERRIDES_NORMAL',
span=t.span,
severity='warning',
message=(
f'Forced transition {t.from_path!r} -> {t.to_path!r} '
'duplicates a normal transition.'
),
refs={
'from_path': t.from_path,
'to_path': t.to_path,
'forced_declaration_span': t.span,
'normal_transition_span': normal_transition.span,
},
))
return diagnostics
def _shadowed_event_warnings(events: Iterable['EventInfo']) -> List[ModelDiagnostic]:
by_leaf_name: Dict[str, List['EventInfo']] = {}
for event in events:
leaf = event.qualified_name.rsplit('.', 1)[-1]
by_leaf_name.setdefault(leaf, []).append(event)
diagnostics: List[ModelDiagnostic] = []
for event_name, items in by_leaf_name.items():
chain_like = [
item for item in items
if item.scope in {'chain', 'absolute'}
]
local_like = [item for item in items if item.scope == 'local']
if not chain_like or not local_like:
continue
for local_event in local_like:
shadowing_event = _find_shadowing_event(local_event, chain_like)
if shadowing_event is None:
continue
diagnostics.append(ModelDiagnostic(
code='W_SHADOWED_EVENT',
span=local_event.span,
severity='warning',
message=(
f'Local event {local_event.qualified_name!r} shadows '
f'a chain event named {event_name!r}.'
),
refs={
'event_name': event_name,
'local_path': local_event.qualified_name,
'chain_path': shadowing_event.qualified_name,
},
))
return diagnostics
def _find_shadowing_event(
local_event: 'EventInfo',
chain_like: Iterable['EventInfo'],
) -> Optional['EventInfo']:
local_owner = _event_owner_path(local_event.qualified_name)
candidates = [
item for item in chain_like
if _is_same_or_ancestor_scope(
local_owner,
_event_owner_path(item.qualified_name),
)
]
if not candidates:
return None
return max(
candidates,
key=lambda item: len(_event_owner_path(item.qualified_name)),
)
def _event_owner_path(qualified_name: str) -> str:
if '.' not in qualified_name:
return ''
return qualified_name.rsplit('.', 1)[0]
def _is_same_or_ancestor_scope(local_owner: str, broader_owner: str) -> bool:
if broader_owner == '':
return True
return local_owner == broader_owner or local_owner.startswith(f'{broader_owner}.')