Source code for rtiumaapy.command_provider_session

"""CommandProviderSession — one command's lifecycle through the ICD state machine.

Each ``CommandProviderSession`` is created by ``CommandProvider`` when a new
command arrives. It drives the state machine via an inline while-loop with
if/elif dispatch. ``_transition_to()`` validates every state change.

External actors (cancel, fail, update) communicate via methods that set
terminal state or pending-update flag; the run loop checks at natural
checkpoints.
"""

from __future__ import annotations

import asyncio
import logging
from typing import TYPE_CHECKING, Optional, Set, Dict

import rti.connextdds as dds

from rtiumaapy.errors import CommandHookError

if TYPE_CHECKING:
    from rtiumaapy.command_provider import CommandProvider

_logger = logging.getLogger(__name__)

# ── Import IDL enums via the generated module system ──────────────────────
# The actual enum types are loaded by the generated datamodel files.
# We import them from the flat-name aliases for brevity.
from rtiumaapy.datamodel.MaritimeEnumerationSets import (
    UMAA_Common_MaritimeEnumeration_CommandStatusEnumModule_CommandStatusEnumType
    as CommandStatusEnum,
    UMAA_Common_MaritimeEnumeration_CommandStatusReasonEnumModule_CommandStatusReasonEnumType
    as CommandReasonEnum,
)

# ── Transition table ──────────────────────────────────────────────────────

_VALID_TRANSITIONS: Dict[Optional[int], Set[int]] = {
    None: {
        CommandStatusEnum.ISSUED,
        CommandStatusEnum.FAILED,
        CommandStatusEnum.CANCELED,
    },
    CommandStatusEnum.ISSUED: {
        CommandStatusEnum.COMMANDED,
        CommandStatusEnum.FAILED,
        CommandStatusEnum.CANCELED,
    },
    CommandStatusEnum.COMMANDED: {
        CommandStatusEnum.EXECUTING,
        CommandStatusEnum.FAILED,
        CommandStatusEnum.CANCELED,
    },
    CommandStatusEnum.EXECUTING: {
        CommandStatusEnum.COMPLETED,
        CommandStatusEnum.FAILED,
        CommandStatusEnum.CANCELED,
        CommandStatusEnum.ISSUED,  # update re-entry
    },
}

_TERMINAL_STATES: Set[int] = {
    CommandStatusEnum.COMPLETED,
    CommandStatusEnum.FAILED,
    CommandStatusEnum.CANCELED,
}

# Per-state failure reason validation (D51)
_VALID_FAIL_REASONS: Dict[Optional[int], Set[int]] = {
    None: {
        CommandReasonEnum.SERVICE_FAILED,
    },
    CommandStatusEnum.ISSUED: {
        CommandReasonEnum.SERVICE_FAILED,
        CommandReasonEnum.INTERRUPTED,
        CommandReasonEnum.TIMEOUT,
        CommandReasonEnum.RESOURCE_FAILED,
        CommandReasonEnum.VALIDATION_FAILED,
    },
    CommandStatusEnum.COMMANDED: {
        CommandReasonEnum.SERVICE_FAILED,
        CommandReasonEnum.INTERRUPTED,
        CommandReasonEnum.TIMEOUT,
        CommandReasonEnum.RESOURCE_REJECTED,
    },
    CommandStatusEnum.EXECUTING: {
        CommandReasonEnum.SERVICE_FAILED,
        CommandReasonEnum.INTERRUPTED,
        CommandReasonEnum.TIMEOUT,
        CommandReasonEnum.RESOURCE_FAILED,
        CommandReasonEnum.OBJECTIVE_FAILED,
    },
}


