"""
Agent classes for reinforcement learning and memory-augmented agents.

This module defines several agent classes for use in environments such as mazes or gridworlds:

- SimpleAgent: A basic Q-learning agent with epsilon-greedy action selection.
- MemoryAgent: An agent that augments Q-learning with episodic memory, using a MemorySpace for storing and retrieving experiences to inform decisions.
- RandomAgent: An agent that selects actions randomly, for baseline comparison.

Agents can be used with or without memory, and support demonstration paths for scripted exploration. The MemoryAgent leverages a memory system for enhanced learning and recall of past experiences.
"""

import logging
from collections import Counter
from typing import Optional

import numpy as np

from memory import (
    MemoryConfig,
    MemorySpace,
    RedisIMConfig,
    RedisSTMConfig,
    SQLiteLTMConfig,
)
from memory.utils.util import convert_numpy_to_python

logger = logging.getLogger(__name__)


# Base agent class without hooks
class SimpleAgent:
    """Tabular Q-learning agent with epsilon-greedy action selection.

    Q-values live in ``self.q_table``, a dict mapping a string state key
    (see ``_get_state_key``) to a NumPy array with one entry per action.
    """

    def __init__(
        self,
        agent_id: str,
        action_space: int = 4,
        learning_rate: float = 0.1,
        discount_factor: float = 0.9,
        **kwargs,
    ) -> None:
        """
        Initialize a SimpleAgent for reinforcement learning.

        Args:
            agent_id (str): Unique identifier for the agent.
            action_space (int): Number of possible actions.
            learning_rate (float): Q-learning learning rate.
            discount_factor (float): Q-learning discount factor.
            **kwargs: Additional arguments (unused).
        """
        self.agent_id = agent_id
        self.action_space = action_space
        self.learning_rate = learning_rate
        self.discount_factor = discount_factor
        self.q_table = {}  # State key -> array of per-action Q-values
        self.current_observation = None
        self.demo_path = None  # For scripted demo actions
        self.demo_step = 0  # Index of the next demo action to replay
        self.step_number = 0  # Number of act() calls so far

    def _get_state_key(self, observation: dict) -> str:
        """
        Generate a unique key for a given observation/state.

        The fields are normalised with ``convert_numpy_to_python`` first so
        that raw and already-converted observations of the same state map to
        the same key (``str`` of a NumPy value differs from ``str`` of the
        equivalent Python value).

        Args:
            observation (dict): The environment observation. Must contain
                'position', 'target' and 'steps'.

        Returns:
            str: A string key representing the state.
        """
        fields = convert_numpy_to_python(
            {
                "position": observation["position"],
                "target": observation["target"],
                "steps": observation["steps"],
            }
        )
        return f"{fields['position']}|{fields['target']}|{fields['steps']}"

    def _q_values(self, state_key: str) -> np.ndarray:
        """Return the Q-value row for ``state_key``, creating a zero row on first use."""
        if state_key not in self.q_table:
            self.q_table[state_key] = np.zeros(self.action_space)
        return self.q_table[state_key]

    def _next_demo_action(self) -> Optional[int]:
        """Return the next scripted action and advance, or None if no demo action remains."""
        if self.demo_path is None or self.demo_step >= len(self.demo_path):
            return None
        action = self.demo_path[self.demo_step]
        self.demo_step += 1
        return action

    def _epsilon_greedy(self, state_key: str, epsilon: float) -> int:
        """Pick a uniformly random action with probability ``epsilon``, else the greedy one."""
        if np.random.random() < epsilon:
            return int(np.random.randint(self.action_space))
        return int(np.argmax(self._q_values(state_key)))

    def select_action(self, observation: dict, epsilon: float = 0.1) -> int:
        """
        Select an action using an epsilon-greedy policy or a demonstration path.

        Args:
            observation (dict): The current environment observation.
            epsilon (float): Probability of choosing a random action (exploration).

        Returns:
            int: The selected action index.
        """
        self.current_observation = observation
        state_key = self._get_state_key(observation)

        # Make sure the state has a Q-value row even when a demo action is
        # returned, so a later update_q_value() finds it.
        self._q_values(state_key)

        # If we have a demo path, follow it first to ensure we explore the correct path
        demo_action = self._next_demo_action()
        if demo_action is not None:
            return demo_action

        return self._epsilon_greedy(state_key, epsilon)

    def update_q_value(
        self,
        observation: dict,
        action: int,
        reward: float,
        next_observation: dict,
        done: bool,
    ) -> None:
        """
        Update the Q-value for a state-action pair using the Q-learning rule.

        Both the current and the next state are initialised on demand, so the
        update also works for states that never went through
        ``select_action`` (e.g. subclasses that override it).

        Args:
            observation (dict): The current state observation.
            action (int): The action taken.
            reward (float): The reward received.
            next_observation (dict): The next state observation.
            done (bool): Whether the episode has ended.
        """
        q_row = self._q_values(self._get_state_key(observation))
        next_row = self._q_values(self._get_state_key(next_observation))

        current_q = q_row[action]
        # A terminal transition has no future value to bootstrap from.
        max_next_q = 0.0 if done else np.max(next_row)

        q_row[action] = current_q + self.learning_rate * (
            reward + self.discount_factor * max_next_q - current_q
        )

    def act(self, observation: dict, epsilon: float = 0.1) -> int:
        """
        Choose and return an action for the given observation.

        Args:
            observation (dict): The current environment observation.
            epsilon (float): Probability of choosing a random action (exploration).

        Returns:
            int: The selected action index.
        """
        self.step_number += 1
        # Convert NumPy types to Python types
        self.current_observation = convert_numpy_to_python(observation)
        action = self.select_action(self.current_observation, epsilon)
        return int(action)  # Return as integer instead of ActionResult

    def set_demo_path(self, path: list[int]) -> None:
        """
        Set a predetermined path of actions for demonstration or scripted exploration.

        Args:
            path (list[int]): List of action indices to follow.
        """
        self.demo_path = path
        self.demo_step = 0


