This is a practical guide to adding persistent memory to ROS2 robots. We'll implement a memory node that records failures, tracks waypoints, and learns from operational experience.
Prerequisites
- ROS2 Humble or later
- Python 3.8+
- shodh-memory:
pip install shodh-memory
Step 1: Create the Memory Node
We'll create a ROS2 node that wraps the memory system and exposes it through services:
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from shodh_memory import MemorySystem, Position
from robot_interfaces.srv import RecordMemory, RecallMemory
from robot_interfaces.msg import RobotState
class MemoryNode(Node):
def __init__(self):
super().__init__('robot_memory')
# Initialize memory system with persistent storage
memory_path = self.declare_parameter('memory_path', '/robot/memory').value
self.memory = MemorySystem(memory_path)
self.get_logger().info(f'Memory initialized at {memory_path}')
# Services for recording and recalling
self.record_srv = self.create_service(
RecordMemory, 'record_memory', self.record_callback)
self.recall_srv = self.create_service(
RecallMemory, 'recall_memory', self.recall_callback)
# Subscribe to robot state for automatic failure detection
self.state_sub = self.create_subscription(
RobotState, '/robot/state', self.state_callback, 10)
self.last_state = None
def record_callback(self, request, response):
memory_id = self.memory.record(
content=request.content,
type=request.memory_type or "Observation"
)
response.success = True
response.memory_id = memory_id
return response
def recall_callback(self, request, response):
results = self.memory.recall(request.query, limit=request.limit or 5)
response.memories = [
{"id": r["id"], "content": r["content"], "importance": r.get("importance", 0)}
for r in results
]
return response
def state_callback(self, msg):
if self.last_state is None:
self.last_state = msg
return
# Detect state transitions that indicate failures
if self.last_state.operational and not msg.operational:
self.memory.record_failure(
description=f"Robot went non-operational: {msg.error_code}",
severity="high" if msg.emergency_stop else "medium",
root_cause=msg.error_description
)
self.get_logger().warn(f'Recorded failure: {msg.error_code}')
self.last_state = msg
def main():
rclpy.init()
node = MemoryNode()
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()Step 2: Define Service Interfaces
Create the service definitions in your robot_interfaces package:
string content
string memory_type
---
bool success
string memory_idstring query
int32 limit
---
string[] memoriesStep 3: Waypoint Memory
Record visited waypoints with success/failure status:
from shodh_memory import MemorySystem, Position
class WaypointTracker:
def __init__(self, memory: MemorySystem):
self.memory = memory
def record_waypoint(self, name: str, pos: tuple, status: str, notes: str = ""):
position = Position(x=pos[0], y=pos[1], z=pos[2] if len(pos) > 2 else 0.0)
self.memory.record_waypoint(
name=name,
position=position,
status=status, # "reached", "failed", "skipped"
notes=notes
)
def get_problem_waypoints(self) -> list:
"""Find waypoints that frequently fail"""
results = self.memory.recall("waypoint failed", limit=20)
# Count failures per waypoint name
failure_counts = {}
for r in results:
# Extract waypoint name from memory content
if "waypoint" in r["content"].lower():
name = self.extract_waypoint_name(r["content"])
failure_counts[name] = failure_counts.get(name, 0) + 1
# Return waypoints with 2+ failures
return [name for name, count in failure_counts.items() if count >= 2]
def extract_waypoint_name(self, content: str) -> str:
# Simple extraction - customize for your naming scheme
import re
match = re.search(r'waypoint[\s:]+([\w-]+)', content, re.I)
return match.group(1) if match else "unknown"Step 4: Learning from Experience
The Hebbian learning mechanism means frequently co-accessed memories strengthen their connections. When you recall "navigation failure" and a specific sensor reading appears in multiple results, that association gets stronger.
def diagnose_failure(memory: MemorySystem, symptoms: str) -> list:
"""
Given current symptoms, find relevant past failures and their resolutions.
Hebbian learning ensures frequently successful patterns rank higher.
"""
# Search for similar past issues
similar_failures = memory.recall(
f"failure {symptoms}",
limit=10
)
# For each failure, find associated resolutions
resolutions = []
for failure in similar_failures:
# Search for decisions made after this failure
followup = memory.recall(
f"decision after {failure['id'][:8]}",
limit=3
)
if followup:
resolutions.append({
"failure": failure["content"],
"resolution": followup[0]["content"],
"confidence": failure.get("importance", 0) * followup[0].get("importance", 0)
})
return sorted(resolutions, key=lambda x: x["confidence"], reverse=True)Step 5: Launch Configuration
Add the memory node to your robot's launch file:
from launch import LaunchDescription
from launch_ros.actions import Node
def generate_launch_description():
return LaunchDescription([
Node(
package='robot_memory',
executable='memory_node',
name='robot_memory',
parameters=[{
'memory_path': '/robot/persistent_memory'
}],
output='screen'
),
# ... other nodes
])Memory Maintenance
The system runs background consolidation automatically. For long-running robots, old memories compress into semantic facts:
- Individual "navigation failed at waypoint X" events consolidate into "waypoint X has reliability issues"
- Repeated sensor readings consolidate into "LiDAR unreliable in dusty conditions"
- Activation decay prunes memories that haven't been accessed in weeks
The result: a robot that gets more reliable over time because it remembers what went wrong and what fixed it.