Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ int main(int argc, char * argv[])
executor->spin();

rclcpp::shutdown();
} catch (rclcpp_components::ComponentManagerException & ex) {
std::cerr << "Exception caught in main: " << ex.what() << std::endl;
} catch (const std::exception & e) {
std::cerr << "Exception caught in main: " << e.what() << std::endl;
close(agnocast::agnocast_fd);
return EXIT_FAILURE;
} catch (...) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ int main(int argc, char * argv[])
executor->spin();

rclcpp::shutdown();
} catch (rclcpp_components::ComponentManagerException & ex) {
std::cerr << "Exception caught in main: " << ex.what() << std::endl;
} catch (const std::exception & e) {
std::cerr << "Exception caught in main: " << e.what() << std::endl;
close(agnocast::agnocast_fd);
return EXIT_FAILURE;
} catch (...) {
Expand Down
6 changes: 2 additions & 4 deletions src/agnocastlib/include/agnocast/agnocast_callback_info.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,8 @@ TypeErasedCallback get_erased_callback(Func && callback)
auto && typed_arg = static_cast<TypedMessagePtr<T> &&>(arg);
callback(std::move(typed_arg).get());
} else {
RCLCPP_ERROR(
logger, "Agnocast internal implementation error: bad allocation when callback is called");
close(agnocast_fd);
exit(EXIT_FAILURE);
throw std::runtime_error(
"Agnocast internal implementation error: bad allocation when callback is called");
Comment thread
Koichi98 marked this conversation as resolved.
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ class CallbackIsolatedAgnocastExecutor : public rclcpp::Executor
std::owner_less<rclcpp::CallbackGroup::WeakPtr>>
weak_groups_to_nodes_ RCPPUTILS_TSA_GUARDED_BY(mutex_);

std::atomic<bool> worker_thread_failed_{false};

// Mutex to protect weak_child_executors_ and child_threads_
mutable std::mutex child_resources_mutex_;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include "agnocast/agnocast_executor.hpp"
#include "rclcpp/rclcpp.hpp"

#include <atomic>
#include <chrono>

namespace agnocast
Expand All @@ -19,6 +20,8 @@ class MultiThreadedAgnocastExecutor : public agnocast::AgnocastExecutor
std::chrono::nanoseconds ros2_next_exec_timeout_;
const int agnocast_next_exec_timeout_ms_;

std::atomic<bool> worker_thread_failed_{false};

void ros2_spin();
void agnocast_spin();
bool validate_callback_group(const rclcpp::CallbackGroup::SharedPtr & group) const override;
Expand Down
4 changes: 1 addition & 3 deletions src/agnocastlib/include/agnocast/agnocast_publisher.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -175,9 +175,7 @@ class BasicPublisher
void publish(ipc_shared_ptr<MessageT> && message)
{
if (!message || topic_name_ != message.get_topic_name()) {
RCLCPP_ERROR(logger, "Invalid message to publish.");
close(agnocast_fd);
exit(EXIT_FAILURE);
throw std::invalid_argument("Invalid message to publish.");
Comment thread
Koichi98 marked this conversation as resolved.
}

// Capture raw pointer BEFORE invalidation (get() returns nullptr after invalidation).
Expand Down
9 changes: 3 additions & 6 deletions src/agnocastlib/include/agnocast/agnocast_subscription.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,8 @@ rclcpp::CallbackGroup::SharedPtr get_valid_callback_group(

if (callback_group) {
if (!node->get_node_base_interface()->callback_group_in_node(callback_group)) {
RCLCPP_ERROR(logger, "Cannot create agnocast subscription, callback group not in node.");
close(agnocast_fd);
exit(EXIT_FAILURE);
throw std::invalid_argument(
"Cannot create agnocast subscription, callback group not in node.");
Comment thread
Koichi98 marked this conversation as resolved.
}
} else {
callback_group = node->get_node_base_interface()->get_default_callback_group();
Expand Down Expand Up @@ -296,9 +295,7 @@ class BasicTakeSubscription : public SubscriptionBase
std::lock_guard<std::mutex> lock(mmap_mtx);

if (ioctl(agnocast_fd, AGNOCAST_TAKE_MSG_CMD, &take_args) < 0) {
RCLCPP_ERROR(logger, "AGNOCAST_TAKE_MSG_CMD failed: %s", strerror(errno));
close(agnocast_fd);
exit(EXIT_FAILURE);
throw std::runtime_error(std::string("AGNOCAST_TAKE_MSG_CMD failed: ") + strerror(errno));
}

for (uint32_t i = 0; i < take_args.ret_pub_shm_num; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ class AgnocastOnlyCallbackIsolatedExecutor : public AgnocastOnlyExecutor
std::mutex callback_group_created_cv_mutex_;
bool callback_group_created_{false};

std::atomic<bool> worker_thread_failed_{false};

// Mutex to protect weak_child_executors_ and child_threads_
mutable std::mutex child_resources_mutex_;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ class AgnocastOnlyMultiThreadedExecutor : public AgnocastOnlyExecutor
bool yield_before_execute_;
const int next_exec_timeout_ms_;

std::atomic<bool> worker_thread_failed_{false};

void agnocast_spin();

public:
Expand Down
4 changes: 1 addition & 3 deletions src/agnocastlib/src/agnocast_callback_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,7 @@ void receive_and_execute_message(
std::lock_guard<std::mutex> lock(mmap_mtx);

if (ioctl(agnocast_fd, AGNOCAST_RECEIVE_MSG_CMD, &receive_args) < 0) {
RCLCPP_ERROR(logger, "AGNOCAST_RECEIVE_MSG_CMD failed: %s", strerror(errno));
close(agnocast_fd);
exit(EXIT_FAILURE);
throw std::runtime_error(std::string("AGNOCAST_RECEIVE_MSG_CMD failed: ") + strerror(errno));
Comment thread
Koichi98 marked this conversation as resolved.
}

// Map the shared memory region with read permissions whenever a new publisher is discovered.
Expand Down
14 changes: 12 additions & 2 deletions src/agnocastlib/src/agnocast_callback_isolated_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ void CallbackIsolatedAgnocastExecutor::spin()

weak_child_executors_.push_back(executor);

child_threads_.emplace_back([executor, callback_group_id = std::move(callback_group_id),
child_threads_.emplace_back([this, executor, callback_group_id = std::move(callback_group_id),
&client_publisher, &client_publisher_mutex]() {
auto tid = static_cast<pid_t>(syscall(SYS_gettid));

Expand All @@ -98,7 +98,13 @@ void CallbackIsolatedAgnocastExecutor::spin()
agnocast::publish_callback_group_info(client_publisher, tid, callback_group_id);
}

executor->spin();
try {
executor->spin();
} catch (const std::exception & e) {
RCLCPP_ERROR(logger, "Exception in agnocast child executor thread: %s", e.what());
worker_thread_failed_.store(true);
spinning.store(false);
}
});
};

Expand Down Expand Up @@ -173,6 +179,10 @@ void CallbackIsolatedAgnocastExecutor::spin()
}
child_threads_.clear();
weak_child_executors_.clear();

if (worker_thread_failed_.load()) {
throw std::runtime_error("Exception occurred in agnocast child executor thread");
}
}

void CallbackIsolatedAgnocastExecutor::add_callback_group(
Expand Down
5 changes: 2 additions & 3 deletions src/agnocastlib/src/agnocast_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,8 @@ uint32_t get_agnocast_sub_count(const std::string & topic_name)
topic_info_args.topic_info_ret_buffer_addr = reinterpret_cast<uint64_t>(topic_info_buffer.data());
topic_info_args.topic_info_ret_buffer_size = MAX_TOPIC_INFO_RET_NUM;
if (ioctl(agnocast_fd, AGNOCAST_GET_TOPIC_SUBSCRIBER_INFO_CMD, &topic_info_args) < 0) {
RCLCPP_ERROR(logger, "AGNOCAST_GET_TOPIC_SUBSCRIBER_INFO_CMD failed: %s", strerror(errno));
close(agnocast_fd);
exit(EXIT_FAILURE);
throw std::runtime_error(
std::string("AGNOCAST_GET_TOPIC_SUBSCRIBER_INFO_CMD failed: ") + strerror(errno));
}

return topic_info_args.ret_topic_info_ret_num;
Expand Down
4 changes: 2 additions & 2 deletions src/agnocastlib/src/agnocast_component_container.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ int main(int argc, char * argv[])
executor->spin();

rclcpp::shutdown();
} catch (rclcpp_components::ComponentManagerException & ex) {
std::cerr << "Exception caught in main: " << ex.what() << std::endl;
} catch (const std::exception & e) {
std::cerr << "Exception caught in main: " << e.what() << std::endl;
close(agnocast::agnocast_fd);
return EXIT_FAILURE;
} catch (...) {
Expand Down
4 changes: 2 additions & 2 deletions src/agnocastlib/src/agnocast_component_container_mt.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ int main(int argc, char * argv[])
executor->spin();

rclcpp::shutdown();
} catch (rclcpp_components::ComponentManagerException & ex) {
std::cerr << "Exception caught in main: " << ex.what() << std::endl;
} catch (const std::exception & e) {
std::cerr << "Exception caught in main: " << e.what() << std::endl;
close(agnocast::agnocast_fd);
return EXIT_FAILURE;
} catch (...) {
Expand Down
48 changes: 29 additions & 19 deletions src/agnocastlib/src/agnocast_multi_threaded_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,10 @@ void MultiThreadedAgnocastExecutor::spin()
for (auto & thread : threads) {
thread.join();
}

if (worker_thread_failed_.load()) {
throw std::runtime_error("Exception occurred in agnocast worker thread");
}
}

void MultiThreadedAgnocastExecutor::ros2_spin()
Expand Down Expand Up @@ -155,30 +159,36 @@ void MultiThreadedAgnocastExecutor::ros2_spin()

void MultiThreadedAgnocastExecutor::agnocast_spin()
{
while (rclcpp::ok(this->context_) && spinning.load()) {
if (need_epoll_updates.load()) {
prepare_epoll();
}
try {
while (rclcpp::ok(this->context_) && spinning.load()) {
if (need_epoll_updates.load()) {
prepare_epoll();
}

agnocast::AgnocastExecutable agnocast_executable;
agnocast::AgnocastExecutable agnocast_executable;

if (!rclcpp::ok(this->context_) || !spinning.load()) {
return;
}

// Unlike a single-threaded executor, in a multi-threaded executor, each thread is dedicated to
// handling either ROS 2 callbacks or Agnocast callbacks exclusively.
// Given this separation, get_next_agnocast_executable() can block indefinitely without a
// timeout. However, since we need to periodically check for epoll updates, we should implement
// a long timeout period instead of an infinite block.
if (get_next_agnocast_executable(
agnocast_executable, agnocast_next_exec_timeout_ms_ /* timed-blocking*/)) {
if (yield_before_execute_) {
std::this_thread::yield();
if (!rclcpp::ok(this->context_) || !spinning.load()) {
return;
}

execute_agnocast_executable(agnocast_executable);
// Unlike a single-threaded executor, in a multi-threaded executor, each thread is dedicated
// to handling either ROS 2 callbacks or Agnocast callbacks exclusively. Given this
// separation, get_next_agnocast_executable() can block indefinitely without a timeout.
// However, since we need to periodically check for epoll updates, we should implement a long
// timeout period instead of an infinite block.
if (get_next_agnocast_executable(
agnocast_executable, agnocast_next_exec_timeout_ms_ /* timed-blocking*/)) {
if (yield_before_execute_) {
std::this_thread::yield();
}

execute_agnocast_executable(agnocast_executable);
}
}
} catch (const std::exception & e) {
RCLCPP_ERROR(logger, "Exception in agnocast worker thread: %s", e.what());
worker_thread_failed_.store(true);
spinning.store(false);
}
}

Expand Down
23 changes: 7 additions & 16 deletions src/agnocastlib/src/agnocast_publisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,8 @@ void increment_borrowed_publisher_num()
void decrement_borrowed_publisher_num()
{
if (borrowed_publisher_num == 0) {
RCLCPP_ERROR(
logger,
throw std::logic_error(
"The number of publish() called exceeds the number of borrow_loaned_message() called.");
close(agnocast_fd);
exit(EXIT_FAILURE);
}
borrowed_publisher_num--;
}
Expand All @@ -54,9 +51,7 @@ topic_local_id_t initialize_publisher(
pub_args.qos_is_transient_local = qos.durability() == rclcpp::DurabilityPolicy::TransientLocal;
pub_args.is_bridge = is_bridge;
if (ioctl(agnocast_fd, AGNOCAST_ADD_PUBLISHER_CMD, &pub_args) < 0) {
RCLCPP_ERROR(logger, "AGNOCAST_ADD_PUBLISHER_CMD failed: %s", strerror(errno));
close(agnocast_fd);
exit(EXIT_FAILURE);
throw std::runtime_error(std::string("AGNOCAST_ADD_PUBLISHER_CMD failed: ") + strerror(errno));
}

return pub_args.ret_id;
Expand All @@ -80,9 +75,7 @@ union ioctl_publish_msg_args publish_core(
publish_msg_args.subscriber_ids_buffer_size = MAX_SUBSCRIBER_NUM;

if (ioctl(agnocast_fd, AGNOCAST_PUBLISH_MSG_CMD, &publish_msg_args) < 0) {
RCLCPP_ERROR(logger, "AGNOCAST_PUBLISH_MSG_CMD failed: %s", strerror(errno));
close(agnocast_fd);
exit(EXIT_FAILURE);
throw std::runtime_error(std::string("AGNOCAST_PUBLISH_MSG_CMD failed: ") + strerror(errno));
}

TRACEPOINT(agnocast_publish, publisher_handle, publish_msg_args.ret_entry_id);
Expand Down Expand Up @@ -152,9 +145,8 @@ uint32_t get_subscription_count_core(const std::string & topic_name)
union ioctl_get_subscriber_num_args args = {};
args.topic_name = {topic_name.c_str(), topic_name.size()};
if (ioctl(agnocast_fd, AGNOCAST_GET_SUBSCRIBER_NUM_CMD, &args) < 0) {
RCLCPP_ERROR(logger, "AGNOCAST_GET_SUBSCRIBER_NUM_CMD failed: %s", strerror(errno));
close(agnocast_fd);
exit(EXIT_FAILURE);
throw std::runtime_error(
std::string("AGNOCAST_GET_SUBSCRIBER_NUM_CMD failed: ") + strerror(errno));
}

uint32_t inter_count = args.ret_other_process_subscriber_num;
Expand All @@ -177,9 +169,8 @@ uint32_t get_intra_subscription_count_core(const std::string & topic_name)
union ioctl_get_subscriber_num_args get_subscriber_count_args = {};
get_subscriber_count_args.topic_name = {topic_name.c_str(), topic_name.size()};
if (ioctl(agnocast_fd, AGNOCAST_GET_SUBSCRIBER_NUM_CMD, &get_subscriber_count_args) < 0) {
RCLCPP_ERROR(logger, "AGNOCAST_GET_SUBSCRIBER_NUM_CMD failed: %s", strerror(errno));
close(agnocast_fd);
exit(EXIT_FAILURE);
throw std::runtime_error(
std::string("AGNOCAST_GET_SUBSCRIBER_NUM_CMD failed: ") + strerror(errno));
}

return get_subscriber_count_args.ret_same_process_subscriber_num;
Expand Down
18 changes: 6 additions & 12 deletions src/agnocastlib/src/agnocast_subscription.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,7 @@ union ioctl_add_subscriber_args SubscriptionBase::initialize(
add_subscriber_args.ignore_local_publications = ignore_local_publications;
add_subscriber_args.is_bridge = is_bridge;
if (ioctl(agnocast_fd, AGNOCAST_ADD_SUBSCRIBER_CMD, &add_subscriber_args) < 0) {
RCLCPP_ERROR(logger, "AGNOCAST_ADD_SUBSCRIBER_CMD failed: %s", strerror(errno));
close(agnocast_fd);
exit(EXIT_FAILURE);
throw std::runtime_error(std::string("AGNOCAST_ADD_SUBSCRIBER_CMD failed: ") + strerror(errno));
}

return add_subscriber_args;
Expand All @@ -45,9 +43,8 @@ uint32_t get_publisher_count_core(const std::string & topic_name)
union ioctl_get_publisher_num_args args = {};
args.topic_name = {topic_name.c_str(), topic_name.size()};
if (ioctl(agnocast_fd, AGNOCAST_GET_PUBLISHER_NUM_CMD, &args) < 0) {
RCLCPP_ERROR(logger, "AGNOCAST_GET_PUBLISHER_NUM_CMD failed: %s", strerror(errno));
close(agnocast_fd);
exit(EXIT_FAILURE);
throw std::runtime_error(
std::string("AGNOCAST_GET_PUBLISHER_NUM_CMD failed: ") + strerror(errno));
}

uint32_t count = args.ret_publisher_num;
Expand Down Expand Up @@ -79,12 +76,9 @@ mqd_t open_mq_for_subscription(
const int mq_mode = 0666;
mqd_t mq = mq_open(mq_name.c_str(), O_CREAT | O_RDONLY | O_NONBLOCK, mq_mode, &attr);
if (mq == -1) {
RCLCPP_ERROR_STREAM(
logger, "mq_open failed for topic '" << topic_name << "' (subscriber_id=" << subscriber_id
<< ", mq_name='" << mq_name
<< "'): " << strerror(errno));
close(agnocast_fd);
exit(EXIT_FAILURE);
throw std::runtime_error(
"mq_open failed for topic '" + topic_name + "' (subscriber_id=" +
std::to_string(subscriber_id) + ", mq_name='" + mq_name + "'): " + strerror(errno));
}
mq_subscription = std::make_pair(mq, mq_name);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ void AgnocastOnlyCallbackIsolatedExecutor::spin()

weak_child_executors_.push_back(agnocast_executor);

child_threads_.emplace_back([executor = std::move(agnocast_executor),
child_threads_.emplace_back([this, executor = std::move(agnocast_executor),
callback_group_id = std::move(callback_group_id),
&client_publisher, &client_publisher_mutex]() {
auto tid = static_cast<pid_t>(syscall(SYS_gettid));
Expand All @@ -102,7 +102,13 @@ void AgnocastOnlyCallbackIsolatedExecutor::spin()
agnocast::publish_callback_group_info(client_publisher, tid, callback_group_id);
}

executor->spin();
try {
executor->spin();
} catch (const std::exception & e) {
RCLCPP_ERROR(logger, "Exception in agnocast child executor thread: %s", e.what());
worker_thread_failed_.store(true);
spinning_.store(false);
}
});
};

Expand Down Expand Up @@ -182,6 +188,10 @@ void AgnocastOnlyCallbackIsolatedExecutor::spin()
}
child_threads_.clear();
weak_child_executors_.clear();

if (worker_thread_failed_.load()) {
throw std::runtime_error("Exception occurred in agnocast child executor thread");
}
}

void AgnocastOnlyCallbackIsolatedExecutor::cancel()
Expand Down
Loading
Loading