diff --git a/test_rmw_implementation/CMakeLists.txt b/test_rmw_implementation/CMakeLists.txt index 7fdf5a84..f67bb5d6 100644 --- a/test_rmw_implementation/CMakeLists.txt +++ b/test_rmw_implementation/CMakeLists.txt @@ -196,6 +196,15 @@ if(BUILD_TESTING) test_msgs::test_msgs ) + ament_add_gtest_executable(test_event_message_lost_deadlock + test/test_event_message_lost_deadlock.cpp + ) + target_link_libraries(test_event_message_lost_deadlock + rmw::rmw + rmw_implementation::rmw_implementation + test_msgs::test_msgs + ) + function(test_api) message(STATUS "Creating API tests for '${rmw_implementation}'") set(rmw_implementation_env_var RMW_IMPLEMENTATION=${rmw_implementation}) @@ -298,6 +307,23 @@ if(BUILD_TESTING) ENV ${rmw_implementation_env_var} ) + + # Reproduces an AB-BA lock-order inversion between rmw_take_event(MESSAGE_LOST) + # and on_sample_lost. This is specific to rmw_fastrtps: it relies on disabling + # intra-process delivery (via the Fast DDS profile below, a no-op for other + # implementations) to force real SAMPLE_LOST. Other implementations do not + # reliably produce SAMPLE_LOST for this pattern, so registering it for them only + # yields false failures; restrict registration to the fastrtps variants. (The test + # itself also skips cleanly if no loss is generated.) + if(rmw_implementation MATCHES "fastrtps") + ament_add_ros_isolated_gtest_test(test_event_message_lost_deadlock + TEST_NAME test_event_message_lost_deadlock${target_suffix} + TIMEOUT 60 + ENV + ${rmw_implementation_env_var} + FASTRTPS_DEFAULT_PROFILES_FILE=${CMAKE_CURRENT_SOURCE_DIR}/test/no_intraprocess_profile.xml + ) + endif() endfunction() call_for_each_rmw_implementation(test_api) diff --git a/test_rmw_implementation/test/no_intraprocess_profile.xml b/test_rmw_implementation/test/no_intraprocess_profile.xml new file mode 100644 index 00000000..92e05d7b --- /dev/null +++ b/test_rmw_implementation/test/no_intraprocess_profile.xml @@ -0,0 +1,12 @@ + + + + + OFF + + diff --git a/test_rmw_implementation/test/test_event_message_lost_deadlock.cpp b/test_rmw_implementation/test/test_event_message_lost_deadlock.cpp new file mode 100644 index 00000000..e48ad901 --- /dev/null +++ b/test_rmw_implementation/test/test_event_message_lost_deadlock.cpp @@ -0,0 +1,223 @@ +// Copyright 2026 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Regression test for an AB-BA lock-order inversion in the subscription +// QoS-event path. +// +// Two mutexes are involved, per subscription: +// R = the DataReader's internal mutex +// E = the rmw subscription event mutex +// +// * Executor side (the take loop below): +// rmw_take_event(MESSAGE_LOST) locks E, then queries the reader's +// sample-lost status, which locks R. (E -> R) +// +// * DDS receive side (driven by real sample loss): +// the reader holds R while delivering on_sample_lost, which calls back +// into rmw and locks E. (R -> E) +// +// Run concurrently with actual SAMPLE_LOST traffic, these orderings deadlock on +// an affected implementation. +// +// To exercise it, all three must hold: +// 1. The MESSAGE_LOST event callback is armed via rmw_event_set_callback() +// (this is what an events-driven executor does; it makes the implementation +// deliver on_sample_lost under the reader mutex -- the R->E edge). +// 2. Real SAMPLE_LOST occurs. The CMake registration sets a profile that +// disables intra-process delivery for rmw_fastrtps; intra-process delivery +// hands samples over inline and never loses them. If no loss is generated +// the test reports "scenario not exercised" rather than a misleading pass. +// 3. The take loop and the publisher flood run concurrently (done below). +// +// On an affected implementation the take loop wedges and the watchdog fails the +// test; on a fixed implementation the take loop keeps progressing and it passes. +// +// For deterministic detection, build the rmw implementation with ThreadSanitizer; +// TSAN reports the inversion the first time both orderings are observed. + +#include + +#include +#include +#include +#include +#include + +#include "rcutils/allocator.h" +#include "rcutils/strdup.h" + +#include "rmw/error_handling.h" +#include "rmw/event.h" +#include "rmw/rmw.h" + +#include "test_msgs/msg/basic_types.h" + +namespace +{ +constexpr std::chrono::seconds kMaxRunTime{20}; // overall cap +constexpr std::chrono::seconds kStallWindow{3}; // no take progress => deadlock +constexpr std::chrono::milliseconds kDiscovery{500}; // endpoint matching +} // namespace + +class TestEventMessageLostDeadlock : public ::testing::Test +{ +protected: + void SetUp() override + { + rcutils_allocator_t allocator = rcutils_get_default_allocator(); + init_options = rmw_get_zero_initialized_init_options(); + ASSERT_EQ(RMW_RET_OK, rmw_init_options_init(&init_options, allocator)); + init_options.enclave = rcutils_strdup("/", allocator); + ASSERT_STREQ("/", init_options.enclave); + ASSERT_EQ(RMW_RET_OK, rmw_init(&init_options, &context)); + node = rmw_create_node(&context, "test_event_message_lost_deadlock", "/"); + ASSERT_NE(nullptr, node); + } + + void TearDown() override + { + if (node) { + EXPECT_EQ(RMW_RET_OK, rmw_destroy_node(node)); + } + EXPECT_EQ(RMW_RET_OK, rmw_shutdown(&context)); + EXPECT_EQ(RMW_RET_OK, rmw_context_fini(&context)); + EXPECT_EQ(RMW_RET_OK, rmw_init_options_fini(&init_options)); + } + + rmw_init_options_t init_options{rmw_get_zero_initialized_init_options()}; + rmw_context_t context{rmw_get_zero_initialized_context()}; + rmw_node_t * node{nullptr}; + rmw_publisher_options_t pub_options = rmw_get_default_publisher_options(); + rmw_subscription_options_t sub_options = rmw_get_default_subscription_options(); + const rosidl_message_type_support_t * ts = + ROSIDL_GET_MSG_TYPE_SUPPORT(test_msgs, msg, BasicTypes); + const char * topic_name = "/message_lost_deadlock"; +}; + +TEST_F(TestEventMessageLostDeadlock, take_event_does_not_deadlock_with_on_sample_lost) +{ + // Shallow, best-effort QoS so a non-draining reader under a publisher flood drops + // samples and generates SAMPLE_LOST. + rmw_qos_profile_t qos = rmw_qos_profile_default; + qos.history = RMW_QOS_POLICY_HISTORY_KEEP_LAST; + qos.depth = 1; + qos.reliability = RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT; + + rmw_publisher_t * pub = rmw_create_publisher(node, ts, topic_name, &qos, &pub_options); + ASSERT_NE(nullptr, pub) << rmw_get_error_string().str; + rmw_subscription_t * sub = rmw_create_subscription(node, ts, topic_name, &qos, &sub_options); + ASSERT_NE(nullptr, sub) << rmw_get_error_string().str; + + std::this_thread::sleep_for(kDiscovery); // let endpoints match + + // Arm the MESSAGE_LOST event callback so the implementation delivers on_sample_lost + // under the reader mutex -- the R->E edge of the inversion. + rmw_event_t event{rmw_get_zero_initialized_event()}; + ASSERT_EQ(RMW_RET_OK, rmw_subscription_event_init(&event, sub, RMW_EVENT_MESSAGE_LOST)) + << rmw_get_error_string().str; + + std::atomic events_seen{0}; + rmw_event_callback_t cb = [](const void * user_data, size_t /*number_of_events*/) { + auto * counter = static_cast *>(const_cast(user_data)); + counter->fetch_add(1, std::memory_order_relaxed); + }; + ASSERT_EQ(RMW_RET_OK, rmw_event_set_callback(&event, cb, &events_seen)) + << rmw_get_error_string().str; + + std::atomic stop{false}; + std::atomic take_iters{0}; + + // Publisher flood: messages are never taken on the reader, so the depth-1 history + // overruns and SAMPLE_LOST fires on the receive thread (which holds the reader mutex + // when it invokes on_sample_lost). + std::thread flood([&]() { + test_msgs__msg__BasicTypes msg; + test_msgs__msg__BasicTypes__init(&msg); + while (!stop.load(std::memory_order_relaxed)) { + (void)rmw_publish(pub, &msg, nullptr); + } + test_msgs__msg__BasicTypes__fini(&msg); + }); + + // take_event loop: take_event locks the event mutex, then queries the reader's + // sample-lost status (reader mutex) -- the E->R edge. + std::thread taker([&]() { + rmw_message_lost_status_t status; + bool taken = false; + while (!stop.load(std::memory_order_relaxed)) { + (void)rmw_take_event(&event, &status, &taken); + take_iters.fetch_add(1, std::memory_order_relaxed); + } + }); + + // Watchdog: the deadlock wedges the taker (stuck acquiring the reader mutex), so its + // iteration counter freezes while loss continues. The best-effort publisher keeps + // running regardless, so we watch the taker specifically. + const auto start = std::chrono::steady_clock::now(); + auto last_progress = start; + uint64_t last_take = 0; + bool deadlocked = false; + + while (std::chrono::steady_clock::now() - start < kMaxRunTime) { + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + const uint64_t now_take = take_iters.load(std::memory_order_relaxed); + if (now_take != last_take) { + last_take = now_take; + last_progress = std::chrono::steady_clock::now(); + continue; + } + // No take progress. Only call it a deadlock once loss has actually been generated, + // i.e. the R->E edge is genuinely being exercised. + if (events_seen.load(std::memory_order_relaxed) > 0 && + std::chrono::steady_clock::now() - last_progress > kStallWindow) + { + deadlocked = true; + break; + } + } + + stop.store(true, std::memory_order_relaxed); + + if (deadlocked) { + // The workers are wedged on the inverted mutexes and cannot be joined or recovered + // (TearDown would also wedge destroying DDS entities). Detach and terminate non-zero + // so the test is recorded as a failure instead of hanging. + flood.detach(); + taker.detach(); + std::cerr + << "\nDEADLOCK DETECTED: rmw_take_event(MESSAGE_LOST) (event mutex -> reader mutex) " + "deadlocked against on_sample_lost (reader mutex -> event mutex).\n" + << " take_event iterations before stall: " << take_iters.load() << "\n" + << " sample-lost events observed: " << events_seen.load() << "\n"; + std::quick_exit(1); + } + + flood.join(); + taker.join(); + + EXPECT_EQ(RMW_RET_OK, rmw_event_fini(&event)); + EXPECT_EQ(RMW_RET_OK, rmw_destroy_subscription(node, sub)); + EXPECT_EQ(RMW_RET_OK, rmw_destroy_publisher(node, pub)); + + // If no loss was ever generated, the R->E edge never ran, so the scenario was not + // exercised and "no deadlock" proves nothing. This happens when intra-process delivery + // was not disabled, or on an implementation that does not produce SAMPLE_LOST for a + // depth-1 best-effort reader that never takes. Skip cleanly rather than reporting a + // misleading pass or a failure unrelated to any deadlock. + if (events_seen.load() == 0u) { + GTEST_SKIP() + << "No SAMPLE_LOST was generated, so the lock-order inversion was not exercised. " + "Ensure intra-process delivery is disabled (see the profile in the CMake env)."; + } +}