Skip to main content

AI Assistant

Physical AI & Humanoid Robotics

Hello! I'm your AI assistant for the AI-Native Guide to Physical AI & Humanoid Robotics. How can I help you today?

04:57 AM

Topic 2 — Nodes, Topics, Services, and Actions: Deep Dive

This topic provides comprehensive coverage of ROS 2's four core communication patterns: Nodes, Topics, Services, and Actions. You will learn when to use each pattern, how to implement them in Python (rclpy), and best practices for building robust robotic systems.


2.1 Nodes: The Building Blocks

What is a ROS 2 Node?

A node is an independent process that performs a specific task. Nodes are the fundamental unit of computation in ROS 2. Each node should have:

  • Single responsibility: One clear purpose (e.g., "publish camera images" or "plan trajectories")
  • Well-defined interfaces: Clear inputs (subscriptions) and outputs (publications)
  • Independent lifecycle: Can start, stop, and restart without affecting other nodes

Node Lifecycle

ROS 2 nodes follow a lifecycle state machine:

Uninitialized → Inactive → Active → Inactive → Finalized
↑ ↓
└─────────┘

States:

  1. Uninitialized: Node object created but not configured
  2. Inactive: Node configured but not processing (paused)
  3. Active: Node running and processing callbacks
  4. Finalized: Node cleaned up and shut down

Transitions:

  • configure(): Initialize node (load parameters, setup resources)
  • activate(): Start processing (begin publishing/subscribing)
  • deactivate(): Stop processing (pause, but keep state)
  • cleanup(): Clean up resources
  • shutdown(): Final shutdown

Creating a Minimal Node

Here's a minimal ROS 2 node in Python:

#!/usr/bin/env python3
import rclpy
from rclpy.node import Node

class MinimalNode(Node):
def __init__(self):
super().__init__('minimal_node')
self.get_logger().info('Minimal node started')

def main(args=None):
rclpy.init(args=args)
node = MinimalNode()

try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()

if __name__ == '__main__':
main()

Key components:

  • rclpy.init(): Initialize ROS 2
  • super().__init__('node_name'): Create node with name
  • rclpy.spin(node): Keep node alive, process callbacks
  • node.destroy_node(): Clean up
  • rclpy.shutdown(): Shutdown ROS 2

Node Executors

Executors control how callbacks are processed:

SingleThreadedExecutor (default):

from rclpy.executors import SingleThreadedExecutor

executor = SingleThreadedExecutor()
executor.add_node(node)
executor.spin()
  • Deterministic: Callbacks run sequentially
  • Real-time friendly: No thread contention
  • Use for: Control loops, time-critical nodes

MultiThreadedExecutor:

from rclpy.executors import MultiThreadedExecutor

executor = MultiThreadedExecutor()
executor.add_node(node)
executor.spin()
  • Higher throughput: Parallel callback execution
  • Non-deterministic: Order depends on scheduling
  • Use for: Perception, planning (can tolerate jitter)

Node Naming and Namespaces

Namespaces organize nodes hierarchically:

  • /perception/camera_driver — Camera driver in perception namespace
  • /control/motor_controller — Motor controller in control namespace
  • /planning/navigator — Navigator in planning namespace

Best practices:

  • Use namespaces to group related nodes
  • Keep node names descriptive and consistent
  • Avoid deep nesting (max 2-3 levels)

2.2 Topics: Publish-Subscribe Communication

The Publish-Subscribe Pattern

Topics enable asynchronous, one-to-many communication:

  • Publishers send messages without knowing who receives them
  • Subscribers receive messages without knowing who sends them
  • Decoupling: Publishers and subscribers are independent

When to use topics:

  • ✅ Continuous data streams (sensor data, commands)
  • ✅ One-to-many communication (one camera, many subscribers)
  • ✅ Asynchronous communication (no blocking)

When NOT to use topics:

  • ❌ Request-response patterns (use services)
  • ❌ Long-running tasks with progress (use actions)
  • ❌ Synchronous queries (use services)

Creating a Publisher Node

#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from std_msgs.msg import String

