Back to Blog
Tutorial

Building Autonomous Robots with Persistent Memory: A ROS2 Implementation Guide

December 4, 202512 min readBy Shodh Team · Engineering
ROS2autonomous-systemsHebbian-learningPythonrobotics

This is a practical guide to adding persistent memory to ROS2 robots. We'll implement a memory node that records failures, tracks waypoints, and learns from operational experience.

Prerequisites

  • ROS2 Humble or later
  • Python 3.8+
  • shodh-memory: pip install shodh-memory

Step 1: Create the Memory Node

We'll create a ROS2 node that wraps the memory system and exposes it through services:

memory_node.pypython
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from shodh_memory import MemorySystem, Position

from robot_interfaces.srv import RecordMemory, RecallMemory
from robot_interfaces.msg import RobotState

class MemoryNode(Node):
    def __init__(self):
        super().__init__('robot_memory')

        # Initialize memory system with persistent storage
        memory_path = self.declare_parameter('memory_path', '/robot/memory').value
        self.memory = MemorySystem(memory_path)
        self.get_logger().info(f'Memory initialized at {memory_path}')

        # Services for recording and recalling
        self.record_srv = self.create_service(
            RecordMemory, 'record_memory', self.record_callback)
        self.recall_srv = self.create_service(
            RecallMemory, 'recall_memory', self.recall_callback)

        # Subscribe to robot state for automatic failure detection
        self.state_sub = self.create_subscription(
            RobotState, '/robot/state', self.state_callback, 10)

        self.last_state = None

    def record_callback(self, request, response):
        memory_id = self.memory.record(
            content=request.content,
            type=request.memory_type or "Observation"
        )
        response.success = True
        response.memory_id = memory_id
        return response

    def recall_callback(self, request, response):
        results = self.memory.recall(request.query, limit=request.limit or 5)
        response.memories = [
            {"id": r["id"], "content": r["content"], "importance": r.get("importance", 0)}
            for r in results
        ]
        return response

    def state_callback(self, msg):
        if self.last_state is None:
            self.last_state = msg
            return

        # Detect state transitions that indicate failures
        if self.last_state.operational and not msg.operational:
            self.memory.record_failure(
                description=f"Robot went non-operational: {msg.error_code}",
                severity="high" if msg.emergency_stop else "medium",
                root_cause=msg.error_description
            )
            self.get_logger().warn(f'Recorded failure: {msg.error_code}')

        self.last_state = msg

def main():
    rclpy.init()
    node = MemoryNode()
    rclpy.spin(node)
    node.destroy_node()
    rclpy.shutdown()

if __name__ == '__main__':
    main()

Step 2: Define Service Interfaces

Create the service definitions in your robot_interfaces package:

srv/RecordMemory.srvtext
string content
string memory_type
---
bool success
string memory_id
srv/RecallMemory.srvtext
string query
int32 limit
---
string[] memories

Step 3: Waypoint Memory

Record visited waypoints with success/failure status:

waypoint_tracker.pypython
from shodh_memory import MemorySystem, Position

class WaypointTracker:
    def __init__(self, memory: MemorySystem):
        self.memory = memory

    def record_waypoint(self, name: str, pos: tuple, status: str, notes: str = ""):
        position = Position(x=pos[0], y=pos[1], z=pos[2] if len(pos) > 2 else 0.0)

        self.memory.record_waypoint(
            name=name,
            position=position,
            status=status,  # "reached", "failed", "skipped"
            notes=notes
        )

    def get_problem_waypoints(self) -> list:
        """Find waypoints that frequently fail"""
        results = self.memory.recall("waypoint failed", limit=20)

        # Count failures per waypoint name
        failure_counts = {}
        for r in results:
            # Extract waypoint name from memory content
            if "waypoint" in r["content"].lower():
                name = self.extract_waypoint_name(r["content"])
                failure_counts[name] = failure_counts.get(name, 0) + 1

        # Return waypoints with 2+ failures
        return [name for name, count in failure_counts.items() if count >= 2]

    def extract_waypoint_name(self, content: str) -> str:
        # Simple extraction - customize for your naming scheme
        import re
        match = re.search(r'waypoint[\s:]+([\w-]+)', content, re.I)
        return match.group(1) if match else "unknown"

Step 4: Learning from Experience

The Hebbian learning mechanism means frequently co-accessed memories strengthen their connections. When you recall "navigation failure" and a specific sensor reading appears in multiple results, that association gets stronger.

diagnostic_helper.pypython
def diagnose_failure(memory: MemorySystem, symptoms: str) -> list:
    """
    Given current symptoms, find relevant past failures and their resolutions.
    Hebbian learning ensures frequently successful patterns rank higher.
    """
    # Search for similar past issues
    similar_failures = memory.recall(
        f"failure {symptoms}",
        limit=10
    )

    # For each failure, find associated resolutions
    resolutions = []
    for failure in similar_failures:
        # Search for decisions made after this failure
        followup = memory.recall(
            f"decision after {failure['id'][:8]}",
            limit=3
        )
        if followup:
            resolutions.append({
                "failure": failure["content"],
                "resolution": followup[0]["content"],
                "confidence": failure.get("importance", 0) * followup[0].get("importance", 0)
            })

    return sorted(resolutions, key=lambda x: x["confidence"], reverse=True)

Step 5: Launch Configuration

Add the memory node to your robot's launch file:

robot_launch.pypython
from launch import LaunchDescription
from launch_ros.actions import Node

def generate_launch_description():
    return LaunchDescription([
        Node(
            package='robot_memory',
            executable='memory_node',
            name='robot_memory',
            parameters=[{
                'memory_path': '/robot/persistent_memory'
            }],
            output='screen'
        ),
        # ... other nodes
    ])

Memory Maintenance

The system runs background consolidation automatically. For long-running robots, old memories compress into semantic facts:

  • Individual "navigation failed at waypoint X" events consolidate into "waypoint X has reliability issues"
  • Repeated sensor readings consolidate into "LiDAR unreliable in dusty conditions"
  • Activation decay prunes memories that haven't been accessed in weeks

The result: a robot that gets more reliable over time because it remembers what went wrong and what fixed it.

Blog | Shodh | Shodh RAG