tdan1 commented Aug 15, 2025

Priority: Critical
Component: spoon_ai/agents/graph_agent.py

Description
The GraphAgent class has critical state corruption issues when graph execution fails. After an error, the agent's internal state (memory, preserved state, execution metadata) can be left corrupted and inconsistent, which leads to unpredictable behavior in subsequent operations.

Root Cause Analysis
Located in spoon_ai/agents/graph_agent.py:

1. Incomplete Memory Restoration (lines 195-204):

```python
# Reset memory to original state to prevent corruption
try:
    self.memory.clear()
    for msg in original_messages:
        self.memory.add_message(msg)  # No validation of message integrity
    # (excerpt truncated here; the matching except clause is not shown)
```
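The clear-then-re-add loop in this excerpt is not atomic: if `add_message` raises partway through, memory keeps only the messages added before the failure. The sketch below reproduces that in isolation with a hypothetical `ToyMemory` class (not spoon_ai's memory implementation) whose `add_message` raises on malformed entries, an assumption made so the failure is visible. It contrasts the in-place loop with a stage-then-swap variant.

```python
from typing import Any, List


class ToyMemory:
    """Hypothetical stand-in for the agent's memory (not spoon_ai's Memory class)."""

    def __init__(self) -> None:
        self.messages: List[Any] = []

    def clear(self) -> None:
        self.messages = []

    def add_message(self, msg: Any) -> None:
        # Assumption: add_message rejects malformed entries by raising.
        if not isinstance(msg, dict) or "role" not in msg:
            raise ValueError(f"malformed message: {msg!r}")
        self.messages.append(msg)


def restore_in_place(memory: ToyMemory, original: List[Any]) -> None:
    """Mirrors the excerpt: clear first, then re-add messages one at a time."""
    memory.clear()
    for msg in original:
        memory.add_message(msg)  # raising here leaves memory half-restored


def restore_by_swap(memory: ToyMemory, original: List[Any]) -> None:
    """Stage every message in a scratch memory, then swap the list in at the end."""
    staged = ToyMemory()
    for msg in original:
        staged.add_message(msg)  # any failure happens before `memory` is touched
    memory.messages = staged.messages


good = {"role": "user", "content": "hi"}
reply = {"role": "assistant", "content": "hello"}
snapshot = [good, "garbage", reply]  # one corrupted entry in the middle

in_place = ToyMemory()
in_place.messages = [good, reply]
try:
    restore_in_place(in_place, snapshot)
except ValueError:
    pass
print(in_place.messages)  # [{'role': 'user', 'content': 'hi'}]  (reply was lost)

swapped = ToyMemory()
swapped.messages = [good, reply]
try:
    restore_by_swap(swapped, snapshot)
except ValueError:
    pass
print(swapped.messages == [good, reply])  # True: the failed restore changed nothing
```

Swapping only after every message has been accepted means a failed restore leaves the previous contents untouched, instead of a truncated history.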

2. Unsafe Preserved State Handling (lines 85-90):

```python
if self.preserve_state:
    self._last_state = self._sanitize_preserved_state(final_state)
# No validation that final_state isn't corrupted before sanitization
```
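One way to close this gap is to validate `final_state` before it ever reaches `_sanitize_preserved_state`, and to store nothing when validation fails. The rules below (a dict with string keys, JSON-serializable, under a size cap) are assumptions chosen for illustration, not spoon_ai's actual criteria, and `validated_state` is a hypothetical helper.

```python
import json
from typing import Any, Dict, Optional


def validated_state(final_state: Any, max_bytes: int = 64_000) -> Optional[Dict[str, Any]]:
    """Return a detached copy of final_state if it looks safe to preserve, else None."""
    if not isinstance(final_state, dict):
        return None
    if not all(isinstance(key, str) for key in final_state):
        return None
    try:
        encoded = json.dumps(final_state)
    except (TypeError, ValueError):
        return None  # unserializable values or circular references
    if len(encoded.encode("utf-8")) > max_bytes:
        return None
    # Round-tripping through JSON yields a copy that shares no objects with the
    # live state (note: tuples come back as lists).
    return json.loads(encoded)


print(validated_state({"step": 2, "notes": ["ok"]}))  # {'step': 2, 'notes': ['ok']}
print(validated_state(None))                          # None
print(validated_state({"obj": object()}))             # None
```

In the agent, a `None` result would mean clearing `_last_state` rather than sanitizing and keeping a state that may be corrupted.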

3. Unbounded Error Metadata (line 186):

```python
self.execution_metadata = {
    "error": str(error),  # Could contain sensitive data
    "error_type": type(error).__name__,
    # No size limits or cleanup mechanism
}
```
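A sketch of bounded, redacted error metadata follows. The regex only catches `key=value` and `key: value` pairs for a few secret-like key names; it is an illustrative assumption, not a complete redaction policy. `bounded_error_metadata` and the `error_truncated` field are hypothetical.

```python
import re
from typing import Any, Dict

# Assumed redaction rule: hide the value that follows a few secret-like key names.
_SECRET_PATTERN = re.compile(r"(?i)\b(api[_-]?key|token|password|secret)\b(\s*[=:]\s*)(\S+)")


def bounded_error_metadata(error: BaseException, max_len: int = 500) -> Dict[str, Any]:
    """Build error metadata with secret-like values redacted and a hard length cap."""
    message = _SECRET_PATTERN.sub(r"\1\2<redacted>", str(error))
    truncated = len(message) > max_len
    if truncated:
        message = message[: max_len - 3] + "..."  # result is exactly max_len chars
    return {
        "error": message,
        "error_type": type(error).__name__,
        "error_truncated": truncated,
    }


print(bounded_error_metadata(RuntimeError("upstream failed: api_key=sk-123 token: abc"))["error"])
# upstream failed: api_key=<redacted> token: <redacted>
```

Redacting before truncating ensures a secret cannot survive simply because the cut fell in the middle of it.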

4. Race Conditions: No synchronization for concurrent state access during error handling.
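The race can be shown with a hypothetical `ToyAgent` (not spoon_ai code) whose `run()` takes a checkpoint, awaits (standing in for graph nodes), and rolls back on failure. When a failing run and a successful run overlap, the failing run's rollback overwrites the other run's work. Serializing runs with `asyncio.Lock` avoids that.

```python
import asyncio
from typing import List, Optional


class ToyAgent:
    """Hypothetical agent whose run() snapshots memory and rolls back on failure."""

    def __init__(self, use_lock: bool) -> None:
        self.messages: List[str] = []
        self._lock: Optional[asyncio.Lock] = asyncio.Lock() if use_lock else None

    async def _run_unlocked(self, request: str, fail: bool) -> None:
        snapshot = list(self.messages)  # pre-execution checkpoint
        self.messages.append(request)
        await asyncio.sleep(0.01)       # stands in for awaiting graph nodes
        if fail:
            self.messages = snapshot    # rollback to this run's checkpoint

    async def run(self, request: str, fail: bool = False) -> None:
        if self._lock is None:
            await self._run_unlocked(request, fail)
        else:
            async with self._lock:
                await self._run_unlocked(request, fail)


async def demo(use_lock: bool) -> List[str]:
    agent = ToyAgent(use_lock)
    # A failing run and a successful run overlap in time.
    await asyncio.gather(agent.run("bad", fail=True), agent.run("good"))
    return agent.messages


print(asyncio.run(demo(use_lock=False)))  # []: the rollback erased the successful run
print(asyncio.run(demo(use_lock=True)))   # ['good']
```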

Steps to Reproduce

Create a GraphAgent with preserve_state=True:
```python
from spoon_ai.agents.graph_agent import GraphAgent
from spoon_ai.graph import StateGraph

# Create a graph that will fail mid-execution
graph = StateGraph()
graph.add_node("start", lambda x: x)
graph.add_node("fail", lambda x: 1/0)  # Division by zero
graph.add_edge("start", "fail")

agent = GraphAgent(
    name="test_agent",
    graph=graph,
    preserve_state=True
)
```

Run the agent with a request:

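```python
# Top-level await: run this inside an async function (e.g. via asyncio.run) or an asyncio-aware REPL
try:
    result = await agent.run("test request")
except Exception as e:
    print(f"Error: {e}")
```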

Inspect agent state after error:

```python
print(f"Messages count: {len(agent.memory.get_messages())}")
print(f"Preserved state: {hasattr(agent, '_last_state')}")
print(f"Execution metadata: {agent.execution_metadata}")
```

Try to run the agent again:

```python
# This may fail or produce incorrect results
result2 = await agent.run("second request")
```

Expected Behavior

- Agent state should be atomically restored to its pre-execution state on any error
- Memory should contain only validated, uncorrupted messages
- Preserved state should be cleared if corruption is detected
- Execution metadata should have bounded size and no sensitive data
- Subsequent runs should work correctly after error recovery
- Concurrent access should be properly synchronized
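Restoring several fields atomically can be sketched as a context manager that snapshots the named attributes and restores all of them if the block raises. `rollback_on_error` is a hypothetical helper, and `SimpleNamespace` stands in for the agent; deep-copying a long conversation history on every run has a cost, so a real implementation might snapshot only what the graph can mutate.

```python
import copy
from contextlib import contextmanager
from types import SimpleNamespace
from typing import Any, Iterator


@contextmanager
def rollback_on_error(obj: Any, *fields: str) -> Iterator[None]:
    """Snapshot the named attributes; restore all of them if the block raises."""
    saved = {name: copy.deepcopy(getattr(obj, name)) for name in fields}
    try:
        yield
    except BaseException:  # also covers asyncio.CancelledError (a BaseException in 3.8+)
        for name, value in saved.items():
            setattr(obj, name, value)
        raise


agent = SimpleNamespace(messages=["hi"], current_step=0, last_state={"step": 0})
try:
    with rollback_on_error(agent, "messages", "current_step", "last_state"):
        agent.messages.append("partial reply")
        agent.current_step = 3
        agent.last_state = None
        raise ZeroDivisionError("node 'fail' crashed")
except ZeroDivisionError:
    pass
print(agent.messages, agent.current_step, agent.last_state)  # ['hi'] 0 {'step': 0}
```

The same `with` block can wrap an awaited graph call inside an `async def`, since the snapshot and restore themselves are synchronous.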

Actual Behavior

- Memory Corruption: Messages may be partially restored or contain invalid data
- State Inconsistency: _last_state may contain corrupted data from the failed execution
- Memory Leaks: Execution metadata grows unbounded with error details
- Race Conditions: Concurrent operations can corrupt state during error handling
- Cascade Failures: Subsequent runs fail due to corrupted internal state
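If error details are kept across many failures, bounding both the number of entries and the length of each one keeps that metadata from growing without limit. `ErrorHistory` below is a hypothetical helper built on `collections.deque(maxlen=...)`, which discards the oldest entry once it is full; the limits are illustrative.

```python
from collections import deque
from typing import Any, Deque, Dict


class ErrorHistory:
    """Hypothetical bounded store for recent execution errors."""

    def __init__(self, max_entries: int = 20, max_chars: int = 500) -> None:
        self._entries: Deque[Dict[str, Any]] = deque(maxlen=max_entries)
        self._max_chars = max_chars

    def record(self, error: BaseException) -> None:
        # Oldest entries are evicted automatically once maxlen is reached.
        self._entries.append(
            {"error_type": type(error).__name__, "error": str(error)[: self._max_chars]}
        )

    def latest(self) -> Dict[str, Any]:
        return self._entries[-1] if self._entries else {}

    def __len__(self) -> int:
        return len(self._entries)


history = ErrorHistory(max_entries=3, max_chars=10)
for i in range(100):
    history.record(ValueError(f"failure number {i}"))
print(len(history))      # 3
print(history.latest())  # {'error_type': 'ValueError', 'error': 'failure nu'}
```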

Error Examples:

```
GraphAgent 'test_agent' memory restored after error
WARNING - GraphAgent 'test_agent' had invalid preserved state type: <class 'NoneType'>
ERROR - Unexpected error during graph execution for agent 'test_agent': 'NoneType' object is not subscriptable
```

Impact Assessment

- Reliability: High - the agent becomes unusable after the first error
- Data Integrity: High - conversation history may be corrupted
- Performance: Medium - memory leaks degrade performance over time
- Security: Medium - error metadata may contain sensitive information

Phase 1: Add State Synchronization

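```python
import asyncio
from typing import Optional


class GraphAgent(BaseAgent):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self._state_lock = asyncio.Lock()
        self._max_metadata_size = 1024  # Configurable limit

    async def run(self, request: Optional[str] = None) -> str:
        async with self._state_lock:  # Synchronize all state operations
            # ... existing implementation. asyncio.Lock is not reentrant, so any
            # helper called from here must not acquire _state_lock again.
            ...
```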
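`asyncio.Lock` is not reentrant: a coroutine that already holds it and tries to acquire it again (for example, a helper called from inside `run()` that also uses `async with self._state_lock`) blocks forever. The snippet below demonstrates this with a standalone lock and a timeout; `reacquire_demo` is a hypothetical name used only for the demonstration.

```python
import asyncio


async def reacquire_demo(timeout: float = 0.1) -> bool:
    """Return True if re-acquiring an already-held asyncio.Lock timed out."""
    lock = asyncio.Lock()
    async with lock:  # e.g. held by run()
        try:
            # e.g. an error handler called from run() that also acquires the lock
            await asyncio.wait_for(lock.acquire(), timeout)
        except asyncio.TimeoutError:
            return True  # only the timeout prevented a permanent hang
        lock.release()  # not reached: the lock is not reentrant
        return False


print(asyncio.run(reacquire_demo()))  # True
```

The practical consequence is to take the lock once, in `run()`, and have every helper it calls assume the lock is already held.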

Phase 2: Atomic State Management

```python
import time


def _handle_execution_error(self, error: Exception, original_messages: list) -> None:
    """Enhanced error handling with atomic rollback.

    Must be called from run() while self._state_lock is already held:
    asyncio.Lock is not reentrant, so acquiring it again here would deadlock,
    and `async with` is not allowed inside a plain `def` in any case.
    """
    try:
        # Create checkpoint before restoration
        checkpoint = {
            'messages': original_messages.copy(),
            'current_step': 0,
            'state': AgentState.IDLE
        }

        # Atomic restoration with validation
        self._restore_from_checkpoint(checkpoint)

        # Bounded error metadata
        error_str = str(error)[:500]  # Truncate long errors
        self.execution_metadata = {
            "error": error_str,
            "error_type": type(error).__name__,
            "execution_successful": False,
            "execution_time": time.time(),  # timestamp of the failure, not a duration
            "recovery_attempted": True
        }

    except Exception as restore_error:
        logger.critical(f"State corruption detected in {self.name}: {restore_error}")
        # Emergency reset to safe state
        self._emergency_reset()

    # Clear any corrupted preserved state
    self._safe_clear_preserved_state()
```
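The Phase 1 and Phase 2 control flow can be modelled end to end with a self-contained toy. `ToyGraphAgent`, `flaky_graph` and the metadata fields are illustrative assumptions, not spoon_ai's API. The point is that `run()` is the only place the lock is acquired, the error handler runs under that lock without re-acquiring it, and the agent is reusable after a failure.

```python
import asyncio
import time
from typing import Any, Awaitable, Callable, Dict, List, Tuple


class ToyGraphAgent:
    """Hypothetical stand-in that models the Phase 1 + Phase 2 control flow."""

    def __init__(self, graph: Callable[[str], Awaitable[str]]) -> None:
        self.graph = graph
        self.messages: List[Dict[str, Any]] = []
        self.execution_metadata: Dict[str, Any] = {}
        self._last_state: Any = None
        self._state_lock = asyncio.Lock()

    async def run(self, request: str) -> str:
        async with self._state_lock:  # the only place the lock is acquired
            original_messages = list(self.messages)
            self.messages.append({"role": "user", "content": request})
            try:
                result = await self.graph(request)
            except Exception as error:
                self._handle_execution_error(error, original_messages)
                raise
            self.messages.append({"role": "assistant", "content": result})
            self._last_state = {"last_request": request}
            self.execution_metadata = {"execution_successful": True}
            return result

    def _handle_execution_error(self, error: Exception, original_messages: list) -> None:
        # Called while run() still holds _state_lock, so it must not acquire it again.
        self.messages = list(original_messages)
        self.execution_metadata = {
            "error": str(error)[:500],
            "error_type": type(error).__name__,
            "execution_successful": False,
            "execution_time": time.time(),
        }
        self._last_state = None  # drop preserved state that may be inconsistent


async def flaky_graph(request: str) -> str:
    if request == "test request":
        raise ZeroDivisionError("division by zero")  # mirrors the failing node
    return request.upper()


async def demo() -> Tuple[List[Dict[str, Any]], str, str]:
    agent = ToyGraphAgent(flaky_graph)
    try:
        await agent.run("test request")
    except ZeroDivisionError:
        pass
    after_error = list(agent.messages)
    error_type = agent.execution_metadata["error_type"]
    second = await agent.run("second request")  # the agent is reusable
    return after_error, error_type, second


print(asyncio.run(demo()))  # ([], 'ZeroDivisionError', 'SECOND REQUEST')
```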

Phase 3: Message Validation

```python
from typing import Any, Dict


def _validate_message(self, msg) -> bool:
    """Validate message integrity before restoration."""
    if not hasattr(msg, 'role') or not hasattr(msg, 'content'):
        return False
    # Include 'system' so system prompts are not silently dropped on restore
    if msg.role not in ['system', 'user', 'assistant', 'tool']:
        return False
    if not isinstance(msg.content, (str, type(None))):
        return False
    return True


def _restore_from_checkpoint(self, checkpoint: Dict[str, Any]):
    """Atomically restore agent state from checkpoint."""
    # Validate first, so memory is only cleared once the replacement list is ready
    valid_messages = []
    for msg in checkpoint['messages']:
        if self._validate_message(msg):
            valid_messages.append(msg)
        else:
            logger.warning(f"Skipping corrupted message during restoration: {type(msg)}")

    # Batch restore all valid messages
    self.memory.clear()
    for msg in valid_messages:
        self.memory.add_message(msg)

    # Restore other state
    self.current_step = checkpoint.get('current_step', 0)
    self.state = checkpoint.get('state', AgentState.IDLE)
```
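The validation rules can be exercised standalone. This sketch assumes message objects expose `role` and `content` attributes (`SimpleNamespace` stands in for the real message class) and normalizes `Enum` roles to their value, in case roles are a plain `Enum` rather than a `str`-based one. `Role`, `ALLOWED_ROLES` and the helper functions are hypothetical.

```python
from enum import Enum
from types import SimpleNamespace
from typing import Any, Iterable, List

ALLOWED_ROLES = {"system", "user", "assistant", "tool"}


class Role(Enum):
    """Hypothetical plain Enum; real message classes may use a str-based enum."""
    USER = "user"
    ASSISTANT = "assistant"


def role_name(role: Any) -> Any:
    # Normalize Enum members to their value so both "user" and Role.USER pass.
    return role.value if isinstance(role, Enum) else role


def is_valid_message(msg: Any) -> bool:
    if not hasattr(msg, "role") or not hasattr(msg, "content"):
        return False
    if role_name(msg.role) not in ALLOWED_ROLES:
        return False
    return isinstance(msg.content, (str, type(None)))


def valid_messages(messages: Iterable[Any]) -> List[Any]:
    return [msg for msg in messages if is_valid_message(msg)]


history = [
    SimpleNamespace(role="system", content="You are helpful."),
    SimpleNamespace(role=Role.USER, content="hi"),
    SimpleNamespace(role="assistant", content={"not": "text"}),  # wrong content type
    {"role": "user", "content": "a dict, not a message object"},  # no attributes
]
print(len(valid_messages(history)))  # 2
```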

Testing Requirements
Unit Tests

- Test memory restoration with corrupted messages
- Test preserved state corruption handling
- Test metadata size limits
- Test concurrent error scenarios
- Test emergency reset functionality

Integration Tests

- Test end-to-end error recovery flows
- Test agent reusability after errors
- Test state consistency across multiple runs
- Test memory leak prevention

Performance Tests

- Benchmark error recovery overhead
- Test memory usage under repeated errors
- Validate synchronization performance impact
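For the performance items, a small harness can time repeated failure and recovery cycles with `time.perf_counter` and measure what stays allocated afterwards with `tracemalloc`. `make_failing_run` and `profile_repeated_errors` are hypothetical helpers: the simulated recovery step records the error text, and comparing an unbounded list with a `deque(maxlen=10)` shows the kind of growth a "memory usage under repeated errors" test should catch. Absolute numbers are machine-dependent.

```python
import itertools
import time
import tracemalloc
from collections import deque
from typing import Any, Callable, Dict


def make_failing_run(error_log: Any) -> Callable[[], None]:
    """Simulate one failed run whose recovery stores the error text in error_log."""
    attempts = itertools.count()

    def fail_once() -> None:
        try:
            raise RuntimeError(f"node 'fail' crashed (attempt {next(attempts)}): " + "x" * 800)
        except RuntimeError as err:
            error_log.append(str(err))  # recovery keeps the error text around

    return fail_once


def profile_repeated_errors(fail_once: Callable[[], None], iterations: int = 1_000) -> Dict[str, float]:
    """Measure average recovery time, then (separately) memory retained by many failures."""
    start = time.perf_counter()
    for _ in range(iterations):
        fail_once()
    avg_us = (time.perf_counter() - start) / iterations * 1e6

    # tracemalloc slows allocation down, so it is kept out of the timing loop.
    tracemalloc.start()
    try:
        before, _ = tracemalloc.get_traced_memory()
        for _ in range(iterations):
            fail_once()
        after, peak = tracemalloc.get_traced_memory()
    finally:
        tracemalloc.stop()
    return {"avg_recovery_us": avg_us, "retained_bytes": after - before, "peak_bytes": peak - before}


unbounded = profile_repeated_errors(make_failing_run([]))
bounded = profile_repeated_errors(make_failing_run(deque(maxlen=10)))
print(unbounded["retained_bytes"] > bounded["retained_bytes"])  # True
```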

Documentation Updates Required

- Update GraphAgent docstrings with error handling details
- Add state management best practices guide
- Document configuration options for error recovery
- Add troubleshooting section for state corruption issues

Breaking Changes
None - This is a bug fix that maintains API compatibility.

Migration Guide
No migration required. Existing code will automatically benefit from improved reliability.

Definition of Done

- All identified race conditions eliminated with proper synchronization
- Memory restoration is atomic and validates message integrity
- Preserved state corruption is detected and safely handled

Comms: https://t.me/fastbuild01

tdan1 added 2 commits August 15, 2025 05:22

tdan1 commented Aug 15, 2025

@veithly please see above


veithly commented Aug 15, 2025

LGTM

veithly merged commit 65c226f into XSpoonAi:main Aug 15, 2025

tdan1 commented Aug 15, 2025

Other contributions

Issues:
#116
#105
#103
#99
#97

PRs:
#119
#98
#100

cc @veithly @claude @vaibhavgeek @steven1227