class SensorPublisher(Node):
def __init__(self):
super().__init__('sensor_publisher')
self.publisher_ = self.create_publisher(String, 'sensor_data', 10)
self.timer = self.create_timer(1.0, self.publish_callback)
self.counter = 0

def publish_callback(self):
msg = String()
msg.data = f'Sensor reading {self.counter}'
self.publisher_.publish(msg)
self.get_logger().info(f'Published: {msg.data}')
self.counter += 1

def main(args=None):
rclpy.init(args=args)
node = SensorPublisher()
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()

if __name__ == '__main__':
main()

Key components:

  • create_publisher(msg_type, topic_name, queue_size): Create publisher
  • create_timer(period, callback): Periodic callback
  • publish(msg): Send message

Creating a Subscriber Node

#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from std_msgs.msg import String

class SensorSubscriber(Node):
def __init__(self):
super().__init__('sensor_subscriber')
self.subscription = self.create_subscription(
String,
'sensor_data',
self.listener_callback,
10
)

def listener_callback(self, msg):
self.get_logger().info(f'Received: {msg.data}')

def main(args=None):
rclpy.init(args=args)
node = SensorSubscriber()
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()

if __name__ == '__main__':
main()

Key components:

  • create_subscription(msg_type, topic_name, callback, queue_size): Create subscriber
  • Callback receives message as argument

Quality of Service (QoS) for Topics

QoS profiles control message delivery:

from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy

# Reliable delivery (for commands)
qos_reliable = QoSProfile(
reliability=ReliabilityPolicy.RELIABLE,
history=HistoryPolicy.KEEP_LAST,
depth=10
)

# Best-effort delivery (for high-frequency sensors)
qos_best_effort = QoSProfile(
reliability=ReliabilityPolicy.BEST_EFFORT,
history=HistoryPolicy.KEEP_LAST,
depth=5
)

publisher = self.create_publisher(String, 'topic', qos_reliable)

QoS matching: Publishers and subscribers must have compatible QoS. If incompatible, they won't connect.

Common patterns:

  • Sensor data: Best-effort, volatile, keep last (depth=5)
  • Commands: Reliable, volatile, keep last (depth=10)
  • State: Reliable, transient local, keep last (depth=1)

2.3 Services: Request-Response Communication

When to Use Services

Services provide synchronous, request-response communication:

  • One-to-one: One client calls one server
  • Blocking: Client waits for response
  • Discrete: Used for queries, not continuous data

When to use services:

  • ✅ Discrete queries (get robot pose, plan trajectory)
  • ✅ Configuration changes (set parameters)
  • ✅ Synchronous operations (need immediate response)

When NOT to use services:

  • ❌ Continuous data streams (use topics)
  • ❌ Long-running tasks (use actions)
  • ❌ One-to-many communication (use topics)

Service Definition (.srv file)

Create srv/PlanTrajectory.srv:

# Request
geometry_msgs/Point start
geometry_msgs/Point goal
float64 max_velocity
---
# Response
nav_msgs/Path path
bool success
string message

Service Server Implementation

#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from geometry_msgs.msg import Point
from nav_msgs.msg import Path
from your_package.srv import PlanTrajectory

class PlannerService(Node):
def __init__(self):
super().__init__('planner_service')
self.srv = self.create_service(
PlanTrajectory,
'plan_trajectory',
self.plan_callback
)
self.get_logger().info('Planner service ready')

def plan_callback(self, request, response):
self.get_logger().info(f'Planning from {request.start} to {request.goal}')

# Simulate planning
# In real implementation, call your planner here
response.path = Path() # Populate with actual path
response.success = True
response.message = 'Path planned successfully'

return response

def main(args=None):
rclpy.init(args=args)
node = PlannerService()
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()

if __name__ == '__main__':
main()

Service Client Implementation

#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from geometry_msgs.msg import Point
from your_package.srv import PlanTrajectory

class PlannerClient(Node):
def __init__(self):
super().__init__('planner_client')
self.client = self.create_client(PlanTrajectory, 'plan_trajectory')

# Wait for service to be available
while not self.client.wait_for_service(timeout_sec=1.0):
self.get_logger().info('Service not available, waiting...')

