diff --git a/callback_isolated_executor/CMakeLists.txt b/callback_isolated_executor/CMakeLists.txt index 295cfeb..bf31569 100644 --- a/callback_isolated_executor/CMakeLists.txt +++ b/callback_isolated_executor/CMakeLists.txt @@ -30,8 +30,8 @@ ament_target_dependencies(component_container_callback_isolated rclcpp rclcpp_co add_executable(component_container_single src/component_container_single.cpp) ament_target_dependencies(component_container_single rclcpp rclcpp_components cie_thread_configurator) -add_library(callback_isolated_executor SHARED src/callback_isolated_executor.cpp) -ament_target_dependencies(callback_isolated_executor rclcpp cie_thread_configurator) +add_library(callback_isolated_executor SHARED src/callback_isolated_executor.cpp src/multi_threaded_executor_internal.cpp) +ament_target_dependencies(callback_isolated_executor rclcpp cie_thread_configurator cie_config_msgs) target_include_directories(callback_isolated_executor PUBLIC $ $ diff --git a/callback_isolated_executor/include/callback_isolated_executor/callback_isolated_executor.hpp b/callback_isolated_executor/include/callback_isolated_executor/callback_isolated_executor.hpp index c243669..226b601 100644 --- a/callback_isolated_executor/include/callback_isolated_executor/callback_isolated_executor.hpp +++ b/callback_isolated_executor/include/callback_isolated_executor/callback_isolated_executor.hpp @@ -1,9 +1,16 @@ #pragma once + +#include "cie_config_msgs/msg/callback_group_info.hpp" #include "rclcpp/rclcpp.hpp" class CallbackIsolatedExecutor : public rclcpp::Executor { RCLCPP_DISABLE_COPY(CallbackIsolatedExecutor) + std::mutex client_publisher_mutex_; + rclcpp::Publisher::SharedPtr + client_publisher_; + size_t reentrant_parallelism_{4}; + // Nodes associated with this CallbackIsolatedExecutor, appended by add_node() // and removed by remove_node() std::set threads_; + std::vector tids_; // guarded by mtx_ + size_t ready_count_{0}; // guarded by mtx_ (# of threads that saved TID) + bool start_allowed_{false}; // guarded by mtx_ (run() allowed to proceed) + + // Synchronization in start phase + std::mutex mtx_; + std::condition_variable cv_all_ready_; + std::condition_variable cv_start_; + + std::mutex wait_mutex_; // to guard get_next_executable in run() + std::atomic_bool pre_spinning_{false}; + + void run(); + +public: + explicit MultiThreadedExecutorInternal(size_t number_of_threads) + : rclcpp::Executor(rclcpp::ExecutorOptions()), + number_of_threads_(number_of_threads) { + // hardcode for now + yield_before_execute_ = false; + next_exec_timeout_ = std::chrono::nanoseconds(-1); + } + + void pre_spin(); + + void spin() override; + + std::vector get_thread_ids(); +}; diff --git a/callback_isolated_executor/src/callback_isolated_executor.cpp b/callback_isolated_executor/src/callback_isolated_executor.cpp index c238c15..0cd31c5 100644 --- a/callback_isolated_executor/src/callback_isolated_executor.cpp +++ b/callback_isolated_executor/src/callback_isolated_executor.cpp @@ -8,10 +8,13 @@ #include "rclcpp/rclcpp.hpp" #include "callback_isolated_executor/callback_isolated_executor.hpp" +#include "callback_isolated_executor/multi_threaded_executor_internal.hpp" CallbackIsolatedExecutor::CallbackIsolatedExecutor( const rclcpp::ExecutorOptions &options) - : rclcpp::Executor(options) {} + : rclcpp::Executor(options) { + client_publisher_ = cie_thread_configurator::create_client_publisher(); +} void CallbackIsolatedExecutor::spin() { std::vector threads; @@ -52,28 +55,17 @@ void CallbackIsolatedExecutor::spin() { } } // guard mutex_ - std::mutex client_publisher_mutex; - auto client_publisher = cie_thread_configurator::create_client_publisher(); - for (auto [group, node] : groups_and_nodes) { - auto executor = - std::make_shared(); - executor->add_callback_group(group, node); - auto callback_group_id = - cie_thread_configurator::create_callback_group_id(group, node); - - threads.emplace_back([executor, callback_group_id, &client_publisher, - &client_publisher_mutex]() { - auto tid = static_cast(syscall(SYS_gettid)); - - { - std::lock_guard lock{client_publisher_mutex}; - cie_thread_configurator::publish_callback_group_info( - client_publisher, tid, callback_group_id); - } - - executor->spin(); - }); + if (group->type() == rclcpp::CallbackGroupType::Reentrant && + reentrant_parallelism_ >= 2) { + threads.emplace_back( + &CallbackIsolatedExecutor::spin_reentrant_callback_group, this, group, + node); + } else { + threads.emplace_back( + &CallbackIsolatedExecutor::spin_mutually_exclusive_callback_group, + this, group, node); + } } for (auto &t : threads) { @@ -81,6 +73,47 @@ void CallbackIsolatedExecutor::spin() { } } +void CallbackIsolatedExecutor::spin_mutually_exclusive_callback_group( + rclcpp::CallbackGroup::SharedPtr group, + rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node) { + auto executor = std::make_shared(); + executor->add_callback_group(group, node); + auto callback_group_id = + cie_thread_configurator::create_callback_group_id(group, node); + auto tid = static_cast(syscall(SYS_gettid)); + + { + std::lock_guard lock{client_publisher_mutex_}; + cie_thread_configurator::publish_callback_group_info(client_publisher_, tid, + callback_group_id); + } + + executor->spin(); +} + +void CallbackIsolatedExecutor::spin_reentrant_callback_group( + rclcpp::CallbackGroup::SharedPtr group, + rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node) { + auto executor = + std::make_shared(reentrant_parallelism_); + executor->add_callback_group(group, node); + auto callback_group_id = + cie_thread_configurator::create_callback_group_id(group, node); + + executor->pre_spin(); + auto tids = executor->get_thread_ids(); + + { + std::lock_guard lock{client_publisher_mutex_}; + for (auto tid : tids) { + cie_thread_configurator::publish_callback_group_info( + client_publisher_, tid, callback_group_id); + } + } + + executor->spin(); +} + void CallbackIsolatedExecutor::add_callback_group( rclcpp::CallbackGroup::SharedPtr group_ptr, rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, diff --git a/callback_isolated_executor/src/multi_threaded_executor_internal.cpp b/callback_isolated_executor/src/multi_threaded_executor_internal.cpp new file mode 100644 index 0000000..e79a932 --- /dev/null +++ b/callback_isolated_executor/src/multi_threaded_executor_internal.cpp @@ -0,0 +1,115 @@ +#include + +#include "callback_isolated_executor/multi_threaded_executor_internal.hpp" + +void MultiThreadedExecutorInternal::pre_spin() { + if (pre_spinning_.exchange(true)) { + throw std::runtime_error("pre_spin() called while already pre-spinning"); + } + + { + std::lock_guard lock{mtx_}; + assert(threads_.empty() && tids_.empty()); + ready_count_ = 0; + start_allowed_ = false; + } + + threads_.reserve(number_of_threads_); + for (size_t i = 0; i < number_of_threads_; i++) { + threads_.emplace_back(&MultiThreadedExecutorInternal::run, this); + } +} + +void MultiThreadedExecutorInternal::run() { + auto tid = static_cast(syscall(SYS_gettid)); + bool will_notify = false; + + { + std::lock_guard lock{wait_mutex_}; + tids_.push_back(tid); + ready_count_++; + if (ready_count_ == number_of_threads_) { + will_notify = true; + } + } + + if (will_notify) { + cv_all_ready_.notify_one(); + } + + { + std::unique_lock lock{mtx_}; + cv_start_.wait(lock, [this]() { return start_allowed_; }); + } + + while (rclcpp::ok(this->context_) && spinning.load()) { + rclcpp::AnyExecutable any_exec; + + { + std::lock_guard wait_lock{wait_mutex_}; + + if (!rclcpp::ok(this->context_) || !spinning.load()) { + return; + } + + if (!get_next_executable(any_exec, next_exec_timeout_)) { + continue; + } + } + + if (yield_before_execute_) { + std::this_thread::yield(); + } + + execute_any_executable(any_exec); + + // Clear the callback_group to prevent the AnyExecutable destructor from + // resetting the callback group `can_be_taken_from` + any_exec.callback_group.reset(); + } +} + +void MultiThreadedExecutorInternal::spin() { + if (!pre_spinning_.load()) { + throw std::runtime_error("spin() called without pre_spin()"); + } + + if (spinning.exchange(true)) { + throw std::runtime_error("spin() called while already spinning"); + } + + RCPPUTILS_SCOPE_EXIT(pre_spinning_.store(false);); + RCPPUTILS_SCOPE_EXIT(this->spinning.store(false);); + + { + std::unique_lock lock{mtx_}; + cv_all_ready_.wait(lock, + [this]() { return ready_count_ == number_of_threads_; }); + } + + { + std::lock_guard lock{mtx_}; + start_allowed_ = true; + } + + cv_start_.notify_all(); + + for (auto &thread : threads_) { + thread.join(); + } +} + +std::vector MultiThreadedExecutorInternal::get_thread_ids() { + if (!pre_spinning_.load()) { + throw std::runtime_error("get_thread_ids() called without pre_spin()"); + } + + { + std::unique_lock lock{mtx_}; + cv_all_ready_.wait(lock, + [this]() { return ready_count_ == number_of_threads_; }); + } + + std::lock_guard lock{mtx_}; + return tids_; +} diff --git a/cie_sample_application/CMakeLists.txt b/cie_sample_application/CMakeLists.txt index 771b0b3..8a1a39b 100644 --- a/cie_sample_application/CMakeLists.txt +++ b/cie_sample_application/CMakeLists.txt @@ -33,8 +33,18 @@ add_executable(sample_node_main src/sample_node_main.cpp) target_link_libraries(sample_node_main sample_node) ament_target_dependencies(sample_node_main rclcpp callback_isolated_executor) +# New reentrant node component and executable +add_library(reentrant_node SHARED src/reentrant_node.cpp) +ament_target_dependencies(reentrant_node rclcpp rclcpp_components std_msgs) +rclcpp_components_register_nodes(reentrant_node "ReentrantNode") + +add_executable(reentrant_node_main src/reentrant_node_main.cpp) +target_link_libraries(reentrant_node_main reentrant_node) +ament_target_dependencies(reentrant_node_main rclcpp callback_isolated_executor) + install(TARGETS sample_node_main + reentrant_node_main DESTINATION lib/${PROJECT_NAME} ) @@ -42,7 +52,7 @@ install(DIRECTORY include/ DESTINATION include ) -install(TARGETS sample_node +install(TARGETS sample_node reentrant_node EXPORT export_${PROJECT_NAME} DESTINATION lib ) @@ -50,6 +60,6 @@ install(TARGETS sample_node install(DIRECTORY launch/ DESTINATION share/${PROJECT_NAME}/launch) ament_export_include_directories(include) -ament_export_libraries(sample_node) +ament_export_libraries(sample_node reentrant_node) ament_package() diff --git a/cie_sample_application/include/cie_sample_application/reentrant_node.hpp b/cie_sample_application/include/cie_sample_application/reentrant_node.hpp new file mode 100644 index 0000000..4d35c57 --- /dev/null +++ b/cie_sample_application/include/cie_sample_application/reentrant_node.hpp @@ -0,0 +1,25 @@ +#pragma once + +#include "rclcpp/rclcpp.hpp" +#include "std_msgs/msg/int32.hpp" + +class ReentrantNode : public rclcpp::Node { +public: + explicit ReentrantNode( + const rclcpp::NodeOptions &options = rclcpp::NodeOptions()); + +private: + void timer_callback_1(); + void timer_callback_2(); + void subscription_callback(const std_msgs::msg::Int32::SharedPtr msg); + + rclcpp::TimerBase::SharedPtr timer1_; + rclcpp::TimerBase::SharedPtr timer2_; + rclcpp::Publisher::SharedPtr pub_; + rclcpp::Subscription::SharedPtr sub_; + + // Single reentrant group to allow parallel callbacks + rclcpp::CallbackGroup::SharedPtr reentrant_group_; + + std::atomic count_{0}; +}; diff --git a/cie_sample_application/src/reentrant_node.cpp b/cie_sample_application/src/reentrant_node.cpp new file mode 100644 index 0000000..3425e10 --- /dev/null +++ b/cie_sample_application/src/reentrant_node.cpp @@ -0,0 +1,35 @@ +#include +#include +#include + +#include "cie_sample_application/reentrant_node.hpp" +#include "rclcpp_components/register_node_macro.hpp" + +using namespace std::chrono_literals; + +ReentrantNode::ReentrantNode(const rclcpp::NodeOptions &options) + : Node("reentrant_node", options) { + // Create a single reentrant callback group + reentrant_group_ = + create_callback_group(rclcpp::CallbackGroupType::Reentrant); + + timer1_ = this->create_wall_timer( + 100ms, std::bind(&ReentrantNode::timer_callback_1, this), + reentrant_group_); + timer2_ = this->create_wall_timer( + 1000ms, std::bind(&ReentrantNode::timer_callback_2, this), + reentrant_group_); +} + +void ReentrantNode::timer_callback_1() { + long tid = syscall(SYS_gettid); + RCLCPP_INFO(this->get_logger(), "Timer1 (T=100ms) (tid=%ld)", tid); +} + +void ReentrantNode::timer_callback_2() { + std::this_thread::sleep_for(900ms); + long tid = syscall(SYS_gettid); + RCLCPP_INFO(this->get_logger(), "Timer2 (T=1000ms) (tid=%ld)", tid); +} + +RCLCPP_COMPONENTS_REGISTER_NODE(ReentrantNode) diff --git a/cie_sample_application/src/reentrant_node_main.cpp b/cie_sample_application/src/reentrant_node_main.cpp new file mode 100644 index 0000000..13863b3 --- /dev/null +++ b/cie_sample_application/src/reentrant_node_main.cpp @@ -0,0 +1,16 @@ +#include "callback_isolated_executor/callback_isolated_executor.hpp" +#include "cie_sample_application/reentrant_node.hpp" + +int main(int argc, char *argv[]) { + rclcpp::init(argc, argv); + + auto node = std::make_shared(); + auto executor = std::make_shared(); + // executor->set_reentrant_parallelism(4); + + executor->add_node(node); + executor->spin(); + + rclcpp::shutdown(); + return 0; +}