CommandProvider

CommandProvider — server-side command interaction (Tier 0).

A CommandProvider receives commands addressed to it via a content-filtered reader, manages CommandProviderSession instances for each active command, and owns all ack/status/exec_status writers. Subclasses override async hooks to implement domain-specific behavior.

class rtiumaapy.command_provider.CommandProvider(ctx: DDSContext, service_name: str | None = None, *, command_type: Type, ack_type: Type, status_type: Type, command_topic: str, ack_topic: str, status_topic: str, exec_status_type: Type = None, exec_status_topic: str = None, source_id)

Bases: BaseService

Subclass and override hooks to implement command behavior.

Base class owns ALL status publishing — subclasses never publish status directly.

Parameters:
  • ctx – The DDSContext owning shared DDS infrastructure.

  • service_name – Unique name for this service instance.

  • command_type – IDL-generated command struct type.

  • ack_type – IDL-generated ack report struct type.

  • status_type – IDL-generated command status struct type.

  • exec_status_type – Optional execution status struct type.

  • command_topic – DDS topic name for commands.

  • ack_topic – DDS topic name for ack.

  • status_topic – DDS topic name for status.

  • exec_status_topic – Optional DDS topic name for exec status.

  • source_id – This provider’s IdentifierType identity.

async validate_command(command) → Tuple[bool, str]

Validate a command after ISSUED status and ack are published.

Default implementation checks all fields against UMAA IDL constraints (range, enum, nested struct). Override for additional domain-specific logic.

Return (True, "") to accept, (False, reason_string) to reject.
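As a rough illustration of the return contract, the sketch below layers one domain check on top of the inherited validation. StubProvider, SpeedCommand and MAX_SPEED are hypothetical stand-ins so the snippet runs without DDS; only the validate_command name and its (bool, str) return shape come from this page.

```python
import asyncio
from dataclasses import dataclass
from typing import Tuple


@dataclass
class SpeedCommand:
    """Hypothetical stand-in for an IDL-generated command struct."""
    speed: float


class StubProvider:
    """Stand-in for CommandProvider so the sketch runs without DDS."""

    async def validate_command(self, command) -> Tuple[bool, str]:
        # The documented default checks fields against UMAA IDL constraints;
        # this stub simply accepts everything.
        return True, ""


class SpeedLimitedProvider(StubProvider):
    MAX_SPEED = 10.0  # hypothetical domain limit

    async def validate_command(self, command) -> Tuple[bool, str]:
        # Run the inherited checks first and keep their rejection reason.
        ok, reason = await super().validate_command(command)
        if not ok:
            return ok, reason
        # Then apply the additional domain-specific rule.
        if command.speed > self.MAX_SPEED:
            return False, f"speed {command.speed} exceeds limit {self.MAX_SPEED}"
        return True, ""


accepted = asyncio.run(SpeedLimitedProvider().validate_command(SpeedCommand(5.0)))
rejected = asyncio.run(SpeedLimitedProvider().validate_command(SpeedCommand(12.5)))
```

Calling the inherited implementation first keeps the default field validation in place, so the override only adds rules rather than replacing them.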

async on_updated(session: CommandProviderSession, previous_command, updated_command) → None

Called when a command update arrives for an active session.
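One possible use of this hook is to react only to the fields that changed. The sketch below assumes a hypothetical VectorCommand with direction and speed fields and a stand-in session with an illustrative log list; neither is part of the library.

```python
import asyncio
from dataclasses import dataclass, field


@dataclass
class VectorCommand:
    """Hypothetical command struct with the two fields compared below."""
    direction: float
    speed: float


@dataclass
class StubSession:
    """Stand-in for CommandProviderSession; `log` is illustrative only."""
    command: VectorCommand
    log: list = field(default_factory=list)


class StubProvider:
    """Stand-in provider so the hook can be exercised without DDS."""

    async def on_updated(self, session, previous_command, updated_command) -> None:
        # Record only the fields whose values differ between the two commands.
        for name in ("direction", "speed"):
            old = getattr(previous_command, name)
            new = getattr(updated_command, name)
            if old != new:
                session.log.append((name, old, new))


old_cmd = VectorCommand(direction=90.0, speed=2.0)
new_cmd = VectorCommand(direction=90.0, speed=3.5)
session = StubSession(command=new_cmd)
asyncio.run(StubProvider().on_updated(session, old_cmd, new_cmd))
```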

async on_commanded(session: CommandProviderSession) → None

Called after COMMANDED status published, before EXECUTING.

async on_executing(session: CommandProviderSession) → None

Called after EXECUTING status published. Do the actual work here.

Raise CommandHookError for domain-specific failures.

async on_complete(session: CommandProviderSession) → None

Called after COMPLETED status published.

async on_failed(session: CommandProviderSession, exception: Exception) → None

Called after FAILED status published.

async on_terminal(session: CommandProviderSession) → None

Always called (finally block), regardless of outcome.

start() → None

Start the command processing loop.

async close() → None

Fails any active sessions and cancels the internal _run loop. DDS entity cleanup is deferred to the owning DDSContext.
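Because start() is synchronous and close() is a coroutine, a caller would typically pair them with try/finally. The sketch below shows that pattern with a stand-in class. The task-based _run loop and the requirement that start() be called from a running event loop are assumptions of this stub, not confirmed behavior of the library.

```python
import asyncio


class StubProvider:
    """Stand-in with the same start()/close() shape as the documented API."""

    def __init__(self):
        self._task = None
        self.closed = False

    async def _run(self):
        # Placeholder loop; a real provider would read commands from DDS here.
        while True:
            await asyncio.sleep(0.01)

    def start(self) -> None:
        # Assumption: the loop is scheduled as a task on the running event loop.
        self._task = asyncio.get_running_loop().create_task(self._run())

    async def close(self) -> None:
        if self._task is not None:
            self._task.cancel()
            try:
                await self._task
            except asyncio.CancelledError:
                pass  # expected: the task was cancelled on purpose
        self.closed = True


async def main() -> StubProvider:
    provider = StubProvider()
    provider.start()
    try:
        await asyncio.sleep(0.05)  # application work would go here
    finally:
        # Await close() on every exit path so the loop is always cancelled.
        await provider.close()
    return provider


provider = asyncio.run(main())
```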

State Machine

The CommandProvider drives the ICD command state machine automatically. Subclasses only implement hooks — they never publish status directly.

┌──────────┐    ┌───────────┐    ┌───────────┐    ┌───────────┐
│  ISSUED  │───►│ COMMANDED │───►│ EXECUTING │───►│ COMPLETED │
└──────────┘    └───────────┘    └───────────┘    └───────────┘
     │               │               │
     ▼               ▼               ▼
┌──────────┐    ┌──────────┐    ┌──────────┐
│  FAILED  │    │  FAILED  │    │  FAILED  │
└──────────┘    └──────────┘    └──────────┘
     │               │               │
     ▼               ▼               ▼
┌───────────────────────────────────────────┐
│              CANCELED (dispose)            │
└───────────────────────────────────────────┘

Hooks

Override these async methods to implement domain behavior:

Hook                             When Called                               Required
-------------------------------  ----------------------------------------  ------------------------------
validate_command(command)        After ISSUED, returns (bool, reason)      No (default: field validation)
on_commanded(session)            After COMMANDED published                 No
on_executing(session)            After EXECUTING published; do work here   Yes
on_complete(session)             After COMPLETED published                 No
on_failed(session, exception)    After FAILED published                    No
on_terminal(session)             Always called (finally block)             No
on_updated(session, prev, new)   Command update during EXECUTING           No
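To make the call order of the hooks listed above concrete, the following sketch replays it with a hypothetical driver. The drive function, StubSession and RecordingProvider are illustrative stand-ins. The real base class also handles acks, command updates and cancellation, which are omitted here, and its internals may differ.

```python
import asyncio


class StubSession:
    """Stand-in session that records statuses and hook calls in order."""

    def __init__(self, command):
        self.command = command
        self.trace = []


class RecordingProvider:
    """Stand-in provider whose hooks append their own name to the trace."""

    def __init__(self, fail=False):
        self.fail = fail

    async def validate_command(self, command):
        return True, ""

    async def on_commanded(self, session):
        session.trace.append("on_commanded")

    async def on_executing(self, session):
        session.trace.append("on_executing")
        if self.fail:
            raise RuntimeError("simulated failure")

    async def on_complete(self, session):
        session.trace.append("on_complete")

    async def on_failed(self, session, exception):
        session.trace.append("on_failed")

    async def on_terminal(self, session):
        session.trace.append("on_terminal")


async def drive(provider, session):
    """Hypothetical driver: publish a status, then call the matching hook."""
    publish = session.trace.append
    try:
        publish("ISSUED")
        ok, reason = await provider.validate_command(session.command)
        if not ok:
            raise ValueError(reason)
        publish("COMMANDED")
        await provider.on_commanded(session)
        publish("EXECUTING")
        await provider.on_executing(session)
    except Exception as exc:
        publish("FAILED")
        await provider.on_failed(session, exc)
    else:
        publish("COMPLETED")
        await provider.on_complete(session)
    finally:
        # Runs on every path, mirroring the documented finally block.
        await provider.on_terminal(session)
    return session.trace


ok_trace = asyncio.run(drive(RecordingProvider(), StubSession(command=None)))
fail_trace = asyncio.run(drive(RecordingProvider(fail=True), StubSession(command=None)))
```

In both traces on_terminal is the last entry, which is the property a subclass can rely on for releasing resources.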

Example

from rtiumaapy.services.mo import GlobalVectorControlProvider
from rtiumaapy.command_provider_session import CommandProviderSession

class MyVectorProvider(GlobalVectorControlProvider):
    def __init__(self, ctx, *, source_id):
        super().__init__(ctx, source_id=source_id)

    async def on_executing(self, session: CommandProviderSession):
        cmd = session.command
        print(f"Heading: {cmd.direction}, Speed: {cmd.speed}")
        # Return normally → COMPLETED
        # Raise CommandHookError → FAILED

    async def on_complete(self, session):
        print("Command completed successfully")

Signaling Failures

Raise CommandHookError from any hook to fail the command:

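from rtiumaapy import CommandHookError
from rtiumaapy.command_provider_session import CommandReasonEnum
from rtiumaapy.services.mo import GlobalVectorControlProvider

class MyVectorProvider(GlobalVectorControlProvider):
    async def on_executing(self, session):
        # Raising from the hook makes the framework publish FAILED with this reason.
        raise CommandHookError(
            reason_enum=CommandReasonEnum.RESOURCE_FAILED,
            message="Hardware not responding",
        )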

The framework validates the reason against the ICD’s per-state allowed reasons (D51) and transitions to FAILED.
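A per-state reason check of this kind could be modeled as a lookup table. The sketch below is illustrative only: the State and Reason enums are local stand-ins, and the mapping is a placeholder that is not taken from the ICD. How the framework handles a reason that is not allowed is not specified here, so the sketch only answers whether a reason is permitted.

```python
from enum import Enum, auto


class State(Enum):
    """Local stand-in for the non-terminal command states."""
    ISSUED = auto()
    COMMANDED = auto()
    EXECUTING = auto()


class Reason(Enum):
    """Local stand-in for failure reasons; not the library's enum."""
    VALIDATION_FAILED = auto()
    RESOURCE_FAILED = auto()
    SERVICE_FAILED = auto()


# Placeholder mapping, NOT taken from the ICD: which reasons may accompany
# a FAILED transition out of each state.
ALLOWED_FAILURE_REASONS = {
    State.ISSUED: {Reason.VALIDATION_FAILED},
    State.COMMANDED: {Reason.RESOURCE_FAILED, Reason.SERVICE_FAILED},
    State.EXECUTING: {Reason.RESOURCE_FAILED, Reason.SERVICE_FAILED},
}


def is_reason_allowed(state: State, reason: Reason) -> bool:
    """Return True if `reason` is permitted when failing from `state`."""
    return reason in ALLOWED_FAILURE_REASONS.get(state, set())


executing_ok = is_reason_allowed(State.EXECUTING, Reason.RESOURCE_FAILED)
issued_ok = is_reason_allowed(State.ISSUED, Reason.RESOURCE_FAILED)
```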