CommandProvider
CommandProvider — server-side command interaction (Tier 0).
A CommandProvider receives commands addressed to it via a content-filtered
reader, manages CommandProviderSession instances for each active command,
and owns all ack/status/exec_status writers. Subclasses override async hooks
to implement domain-specific behavior.
- class rtiumaapy.command_provider.CommandProvider(ctx: DDSContext, service_name: str | None = None, *, command_type: Type, ack_type: Type, status_type: Type, command_topic: str, ack_topic: str, status_topic: str, exec_status_type: Type = None, exec_status_topic: str = None, source_id)[source]
Bases:
BaseServiceSubclass and override hooks to implement command behavior.
Base class owns ALL status publishing — subclasses never publish status directly.
- Parameters:
ctx – The
DDSContextowning shared DDS infrastructure.service_name – Unique name for this service instance.
command_type – IDL-generated command struct type.
ack_type – IDL-generated ack report struct type.
status_type – IDL-generated command status struct type.
exec_status_type – Optional execution status struct type.
command_topic – DDS topic name for commands.
ack_topic – DDS topic name for ack.
status_topic – DDS topic name for status.
exec_status_topic – Optional DDS topic name for exec status.
source_id – This provider’s
IdentifierTypeidentity.
- async validate_command(command) Tuple[bool, str][source]
Validate a command after ISSUED status and ack are published.
Default implementation checks all fields against UMAA IDL constraints (range, enum, nested struct). Override for additional domain-specific logic.
Return
(True, "")to accept,(False, reason_string)to reject.
- async on_updated(session: CommandProviderSession, previous_command, updated_command) None[source]
Called when a command update arrives for an active session.
- async on_commanded(session: CommandProviderSession) None[source]
Called after COMMANDED status published, before EXECUTING.
- async on_executing(session: CommandProviderSession) None[source]
Called after EXECUTING status published. Do the actual work here.
Raise
CommandHookErrorfor domain-specific failures.
- async on_complete(session: CommandProviderSession) None[source]
Called after COMPLETED status published.
- async on_failed(session: CommandProviderSession, exception: Exception) None[source]
Called after FAILED status published.
- async on_terminal(session: CommandProviderSession) None[source]
Always called (finally block), regardless of outcome.
State Machine
The CommandProvider drives the ICD command state machine automatically.
Subclasses only implement hooks — they never publish status directly.
┌──────────┐ ┌───────────┐ ┌───────────┐ ┌───────────┐
│ ISSUED │───►│ COMMANDED │───►│ EXECUTING │───►│ COMPLETED │
└──────────┘ └───────────┘ └───────────┘ └───────────┘
│ │ │
▼ ▼ ▼
┌──────────┐ ┌──────────┐ ┌──────────┐
│ FAILED │ │ FAILED │ │ FAILED │
└──────────┘ └──────────┘ └──────────┘
│ │ │
▼ ▼ ▼
┌───────────────────────────────────────────┐
│ CANCELED (dispose) │
└───────────────────────────────────────────┘
Hooks
Override these async methods to implement domain behavior:
Hook |
When Called |
Required |
|---|---|---|
|
After ISSUED, returns |
No (default: field validation) |
|
After COMMANDED published |
No |
|
After EXECUTING published — do work here |
Yes |
|
After COMPLETED published |
No |
|
After FAILED published |
No |
|
Always called (finally block) |
No |
|
Command update during EXECUTING |
No |
Example
from rtiumaapy.services.mo import GlobalVectorControlProvider
from rtiumaapy.command_provider_session import CommandProviderSession
class MyVectorProvider(GlobalVectorControlProvider):
def __init__(self, ctx, *, source_id):
super().__init__(ctx, source_id=source_id)
async def on_executing(self, session: CommandProviderSession):
cmd = session.command
print(f"Heading: {cmd.direction}, Speed: {cmd.speed}")
# Return normally → COMPLETED
# Raise CommandHookError → FAILED
async def on_complete(self, session):
print("Command completed successfully")
Signaling Failures
Raise CommandHookError from any hook to fail the command:
from rtiumaapy import CommandHookError
from rtiumaapy.command_provider_session import CommandReasonEnum
async def on_executing(self, session):
raise CommandHookError(
reason_enum=CommandReasonEnum.RESOURCE_FAILED,
message="Hardware not responding",
)
The framework validates the reason against the ICD’s per-state allowed reasons (D51) and transitions to FAILED.