Skip to content

Fix deadlock in take_event and set_on_new_event_callback#890

Open
thomasmoore-torc wants to merge 2 commits into
ros2:rollingfrom
thomasmoore-torc:take_event_deadlock
Open

Fix deadlock in take_event and set_on_new_event_callback#890
thomasmoore-torc wants to merge 2 commits into
ros2:rollingfrom
thomasmoore-torc:take_event_deadlock

Conversation

@thomasmoore-torc

Copy link
Copy Markdown

Description

Using the EventsExecutor together with a subscription QoS-status event callback (e.g. a message_lost_callback) can produce an AB-BA deadlock between Fast DDS and rmw_fastrtps:

  • A Fast DDS receive thread holds the DataReader's internal mutex while invoking the status listener (on_sample_lost), which calls into rmw_fastrtps and blocks taking the subscription's event mutex (on_new_event_m_).
  • Concurrently, the EventsExecutor thread holds that event mutex inside take_event() and calls back into the DataReader (get_sample_lost_status), blocking on the reader mutex.

Each thread holds the mutex the other needs, in the opposite order, so they deadlock — wedging the reader mutex and stalling the receive path.

The same lock-ordering applies to every QoS-status event (liveliness, deadline, requested/offered-incompatible-qos, matched, sample/message-lost), in both take_event() and set_on_new_event_callback(), on the subscription and publication event paths. This PR breaks the chain uniformly: on_new_event_m_ is no longer held across calls into the DataReader/DataWriter status getters — each status is copied into a local with the lock released, then the lock is re-acquired and the cached value adopted only if a listener update did not arrive in the meantime. get_inconsistent_topic_status() is intentionally left under the lock, since it operates on the Topic and does not take the reader/writer mutex.

Is this user-facing behavior change?

No public API or contract change. The only observable difference is that the AB-BA deadlock described above no longer occurs; event-status delivery semantics are preserved (the lock is released only around the DDS status query, and a concurrent listener update is never discarded).

Did you use Generative AI?

Yes. The root-cause analysis and the code changes were developed with Claude Code (Claude Opus 4.8); the modifications to take_event() and set_on_new_event_callback() in custom_subscriber_info.cpp and custom_publisher_info.cpp were authored with its assistance and reviewed and validated by myself and my coleagues.

Additional Information

This was validated by exercising the affected path — an EventsExecutor with a message-lost event callback under sustained sample loss — which reliably reproduced the deadlock before this change and showed no recurrence afterward.

Note for reviewers: the diff is broader than the message-lost path that was observed to deadlock. The same fix is applied to all status getters and to both the subscription and publication event classes, since they share the identical lock-ordering pattern.

@thomasmoore-torc

Copy link
Copy Markdown
Author

@Mergifyio backport kilted jazzy humble

@mergify

mergify Bot commented Jun 15, 2026

Copy link
Copy Markdown

backport kilted jazzy humble

☑️ Command disallowed due to command restrictions in the Mergify configuration.

Details
  • sender-permission >= write

Signed-off-by: Thomas Moore <thomas.moore@torc.ai>
…ent deadlock

Signed-off-by: Thomas Moore <thomas.moore@torc.ai>
@MiguelCompany

Copy link
Copy Markdown
Collaborator

I wonder why this has not been caught by the thread sanitizer job.

Oh! I see. It is only run for packages rcpputils rcutils.

Running for test_rmw_implementation without the changes to see if the deadlock is already reported:

Build Status

@MiguelCompany

Copy link
Copy Markdown
Collaborator

Result for the events test does not show lock order inversion:

      Start 16: test_event__rmw_fastrtps_cpp

