CommandConsumer

CommandConsumer — client-side command interaction (Tier 0).

A CommandConsumer sends commands, receives ack/status/exec_status via content-filtered readers, and delivers updates via subclass-override hooks. One active session at a time.

class rtiumaapy.command_consumer.CommandConsumer(ctx: DDSContext, service_name: str | None = None, *, command_type: Type, ack_type: Type, status_type: Type, command_topic: str, ack_topic: str, status_topic: str, exec_status_type: Type = None, exec_status_topic: str = None, source_id=None, destination_id=None)[source]

Bases: BaseService

Subclass and override hooks to react to command lifecycle events.

Parameters:
  • ctx – The DDSContext owning shared DDS infrastructure.

  • service_name – Unique name for this service instance.

  • command_type – IDL-generated command struct type.

  • ack_type – IDL-generated ack report struct type.

  • status_type – IDL-generated command status struct type.

  • exec_status_type – Optional execution status struct type.

  • command_topic – DDS topic name for commands.

  • ack_topic – DDS topic name for ack.

  • status_topic – DDS topic name for status.

  • exec_status_topic – Optional DDS topic name for exec status.

  • source_id – This consumer’s IdentifierType identity.

  • destination_id – Target provider’s IdentifierType identity.

async on_status(session_id: bytes, status) None[source]

Called for every status update from the provider.

Do not call cancel() from this hook — terminal statuses are detected automatically and trigger _end_session() after this hook returns.

async on_ack(session_id: bytes, ack) None[source]

Called when the provider echoes the command acknowledgment.

async on_exec_status(session_id: bytes, exec_status) None[source]

Called when the provider publishes execution progress.

async on_terminal(session_id: bytes, status) None[source]

Called after the session closes.

Parameters:
  • session_id – The session that ended.

  • status – Terminal status sample, or None for cancel/crash/shutdown.

property has_matched_provider: bool

True if at least one provider is subscribed to the command topic.

async wait_for_discovery(timeout: float = 30.0) bool[source]

Block until a provider subscribes to the command topic.

Returns True if a provider was discovered, False on timeout.

async send(command, session_id: bytes = None) bytes[source]

Send a command or update. Returns the session_id handle.

If session_id is None, starts a new session. If provided, sends an update to the existing session (D39).

All header fields are auto-stamped (D47).

Raises:
  • RuntimeError – If starting a new session while one is active.

  • RuntimeError – If session_id doesn’t match the active session.

async cancel() None[source]

Cancel the active session — dispose command and clean up (D50).

start() None[source]

Start the reader dispatch loops.

async close() None[source]

End active session, cancel _run. Entity cleanup is deferred to DDSContext.

Sending Commands

from rtiumaapy.services.mo import GlobalVectorControlConsumer
from rtiumaapy.datamodel.GlobalVectorCommandType import (
    UMAA_MO_GlobalVectorControl_GlobalVectorCommandType as CmdType,
)
from rtiumaapy import set_timestamp

consumer = GlobalVectorControlConsumer(ctx)

# Wait for a provider to appear
await consumer.wait_for_discovery(timeout=10.0)

# Build and send a command
cmd = CmdType()
cmd.speed = 5.0
cmd.direction = 1.57  # radians
session_id = await consumer.send(cmd)

Hooks

Override these to react to lifecycle events:

Hook

When Called

on_ack(session_id, ack)

Provider acknowledged the command

on_status(session_id, status)

Status update (ISSUED, COMMANDED, EXECUTING, COMPLETED, FAILED)

on_exec_status(session_id, exec_status)

Execution progress (if the service defines an exec status topic)

on_terminal(session_id, status)

Session ended (COMPLETED, FAILED, or CANCELED)

Example

from rtiumaapy.services.mo import GlobalVectorControlConsumer

class MyVectorConsumer(GlobalVectorControlConsumer):
    async def on_status(self, session_id, status):
        print(f"Status: {status.status}")

    async def on_exec_status(self, session_id, exec_status):
        print(f"Direction achieved: {exec_status.directionAchieved}")

    async def on_terminal(self, session_id, status):
        print(f"Command finished: {status.status if status else 'canceled'}")

Canceling

await consumer.cancel()

Disposes the command instance, the provider detects it and transitions to CANCELED.

Updating

Send a command update to the same session (the provider must be in EXECUTING state):

updated_cmd = CmdType()
updated_cmd.speed = 10.0
await consumer.send(updated_cmd, session_id=session_id)