
Centralized Buffer with Agent Identification #4

@csmangum

Handling concurrency in a multi-agent simulation involves ensuring that multiple agents can make predictions, update buffers, and evaluate fitness without causing data corruption or inconsistencies. Here’s a detailed approach to manage concurrency effectively:

  1. Thread Safety:

    • Use thread-safe data structures or synchronization mechanisms to ensure that concurrent access to shared resources (e.g., the buffer) does not lead to race conditions.
  2. Locks:

    • Employ locks to synchronize access to the shared buffer. Python’s threading module provides Lock objects that can be used to ensure only one thread can modify the buffer at a time.
  3. Thread Pools:

    • Use thread pools to manage a pool of worker threads. The concurrent.futures.ThreadPoolExecutor class can simplify the management of threads and task execution.
  4. Atomic Operations:

    • Ensure that operations on shared data are atomic. This can be achieved using locks or by designing operations that are inherently atomic.
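
As a minimal sketch of the atomicity point above: a read-modify-write such as `value += 1` is several steps, and a lock turns it into one indivisible unit. The `SafeCounter` class and `run_increments` function are hypothetical names used only for illustration; they are not part of the implementation discussed in this issue.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class SafeCounter:
    """Hypothetical helper: a counter whose increment is atomic."""

    def __init__(self):
        self._value = 0
        self._lock = threading.Lock()

    def increment(self):
        # Read, add, and write back happen as one unit under the lock.
        with self._lock:
            self._value += 1

    @property
    def value(self):
        with self._lock:
            return self._value


def run_increments(num_tasks=8, increments_per_task=1000):
    counter = SafeCounter()

    def work():
        for _ in range(increments_per_task):
            counter.increment()

    with ThreadPoolExecutor(max_workers=4) as executor:
        futures = [executor.submit(work) for _ in range(num_tasks)]
        for future in futures:
            future.result()  # re-raise any exception from a worker
    return counter.value
```

Every increment is counted because no two threads can interleave inside `increment`. The same pattern applies to any compound operation on the shared buffer, such as finding an entry and then replacing it.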

Here’s an example implementation using ThreadPoolExecutor and Lock for concurrency management:

import collections
import threading
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
import random

# Define a prediction buffer with a maximum size
Prediction = collections.namedtuple('Prediction', ['agent_id', 'timestamp', 'predicted_price', 'actual_price'])
prediction_buffer = collections.deque(maxlen=10000)
buffer_lock = threading.Lock()

class Agent:
    def __init__(self, agent_id):
        self.agent_id = agent_id
    
    def make_prediction(self, current_market_conditions):
        predicted_price = some_prediction_function(current_market_conditions)
        prediction_timestamp = datetime.now()
        
        with buffer_lock:
            prediction_buffer.append(Prediction(self.agent_id, prediction_timestamp, predicted_price, None))
        return predicted_price

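def update_actual_price(prediction_timestamp, actual_price):
    # Fill in the actual price for every pending prediction made at this
    # timestamp. Several agents can share a timestamp, so do not stop at
    # the first match.
    with buffer_lock:
        for i, prediction in enumerate(prediction_buffer):
            if (prediction.timestamp == prediction_timestamp
                    and prediction.actual_price is None):
                prediction_buffer[i] = prediction._replace(actual_price=actual_price)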

def evaluate_fitness():
    agent_fitness = collections.defaultdict(list)
    
    with buffer_lock:
        for prediction in prediction_buffer:
            if prediction.actual_price is not None:
                fitness = compute_fitness(prediction.predicted_price, prediction.actual_price)
                agent_fitness[prediction.agent_id].append(fitness)
    
    for agent_id, fitness_scores in agent_fitness.items():
        average_fitness = sum(fitness_scores) / len(fitness_scores)
        update_agent_fitness(agent_id, average_fitness)

def some_prediction_function(current_market_conditions):
    return current_market_conditions['price'] * (1 + random.uniform(-0.05, 0.05))

def compute_fitness(predicted_price, actual_price):
    # Inverse absolute error; the floor avoids division by zero on an exact match
    return 1 / max(abs(predicted_price - actual_price), 1e-9)

def update_agent_fitness(agent_id, fitness):
    print(f"Agent {agent_id} fitness: {fitness}")

def get_current_market_conditions():
    return {'price': 100}

def get_actual_price(prediction_timestamp):
    return 105 + random.uniform(-5, 5)

def agent_task(agent):
    current_conditions = get_current_market_conditions()
    agent.make_prediction(current_conditions)

# Create a pool of agents
agents = [Agent(agent_id) for agent_id in range(100)]

# Use ThreadPoolExecutor to manage concurrent execution of agent tasks
with ThreadPoolExecutor(max_workers=10) as executor:
    futures = [executor.submit(agent_task, agent) for agent in agents]
    
    # The with block already waits for all tasks; result() re-raises any worker exception
    for future in futures:
        future.result()

# Simulate updating actual prices and evaluating fitness
def update_prices_and_evaluate():
    for prediction in list(prediction_buffer):
        if prediction.actual_price is None:
            actual_price = get_actual_price(prediction.timestamp)
            update_actual_price(prediction.timestamp, actual_price)
    evaluate_fitness()

# Run one update-and-evaluate pass on a worker thread; call result() so that
# an exception raised inside the task is not silently dropped
with ThreadPoolExecutor(max_workers=2) as executor:
    executor.submit(update_prices_and_evaluate).result()

Key Points:

  1. Thread Safety: The buffer_lock ensures that only one thread can read or modify the prediction_buffer at a time, preventing race conditions.
  2. Thread Pool: ThreadPoolExecutor manages a pool of threads for making predictions and updating prices, providing an efficient way to handle concurrent tasks.
  3. Task Submission: Agents submit their prediction tasks to the thread pool for concurrent execution.
  4. Periodic Updates: A separate task updates the actual prices and evaluates fitness scores. The example runs this task once; a real simulation would schedule it repeatedly.
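
The example submits the update task a single time. One possible way to run it on a fixed interval is sketched below using `threading.Event`. The names `run_periodically` and `demo` are hypothetical, and the small `task` defined inside `demo` is a stand-in for `update_prices_and_evaluate`.

```python
import threading


def run_periodically(task, interval_seconds, stop_event):
    """Call task() every interval_seconds until stop_event is set."""
    # Event.wait returns False on timeout and True once the event is set,
    # so the loop exits promptly when a stop is requested.
    while not stop_event.wait(interval_seconds):
        task()


def demo(interval_seconds=0.01, min_runs=3):
    runs = []
    enough = threading.Event()
    stop_event = threading.Event()

    def task():
        # Stand-in for update_prices_and_evaluate
        runs.append(1)
        if len(runs) >= min_runs:
            enough.set()

    worker = threading.Thread(
        target=run_periodically,
        args=(task, interval_seconds, stop_event),
        daemon=True,
    )
    worker.start()
    enough.wait(timeout=5)  # block until the task has run min_runs times
    stop_event.set()        # ask the loop to stop
    worker.join(timeout=5)
    return len(runs), worker.is_alive()
```

Waiting on the stop event rather than calling `time.sleep` lets the loop shut down promptly, without finishing a full interval first.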

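This approach keeps the shared buffer consistent under concurrent access. The trade-off is that each price update and each fitness evaluation scans the entire buffer while holding the lock, so agents trying to append a prediction block until the scan finishes.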
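
If the linear scan in update_actual_price becomes a bottleneck, one alternative is to key each prediction by agent and timestamp so an update is a single dictionary lookup. The sketch below is an assumption about how that could look, not the design proposed in this issue; `PredictionStore` and its methods are hypothetical names.

```python
import threading
from collections import OrderedDict


class PredictionStore:
    """Hypothetical bounded store keyed by (agent_id, timestamp)."""

    def __init__(self, maxlen=10000):
        self._items = OrderedDict()
        self._maxlen = maxlen
        self._lock = threading.Lock()

    def add(self, agent_id, timestamp, predicted_price):
        with self._lock:
            # Each entry is [predicted_price, actual_price]
            self._items[(agent_id, timestamp)] = [predicted_price, None]
            if len(self._items) > self._maxlen:
                self._items.popitem(last=False)  # evict the oldest entry

    def set_actual(self, agent_id, timestamp, actual_price):
        with self._lock:
            entry = self._items.get((agent_id, timestamp))
            if entry is None:
                return False  # evicted or never recorded
            entry[1] = actual_price
            return True

    def resolved(self):
        # Return a snapshot so fitness can be computed outside the lock.
        with self._lock:
            return [(key[0], predicted, actual)
                    for key, (predicted, actual) in self._items.items()
                    if actual is not None]
```

A short usage example:

```python
store = PredictionStore(maxlen=2)
store.add(1, "t0", 100.0)
store.add(2, "t0", 101.0)
store.set_actual(2, "t0", 105.0)
print(store.resolved())
```

Keying on both fields also removes any ambiguity when two agents record predictions with the same timestamp.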

Labels: enhancement (New feature or request)