16: Test command: /usr/bin/python3 "-u" "/home/jenkins-agent/workspace/nightly_linux_thread_sanitizer/ws/install/ament_cmake_ros/share/ament_cmake_ros/cmake/run_test_isolated.py" "/home/jenkins-agent/workspace/nightly_linux_thread_sanitizer/ws/build/test_rmw_implementation/test_results/test_rmw_implementation/test_event__rmw_fastrtps_cpp.gtest.xml" "--package-name" "test_rmw_implementation" "--output-file" "/home/jenkins-agent/workspace/nightly_linux_thread_sanitizer/ws/build/test_rmw_implementation/ament_cmake_gtest/test_event__rmw_fastrtps_cpp.txt" "--env" "RMW_IMPLEMENTATION=rmw_fastrtps_cpp" "--command" "/home/jenkins-agent/workspace/nightly_linux_thread_sanitizer/ws/build/test_rmw_implementation/test_event" "--gtest_output=xml:/home/jenkins-agent/workspace/nightly_linux_thread_sanitizer/ws/build/test_rmw_implementation/test_results/test_rmw_implementation/test_event__rmw_fastrtps_cpp.gtest.xml"
16: Working Directory: /home/jenkins-agent/workspace/nightly_linux_thread_sanitizer/ws/build/test_rmw_implementation
16: Test timeout computed to be: 60
16: -- run_test.py: extra environment variables:
16:  - RMW_IMPLEMENTATION=rmw_fastrtps_cpp
16: -- run_test.py: invoking following command in '/home/jenkins-agent/workspace/nightly_linux_thread_sanitizer/ws/build/test_rmw_implementation':
16:  - /home/jenkins-agent/workspace/nightly_linux_thread_sanitizer/ws/install/rmw_test_fixture_implementation/lib/rmw_test_fixture_implementation/run_rmw_isolated /home/jenkins-agent/workspace/nightly_linux_thread_sanitizer/ws/build/test_rmw_implementation/test_event --gtest_output=xml:/home/jenkins-agent/workspace/nightly_linux_thread_sanitizer/ws/build/test_rmw_implementation/test_results/test_rmw_implementation/test_event__rmw_fastrtps_cpp.gtest.xml
16: Running main() from ./googletest/src/gtest_main.cc
16: [==========] Running 4 tests from 1 test suite.
16: [----------] Global test environment set-up.
16: [----------] 4 tests from TestEvent
16: [ RUN      ] TestEvent.basic_publisher_matched_event
16: [       OK ] TestEvent.basic_publisher_matched_event (489 ms)
16: [ RUN      ] TestEvent.basic_subscription_matched_event
16: [       OK ] TestEvent.basic_subscription_matched_event (433 ms)
16: [ RUN      ] TestEvent.one_pub_multi_sub_connect_disconnect
16: [       OK ] TestEvent.one_pub_multi_sub_connect_disconnect (132 ms)
16: [ RUN      ] TestEvent.one_sub_multi_pub_matched_unmatched_event
16: [       OK ] TestEvent.one_sub_multi_pub_matched_unmatched_event (133 ms)
16: [----------] 4 tests from TestEvent (1189 ms total)
16: 
16: [----------] Global test environment tear-down
16: [==========] 4 tests from 1 test suite ran. (1189 ms total)
16: [  PASSED  ] 4 tests.
16: -- run_test.py: return code 0
16: -- run_test.py: inject classname prefix into gtest result file '/home/jenkins-agent/workspace/nightly_linux_thread_sanitizer/ws/build/test_rmw_implementation/test_results/test_rmw_implementation/test_event__rmw_fastrtps_cpp.gtest.xml'
16: -- run_test.py: verify result file '/home/jenkins-agent/workspace/nightly_linux_thread_sanitizer/ws/build/test_rmw_implementation/test_results/test_rmw_implementation/test_event__rmw_fastrtps_cpp.gtest.xml'
16/38 Test #16: test_event__rmw_fastrtps_cpp ..........................   Passed    2.52 sec

@thomasmoore-torc

Copy link
Copy Markdown
Author

@MiguelCompany - that test is insufficient to trigger the AB-BA deadlock. I've created a test in a branch of rmw_implementation that demonstrates the issue when run against the current rolling branch of rmw_fastrtps. Here's the relevant output of running the test with TSAN enabled:

