Getting Started

This guide walks you through installing rtiumaapy, running the autopilot example, and writing your first UMAA component.

Prerequisites

Requirement

Version

Python

>= 3.8

RTI Connext DDS License

Free evaluation at https://www.rti.com/free-trial

OS

Linux, macOS, or Windows

You need a valid RTI Connext DDS license file. Place it where Connext can find it (e.g. $HOME/rti_license.dat or set RTI_LICENSE_FILE). The rti.connext Python binding is installed automatically as a package dependency.

Installation

1. Clone the repository

git clone <repo-url> rticonnextdds-usecases-umaa
cd rticonnextdds-usecases-umaa/python

2. Create a virtual environment

python3 -m venv .venv
source .venv/bin/activate   # Linux/macOS
# .venv\Scripts\activate    # Windows

3. Install the SDK in editable mode

pip install -e ".[dev]"

This installs rtiumaapy plus development dependencies (pytest, pytest-asyncio).

4. Set the QoS environment variable

The SDK locates the UMAA QoS XML file via the UMAA_QOS_FILE environment variable:

export UMAA_QOS_FILE="$(pwd)/../qos/umaa_qos_lib.xml"

Tip

Add this to your shell profile or .env file to avoid setting it every session.

Verify installation

1. Run the tests

python -m pytest tests/ -v --tb=short

You should see over 1,600 tests passing.

2. Run the autopilot example

Open two terminals (each with the venv active and UMAA_QOS_FILE set).

Terminal 1 — Start the autopilot

cd examples/autopilot
./start_autopilot.sh

Terminal 2 — Send a command

cd examples/globalvector_consumer
./start_globalvector_consumer.sh

The start script has the autopilot’s destination GUID hardcoded, so no extra flags are needed. You should see the full command lifecycle in both terminals: ISSUED → COMMANDED → EXECUTING → COMPLETED.

Minimal Example — Report Publisher

The SDK includes 350 pre-wired service classes so you don’t need to specify types or topics yourself. Here’s a health report publisher using HealthReportProvider:

import asyncio
from rtiumaapy import DDSContext, set_timestamp
from rtiumaapy.services.so import HealthReportProvider
from rtiumaapy.datamodel.HealthReportType import (
    UMAA_SO_HealthReport_HealthReportType as HealthReportType,
)

async def main():
    ctx = DDSContext(domain_id=0)

    provider = HealthReportProvider(ctx)

    sample = HealthReportType(source=ctx.source_id)
    set_timestamp(sample)
    provider.write(sample)

    await ctx.run_until_shutdown()

asyncio.run(main())

Minimal Example — Report Subscriber

The consumer side is even simpler — subclass the pre-wired HealthReportConsumer and override on_report():

import asyncio
from rtiumaapy import DDSContext
from rtiumaapy.services.so import HealthReportConsumer

class MyHealthConsumer(HealthReportConsumer):
    async def on_report(self, sample):
        print(f"Health: severity={sample.severity} code={sample.code}")

async def main():
    ctx = DDSContext(domain_id=0)
    consumer = MyHealthConsumer(ctx)
    await ctx.run_until_shutdown()

asyncio.run(main())

Minimal Example — Command Provider

Command providers receive commands addressed to them and drive each command through the ICD state machine: ISSUED → COMMANDED → EXECUTING → COMPLETED. Subclass a pre-wired provider and override the on_executing() hook to implement your domain logic:

import asyncio
import logging
from rtiumaapy import DDSContext, set_timestamp
from rtiumaapy.services.so import BITControlProvider
from rtiumaapy.command_provider_session import CommandProviderSession

logging.basicConfig(level=logging.INFO)

class MyBITProvider(BITControlProvider):
    """Simple Built-In Test provider that always succeeds."""

    async def on_commanded(self, session: CommandProviderSession):
        print(f"BIT command accepted — session {session.session_id}")

    async def on_executing(self, session: CommandProviderSession):
        print("Running built-in test...")
        await asyncio.sleep(1)  # simulate work
        print("Built-in test passed.")

    async def on_complete(self, session: CommandProviderSession):
        print("BIT command completed successfully.")

async def main():
    ctx = DDSContext(domain_id=0)
    provider = MyBITProvider(ctx, source_id=ctx.source_id)
    provider.start()
    await ctx.run_until_shutdown()

asyncio.run(main())

The base class handles ack publishing, status transitions, and session management automatically — you only write the behavior.

Command Provider with Execution Status

