
Streaming Response Handling

Implementation and best practices for AI Agent streaming responses

ProductReady's AI Agent system uses streaming responses to provide real-time user experiences. This document covers how to implement and handle streaming responses.

Overview

Streaming responses allow AI-generated content to be transmitted to the client in real time as it is produced, rather than waiting for the entire response to complete before displaying it. This provides a better user experience, especially for long content generation tasks.
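The difference can be illustrated with a short, framework-free sketch: instead of awaiting the full body, the client reads and renders chunks as they arrive. The `makeBody` helper below is a stand-in for a real network response stream.

```typescript
// Illustrative sketch: reading a streamed body chunk-by-chunk.
// makeBody() is a stand-in for an HTTP response body stream.
function makeBody(chunks: string[]): ReadableStream<Uint8Array> {
  const encoder = new TextEncoder();
  return new ReadableStream<Uint8Array>({
    start(controller) {
      for (const chunk of chunks) controller.enqueue(encoder.encode(chunk));
      controller.close();
    },
  });
}

async function readStream(stream: ReadableStream<Uint8Array>): Promise<string> {
  const decoder = new TextDecoder();
  const reader = stream.getReader();
  let text = "";
  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    // Each chunk could be rendered immediately, instead of waiting for the end.
    text += decoder.decode(value, { stream: true });
  }
  return text;
}

readStream(makeBody(["Hello", ", ", "world"])).then(console.log); // prints "Hello, world"
```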

Core Features

  • Real-time Streaming: Content is progressively generated and displayed immediately
  • Resumable Streams: Support for resuming streaming after network interruptions
  • Tool Call Support: Stream includes tool calls and execution results
  • Type Safety: Complete TypeScript type support

Basic Usage

1. Server-Side Implementation

All Agent Providers return AI SDK-compatible streaming responses:

import { GaiaAgentProvider } from "agentlib/providers";
import type { CoreMessage } from "ai";

// Create provider
const provider = new GaiaAgentProvider({
  model: "gpt-4",
  apiKey: process.env.OPENAI_API_KEY,
});

// Prepare messages
const messages: CoreMessage[] = [
  { role: "user", content: "Write an article about AI" }
];

// Call streaming interface
const result = await provider.stream(messages);

// Convert to HTTP response
const response = result.toUIMessageStreamResponse({
  onFinish: async ({ messages }) => {
    console.log("Streaming completed:", messages);
  },
});

return response;

2. Client-Side Usage

Use AI SDK's useChat hook:

'use client';

import { useChat } from 'ai/react';

export function ChatComponent() {
  const { messages, input, handleInputChange, handleSubmit, isLoading } = useChat({
    api: '/api/chat',
  });

  return (
    <div>
      <div>
        {messages.map((message) => (
          <div key={message.id}>
            <strong>{message.role}:</strong>
            <p>{message.content}</p>
          </div>
        ))}
      </div>
      
      <form onSubmit={handleSubmit}>
        <input
          value={input}
          onChange={handleInputChange}
          disabled={isLoading}
          placeholder="Enter message..."
        />
        <button type="submit" disabled={isLoading}>
          Send
        </button>
      </form>
    </div>
  );
}

Advanced Features

Resumable Streams

Support for resuming streaming after network interruptions, based on AI SDK v6's resumable stream pattern:

import { createConsumeSseStreamCallback, createResumeResponse } from "agentlib/stream";
import { after } from "next/server";

// POST handler - Create new stream
export async function POST(req: Request) {
  const { messages, taskId } = await req.json();
  
  const provider = new GaiaAgentProvider({
    model: "gpt-4",
    apiKey: process.env.OPENAI_API_KEY,
  });
  
  const result = await provider.stream(messages);
  
  return result.toUIMessageStreamResponse({
    onFinish: async ({ messages }) => {
      // Save messages to database
      await saveMessages(taskId, messages);
      // Clear active stream ID
      await saveActiveStreamId(taskId, null);
    },
    // Configure resumable stream
    consumeSseStream: createConsumeSseStreamCallback({
      waitUntil: after,
      onStreamCreated: async (streamId) => {
        // Save stream ID for later resumption
        await saveActiveStreamId(taskId, streamId);
      },
    }),
  });
}

// GET handler - Resume existing stream
export async function GET(req: Request) {
  const { searchParams } = new URL(req.url);
  const taskId = searchParams.get('taskId');
  
  if (!taskId) {
    return new Response('Missing taskId', { status: 400 });
  }
  
  // Read active stream ID
  const activeStreamId = await readActiveStreamId(taskId);
  
  // Create resume response
  return createResumeResponse({
    activeStreamId,
    waitUntil: after,
    onStreamDone: () => saveActiveStreamId(taskId, null),
  });
}

Custom Message ID Generation

Control how message IDs are generated:

import { createIdGenerator } from "ai";

const result = await provider.stream(messages);

return result.toUIMessageStreamResponse({
  generateMessageId: createIdGenerator({ 
    prefix: "msg", 
    size: 16 
  }),
  onFinish: async ({ messages }) => {
    // Handle completion logic
  },
});

Tool Call Handling

Streaming responses include tool call information:

import { tool } from "ai";
import { z } from "zod";

// Define tools
const tools = {
  generate_article: tool({
    description: "Generate article content",
    inputSchema: z.object({
      title: z.string(),
      content: z.string(),
    }),
    execute: async ({ title, content }) => {
      // Execute article generation
      await createArticle(title, content);
      return { success: true };
    },
  }),
};

// Use provider with tools
const provider = new GaiaAgentProvider({
  model: "gpt-4",
  apiKey: process.env.OPENAI_API_KEY,
  tools,
});

const result = await provider.stream(messages);

return result.toUIMessageStreamResponse({
  onFinish: async ({ messages }) => {
    // Messages include tool calls and results
    messages.forEach(msg => {
      if (msg.toolInvocations) {
        console.log("Tool calls:", msg.toolInvocations);
      }
    });
  },
});

Advanced Client Usage

Handling Tool Calls

'use client';

import { useChat } from 'ai/react';

export function AdvancedChatComponent() {
  const { messages, input, handleInputChange, handleSubmit } = useChat({
    api: '/api/chat',
    onToolCall: ({ toolCall }) => {
      console.log('Tool call:', toolCall);
      // Display tool execution status in the UI here
    },
  });

  return (
    <div>
      {messages.map((message) => (
        <div key={message.id}>
          <p>{message.content}</p>
          
          {message.toolInvocations?.map((tool) => (
            <div key={tool.toolCallId}>
              <strong>Tool: {tool.toolName}</strong>
              {tool.state === 'result' && (
                <pre>{JSON.stringify(tool.result, null, 2)}</pre>
              )}
            </div>
          ))}
        </div>
      ))}
      
      <form onSubmit={handleSubmit}>
        <input value={input} onChange={handleInputChange} />
        <button type="submit">Send</button>
      </form>
    </div>
  );
}

Stream Interruption & Reconnection

'use client';

import { useChat } from 'ai/react';
import { useEffect } from 'react';

export function RobustChatComponent() {
  const { messages, input, handleInputChange, handleSubmit, reload } = useChat({
    api: '/api/chat',
    onError: (error) => {
      console.error('Streaming error:', error);
      // Optionally trigger an automatic retry here
    },
  });

  // Handle page resume
  useEffect(() => {
    const handleVisibilityChange = () => {
      if (document.visibilityState === 'visible') {
        // Try to resume stream when page becomes visible again
        reload();
      }
    };

    document.addEventListener('visibilitychange', handleVisibilityChange);
    return () => {
      document.removeEventListener('visibilitychange', handleVisibilityChange);
    };
  }, [reload]);

  return (
    <div>
      {messages.map((message) => (
        <div key={message.id}>
          <p>{message.content}</p>
        </div>
      ))}
      
      <form onSubmit={handleSubmit}>
        <input value={input} onChange={handleInputChange} />
        <button type="submit">Send</button>
      </form>
    </div>
  );
}

Real-World Example

Complete API Route Implementation

// app/api/chat/route.ts
import { GaiaAgentProvider } from "agentlib/providers";
import { createConsumeSseStreamCallback, createResumeResponse } from "agentlib/stream";
import { after } from "next/server";
import { auth } from "@clerk/nextjs/server";

// POST - Create new streaming conversation
export async function POST(req: Request) {
  const { userId } = await auth();
  
  if (!userId) {
    return new Response('Unauthorized', { status: 401 });
  }

  const { messages, taskId } = await req.json();

  // Create provider
  const provider = new GaiaAgentProvider({
    model: process.env.AGENT_MODEL || "gpt-4",
    apiKey: process.env.OPENAI_API_KEY,
    tools: getCustomTools(taskId),
  });

  // Clear previous stream
  if (taskId) {
    await saveActiveStreamId(taskId, null);
  }

  // Execute streaming call
  const result = await provider.stream(messages);

  return result.toUIMessageStreamResponse({
    onFinish: async ({ messages }) => {
      // Save messages
      if (taskId) {
        await saveMessages(taskId, messages);
        await saveActiveStreamId(taskId, null);
      }
      
      // Billing handling
      const assistantMsgs = messages.filter(m => m.role === 'assistant');
      if (assistantMsgs.length > 0) {
        await consumeUsage(userId, 'ai_credits', assistantMsgs.length);
      }
    },
    consumeSseStream: taskId
      ? createConsumeSseStreamCallback({
          waitUntil: after,
          onStreamCreated: (streamId) => saveActiveStreamId(taskId, streamId),
        })
      : undefined,
  });
}

// GET - Resume existing stream
export async function GET(req: Request) {
  const { searchParams } = new URL(req.url);
  const taskId = searchParams.get('taskId');

  if (!taskId) {
    return new Response('Missing taskId', { status: 400 });
  }

  const activeStreamId = await readActiveStreamId(taskId);

  return createResumeResponse({
    activeStreamId,
    waitUntil: after,
    onStreamDone: () => saveActiveStreamId(taskId, null),
  });
}

Best Practices

1. Error Handling

// ✅ Good: Properly handle streaming errors
try {
  const result = await provider.stream(messages);
  return result.toUIMessageStreamResponse({
    onFinish: async ({ messages }) => {
      await saveMessages(taskId, messages);
    },
  });
} catch (error) {
  console.error("Streaming call failed:", error);
  
  // Return error response
  return new Response(
    JSON.stringify({ error: "Streaming failed" }), 
    { 
      status: 500,
      headers: { 'Content-Type': 'application/json' }
    }
  );
}

2. Message Persistence

// ✅ Good: Save messages in onFinish
return result.toUIMessageStreamResponse({
  onFinish: async ({ messages }) => {
    try {
      // Filter and save messages
      const validMessages = messages.filter(m => 
        m.content && m.content.length > 0
      );
      await saveMessages(taskId, validMessages);
    } catch (error) {
      console.error("Failed to save messages:", error);
      // Don't throw error to avoid interrupting stream
    }
  },
});

3. Resource Cleanup

// ✅ Good: Ensure stream resource cleanup
return result.toUIMessageStreamResponse({
  onFinish: async ({ messages }) => {
    await saveMessages(taskId, messages);
    // Clear active stream ID
    await saveActiveStreamId(taskId, null);
  },
  consumeSseStream: createConsumeSseStreamCallback({
    waitUntil: after,
    onStreamCreated: async (streamId) => {
      await saveActiveStreamId(taskId, streamId);
      
      // Set timeout for cleanup
      setTimeout(async () => {
        const current = await readActiveStreamId(taskId);
        if (current === streamId) {
          await saveActiveStreamId(taskId, null);
        }
      }, 5 * 60 * 1000); // Cleanup after 5 minutes
    },
  }),
});

4. Client-Side Reconnection Strategy

// ✅ Good: Implement smart reconnection
const { messages, input, handleInputChange, handleSubmit, reload } = useChat({
  api: '/api/chat',
  onError: (error) => {
    console.error('Streaming error:', error);
    
    // Retry on network errors
    if (error.message.includes('network') || error.message.includes('fetch')) {
      setTimeout(() => reload(), 1000);
    }
  },
  // Enable automatic retry
  maxRetries: 3,
  retryDelay: 1000,
});

Performance Optimization

Message Optimization

import { optimizeMessagesForContext } from "agentlib/chat";

// Optimize message history to avoid token limits
const optimized = optimizeMessagesForContext(messages);
const result = await provider.stream(optimized);

Stream Buffering

// Use throttling on the client to reduce re-renders
import { useState, useEffect, useRef } from 'react';
import { useChat } from 'ai/react';

export function ChatComponent() {
  const { messages } = useChat({ api: '/api/chat' });
  const [displayMessages, setDisplayMessages] = useState(messages);
  const lastUpdateRef = useRef(0);

  // Throttle: apply at most one update per 100ms, so a token-by-token
  // stream does not trigger a re-render for every chunk
  useEffect(() => {
    const wait = Math.max(0, 100 - (Date.now() - lastUpdateRef.current));
    const timer = setTimeout(() => {
      lastUpdateRef.current = Date.now();
      setDisplayMessages(messages);
    }, wait);

    return () => clearTimeout(timer);
  }, [messages]);

  return (
    <div>
      {displayMessages.map((message) => (
        <div key={message.id}>{message.content}</div>
      ))}
    </div>
  );
}

Troubleshooting

Stream Interruption

Problem: The stream is interrupted before completion

Solutions:

  1. Check network connectivity
  2. Verify API key is valid
  3. Check if rate limits are reached
  4. Use resumable stream mode
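For transient failures (steps 1 and 3), a generic retry helper with exponential backoff can be tried before falling back to the resumable-stream GET endpoint shown earlier. This is an illustrative sketch, not part of agentlib:

```typescript
// Sketch: generic retry with exponential backoff for transient stream failures.
async function withRetry<T>(
  fn: () => Promise<T>,
  maxAttempts = 3,
  baseDelayMs = 1000,
): Promise<T> {
  let lastError: unknown;
  for (let attempt = 0; attempt < maxAttempts; attempt++) {
    try {
      return await fn();
    } catch (error) {
      lastError = error;
      // Back off before the next attempt: 1s, 2s, 4s, ...
      if (attempt < maxAttempts - 1) {
        await new Promise((resolve) => setTimeout(resolve, baseDelayMs * 2 ** attempt));
      }
    }
  }
  throw lastError;
}

// Usage: retry the streaming request up to 3 times.
// await withRetry(() => fetch('/api/chat', { method: 'POST', body }));
```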

Message Loss

Problem: Some messages not saved

Solutions:

  1. Ensure onFinish is properly implemented
  2. Add error handling and logging
  3. Use database transactions for atomicity
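Step 3 can be sketched as an all-or-nothing save. The in-memory `Map` below is a stand-in for a real database client, and `saveMessagesAtomically` is a hypothetical helper; with a real database you would use its native transaction API instead:

```typescript
// Sketch: atomic message persistence. The Map is a stand-in for a database;
// the "transaction" commits the copy only if the callback succeeds.
type StoredMessage = { id: string; role: string; content: string };

const store = new Map<string, StoredMessage[]>();

async function transaction(
  fn: (tx: Map<string, StoredMessage[]>) => Promise<void>,
): Promise<void> {
  const tx = new Map(store); // work on a copy
  await fn(tx);              // any throw here aborts the commit
  store.clear();
  for (const [key, value] of tx) store.set(key, value);
}

async function saveMessagesAtomically(
  taskId: string,
  messages: StoredMessage[],
): Promise<void> {
  await transaction(async (tx) => {
    const existing = tx.get(taskId) ?? [];
    tx.set(taskId, [...existing, ...messages]);
  });
}
```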

Tool Call Failures

Problem: Tool execution exceptions

Solutions:

  1. Verify tool schema definitions
  2. Add tool execution error handling
  3. Log tool call details
