What's Rx4DDS
Rx4DDS is a research library that integrates Reactive Extensions (Rx) with RTI Connext DDS in C++11, C#, and JavaScript. Rx and DDS complement each other: Rx is based on the Observable-Observer pattern, which is analogous to the publish-subscribe pattern of DDS. Furthermore, the core tenet of Rx, composition of operations over values that change over time, complements DDS instances, which are data objects that change over time. DDS propagates those changes to the interested remote participants. Consequently, combining Rx with DDS enables a coherent end-to-end distributed asynchronous dataflow architecture for both data distribution (performed by DDS) and processing (performed by Rx). Together, Rx and DDS give dataflow-style programs location transparency. In the resulting applications, concurrency is simplified to the point that it can be set through configuration.
CppCon 2015 session on Rx4DDS: Reactive Stream Processing in Industrial IoT using DDS and RxCpp [Slides]. The talk explains the demonstration videos and the code samples below.
Status
The Rx4DDS adapters are research prototypes. The RxJS adapter works with the RTI Connext DDS Node.js Connector. The C# codebase also includes an implementation of the DEBS'13 Grand Challenge using DDS and Rx.NET.
Further Reading
- DEBS'15 research paper: "Functional Reactive Stream Processing for Data-centric Publish/Subscribe"
- DEBS'15 research paper presentation
- A slightly older research paper: "Scalable Reactive Stream Processing Using DDS and Rx"
- Silicon Valley Code Camp presentation
- Silicon Valley Code Camp video
- First Rx4DDS.NET blogpost
- OMG DDS portal
- RTI Connext DDS
- RTI YouTube Channel
Data Transformation using Map
Demonstrates a stateful transformation pipeline. Transforms the "Square" topic into the "Circle" and "Triangle" topics. A blue circle orbits the square, and a blue triangle orbits the blue circle.
    rx4dds::TopicSubscription<ShapeType>
      topic_sub(participant, "Square", waitset, worker);

    rx::observable<LoanedSample<ShapeType>> source =
      topic_sub.create_observable();

    rx::observable<ShapeType> square_track =
      source >> rx4dds::complete_on_dispose()
             >> rx4dds::error_on_no_alive_writers()
             >> rxo::filter([](LoanedSample<ShapeType> s) {
                  return s.info().valid();
                }) // skip invalid samples
             >> rxo::map([](LoanedSample<ShapeType> valid) {
                  return valid.data();
                }); // map samples to data

    // The captured counter is per-lambda state: it advances on every sample.
    int circle_degree = 0;
    rx::observable<ShapeType> circle_track =
      square_track
        .map([circle_degree](ShapeType & square) mutable
        {
          circle_degree = (circle_degree + 3) % 360;
          return shape_location(square, circle_degree);
        })
        .tap([circle_writer](ShapeType & circle) mutable {
          circle_writer.write(circle);
        }); // tap replaced as publish_over_dds later

    int tri_degree = 0;
    rx::observable<ShapeType> triangle_track =
      circle_track
        .map([tri_degree](ShapeType & circle) mutable
        {
          tri_degree = (tri_degree + 9) % 360;
          return shape_location(circle, tri_degree);
        })
        >> rx4dds::publish_over_dds(triangle_writer);

    // Nothing flows until the final observable is subscribed to.
    triangle_track.subscribe();
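The snippet above calls shape_location without showing it. As a minimal sketch of what such a helper might look like, and of how a mutable callable keeps its angle between samples, the following self-contained C++11 code uses a plain Shape struct as a stand-in for the generated ShapeType. The Shape fields, the fixed radius, the "BLUE" color assignment, and the Orbit functor are all assumptions made for illustration; they are not taken from the Rx4DDS sources.

```cpp
#include <cassert>
#include <cmath>
#include <string>

// Hypothetical stand-in for the IDL-generated ShapeType (field names assumed).
struct Shape {
    std::string color;
    int x;
    int y;
    int shapesize;
};

// Hypothetical helper: returns a copy of `center` moved onto a circle of
// `radius` pixels around it, at the given angle in degrees.
inline Shape shape_location(const Shape& center, int degree, int radius = 30) {
    assert(degree >= 0 && degree < 360);
    const double pi = std::acos(-1.0);
    const double rad = degree * pi / 180.0;
    Shape out = center;
    out.color = "BLUE";  // assumption: orbiting shapes are drawn in blue
    out.x = center.x + static_cast<int>(std::lround(radius * std::cos(rad)));
    out.y = center.y + static_cast<int>(std::lround(radius * std::sin(rad)));
    return out;
}

// Stateful callable equivalent to the mutable lambda passed to map():
// every call advances the stored angle by `step` degrees before placing
// the orbiting shape.
struct Orbit {
    int step;
    int degree;
    explicit Orbit(int s) : step(s), degree(0) {}
    Shape operator()(const Shape& center) {
        degree = (degree + step) % 360;
        return shape_location(center, degree);
    }
};
```

Under these assumptions, an Orbit(3) fed each new square plays the role of the circle_degree lambda, and an Orbit(9) fed each resulting circle plays the role of the tri_degree lambda. The state lives inside the callable, so each pipeline owns its own angle.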
Keyed Data Transformation using GroupBy and Map
Demonstrates multiple parallel stateful pipelines. This is the same as the previous example, except that the transformation pipeline is replicated for each key (i.e., shape color). The resulting topics ("Circle" and "Triangle") mimic the lifecycle of the original instance in the "Square" topic. A DDS dispose maps to on_completed, and the NOT_ALIVE_NO_WRITERS instance state maps to on_error.
    rx4dds::TopicSubscription<ShapeType>
      topic_sub(participant, "Square", waitset, worker);

    // One inner observable per instance (key); here the key is the color.
    auto grouped_stream =
      topic_sub.create_observable()
        >> rx4dds::group_by_instance([](ShapeType & shape) {
             return shape.color();
           });

    grouped_stream
      .flat_map([circle_writer, triangle_writer]
                (GroupedShapeObservable go) {
          rx::observable<ShapeType> inner_transformed =
            go >> rx4dds::to_unkeyed()
               >> rx4dds::complete_on_dispose()
               >> rx4dds::error_on_no_alive_writers()
               >> rx4dds::skip_invalid_samples()
               >> rx4dds::map_samples_to_data()
               >> rx4dds::map_to_circle_track() // as shown before
               >> rx4dds::publish_over_dds(
                    circle_writer, ShapeType(go.key()))
               >> rx4dds::map_to_triangle_track() // as shown before
               >> rx4dds::publish_over_dds(
                    triangle_writer, ShapeType(go.key()));
          return inner_transformed;
      }).subscribe();
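The lifecycle mapping described above (dispose to on_completed, NOT_ALIVE_NO_WRITERS to on_error) can be sketched without either library. The code below is only an illustration of the idea, assuming a simplified InstanceState enum, a Sample struct, and an Observer made of three callbacks; none of these names come from Rx4DDS, RxCpp, or the Connext API, and the real operators work on LoanedSample streams rather than single calls.

```cpp
#include <cassert>
#include <functional>
#include <string>

// Simplified instance states (hypothetical; modeled on the DDS states
// ALIVE, NOT_ALIVE_DISPOSED, and NOT_ALIVE_NO_WRITERS).
enum class InstanceState { Alive, NotAliveDisposed, NotAliveNoWriters };

// Hypothetical sample: lifecycle state, validity flag, and a payload.
struct Sample {
    InstanceState state;
    bool valid;
    int value;
};

// Hypothetical observer with the three Rx notification channels.
struct Observer {
    std::function<void(int)> on_next;
    std::function<void(const std::string&)> on_error;
    std::function<void()> on_completed;
};

// Forwards one sample to the observer and reports whether the per-instance
// stream is still open. A dispose ends the stream normally, the loss of all
// writers ends it with an error, and invalid samples are skipped.
inline bool forward(const Sample& s, const Observer& o) {
    assert(o.on_next && o.on_error && o.on_completed);
    switch (s.state) {
    case InstanceState::NotAliveDisposed:
        o.on_completed();
        return false;
    case InstanceState::NotAliveNoWriters:
        o.on_error("no alive writers");
        return false;
    case InstanceState::Alive:
        if (s.valid) {
            o.on_next(s.value);
        }
        return true;
    }
    return true;
}
```

In this sketch a caller would stop feeding an instance's stream once forward returns false, which is the per-key behavior the flat_map pipeline relies on: each color's inner observable terminates independently of the others.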
Average of Multiple Topic Instances
Demonstrates dynamic correlation (averaging) of "Square" topic instances. The pipeline tracks the lifecycle (appearance and disappearance) of "Square" topic instances. As before, a DDS dispose maps to on_completed, and the NOT_ALIVE_NO_WRITERS instance state maps to on_error.
    rx4dds::TopicSubscription<ShapeType>
      topic_sub(participant, "Square", waitset, worker);

    auto grouped_stream =
      topic_sub.create_observable()
        >> rx4dds::group_by_instance([](ShapeType & shape) {
             return shape.color();
           });

    grouped_stream
      .map([](GroupedShapeObservable go) {
          return go >> rx4dds::to_unkeyed()
                    >> rx4dds::complete_on_dispose()
                    >> rx4dds::error_on_no_alive_writers()
                    >> rx4dds::skip_invalid_samples()
                    >> rx4dds::map_samples_to_data()
                    >> rxo::publish()
                    >> rxo::ref_count()
                    >> rxo::as_dynamic();
      })
      >> rx4dds::coalesce_alive()
      >> rxo::map([](const std::vector<rxcpp::observable<ShapeType>> & srcs) {
           return rx4dds::combine_latest(srcs);
         })
      >> rxo::switch_on_next()
      >> rxo::map([](const std::vector<ShapeType> & shapes) {
           return calculate_average(shapes);
         })
      >> rx4dds::publish_over_dds(triangle_writer, ShapeType("ORANGE"));
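The pipeline above ends by calling calculate_average on the latest shape from every alive instance, but the helper itself is not shown. A minimal sketch of one plausible implementation follows, assuming a plain Shape struct in place of the generated ShapeType, an average taken over the x and y positions only, and a default result at the origin when the input is empty. The struct fields, the mean helper, and the empty-input behavior are assumptions for illustration.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical stand-in for the IDL-generated ShapeType (field names assumed).
struct Shape {
    std::string color;
    int x;
    int y;
    int shapesize;
};

// Integer mean of an accumulated sum.
inline int mean(long sum, long count) {
    assert(count > 0);  // callers must rule out the empty case first
    return static_cast<int>(sum / count);
}

// Sketch of calculate_average: the result sits at the mean position of all
// input shapes and carries the "ORANGE" key used when publishing.
inline Shape calculate_average(const std::vector<Shape>& shapes) {
    Shape avg{"ORANGE", 0, 0, 30};
    if (shapes.empty()) {
        return avg;  // assumption: no alive instances yields the origin
    }
    long sum_x = 0;
    long sum_y = 0;
    for (const Shape& s : shapes) {
        sum_x += s.x;
        sum_y += s.y;
    }
    const long n = static_cast<long>(shapes.size());
    avg.x = mean(sum_x, n);
    avg.y = mean(sum_y, n);
    return avg;
}
```

Because the function only sees a snapshot vector, it stays independent of how many instances are alive; in the pipeline above, handling instances that appear or disappear is left to coalesce_alive, combine_latest, and switch_on_next.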