@tdan1 commented on Aug 15, 2025


**Status:** Open
**Priority:** Critical
**Component:** `spoon_ai/agents/base.py`
**Labels:** bug, critical, concurrency, deadlock, performance

**Description**

The `BaseAgent` class has serious concurrency flaws that surface under high load or when multiple agents run at the same time.

- State changes aren’t protected, so two tasks can change the agent’s state at the same time, causing deadlocks or overwriting each other’s updates.
- The output queue isn’t thread-safe, so one task can hog all the messages while others get none (resource starvation).
- Memory updates aren’t synchronized, so when multiple tasks write to memory at once, they can mix data and corrupt the agent’s state.
- These problems can freeze the system, corrupt data, and trigger chain-reaction failures across all agents.


**Root Cause Analysis**
_Located in `spoon_ai/agents/base.py`:_

**Deadlock-Prone State Context Manager (lines 49-58):**

```python
@asynccontextmanager
async def state_context(self, new_state: AgentState):
    if not isinstance(new_state, AgentState):
        raise ValueError(f"Invalid state: {new_state}")
    
    old_state = self.state
    self.state = new_state  #  No locking - race condition
    try:
        yield
    except Exception as e:
        self.state = AgentState.ERROR  #  Another race condition
        raise e
    finally:
        self.state = old_state  #  Can overwrite newer state changes
```
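
A possible mitigation (a minimal sketch, not the current implementation) is to serialize transitions behind a per-agent `asyncio.Lock`; the `_state_lock` attribute and the conditional restore in `finally` are assumptions about how transitions might be made atomic:

```python
import asyncio
from contextlib import asynccontextmanager
from spoon_ai.schema import AgentState

# Hypothetical addition in BaseAgent.__init__:
#     self._state_lock = asyncio.Lock()

@asynccontextmanager
async def state_context(self, new_state: AgentState):
    if not isinstance(new_state, AgentState):
        raise ValueError(f"Invalid state: {new_state}")

    async with self._state_lock:
        old_state = self.state
        self.state = new_state
    try:
        yield
    except Exception:
        async with self._state_lock:
            self.state = AgentState.ERROR  # record the failure before re-raising
        raise
    finally:
        async with self._state_lock:
            # Restore only if nothing else changed the state in the meantime
            if self.state == new_state:
                self.state = old_state
```

Whether an `ERROR` state should survive the context exit or be reset is a design decision; this sketch preserves it so failures stay visible.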

**Unsafe Output Queue Operations (lines 150-164):**
```python
async def stream(self):
    while not (self.task_done.is_set() or self.output_queue.empty()):
        queue_task = asyncio.create_task(self.output_queue.get())
        task_done_task = asyncio.create_task(self.task_done.wait())

        done, pending = await asyncio.wait([queue_task, task_done_task], return_when=asyncio.FIRST_COMPLETED)
        #  Race condition: queue might become empty between check and get()
        #  Tasks not properly cleaned up on cancellation
        #  No timeout - can hang indefinitely
```
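
A hedged sketch of a consumer loop that cancels the losing task, bounds each wait, and only exits once the producer has finished and the queue is drained; `STREAM_WAIT_TIMEOUT`, the `yield` of queue items, and the tightened exit condition are assumptions rather than confirmed project behavior:

```python
import asyncio

STREAM_WAIT_TIMEOUT = 30.0  # illustrative per-iteration bound, not an existing setting

async def stream(self):
    # Keep consuming until the producer is done AND the queue has been drained
    while not (self.task_done.is_set() and self.output_queue.empty()):
        queue_task = asyncio.create_task(self.output_queue.get())
        done_task = asyncio.create_task(self.task_done.wait())
        try:
            done, _pending = await asyncio.wait(
                {queue_task, done_task},
                timeout=STREAM_WAIT_TIMEOUT,
                return_when=asyncio.FIRST_COMPLETED,
            )
        finally:
            # Cancel and reap whichever task did not finish so it cannot leak
            for task in (queue_task, done_task):
                if not task.done():
                    task.cancel()
            await asyncio.gather(queue_task, done_task, return_exceptions=True)
        if queue_task.done() and not queue_task.cancelled():
            yield queue_task.result()
        elif not done:
            break  # timed out with no progress; stop instead of hanging forever
```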

**Race Conditions in Memory Management (lines 34-47):**
```python
def add_message(self, role: Literal["user", "assistant", "tool"], content: str, ...):
    if role == "user":
        self.memory.add_message(Message(role=Role.USER, content=content))
    elif role == "assistant":
        #  Multiple threads can modify memory simultaneously
        # No atomic operations for complex message construction
        if tool_calls:
            self.memory.add_message(Message(role=Role.ASSISTANT, content=content, tool_calls=[...]))

```
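
If `add_message()` can be reached from more than one thread, one option (sketch only; `_memory_lock`, `Role.TOOL`, and the `**kwargs` pass-through are assumptions, not existing code) is to build the `Message` first and append it under a `threading.Lock`, so construction and insertion happen as one atomic step:

```python
import threading
from typing import Literal, Optional

from spoon_ai.schema import Message, Role  # assumed to live alongside AgentState

# Hypothetical addition in BaseAgent.__init__:
#     self._memory_lock = threading.Lock()

def add_message(self, role: Literal["user", "assistant", "tool"], content: str,
                tool_calls: Optional[list] = None, **kwargs):
    # Construct the full Message outside the lock...
    if role == "user":
        message = Message(role=Role.USER, content=content)
    elif role == "assistant":
        message = Message(role=Role.ASSISTANT, content=content,
                          tool_calls=tool_calls or [])
    else:
        # Tool messages: extra schema fields are passed through kwargs because
        # the exact fields are not shown in the excerpt above
        message = Message(role=Role.TOOL, content=content, **kwargs)

    # ...then append it atomically so concurrent writers cannot interleave
    with self._memory_lock:
        self.memory.add_message(message)
```

Within a single event loop the existing method has no `await` points, so the lock mainly matters when memory is shared across threads or multiple loops.
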
**Concurrent `run()` Method Vulnerability (lines 60-85):**
```python
async def run(self, request: Optional[str] = None) -> str:
    if self.state != AgentState.IDLE:
        raise RuntimeError(f"Agent {self.name} is not in the IDLE state")
    
    self.state = AgentState.RUNNING  # Race condition here
    #  Multiple callers can pass the state check simultaneously
    #  No protection against concurrent run() calls
```
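
A minimal sketch of an atomic check-and-set, assuming a hypothetical `self._run_lock = asyncio.Lock()` created in `__init__`; resetting the agent to `IDLE` in `finally` is also an assumption about the intended lifecycle:

```python
import asyncio
from typing import Optional

from spoon_ai.schema import AgentState

# Hypothetical addition in BaseAgent.__init__:
#     self._run_lock = asyncio.Lock()

async def run(self, request: Optional[str] = None) -> str:
    # Check-and-set under the lock so only one concurrent caller can win
    async with self._run_lock:
        if self.state != AgentState.IDLE:
            raise RuntimeError(f"Agent {self.name} is not in the IDLE state")
        self.state = AgentState.RUNNING
    try:
        # existing step/loop logic from run() goes here unchanged
        ...
    finally:
        async with self._run_lock:
            self.state = AgentState.IDLE  # always release the agent, even on error
```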

**Blocking MCP Processing Without Timeout (lines 170-200):**
```python
async def process_mcp_message(self, content: Any, sender: str, message: Dict[str, Any], agent_id: str):
    #  No timeout on agent.run() call
    #  Can block indefinitely if agent hangs
    #  No cancellation handling
    return await self.run(request=text_content)
```
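
A sketch of bounding the nested `run()` call with `asyncio.wait_for`; the 60-second `MCP_RUN_TIMEOUT`, the returned error string, and the `text_content` extraction placeholder are illustrative assumptions:

```python
import asyncio
from typing import Any, Dict

MCP_RUN_TIMEOUT = 60.0  # illustrative bound, not an existing setting

async def process_mcp_message(self, content: Any, sender: str,
                              message: Dict[str, Any], agent_id: str):
    text_content = str(content)  # placeholder for the existing extraction logic
    try:
        # Bound the nested run() so one hung agent cannot block MCP processing
        return await asyncio.wait_for(self.run(request=text_content),
                                      timeout=MCP_RUN_TIMEOUT)
    except asyncio.TimeoutError:
        return f"Agent {self.name} timed out after {MCP_RUN_TIMEOUT:.0f}s"
    except asyncio.CancelledError:
        # Let cancellation propagate so callers can shut the agent down cleanly
        raise
```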


**Deadlock Scenarios Observed**

**Scenario 1: State Context Deadlock**
```
# Agent A trying to change state while Agent B is also changing state
# Both agents get stuck waiting for state transitions
Agent A: state_context(RUNNING) -> waits for Agent B
Agent B: state_context(THINKING) -> waits for Agent A
Result: DEADLOCK
```
**Scenario 2: Output Queue Starvation**
```
# Multiple consumers competing for output queue
# Some consumers starve while others monopolize the queue
Consumer 1: Continuously calls output_queue.get()
Consumer 2: Never gets a chance to read from queue
Consumer 3: Blocked indefinitely
Result: RESOURCE STARVATION
```
**Scenario 3: Memory Corruption Race**
```
# Concurrent add_message() calls corrupting memory
Thread 1: add_message("user", "Hello")
Thread 2: add_message("assistant", "Hi") 
Thread 3: add_message("tool", "Result")
Result: Messages interleaved, memory corrupted
```
**Steps to Reproduce**
**1. Concurrent State Change Deadlock:**
```python
import asyncio
from spoon_ai.agents.base import BaseAgent
from spoon_ai.schema import AgentState

async def cause_state_deadlock():
    # MockChatBot stands in for a real LLM client in this repro (not part of spoon_ai)
    agent = BaseAgent(name="test_agent", llm=MockChatBot())
    
    async def state_changer_1():
        async with agent.state_context(AgentState.RUNNING):
            await asyncio.sleep(2)  # Simulate work
            async with agent.state_context(AgentState.THINKING):
                await asyncio.sleep(1)
    
    async def state_changer_2():
        async with agent.state_context(AgentState.THINKING):
            await asyncio.sleep(2)  # Simulate work  
            async with agent.state_context(AgentState.RUNNING):
                await asyncio.sleep(1)
    
    # This will deadlock - both waiting for each other
    await asyncio.gather(state_changer_1(), state_changer_2())

# Run with a timeout to observe the deadlock (wrapped in asyncio.run so the
# snippet also works outside an async REPL)
try:
    asyncio.run(asyncio.wait_for(cause_state_deadlock(), timeout=5.0))
except asyncio.TimeoutError:
    print("DEADLOCK DETECTED: State context managers blocked each other")
```
**2. Output Queue Resource Starvation:**
```python
async def cause_queue_starvation():
    agent = BaseAgent(name="test_agent", llm=MockChatBot())
    
    # Fill queue with items
    for i in range(100):
        await agent.output_queue.put(f"item_{i}")
    
    # Start multiple competing consumers
    async def greedy_consumer():
        while True:
            try:
                # get_nowait() never blocks, so QueueEmpty is actually raised
                # once the greedy consumer has drained everything
                item = agent.output_queue.get_nowait()
                await asyncio.sleep(0.01)  # Simulate processing
            except asyncio.QueueEmpty:
                break
    
    async def starved_consumer():
        items_received = 0
        while True:
            try:
                item = await asyncio.wait_for(agent.output_queue.get(), timeout=1.0)
                items_received += 1
            except asyncio.TimeoutError:
                print(f"Consumer starved, only got {items_received} items")
                break
    
    # Greedy consumer will monopolize queue
    await asyncio.gather(
        greedy_consumer(),
        starved_consumer(),
        starved_consumer()  # This will get very few items
    )

asyncio.run(cause_queue_starvation())
```
**3. Memory Corruption Through Race Conditions:**
```python
async def cause_memory_corruption():
    agent = BaseAgent(name="test_agent", llm=MockChatBot())
    
    async def concurrent_message_adder(role, prefix):
        for i in range(50):
            agent.add_message(role, f"{prefix}_message_{i}")
            await asyncio.sleep(0.001)  # Small delay to increase race probability
    
    # Multiple coroutines adding messages simultaneously
    await asyncio.gather(
        concurrent_message_adder("user", "user"),
        concurrent_message_adder("assistant", "assistant"), 
        concurrent_message_adder("tool", "tool")
    )
    
    # Check for corruption
    messages = agent.memory.get_messages()
    print(f"Expected 150 messages, got {len(messages)}")
    
    # Verify message integrity
    for i, msg in enumerate(messages):
        if not hasattr(msg, 'role') or not hasattr(msg, 'content'):
            print(f"CORRUPTED MESSAGE at index {i}: {msg}")

asyncio.run(cause_memory_corruption())
```
**4. Concurrent Run() Method Collision:**
```python
async def cause_concurrent_run_collision():
    agent = BaseAgent(name="test_agent", llm=MockChatBot())
    
    async def run_agent(request_id):
        try:
            result = await agent.run(f"Request {request_id}")
            print(f"Request {request_id} completed: {result}")
        except RuntimeError as e:
            print(f"Request {request_id} failed: {e}")
    
    # Multiple simultaneous run() calls - only one should succeed
    await asyncio.gather(
        run_agent(1),
        run_agent(2), 
        run_agent(3),
        run_agent(4),
        return_exceptions=True
    )

asyncio.run(cause_concurrent_run_collision())
```
**Expected Behavior**

- **Deadlock Prevention:** State transitions should be atomic and non-blocking
- **Fair Resource Access:** Output queue consumers should get fair access
- **Thread Safety:** Memory operations should be atomic and race-condition free
- **Graceful Concurrency:** Multiple agents should operate independently
- **Timeout Protection:** All async operations should have reasonable timeouts
- **Proper Cleanup:** Resources should be cleaned up even during cancellation

**Actual Behavior**

- **System Hangs:** Deadlocks cause the entire application to freeze
- **Resource Starvation:** Some consumers never get queue access
- **Data Corruption:** Race conditions corrupt agent memory
- **Cascade Failures:** One blocked agent affects all others
- **Memory Leaks:** Incomplete cleanup during deadlock scenarios

**Production Symptoms**

- Application hanging under load - no response for 10+ minutes
- Memory usage growing without bound during concurrent operations
- Error: `Agent test_agent is not in the IDLE state (currently: RUNNING)`
- Queue consumer timeout after 30 seconds waiting for items
- Corrupted message history: expected `Role.USER`, got `None`

**Impact Assessment**

- **Availability:** Critical - Application hangs and becomes unresponsive
- **Scalability:** Critical - Cannot handle concurrent users
- **Data Integrity:** High - Agent memory gets corrupted
- **Performance:** High - Deadlocks cause resource waste
- **User Experience:** Critical - Users experience timeouts and failures


**Communication**
TG: @fastbuild01