ReportConsumer

ReportConsumer — single-topic report subscriber (Tier 1).

A ReportConsumer owns one DataReader and delivers incoming samples via the on_report subclass-override hook (D36).

class rtiumaapy.report_consumer.ReportConsumer(ctx: DDSContext, service_name: str | None = None, *, report_type: Type, report_topic: str)[source]

Bases: BaseService

Subscribes to a single report type and delivers via subclass override.

Subclass and override on_report() to process incoming samples:

class GpsConsumer(ReportConsumer):
    async def on_report(self, sample):
        print(sample)
Parameters:
  • ctx – The DDSContext owning shared DDS infrastructure.

  • service_name – Unique name for this service instance.

  • report_type – An @idl.struct type (the IDL-generated Python class).

  • report_topic – The DDS topic name (drives QoS assignment).

property reader: rti.connextdds.DataReader

The underlying DataReader (for advanced use).

property report_topic: str

The DDS topic name.

start() None[source]

Start the asynchronous event loop that delivers samples.

Creates an asyncio.Task running _run(). The task is automatically cancelled when close() is called.

async on_report(sample) None[source]

Called for each valid incoming sample.

Override in a subclass to process data. The default implementation is a no-op.

async close() None[source]

Cancel the event loop task.

Entity cleanup is deferred to DDSContext.shutdown() which stops the rti.asyncio dispatcher before calling close_contained_entities().

This method is idempotent — calling it more than once is safe.

Usage

A ReportConsumer subscribes to a single DDS topic and delivers incoming samples via the on_report() hook. Use a pre-wired consumer class — subclass it and override on_report():

from rtiumaapy.services.so import HealthReportConsumer

class MyHealthConsumer(HealthReportConsumer):
    async def on_report(self, sample):
        print(f"Health: severity={sample.severity}")

consumer = MyHealthConsumer(ctx)

Event Loop

Call start() to begin the async take_async() event loop, or let DDSContext.run_until_shutdown() start it automatically.

The loop:

  1. Waits for samples via reader.take_async()

  2. Validates each sample against IDL constraints

  3. Calls on_report(sample) for each valid sample

  4. Logs exceptions from on_report() — does not stop the loop

Using Pre-Wired Classes

Pre-wired consumers handle all the type/topic wiring. Just subclass and override on_report():

from rtiumaapy.services.sa import GlobalPoseReportConsumer

class MyPoseConsumer(GlobalPoseReportConsumer):
    async def on_report(self, sample):
        print(f"Lat: {sample.pose.geodeticLatitude}")

consumer = MyPoseConsumer(ctx)