Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions callback_isolated_executor/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ ament_target_dependencies(component_container_callback_isolated rclcpp rclcpp_co
add_executable(component_container_single src/component_container_single.cpp)
ament_target_dependencies(component_container_single rclcpp rclcpp_components cie_thread_configurator)

add_library(callback_isolated_executor SHARED src/callback_isolated_executor.cpp)
ament_target_dependencies(callback_isolated_executor rclcpp cie_thread_configurator)
add_library(callback_isolated_executor SHARED src/callback_isolated_executor.cpp src/multi_threaded_executor_internal.cpp)
ament_target_dependencies(callback_isolated_executor rclcpp cie_thread_configurator cie_config_msgs)
target_include_directories(callback_isolated_executor PUBLIC
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>
$<INSTALL_INTERFACE:include>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,16 @@
#pragma once

#include "cie_config_msgs/msg/callback_group_info.hpp"
#include "rclcpp/rclcpp.hpp"

class CallbackIsolatedExecutor : public rclcpp::Executor {
RCLCPP_DISABLE_COPY(CallbackIsolatedExecutor)

std::mutex client_publisher_mutex_;
rclcpp::Publisher<cie_config_msgs::msg::CallbackGroupInfo>::SharedPtr
client_publisher_;
size_t reentrant_parallelism_{4};

// Nodes associated with this CallbackIsolatedExecutor, appended by add_node()
// and removed by remove_node()
std::set<rclcpp::node_interfaces::NodeBaseInterface::WeakPtr,
Expand All @@ -26,6 +33,14 @@ class CallbackIsolatedExecutor : public rclcpp::Executor {
get_automatically_added_callback_groups_from_nodes_internal() const
RCPPUTILS_TSA_REQUIRES(mutex_);

void spin_mutually_exclusive_callback_group(
rclcpp::CallbackGroup::SharedPtr group,
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node);

void spin_reentrant_callback_group(
rclcpp::CallbackGroup::SharedPtr group,
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node);

public:
RCLCPP_PUBLIC
explicit CallbackIsolatedExecutor(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
#include "rclcpp/rclcpp.hpp"

// To implement parallelism for reentrant callback groups in
// CallbackIsolatedExecutor
class MultiThreadedExecutorInternal : public rclcpp::Executor {
RCLCPP_DISABLE_COPY(MultiThreadedExecutorInternal)

// Configuration
size_t number_of_threads_;
bool yield_before_execute_;
std::chrono::nanoseconds next_exec_timeout_;

// Thread management
std::vector<std::thread> threads_;
std::vector<pid_t> tids_; // guarded by mtx_
size_t ready_count_{0}; // guarded by mtx_ (# of threads that saved TID)
bool start_allowed_{false}; // guarded by mtx_ (run() allowed to proceed)

// Synchronization in start phase
std::mutex mtx_;
std::condition_variable cv_all_ready_;
std::condition_variable cv_start_;

std::mutex wait_mutex_; // to guard get_next_executable in run()
std::atomic_bool pre_spinning_{false};

void run();

public:
explicit MultiThreadedExecutorInternal(size_t number_of_threads)
: rclcpp::Executor(rclcpp::ExecutorOptions()),
number_of_threads_(number_of_threads) {
// hardcode for now
yield_before_execute_ = false;
next_exec_timeout_ = std::chrono::nanoseconds(-1);
}

void pre_spin();

void spin() override;

std::vector<pid_t> get_thread_ids();
};
77 changes: 55 additions & 22 deletions callback_isolated_executor/src/callback_isolated_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,13 @@
#include "rclcpp/rclcpp.hpp"

#include "callback_isolated_executor/callback_isolated_executor.hpp"
#include "callback_isolated_executor/multi_threaded_executor_internal.hpp"

CallbackIsolatedExecutor::CallbackIsolatedExecutor(
const rclcpp::ExecutorOptions &options)
: rclcpp::Executor(options) {}
: rclcpp::Executor(options) {
client_publisher_ = cie_thread_configurator::create_client_publisher();
}

void CallbackIsolatedExecutor::spin() {
std::vector<std::thread> threads;
Expand Down Expand Up @@ -52,35 +55,65 @@ void CallbackIsolatedExecutor::spin() {
}
} // guard mutex_

std::mutex client_publisher_mutex;
auto client_publisher = cie_thread_configurator::create_client_publisher();

for (auto [group, node] : groups_and_nodes) {
auto executor =
std::make_shared<rclcpp::executors::SingleThreadedExecutor>();
executor->add_callback_group(group, node);
auto callback_group_id =
cie_thread_configurator::create_callback_group_id(group, node);

threads.emplace_back([executor, callback_group_id, &client_publisher,
&client_publisher_mutex]() {
auto tid = static_cast<pid_t>(syscall(SYS_gettid));

{
std::lock_guard<std::mutex> lock{client_publisher_mutex};
cie_thread_configurator::publish_callback_group_info(
client_publisher, tid, callback_group_id);
}

executor->spin();
});
if (group->type() == rclcpp::CallbackGroupType::Reentrant &&
reentrant_parallelism_ >= 2) {
threads.emplace_back(
&CallbackIsolatedExecutor::spin_reentrant_callback_group, this, group,
node);
} else {
threads.emplace_back(
&CallbackIsolatedExecutor::spin_mutually_exclusive_callback_group,
this, group, node);
}
}

for (auto &t : threads) {
t.join();
}
}

void CallbackIsolatedExecutor::spin_mutually_exclusive_callback_group(
rclcpp::CallbackGroup::SharedPtr group,
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node) {
auto executor = std::make_shared<rclcpp::executors::SingleThreadedExecutor>();
executor->add_callback_group(group, node);
auto callback_group_id =
cie_thread_configurator::create_callback_group_id(group, node);
auto tid = static_cast<pid_t>(syscall(SYS_gettid));

{
std::lock_guard<std::mutex> lock{client_publisher_mutex_};
cie_thread_configurator::publish_callback_group_info(client_publisher_, tid,
callback_group_id);
}

executor->spin();
}

void CallbackIsolatedExecutor::spin_reentrant_callback_group(
rclcpp::CallbackGroup::SharedPtr group,
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node) {
auto executor =
std::make_shared<MultiThreadedExecutorInternal>(reentrant_parallelism_);
executor->add_callback_group(group, node);
auto callback_group_id =
cie_thread_configurator::create_callback_group_id(group, node);

executor->pre_spin();
auto tids = executor->get_thread_ids();

{
std::lock_guard<std::mutex> lock{client_publisher_mutex_};
for (auto tid : tids) {
cie_thread_configurator::publish_callback_group_info(
client_publisher_, tid, callback_group_id);
}
}

executor->spin();
}

void CallbackIsolatedExecutor::add_callback_group(
rclcpp::CallbackGroup::SharedPtr group_ptr,
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
Expand Down
115 changes: 115 additions & 0 deletions callback_isolated_executor/src/multi_threaded_executor_internal.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
#include <sys/syscall.h>

#include "callback_isolated_executor/multi_threaded_executor_internal.hpp"

void MultiThreadedExecutorInternal::pre_spin() {
if (pre_spinning_.exchange(true)) {
throw std::runtime_error("pre_spin() called while already pre-spinning");
}

{
std::lock_guard<std::mutex> lock{mtx_};
assert(threads_.empty() && tids_.empty());
ready_count_ = 0;
start_allowed_ = false;
}

threads_.reserve(number_of_threads_);
for (size_t i = 0; i < number_of_threads_; i++) {
threads_.emplace_back(&MultiThreadedExecutorInternal::run, this);
}
}

void MultiThreadedExecutorInternal::run() {
auto tid = static_cast<pid_t>(syscall(SYS_gettid));
bool will_notify = false;

{
std::lock_guard<std::mutex> lock{wait_mutex_};
tids_.push_back(tid);
ready_count_++;
if (ready_count_ == number_of_threads_) {
will_notify = true;
}
}

if (will_notify) {
cv_all_ready_.notify_one();
}

{
std::unique_lock<std::mutex> lock{mtx_};
cv_start_.wait(lock, [this]() { return start_allowed_; });
}

while (rclcpp::ok(this->context_) && spinning.load()) {
rclcpp::AnyExecutable any_exec;

{
std::lock_guard wait_lock{wait_mutex_};

if (!rclcpp::ok(this->context_) || !spinning.load()) {
return;
}

if (!get_next_executable(any_exec, next_exec_timeout_)) {
continue;
}
}

if (yield_before_execute_) {
std::this_thread::yield();
}

execute_any_executable(any_exec);

// Clear the callback_group to prevent the AnyExecutable destructor from
// resetting the callback group `can_be_taken_from`
any_exec.callback_group.reset();
}
}

void MultiThreadedExecutorInternal::spin() {
if (!pre_spinning_.load()) {
throw std::runtime_error("spin() called without pre_spin()");
}

if (spinning.exchange(true)) {
throw std::runtime_error("spin() called while already spinning");
}

RCPPUTILS_SCOPE_EXIT(pre_spinning_.store(false););
RCPPUTILS_SCOPE_EXIT(this->spinning.store(false););

{
std::unique_lock<std::mutex> lock{mtx_};
cv_all_ready_.wait(lock,
[this]() { return ready_count_ == number_of_threads_; });
}

{
std::lock_guard<std::mutex> lock{mtx_};
start_allowed_ = true;
}

cv_start_.notify_all();

for (auto &thread : threads_) {
thread.join();
}
}

std::vector<pid_t> MultiThreadedExecutorInternal::get_thread_ids() {
if (!pre_spinning_.load()) {
throw std::runtime_error("get_thread_ids() called without pre_spin()");
}

{
std::unique_lock<std::mutex> lock{mtx_};
cv_all_ready_.wait(lock,
[this]() { return ready_count_ == number_of_threads_; });
}

std::lock_guard<std::mutex> lock{mtx_};
return tids_;
}
14 changes: 12 additions & 2 deletions cie_sample_application/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -33,23 +33,33 @@ add_executable(sample_node_main src/sample_node_main.cpp)
target_link_libraries(sample_node_main sample_node)
ament_target_dependencies(sample_node_main rclcpp callback_isolated_executor)

# New reentrant node component and executable
add_library(reentrant_node SHARED src/reentrant_node.cpp)
ament_target_dependencies(reentrant_node rclcpp rclcpp_components std_msgs)
rclcpp_components_register_nodes(reentrant_node "ReentrantNode")

add_executable(reentrant_node_main src/reentrant_node_main.cpp)
target_link_libraries(reentrant_node_main reentrant_node)
ament_target_dependencies(reentrant_node_main rclcpp callback_isolated_executor)

install(TARGETS
sample_node_main
reentrant_node_main
DESTINATION lib/${PROJECT_NAME}
)

install(DIRECTORY include/
DESTINATION include
)

install(TARGETS sample_node
install(TARGETS sample_node reentrant_node
EXPORT export_${PROJECT_NAME}
DESTINATION lib
)

install(DIRECTORY launch/ DESTINATION share/${PROJECT_NAME}/launch)

ament_export_include_directories(include)
ament_export_libraries(sample_node)
ament_export_libraries(sample_node reentrant_node)

ament_package()
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
#pragma once

#include "rclcpp/rclcpp.hpp"
#include "std_msgs/msg/int32.hpp"

class ReentrantNode : public rclcpp::Node {
public:
explicit ReentrantNode(
const rclcpp::NodeOptions &options = rclcpp::NodeOptions());

private:
void timer_callback_1();
void timer_callback_2();
void subscription_callback(const std_msgs::msg::Int32::SharedPtr msg);

rclcpp::TimerBase::SharedPtr timer1_;
rclcpp::TimerBase::SharedPtr timer2_;
rclcpp::Publisher<std_msgs::msg::Int32>::SharedPtr pub_;
rclcpp::Subscription<std_msgs::msg::Int32>::SharedPtr sub_;

// Single reentrant group to allow parallel callbacks
rclcpp::CallbackGroup::SharedPtr reentrant_group_;

std::atomic<int> count_{0};
};
Loading