View on GitHub

Rx4DDS

Reactive Extensions Adapters for RTI Connext DDS

Download this project as a .zip file Download this project as a tar.gz file

Rx4DDS

What's Rx4DDS

Rx4DDS is a research library that integrates Reactive Extensions (Rx) with RTI Connext DDS in C++11, C#, and JavaScript. Rx and DDS are quite complementary because Rx is based on the Observable-Observer pattern, which is analogous to the publish-subscribe pattern of DDS. Furthermore, the core tenet of Rx composition of operations over values that change over time complements DDS instances, which are data objects that change over time. DDS ensures propagation of changes to the interested remote participants. Consequently, combining Rx with DDS enables a coherent end-to-end distributed asynchronous dataflow architecture for both data distribution (which is performed by DDS) and processing (which is done by Rx). Rx and DDS together support location transparency of dataflow-style programs seamlessly. The resulting applications dramatically simplify concurrency to the extent that it can be simply configured.

CppCon 2015 session on Rx4DDS: Reactive Stream Processing in Industrial IoT using DDS and RxCpp [Slides]. The talk explains the demonstration videos and the code samples below.

Status

The Rx4DDS adapters are research prototypes. RxJS adapter works with RTI Connext DDS Node.js Connector. The C# codebase also includes an implementation of the DEBS'13 Grand Challenge using DDS and Rx.NET.

Further Reading

Data Transformation using Map

Demonstrates a stateful transformation pipeline. Transforms "Square" topic to "Circle" and "Triangle" topics. Blue circle orbits around the square, and blue triangle orbits around the blue circle.

  rx4dds::TopicSubscription<ShapeType> 
      topic_sub(participant, "Square", waitset, worker);
  rx::observable<LoanedSample<ShapeType>> source =
      topic_sub.create_observable();
  rx::observable<ShapeType> square_track = 
    source >> rx4dds::complete_on_dispose()
           >> rx4dds::error_on_no_alive_writers()
           >> rxo::filter([](LoanedSample<ShapeType> s) {
                 return s.info().valid();
              }) // skip invalid samples
           >> rxo::map([](LoanedSample<ShapeType> valid) {
                 return valid.data();
              }); // map samples to data

  int circle_degree = 0;
  square_track
    .map([circle_degree](ShapeType & square) mutable 
    {
      circle_degree = (circle_degree + 3) % 360;
      return shape_location(square, circle_degree);
    })
    .tap([circle_writer](ShapeType & circle) mutable {
            circle_writer.write(circle);
    }); // tap replaced as publish_over_dds later

  int tri_degree = 0;
  circle_track
    .map([tri_degree](ShapeType & circle) mutable 
    {
      tri_degree = (tri_degree + 9) % 360;
      return shape_location(circle, tri_degree);
    })
    >> rx4dds::publish_over_dds(triangle_writer);

  triangle_track.subscribe();

Keyed Data Transformation using GroupBy and Map

Demonstrates multiple parallel stateful pipelines. Same as before but the transformation pipeline is replicated for each key (i.e., shape color). The resulting topics ("Circle" and "Triangle") mimic the lifecycle of the original instance in "Square" topic. DDS dispose maps to on_completed and NOT_ALIVE_NO_WRITERS instance-state maps to on_error.

rx4dds::TopicSubscription<ShapeType> 
    topic_sub(participant, "Square", waitset, worker);
auto grouped_stream =
  topic_sub.create_observable() 
      >> rx4dds::group_by_instance ([](ShapeType & shape) { 
            return shape.color(); 
         });
grouped_stream
  .flat_map([circle_writer, triangle_writer]
            (GroupedShapeObservable go) {
   rx::observable<ShapeType> inner_transformed =
        go >> rx4dds::to_unkeyed()
           >> rx4dds::complete_on_dispose()
           >> rx4dds::error_on_no_alive_writers()
           >> rx4dds::skip_invalid_samples()
           >> rx4dds::map_samples_to_data()
           >> rx4dds::map_to_circle_track() // as shown before
           >> rx4dds::publish_over_dds(
                 circle_writer, ShapeType(go.key())
           >> rx4dds::map_to_triangle_track() // as shown before
           >> rx4dds::publish_over_dds(
                 triangle_writer, ShapeType(go.key());
     return inner_transformed;
  }).subscribe();

Average of Multiple Topic Instances

Demonstrates dynamic correlation (average) of "Square" topic instances. Incorporates lifecycle (appearance and disappearance) of "Square" topic instances. As before, DDS dispose maps to on_completed and NOT_ALIVE_NO_WRITERS instance-state maps to on_error.

rx4dds::TopicSubscription<ShapeType> 
    topic_sub(participant, "Square", waitset, worker);
auto grouped_stream =
  topic_sub.create_observable() 
      >> rx4dds::group_by_instance ([](ShapeType & shape) { 
            return shape.color(); 
         });
grouped_stream
  .map([](GroupedShapeObservable go) {
    return go >> rx4dds::to_unkeyed()
              >> rx4dds::complete_on_dispose()
              >> rx4dds::error_on_no_alive_writers()
              >> rx4dds::skip_invalid_samples()
              >> rx4dds::map_samples_to_data()
              >> rxo::publish()
              >> rxo::ref_count()
              >> rxo::as_dynamic();
 })
 >> rx4dds::coalesce_alive()
 >> rxo::map([](const vector<rxcpp::observable<ShapeType>> & srcs) {
      return rx4dds::combine_latest(srcs);
    })
 >> rxo::switch_on_next()
 >> rxo::map([](const std::vector<ShapeType> & shapes) {
         return calculate_average(shapes);
    })
 >> rx4dds::publish_over_dds(triangle_writer, ShapeType(“ORANGE”));