diff --git a/rclcpp/include/rclcpp/experimental/executors/events_executor/events_executor.hpp b/rclcpp/include/rclcpp/experimental/executors/events_executor/events_executor.hpp
index 2365b67697..dd5b1ebe63 100644
--- a/rclcpp/include/rclcpp/experimental/executors/events_executor/events_executor.hpp
+++ b/rclcpp/include/rclcpp/experimental/executors/events_executor/events_executor.hpp
@@ -274,9 +274,12 @@ class EventsExecutor : public rclcpp::Executor
   rclcpp::experimental::executors::EventsQueue::UniquePtr events_queue_;
 
   std::shared_ptr<rclcpp::executors::ExecutorEntitiesCollector> entities_collector_;
-  std::shared_ptr<rclcpp::executors::ExecutorEntitiesCollection> current_entities_collection_;
   std::shared_ptr<rclcpp::executors::ExecutorNotifyWaitable> notify_waitable_;
 
+  /// Mutex to protect the current_entities_collection_
+  std::recursive_mutex collection_mutex_;
+  std::shared_ptr<rclcpp::executors::ExecutorEntitiesCollection> current_entities_collection_;
+
   /// Flag used to reduce the number of unnecessary waitable events
   std::atomic<bool> notify_waitable_event_pushed_ {false};
 
