Welcome to Day 25 of #30DaysOfLangChain – LangChain 0.3 Edition! Yesterday, we laid the groundwork for exposing our LangChain applications as RESTful APIs using FastAPI. Today, we’re taking that a significant step further: serving a more complex LangGraph agent and providing real-time streaming responses.

When dealing with intelligent agents that might perform multiple steps (like tool calls, reasoning, retries), a single, delayed response can lead to a poor user experience. Streaming allows you to provide immediate feedback to the user, showing the agent’s “thoughts” and “actions” as they happen, making the application feel much more responsive and transparent.

Why Stream API Responses?

Imagine asking a complex question to an AI agent. If you have to wait 10-20 seconds for the full answer, it can feel sluggish. Streaming solves this:

  • Improved User Experience: Users see progress immediately, reducing perceived latency. This is crucial for interactive chatbots and agents.
  • Real-time Feedback: Displaying the agent’s internal workings (tool calls, intermediate thoughts) enhances transparency and builds trust.
  • Handling Long-Running Tasks: For operations that take time, streaming prevents timeouts and provides continuous updates.
  • Efficiency: Data is sent as it becomes available, rather than buffering the entire response in memory.

FastAPI Streaming with Server-Sent Events (SSE)

FastAPI, built on Starlette, provides excellent support for asynchronous operations and streaming. The StreamingResponse class is your key tool here. For real-time updates in a chat-like interface, Server-Sent Events (SSE) are a common and effective protocol.

SSE works over a single, long-lived HTTP connection, where the server pushes events to the client. Each event is a block of text formatted with data: lines, optionally an event: type, and terminated by \n\n.
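
For example, a single frame on the wire might look like the following (the event name and JSON body here are purely illustrative; the blank line terminates the event):

event: llm_stream
data: {"type": "llm_stream", "content": "Hello"}
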

In FastAPI, you achieve this by:

  1. Defining an async def generator() function that yields strings.
  2. Returning fastapi.responses.StreamingResponse(generator(), media_type="text/event-stream"), as in the minimal sketch below.
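
Here’s a minimal, self-contained sketch of that pattern; the /demo_stream route and its "tick" payload are made up purely for illustration:

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import asyncio
import json

demo_app = FastAPI()

@demo_app.get("/demo_stream")
async def demo_stream():
    async def generator():
        # Yield three SSE frames, one every half second
        for i in range(3):
            yield f"event: tick\ndata: {json.dumps({'count': i})}\n\n"
            await asyncio.sleep(0.5)
    return StreamingResponse(generator(), media_type="text/event-stream")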

LangGraph astream_events for Detailed Agent Output

LangGraph’s power lies in its ability to orchestrate complex sequences of operations. To get granular, real-time insights into an agent’s execution, LangGraph provides the astream_events() method (use version="v2", the recommended event schema).

astream_events() yields a stream of event dictionaries, each representing a step or transition within your graph. These events can include:

  • on_chat_model_start, on_chat_model_stream, on_chat_model_end: For LLM calls.
  • on_tool_start, on_tool_end: For tool invocations.
  • on_chain_start, on_chain_end: For the graph itself and for each node run (the node name appears in the event’s name field).

By iterating over these events and selectively sending them to the client via SSE, we can create a highly informative and responsive agent interface.
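
As a quick, hedged illustration of what consuming that stream looks like (agent_app here stands for the compiled graph we build later in this post, and the filtering is deliberately minimal):

from langchain_core.messages import HumanMessage

async def inspect_events(agent_app):
    """Print streamed LLM tokens as they arrive and log every other event type."""
    inputs = {"messages": [HumanMessage(content="What is the current time?")]}
    async for event in agent_app.astream_events(inputs, version="v2"):
        kind = event["event"]            # e.g. "on_chat_model_stream"
        name = event.get("name", "")     # the runnable/node that emitted the event
        if kind == "on_chat_model_stream":
            print(event["data"]["chunk"].content, end="", flush=True)
        else:
            print(f"\n[{kind}] {name}")

# Once the graph is compiled later in this post: asyncio.run(inspect_events(agent_app))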

Project: Streaming LangGraph Agent with FastAPI

Our project today will:

  1. Define a simple LangGraph agent that can use a “search” tool (a dummy one for simplicity).
  2. Set up a FastAPI application with a /agent_stream endpoint.
  3. Maintain the agent’s conversational memory across streaming requests with a session-keyed, in-memory message history (the Day 24 pattern, applied manually so it plays nicely with streaming).
  4. Invoke the LangGraph agent using agent_app.astream_events() and stream the events back to the client as SSE.

Before you run the code:

  • Install necessary libraries: pip install fastapi uvicorn "langchain-openai" "langchain-ollama" "langchain-community" langgraph python-dotenv
  • Ensure you have your OPENAI_API_KEY set if using OpenAI.
  • If using Ollama, ensure it’s running and you’ve pulled your desired chat model (e.g., ollama pull llama2). Embedding models aren’t needed for this streaming example; only the chat model is used. A sample .env with the variables this script reads follows this list.
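
Here is an illustrative .env file covering the variables this script reads at startup (loaded via python-dotenv):

LLM_PROVIDER=ollama             # or "openai"
OLLAMA_MODEL_CHAT=llama2
OPENAI_MODEL_CHAT=gpt-3.5-turbo
OPENAI_API_KEY=sk-...           # only needed when LLM_PROVIDER=openai
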
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
import uvicorn
import os
import json
from typing import Annotated, Dict, List
from datetime import datetime

from langchain_openai import ChatOpenAI
from langchain_ollama import ChatOllama
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage, ToolMessage, BaseMessage
from langchain_core.chat_history import InMemoryChatMessageHistory
from langchain_core.tools import tool
from langgraph.graph import StateGraph, END, START
from langgraph.graph.message import add_messages  # reducer that appends new messages to the state

# Load environment variables
from dotenv import load_dotenv
load_dotenv()

# --- Configuration for LLM ---
LLM_PROVIDER = os.getenv("LLM_PROVIDER", "ollama").lower() # 'openai' or 'ollama'
OLLAMA_MODEL_CHAT = os.getenv("OLLAMA_MODEL_CHAT", "llama2").lower()
OPENAI_MODEL_CHAT = os.getenv("OPENAI_MODEL_CHAT", "gpt-3.5-turbo")

# --- Initialize LLM ---
def get_llm():
    """Initializes and returns the ChatLargeLanguageModel based on provider."""
    if LLM_PROVIDER == "openai":
        if not os.getenv("OPENAI_API_KEY"):
            raise ValueError("OPENAI_API_KEY not set for OpenAI provider. Please set it.")
        return ChatOpenAI(model=OPENAI_MODEL_CHAT, temperature=0.7)
    elif LLM_PROVIDER == "ollama":
        try:
            llm_instance = ChatOllama(model=OLLAMA_MODEL_CHAT, temperature=0.7)
            llm_instance.invoke("test", config={"stream": False}) # Test connection
            return llm_instance
        except Exception as e:
            raise RuntimeError(f"Error connecting to Ollama LLM '{OLLAMA_MODEL_CHAT}': {e}. "
                               f"Ensure Ollama is running: `ollama run {OLLAMA_MODEL_CHAT}`") from e
    else:
        raise ValueError(f"Invalid LLM provider: {LLM_PROVIDER}. Must be 'openai' or 'ollama'.")

llm = get_llm()  # tools are bound below, once they are defined

# --- Define a simple tool for the agent ---
@tool
def search_web(query: str) -> str:
    """Simulates searching the web for information."""
    # In a real app, you'd use a real search API (e.g., DuckDuckGoSearch, Google Search)
    if "current time" in query.lower():
        return f"The current time in Hyderabad, India is {asyncio.get_event_loop().time()}" # placeholder for current time
    return f"Simulated search result for '{query}': Information about '{query}' found here: [link to relevant info]"

tools = [search_web]
tools_by_name = {t.name: t for t in tools}  # lookup table used by the tool node
llm = llm.bind_tools(tools)  # let the LLM emit tool calls; call_model reads this module-level llm

# --- Define the LangGraph Agent State ---
# `add_messages` appends the messages returned by each node instead of overwriting the list,
# so the conversation accumulates as the graph runs.
class AgentState(BaseModel):
    messages: Annotated[List[BaseMessage], add_messages]

# --- Define the agent nodes ---
def call_model(state: AgentState) -> dict:
    """LLM node: prepend the system prompt and ask the model for the next step (answer or tool calls)."""
    messages = [AGENT_SYSTEM_PROMPT] + state.messages  # AGENT_SYSTEM_PROMPT is defined later in this file
    response = llm.invoke(messages)
    return {"messages": [response]}

def call_tool(state: AgentState) -> dict:
    """Tool node: execute every tool call requested by the last AI message."""
    last_message = state.messages[-1]
    tool_calls = getattr(last_message, "tool_calls", [])

    if not tool_calls:
        # Shouldn't happen: the conditional edge only routes here when tool calls are present
        return {"messages": [AIMessage(content="Agent routed to the tool node, but no tool calls were found.")]}

    tool_outputs = []
    for tool_call in tool_calls:
        # Each tool_call is a dict: {"name": ..., "args": ..., "id": ...}
        selected_tool = tools_by_name[tool_call["name"]]
        tool_output = selected_tool.invoke(tool_call["args"])
        tool_outputs.append(ToolMessage(content=str(tool_output), tool_call_id=tool_call["id"]))
    return {"messages": tool_outputs}

# --- Define the agent graph ---
workflow = StateGraph(AgentState)

# Define the nodes
workflow.add_node("llm", call_model)
workflow.add_node("tool", call_tool)

# Define the edges
workflow.add_edge(START, "llm")
workflow.add_edge("tool", "llm") # After tool execution, go back to LLM to summarize/continue

# Define the conditional edge for the LLM node
def should_continue(state: AgentState) -> str:
    messages = state.messages
    last_message = messages[-1]
    if not last_message.tool_calls:
        return "end" # If no tool calls, it's the final answer
    return "tool" # If there are tool calls, execute the tool

workflow.add_conditional_edges(
    "llm", # From LLM node
    should_continue,
    {"tool": "tool", "end": END}
)

# Compile the graph into a runnable agent
agent_app = workflow.compile()


# --- In-memory store for chat histories (for RunnableWithMessageHistory) ---
# In a real application, this would be a persistent database
store: Dict[str, InMemoryChatMessageHistory] = {}

def get_session_history(session_id: str) -> InMemoryChatMessageHistory:
    if session_id not in store:
        store[session_id] = InMemoryChatMessageHistory()
    return store[session_id]

# --- System prompt for the agent ---
# Prepended to the conversation by the call_model node on every LLM call.
AGENT_SYSTEM_PROMPT = SystemMessage(
    content=(
        "You are an AI assistant with access to tools. If the user asks a question "
        "that requires external information (like current events, factual lookup, "
        "or specific data that you don't have), use the 'search_web' tool. "
        "Always respond to the user based on the tool results or your knowledge. "
        "Do not just repeat tool calls."
    )
)

# --- Wiring conversation history into the agent ---
# The compiled graph (`agent_app`) operates on a single `messages` list in its state,
# whereas RunnableWithMessageHistory (from Day 24) injects history into separate
# `input`/`history` keys. Because we also want the granular events from
# `agent_app.astream_events(...)`, the /agent_stream endpoint below calls the compiled
# graph directly: it reads the stored session history, appends the new HumanMessage,
# passes the combined list as {"messages": [...]}, and writes the final user/AI exchange
# back to the session history once the agent finishes.

# --- FastAPI App Setup ---
app = FastAPI(
    title="LangGraph Streaming Agent API",
    description="A FastAPI endpoint for a LangGraph agent with real-time streaming responses (SSE).",
    version="0.1.0",
)

# --- Pydantic Models for Request ---
class AgentRequest(BaseModel):
    """Request schema for the agent endpoint."""
    session_id: str
    message: str

# --- API Endpoint ---
@app.post("/agent_stream")
async def stream_agent_response(request: AgentRequest):
    """
    Streams responses from the LangGraph agent, including thoughts and actions,
    using Server-Sent Events (SSE).
    """
    async def event_generator():
        try:
            # History is managed manually here: we read the stored messages, append the
            # new user message, stream the compiled graph directly with astream_events,
            # and commit the final exchange back to the session history at the end.

            session_history = get_session_history(request.session_id)
            current_messages = session_history.messages + [HumanMessage(content=request.message)]

            # Astream events from the LangGraph agent
            # We directly stream the agent_app (the compiled graph)
            async for event in agent_app.astream_events(
                {"messages": current_messages},  # AgentState input
                config={
                    "configurable": {"session_id": request.session_id},
                    "run_name": "agent_app",  # lets us recognize the graph's own on_chain_end event below
                },
                version="v2"  # Recommended for more consistent event structure
            ):
                event_name = event["event"]
                event_data = event["data"]
                
                # Filter events for what we want to send to the client
                # You can customize this to send more or less detail
                payload = None
                if event_name == "on_chat_model_start":
                    payload = {"type": "llm_start", "name": event["name"], "input": event_data.get("input")}
                elif event_name == "on_chat_model_stream":
                    chunk_content = event_data["chunk"].content
                    if chunk_content: # Only send if there's actual content
                        payload = {"type": "llm_stream", "content": chunk_content}
                elif event_name == "on_tool_start":
                    payload = {"type": "tool_start", "name": event["name"], "input": event_data.get("input")}
                elif event_name == "on_tool_end":
                    payload = {"type": "tool_end", "output": str(event_data.get("output"))}
                elif event_name == "on_chain_end" and event["name"] == "agent_app": # Final output of the agent
                    final_messages = event_data.get("output", {}).get("messages", [])
                    if final_messages:
                        final_response = final_messages[-1].content
                        payload = {"type": "final_answer", "content": final_response}
                        # Update the session history with the final exchange
                        session_history.add_user_message(request.message)
                        session_history.add_ai_message(final_response)

                if payload:
                    yield f"event: {payload['type']}\ndata: {json.dumps(payload)}\n\n"
            
            # After the stream ends, ensure the connection is closed gracefully
            # A final event can signal the end of the response
            yield "event: stream_end\ndata: {}\n\n"

        except Exception as e:
            # Log the error for server-side debugging
            print(f"Error in streaming: {e}")
            # The HTTP response has already started streaming with a 200 status, so we
            # surface the failure to the client as an SSE error event instead of raising.
            yield f"event: error\ndata: {json.dumps({'error': str(e)})}\n\n"


    return StreamingResponse(event_generator(), media_type="text/event-stream")

@app.get("/health")
async def health_check():
    """Health check endpoint."""
    return {"status": "ok", "llm_provider": LLM_PROVIDER}

# --- How to run this app ---
# To run this file directly (for development with auto-reload), save it with an
# importable module name (no hyphens), e.g. day25_fastapi_streaming_agent.py, then:
# uvicorn day25_fastapi_streaming_agent:app --reload --host 0.0.0.0 --port 8000
#
# Open your browser to http://localhost:8000/docs for interactive API documentation.
# Test with a tool like curl (requires --no-buffer for real-time streaming):
# curl --no-buffer -X POST "http://localhost:8000/agent_stream" -H "Content-Type: application/json" -d '{"session_id": "user123", "message": "What is the current time?"}'
# curl --no-buffer -X POST "http://localhost:8000/agent_stream" -H "Content-Type: application/json" -d '{"session_id": "user123", "message": "Tell me a joke."}'

# Example of a simple HTML client (save as `index.html` and open in browser):
"""
<!DOCTYPE html>
<html>
<head>
    <title>LangGraph Streaming Agent</title>
    <style>
        body { font-family: sans-serif; margin: 20px; }
        .chat-container { max-width: 600px; margin: auto; border: 1px solid #ccc; padding: 15px; border-radius: 8px; }
        .message { margin-bottom: 10px; padding: 8px; border-radius: 5px; }
        .user-message { background-color: #e0f7fa; text-align: right; }
        .ai-message { background-color: #f1f8e9; text-align: left; }
        #output { border: 1px solid #eee; padding: 10px; min-height: 150px; overflow-y: auto; background-color: #f9f9f9; }
        #controls { margin-top: 15px; }
        textarea { width: calc(100% - 20px); padding: 10px; margin-bottom: 10px; border-radius: 5px; border: 1px solid #ddd; }
        button { padding: 10px 15px; background-color: #007bff; color: white; border: none; border-radius: 5px; cursor: pointer; }
        button:hover { background-color: #0056b3; }
        .event-log { font-family: monospace; font-size: 0.8em; color: #555; background-color: #f0f0f0; padding: 5px; margin-top: 5px; border-radius: 3px; }
    </style>
</head>
<body>
    <div class="chat-container">
        <h1>LangGraph Streaming Agent</h1>
        <p>Ask a question, and see the agent's thoughts and actions in real-time!</p>
        <div id="output"></div>
        <div id="controls">
            <input type="text" id="sessionId" value="user_session_1" placeholder="Enter Session ID">
            <textarea id="userInput" placeholder="Type your message..."></textarea>
            <button onclick="sendMessage()">Send</button>
        </div>
    </div>

    <script>
        const outputDiv = document.getElementById('output');
        const userInput = document.getElementById('userInput');
        const sessionIdInput = document.getElementById('sessionId');
        let currentEventSource = null;
        let aiMessageBuffer = ''; // Buffer for accumulating LLM stream chunks

        function appendMessage(role, content) {
            const msgDiv = document.createElement('div');
            msgDiv.className = `message ${role}-message`;
            msgDiv.innerHTML = content; // Use innerHTML to allow for formatting
            outputDiv.appendChild(msgDiv);
            outputDiv.scrollTop = outputDiv.scrollHeight; // Auto-scroll
        }

        function appendEventLog(message) {
            const logDiv = document.createElement('div');
            logDiv.className = 'event-log';
            logDiv.textContent = message;
            outputDiv.appendChild(logDiv);
            outputDiv.scrollTop = outputDiv.scrollHeight;
        }

        function sendMessage() {
            const message = userInput.value;
            const sessionId = sessionIdInput.value;
            if (!message.trim() || !sessionId.trim()) {
                alert("Please enter a message and a Session ID.");
                return;
            }

            appendMessage('user', message);
            userInput.value = '';
            aiMessageBuffer = ''; // Reset buffer for new message

            if (currentEventSource) {
                currentEventSource.close(); // Close any existing connection
            }

            // EventSource only supports GET, so for a POST body we use fetch and read
            // the SSE stream manually from response.body.
            fetch('http://localhost:8000/agent_stream', {
                method: 'POST',
                headers: {
                    'Content-Type': 'application/json'
                },
                body: JSON.stringify({
                    session_id: sessionId,
                    message: message
                })
            })
            .then(response => {
                if (!response.ok) {
                    throw new Error(`HTTP error! Status: ${response.status}`);
                }
                const reader = response.body.getReader();
                const decoder = new TextDecoder('utf-8');
                let processedMessageElement = null; // Element to update for AI's final message

                async function processStream() {
                    let buffer = '';
                    while (true) {
                        const { done, value } = await reader.read();
                        if (done) {
                            console.log('Stream complete');
                            if (processedMessageElement) {
                                processedMessageElement.innerHTML = aiMessageBuffer; // Final update
                            }
                            break;
                        }

                        buffer += decoder.decode(value, { stream: true });
                        let lines = buffer.split('\n');
                        buffer = lines.pop(); // Keep incomplete last line in buffer

                        for (const line of lines) {
                            if (line.startsWith('data: ')) {
                                try {
                                    const data = JSON.parse(line.substring(6));
                                    if (data.type === 'llm_stream') {
                                        if (!processedMessageElement) {
                                            processedMessageElement = document.createElement('div');
                                            processedMessageElement.className = 'message ai-message';
                                            outputDiv.appendChild(processedMessageElement);
                                            outputDiv.scrollTop = outputDiv.scrollHeight;
                                        }
                                        aiMessageBuffer += data.content;
                                        processedMessageElement.innerHTML = aiMessageBuffer; // Update in place
                                    } else if (data.type === 'final_answer') {
                                        // This is redundant if llm_stream already built the answer,
                                        // but good for explicit finalization or if no llm_stream events occurred.
                                        if (!processedMessageElement) {
                                            appendMessage('ai', data.content);
                                        } else {
                                            processedMessageElement.innerHTML = data.content;
                                        }
                                        appendEventLog(`Agent Finished: ${data.content.substring(0, 50)}...`);
                                        aiMessageBuffer = ''; // Clear buffer
                                    } else if (data.type === 'llm_start') {
                                        appendEventLog(`LLM Thinking (Node: ${data.name})...`);
                                    } else if (data.type === 'tool_start') {
                                        appendEventLog(`Calling Tool: ${data.name} with input: ${JSON.stringify(data.input)}`);
                                    } else if (data.type === 'tool_end') {
                                        appendEventLog(`Tool Output: ${data.output.substring(0, 100)}...`);
                                    } else if (data.type === 'error') {
                                        appendEventLog(`ERROR: ${data.error}`);
                                    } else if (data.type === 'stream_end') {
                                        appendEventLog('Stream closed by server.');
                                    } else {
                                        appendEventLog(`Received event: ${JSON.stringify(data)}`);
                                    }
                                } catch (e) {
                                    console.error('Error parsing JSON:', e, line);
                                    appendEventLog(`Malformed event: ${line}`);
                                }
                            } else if (line.startsWith('event: ')) {
                                // This block could be used to directly read event types if 'data:' isn't always present
                                // but our JSON payload includes 'type' key, so it's less critical.
                            }
                        }
                    }
                }
                processStream().catch(error => {
                    console.error('Stream processing error:', error);
                    appendEventLog(`Stream processing error: ${error.message}`);
                });
            })
            .catch(error => {
                console.error('Fetch error:', error);
                appendEventLog(`Failed to connect to streaming endpoint: ${error.message}`);
                appendMessage('error', `Failed to get response: ${error.message}`);
            });
        }
    </script>
</body>
</html>
"""

Code Explanation & Key Takeaways:

  1. Dependencies: Added langgraph for the agent framework.
  2. FastAPI Setup: Standard FastAPI app initialization.
  3. LLM and Tool Setup:
    • get_llm(): Initializes the LLM (OpenAI or Ollama). Right after the tools are defined, we call llm.bind_tools(tools) so the LLM used inside the LangGraph call_model node knows how to emit tool-call messages.
    • search_web Tool: A simple @tool decorated function. In a real application, this would be a real search API call.
  4. LangGraph Agent Definition:
    • AgentState: A BaseModel whose messages field is annotated with the add_messages reducer, so messages returned by each node are appended to the state rather than replacing it.
    • call_model: A node that takes the current messages from the state and invokes the LLM.
    • call_tool: A node that takes the last message (expected to contain tool calls), looks up each requested tool by name, invokes it, and returns ToolMessages.
    • should_continue: A conditional edge function that decides whether the agent should continue by calling a tool or if it’s done and should END. This is the core of the agent’s “decision making”.
    • workflow.compile(): Compiles the graph into a runnable agent_app.
  5. Session History and LangGraph Integration:
    • The store and get_session_history are similar to Day 24, providing an in-memory history.
    • Crucial Part: agent_app.astream_events(...) is invoked directly inside the event_generator.
      • The input to agent_app.astream_events is {"messages": current_messages}, where current_messages is the combination of historical messages (retrieved from session_history) and the new user message. This effectively passes the full conversation context to the LangGraph agent for its execution.
      • config={"configurable": {"session_id": request.session_id}}: Passes the session ID, which LangGraph’s event system can use (though RunnableWithMessageHistory uses it more directly for history management).
      • version="v2": Recommended for a more consistent event structure.
  6. FastAPI Streaming Endpoint (@app.post("/agent_stream")):
    • async def event_generator(): This is the asynchronous generator that StreamingResponse consumes.
    • Event Filtering and Formatting: The async for event in agent_app.astream_events(...) loop is where the magic happens. We iterate through each event emitted by the LangGraph agent.
      • We then filter these events (on_chat_model_start, on_tool_start, on_tool_end, on_chat_model_stream, on_chain_end for agent_app) to extract relevant information.
      • Each piece of information is formatted into a JSON payload and then into an SSE string (event: <type>\ndata: <json_payload>\n\n).
      • yield sends these events to the client in real-time.
    • session_history.add_user_message(request.message) and session_history.add_ai_message(final_response): We explicitly update the InMemoryChatMessageHistory after the agent completes its final answer. astream_events only emits events; it does not persist anything, so this manual commit is what ensures the next call with the same session ID sees the full, updated conversation.
    • Error Handling: Includes try-except to catch errors during streaming and send an error event to the client.
  7. Client-Side HTML Example: A simple HTML file with JavaScript is provided to demonstrate how to consume these SSE events. It uses fetch to initiate the POST request and response.body.getReader() to process the streamed response chunk by chunk, accumulating LLM output and logging agent events.

This setup provides a highly responsive and transparent agent experience, crucial for complex AI applications.
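
If you’d rather exercise the stream from Python than from curl or the browser, here’s a small client sketch; it assumes the httpx package (not in the install list above) and uses the same endpoint and request payload as the server:

import asyncio
import json

import httpx  # assumption: pip install httpx

async def consume_stream():
    payload = {"session_id": "user123", "message": "What is the current time?"}
    async with httpx.AsyncClient(timeout=None) as client:
        async with client.stream("POST", "http://localhost:8000/agent_stream", json=payload) as response:
            async for line in response.aiter_lines():
                if line.startswith("data: "):
                    print(json.loads(line[len("data: "):]))  # each SSE data payload as a dict

asyncio.run(consume_stream())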


Key Takeaway

Day 25 was all about building a responsive, real-time AI backend! We integrated a LangGraph agent into FastAPI and leveraged astream_events to capture its detailed thoughts and actions. By streaming these granular events as Server-Sent Events (SSE), we created an API that provides immediate, step-by-step feedback, transforming a potentially slow, black-box agent into a dynamic and transparent conversational partner. This is fundamental for modern, user-friendly AI applications.

I’m Arpan

I’m a Software Engineer driven by curiosity and a deep interest in Generative AI Technologies. I believe we’re standing at the frontier of a new era—where machines not only learn but create, and I’m excited to explore what’s possible at this intersection of intelligence and imagination.

When I’m not writing code or experimenting with new AI models, you’ll probably find me travelling, soaking in new cultures, or reading a book that challenges how I think. I thrive on new ideas—especially ones that can be turned into meaningful, impactful projects. If it’s bold, innovative, and GenAI-related, I’m all in.

“The future belongs to those who believe in the beauty of their dreams.” – Eleanor Roosevelt

“Imagination is more important than knowledge. For knowledge is limited, whereas imagination embraces the entire world.” – Albert Einstein

This blog, MLVector, is my space to share technical insights, project breakdowns, and explorations in GenAI—from the models shaping tomorrow to the code powering today.

Let’s build the future, one vector at a time.

Let’s connect