DDSContext
DDSContext — singleton DDS infrastructure owner for UMAA applications.
Manages the DomainParticipant, Publisher, Subscriber, QoS provider, topic
cache, and service registry. Services receive a DDSContext at
construction and use its factory methods to create DDS entities.
- class rtiumaapy.dds_context.DDSContext(domain_id: int = 0, qos_file: str | None = None, source_guid: str | None = None)[source]
Bases:
objectSingleton encapsulating all DDS infrastructure for a UMAA application.
Only one
DDSContextmay exist at a time. Callshutdown()before creating a new one.- __init__(domain_id: int = 0, qos_file: str | None = None, source_guid: str | None = None) None[source]
Create the DDS infrastructure.
- Parameters:
domain_id – DDS domain ID.
qos_file – Path to QoS XML. If None, reads from the
UMAA_QOS_FILEenvironment variable.source_guid – Hex GUID (32 chars or UUID with dashes) for this component’s source identity. If None, a random GUID is generated automatically.
- Raises:
RuntimeError – If a
DDSContextalready exists.
- classmethod current() DDSContext[source]
Return the active
DDSContext.- Raises:
RuntimeError – If no context has been created yet.
- property participant: rti.connextdds.DomainParticipant
- property publisher: rti.connextdds.Publisher
- property subscriber: rti.connextdds.Subscriber
- property qos_provider: rti.connextdds.QosProvider
- property domain_id: int
- property source_id
The component’s
IdentifierTypesource identity.
- property source_guid: str
The component’s source GUID as a 32-char hex string.
- get_topic(data_type: Type, topic_name: str) rti.connextdds.Topic[source]
Retrieve or create a typed DDS Topic.
Uses
dds.Topic.find()to look up an existing Topic registered with the DomainParticipant before creating a new one (D17).- Parameters:
data_type – An
@idl.structtype (the IDL-generated Python class).topic_name – The DDS topic name string.
- Returns:
The existing or newly created
dds.Topic.
- register_service(key: str, service: Any) None[source]
Register a service. Does not start it.
Called automatically by
BaseService.__init__.- Raises:
ValueError – If key is already registered.
- unregister_service(key: str) Any | None[source]
Remove and return a service, or None if not found.
Cancels the service’s
_run()task if one is active.
- create_writer(data_type: Type, topic_name: str) rti.connextdds.DataWriter[source]
Create a typed
DataWriterwith QoS resolved from topic_filter rules.- Parameters:
data_type – An
@idl.structtype.topic_name – The DDS topic name — drives automatic QoS assignment.
- create_reader(data_type: Type, topic_name: str) rti.connextdds.DataReader[source]
Create a typed
DataReaderwith QoS resolved from topic_filter rules.- Parameters:
data_type – An
@idl.structtype.topic_name – The DDS topic name — drives automatic QoS assignment.
- create_filtered_reader(data_type: Type, topic_name: str, filter_expression: str, filter_parameters: List[str] | None = None, filter_name: str | None = None) Tuple[rti.connextdds.DataReader, rti.connextdds.ContentFilteredTopic][source]
Create a content-filtered
DataReader.- Returns:
(DataReader, ContentFilteredTopic)— the caller can update the filter dynamically viacft.set_filter().
- async run_until_shutdown() None[source]
Start all registered services and block until SIGINT/SIGTERM.
Each service with a
_runcoroutine method gets its ownasyncio.Task. When a termination signal arrives the context callsshutdown()to tear everything down.
- async shutdown() None[source]
Tear down all managed resources in order:
Stop the
rti.asynciodispatcher — we are done listening.Cancel every service’s
_run()task.Await
close()on every registered service (reverse order) — services publish final messages (writes only).Close all DDS contained entities.
Close the DomainParticipant.
Clear the singleton reference.
Usage
DDSContext is the first object you create and the last one shut down.
Everything flows through it:
import asyncio
from rtiumaapy import DDSContext
async def main():
ctx = DDSContext(domain_id=0)
# ... create services, components ...
await ctx.run_until_shutdown()
asyncio.run(main())
Shutdown Order
shutdown() tears down resources in a specific order to avoid segfaults
and DDS warnings:
Stop
rti.asynciodispatcher (no more data received)Cancel all
_run()tasksCall
close()on every service (reverse registration order)close_contained_entities()on the DomainParticipantClose the DomainParticipant
Clear the singleton reference
Services can still write during close() because the dispatcher
shutdown only affects the receive path.