CommandConsumer
CommandConsumer — client-side command interaction (Tier 0).
A CommandConsumer sends commands, receives ack/status/exec_status via
content-filtered readers, and delivers updates via subclass-override hooks.
One active session at a time.
- class rtiumaapy.command_consumer.CommandConsumer(ctx: DDSContext, service_name: str | None = None, *, command_type: Type, ack_type: Type, status_type: Type, command_topic: str, ack_topic: str, status_topic: str, exec_status_type: Type = None, exec_status_topic: str = None, source_id=None, destination_id=None)[source]
Bases:
BaseServiceSubclass and override hooks to react to command lifecycle events.
- Parameters:
ctx – The
DDSContextowning shared DDS infrastructure.service_name – Unique name for this service instance.
command_type – IDL-generated command struct type.
ack_type – IDL-generated ack report struct type.
status_type – IDL-generated command status struct type.
exec_status_type – Optional execution status struct type.
command_topic – DDS topic name for commands.
ack_topic – DDS topic name for ack.
status_topic – DDS topic name for status.
exec_status_topic – Optional DDS topic name for exec status.
source_id – This consumer’s
IdentifierTypeidentity.destination_id – Target provider’s
IdentifierTypeidentity.
- async on_status(session_id: bytes, status) None[source]
Called for every status update from the provider.
Do not call
cancel()from this hook — terminal statuses are detected automatically and trigger_end_session()after this hook returns.
- async on_ack(session_id: bytes, ack) None[source]
Called when the provider echoes the command acknowledgment.
- async on_exec_status(session_id: bytes, exec_status) None[source]
Called when the provider publishes execution progress.
- async on_terminal(session_id: bytes, status) None[source]
Called after the session closes.
- Parameters:
session_id – The session that ended.
status – Terminal status sample, or None for cancel/crash/shutdown.
- property has_matched_provider: bool
True if at least one provider is subscribed to the command topic.
- async wait_for_discovery(timeout: float = 30.0) bool[source]
Block until a provider subscribes to the command topic.
Returns True if a provider was discovered, False on timeout.
- async send(command, session_id: bytes = None) bytes[source]
Send a command or update. Returns the session_id handle.
If
session_idis None, starts a new session. If provided, sends an update to the existing session (D39).All header fields are auto-stamped (D47).
- Raises:
RuntimeError – If starting a new session while one is active.
RuntimeError – If session_id doesn’t match the active session.
Sending Commands
from rtiumaapy.services.mo import GlobalVectorControlConsumer
from rtiumaapy.datamodel.GlobalVectorCommandType import (
UMAA_MO_GlobalVectorControl_GlobalVectorCommandType as CmdType,
)
from rtiumaapy import set_timestamp
consumer = GlobalVectorControlConsumer(ctx)
# Wait for a provider to appear
await consumer.wait_for_discovery(timeout=10.0)
# Build and send a command
cmd = CmdType()
cmd.speed = 5.0
cmd.direction = 1.57 # radians
session_id = await consumer.send(cmd)
Hooks
Override these to react to lifecycle events:
Hook |
When Called |
|---|---|
|
Provider acknowledged the command |
|
Status update (ISSUED, COMMANDED, EXECUTING, COMPLETED, FAILED) |
|
Execution progress (if the service defines an exec status topic) |
|
Session ended (COMPLETED, FAILED, or CANCELED) |
Example
from rtiumaapy.services.mo import GlobalVectorControlConsumer
class MyVectorConsumer(GlobalVectorControlConsumer):
async def on_status(self, session_id, status):
print(f"Status: {status.status}")
async def on_exec_status(self, session_id, exec_status):
print(f"Direction achieved: {exec_status.directionAchieved}")
async def on_terminal(self, session_id, status):
print(f"Command finished: {status.status if status else 'canceled'}")
Canceling
await consumer.cancel()
Disposes the command instance, the provider detects it and transitions to CANCELED.
Updating
Send a command update to the same session (the provider must be in EXECUTING state):
updated_cmd = CmdType()
updated_cmd.speed = 10.0
await consumer.send(updated_cmd, session_id=session_id)