Some command services include an execution-status topic for progress feedback. The GlobalVectorControlProvider (MO domain) is a good example — it reports whether heading and speed targets have been achieved:

import asyncio
import logging
from rtiumaapy import DDSContext, set_timestamp
from rtiumaapy.services.mo import GlobalVectorControlProvider
from rtiumaapy.command_provider_session import CommandProviderSession
from rtiumaapy.datamodel.GlobalVectorExecutionStatusReportType import (
    UMAA_MO_GlobalVectorControl_GlobalVectorExecutionStatusReportType
    as GlobalVectorExecStatus,
)

logging.basicConfig(level=logging.INFO)

class MyGlobalVectorProvider(GlobalVectorControlProvider):
    """Heading/speed controller that publishes execution status."""

    async def on_commanded(self, session: CommandProviderSession):
        cmd = session.command
        print(f"GlobalVector COMMANDED — direction={getattr(cmd, 'direction', '?')}")

    async def on_executing(self, session: CommandProviderSession):
        cmd = session.command

        # Publish execution status showing targets achieved
        exec_status = GlobalVectorExecStatus()
        exec_status.source = self._source_id
        exec_status.sessionID = cmd.sessionID
        exec_status.directionAchieved = True
        exec_status.speedAchieved = True
        exec_status.elevationAchieved = True
        set_timestamp(exec_status)

        if self._exec_status_writer is not None:
            self._exec_status_writer.write(exec_status)

        print("GlobalVector targets achieved.")

    async def on_complete(self, session: CommandProviderSession):
        print("GlobalVector COMPLETED")

async def main():
    ctx = DDSContext(domain_id=0)
    provider = MyGlobalVectorProvider(ctx, source_id=ctx.source_id)
    provider.start()
    await ctx.run_until_shutdown()

asyncio.run(main())

Minimal Example — Command Consumer

The consumer side sends a command to a provider and reacts to lifecycle events (ack, status transitions, execution status, terminal) via hooks. Here’s a BITControlConsumer that sends a BIT command and waits for completion:

import asyncio
import logging
from rtiumaapy import DDSContext
from rtiumaapy.guid_util import GUIDUtil
from rtiumaapy.services.so import BITControlConsumer
from rtiumaapy.datamodel.BITCommandType import (
    UMAA_SO_BITControl_BITCommandType as BITCommandType,
)

logging.basicConfig(level=logging.INFO)

class MyBITConsumer(BITControlConsumer):
    """BIT consumer that logs every lifecycle event."""

    def __init__(self, ctx, *, source_id, destination_id):
        super().__init__(ctx, source_id=source_id, destination_id=destination_id)
        self.done = asyncio.Event()

    async def on_ack(self, session_id, ack):
        print(f"ACK received — session={GUIDUtil.to_hex(session_id)}")

    async def on_status(self, session_id, status):
        print(f"STATUS — {status.commandStatus}")

    async def on_terminal(self, session_id, status):
        print(f"TERMINAL — {status.commandStatus if status else 'cancelled'}")
        self.done.set()

async def main():
    ctx = DDSContext(domain_id=0)

    # destination_id must match the provider's source_id
    destination_id = GUIDUtil.make_source_id("<provider-guid-hex>")

    consumer = MyBITConsumer(
        ctx, source_id=ctx.source_id, destination_id=destination_id,
    )
    consumer.start()

    # Wait for the provider to appear on the network
    await consumer.wait_for_discovery(timeout=30.0)

    # Build and send the command
    cmd = BITCommandType()
    session_id = await consumer.send(cmd)
    print(f"Command sent — session={GUIDUtil.to_hex(session_id)}")

    # Wait for the command lifecycle to finish
    await asyncio.wait_for(consumer.done.wait(), timeout=30.0)

asyncio.run(main())

Key points:

  • source_id identifies this consumer; destination_id is the target provider’s identity (its content filter routes on this).

  • consumer.send(cmd) auto-stamps header fields and returns the session ID.

  • wait_for_discovery() blocks until a matching provider is found on the network.

  • The consumer reader loops dispatch on_ack, on_status, on_exec_status, and on_terminal as events arrive.

Tip

Run the provider example from the previous section in one terminal and this consumer in another to see the full ISSUED → COMMANDED → EXECUTING → COMPLETED lifecycle.

For a more complete consumer with argument parsing, execution-status tracking, and graceful shutdown, see examples/globalvector_consumer/run_globalvector_consumer.py.

What’s Next