Source code for rtiumaapy.report_consumer

"""ReportConsumer — single-topic report subscriber (Tier 1).

A ``ReportConsumer`` owns one ``DataReader`` and delivers incoming
samples via the ``on_report`` subclass-override hook (D36).
"""

import asyncio
import logging
from typing import Optional, Type

import rti.connextdds as dds
import rti.asyncio  # noqa: F401 — enables take_async on DataReader

from rtiumaapy.base_service import BaseService
from rtiumaapy.dds_context import DDSContext
from rtiumaapy.validation import validate_message

_logger = logging.getLogger(__name__)


class ReportConsumer(BaseService):
    """Single-topic report subscriber built around one ``DataReader``.

    Incoming samples are handed to :meth:`on_report`; derive a subclass
    and override that coroutine to consume them::

        class GpsConsumer(ReportConsumer):
            async def on_report(self, sample):
                print(sample)

    Args:
        ctx: The :class:`DDSContext` owning the shared DDS infrastructure;
            it is asked to create the reader.
        service_name: Unique name for this service instance, forwarded to
            :class:`BaseService`.
        report_type: An ``@idl.struct`` type (the IDL-generated Python
            class) carried on the topic.
        report_topic: The DDS topic name (drives QoS assignment).
    """

    def __init__(
        self,
        ctx: DDSContext,
        service_name: Optional[str] = None,
        *,
        report_type: Type,
        report_topic: str,
    ) -> None:
        super().__init__(ctx, service_name)
        self._report_type = report_type
        self._report_topic = report_topic
        # The reader is created eagerly; the delivery task only exists
        # once start() has been called.
        self._reader = ctx.create_reader(report_type, report_topic)
        self._task: Optional[asyncio.Task] = None

    # ---- Read-only accessors --------------------------------------------

    @property
    def reader(self) -> dds.DataReader:
        """Return the wrapped ``DataReader`` (intended for advanced use)."""
        return self._reader

    @property
    def report_topic(self) -> str:
        """Return the name of the DDS topic this consumer reads."""
        return self._report_topic

    # ---- Sample delivery -------------------------------------------------

    def start(self) -> None:
        """Schedule the delivery loop unless one is already running.

        Wraps :meth:`_run` in an ``asyncio`` task and remembers it so that
        :meth:`close` can cancel it later.  Calling this while a previous
        task is still pending does nothing; a finished task is replaced.
        """
        active = self._task
        if active is not None and not active.done():
            return
        self._task = asyncio.ensure_future(self._run())

    async def on_report(self, sample) -> None:
        """Handle one incoming sample whose sample info is flagged valid.

        Subclasses override this coroutine to process the data.  The base
        implementation intentionally does nothing.
        """

    async def _run(self) -> None:
        """Pump samples from the reader into :meth:`on_report`.

        Iterates ``take_async()``, which yields individual ``Sample``
        objects (with ``.data`` and ``.info`` attributes) one at a time.
        Samples whose info is not valid are skipped.  A sample that fails
        message validation is reported with a warning and still delivered.
        """
        async for received in self._reader.take_async():
            if not received.info.valid:
                continue
            try:
                is_ok, problems = validate_message(received.data)
                if not is_ok:
                    _logger.warning(
                        "Received invalid report on %s: %s",
                        self._report_topic,
                        "; ".join(problems),
                    )
                await self.on_report(received.data)
            except Exception:
                # Boundary handler: one bad sample must not end the loop.
                # NOTE: an exception raised by validate_message lands here
                # as well and is logged with the same hook-failure message.
                _logger.exception(
                    "on_report hook failed for %s",
                    self._report_topic,
                )

    # ---- Shutdown --------------------------------------------------------

    async def close(self) -> None:
        """Cancel the delivery task and wait for it to finish.

        Entity cleanup is deferred to ``DDSContext.shutdown()``, which
        stops the ``rti.asyncio`` dispatcher before calling
        ``close_contained_entities()``.  Repeated calls are safe: once the
        task reference has been cleared there is nothing left to cancel.
        """
        pending = self._task
        if pending is None or pending.done():
            # Nothing is running; just drop whatever reference is held.
            self._task = None
            return

        # Cancel the _run() task so take_async() exits cleanly.
        pending.cancel()
        try:
            await pending
        except asyncio.CancelledError:
            pass
        self._task = None