# Memory-enhanced agent using MemorySpace directly
class MemoryAgent(SimpleAgent):
    """Q-learning agent that also stores and recalls experiences via a MemorySpace.

    Besides the MemorySpace, the agent keeps ``position_memory_cache``: a
    dict from ``str(position)`` to a list of memory-like dicts
    (``{"content": ..., "step_number": ...}``) used as a fallback when the
    similarity search returns nothing.
    """

    # Memory advice is used only when np.random.random() exceeds this value.
    MEMORY_SKIP_PROBABILITY = 0.2
    # Extra cache copies stored for a successful terminal transition.
    SUCCESS_CACHE_COPIES = 10

    def __init__(
        self,
        agent_id: str,
        action_space: int = 4,
        learning_rate: float = 0.1,
        discount_factor: float = 0.9,
        **kwargs,
    ) -> None:
        """
        Initialize a MemoryAgent that augments Q-learning with episodic memory.

        Args:
            agent_id (str): Unique identifier for the agent.
            action_space (int): Number of possible actions.
            learning_rate (float): Q-learning learning rate.
            discount_factor (float): Q-learning discount factor.
            **kwargs: Additional arguments (unused).
        """
        super().__init__(
            agent_id=agent_id,
            action_space=action_space,
            learning_rate=learning_rate,
            discount_factor=discount_factor,
            **kwargs,
        )

        memory_config = MemoryConfig(
            stm_config=RedisSTMConfig(
                ttl=120,  # Increase TTL to keep more memories active
                memory_limit=500,  # Increase memory limit
                use_mock=True,  # Use mock Redis for easy setup
            ),
            im_config=RedisIMConfig(
                ttl=240,  # Longer TTL for IM
                memory_limit=1000,  # Larger memory limit
                compression_level=0,  # No compression for IM
                use_mock=True,  # Use mock Redis for easy setup
            ),
            ltm_config=SQLiteLTMConfig(
                compression_level=0,  # No compression for LTM
                batch_size=20,  # Larger batch size
                db_path="memory_demo.db",  # Use a real file for SQLite
            ),
            cleanup_interval=1000,  # Reduce cleanup frequency
            enable_memory_hooks=False,  # Disable memory hooks since we're using direct API calls
            use_embedding_engine=True,  # Enable embedding engine for similarity search
            text_model_name="all-MiniLM-L6-v2",  # Use a default text embedding model
        )
        # Memory space dedicated to this agent
        self.memory_space = MemorySpace(agent_id, memory_config)

        # Keep track of visited states to avoid redundant storage
        self.visited_states = set()
        # Add memory cache for direct position lookups
        self.position_memory_cache = {}  # Mapping from positions to memories

    @staticmethod
    def _position_key(observation: dict) -> str:
        """Return the cache key for an observation's position (normalised to Python types)."""
        return str(convert_numpy_to_python(observation["position"]))

    @staticmethod
    def _manhattan_distance(observation: dict):
        """Return the Manhattan distance between the observation's position and target."""
        position, target = observation["position"], observation["target"]
        return abs(position[0] - target[0]) + abs(position[1] - target[1])

    def _select_action_from_memory(
        self, observation: dict, state_key: str
    ) -> Optional[int]:
        """Store the current state, recall similar memories and vote on an action.

        Returns the reward-weighted most frequent remembered action, or None
        when memory offers no advice (nothing recalled, the random draw says
        to skip memory, or no recalled memory carries an action).

        NOTE(review): recalled memories are assumed to be dicts with a
        'content' mapping, matching the entries this class writes to its own
        cache - confirm against MemorySpace.retrieve_similar_states.
        """
        position_key = self._position_key(observation)
        distance = self._manhattan_distance(observation)

        # Store current state if not already visited
        if state_key not in self.visited_states:
            enhanced_state = {
                "position": observation["position"],
                "target": observation["target"],
                "steps": observation["steps"],
                "nearby_obstacles": observation["nearby_obstacles"],
                "manhattan_distance": distance,
                "state_key": state_key,
                "position_key": position_key,  # Add position key for direct lookup
            }
            self.memory_space.store_state(
                state_data=convert_numpy_to_python(enhanced_state),
                step_number=self.step_number,
                priority=0.7,  # Medium priority for state
            )
            self.visited_states.add(state_key)

        # Create a query with the enhanced state features
        query_state = convert_numpy_to_python(
            {
                "position": observation["position"],
                "target": observation["target"],
                "steps": observation["steps"],
                "manhattan_distance": distance,
            }
        )
        similar_states = self.memory_space.retrieve_similar_states(
            query_state=query_state,
            k=10,  # Increase from 5 to 10 to find more candidates
            memory_type="state",
        )

        # Direct position-based lookup as fallback
        if not similar_states:
            similar_states = self.position_memory_cache.get(position_key, [])

        # Index every recalled memory by the position it refers to, for
        # future direct lookups. Iterate over a snapshot because the source
        # list may itself be one of the cache buckets.
        for memory in tuple(similar_states):
            content = memory.get("content", {})
            if "position" in content:
                mem_position = str(content["position"])
            elif "next_state" in content:
                mem_position = str(content["next_state"])
            else:
                continue
            bucket = self.position_memory_cache.setdefault(mem_position, [])
            if memory not in bucket:
                bucket.append(memory)

        # Strong bias toward using memory (higher than epsilon). The random
        # draw is only made when there is something to recall.
        if not similar_states or np.random.random() <= self.MEMORY_SKIP_PROBABILITY:
            return None

        # Tally remembered actions, weighting better rewards more heavily.
        votes = Counter()
        for memory in similar_states:
            content = memory.get("content", {})
            if "action" not in content:
                continue
            reward = content.get("reward", -1)
            if reward > 0:  # Positive rewards get the most weight
                weight = 5
            elif reward > -2:  # Better than the typical step penalty
                weight = 3
            else:
                weight = 1
            votes[content["action"]] += weight

        if not votes:
            return None
        # Highest weighted count wins; ties go to the lowest action index.
        return max(sorted(votes), key=votes.__getitem__)

    def select_action(self, observation: dict, epsilon: float = 0.1) -> int:
        """
        Select an action using memory-augmented Q-learning and experience recall.

        Order of precedence: demonstration path, then advice recalled from
        memory, then the epsilon-greedy policy. Any failure in the memory
        layer is logged and the epsilon-greedy policy is used instead.

        Args:
            observation (dict): The current environment observation.
            epsilon (float): Probability of choosing a random action (exploration).

        Returns:
            int: The selected action index.
        """
        self.current_observation = observation
        state_key = self._get_state_key(observation)

        # Initialize state if not seen before
        self._q_values(state_key)

        # If we have a demo path, follow it first to ensure we explore the correct path
        demo_action = self._next_demo_action()
        if demo_action is not None:
            return demo_action

        # Try to retrieve similar experiences from memory
        try:
            memory_action = self._select_action_from_memory(observation, state_key)
        except Exception:
            # Memory is best-effort: report the problem, then fall back.
            logger.warning(
                "Memory-guided action selection failed; using epsilon-greedy",
                exc_info=True,
            )
            memory_action = None
        if memory_action is not None:
            return memory_action

        # Epsilon-greedy policy as fallback
        return self._epsilon_greedy(state_key, epsilon)

    def act(self, observation: dict, epsilon: float = 0.1) -> int:
        """
        Choose and return an action for the given observation, storing the action in memory.

        Args:
            observation (dict): The current environment observation.
            epsilon (float): Probability of choosing a random action (exploration).

        Returns:
            int: The selected action index.
        """
        self.step_number += 1
        # Convert NumPy types to Python types
        self.current_observation = convert_numpy_to_python(observation)
        action = self.select_action(self.current_observation, epsilon)

        # Store the action using memory space
        try:
            # Derive the key from the converted observation so it matches the
            # key used during action selection.
            position_key = self._position_key(self.current_observation)
            action_data = {
                "action": int(action),
                "position": self.current_observation["position"],
                "state_key": self._get_state_key(self.current_observation),
                "steps": self.current_observation["steps"],
                "position_key": position_key,
            }
            self.memory_space.store_action(
                action_data=action_data,
                step_number=self.step_number,
                priority=0.6,  # Medium priority
            )

            # Mirror the stored action in the position cache as a memory-like entry
            self.position_memory_cache.setdefault(position_key, []).append(
                {"content": action_data, "step_number": self.step_number}
            )
        except Exception:
            # Storing is best-effort; the chosen action is still returned.
            logger.warning("Failed to store action in memory", exc_info=True)

        # Return action as integer
        return int(action)

    def update_q_value(
        self,
        observation: dict,
        action: int,
        reward: float,
        next_observation: dict,
        done: bool,
    ) -> None:
        """
        Update the Q-value and store the reward and outcome in memory.

        Args:
            observation (dict): The current state observation.
            action (int): The action taken.
            reward (float): The reward received.
            next_observation (dict): The next state observation.
            done (bool): Whether the episode has ended.
        """
        # First, call the parent method to update Q-values
        super().update_q_value(observation, action, reward, next_observation, done)

        # Then store the reward and outcome using memory space
        try:
            position_key = self._position_key(observation)
            next_position_key = self._position_key(next_observation)
            succeeded = bool(done) and reward > 0

            interaction_data = convert_numpy_to_python(
                {
                    "action": int(action),
                    "reward": float(reward),
                    "next_state": next_observation["position"],
                    "done": bool(done),
                    "state_key": self._get_state_key(observation),
                    "next_state_key": self._get_state_key(next_observation),
                    "steps": observation["steps"],
                    "manhattan_distance": self._manhattan_distance(observation),
                    "position_key": position_key,
                    "next_position_key": next_position_key,
                }
            )

            # Base priority on reward magnitude, capped at the maximum of 1.0;
            # a successful completion always gets the maximum.
            priority = 1.0 if succeeded else min(1.0, abs(float(reward)) / 100)

            self.memory_space.store_interaction(
                interaction_data=interaction_data,
                step_number=self.step_number,
                priority=priority,
            )

            # Add to position cache so both the current and the next position
            # can be looked up directly.
            for pos_key in (position_key, next_position_key):
                self.position_memory_cache.setdefault(pos_key, []).append(
                    {"content": interaction_data, "step_number": self.step_number}
                )

            # If this was a successful completion, store extra copies under
            # the starting position to increase its influence on later votes.
            if succeeded:
                success_entry = {
                    "content": interaction_data,
                    "step_number": self.step_number,
                }
                self.position_memory_cache[position_key].extend(
                    [success_entry] * self.SUCCESS_CACHE_COPIES
                )
        except Exception:
            # Storing is best-effort; the Q-value update above already happened.
            logger.warning("Failed to store interaction in memory", exc_info=True)


# Random agent that chooses actions randomly
class RandomAgent(SimpleAgent):
    """Baseline agent that ignores Q-values and any demo path and acts uniformly at random."""

    def select_action(self, observation: dict, epsilon: float = 0.1) -> int:
        """
        Return a uniformly random action index.

        Args:
            observation (dict): The current environment observation (only recorded).
            epsilon (float): Unused; kept for interface compatibility.

        Returns:
            int: A random action in ``[0, action_space)``.
        """
        self.current_observation = observation
        return np.random.randint(self.action_space)
return [convert_numpy_to_python(item) for item in obj] - elif isinstance(obj, tuple): - return tuple(convert_numpy_to_python(item) for item in obj) - else: - return obj - - -# Define a simple maze environment -class MazeEnvironment: - def __init__(self, size=5, obstacles=None, max_steps=15): - self.size = size - self.obstacles = obstacles or [] - self.target = (size - 2, size - 2) - self.max_steps = max_steps - self.reset() - - def reset(self): - self.position = (1, 1) - self.steps = 0 - return self.get_observation() - - def get_observation(self): - return { - "position": self.position, - "target": self.target, - "nearby_obstacles": self._get_nearby_obstacles(), - "steps": self.steps, - } - - def _get_nearby_obstacles(self): - return [ - obs - for obs in self.obstacles - if abs(obs[0] - self.position[0]) <= 2 - and abs(obs[1] - self.position[1]) <= 2 - ] - - def step(self, action): - # Actions: 0=up, 1=right, 2=down, 3=left - directions = [(-1, 0), (0, 1), (1, 0), (0, -1)] - new_position = ( - self.position[0] + directions[action][0], - self.position[1] + directions[action][1], - ) - - # Check if valid move - if ( - 0 <= new_position[0] < self.size - and 0 <= new_position[1] < self.size - and new_position not in self.obstacles - ): - self.position = new_position - - self.steps += 1 - - # Calculate reward - if self.position == self.target: - reward = 100 # Success - done = True - elif self.steps >= self.max_steps: - reward = -50 # Timeout penalty - done = True - else: - # Manhattan distance to target - dist = abs(self.position[0] - self.target[0]) + abs( - self.position[1] - self.target[1] - ) - reward = -1 - (dist * 0.1) # Small step penalty with distance hint - done = False - - return self.get_observation(), reward, done - - -# Base agent class without hooks -class SimpleAgent: - def __init__( - self, - agent_id, - action_space=4, - learning_rate=0.1, - discount_factor=0.9, - **kwargs, - ): - self.agent_id = agent_id - self.action_space = action_space - 
self.learning_rate = learning_rate - self.discount_factor = discount_factor - self.q_table = {} # State-action values - self.current_observation = None - self.demo_path = None # For scripted demo actions - self.demo_step = 0 - self.step_number = 0 - - def _get_state_key(self, observation): - # Enhance state representation by including more context - # Include position, target position, and steps to make it more distinctive - return f"{observation['position']}|{observation['target']}|{observation['steps']}" - - def select_action(self, observation, epsilon=0.1): - self.current_observation = observation - state_key = self._get_state_key(observation) - - # Initialize state if not seen before - if state_key not in self.q_table: - self.q_table[state_key] = np.zeros(self.action_space) - - # If we have a demo path, follow it first to ensure we explore the correct path - if self.demo_path is not None and self.demo_step < len(self.demo_path): - action = self.demo_path[self.demo_step] - self.demo_step += 1 - return action - - # Epsilon-greedy policy - if np.random.random() < epsilon: - return np.random.randint(self.action_space) - else: - return np.argmax(self.q_table[state_key]) - - def update_q_value(self, observation, action, reward, next_observation, done): - state_key = self._get_state_key(observation) - next_state_key = self._get_state_key(next_observation) - - # Initialize next state if not seen before - if next_state_key not in self.q_table: - self.q_table[next_state_key] = np.zeros(self.action_space) - - # Q-learning update - current_q = self.q_table[state_key][action] - - if done: - max_next_q = 0 - else: - max_next_q = np.max(self.q_table[next_state_key]) - - new_q = current_q + self.learning_rate * ( - reward + self.discount_factor * max_next_q - current_q - ) - self.q_table[state_key][action] = new_q - - def act(self, observation, epsilon=0.1): - """Act method that returns action as integer""" - self.step_number += 1 - # Convert NumPy types to Python types - 
self.current_observation = convert_numpy_to_python(observation) - action = self.select_action(self.current_observation, epsilon) - return int(action) # Return as integer instead of ActionResult - - def set_demo_path(self, path): - """Set a predetermined path to follow for demonstration""" - self.demo_path = path - self.demo_step = 0 - - -# Memory-enhanced agent using MemorySpace directly -class MemoryEnhancedAgent(SimpleAgent): - def __init__( - self, - agent_id, - memory_system, - action_space=4, - learning_rate=0.1, - discount_factor=0.9, - **kwargs, - ): - super().__init__( - agent_id=agent_id, - action_space=action_space, - learning_rate=learning_rate, - discount_factor=discount_factor, - **kwargs, - ) - - memory_config = MemoryConfig( - stm_config=RedisSTMConfig( - ttl=120, # Increase TTL to keep more memories active - memory_limit=500, # Increase memory limit - use_mock=True, # Use mock Redis for easy setup - ), - im_config=RedisIMConfig( - ttl=240, # Longer TTL for IM - memory_limit=1000, # Larger memory limit - compression_level=0, # No compression for IM - use_mock=True, # Use mock Redis for easy setup - ), - ltm_config=SQLiteLTMConfig( - compression_level=0, # No compression for LTM - batch_size=20, # Larger batch size - db_path="memory_demo.db", # Use a real file for SQLite - ), - cleanup_interval=1000, # Reduce cleanup frequency - enable_memory_hooks=False, # Disable memory hooks since we're using direct API calls - use_embedding_engine=True, # Enable embedding engine for similarity search - text_model_name="all-MiniLM-L6-v2", # Use a default text embedding model - ) - # Store the memory system and get the memory space for this agent - self.memory_space = MemorySpace(agent_id, memory_config) - - # Keep track of visited states to avoid redundant storage - self.visited_states = set() - # Add memory cache for direct position lookups - self.position_memory_cache = {} # Mapping from positions to memories - - # Override select_action to use memory for better 
decisions - def select_action(self, observation, epsilon=0.1): - self.current_observation = observation - state_key = self._get_state_key(observation) - position_key = str(observation['position']) # Use position as direct lookup key - - # Initialize state if not seen before - if state_key not in self.q_table: - self.q_table[state_key] = np.zeros(self.action_space) - - # If we have a demo path, follow it first to ensure we explore the correct path - if self.demo_path is not None and self.demo_step < len(self.demo_path): - action = self.demo_path[self.demo_step] - self.demo_step += 1 - return action - - # Try to retrieve similar experiences from memory - try: - # Store current state if not already visited - if state_key not in self.visited_states: - # Enhanced state representation - enhanced_state = { - "position": observation["position"], - "target": observation["target"], - "steps": observation["steps"], - "nearby_obstacles": observation["nearby_obstacles"], - "manhattan_distance": abs(observation["position"][0] - observation["target"][0]) + - abs(observation["position"][1] - observation["target"][1]), - "state_key": state_key, - "position_key": position_key # Add position key for direct lookup - } - self.memory_space.store_state( - state_data=convert_numpy_to_python(enhanced_state), - step_number=self.step_number, - priority=0.7 # Medium priority for state - ) - self.visited_states.add(state_key) - - # Create a query with the enhanced state features - query_state = { - "position": observation["position"], - "target": observation["target"], - "steps": observation["steps"], - "manhattan_distance": abs(observation["position"][0] - observation["target"][0]) + - abs(observation["position"][1] - observation["target"][1]) - } - - similar_states = self.memory_space.retrieve_similar_states( - query_state=query_state, - k=10, # Increase from 5 to 10 to find more candidates - memory_type="state" - ) - - # NEW: Direct position-based lookup as fallback - if len(similar_states) 
== 0: - # Try direct lookup from our position memory cache - if position_key in self.position_memory_cache: - direct_memories = self.position_memory_cache[position_key] - similar_states = direct_memories - - for i, s in enumerate(similar_states): - # Update our position memory cache with this memory for future direct lookups - mem_position = None - if 'position' in s.get('content', {}): - mem_position = str(s['content']['position']) - elif 'next_state' in s.get('content', {}): - mem_position = str(s['content']['next_state']) - - if mem_position: - if mem_position not in self.position_memory_cache: - self.position_memory_cache[mem_position] = [] - if s not in self.position_memory_cache[mem_position]: - self.position_memory_cache[mem_position].append(s) - - # Strong bias toward using memory (higher than epsilon) - if similar_states and np.random.random() > 0.2: - # Use any experience with significant reward - actions_from_memory = [] - for s in similar_states: - # Consider any action with a reward, not just positive ones - if "action" in s.get("content", {}): - # Weight action by reward to prefer better outcomes - # Add the action multiple times based on reward magnitude - reward = s["content"].get("reward", -1) - # Consider any reward better than average - # Add actions with better rewards more times - weight = 1 - if reward > -2: # Better than the typical step penalty - weight = 3 - if reward > 0: # Positive rewards get even more weight - weight = 5 - - for _ in range(weight): - actions_from_memory.append(s["content"]["action"]) - - if actions_from_memory: - # Most common action from similar states, weighted by reward - chosen_action = max(set(actions_from_memory), key=actions_from_memory.count) - return chosen_action - except Exception as e: - # Fallback to regular selection on any error - pass - - # Epsilon-greedy policy as fallback - if np.random.random() < epsilon: - action = np.random.randint(self.action_space) - return action - else: - action = 
np.argmax(self.q_table[state_key]) - return action - - def act(self, observation, epsilon=0.1): - """Override act method to implement agent behavior with memory storage""" - self.step_number += 1 - # Convert NumPy types to Python types - self.current_observation = convert_numpy_to_python(observation) - action = self.select_action(self.current_observation, epsilon) - - # Store the action using memory space - try: - # Include more context in the action data - position_key = str(observation['position']) - action_data = { - "action": int(action), - "position": self.current_observation["position"], - "state_key": self._get_state_key(self.current_observation), - "steps": self.current_observation["steps"], - "position_key": position_key - } - self.memory_space.store_action( - action_data=action_data, - step_number=self.step_number, - priority=0.6 # Medium priority - ) - - # Add to position cache - if position_key not in self.position_memory_cache: - self.position_memory_cache[position_key] = [] - - # Create a memory-like structure for our cache - memory_entry = { - "content": action_data, - "step_number": self.step_number - } - - self.position_memory_cache[position_key].append(memory_entry) - - except Exception as e: - pass - - # Return action as integer - return int(action) - - def update_q_value(self, observation, action, reward, next_observation, done): - """Override to store rewards and outcomes using memory space""" - # First, call the parent method to update Q-values - super().update_q_value(observation, action, reward, next_observation, done) - - # Then store the reward and outcome using memory space - try: - # Enhance interaction data with more context - position_key = str(observation['position']) - next_position_key = str(next_observation['position']) - - interaction_data = { - "action": int(action), - "reward": float(reward), - "next_state": convert_numpy_to_python(next_observation["position"]), - "done": done, - "state_key": self._get_state_key(observation), - 
"next_state_key": self._get_state_key(next_observation), - "steps": observation["steps"], - "manhattan_distance": abs(observation["position"][0] - observation["target"][0]) + - abs(observation["position"][1] - observation["target"][1]), - "position_key": position_key, - "next_position_key": next_position_key - } - - # Increase priority for successful interactions - priority = abs(float(reward)) / 100 # Base priority on reward magnitude - if done and reward > 0: # Successful completion - priority = 1.0 # Maximum priority - - self.memory_space.store_interaction( - interaction_data=interaction_data, - step_number=self.step_number, - priority=priority - ) - - # Add to position cache - very important for successful experiences! - # This ensures we can directly lookup both the current and next positions - for pos_key in [position_key, next_position_key]: - if pos_key not in self.position_memory_cache: - self.position_memory_cache[pos_key] = [] - - # Create a memory-like structure for our cache - memory_entry = { - "content": interaction_data, - "step_number": self.step_number - } - - self.position_memory_cache[pos_key].append(memory_entry) - - # If this was a successful completion, store it prominently - if done and reward > 0: - # Make extra copies in the cache to increase influence - for _ in range(10): # Store 10 copies to really emphasize this path - self.position_memory_cache[position_key].append(memory_entry) - - except Exception as e: - pass +from agents import MemoryAgent, SimpleAgent +from maze import MazeEnvironment +from memory.config import MemoryConfig, RedisIMConfig, RedisSTMConfig, SQLiteLTMConfig +from memory.core import AgentMemorySystem +from memory.utils.util import convert_numpy_to_python # Create a demonstration path to reach the goal @@ -466,15 +30,26 @@ def run_experiment(episodes=100, memory_enabled=True, random_seed=None): # Create a maze with obstacles maze_size = 20 # Smaller maze obstacles = [ - (3, 3), (3, 4), (3, 5), # Horizontal wall - (7, 
7), (8, 7), (9, 7), # Vertical wall - (12, 12), (12, 13), (13, 12), # L-shaped wall - (15, 15), (16, 16), (17, 17), # Diagonal wall - (5, 10), (10, 5), (15, 10), # Scattered obstacles + (3, 3), + (3, 4), + (3, 5), # Horizontal wall + (7, 7), + (8, 7), + (9, 7), # Vertical wall + (12, 12), + (12, 13), + (13, 12), # L-shaped wall + (15, 15), + (16, 16), + (17, 17), # Diagonal wall + (5, 10), + (10, 5), + (15, 10), # Scattered obstacles ] env = MazeEnvironment(size=maze_size, obstacles=obstacles, max_steps=500) # Create the optimal path for demonstration + #! Why do I need this? optimal_path = create_optimal_path_for_maze(maze_size) # Create agent based on memory flag @@ -517,10 +92,11 @@ def run_experiment(episodes=100, memory_enabled=True, random_seed=None): ) # Create the memory system + #! Agent will have memory space, dont need the system memory_system = AgentMemorySystem.get_instance(memory_config) # Create the agent with memory system - agent = MemoryEnhancedAgent(agent_id, memory_system, action_space=4) + agent = MemoryAgent(agent_id, memory_system, action_space=4) # Set the demonstration path for the first episode agent.set_demo_path(optimal_path) @@ -564,9 +140,7 @@ def run_experiment(episodes=100, memory_enabled=True, random_seed=None): if memory_enabled: # Memory agent can learn faster because it has memory agent.learning_rate = 0.2 - agent.update_q_value( - observation, action, reward, next_observation, done - ) + agent.update_q_value(observation, action, reward, next_observation, done) total_reward += reward observation = next_observation @@ -605,83 +179,73 @@ def run_experiment(episodes=100, memory_enabled=True, random_seed=None): } -# Run a debug version with fewer episodes and more focused logging -def run_debug_experiment(episodes=10, memory_enabled=True, random_seed=None): - # Regular experiment code with shorter run - result = run_experiment(episodes=episodes, memory_enabled=memory_enabled, random_seed=random_seed) - - return result - # Modify the 
main execution to include a debug run for examination if __name__ == "__main__": - import sys - - # If --debug flag is passed, run the debug experiment - if len(sys.argv) > 1 and sys.argv[1] == "--debug": - print("Running debug experiment with memory...") - run_debug_experiment(episodes=10, memory_enabled=True, random_seed=42) - else: - # Run the regular experiment - print("Starting experiment with memory...") - results_with_memory = run_experiment(episodes=50, memory_enabled=True, random_seed=42) - print("\nStarting experiment without memory...") - results_without_memory = run_experiment( - episodes=50, memory_enabled=False, random_seed=84 - ) - - # Plot results - plt.figure(figsize=(15, 10)) - - # Plot rewards - plt.subplot(2, 2, 1) - plt.plot(results_with_memory["rewards"], label="With Memory") - plt.plot(results_without_memory["rewards"], label="Without Memory") - plt.xlabel("Episode") - plt.ylabel("Total Reward") - plt.title("Reward per Episode") - plt.legend() - - # Plot steps - plt.subplot(2, 2, 2) - plt.plot(results_with_memory["steps"], label="With Memory") - plt.plot(results_without_memory["steps"], label="Without Memory") - plt.xlabel("Episode") - plt.ylabel("Steps") - plt.title("Steps per Episode") - plt.legend() - - # Plot success rate - plt.subplot(2, 2, 3) - plt.plot(results_with_memory["success_rate"], label="With Memory") - plt.plot(results_without_memory["success_rate"], label="Without Memory") - plt.xlabel("Episode") - plt.ylabel("Success Rate") - plt.title("5-Episode Moving Success Rate") - plt.legend() - - # Plot Q-value distribution - plt.subplot(2, 2, 4) - mem_q_values = np.array([max(v) for v in results_with_memory["agent"].q_table.values()]) - std_q_values = np.array( - [max(v) for v in results_without_memory["agent"].q_table.values()] - ) - plt.hist(mem_q_values, alpha=0.5, label="With Memory") - plt.hist(std_q_values, alpha=0.5, label="Without Memory") - plt.xlabel("Max Q-Value") - plt.ylabel("Count") - plt.title("Q-Value Distribution") - 
plt.legend() - - plt.tight_layout() - plt.savefig("memory_benefit_comparison.png") - plt.show() - - # Clean up the SQLite database file - if os.path.exists("memory_demo.db"): - try: - os.remove("memory_demo.db") - print("Cleaned up temporary SQLite database") - except: - pass - - print("Experiment completed. Results saved to memory_benefit_comparison.png") + # Run the regular experiment + print("Starting experiment with memory...") + results_with_memory = run_experiment( + episodes=50, memory_enabled=True, random_seed=42 + ) + print("\nStarting experiment without memory...") + results_without_memory = run_experiment( + episodes=50, memory_enabled=False, random_seed=84 + ) + + # Plot results + plt.figure(figsize=(15, 10)) + + # Plot rewards + plt.subplot(2, 2, 1) + plt.plot(results_with_memory["rewards"], label="With Memory") + plt.plot(results_without_memory["rewards"], label="Without Memory") + plt.xlabel("Episode") + plt.ylabel("Total Reward") + plt.title("Reward per Episode") + plt.legend() + + # Plot steps + plt.subplot(2, 2, 2) + plt.plot(results_with_memory["steps"], label="With Memory") + plt.plot(results_without_memory["steps"], label="Without Memory") + plt.xlabel("Episode") + plt.ylabel("Steps") + plt.title("Steps per Episode") + plt.legend() + + # Plot success rate + plt.subplot(2, 2, 3) + plt.plot(results_with_memory["success_rate"], label="With Memory") + plt.plot(results_without_memory["success_rate"], label="Without Memory") + plt.xlabel("Episode") + plt.ylabel("Success Rate") + plt.title("5-Episode Moving Success Rate") + plt.legend() + + # Plot Q-value distribution + plt.subplot(2, 2, 4) + mem_q_values = np.array( + [max(v) for v in results_with_memory["agent"].q_table.values()] + ) + std_q_values = np.array( + [max(v) for v in results_without_memory["agent"].q_table.values()] + ) + plt.hist(mem_q_values, alpha=0.5, label="With Memory") + plt.hist(std_q_values, alpha=0.5, label="Without Memory") + plt.xlabel("Max Q-Value") + plt.ylabel("Count") + 
# ---------------------------------------------------------------------------
# maze.py
# ---------------------------------------------------------------------------
# Maze Environment Module
#
# Provides MazeEnvironment, a simple grid-based environment for reinforcement
# learning or pathfinding experiments.
#
# Features:
# - Configurable maze size, obstacles, and maximum steps per episode.
# - Agent starts at (1, 1) and aims to reach the target at (size-2, size-2).
# - Step function supports four actions: up, right, down, left.
# - Rewards for reaching the target, penalties for timeouts, and
#   step/distance-based penalties.
# - Observations include agent position, target, nearby obstacles, and step
#   count.
#
# Example usage:
#     env = MazeEnvironment(size=5, obstacles=[(2, 2), (3, 3)])
#     obs = env.reset()
#     obs, reward, done = env.step(1)  # Take action 'right'
from typing import Optional

