ReportConsumer
ReportConsumer — single-topic report subscriber (Tier 1).
A ReportConsumer owns one DataReader and delivers incoming
samples via the on_report subclass-override hook (D36).
- class rtiumaapy.report_consumer.ReportConsumer(ctx: DDSContext, service_name: str | None = None, *, report_type: Type, report_topic: str)[source]
Bases: BaseService
Subscribes to a single report type and delivers via subclass override.
Subclass and override on_report() to process incoming samples:
class GpsConsumer(ReportConsumer):
    async def on_report(self, sample):
        print(sample)
- Parameters:
ctx – The DDSContext owning shared DDS infrastructure.
service_name – Unique name for this service instance.
report_type – An @idl.struct type (the IDL-generated Python class).
report_topic – The DDS topic name (drives QoS assignment).
- property reader: rti.connextdds.DataReader
The underlying DataReader (for advanced use).
- property report_topic: str
The DDS topic name.
- start() -> None[source]
Start the asynchronous event loop that delivers samples.
Creates an asyncio.Task running _run(). The task is automatically cancelled when close() is called.
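The task lifecycle described for start() and close() can be sketched with plain asyncio. This is a minimal illustration only, not the rtiumaapy implementation: TaskOwner, its ticks counter, and the async form of close() are hypothetical names and choices made for the example.

```python
import asyncio


class TaskOwner:
    """Hypothetical service illustrating a start()/close() task lifecycle."""

    def __init__(self):
        self._task = None
        self.ticks = 0

    async def _run(self):
        # Stand-in for the delivery loop: runs until the task is cancelled.
        while True:
            self.ticks += 1
            await asyncio.sleep(0.01)

    def start(self):
        # Must be called while an event loop is running, because the task
        # is scheduled on that loop.
        self._task = asyncio.get_running_loop().create_task(self._run())

    async def close(self):
        # Assumption: close() is async here so it can wait for the
        # cancellation to finish; the real close() may differ.
        if self._task is not None:
            self._task.cancel()
            try:
                await self._task
            except asyncio.CancelledError:
                pass
            self._task = None


async def demo():
    owner = TaskOwner()
    owner.start()
    await asyncio.sleep(0.05)   # let the background task run a few times
    task = owner._task
    await owner.close()
    return owner.ticks > 0, task.cancelled()
```

Calling asyncio.run(demo()) starts the background task, lets it run briefly, and then cancels it through close(). Because start() schedules the task on the running loop, it has to be called from within async code or after the loop has started.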
Usage
A ReportConsumer subscribes to a single DDS topic and delivers incoming
samples via the on_report() hook. To use a pre-wired consumer class,
subclass it and override on_report():
from rtiumaapy.services.so import HealthReportConsumer

class MyHealthConsumer(HealthReportConsumer):
    async def on_report(self, sample):
        print(f"Health: severity={sample.severity}")

# ctx is an existing DDSContext
consumer = MyHealthConsumer(ctx)
Event Loop
Call start() to begin the async take_async() event loop, or let
DDSContext.run_until_shutdown() start it automatically.
The loop:
- Waits for samples via reader.take_async()
- Validates each sample against IDL constraints
- Calls on_report(sample) for each valid sample
- Logs exceptions from on_report() without stopping the loop
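The four loop steps above can be sketched in self-contained Python. This is an illustration under stated assumptions, not the library's actual code: FakeReader, SketchConsumer, is_valid(), and CollectingConsumer are hypothetical stand-ins, and the fake take_async() simply yields queued values rather than real DDS samples.

```python
import asyncio
import logging

logger = logging.getLogger("report_consumer_sketch")


class FakeReader:
    """Hypothetical stand-in for a DataReader; yields queued samples."""

    def __init__(self, samples):
        self._samples = list(samples)

    async def take_async(self):
        # Async generator: yield each queued sample, then finish.
        for sample in self._samples:
            await asyncio.sleep(0)  # give other tasks a chance to run
            yield sample


class SketchConsumer:
    """Illustrative delivery loop only."""

    def __init__(self, reader):
        self.reader = reader

    def is_valid(self, sample):
        # Placeholder for IDL constraint validation (assumption).
        return sample is not None

    async def on_report(self, sample):
        raise NotImplementedError

    async def _run(self):
        async for sample in self.reader.take_async():   # wait for samples
            if not self.is_valid(sample):                # validate
                continue
            try:
                await self.on_report(sample)             # deliver
            except Exception:
                # Log and keep going: one bad sample must not stop the loop.
                logger.exception("on_report() raised; continuing")


class CollectingConsumer(SketchConsumer):
    """Hypothetical subclass that records samples and fails on 'bad'."""

    def __init__(self, reader):
        super().__init__(reader)
        self.seen = []

    async def on_report(self, sample):
        if sample == "bad":
            raise ValueError("simulated handler failure")
        self.seen.append(sample)


def demo():
    consumer = CollectingConsumer(FakeReader(["a", None, "bad", "b"]))
    asyncio.run(consumer._run())
    return consumer.seen
```

In this sketch the invalid sample (None) is skipped before on_report() is called, and the exception raised for "bad" is logged while later samples are still delivered.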
Using Pre-Wired Classes
Pre-wired consumers handle all the type/topic wiring. Just subclass and override on_report():
from rtiumaapy.services.sa import GlobalPoseReportConsumer

class MyPoseConsumer(GlobalPoseReportConsumer):
    async def on_report(self, sample):
        print(f"Lat: {sample.pose.geodeticLatitude}")

consumer = MyPoseConsumer(ctx)