Architecture Guide

How the agent works and how to debug it.

The agent uses the ReAct pattern (Reasoning + Acting) for tool-based interactions.

Flow:

  1. Model generates reasoning and decides to call a tool
  2. Tools execute and return results
  3. Model sees results and generates final response

Example streaming output:

[ReasoningEvent] "I need to search for laptops"
[ToolCallStartEvent] tool=search_products
[ToolCallEndEvent] tool=search_products, duration=450ms
[TextEvent] "Based on the search, here are 5 laptops..."
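
The flow above can be sketched as a small loop. This is a minimal illustration, not the project's implementation: `scripted_model`, `ModelTurn`, `run_react`, and the canned `search_products` tool are hypothetical stand-ins, and the real agent delegates this loop to LangChain.

```python
from dataclasses import dataclass
from typing import Callable, Dict, List, Optional


@dataclass
class ModelTurn:
    """One model step: reasoning plus either a tool request or a final answer."""
    reasoning: str
    tool_name: Optional[str] = None
    tool_args: Optional[dict] = None
    final_text: Optional[str] = None


def scripted_model(history: List[dict]) -> ModelTurn:
    """Hypothetical stand-in for an LLM: requests a search first, then answers."""
    tool_results = [m for m in history if m["role"] == "tool"]
    if not tool_results:
        return ModelTurn(
            reasoning="I need to search for laptops",
            tool_name="search_products",
            tool_args={"query": "laptops"},
        )
    count = len(tool_results[-1]["content"])
    return ModelTurn(
        reasoning="",
        final_text=f"Based on the search, here are {count} laptops...",
    )


def search_products(query: str) -> List[str]:
    """Hypothetical tool that returns five canned results."""
    return [f"{query}-{i}" for i in range(1, 6)]


def run_react(user_message: str, tools: Dict[str, Callable], max_steps: int = 5) -> List[str]:
    """Run the reason -> act -> observe loop and return a log of events."""
    history = [{"role": "user", "content": user_message}]
    log: List[str] = []
    for _ in range(max_steps):
        turn = scripted_model(history)
        if turn.tool_name is None:
            # Step 3: no tool requested, so this is the final response.
            log.append(f'[TextEvent] "{turn.final_text}"')
            return log
        # Step 1: the model reasons and decides to call a tool.
        log.append(f'[ReasoningEvent] "{turn.reasoning}"')
        log.append(f"[ToolCallStartEvent] tool={turn.tool_name}")
        # Step 2: the tool runs and its result is fed back to the model.
        result = tools[turn.tool_name](**turn.tool_args)
        log.append(f"[ToolCallEndEvent] tool={turn.tool_name}")
        history.append({"role": "tool", "content": result})
    log.append("[ErrorEvent] step limit reached")
    return log


events = run_react("Find me a laptop", {"search_products": search_products})
for line in events:
    print(line)
```

The `max_steps` cap is a design choice in this sketch: it keeps a model that never stops requesting tools from looping forever.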

The Agent class (consumer_agent.agent) is the main entry point for conversations. It uses create_agent() from LangChain v1 and streams in two modes at once.

Model and Agent Factory (src/consumer_agent/factory.py)

Creates chat models and agent instances with configuration from agent_config.yaml. Supports provider-agnostic initialization via init_chat_model().

Configuration (src/consumer_agent/config.py)

Manages settings from agent_config.yaml and settings.yaml. Secrets are loaded from .env rather than from the YAML files.
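
One way to keep secrets out of the YAML files is sketched below. Everything here is an assumption made for illustration: `parse_dotenv`, `OpenAISettings`, `build_openai_settings`, and the `OPENAI_API_KEY` variable name are hypothetical, and a plain dict stands in for the parsed agent_config.yaml. The actual config.py may be organized differently.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


def parse_dotenv(text: str) -> dict:
    """Parse simple KEY=VALUE lines; blank lines and # comments are skipped."""
    values = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        values[key.strip()] = value.strip().strip('"').strip("'")
    return values


@dataclass(frozen=True)
class OpenAISettings:
    model: str                # non-secret, from the YAML file
    reasoning_effort: str     # non-secret, from the YAML file
    api_key: Optional[str]    # secret, never stored in YAML


def build_openai_settings(
    file_values: Mapping[str, str],
    dotenv_values: Mapping[str, str],
    env: Mapping[str, str] = os.environ,
) -> OpenAISettings:
    """Merge file settings with a secret; real environment variables win over .env."""
    api_key = env.get("OPENAI_API_KEY") or dotenv_values.get("OPENAI_API_KEY")
    return OpenAISettings(
        model=file_values["model"],
        reasoning_effort=file_values["reasoning_effort"],
        api_key=api_key,
    )


# Stand-in for the values parsed from agent_config.yaml (hypothetical values).
yaml_values = {"model": "example-model", "reasoning_effort": "medium"}
dotenv_values = parse_dotenv('# secrets\nOPENAI_API_KEY="sk-example"\n')
cfg = build_openai_settings(yaml_values, dotenv_values, env={})
print(cfg.model, cfg.reasoning_effort, cfg.api_key is not None)
```

Passing `env` explicitly keeps the function easy to test, since no real environment variables are needed.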

Streaming Events (src/consumer_agent/agent/streaming.py)

Event types: ReasoningEvent, TextEvent, ToolCallStartEvent, ToolCallEndEvent, CompletedEvent, ErrorEvent.
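
As a rough picture of these event types, the sketch below models them as dataclasses. The field names `content`, `tool_name`, `duration_ms`, and `event_type` match the attributes used in this guide's debugging snippets. The `event_type` string values, the `StreamEvent` base class, the `message` field on ErrorEvent, and the `describe` helper are assumptions, not the definitions in streaming.py.

```python
from dataclasses import dataclass
from typing import ClassVar


@dataclass
class StreamEvent:
    """Hypothetical base class; event_type is a class-level tag, not a field."""
    event_type: ClassVar[str] = "event"


@dataclass
class ReasoningEvent(StreamEvent):
    content: str
    event_type: ClassVar[str] = "reasoning"


@dataclass
class TextEvent(StreamEvent):
    content: str
    event_type: ClassVar[str] = "text"


@dataclass
class ToolCallStartEvent(StreamEvent):
    tool_name: str
    event_type: ClassVar[str] = "tool_call_start"


@dataclass
class ToolCallEndEvent(StreamEvent):
    tool_name: str
    duration_ms: int
    event_type: ClassVar[str] = "tool_call_end"


@dataclass
class CompletedEvent(StreamEvent):
    event_type: ClassVar[str] = "completed"


@dataclass
class ErrorEvent(StreamEvent):
    message: str  # hypothetical field
    event_type: ClassVar[str] = "error"


def describe(event: StreamEvent) -> str:
    """Render one event as a single log line."""
    if isinstance(event, (ReasoningEvent, TextEvent)):
        detail = event.content
    elif isinstance(event, ToolCallEndEvent):
        detail = f"{event.tool_name} ({event.duration_ms}ms)"
    elif isinstance(event, ToolCallStartEvent):
        detail = event.tool_name
    elif isinstance(event, ErrorEvent):
        detail = event.message
    else:
        detail = ""
    return f"[{event.event_type}] {detail}".rstrip()


print(describe(ToolCallEndEvent("search_products", 450)))
```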

The agent uses two streaming modes simultaneously:

  • “messages” mode: Low-latency token streaming
  • “updates” mode: Tool execution tracking

Tokens are automatically classified:

  • Before tool calls: ReasoningEvent
  • After tool calls: TextEvent
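
The classification rule can be sketched as a small state machine over a merged stream of `(mode, payload)` pairs. This is a simplified illustration under stated assumptions: the `classify` function is hypothetical, and the payloads here are plain strings and dicts, whereas the real LangGraph payloads are richer objects. How the actual agent labels tokens in a turn with no tool call at all is not stated in this guide; in this sketch they stay classified as reasoning.

```python
from typing import Any, Iterable, Iterator, Tuple


def classify(stream: Iterable[Tuple[str, Any]]) -> Iterator[Tuple[str, str]]:
    """Label tokens as reasoning until a tool update is seen, then as text."""
    tool_seen = False
    for mode, payload in stream:
        if mode == "updates":
            # A tool finished: everything after this point is answer text.
            tool_seen = True
            yield ("ToolCallEndEvent", payload["tool"])
        elif mode == "messages":
            kind = "TextEvent" if tool_seen else "ReasoningEvent"
            yield (kind, payload)


# Simulated merged stream: token chunks from "messages", tool results from "updates".
simulated = [
    ("messages", "I need to "),
    ("messages", "search"),
    ("updates", {"tool": "search_products"}),
    ("messages", "Here are 5 laptops"),
]
result = list(classify(simulated))
for kind, value in result:
    print(kind, repr(value))
```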
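
To debug streaming, print each event as it arrives:

    from consumer_agent.agent.streaming import ReasoningEvent, TextEvent, ToolCallStartEvent, ToolCallEndEvent

    # Run this inside an async function; agent, messages, and system_prompt must already exist.
    async for event in agent.stream(messages, system_prompt):
        print(f"[{event.event_type}] {event}")
        if isinstance(event, ReasoningEvent):
            print(f"  Reasoning: {event.content}")
        elif isinstance(event, TextEvent):
            print(f"  Text: {event.content}")
        elif isinstance(event, ToolCallStartEvent):
            print(f"  Tool start: {event.tool_name}")
        elif isinstance(event, ToolCallEndEvent):
            print(f"  Tool end: {event.tool_name} ({event.duration_ms}ms)")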

To check the model configuration:

    from consumer_agent.config import agent_config

    # Check model config
    print(f"Model: {agent_config.openai.model}")
    print(f"Reasoning effort: {agent_config.openai.reasoning_effort}")

To verify that the MCP tools load:

    from consumer_agent.config import settings
    from consumer_agent.utils.tools import build_mcp_tools

    # Build the tools only when the MCP integration is enabled in settings.
    if settings.rover_mcp.enabled:
        tools = build_mcp_tools(
            settings.rover_mcp.server_url,
            settings.rover_mcp.authorization,
            settings.rover_mcp.timeout,
        )
        print(f"Loaded {len(tools)} tools")

To test streaming end to end:

    import pytest

    from consumer_agent.agent import Agent
    from consumer_agent.agent.streaming import CompletedEvent
    from consumer_agent.factory import create_chat_model

    @pytest.mark.asyncio
    async def test_agent_streaming():
        model = create_chat_model()
        agent = Agent(model)
        messages = [{"role": "user", "content": "Hello"}]

        # Collect every event the agent emits for a single user message.
        events = []
        async for event in agent.stream(messages, "You are helpful."):
            events.append(event)

        assert len(events) > 0
        # The stream should end with a completion marker.
        assert any(isinstance(e, CompletedEvent) for e in events)