WARNING: ThreadSanitizer: lock-order-inversion (potential deadlock) (pid=1759)
  Cycle in lock order graph: M0 (0x725c00002868) => M1 (0x725800003d88) => M0

  Mutex M1 acquired here while holding mutex M0 in main thread:
    #0 pthread_mutex_lock ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:1341 (libtsan.so.2+0x59a13) (BuildId: 2a13a7710e361d06f7babbea53065ca2be93f738)
    #1 eprosima::fastdds::dds::DataReaderImpl::get_sample_lost_status(eprosima::fastdds::dds::BaseStatus&) <null> (libfastdds.so.3.6+0x2f5e8f) (BuildId: 4876362f9e32f84c8de4cabe21943a7e824e50df)
    #2 rmw_fastrtps_shared_cpp::__rmw_event_set_callback(rmw_event_s*, void (*)(void const*, unsigned long), void const*) /root/ws/src/rmw_fastrtps/rmw_fastrtps_shared_cpp/src/rmw_event.cpp:160 (librmw_fastrtps_shared_cpp.so+0x7ea51) (BuildId: bf3e92c2fa8c596c57cb845fbc19a000efc9cd77)
    #3 rmw_event_set_callback /root/ws/src/rmw_fastrtps/rmw_fastrtps_cpp/src/rmw_event.cpp:59 (librmw_fastrtps_cpp.so+0x5c605) (BuildId: 9b3ff1ce4ed3e15ab82d0d4484b7111af698f51f)
    #4 TestEventMessageLostDeadlock_take_event_does_not_deadlock_with_on_sample_lost_Test::TestBody() /root/ws/src/rmw_implementation/test_rmw_implementation/test/test_event_message_lost_deadlock.cpp:135 (test_event_message_lost_deadlock+0x11c76) (BuildId: 585a1db1b0a675f95e137685aac2f5668e8228b2)
    #5 void testing::internal::HandleExceptionsInMethodIfSupported<testing::Test, void>(testing::Test*, void (testing::Test::*)(), char const*) <null> (test_event_message_lost_deadlock+0x50dae) (BuildId: 585a1db1b0a675f95e137685aac2f5668e8228b2)

  Mutex M0 previously acquired by the same thread here:
    #0 pthread_mutex_lock ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:1341 (libtsan.so.2+0x59a13) (BuildId: 2a13a7710e361d06f7babbea53065ca2be93f738)
    #1 __gthread_mutex_lock /usr/include/x86_64-linux-gnu/c++/13/bits/gthr-default.h:749 (librmw_fastrtps_shared_cpp.so+0x4db11) (BuildId: bf3e92c2fa8c596c57cb845fbc19a000efc9cd77)
    #2 std::mutex::lock() /usr/include/c++/13/bits/std_mutex.h:113 (librmw_fastrtps_shared_cpp.so+0x4db11)
    #3 std::unique_lock<std::mutex>::lock() /usr/include/c++/13/bits/unique_lock.h:141 (librmw_fastrtps_shared_cpp.so+0x4db11)
    #4 std::unique_lock<std::mutex>::unique_lock(std::mutex&) /usr/include/c++/13/bits/unique_lock.h:71 (librmw_fastrtps_shared_cpp.so+0x4db11)
    #5 rcpputils::unique_lock<std::mutex>::unique_lock(std::mutex&) /opt/ros/rolling/include/rcpputils/rcpputils/unique_lock.hpp:35 (librmw_fastrtps_shared_cpp.so+0x4db11)
    #6 RMWSubscriptionEvent::set_on_new_event_callback(rmw_event_type_e, void const*, void (*)(void const*, unsigned long)) /root/ws/src/rmw_fastrtps/rmw_fastrtps_shared_cpp/src/custom_subscriber_info.cpp:242 (librmw_fastrtps_shared_cpp.so+0x4db11)
    #7 rmw_fastrtps_shared_cpp::__rmw_event_set_callback(rmw_event_s*, void (*)(void const*, unsigned long), void const*) /root/ws/src/rmw_fastrtps/rmw_fastrtps_shared_cpp/src/rmw_event.cpp:160 (librmw_fastrtps_shared_cpp.so+0x7ea51) (BuildId: bf3e92c2fa8c596c57cb845fbc19a000efc9cd77)
    #8 rmw_event_set_callback /root/ws/src/rmw_fastrtps/rmw_fastrtps_cpp/src/rmw_event.cpp:59 (librmw_fastrtps_cpp.so+0x5c605) (BuildId: 9b3ff1ce4ed3e15ab82d0d4484b7111af698f51f)
    #9 TestEventMessageLostDeadlock_take_event_does_not_deadlock_with_on_sample_lost_Test::TestBody() /root/ws/src/rmw_implementation/test_rmw_implementation/test/test_event_message_lost_deadlock.cpp:135 (test_event_message_lost_deadlock+0x11c76) (BuildId: 585a1db1b0a675f95e137685aac2f5668e8228b2)
    #10 void testing::internal::HandleExceptionsInMethodIfSupported<testing::Test, void>(testing::Test*, void (testing::Test::*)(), char const*) <null> (test_event_message_lost_deadlock+0x50dae) (BuildId: 585a1db1b0a675f95e137685aac2f5668e8228b2)

  Mutex M0 acquired here while holding mutex M1 in thread T4:
    #0 pthread_mutex_lock ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:1341 (libtsan.so.2+0x59a13) (BuildId: 2a13a7710e361d06f7babbea53065ca2be93f738)
    #1 __gthread_mutex_lock /usr/include/x86_64-linux-gnu/c++/13/bits/gthr-default.h:749 (librmw_fastrtps_shared_cpp.so+0x4c95f) (BuildId: bf3e92c2fa8c596c57cb845fbc19a000efc9cd77)
    #2 std::mutex::lock() /usr/include/c++/13/bits/std_mutex.h:113 (librmw_fastrtps_shared_cpp.so+0x4c95f)
    #3 std::lock_guard<std::mutex>::lock_guard(std::mutex&) /usr/include/c++/13/bits/std_mutex.h:249 (librmw_fastrtps_shared_cpp.so+0x4c95f)
    #4 RMWSubscriptionEvent::update_sample_lost(unsigned int, unsigned int) /root/ws/src/rmw_fastrtps/rmw_fastrtps_shared_cpp/src/custom_subscriber_info.cpp:449 (librmw_fastrtps_shared_cpp.so+0x4c95f)
    #5 CustomDataReaderListener::on_sample_lost(eprosima::fastdds::dds::DataReader*, eprosima::fastdds::dds::BaseStatus const&) /root/ws/src/rmw_fastrtps/rmw_fastrtps_shared_cpp/src/custom_subscriber_info.cpp:97 (librmw_fastrtps_shared_cpp.so+0x4ca40) (BuildId: bf3e92c2fa8c596c57cb845fbc19a000efc9cd77)
    #6 eprosima::fastdds::dds::DataReaderImpl::InnerDataReaderListener::on_sample_lost(eprosima::fastdds::rtps::RTPSReader*, int) <null> (libfastdds.so.3.6+0x2f84de) (BuildId: 4876362f9e32f84c8de4cabe21943a7e824e50df)

  Mutex M1 previously acquired by the same thread here:
    #0 pthread_mutex_lock ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:1341 (libtsan.so.2+0x59a13) (BuildId: 2a13a7710e361d06f7babbea53065ca2be93f738)
    #1 eprosima::fastdds::rtps::StatelessReader::process_data_msg(eprosima::fastdds::rtps::CacheChange_t*) <null> (libfastdds.so.3.6+0x5867d6) (BuildId: 4876362f9e32f84c8de4cabe21943a7e824e50df)

  Thread T4 'dds.shm.7411' (tid=1769, running) created by main thread at:
    #0 pthread_create ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:1022 (libtsan.so.2+0x5ac1a) (BuildId: 2a13a7710e361d06f7babbea53065ca2be93f738)
    #1 eprosima::thread::start_thread_impl(int, void* (*)(void*), void*) <null> (libfastdds.so.3.6+0x60633a) (BuildId: 4876362f9e32f84c8de4cabe21943a7e824e50df)
    #2 init_context_impl /root/ws/src/rmw_fastrtps/rmw_fastrtps_cpp/src/init_rmw_context_impl.cpp:82 (librmw_fastrtps_cpp.so+0x38090) (BuildId: 9b3ff1ce4ed3e15ab82d0d4484b7111af698f51f)
    #3 rmw_fastrtps_cpp::increment_context_impl_ref_count(rmw_context_s*) /root/ws/src/rmw_fastrtps/rmw_fastrtps_cpp/src/init_rmw_context_impl.cpp:244 (librmw_fastrtps_cpp.so+0x39597) (BuildId: 9b3ff1ce4ed3e15ab82d0d4484b7111af698f51f)
    #4 rmw_create_node /root/ws/src/rmw_fastrtps/rmw_fastrtps_cpp/src/rmw_node.cpp:60 (librmw_fastrtps_cpp.so+0x5e70f) (BuildId: 9b3ff1ce4ed3e15ab82d0d4484b7111af698f51f)
    #5 TestEventMessageLostDeadlock::SetUp() /root/ws/src/rmw_implementation/test_rmw_implementation/test/test_event_message_lost_deadlock.cpp:84 (test_event_message_lost_deadlock+0x1713a) (BuildId: 585a1db1b0a675f95e137685aac2f5668e8228b2)
    #6 void testing::internal::HandleExceptionsInMethodIfSupported<testing::Test, void>(testing::Test*, void (testing::Test::*)(), char const*) <null> (test_event_message_lost_deadlock+0x50dae) (BuildId: 585a1db1b0a675f95e137685aac2f5668e8228b2)

@MiguelCompany

Copy link
Copy Markdown
Collaborator

@thomasmoore-torc Great! Please open a PR on rmw_implementation with the new test

@thomasmoore-torc

Copy link
Copy Markdown
Author

@MiguelCompany - I've created the PR at ros2/rmw_implementation#278

@fujitatomoya fujitatomoya left a comment

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lgtm with reproducible test in rmw test.

@fujitatomoya

Copy link
Copy Markdown
Collaborator

Pulls: #890, ros2/rmw_implementation#278
Gist: https://gist.githubusercontent.com/fujitatomoya/e30a6f121dec854fa58db56e23dd4ff1/raw/e44ebf55fe7729cb041494bb0a3b70aba1d9acab/ros2.repos
BUILD args: --packages-above-and-dependencies rmw_fastrtps_shared_cpp test_rmw_implementation
TEST args: --packages-above rmw_fastrtps_shared_cpp test_rmw_implementation
ROS Distro: rolling
Job: ci_launcher
ci_launcher ran: https://ci.ros2.org/job/ci_launcher/19606

  • Linux Build Status
  • Linux-aarch64 Build Status
  • Linux-rhel Build Status
  • Windows Build Status

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants