DDSContext

DDSContext — singleton DDS infrastructure owner for UMAA applications.

Manages the DomainParticipant, Publisher, Subscriber, QoS provider, topic cache, and service registry. Services receive a DDSContext at construction and use its factory methods to create DDS entities.

class rtiumaapy.dds_context.DDSContext(domain_id: int = 0, qos_file: str | None = None, source_guid: str | None = None)

Bases: object

Singleton encapsulating all DDS infrastructure for a UMAA application.

Only one DDSContext may exist at a time. Call shutdown() before creating a new one.

__init__(domain_id: int = 0, qos_file: str | None = None, source_guid: str | None = None) -> None

Create the DDS infrastructure.

Parameters:
  • domain_id – DDS domain ID.

  • qos_file – Path to the QoS XML file. If None, the path is read from the UMAA_QOS_FILE environment variable.

  • source_guid – Hex GUID (32 chars or UUID with dashes) for this component’s source identity. If None, a random GUID is generated automatically.

Raises:

RuntimeError – If a DDSContext already exists.
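Because a second constructor call raises RuntimeError, code that may run when a context is already active can fall back to current(). The following is a minimal sketch that relies only on the documented behaviour of the constructor and current(). The helper name get_or_create_context is hypothetical, and the class is passed in as an argument so the helper does not depend on how rtiumaapy is imported:

```python
def get_or_create_context(context_cls, **kwargs):
    """Return the active context, creating one only if none exists.

    `context_cls` is expected to be DDSContext. `kwargs` are forwarded to
    its constructor (domain_id, qos_file, source_guid) and are ignored
    when a context is already active.
    """
    try:
        # Documented: current() raises RuntimeError if no context exists.
        return context_cls.current()
    except RuntimeError:
        # No active context, so constructing one should not conflict.
        return context_cls(**kwargs)
```

Note that the constructor arguments are silently ignored when an existing context is returned, so this pattern suits code that does not care which domain or QoS file the active context was created with.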

classmethod current() -> DDSContext

Return the active DDSContext.

Raises:

RuntimeError – If no context has been created yet.

property participant: rti.connextdds.DomainParticipant
property publisher: rti.connextdds.Publisher
property subscriber: rti.connextdds.Subscriber
property qos_provider: rti.connextdds.QosProvider
property domain_id: int
property source_id

The component’s IdentifierType source identity.

property source_guid: str

The component’s source GUID as a 32-char hex string.
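The constructor accepts either 32 hex characters or a dashed UUID, while this property always returns the 32-character form. One way to validate and normalize a GUID before passing it in is the standard uuid module. This is a sketch only: normalize_guid is a hypothetical helper, and whether the library itself lowercases the string is an assumption not confirmed here.

```python
import uuid


def normalize_guid(value: str) -> str:
    """Return `value` as a 32-character lowercase hex string.

    Accepts 32 hex characters or the dashed UUID form. uuid.UUID also
    tolerates surrounding braces and a "urn:uuid:" prefix. Any other
    input raises ValueError.
    """
    return uuid.UUID(value).hex
```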

get_topic(data_type: Type, topic_name: str) -> rti.connextdds.Topic

Retrieve or create a typed DDS Topic.

Uses dds.Topic.find() to look up an existing Topic registered with the DomainParticipant before creating a new one (D17).

Parameters:
  • data_type – An @idl.struct type (the IDL-generated Python class).

  • topic_name – The DDS topic name string.

Returns:

The existing or newly created dds.Topic.
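The find-before-create pattern described above can be modelled without DDS. In the sketch below, TopicRegistry is a toy stand-in for the participant's topic lookup (it is not a Connext class), and a tuple stands in for a real Topic. It only illustrates why repeated calls with the same name do not create duplicates.

```python
from typing import Any, Dict, Optional, Type


class TopicRegistry:
    """Toy stand-in for a participant's topic lookup (not a DDS class)."""

    def __init__(self) -> None:
        self._topics: Dict[str, Any] = {}
        self.created = 0  # how many topics were actually created

    def find(self, topic_name: str) -> Optional[Any]:
        return self._topics.get(topic_name)

    def create(self, data_type: Type, topic_name: str) -> Any:
        topic = (data_type, topic_name)  # placeholder for a real Topic
        self._topics[topic_name] = topic
        self.created += 1
        return topic


def get_topic(registry: TopicRegistry, data_type: Type, topic_name: str) -> Any:
    """Return the existing topic for `topic_name`, creating it if absent."""
    topic = registry.find(topic_name)
    if topic is None:
        topic = registry.create(data_type, topic_name)
    return topic
```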

register_service(key: str, service: Any) -> None

Register a service. Does not start it.

Called automatically by BaseService.__init__.

Raises:

ValueError – If key is already registered.

unregister_service(key: str) -> Any | None

Remove and return a service, or None if not found.

Cancels the service’s _run() task if one is active.

get_service(key: str) -> Any | None

Look up a registered service by key.

registered_service_keys() -> List[str]

Return a snapshot of all registered service keys.
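As an illustration of the registry methods, the sketch below builds a summary of the registered services using only registered_service_keys() and get_service(). The helper name describe_services is hypothetical, and ctx is assumed to be an active DDSContext.

```python
def describe_services(ctx):
    """Map each registered service key to the service's class name."""
    summary = {}
    # registered_service_keys() returns a snapshot, so iterating it is
    # safe even if the registry changes during the loop.
    for key in ctx.registered_service_keys():
        service = ctx.get_service(key)
        if service is not None:  # may have been unregistered meanwhile
            summary[key] = type(service).__name__
    return summary
```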

create_writer(data_type: Type, topic_name: str) -> rti.connextdds.DataWriter

Create a typed DataWriter with QoS resolved from topic_filter rules.

Parameters:
  • data_type – An @idl.struct type.

  • topic_name – The DDS topic name, which is matched against the topic_filter rules to select the QoS.

create_reader(data_type: Type, topic_name: str) -> rti.connextdds.DataReader

Create a typed DataReader with QoS resolved from topic_filter rules.

Parameters:
  • data_type – An @idl.struct type.

  • topic_name – The DDS topic name, which is matched against the topic_filter rules to select the QoS.
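To give a feel for how a topic name can drive QoS selection, here is a rough model of topic_filter matching. It assumes glob-style patterns evaluated in order with the first match winning, which is how Connext XML QoS profiles are generally described. The patterns, labels, and topic names are hypothetical and are not taken from the UMAA QoS file.

```python
from fnmatch import fnmatchcase

# Hypothetical (topic_filter, QoS label) pairs, in profile order.
RULES = [
    ("*Command*", "command_qos"),
    ("*Report*", "report_qos"),
    ("*", "default_qos"),
]


def resolve_qos(topic_name, rules=RULES):
    """Return the label of the first rule whose pattern matches."""
    for pattern, label in rules:
        if fnmatchcase(topic_name, pattern):
            return label
    return None  # no rule matched
```

Because matching is first-wins in this model, a catch-all pattern such as "*" belongs at the end of the list.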

create_filtered_reader(data_type: Type, topic_name: str, filter_expression: str, filter_parameters: List[str] | None = None, filter_name: str | None = None) -> Tuple[rti.connextdds.DataReader, rti.connextdds.ContentFilteredTopic]

Create a content-filtered DataReader.

Returns:

(DataReader, ContentFilteredTopic) — the caller can update the filter dynamically via cft.set_filter().
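A call might look like the sketch below. It uses the DDS SQL filter syntax, where %0 refers to the first entry of filter_parameters. The helper name create_speed_reader and the speed field are hypothetical, and ctx is assumed to be an active DDSContext.

```python
def create_speed_reader(ctx, data_type, topic_name, min_speed):
    """Create a reader that only receives samples with speed > min_speed.

    `speed` is a hypothetical field of `data_type`.
    """
    reader, cft = ctx.create_filtered_reader(
        data_type,
        topic_name,
        filter_expression="speed > %0",
        filter_parameters=[str(min_speed)],  # parameters are strings
    )
    # Keep `cft` if the filter needs to be changed later.
    return reader, cft
```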

async run_until_shutdown() -> None

Start all registered services and block until SIGINT/SIGTERM.

Each service with a _run coroutine method gets its own asyncio.Task. When a termination signal arrives the context calls shutdown() to tear everything down.
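The task-per-service rule can be sketched in plain asyncio. This is a simplified model, not the library's implementation: start_service_tasks is a hypothetical helper, and services is assumed to be a dict of key to service object. It must be called from inside a running event loop.

```python
import asyncio
import inspect


def start_service_tasks(services):
    """Create one asyncio.Task per service that defines an async _run()."""
    tasks = {}
    for key, service in services.items():
        run = getattr(service, "_run", None)
        # Services without a coroutine _run() method are skipped.
        if inspect.iscoroutinefunction(run):
            tasks[key] = asyncio.create_task(run(), name=key)
    return tasks
```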

async shutdown() -> None

Tear down all managed resources in order:

  1. Stop the rti.asyncio dispatcher so that no further data is received.

  2. Cancel every service’s _run() task.

  3. Await close() on every registered service, in reverse registration order. Services may publish final messages here; only writes are possible because the dispatcher is already stopped.

  4. Close all entities contained in the DomainParticipant.

  5. Close the DomainParticipant.

  6. Clear the singleton reference.

Usage

DDSContext is the first object you create and the last one you shut down. Services receive it at construction and create their DDS entities through it:

import asyncio
from rtiumaapy import DDSContext

async def main():
    ctx = DDSContext(domain_id=0)
    # ... create services, components ...
    await ctx.run_until_shutdown()

asyncio.run(main())

Shutdown Order

shutdown() tears down resources in a specific order to avoid segfaults and DDS warnings:

  1. Stop rti.asyncio dispatcher (no more data received)

  2. Cancel all _run() tasks

  3. Call close() on every service (reverse registration order)

  4. close_contained_entities() on the DomainParticipant

  5. Close the DomainParticipant

  6. Clear the singleton reference

Services can still write during close() because the dispatcher shutdown only affects the receive path.
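Steps 2 and 3 above (cancel the _run() tasks, then close services in reverse registration order) can be modelled in plain asyncio. This is a sketch of the ordering only, not the library's code: shutdown_in_order, the log list, and the name attribute on services are all hypothetical.

```python
import asyncio


async def shutdown_in_order(tasks, services, log):
    """Cancel _run() tasks, then close services in reverse order.

    `tasks` is a list of asyncio.Task objects, `services` is a list in
    registration order, and `log` records what happened.
    """
    for task in tasks:
        task.cancel()
    # Wait for cancellations to finish; with return_exceptions=True the
    # CancelledError results are collected instead of raised.
    await asyncio.gather(*tasks, return_exceptions=True)
    log.append("tasks cancelled")
    for service in reversed(services):
        await service.close()  # a service may still publish here
        log.append(f"closed {service.name}")
```

Closing in reverse order means a service registered later, which may depend on an earlier one, is closed first.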