diff --git a/rclcpp/src/rclcpp/experimental/executors/events_executor/events_executor.cpp b/rclcpp/src/rclcpp/experimental/executors/events_executor/events_executor.cpp
index 33e9cb67bd..64b07c0814 100644
--- a/rclcpp/src/rclcpp/experimental/executors/events_executor/events_executor.cpp
+++ b/rclcpp/src/rclcpp/experimental/executors/events_executor/events_executor.cpp
@@ -273,10 +273,13 @@ EventsExecutor::execute_event(const ExecutorEvent & event)
   switch (event.type) {
     case ExecutorEventType::CLIENT_EVENT:
       {
-        auto client = this->retrieve_entity(
-          static_cast<const rcl_client_t *>(event.entity_key),
-          current_entities_collection_->clients);
-
+        rclcpp::ClientBase::SharedPtr client;
+        {
+          std::lock_guard<std::recursive_mutex> lock(collection_mutex_);
+          client = this->retrieve_entity(
+            static_cast<const rcl_client_t *>(event.entity_key),
+            current_entities_collection_->clients);
+        }
         if (client) {
           for (size_t i = 0; i < event.num_events; i++) {
             execute_client(client);
@@ -287,9 +290,13 @@ EventsExecutor::execute_event(const ExecutorEvent & event)
       }
     case ExecutorEventType::SUBSCRIPTION_EVENT:
       {
-        auto subscription = this->retrieve_entity(
-          static_cast<const rcl_subscription_t *>(event.entity_key),
-          current_entities_collection_->subscriptions);
+        rclcpp::SubscriptionBase::SharedPtr subscription;
+        {
+          std::lock_guard<std::recursive_mutex> lock(collection_mutex_);
+          subscription = this->retrieve_entity(
+            static_cast<const rcl_subscription_t *>(event.entity_key),
+            current_entities_collection_->subscriptions);
+        }
         if (subscription) {
           for (size_t i = 0; i < event.num_events; i++) {
             execute_subscription(subscription);
@@ -299,10 +306,13 @@ EventsExecutor::execute_event(const ExecutorEvent & event)
       }
     case ExecutorEventType::SERVICE_EVENT:
       {
-        auto service = this->retrieve_entity(
-          static_cast<const rcl_service_t *>(event.entity_key),
-          current_entities_collection_->services);
-
+        rclcpp::ServiceBase::SharedPtr service;
+        {
+          std::lock_guard<std::recursive_mutex> lock(collection_mutex_);
+          service = this->retrieve_entity(
+            static_cast<const rcl_service_t *>(event.entity_key),
+            current_entities_collection_->services);
+        }
         if (service) {
           for (size_t i = 0; i < event.num_events; i++) {
             execute_service(service);
@@ -319,9 +329,13 @@ EventsExecutor::execute_event(const ExecutorEvent & event)
       }
     case ExecutorEventType::WAITABLE_EVENT:
       {
-        auto waitable = this->retrieve_entity(
-          static_cast<const rclcpp::Waitable *>(event.entity_key),
-          current_entities_collection_->waitables);
+        rclcpp::Waitable::SharedPtr waitable;
+        {
+          std::lock_guard<std::recursive_mutex> lock(collection_mutex_);
+          waitable = this->retrieve_entity(
+            static_cast<const rclcpp::Waitable *>(event.entity_key),
+            current_entities_collection_->waitables);
+        }
         if (waitable) {
           for (size_t i = 0; i < event.num_events; i++) {
             auto data = waitable->take_data_by_entity_id(event.waitable_data);
@@ -386,6 +400,7 @@ EventsExecutor::get_automatically_added_callback_groups_from_nodes()
 void
 EventsExecutor::refresh_current_collection_from_callback_groups()
 {
+  // Build the new collection
   this->entities_collector_->update_collections();
   auto callback_groups = this->entities_collector_->get_all_callback_groups();
   rclcpp::executors::ExecutorEntitiesCollection new_collection;
@@ -400,6 +415,9 @@ EventsExecutor::refresh_current_collection_from_callback_groups()
   // To do it, we need to add the notify waitable as an entry in both the new and
   // current collections such that it's neither added or removed.
   this->add_notify_waitable_to_collection(new_collection.waitables);
+
+  // Acquire lock before modifying the current collection
+  std::lock_guard<std::recursive_mutex> lock(collection_mutex_);
   this->add_notify_waitable_to_collection(current_entities_collection_->waitables);
 
   this->refresh_current_collection(new_collection);
@@ -409,6 +427,9 @@ void
 EventsExecutor::refresh_current_collection(
   const rclcpp::executors::ExecutorEntitiesCollection & new_collection)
 {
+  // Acquire lock before modifying the current collection
+  std::lock_guard<std::recursive_mutex> lock(collection_mutex_);
+
   current_entities_collection_->timers.update(
     new_collection.timers,
     [this](rclcpp::TimerBase::SharedPtr timer) {timers_manager_->add_timer(timer);},
diff --git a/rclcpp/test/rclcpp/executors/test_executors.cpp b/rclcpp/test/rclcpp/executors/test_executors.cpp
index 232baaace3..e9aad61685 100644
--- a/rclcpp/test/rclcpp/executors/test_executors.cpp
+++ b/rclcpp/test/rclcpp/executors/test_executors.cpp
@@ -796,6 +796,60 @@ TYPED_TEST(TestExecutors, testRaceConditionAddNode)
   }
 }
 
+// This test verifies the thread-safety of adding and removing a node
+// while the executor is spinning and events are ready.
+// This test does not contain expectations, but rather it verifies that
+// we can run a "stressful routine" without crashing.
+TYPED_TEST(TestExecutors, stressAddRemoveNode)
+{
+  using ExecutorType = TypeParam;
+  // rmw_connextdds doesn't support events-executor
+  if (
+    std::is_same<ExecutorType, rclcpp::experimental::executors::EventsExecutor>() &&
+    std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0)
+  {
+    GTEST_SKIP();
+  }
+
+  ExecutorType executor;
+
+  // A timer that is "always" ready (the timer callback doesn't do anything)
+  auto timer = this->node->create_wall_timer(std::chrono::nanoseconds(1), []() {});
+
+  // This thread spins the executor until it's cancelled
+  std::thread spinner_thread([&]() {
+      executor.spin();
+    });
+
+  // This thread publishes data in a busy loop (the node has a subscription)
+  std::thread publisher_thread1([&]() {
+      for (size_t i = 0; i < 100000; i++) {
+        this->publisher->publish(test_msgs::msg::Empty());
+      }
+    });
+  std::thread publisher_thread2([&]() {
+      for (size_t i = 0; i < 100000; i++) {
+        this->publisher->publish(test_msgs::msg::Empty());
+      }
+    });
+
+  // This thread adds/remove the node that contains the entities in a busy loop
+  std::thread add_remove_thread([&]() {
+      for (size_t i = 0; i < 100000; i++) {
+        executor.add_node(this->node);
+        executor.remove_node(this->node);
+      }
+    });
+
+  // Wait for the threads that do real work to finish
+  publisher_thread1.join();
+  publisher_thread2.join();
+  add_remove_thread.join();
+
+  executor.cancel();
+  spinner_thread.join();
+}
+
 // Check spin_until_future_complete with node base pointer (instantiates its own executor)
 TEST(TestExecutors, testSpinUntilFutureCompleteNodeBasePtr)
 {