Skip to content

Execution and callback groups

mhidalgo-bdai edited this page May 7, 2025 · 1 revision

synchros2 uses and extends the same abstractions and patterns that rclpy offers to manage execution. Namely, executors for task and callback dispatch and callback groups for concurrency control. However, executors and callback groups provided by rclpy do not generally afford blocking calls. synchros2 adds implementations of its own to better support these use cases.

Executors

rclpy.executors.SingleThreadedExecutor

The simplest executor. Tasks and callbacks are dispatched and run by the same thread that is spinning it.

import rclpy

from rclpy.executors import SingleThreadedExecutor

def main():
    rclpy.init()
    node = rclpy.create_node("my_node")
    executor = SingleThreadedExecutor()
    executor.add_node(node)
    try:
        executor.spin()  # blocking!
    except KeyboardInterrupt:
        pass
    finally:
        executor.shutdown()
        executor.remove_node(node)
        node.destroy_node()
        rclpy.try_shutdown()

if __name__ == "__main__":
    main()

Concurrency is therefore impossible. A blocking call within a callback will block the entire executor, and if it relies on other callbacks to unblock it will invariably deadlock it.

import rclpy
from std_srvs.srv import Trigger

from rclpy.executors import SingleThreadedExecutor

def poll(client):
    print("About to deadlock...")
    client.call(Trigger.Request())  # deadlock!

def trigger_callback(request, response):
    response.success = True
    return response

def main():
    rclpy.init()
    node = rclpy.create_node("my_node")
    executor = SingleThreadedExecutor()
    node.create_service(Trigger, "trigger", trigger_callback)
    client = node.create_client(Trigger, "trigger")
    node.create_timer(1.0, lambda: poll(client))
    executor.add_node(node)
    try:
        executor.spin()  # blocking!
    except KeyboardInterrupt:
        pass
    finally:
        executor.shutdown()
        executor.remove_node(node)
        node.destroy_node()
        rclpy.try_shutdown()

if __name__ == "__main__":
    main()

On the flip side, lack of context switch makes the single threaded executor a good choice when time keeping is important (e.g. a node with a timer that needs to fire at a high rate).

import time
import rclpy

from rclpy.executors import SingleThreadedExecutor

def main():
    rclpy.init()
    node = rclpy.create_node("my_node")
    executor = SingleThreadedExecutor()
    node.create_timer(0.1, lambda: print(time.time()))
    executor.add_node(node)
    try:
        executor.spin()  # blocking!
    except KeyboardInterrupt:
        pass
    finally:
        executor.shutdown()
        executor.remove_node(node)
        node.destroy_node()
        rclpy.try_shutdown()

if __name__ == "__main__":
    main()

rclpy.executors.MultiThreadedExecutor

Simple threaded executor for concurrent execution. Tasks and callbacks are dispatched by the thread that is spinning it but run on a fixed size thread pool.

import time
import rclpy

from rclpy.executors import MultiThreadedExecutor

def timer_callback():
    print(time.time())
    time.sleep(0.5)

def main():
    rclpy.init()
    node = rclpy.create_node("my_node")
    node.create_timer(1.0, timer_callback)
    node.create_timer(1.0, timer_callback)
    executor = MultiThreadedExecutor(2)
    executor.add_node(node)
    try:
        executor.spin()  # blocking!
    except KeyboardInterrupt:
        pass
    finally:
        executor.shutdown()
        executor.remove_node(node)
        node.destroy_node()
        rclpy.try_shutdown()

if __name__ == "__main__":
    main()

Blocking calls are possible but its thread pool size must be chosen wisely. If there are fewer threads than standing blocking calls at any point in time, dispatch delays and deadlocks are back on the table.

synchros2.executors.AutoScalingMultiThreadedExecutor

Akin to the rclpy.executors.MultiThreadedExecutor but managing thread pools that scale up and down on demand. Tasks and callbacks are dispatched by the thread that is spinning it but run on a thread pool that adds workers to match the demand.

import time
import rclpy

from synchros2.executors import AutoScalingMultiThreadedExecutor

def callback(i):
    print(f"[{time.time()}] Ahoy! (#{i})")

def main():
    rclpy.init()
    executor = AutoScalingMultiThreadedExecutor()
    for i in range(20):
        executor.create_task(callback, i)
    try:
        executor.spin()  # blocking!
    except KeyboardInterrupt:
        pass
    finally:
        executor.shutdown()
        rclpy.try_shutdown()

if __name__ == "__main__":
    main()

If the number of workers hits a configurable upper bound (max_threads), work is queued. If a worker is idle for too long (max_thread_idle_time), and the number of workers has not yet hit a configurable lower bound (0), it will be stopped. A concurrency quota can be configured for callback groups to avoid any given callback group from starving the rest (max_threads_per_callback_group).

By default, this executor manages a single thread pool, but more may be added. Callback groups can then be assigned to specific thread pools to further control callback dispatch and execution.

import time
import threading

import rclpy
from rclpy.callback_groups import MutuallyExclusiveCallbackGroup

from synchros2.executors import AutoScalingMultiThreadedExecutor

def callback():
    current_thread = threading.current_thread()
    print(f"Ahoy! from {current_thread.name}")

def timer_callback(executor):
    current_thread = threading.current_thread()
    print(f"Dispatch from {current_thread.name} at time {time.time()}")
    for i in range(10):
        executor.create_task(callback)

def main():
    rclpy.init()
    node = rclpy.create_node("my_node")
    executor = AutoScalingMultiThreadedExecutor()
    thread_pool = executor.add_static_thread_pool(1)
    custom_callback_group = MutuallyExclusiveCallbackGroup()
    executor.bind(custom_callback_group, thread_pool)
    node.create_timer(0.1, lambda: timer_callback(executor), custom_callback_group)
    executor.add_node(node)
    try:
        executor.spin()  # blocking!
    except KeyboardInterrupt:
        pass
    finally:
        executor.shutdown()
        executor.remove_node(node)
        node.destroy_node()
        rclpy.try_shutdown()

if __name__ == "__main__":
    main()

Callback groups

Note

Since callback groups are concurrency rules, there is little value to them for single threaded execution.

rclpy.callback_groups.MutuallyExclusiveCallbackGroup

Tasks and callbacks may only run one at a time when in this callback group.

This is the default callback group for rclpy.node.Node instances.

rclpy.callback_groups.ReentrantCallbackGroup

Tasks and callbacks may run concurrently when in this callback group, including multiple instances of the same callback e.g. when a topic subscription message arrives before it is done processing the previous one.

synchros2.callback_groups.NonReentrantCallbackGroup

Tasks and callbacks may run concurrently when in this callback group, but there may only be one instance of a given callback running at any given time e.g. topic subscription messages will always be processed sequentially.

This is the default callback group for synchros2.node.Node instances.

Utilities

To simplify executor management, synchros2 offers a few additional utilities.

If you need to spin in foreground (i.e. block on the executor), you may:

from rclpy.executors import SingleThreadedExecutor
from synchros2.executors import background

with foreground(SingleThreadedExecutor()) as executor:
    executor.spin()

If you need to spin in the background (i.e. keep the executor running while blocking on something else), you may:

from rclpy.executors import SingleThreadedExecutor
from synchros2.executors import background

with background(SingleThreadedExecutor()) as executor:
    # do work, no need to spin
    # executor.spin() would raise 

Either way, synchros2 context managers will take care of the executor lifecycle for you.

Reference

See rclpy documentation on execution and callback groups.

Clone this wiki locally