[docs] class CommandProviderSession: """Manages one command's lifecycle through the ICD state machine. Created by :class:`CommandProvider` for each incoming command. ``run()`` walks through states with if/elif dispatch. ``_transition_to()`` validates every transition. ``cancel()``/``fail()`` set the terminal state and publish before cancelling the task. ``set_new_command()`` stores the pending update. """ def __init__(self, provider: CommandProvider, command) -> None: self._provider = provider self._command = command self._session_id: str = str(command.sessionID) self._current_state: Optional[int] = None self._current_reason: Optional[int] = None self._task: Optional[asyncio.Task] = None self._new_command = None # pending update # ── Properties ──────────────────────────────────────────────────────── @property def command(self): """The current command sample.""" return self._command @property def session_id(self) -> str: """The session ID string.""" return self._session_id @property def current_state(self) -> Optional[int]: """The current state machine state.""" return self._current_state # ── External methods (called from provider/outside the task) ──────────
[docs] async def cancel(self) -> None: """Consumer disposed command → CANCELED. Sets terminal state, publishes, then cancels the task. """ if self._task is not None and not self._task.done() \ and self._current_state not in _TERMINAL_STATES: self._transition_to(CommandStatusEnum.CANCELED, reason=CommandReasonEnum.CANCELED) self._publish_status() self._task.cancel()
[docs] async def fail(self) -> None: """Provider shutdown → FAILED(SERVICE_FAILED). Sets terminal state, publishes, then cancels the task. """ if self._task is not None and not self._task.done() \ and self._current_state not in _TERMINAL_STATES: self._transition_to(CommandStatusEnum.FAILED, reason=CommandReasonEnum.SERVICE_FAILED) self._publish_status() self._task.cancel()
[docs] def set_new_command(self, new_command) -> None: """Store a pending command update. Does NOT cancel the task. The run loop checks ``_new_command`` at the natural checkpoint after ``on_executing()`` returns. """ self._new_command = new_command
# ── State transition ────────────────────────────────────────────────── def _transition_to(self, state: int, reason: Optional[int] = None) -> None: """Validate and advance the state machine. If the transition is illegal, forces FAILED and raises ``RuntimeError``. When transitioning to FAILED, validates the reason against ``_VALID_FAIL_REASONS`` (D51). Invalid reasons are coerced to ``SERVICE_FAILED`` with a warning. """ allowed = _VALID_TRANSITIONS.get(self._current_state, set()) if state not in allowed: previous = self._current_state _logger.error( "Session %s: illegal transition %s%s, forcing FAILED", self._session_id, previous, state) self._current_state = CommandStatusEnum.FAILED self._current_reason = CommandReasonEnum.SERVICE_FAILED raise RuntimeError( f"Session {self._session_id}: illegal transition " f"{previous}{state}") if state == CommandStatusEnum.FAILED and reason is not None: valid = _VALID_FAIL_REASONS.get(self._current_state, set()) if reason not in valid: _logger.warning( "Session %s: reason %s not valid from state %s; " "falling back to SERVICE_FAILED", self._session_id, reason, self._current_state) reason = CommandReasonEnum.SERVICE_FAILED self._current_state = state self._current_reason = reason # ── DDS publish ─────────────────────────────────────────────────────── def _publish_status(self) -> None: """Publish ``_current_state`` and ``_current_reason`` to DDS.""" try: from rtiumaapy.timestamp import UmaaTimestamp status = self._provider._status_type() status.source = self._provider._source_id status.sessionID = self._command.sessionID UmaaTimestamp.set_timestamp(status) status.commandStatus = self._current_state if self._current_reason is not None: status.commandStatusReason = self._current_reason self._provider._status_writer.write(status) except Exception: _logger.warning( "Session %s: failed to publish %s status", self._session_id, self._current_state, exc_info=True) async def _apply_update(self) -> None: """Swap to the new command and re-enter from ISSUED(UPDATED).""" previous = self._command self._command = self._new_command self._new_command = None self._transition_to(CommandStatusEnum.ISSUED, reason=CommandReasonEnum.UPDATED) self._publish_status() await self._provider.on_updated(self, previous, self._command) # ── Instance disposal ───────────────────────────────────────────────── def _dispose_provider_instances(self) -> None: """Dispose provider-side DDS instances (ack + status + exec_status).""" session_guid = self._command.sessionID # Ack try: ack_key = self._provider._ack_type() ack_key.source = self._provider._source_id ack_key.sessionID = session_guid ih = self._provider._ack_writer.lookup_instance(ack_key) if ih != dds.InstanceHandle.nil(): self._provider._ack_writer.dispose_instance(ih) except Exception: _logger.debug("Session %s: ack dispose failed", self._session_id) # Status try: status_key = self._provider._status_type() status_key.source = self._provider._source_id status_key.sessionID = session_guid ih = self._provider._status_writer.lookup_instance(status_key) if ih != dds.InstanceHandle.nil(): self._provider._status_writer.dispose_instance(ih) except Exception: _logger.debug("Session %s: status dispose failed", self._session_id) # Exec status (optional) if self._provider._exec_status_writer is not None: try: exec_key = self._provider._exec_status_type() exec_key.source = self._provider._source_id exec_key.sessionID = session_guid ih = self._provider._exec_status_writer.lookup_instance( exec_key) if ih != dds.InstanceHandle.nil(): self._provider._exec_status_writer.dispose_instance(ih) except Exception: _logger.debug("Session %s: exec_status dispose failed", self._session_id) # ── Main loop ─────────────────────────────────────────────────────────
[docs] async def run(self) -> None: """Drive the ICD state machine via inline while-loop dispatch.""" try: while self._current_state not in _TERMINAL_STATES: if self._current_state == CommandStatusEnum.ISSUED: # Validation (D42 — after ISSUED + Ack) accepted, reason_msg = \ await self._provider.validate_command(self._command) if not accepted: raise CommandHookError( CommandReasonEnum.VALIDATION_FAILED, reason_msg) self._transition_to(CommandStatusEnum.COMMANDED, reason=CommandReasonEnum.SUCCEEDED) self._publish_status() await self._provider.on_commanded(self) elif self._current_state == CommandStatusEnum.COMMANDED: self._transition_to(CommandStatusEnum.EXECUTING, reason=CommandReasonEnum.SUCCEEDED) self._publish_status() elif self._current_state == CommandStatusEnum.EXECUTING: await self._provider.on_executing(self) # Check for pending update at natural checkpoint if self._new_command is not None: await self._apply_update() else: self._transition_to(CommandStatusEnum.COMPLETED, reason=CommandReasonEnum.SUCCEEDED) self._publish_status() await self._provider.on_complete(self) except asyncio.CancelledError: # Terminal state already set and published by cancel()/fail(). _logger.debug( "Session %s: task cancelled in state %s", self._session_id, self._current_state) except CommandHookError as e: if self._current_state not in _TERMINAL_STATES: self._transition_to(CommandStatusEnum.FAILED, reason=e.reason_enum) self._publish_status() await self._provider.on_failed(self, e) except Exception as e: if self._current_state not in _TERMINAL_STATES: self._transition_to(CommandStatusEnum.FAILED, reason=CommandReasonEnum.SERVICE_FAILED) self._publish_status() await self._provider.on_failed(self, e) finally: try: await self._provider.on_terminal(self) except Exception: _logger.exception( "Session %s: provider on_terminal hook error", self._session_id, ) self._dispose_provider_instances() self._provider._active_sessions.pop(self._session_id, None)