diff --git a/src/agnocast_components/src/agnocast_component_container.cpp b/src/agnocast_components/src/agnocast_component_container.cpp index 09f464d9b..11a3ff06f 100644 --- a/src/agnocast_components/src/agnocast_component_container.cpp +++ b/src/agnocast_components/src/agnocast_component_container.cpp @@ -29,8 +29,8 @@ int main(int argc, char * argv[]) executor->spin(); rclcpp::shutdown(); - } catch (rclcpp_components::ComponentManagerException & ex) { - std::cerr << "Exception caught in main: " << ex.what() << std::endl; + } catch (const std::exception & e) { + std::cerr << "Exception caught in main: " << e.what() << std::endl; close(agnocast::agnocast_fd); return EXIT_FAILURE; } catch (...) { diff --git a/src/agnocast_components/src/agnocast_component_container_mt.cpp b/src/agnocast_components/src/agnocast_component_container_mt.cpp index e61e83e92..8ce7b1d82 100644 --- a/src/agnocast_components/src/agnocast_component_container_mt.cpp +++ b/src/agnocast_components/src/agnocast_component_container_mt.cpp @@ -44,8 +44,8 @@ int main(int argc, char * argv[]) executor->spin(); rclcpp::shutdown(); - } catch (rclcpp_components::ComponentManagerException & ex) { - std::cerr << "Exception caught in main: " << ex.what() << std::endl; + } catch (const std::exception & e) { + std::cerr << "Exception caught in main: " << e.what() << std::endl; close(agnocast::agnocast_fd); return EXIT_FAILURE; } catch (...) { diff --git a/src/agnocastlib/include/agnocast/agnocast_callback_info.hpp b/src/agnocastlib/include/agnocast/agnocast_callback_info.hpp index 13b66fbf7..879e16e70 100644 --- a/src/agnocastlib/include/agnocast/agnocast_callback_info.hpp +++ b/src/agnocastlib/include/agnocast/agnocast_callback_info.hpp @@ -69,10 +69,8 @@ TypeErasedCallback get_erased_callback(Func && callback) auto && typed_arg = static_cast &&>(arg); callback(std::move(typed_arg).get()); } else { - RCLCPP_ERROR( - logger, "Agnocast internal implementation error: bad allocation when callback is called"); - close(agnocast_fd); - exit(EXIT_FAILURE); + throw std::runtime_error( + "Agnocast internal implementation error: bad allocation when callback is called"); } }; } diff --git a/src/agnocastlib/include/agnocast/agnocast_callback_isolated_executor.hpp b/src/agnocastlib/include/agnocast/agnocast_callback_isolated_executor.hpp index 3ecc2c2e3..0f139db96 100644 --- a/src/agnocastlib/include/agnocast/agnocast_callback_isolated_executor.hpp +++ b/src/agnocastlib/include/agnocast/agnocast_callback_isolated_executor.hpp @@ -27,6 +27,8 @@ class CallbackIsolatedAgnocastExecutor : public rclcpp::Executor std::owner_less> weak_groups_to_nodes_ RCPPUTILS_TSA_GUARDED_BY(mutex_); + std::atomic worker_thread_failed_{false}; + // Mutex to protect weak_child_executors_ and child_threads_ mutable std::mutex child_resources_mutex_; diff --git a/src/agnocastlib/include/agnocast/agnocast_multi_threaded_executor.hpp b/src/agnocastlib/include/agnocast/agnocast_multi_threaded_executor.hpp index 90b0dae75..89115b937 100644 --- a/src/agnocastlib/include/agnocast/agnocast_multi_threaded_executor.hpp +++ b/src/agnocastlib/include/agnocast/agnocast_multi_threaded_executor.hpp @@ -3,6 +3,7 @@ #include "agnocast/agnocast_executor.hpp" #include "rclcpp/rclcpp.hpp" +#include #include namespace agnocast @@ -19,6 +20,8 @@ class MultiThreadedAgnocastExecutor : public agnocast::AgnocastExecutor std::chrono::nanoseconds ros2_next_exec_timeout_; const int agnocast_next_exec_timeout_ms_; + std::atomic worker_thread_failed_{false}; + void ros2_spin(); void agnocast_spin(); bool validate_callback_group(const rclcpp::CallbackGroup::SharedPtr & group) const override; diff --git a/src/agnocastlib/include/agnocast/agnocast_publisher.hpp b/src/agnocastlib/include/agnocast/agnocast_publisher.hpp index b08a561f3..0d868ef8a 100644 --- a/src/agnocastlib/include/agnocast/agnocast_publisher.hpp +++ b/src/agnocastlib/include/agnocast/agnocast_publisher.hpp @@ -175,9 +175,7 @@ class BasicPublisher void publish(ipc_shared_ptr && message) { if (!message || topic_name_ != message.get_topic_name()) { - RCLCPP_ERROR(logger, "Invalid message to publish."); - close(agnocast_fd); - exit(EXIT_FAILURE); + throw std::invalid_argument("Invalid message to publish."); } // Capture raw pointer BEFORE invalidation (get() returns nullptr after invalidation). diff --git a/src/agnocastlib/include/agnocast/agnocast_subscription.hpp b/src/agnocastlib/include/agnocast/agnocast_subscription.hpp index 641462f4f..c3ba930f4 100644 --- a/src/agnocastlib/include/agnocast/agnocast_subscription.hpp +++ b/src/agnocastlib/include/agnocast/agnocast_subscription.hpp @@ -60,9 +60,8 @@ rclcpp::CallbackGroup::SharedPtr get_valid_callback_group( if (callback_group) { if (!node->get_node_base_interface()->callback_group_in_node(callback_group)) { - RCLCPP_ERROR(logger, "Cannot create agnocast subscription, callback group not in node."); - close(agnocast_fd); - exit(EXIT_FAILURE); + throw std::invalid_argument( + "Cannot create agnocast subscription, callback group not in node."); } } else { callback_group = node->get_node_base_interface()->get_default_callback_group(); @@ -296,9 +295,7 @@ class BasicTakeSubscription : public SubscriptionBase std::lock_guard lock(mmap_mtx); if (ioctl(agnocast_fd, AGNOCAST_TAKE_MSG_CMD, &take_args) < 0) { - RCLCPP_ERROR(logger, "AGNOCAST_TAKE_MSG_CMD failed: %s", strerror(errno)); - close(agnocast_fd); - exit(EXIT_FAILURE); + throw std::runtime_error(std::string("AGNOCAST_TAKE_MSG_CMD failed: ") + strerror(errno)); } for (uint32_t i = 0; i < take_args.ret_pub_shm_num; i++) { diff --git a/src/agnocastlib/include/agnocast/node/agnocast_only_callback_isolated_executor.hpp b/src/agnocastlib/include/agnocast/node/agnocast_only_callback_isolated_executor.hpp index 3c755ee32..b10186de8 100644 --- a/src/agnocastlib/include/agnocast/node/agnocast_only_callback_isolated_executor.hpp +++ b/src/agnocastlib/include/agnocast/node/agnocast_only_callback_isolated_executor.hpp @@ -32,6 +32,8 @@ class AgnocastOnlyCallbackIsolatedExecutor : public AgnocastOnlyExecutor std::mutex callback_group_created_cv_mutex_; bool callback_group_created_{false}; + std::atomic worker_thread_failed_{false}; + // Mutex to protect weak_child_executors_ and child_threads_ mutable std::mutex child_resources_mutex_; diff --git a/src/agnocastlib/include/agnocast/node/agnocast_only_multi_threaded_executor.hpp b/src/agnocastlib/include/agnocast/node/agnocast_only_multi_threaded_executor.hpp index 93901a194..2e386ab96 100644 --- a/src/agnocastlib/include/agnocast/node/agnocast_only_multi_threaded_executor.hpp +++ b/src/agnocastlib/include/agnocast/node/agnocast_only_multi_threaded_executor.hpp @@ -14,6 +14,8 @@ class AgnocastOnlyMultiThreadedExecutor : public AgnocastOnlyExecutor bool yield_before_execute_; const int next_exec_timeout_ms_; + std::atomic worker_thread_failed_{false}; + void agnocast_spin(); public: diff --git a/src/agnocastlib/src/agnocast_callback_info.cpp b/src/agnocastlib/src/agnocast_callback_info.cpp index 244889f7b..92b9ceba5 100644 --- a/src/agnocastlib/src/agnocast_callback_info.cpp +++ b/src/agnocastlib/src/agnocast_callback_info.cpp @@ -43,9 +43,7 @@ void receive_and_execute_message( std::lock_guard lock(mmap_mtx); if (ioctl(agnocast_fd, AGNOCAST_RECEIVE_MSG_CMD, &receive_args) < 0) { - RCLCPP_ERROR(logger, "AGNOCAST_RECEIVE_MSG_CMD failed: %s", strerror(errno)); - close(agnocast_fd); - exit(EXIT_FAILURE); + throw std::runtime_error(std::string("AGNOCAST_RECEIVE_MSG_CMD failed: ") + strerror(errno)); } // Map the shared memory region with read permissions whenever a new publisher is discovered. diff --git a/src/agnocastlib/src/agnocast_callback_isolated_executor.cpp b/src/agnocastlib/src/agnocast_callback_isolated_executor.cpp index 86125be41..f04098f76 100644 --- a/src/agnocastlib/src/agnocast_callback_isolated_executor.cpp +++ b/src/agnocastlib/src/agnocast_callback_isolated_executor.cpp @@ -89,7 +89,7 @@ void CallbackIsolatedAgnocastExecutor::spin() weak_child_executors_.push_back(executor); - child_threads_.emplace_back([executor, callback_group_id = std::move(callback_group_id), + child_threads_.emplace_back([this, executor, callback_group_id = std::move(callback_group_id), &client_publisher, &client_publisher_mutex]() { auto tid = static_cast(syscall(SYS_gettid)); @@ -98,7 +98,13 @@ void CallbackIsolatedAgnocastExecutor::spin() agnocast::publish_callback_group_info(client_publisher, tid, callback_group_id); } - executor->spin(); + try { + executor->spin(); + } catch (const std::exception & e) { + RCLCPP_ERROR(logger, "Exception in agnocast child executor thread: %s", e.what()); + worker_thread_failed_.store(true); + spinning.store(false); + } }); }; @@ -173,6 +179,10 @@ void CallbackIsolatedAgnocastExecutor::spin() } child_threads_.clear(); weak_child_executors_.clear(); + + if (worker_thread_failed_.load()) { + throw std::runtime_error("Exception occurred in agnocast child executor thread"); + } } void CallbackIsolatedAgnocastExecutor::add_callback_group( diff --git a/src/agnocastlib/src/agnocast_client.cpp b/src/agnocastlib/src/agnocast_client.cpp index 62d89250a..f6c0ae5d8 100644 --- a/src/agnocastlib/src/agnocast_client.cpp +++ b/src/agnocastlib/src/agnocast_client.cpp @@ -21,9 +21,8 @@ uint32_t get_agnocast_sub_count(const std::string & topic_name) topic_info_args.topic_info_ret_buffer_addr = reinterpret_cast(topic_info_buffer.data()); topic_info_args.topic_info_ret_buffer_size = MAX_TOPIC_INFO_RET_NUM; if (ioctl(agnocast_fd, AGNOCAST_GET_TOPIC_SUBSCRIBER_INFO_CMD, &topic_info_args) < 0) { - RCLCPP_ERROR(logger, "AGNOCAST_GET_TOPIC_SUBSCRIBER_INFO_CMD failed: %s", strerror(errno)); - close(agnocast_fd); - exit(EXIT_FAILURE); + throw std::runtime_error( + std::string("AGNOCAST_GET_TOPIC_SUBSCRIBER_INFO_CMD failed: ") + strerror(errno)); } return topic_info_args.ret_topic_info_ret_num; diff --git a/src/agnocastlib/src/agnocast_component_container.cpp b/src/agnocastlib/src/agnocast_component_container.cpp index 12d65b92c..e76428591 100644 --- a/src/agnocastlib/src/agnocast_component_container.cpp +++ b/src/agnocastlib/src/agnocast_component_container.cpp @@ -34,8 +34,8 @@ int main(int argc, char * argv[]) executor->spin(); rclcpp::shutdown(); - } catch (rclcpp_components::ComponentManagerException & ex) { - std::cerr << "Exception caught in main: " << ex.what() << std::endl; + } catch (const std::exception & e) { + std::cerr << "Exception caught in main: " << e.what() << std::endl; close(agnocast::agnocast_fd); return EXIT_FAILURE; } catch (...) { diff --git a/src/agnocastlib/src/agnocast_component_container_mt.cpp b/src/agnocastlib/src/agnocast_component_container_mt.cpp index b4173f022..8f2de381f 100644 --- a/src/agnocastlib/src/agnocast_component_container_mt.cpp +++ b/src/agnocastlib/src/agnocast_component_container_mt.cpp @@ -49,8 +49,8 @@ int main(int argc, char * argv[]) executor->spin(); rclcpp::shutdown(); - } catch (rclcpp_components::ComponentManagerException & ex) { - std::cerr << "Exception caught in main: " << ex.what() << std::endl; + } catch (const std::exception & e) { + std::cerr << "Exception caught in main: " << e.what() << std::endl; close(agnocast::agnocast_fd); return EXIT_FAILURE; } catch (...) { diff --git a/src/agnocastlib/src/agnocast_multi_threaded_executor.cpp b/src/agnocastlib/src/agnocast_multi_threaded_executor.cpp index 9f7c53b61..7aa6bfd68 100644 --- a/src/agnocastlib/src/agnocast_multi_threaded_executor.cpp +++ b/src/agnocastlib/src/agnocast_multi_threaded_executor.cpp @@ -111,6 +111,10 @@ void MultiThreadedAgnocastExecutor::spin() for (auto & thread : threads) { thread.join(); } + + if (worker_thread_failed_.load()) { + throw std::runtime_error("Exception occurred in agnocast worker thread"); + } } void MultiThreadedAgnocastExecutor::ros2_spin() @@ -155,30 +159,36 @@ void MultiThreadedAgnocastExecutor::ros2_spin() void MultiThreadedAgnocastExecutor::agnocast_spin() { - while (rclcpp::ok(this->context_) && spinning.load()) { - if (need_epoll_updates.load()) { - prepare_epoll(); - } + try { + while (rclcpp::ok(this->context_) && spinning.load()) { + if (need_epoll_updates.load()) { + prepare_epoll(); + } - agnocast::AgnocastExecutable agnocast_executable; + agnocast::AgnocastExecutable agnocast_executable; - if (!rclcpp::ok(this->context_) || !spinning.load()) { - return; - } - - // Unlike a single-threaded executor, in a multi-threaded executor, each thread is dedicated to - // handling either ROS 2 callbacks or Agnocast callbacks exclusively. - // Given this separation, get_next_agnocast_executable() can block indefinitely without a - // timeout. However, since we need to periodically check for epoll updates, we should implement - // a long timeout period instead of an infinite block. - if (get_next_agnocast_executable( - agnocast_executable, agnocast_next_exec_timeout_ms_ /* timed-blocking*/)) { - if (yield_before_execute_) { - std::this_thread::yield(); + if (!rclcpp::ok(this->context_) || !spinning.load()) { + return; } - execute_agnocast_executable(agnocast_executable); + // Unlike a single-threaded executor, in a multi-threaded executor, each thread is dedicated + // to handling either ROS 2 callbacks or Agnocast callbacks exclusively. Given this + // separation, get_next_agnocast_executable() can block indefinitely without a timeout. + // However, since we need to periodically check for epoll updates, we should implement a long + // timeout period instead of an infinite block. + if (get_next_agnocast_executable( + agnocast_executable, agnocast_next_exec_timeout_ms_ /* timed-blocking*/)) { + if (yield_before_execute_) { + std::this_thread::yield(); + } + + execute_agnocast_executable(agnocast_executable); + } } + } catch (const std::exception & e) { + RCLCPP_ERROR(logger, "Exception in agnocast worker thread: %s", e.what()); + worker_thread_failed_.store(true); + spinning.store(false); } } diff --git a/src/agnocastlib/src/agnocast_publisher.cpp b/src/agnocastlib/src/agnocast_publisher.cpp index 00f9538a5..6ee058945 100644 --- a/src/agnocastlib/src/agnocast_publisher.cpp +++ b/src/agnocastlib/src/agnocast_publisher.cpp @@ -32,11 +32,8 @@ void increment_borrowed_publisher_num() void decrement_borrowed_publisher_num() { if (borrowed_publisher_num == 0) { - RCLCPP_ERROR( - logger, + throw std::logic_error( "The number of publish() called exceeds the number of borrow_loaned_message() called."); - close(agnocast_fd); - exit(EXIT_FAILURE); } borrowed_publisher_num--; } @@ -54,9 +51,7 @@ topic_local_id_t initialize_publisher( pub_args.qos_is_transient_local = qos.durability() == rclcpp::DurabilityPolicy::TransientLocal; pub_args.is_bridge = is_bridge; if (ioctl(agnocast_fd, AGNOCAST_ADD_PUBLISHER_CMD, &pub_args) < 0) { - RCLCPP_ERROR(logger, "AGNOCAST_ADD_PUBLISHER_CMD failed: %s", strerror(errno)); - close(agnocast_fd); - exit(EXIT_FAILURE); + throw std::runtime_error(std::string("AGNOCAST_ADD_PUBLISHER_CMD failed: ") + strerror(errno)); } return pub_args.ret_id; @@ -80,9 +75,7 @@ union ioctl_publish_msg_args publish_core( publish_msg_args.subscriber_ids_buffer_size = MAX_SUBSCRIBER_NUM; if (ioctl(agnocast_fd, AGNOCAST_PUBLISH_MSG_CMD, &publish_msg_args) < 0) { - RCLCPP_ERROR(logger, "AGNOCAST_PUBLISH_MSG_CMD failed: %s", strerror(errno)); - close(agnocast_fd); - exit(EXIT_FAILURE); + throw std::runtime_error(std::string("AGNOCAST_PUBLISH_MSG_CMD failed: ") + strerror(errno)); } TRACEPOINT(agnocast_publish, publisher_handle, publish_msg_args.ret_entry_id); @@ -152,9 +145,8 @@ uint32_t get_subscription_count_core(const std::string & topic_name) union ioctl_get_subscriber_num_args args = {}; args.topic_name = {topic_name.c_str(), topic_name.size()}; if (ioctl(agnocast_fd, AGNOCAST_GET_SUBSCRIBER_NUM_CMD, &args) < 0) { - RCLCPP_ERROR(logger, "AGNOCAST_GET_SUBSCRIBER_NUM_CMD failed: %s", strerror(errno)); - close(agnocast_fd); - exit(EXIT_FAILURE); + throw std::runtime_error( + std::string("AGNOCAST_GET_SUBSCRIBER_NUM_CMD failed: ") + strerror(errno)); } uint32_t inter_count = args.ret_other_process_subscriber_num; @@ -177,9 +169,8 @@ uint32_t get_intra_subscription_count_core(const std::string & topic_name) union ioctl_get_subscriber_num_args get_subscriber_count_args = {}; get_subscriber_count_args.topic_name = {topic_name.c_str(), topic_name.size()}; if (ioctl(agnocast_fd, AGNOCAST_GET_SUBSCRIBER_NUM_CMD, &get_subscriber_count_args) < 0) { - RCLCPP_ERROR(logger, "AGNOCAST_GET_SUBSCRIBER_NUM_CMD failed: %s", strerror(errno)); - close(agnocast_fd); - exit(EXIT_FAILURE); + throw std::runtime_error( + std::string("AGNOCAST_GET_SUBSCRIBER_NUM_CMD failed: ") + strerror(errno)); } return get_subscriber_count_args.ret_same_process_subscriber_num; diff --git a/src/agnocastlib/src/agnocast_subscription.cpp b/src/agnocastlib/src/agnocast_subscription.cpp index 28548221f..7961a565c 100644 --- a/src/agnocastlib/src/agnocast_subscription.cpp +++ b/src/agnocastlib/src/agnocast_subscription.cpp @@ -32,9 +32,7 @@ union ioctl_add_subscriber_args SubscriptionBase::initialize( add_subscriber_args.ignore_local_publications = ignore_local_publications; add_subscriber_args.is_bridge = is_bridge; if (ioctl(agnocast_fd, AGNOCAST_ADD_SUBSCRIBER_CMD, &add_subscriber_args) < 0) { - RCLCPP_ERROR(logger, "AGNOCAST_ADD_SUBSCRIBER_CMD failed: %s", strerror(errno)); - close(agnocast_fd); - exit(EXIT_FAILURE); + throw std::runtime_error(std::string("AGNOCAST_ADD_SUBSCRIBER_CMD failed: ") + strerror(errno)); } return add_subscriber_args; @@ -45,9 +43,8 @@ uint32_t get_publisher_count_core(const std::string & topic_name) union ioctl_get_publisher_num_args args = {}; args.topic_name = {topic_name.c_str(), topic_name.size()}; if (ioctl(agnocast_fd, AGNOCAST_GET_PUBLISHER_NUM_CMD, &args) < 0) { - RCLCPP_ERROR(logger, "AGNOCAST_GET_PUBLISHER_NUM_CMD failed: %s", strerror(errno)); - close(agnocast_fd); - exit(EXIT_FAILURE); + throw std::runtime_error( + std::string("AGNOCAST_GET_PUBLISHER_NUM_CMD failed: ") + strerror(errno)); } uint32_t count = args.ret_publisher_num; @@ -79,12 +76,9 @@ mqd_t open_mq_for_subscription( const int mq_mode = 0666; mqd_t mq = mq_open(mq_name.c_str(), O_CREAT | O_RDONLY | O_NONBLOCK, mq_mode, &attr); if (mq == -1) { - RCLCPP_ERROR_STREAM( - logger, "mq_open failed for topic '" << topic_name << "' (subscriber_id=" << subscriber_id - << ", mq_name='" << mq_name - << "'): " << strerror(errno)); - close(agnocast_fd); - exit(EXIT_FAILURE); + throw std::runtime_error( + "mq_open failed for topic '" + topic_name + "' (subscriber_id=" + + std::to_string(subscriber_id) + ", mq_name='" + mq_name + "'): " + strerror(errno)); } mq_subscription = std::make_pair(mq, mq_name); diff --git a/src/agnocastlib/src/node/agnocast_only_callback_isolated_executor.cpp b/src/agnocastlib/src/node/agnocast_only_callback_isolated_executor.cpp index 1952cb577..c207161bd 100644 --- a/src/agnocastlib/src/node/agnocast_only_callback_isolated_executor.cpp +++ b/src/agnocastlib/src/node/agnocast_only_callback_isolated_executor.cpp @@ -92,7 +92,7 @@ void AgnocastOnlyCallbackIsolatedExecutor::spin() weak_child_executors_.push_back(agnocast_executor); - child_threads_.emplace_back([executor = std::move(agnocast_executor), + child_threads_.emplace_back([this, executor = std::move(agnocast_executor), callback_group_id = std::move(callback_group_id), &client_publisher, &client_publisher_mutex]() { auto tid = static_cast(syscall(SYS_gettid)); @@ -102,7 +102,13 @@ void AgnocastOnlyCallbackIsolatedExecutor::spin() agnocast::publish_callback_group_info(client_publisher, tid, callback_group_id); } - executor->spin(); + try { + executor->spin(); + } catch (const std::exception & e) { + RCLCPP_ERROR(logger, "Exception in agnocast child executor thread: %s", e.what()); + worker_thread_failed_.store(true); + spinning_.store(false); + } }); }; @@ -182,6 +188,10 @@ void AgnocastOnlyCallbackIsolatedExecutor::spin() } child_threads_.clear(); weak_child_executors_.clear(); + + if (worker_thread_failed_.load()) { + throw std::runtime_error("Exception occurred in agnocast child executor thread"); + } } void AgnocastOnlyCallbackIsolatedExecutor::cancel() diff --git a/src/agnocastlib/src/node/agnocast_only_multi_threaded_executor.cpp b/src/agnocastlib/src/node/agnocast_only_multi_threaded_executor.cpp index ace74a504..f145f2399 100644 --- a/src/agnocastlib/src/node/agnocast_only_multi_threaded_executor.cpp +++ b/src/agnocastlib/src/node/agnocast_only_multi_threaded_executor.cpp @@ -42,43 +42,53 @@ void AgnocastOnlyMultiThreadedExecutor::spin() for (auto & thread : threads) { thread.join(); } + + if (worker_thread_failed_.load()) { + throw std::runtime_error("Exception occurred in agnocast worker thread"); + } } void AgnocastOnlyMultiThreadedExecutor::agnocast_spin() { - while (spinning_.load()) { - if (need_epoll_updates.load()) { - add_callback_groups_from_nodes_associated_to_executor(); - agnocast::prepare_epoll_impl( - epoll_fd_, my_pid_, ready_agnocast_executables_mutex_, ready_agnocast_executables_, - [this](const rclcpp::CallbackGroup::SharedPtr & group) { - return is_callback_group_associated(group); - }); - } + try { + while (spinning_.load()) { + if (need_epoll_updates.load()) { + add_callback_groups_from_nodes_associated_to_executor(); + agnocast::prepare_epoll_impl( + epoll_fd_, my_pid_, ready_agnocast_executables_mutex_, ready_agnocast_executables_, + [this](const rclcpp::CallbackGroup::SharedPtr & group) { + return is_callback_group_associated(group); + }); + } - agnocast::AgnocastExecutable agnocast_executable; + agnocast::AgnocastExecutable agnocast_executable; - if (!spinning_.load()) { - return; - } - - // As each thread is dedicated to handling Agnocast callbacks, get_next_agnocast_executable() - // can block indefinitely without a timeout. However, since we need to periodically check for - // epoll updates, we should implement a long timeout period instead of an infinite block. - bool shutdown_detected = false; - if (get_next_agnocast_executable( - agnocast_executable, next_exec_timeout_ms_ /* timed-blocking*/, shutdown_detected)) { - if (yield_before_execute_) { - std::this_thread::yield(); + if (!spinning_.load()) { + return; } - execute_agnocast_executable(agnocast_executable); - } + // As each thread is dedicated to handling Agnocast callbacks, get_next_agnocast_executable() + // can block indefinitely without a timeout. However, since we need to periodically check for + // epoll updates, we should implement a long timeout period instead of an infinite block. + bool shutdown_detected = false; + if (get_next_agnocast_executable( + agnocast_executable, next_exec_timeout_ms_ /* timed-blocking*/, shutdown_detected)) { + if (yield_before_execute_) { + std::this_thread::yield(); + } + + execute_agnocast_executable(agnocast_executable); + } - if (shutdown_detected) { - spinning_.store(false); - return; + if (shutdown_detected) { + spinning_.store(false); + return; + } } + } catch (const std::exception & e) { + RCLCPP_ERROR(logger, "Exception in agnocast worker thread: %s", e.what()); + worker_thread_failed_.store(true); + spinning_.store(false); } } diff --git a/src/agnocastlib/test/unit/test_agnocast_subscription.cpp b/src/agnocastlib/test/unit/test_agnocast_subscription.cpp index 9c787fe8b..14eec5ad1 100644 --- a/src/agnocastlib/test/unit/test_agnocast_subscription.cpp +++ b/src/agnocastlib/test/unit/test_agnocast_subscription.cpp @@ -34,10 +34,7 @@ TEST_F(GetValidCallbackGroupTest, get_valid_callback_group_not_in_node) options.callback_group = other_node->create_callback_group(rclcpp::CallbackGroupType::MutuallyExclusive); - EXPECT_EXIT( - agnocast::get_valid_callback_group(node.get(), options), - ::testing::ExitedWithCode(EXIT_FAILURE), - "Cannot create agnocast subscription, callback group not in node."); + EXPECT_THROW(agnocast::get_valid_callback_group(node.get(), options), std::invalid_argument); } TEST_F(GetValidCallbackGroupTest, get_valid_callback_group_nullptr) diff --git a/src/agnocastlib/test/unit/test_mocked_agnocast.cpp b/src/agnocastlib/test/unit/test_mocked_agnocast.cpp index 175040e2a..45b2ba03e 100644 --- a/src/agnocastlib/test/unit/test_mocked_agnocast.cpp +++ b/src/agnocastlib/test/unit/test_mocked_agnocast.cpp @@ -107,9 +107,7 @@ TEST_F(AgnocastPublisherTest, test_publish_null_message) agnocast::ipc_shared_ptr message; // Act & Assert - EXPECT_EXIT( - dummy_publisher->publish(std::move(message)), ::testing::ExitedWithCode(EXIT_FAILURE), - "Invalid message to publish."); + EXPECT_THROW(dummy_publisher->publish(std::move(message)), std::invalid_argument); } TEST_F(AgnocastPublisherTest, test_publish_already_published_message) @@ -119,9 +117,7 @@ TEST_F(AgnocastPublisherTest, test_publish_already_published_message) dummy_publisher->publish(std::move(message)); // Act & Assert - EXPECT_EXIT( - dummy_publisher->publish(std::move(message)), ::testing::ExitedWithCode(EXIT_FAILURE), - "Invalid message to publish."); + EXPECT_THROW(dummy_publisher->publish(std::move(message)), std::invalid_argument); } TEST_F(AgnocastPublisherTest, test_publish_different_message) @@ -135,9 +131,7 @@ TEST_F(AgnocastPublisherTest, test_publish_different_message) agnocast::ipc_shared_ptr message = dummy_publisher->borrow_loaned_message(); // Act & Assert - EXPECT_EXIT( - dummy_publisher->publish(std::move(diff_message)), ::testing::ExitedWithCode(EXIT_FAILURE), - "Invalid message to publish."); + EXPECT_THROW(dummy_publisher->publish(std::move(diff_message)), std::invalid_argument); } TEST_F(AgnocastPublisherTest, test_publish_loan_num_and_pub_num_mismatch) @@ -542,9 +536,7 @@ TEST_F(AgnocastCallbackInfoTest, get_erased_callback_invalid_type) // Act & Assert agnocast::TypeErasedCallback erased_callback = agnocast::get_erased_callback(float_callback); - EXPECT_EXIT( - erased_callback(std::move(int_arg)), ::testing::ExitedWithCode(EXIT_FAILURE), - "Agnocast internal implementation error: bad allocation when callback is called"); + EXPECT_THROW(erased_callback(std::move(int_arg)), std::runtime_error); } TEST_F(AgnocastCallbackInfoTest, get_erased_callback_const_ptr)