def call_planner(self, start, goal, max_velocity):
request = PlanTrajectory.Request()
request.start = start
request.goal = goal
request.max_velocity = max_velocity

future = self.client.call_async(request)
rclpy.spin_until_future_complete(self, future)

if future.result() is not None:
response = future.result()
if response.success:
self.get_logger().info(f'Path planned: {response.message}')
return response.path
else:
self.get_logger().error(f'Planning failed: {response.message}')
else:
self.get_logger().error('Service call failed')
return None

def main(args=None):
rclpy.init(args=args)
node = PlannerClient()

start = Point(x=0.0, y=0.0, z=0.0)
goal = Point(x=5.0, y=3.0, z=0.0)
path = node.call_planner(start, goal, max_velocity=1.0)

node.destroy_node()
rclpy.shutdown()

if __name__ == '__main__':
main()

Key points:

  • wait_for_service(): Wait for server to be available
  • call_async(): Non-blocking call (returns future)
  • spin_until_future_complete(): Wait for response
  • Always check future.result() for errors

2.4 Actions: Goal-Oriented Communication

When to Use Actions

Actions are for asynchronous, long-running tasks with feedback:

  • Goal: Client sends goal (e.g., "navigate to position X")
  • Feedback: Server reports progress (e.g., "50% complete")
  • Result: Server returns final outcome

When to use actions:

  • ✅ Long-running tasks (navigation, manipulation)
  • ✅ Tasks with progress updates
  • ✅ Tasks that can be cancelled

When NOT to use actions:

  • ❌ Quick queries (use services)
  • ❌ Continuous streams (use topics)

Action Definition (.action file)

Create action/Navigate.action:

# Goal
geometry_msgs/Point target
float64 max_velocity
---
# Result
bool success
string message
float64 distance_traveled
---
# Feedback
float64 progress_percent
geometry_msgs/Point current_position

Action Server Implementation

#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from rclpy.action import ActionServer
from geometry_msgs.msg import Point
from your_package.action import Navigate

class NavigationActionServer(Node):
def __init__(self):
super().__init__('navigation_action_server')
self._action_server = ActionServer(
self,
Navigate,
'navigate_to_goal',
self.execute_callback
)
self.get_logger().info('Navigation action server ready')

def execute_callback(self, goal_handle):
self.get_logger().info(f'Executing navigation to {goal_handle.request.target}')

# Simulate navigation
feedback_msg = Navigate.Feedback()
result = Navigate.Result()

# Simulate progress
for i in range(10):
if goal_handle.is_cancel_requested:
goal_handle.canceled()
result.success = False
result.message = 'Navigation cancelled'
return result

feedback_msg.progress_percent = (i + 1) * 10.0
feedback_msg.current_position = Point(x=float(i), y=0.0, z=0.0)
goal_handle.publish_feedback(feedback_msg)

# Simulate work
self.get_clock().sleep_for(rclpy.duration.Duration(seconds=1))

goal_handle.succeed()
result.success = True
result.message = 'Navigation completed'
result.distance_traveled = 10.0
return result

def main(args=None):
rclpy.init(args=args)
node = NavigationActionServer()
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()

if __name__ == '__main__':
main()

Action Client Implementation

#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from rclpy.action import ActionClient
from geometry_msgs.msg import Point
from your_package.action import Navigate

class NavigationActionClient(Node):
def __init__(self):
super().__init__('navigation_action_client')
self._action_client = ActionClient(self, Navigate, 'navigate_to_goal')

def send_goal(self, target, max_velocity):
self.get_logger().info('Waiting for action server...')
self._action_client.wait_for_server()

goal_msg = Navigate.Goal()
goal_msg.target = target
goal_msg.max_velocity = max_velocity

self.get_logger().info(f'Sending goal: {target}')
send_goal_future = self._action_client.send_goal_async(
goal_msg,
feedback_callback=self.feedback_callback
)

rclpy.spin_until_future_complete(self, send_goal_future)
goal_handle = send_goal_future.result()

if not goal_handle.accepted:
self.get_logger().info('Goal rejected')
return