import numpy as np


class MazeEnvironment:
    """
    A simple grid-based maze environment for RL/pathfinding experiments.

    Attributes:
        size (int): Size of the maze (size x size grid).
        obstacles (list[tuple[int, int]]): List of obstacle coordinates.
        target (tuple[int, int]): Target position in the maze.
        max_steps (int): Maximum steps per episode.
        position (tuple[int, int]): Current agent position.
        steps (int): Steps taken in current episode.
    """

    # (row, col) deltas indexed by action id: 0=up, 1=right, 2=down, 3=left.
    _DIRECTIONS = ((-1, 0), (0, 1), (1, 0), (0, -1))

    def __init__(
        self,
        size: int = 5,
        obstacles: Optional[list[tuple[int, int]]] = None,
        max_steps: int = 15,
    ) -> None:
        """
        Initialize the maze environment.

        The start cell (1, 1) and the target cell are not checked against the
        obstacle list or the grid bounds; the caller is responsible for
        passing a consistent configuration.

        Args:
            size: Size of the maze (size x size grid).
            obstacles: Iterable of (row, col) pairs for obstacle locations.
                Each pair is stored as a tuple, so pairs given as lists still
                block movement. ``None`` means no obstacles.
            max_steps: Maximum steps allowed per episode.
        """
        self.size = size
        # Copy instead of aliasing the caller's list, and normalize every cell
        # to a tuple: positions are tuples, and a list such as [2, 2] never
        # compares equal to the tuple (2, 2).
        self.obstacles = (
            [tuple(cell) for cell in obstacles] if obstacles is not None else []
        )
        self.target = (size - 2, size - 2)
        self.max_steps = max_steps
        self.reset()

    def reset(self) -> dict:
        """
        Reset the environment to the initial state.

        Returns:
            dict: Initial observation after reset.
        """
        self.position = (1, 1)
        self.steps = 0
        return self.get_observation()

    def get_observation(self) -> dict:
        """
        Get the current observation of the environment.

        Returns:
            dict: Observation containing position, target, nearby obstacles, and steps.
        """
        return {
            "position": self.position,
            "target": self.target,
            "nearby_obstacles": self._get_nearby_obstacles(),
            "steps": self.steps,
        }

    def _get_nearby_obstacles(self) -> list[tuple[int, int]]:
        """
        Get obstacles inside the 5x5 window centred on the agent.

        An obstacle is included when both its row offset and its column offset
        from the agent are at most 2 (Chebyshev distance <= 2), so diagonal
        cells such as (row + 2, col + 2) are included.

        Returns:
            list[tuple[int, int]]: Nearby obstacle coordinates.
        """
        row, col = self.position
        return [
            cell
            for cell in self.obstacles
            if abs(cell[0] - row) <= 2 and abs(cell[1] - col) <= 2
        ]

    def step(self, action: int) -> tuple[dict, float, bool]:
        """
        Take an action in the environment.

        A move that would leave the grid or enter an obstacle leaves the agent
        in place, but still counts as a step.

        Args:
            action (int): Action to take (0=up, 1=right, 2=down, 3=left).

        Returns:
            tuple: (observation, reward, done)
                observation (dict): New observation after action.
                reward (float): Reward for the action.
                done (bool): Whether the episode has ended.

        Raises:
            IndexError: If ``action`` is not one of 0, 1, 2, 3.
        """
        # Reject negative ids explicitly: plain sequence indexing would accept
        # -1 and silently treat it as "left".
        if not 0 <= action < len(self._DIRECTIONS):
            raise IndexError(f"action must be 0, 1, 2 or 3, got {action!r}")

        d_row, d_col = self._DIRECTIONS[action]
        new_position = (self.position[0] + d_row, self.position[1] + d_col)

        # Check if valid move
        in_bounds = (
            0 <= new_position[0] < self.size and 0 <= new_position[1] < self.size
        )
        if in_bounds and new_position not in self.obstacles:
            self.position = new_position

        self.steps += 1

        # Calculate reward; reaching the target takes precedence over timeout.
        if self.position == self.target:
            reward = 100  # Success
            done = True
        elif self.steps >= self.max_steps:
            reward = -50  # Timeout penalty
            done = True
        else:
            # Manhattan distance to target
            dist = abs(self.position[0] - self.target[0]) + abs(
                self.position[1] - self.target[1]
            )
            reward = -1 - (dist * 0.1)  # Small step penalty with distance hint
            done = False

        return self.get_observation(), reward, done


