Topic 2 — Nodes, Topics, Services, and Actions: Deep Dive
This topic provides comprehensive coverage of ROS 2's four core communication patterns: Nodes, Topics, Services, and Actions. You will learn when to use each pattern, how to implement them in Python (rclpy), and best practices for building robust robotic systems.
2.1 Nodes: The Building Blocks
What is a ROS 2 Node?
A node is an independent process that performs a specific task. Nodes are the fundamental unit of computation in ROS 2. Each node should have:
- Single responsibility: One clear purpose (e.g., "publish camera images" or "plan trajectories")
- Well-defined interfaces: Clear inputs (subscriptions) and outputs (publications)
- Independent lifecycle: Can start, stop, and restart without affecting other nodes
Node Lifecycle
ROS 2 nodes follow a lifecycle state machine:
Uninitialized → Inactive → Active → Inactive → Finalized
↑ ↓
└─────────┘
States:
- Uninitialized: Node object created but not configured
- Inactive: Node configured but not processing (paused)
- Active: Node running and processing callbacks
- Finalized: Node cleaned up and shut down
Transitions:
configure(): Initialize node (load parameters, setup resources)activate(): Start processing (begin publishing/subscribing)deactivate(): Stop processing (pause, but keep state)cleanup(): Clean up resourcesshutdown(): Final shutdown
Creating a Minimal Node
Here's a minimal ROS 2 node in Python:
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
class MinimalNode(Node):
def __init__(self):
super().__init__('minimal_node')
self.get_logger().info('Minimal node started')
def main(args=None):
rclpy.init(args=args)
node = MinimalNode()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
Key components:
rclpy.init(): Initialize ROS 2super().__init__('node_name'): Create node with namerclpy.spin(node): Keep node alive, process callbacksnode.destroy_node(): Clean uprclpy.shutdown(): Shutdown ROS 2
Node Executors
Executors control how callbacks are processed:
SingleThreadedExecutor (default):
from rclpy.executors import SingleThreadedExecutor
executor = SingleThreadedExecutor()
executor.add_node(node)
executor.spin()
- Deterministic: Callbacks run sequentially
- Real-time friendly: No thread contention
- Use for: Control loops, time-critical nodes
MultiThreadedExecutor:
from rclpy.executors import MultiThreadedExecutor
executor = MultiThreadedExecutor()
executor.add_node(node)
executor.spin()
- Higher throughput: Parallel callback execution
- Non-deterministic: Order depends on scheduling
- Use for: Perception, planning (can tolerate jitter)
Node Naming and Namespaces
Namespaces organize nodes hierarchically:
/perception/camera_driver— Camera driver in perception namespace/control/motor_controller— Motor controller in control namespace/planning/navigator— Navigator in planning namespace
Best practices:
- Use namespaces to group related nodes
- Keep node names descriptive and consistent
- Avoid deep nesting (max 2-3 levels)
2.2 Topics: Publish-Subscribe Communication
The Publish-Subscribe Pattern
Topics enable asynchronous, one-to-many communication:
- Publishers send messages without knowing who receives them
- Subscribers receive messages without knowing who sends them
- Decoupling: Publishers and subscribers are independent
When to use topics:
- ✅ Continuous data streams (sensor data, commands)
- ✅ One-to-many communication (one camera, many subscribers)
- ✅ Asynchronous communication (no blocking)
When NOT to use topics:
- ❌ Request-response patterns (use services)
- ❌ Long-running tasks with progress (use actions)
- ❌ Synchronous queries (use services)
Creating a Publisher Node
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
class SensorPublisher(Node):
def __init__(self):
super().__init__('sensor_publisher')
self.publisher_ = self.create_publisher(String, 'sensor_data', 10)
self.timer = self.create_timer(1.0, self.publish_callback)
self.counter = 0
def publish_callback(self):
msg = String()
msg.data = f'Sensor reading {self.counter}'
self.publisher_.publish(msg)
self.get_logger().info(f'Published: {msg.data}')
self.counter += 1
def main(args=None):
rclpy.init(args=args)
node = SensorPublisher()
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
Key components:
create_publisher(msg_type, topic_name, queue_size): Create publishercreate_timer(period, callback): Periodic callbackpublish(msg): Send message
Creating a Subscriber Node
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
class SensorSubscriber(Node):
def __init__(self):
super().__init__('sensor_subscriber')
self.subscription = self.create_subscription(
String,
'sensor_data',
self.listener_callback,
10
)
def listener_callback(self, msg):
self.get_logger().info(f'Received: {msg.data}')
def main(args=None):
rclpy.init(args=args)
node = SensorSubscriber()
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
Key components:
create_subscription(msg_type, topic_name, callback, queue_size): Create subscriber- Callback receives message as argument
Quality of Service (QoS) for Topics
QoS profiles control message delivery:
from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy
# Reliable delivery (for commands)
qos_reliable = QoSProfile(
reliability=ReliabilityPolicy.RELIABLE,
history=HistoryPolicy.KEEP_LAST,
depth=10
)
# Best-effort delivery (for high-frequency sensors)
qos_best_effort = QoSProfile(
reliability=ReliabilityPolicy.BEST_EFFORT,
history=HistoryPolicy.KEEP_LAST,
depth=5
)
publisher = self.create_publisher(String, 'topic', qos_reliable)
QoS matching: Publishers and subscribers must have compatible QoS. If incompatible, they won't connect.
Common patterns:
- Sensor data: Best-effort, volatile, keep last (depth=5)
- Commands: Reliable, volatile, keep last (depth=10)
- State: Reliable, transient local, keep last (depth=1)
2.3 Services: Request-Response Communication
When to Use Services
Services provide synchronous, request-response communication:
- One-to-one: One client calls one server
- Blocking: Client waits for response
- Discrete: Used for queries, not continuous data
When to use services:
- ✅ Discrete queries (get robot pose, plan trajectory)
- ✅ Configuration changes (set parameters)
- ✅ Synchronous operations (need immediate response)
When NOT to use services:
- ❌ Continuous data streams (use topics)
- ❌ Long-running tasks (use actions)
- ❌ One-to-many communication (use topics)
Service Definition (.srv file)
Create srv/PlanTrajectory.srv:
# Request
geometry_msgs/Point start
geometry_msgs/Point goal
float64 max_velocity
---
# Response
nav_msgs/Path path
bool success
string message
Service Server Implementation
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from geometry_msgs.msg import Point
from nav_msgs.msg import Path
from your_package.srv import PlanTrajectory
class PlannerService(Node):
def __init__(self):
super().__init__('planner_service')
self.srv = self.create_service(
PlanTrajectory,
'plan_trajectory',
self.plan_callback
)
self.get_logger().info('Planner service ready')
def plan_callback(self, request, response):
self.get_logger().info(f'Planning from {request.start} to {request.goal}')
# Simulate planning
# In real implementation, call your planner here
response.path = Path() # Populate with actual path
response.success = True
response.message = 'Path planned successfully'
return response
def main(args=None):
rclpy.init(args=args)
node = PlannerService()
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
Service Client Implementation
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from geometry_msgs.msg import Point
from your_package.srv import PlanTrajectory
class PlannerClient(Node):
def __init__(self):
super().__init__('planner_client')
self.client = self.create_client(PlanTrajectory, 'plan_trajectory')
# Wait for service to be available
while not self.client.wait_for_service(timeout_sec=1.0):
self.get_logger().info('Service not available, waiting...')
def call_planner(self, start, goal, max_velocity):
request = PlanTrajectory.Request()
request.start = start
request.goal = goal
request.max_velocity = max_velocity
future = self.client.call_async(request)
rclpy.spin_until_future_complete(self, future)
if future.result() is not None:
response = future.result()
if response.success:
self.get_logger().info(f'Path planned: {response.message}')
return response.path
else:
self.get_logger().error(f'Planning failed: {response.message}')
else:
self.get_logger().error('Service call failed')
return None
def main(args=None):
rclpy.init(args=args)
node = PlannerClient()
start = Point(x=0.0, y=0.0, z=0.0)
goal = Point(x=5.0, y=3.0, z=0.0)
path = node.call_planner(start, goal, max_velocity=1.0)
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
Key points:
wait_for_service(): Wait for server to be availablecall_async(): Non-blocking call (returns future)spin_until_future_complete(): Wait for response- Always check
future.result()for errors
2.4 Actions: Goal-Oriented Communication
When to Use Actions
Actions are for asynchronous, long-running tasks with feedback:
- Goal: Client sends goal (e.g., "navigate to position X")
- Feedback: Server reports progress (e.g., "50% complete")
- Result: Server returns final outcome
When to use actions:
- ✅ Long-running tasks (navigation, manipulation)
- ✅ Tasks with progress updates
- ✅ Tasks that can be cancelled
When NOT to use actions:
- ❌ Quick queries (use services)
- ❌ Continuous streams (use topics)
Action Definition (.action file)
Create action/Navigate.action:
# Goal
geometry_msgs/Point target
float64 max_velocity
---
# Result
bool success
string message
float64 distance_traveled
---
# Feedback
float64 progress_percent
geometry_msgs/Point current_position
Action Server Implementation
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from rclpy.action import ActionServer
from geometry_msgs.msg import Point
from your_package.action import Navigate
class NavigationActionServer(Node):
def __init__(self):
super().__init__('navigation_action_server')
self._action_server = ActionServer(
self,
Navigate,
'navigate_to_goal',
self.execute_callback
)
self.get_logger().info('Navigation action server ready')
def execute_callback(self, goal_handle):
self.get_logger().info(f'Executing navigation to {goal_handle.request.target}')
# Simulate navigation
feedback_msg = Navigate.Feedback()
result = Navigate.Result()
# Simulate progress
for i in range(10):
if goal_handle.is_cancel_requested:
goal_handle.canceled()
result.success = False
result.message = 'Navigation cancelled'
return result
feedback_msg.progress_percent = (i + 1) * 10.0
feedback_msg.current_position = Point(x=float(i), y=0.0, z=0.0)
goal_handle.publish_feedback(feedback_msg)
# Simulate work
self.get_clock().sleep_for(rclpy.duration.Duration(seconds=1))
goal_handle.succeed()
result.success = True
result.message = 'Navigation completed'
result.distance_traveled = 10.0
return result
def main(args=None):
rclpy.init(args=args)
node = NavigationActionServer()
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
Action Client Implementation
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from rclpy.action import ActionClient
from geometry_msgs.msg import Point
from your_package.action import Navigate
class NavigationActionClient(Node):
def __init__(self):
super().__init__('navigation_action_client')
self._action_client = ActionClient(self, Navigate, 'navigate_to_goal')
def send_goal(self, target, max_velocity):
self.get_logger().info('Waiting for action server...')
self._action_client.wait_for_server()
goal_msg = Navigate.Goal()
goal_msg.target = target
goal_msg.max_velocity = max_velocity
self.get_logger().info(f'Sending goal: {target}')
send_goal_future = self._action_client.send_goal_async(
goal_msg,
feedback_callback=self.feedback_callback
)
rclpy.spin_until_future_complete(self, send_goal_future)
goal_handle = send_goal_future.result()
if not goal_handle.accepted:
self.get_logger().info('Goal rejected')
return
self.get_logger().info('Goal accepted, executing...')
get_result_future = goal_handle.get_result_async()
rclpy.spin_until_future_complete(self, get_result_future)
result = get_result_future.result().result
if result.success:
self.get_logger().info(f'Navigation succeeded: {result.message}')
else:
self.get_logger().error(f'Navigation failed: {result.message}')
def feedback_callback(self, feedback_msg):
feedback = feedback_msg.feedback
self.get_logger().info(f'Progress: {feedback.progress_percent}%')
def main(args=None):
rclpy.init(args=args)
node = NavigationActionClient()
target = Point(x=5.0, y=3.0, z=0.0)
node.send_goal(target, max_velocity=1.0)
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
2.5 System Example: 3-Node Pipeline
Here's a complete example integrating all patterns:
Node 1: SensorSimulator (Publisher)
- Publishes fake sensor data (position, velocity) on
/sensor_data
Node 2: Planner (Service Server)
- Subscribes to
/sensor_data - Provides
/plan_trajectoryservice - Computes trajectory from current state to goal
Node 3: MotorController (Service Client + Publisher)
- Calls
/plan_trajectoryservice - Publishes motor commands on
/motor_commands
Data flow:
SensorSimulator → /sensor_data → Planner
↓
/plan_trajectory (service)
↓
MotorController ← /motor_commands ← (publishes commands)
This pattern demonstrates:
- Topics for continuous sensor streams
- Services for discrete planning queries
- Separation of concerns: Each node has single responsibility
2.6 Timing & QoS Best Practices
Timing Considerations
- Control loops: Run at high frequency (50-500 Hz)
- Perception: Can be slower (10-30 Hz)
- Planning: Often asynchronous (on-demand)
Use timers for periodic tasks:
# 10 Hz control loop
self.timer = self.create_timer(0.1, self.control_callback)
# 30 Hz perception loop
self.timer = self.create_timer(1.0/30.0, self.perception_callback)
Avoid blocking in callbacks:
# BAD: Blocking operation in callback
def callback(self, msg):
time.sleep(1.0) # Blocks entire node!
# GOOD: Use async or timers
def callback(self, msg):
# Quick processing only
self.process_data(msg)
QoS Matching
Critical rule: Publishers and subscribers must have compatible QoS:
- Reliable ↔ Reliable: ✅ Compatible
- Best-effort ↔ Best-effort: ✅ Compatible
- Reliable ↔ Best-effort: ❌ Incompatible (won't connect)
Always specify QoS explicitly:
# Explicit QoS prevents mismatches
qos = QoSProfile(reliability=ReliabilityPolicy.RELIABLE, depth=10)
publisher = self.create_publisher(String, 'topic', qos)
2.7 Debugging Tools
Command-Line Tools
# List all nodes
ros2 node list
# List all topics
ros2 topic list
# Echo topic messages
ros2 topic echo /sensor_data
# Check topic frequency
ros2 topic hz /sensor_data
# Check topic bandwidth
ros2 topic bw /sensor_data
# List services
ros2 service list
# Call service
ros2 service call /plan_trajectory your_package/srv/PlanTrajectory "{start: {x: 0.0, y: 0.0}, goal: {x: 5.0, y: 3.0}}"
# List actions
ros2 action list
# Send action goal
ros2 action send_goal /navigate_to_goal your_package/action/Navigate "{target: {x: 5.0, y: 3.0}}"
Visualization Tools
- rqt_graph: Visualize computation graph
- rqt_topic: Inspect topic messages
- rqt_service: Call services interactively
Summary
Choose the right pattern:
- Topics: Continuous streams, one-to-many, asynchronous
- Services: Discrete queries, one-to-one, synchronous
- Actions: Long-running tasks, progress feedback, cancellable
Best practices:
- ✅ Single responsibility per node
- ✅ Explicit QoS profiles
- ✅ Avoid blocking in callbacks
- ✅ Use appropriate executors (single-threaded for control)
- ✅ Handle errors and timeouts
- ✅ Use namespaces for organization
The next topic covers Launch Files, URDF, and RViz for orchestrating and visualizing complete systems.