self.get_logger().info('Goal accepted, executing...')
get_result_future = goal_handle.get_result_async()
rclpy.spin_until_future_complete(self, get_result_future)

result = get_result_future.result().result
if result.success:
self.get_logger().info(f'Navigation succeeded: {result.message}')
else:
self.get_logger().error(f'Navigation failed: {result.message}')

def feedback_callback(self, feedback_msg):
feedback = feedback_msg.feedback
self.get_logger().info(f'Progress: {feedback.progress_percent}%')

def main(args=None):
rclpy.init(args=args)
node = NavigationActionClient()

target = Point(x=5.0, y=3.0, z=0.0)
node.send_goal(target, max_velocity=1.0)

node.destroy_node()
rclpy.shutdown()

if __name__ == '__main__':
main()

2.5 System Example: 3-Node Pipeline

Here's a complete example integrating all patterns:

Node 1: SensorSimulator (Publisher)

  • Publishes fake sensor data (position, velocity) on /sensor_data

Node 2: Planner (Service Server)

  • Subscribes to /sensor_data
  • Provides /plan_trajectory service
  • Computes trajectory from current state to goal

Node 3: MotorController (Service Client + Publisher)

  • Calls /plan_trajectory service
  • Publishes motor commands on /motor_commands

Data flow:

SensorSimulator → /sensor_data → Planner

/plan_trajectory (service)

MotorController ← /motor_commands ← (publishes commands)

This pattern demonstrates:

  • Topics for continuous sensor streams
  • Services for discrete planning queries
  • Separation of concerns: Each node has single responsibility

2.6 Timing & QoS Best Practices

Timing Considerations

  • Control loops: Run at high frequency (50-500 Hz)
  • Perception: Can be slower (10-30 Hz)
  • Planning: Often asynchronous (on-demand)

Use timers for periodic tasks:

# 10 Hz control loop
self.timer = self.create_timer(0.1, self.control_callback)

# 30 Hz perception loop
self.timer = self.create_timer(1.0/30.0, self.perception_callback)

Avoid blocking in callbacks:

# BAD: Blocking operation in callback
def callback(self, msg):
time.sleep(1.0) # Blocks entire node!

# GOOD: Use async or timers
def callback(self, msg):
# Quick processing only
self.process_data(msg)

QoS Matching

Critical rule: Publishers and subscribers must have compatible QoS:

  • ReliableReliable: ✅ Compatible
  • Best-effortBest-effort: ✅ Compatible
  • ReliableBest-effort: ❌ Incompatible (won't connect)

Always specify QoS explicitly:

# Explicit QoS prevents mismatches
qos = QoSProfile(reliability=ReliabilityPolicy.RELIABLE, depth=10)
publisher = self.create_publisher(String, 'topic', qos)

2.7 Debugging Tools

Command-Line Tools

# List all nodes
ros2 node list

# List all topics
ros2 topic list

# Echo topic messages
ros2 topic echo /sensor_data

# Check topic frequency
ros2 topic hz /sensor_data

# Check topic bandwidth
ros2 topic bw /sensor_data

# List services
ros2 service list

# Call service
ros2 service call /plan_trajectory your_package/srv/PlanTrajectory "{start: {x: 0.0, y: 0.0}, goal: {x: 5.0, y: 3.0}}"

# List actions
ros2 action list

# Send action goal
ros2 action send_goal /navigate_to_goal your_package/action/Navigate "{target: {x: 5.0, y: 3.0}}"

Visualization Tools

  • rqt_graph: Visualize computation graph
  • rqt_topic: Inspect topic messages
  • rqt_service: Call services interactively

Summary

Choose the right pattern:

  • Topics: Continuous streams, one-to-many, asynchronous
  • Services: Discrete queries, one-to-one, synchronous
  • Actions: Long-running tasks, progress feedback, cancellable

Best practices:

  • ✅ Single responsibility per node
  • ✅ Explicit QoS profiles
  • ✅ Avoid blocking in callbacks
  • ✅ Use appropriate executors (single-threaded for control)
  • ✅ Handle errors and timeouts
  • ✅ Use namespaces for organization

The next topic covers Launch Files, URDF, and RViz for orchestrating and visualizing complete systems.