# ---------------------------------------------------------------------------
# memory/utils/util.py
# ---------------------------------------------------------------------------
# Helper function to convert NumPy types to native Python types for JSON
# serialization.
def convert_numpy_to_python(obj):
    """Convert NumPy types to standard Python types for JSON serialization.

    Recurses through dicts (keys and values), lists and tuples. NumPy
    booleans, integers and floats become ``bool``, ``int`` and ``float``;
    arrays become nested lists via ``ndarray.tolist()``. Any other object is
    returned unchanged.

    Args:
        obj: The value to convert.

    Returns:
        The converted value, with the same container structure as ``obj``.
    """
    # np.bool_ is not a subclass of np.integer, so it needs its own branch.
    if isinstance(obj, np.bool_):
        return bool(obj)
    if isinstance(obj, np.integer):
        return int(obj)
    if isinstance(obj, np.floating):
        return float(obj)
    if isinstance(obj, np.ndarray):
        return obj.tolist()
    if isinstance(obj, dict):
        # Keys are converted too: json.dumps rejects NumPy scalar keys.
        return {
            convert_numpy_to_python(key): convert_numpy_to_python(value)
            for key, value in obj.items()
        }
    if isinstance(obj, list):
        return [convert_numpy_to_python(item) for item in obj]
    if isinstance(obj, tuple):
        return tuple(convert_numpy_to_python(item) for item in obj)
    return obj