Streaming Response Handling
Implementation and best practices for AI Agent streaming responses
ProductReady's AI Agent system uses streaming responses to provide real-time user experiences. This document covers how to implement and handle streaming responses.
Overview
Streaming responses allow AI-generated content to be transmitted to the client in real time as it is generated, rather than waiting for the entire response to complete before anything is displayed. This provides a better user experience, especially for long content-generation tasks.
Core Features
- Real-time Streaming: Content is progressively generated and displayed immediately
- Resumable Streams: Support for resuming streaming after network interruptions
- Tool Call Support: Stream includes tool calls and execution results
- Type Safety: Complete TypeScript type support
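The delta-by-delta delivery described above can be pictured with a plain ReadableStream. This is a self-contained sketch only; in practice the transport is the AI SDK's SSE protocol, and the providers below handle it for you:

```typescript
// Simulate a model emitting three text deltas.
const chunks = ["Streaming ", "feels ", "instant."];
const stream = new ReadableStream<string>({
  start(controller) {
    for (const chunk of chunks) controller.enqueue(chunk);
    controller.close();
  },
});

// Consume progressively, as a client renderer would: each value could be
// painted to the UI the moment it arrives.
const rendered: string[] = [];
const reader = stream.getReader();
while (true) {
  const { done, value } = await reader.read();
  if (done) break;
  rendered.push(value);
}

const fullText = rendered.join("");
```

The point of the loop is that `rendered` grows chunk by chunk, so the UI never has to wait for `fullText` to exist before showing something.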
Basic Usage
1. Server-Side Implementation
All Agent Providers return AI SDK-compatible streaming responses:
import { GaiaAgentProvider } from "agentlib/providers";
import type { CoreMessage } from "ai";
// Create provider
const provider = new GaiaAgentProvider({
model: "gpt-4",
apiKey: process.env.OPENAI_API_KEY,
});
// Prepare messages
const messages: CoreMessage[] = [
{ role: "user", content: "Write an article about AI" }
];
// Call streaming interface
const result = await provider.stream(messages);
// Convert to HTTP response
const response = result.toUIMessageStreamResponse({
onFinish: async ({ messages }) => {
console.log("Streaming completed:", messages);
},
});
return response;
2. Client-Side Usage
Use AI SDK's useChat hook:
'use client';
import { useChat } from 'ai/react';
export function ChatComponent() {
const { messages, input, handleInputChange, handleSubmit, isLoading } = useChat({
api: '/api/chat',
});
return (
<div>
<div>
{messages.map((message) => (
<div key={message.id}>
<strong>{message.role}:</strong>
<p>{message.content}</p>
</div>
))}
</div>
<form onSubmit={handleSubmit}>
<input
value={input}
onChange={handleInputChange}
disabled={isLoading}
placeholder="Enter message..."
/>
<button type="submit" disabled={isLoading}>
Send
</button>
</form>
</div>
);
}
Advanced Features
Resumable Streams
Support for resuming streaming after network interruptions, based on AI SDK v6's resumable stream pattern:
import { createConsumeSseStreamCallback, createResumeResponse } from "agentlib/stream";
import { after } from "next/server";
// POST handler - Create new stream
export async function POST(req: Request) {
const { messages, taskId } = await req.json();
const provider = new GaiaAgentProvider({
model: "gpt-4",
apiKey: process.env.OPENAI_API_KEY,
});
const result = await provider.stream(messages);
return result.toUIMessageStreamResponse({
onFinish: async ({ messages }) => {
// Save messages to database
await saveMessages(taskId, messages);
// Clear active stream ID
await saveActiveStreamId(taskId, null);
},
// Configure resumable stream
consumeSseStream: createConsumeSseStreamCallback({
waitUntil: after,
onStreamCreated: async (streamId) => {
// Save stream ID for later resumption
await saveActiveStreamId(taskId, streamId);
},
}),
});
}
// GET handler - Resume existing stream
export async function GET(req: Request) {
const { searchParams } = new URL(req.url);
const taskId = searchParams.get('taskId');
if (!taskId) {
return new Response('Missing taskId', { status: 400 });
}
// Read active stream ID
const activeStreamId = await readActiveStreamId(taskId);
// Create resume response
return createResumeResponse({
activeStreamId,
waitUntil: after,
onStreamDone: () => saveActiveStreamId(taskId, null),
});
}
Custom Message ID Generation
Control how message IDs are generated:
import { createIdGenerator } from "ai";
const result = await provider.stream(messages);
return result.toUIMessageStreamResponse({
generateMessageId: createIdGenerator({
prefix: "msg",
size: 16
}),
onFinish: async ({ messages }) => {
// Handle completion logic
},
});
Tool Call Handling
Streaming responses include tool call information:
import { tool } from "ai";
import { z } from "zod";
// Define tools
const tools = {
generate_article: tool({
description: "Generate article content",
inputSchema: z.object({
title: z.string(),
content: z.string(),
}),
execute: async ({ title, content }) => {
// Execute article generation
await createArticle(title, content);
return { success: true };
},
}),
};
// Use provider with tools
const provider = new GaiaAgentProvider({
model: "gpt-4",
apiKey: process.env.OPENAI_API_KEY,
tools,
});
const result = await provider.stream(messages);
return result.toUIMessageStreamResponse({
onFinish: async ({ messages }) => {
// Messages include tool calls and results
messages.forEach(msg => {
if (msg.toolInvocations) {
console.log("Tool calls:", msg.toolInvocations);
}
});
},
});
Advanced Client Usage
Handling Tool Calls
'use client';
import { useChat } from 'ai/react';
export function AdvancedChatComponent() {
const { messages, input, handleInputChange, handleSubmit } = useChat({
api: '/api/chat',
onToolCall: ({ toolCall }) => {
console.log('Tool call:', toolCall);
// Can display tool execution status in UI
},
});
return (
<div>
{messages.map((message) => (
<div key={message.id}>
<p>{message.content}</p>
{message.toolInvocations?.map((tool) => (
<div key={tool.toolCallId}>
<strong>Tool: {tool.toolName}</strong>
{tool.state === 'result' && (
<pre>{JSON.stringify(tool.result, null, 2)}</pre>
)}
</div>
))}
</div>
))}
<form onSubmit={handleSubmit}>
<input value={input} onChange={handleInputChange} />
<button type="submit">Send</button>
</form>
</div>
);
}
Stream Interruption & Reconnection
'use client';
import { useChat } from 'ai/react';
import { useEffect } from 'react';
export function RobustChatComponent() {
const { messages, input, handleInputChange, handleSubmit, reload } = useChat({
api: '/api/chat',
onError: (error) => {
console.error('Streaming error:', error);
// Can automatically retry
},
});
// Handle page resume
useEffect(() => {
const handleVisibilityChange = () => {
if (document.visibilityState === 'visible') {
// Try to resume stream when page becomes visible again
reload();
}
};
document.addEventListener('visibilitychange', handleVisibilityChange);
return () => {
document.removeEventListener('visibilitychange', handleVisibilityChange);
};
}, [reload]);
return (
<div>
{messages.map((message) => (
<div key={message.id}>
<p>{message.content}</p>
</div>
))}
<form onSubmit={handleSubmit}>
<input value={input} onChange={handleInputChange} />
<button type="submit">Send</button>
</form>
</div>
);
}
Real-World Example
Complete API Route Implementation
// app/api/chat/route.ts
import { GaiaAgentProvider } from "agentlib/providers";
import { createConsumeSseStreamCallback, createResumeResponse } from "agentlib/stream";
import { after } from "next/server";
import { auth } from "@clerk/nextjs/server";
// POST - Create new streaming conversation
export async function POST(req: Request) {
const { userId } = await auth();
if (!userId) {
return new Response('Unauthorized', { status: 401 });
}
const { messages, taskId } = await req.json();
// Create provider
const provider = new GaiaAgentProvider({
model: process.env.AGENT_MODEL || "gpt-4",
apiKey: process.env.OPENAI_API_KEY,
tools: getCustomTools(taskId),
});
// Clear previous stream
if (taskId) {
await saveActiveStreamId(taskId, null);
}
// Execute streaming call
const result = await provider.stream(messages);
return result.toUIMessageStreamResponse({
onFinish: async ({ messages }) => {
// Save messages
if (taskId) {
await saveMessages(taskId, messages);
await saveActiveStreamId(taskId, null);
}
// Billing handling
const assistantMsgs = messages.filter(m => m.role === 'assistant');
if (assistantMsgs.length > 0) {
await consumeUsage(userId, 'ai_credits', assistantMsgs.length);
}
},
consumeSseStream: taskId
? createConsumeSseStreamCallback({
waitUntil: after,
onStreamCreated: (streamId) => saveActiveStreamId(taskId, streamId),
})
: undefined,
});
}
// GET - Resume existing stream
export async function GET(req: Request) {
const { searchParams } = new URL(req.url);
const taskId = searchParams.get('taskId');
if (!taskId) {
return new Response('Missing taskId', { status: 400 });
}
const activeStreamId = await readActiveStreamId(taskId);
return createResumeResponse({
activeStreamId,
waitUntil: after,
onStreamDone: () => saveActiveStreamId(taskId, null),
});
}
Best Practices
1. Error Handling
// ✅ Good: Properly handle streaming errors
try {
const result = await provider.stream(messages);
return result.toUIMessageStreamResponse({
onFinish: async ({ messages }) => {
await saveMessages(taskId, messages);
},
});
} catch (error) {
console.error("Streaming call failed:", error);
// Return error response
return new Response(
JSON.stringify({ error: "Streaming failed" }),
{
status: 500,
headers: { 'Content-Type': 'application/json' }
}
);
}
2. Message Persistence
// ✅ Good: Save messages in onFinish
return result.toUIMessageStreamResponse({
onFinish: async ({ messages }) => {
try {
// Filter and save messages
const validMessages = messages.filter(m =>
m.content && m.content.length > 0
);
await saveMessages(taskId, validMessages);
} catch (error) {
console.error("Failed to save messages:", error);
// Don't throw error to avoid interrupting stream
}
},
});
3. Resource Cleanup
// ✅ Good: Ensure stream resource cleanup
return result.toUIMessageStreamResponse({
onFinish: async ({ messages }) => {
await saveMessages(taskId, messages);
// Clear active stream ID
await saveActiveStreamId(taskId, null);
},
consumeSseStream: createConsumeSseStreamCallback({
waitUntil: after,
onStreamCreated: async (streamId) => {
await saveActiveStreamId(taskId, streamId);
// Set timeout for cleanup
setTimeout(async () => {
const current = await readActiveStreamId(taskId);
if (current === streamId) {
await saveActiveStreamId(taskId, null);
}
}, 5 * 60 * 1000); // Cleanup after 5 minutes
},
}),
});
4. Client-Side Reconnection Strategy
// ✅ Good: Implement smart reconnection
const { messages, input, handleInputChange, handleSubmit, reload } = useChat({
api: '/api/chat',
onError: (error) => {
console.error('Streaming error:', error);
// Retry on network errors
if (error.message.includes('network') || error.message.includes('fetch')) {
setTimeout(() => reload(), 1000);
}
},
});
Performance Optimization
Message Optimization
import { optimizeMessagesForContext } from "agentlib/chat";
// Optimize message history to avoid token limits
const optimized = optimizeMessagesForContext(messages);
const result = await provider.stream(optimized);
Stream Buffering
// Use throttling on client to reduce re-renders
import { useState, useEffect } from 'react';
export function ChatComponent() {
const { messages } = useChat({ api: '/api/chat' });
const [displayMessages, setDisplayMessages] = useState(messages);
// Use throttling to update displayed messages
useEffect(() => {
const timer = setTimeout(() => {
setDisplayMessages(messages);
}, 100);
return () => clearTimeout(timer);
}, [messages]);
return (
<div>
{displayMessages.map((message) => (
<div key={message.id}>{message.content}</div>
))}
</div>
);
}
Troubleshooting
Stream Interruption
Problem: The streaming transmission is interrupted midway
Solutions:
- Check network connectivity
- Verify API key is valid
- Check if rate limits are reached
- Use resumable stream mode
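When the interruption is transient (a dropped connection rather than an invalid key), wrapping the streaming call in a small retry-with-backoff helper before falling back to the GET resume endpoint can help. A hypothetical sketch; `withRetry` is not part of agentlib, and the attempt counts and delays are illustrative:

```typescript
// Retry an async operation with exponential backoff.
async function withRetry<T>(
  fn: () => Promise<T>,
  maxAttempts = 3,
  baseDelayMs = 500,
): Promise<T> {
  let lastError: unknown;
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      return await fn();
    } catch (error) {
      lastError = error;
      if (attempt < maxAttempts) {
        // Backoff doubles each attempt: baseDelayMs, 2x, 4x, ...
        await new Promise((r) => setTimeout(r, baseDelayMs * 2 ** (attempt - 1)));
      }
    }
  }
  throw lastError;
}

// Simulate a stream call that fails twice, then succeeds.
let attempts = 0;
const result = await withRetry(async () => {
  attempts++;
  if (attempts < 3) throw new Error("network interrupted");
  return "stream completed";
}, 3, 1);
```

Only retry errors that look transient; a 401 from an invalid API key will fail the same way on every attempt.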
Message Loss
Problem: Some messages are not saved
Solutions:
- Ensure onFinish is properly implemented
- Add error handling and logging
- Use database transactions for atomicity
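The atomicity point can be sketched as an all-or-nothing save: stage every write first and commit the batch only if all succeed, so a mid-batch failure never leaves a partial history. The in-memory store below stands in for a real database transaction; the names are illustrative, not agentlib APIs:

```typescript
type Message = { id: string; role: string; content: string };

const store = new Map<string, Message[]>();

// Persist a batch of messages all-or-nothing: any failing write aborts
// the whole batch before anything is committed to the store.
async function saveMessagesAtomically(
  taskId: string,
  messages: Message[],
  write: (m: Message) => Promise<void>,
): Promise<void> {
  const staged: Message[] = [];
  for (const m of messages) {
    await write(m); // may throw (validation, I/O, ...)
    staged.push(m);
  }
  // Commit only after every write succeeded.
  store.set(taskId, [...(store.get(taskId) ?? []), ...staged]);
}

// A batch whose second message fails validation:
let failed = false;
try {
  await saveMessagesAtomically(
    "task-1",
    [
      { id: "1", role: "user", content: "hi" },
      { id: "2", role: "assistant", content: "" },
    ],
    async (m) => {
      if (!m.content) throw new Error("empty message");
    },
  );
} catch {
  failed = true;
}
```

With a real database, the staging step would be the statements inside a transaction and the commit would be the transaction commit.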
Tool Call Failures
Problem: Tool execution throws an exception
Solutions:
- Verify tool schema definitions
- Add tool execution error handling
- Log tool call details