GameCraftGameCraft

AI Agents & Tasks

Build and manage AI agents with ProductReady's agent task management system

AI Agents & Tasks

ProductReady includes a production-ready task management system designed for AI agents. Perfect for:

  • 🤖 AI automation workflows
  • 📋 Background job processing
  • 🎯 User-assigned agentTasks
  • 🔄 Async operations tracking

Production Ready: Database schema, tRPC API, and dashboard UI included out of the box!


What's Included

  • Database schema - Tasks with priority, status, and assignments
  • tRPC API - Full CRUD operations
  • Dashboard UI - AgentTask list and creation form
  • Demo mode - Try without database setup
  • Type-safe - End-to-end TypeScript

Quick Start

View Existing Tasks

  1. Start dev server: pnpm dev
  2. Login to dashboard: http://localhost:3000/dashboard
  3. You'll see the agentTasks dashboard!

Or try demo mode (no login): http://localhost:3000/dashboard?demo=1


Database Schema

Tasks are stored in PostgreSQL:

// src/db/schema/agentTasks.ts
export const agentTasks = pgTable('agentTasks', {
  id: uuid('id').defaultRandom().primaryKey(),
  title: text('title').notNull(),
  description: text('description'),
  status: text('status').notNull().default('pending'),
  priority: text('priority').notNull().default('medium'),
  assignedTo: text('assigned_to'),
  createdAt: timestamp('created_at').defaultNow().notNull(),
  updatedAt: timestamp('updated_at').defaultNow().notNull(),
  completedAt: timestamp('completed_at'),
});

Fields:

  • title - AgentTask name (required)
  • description - AgentTask details (optional)
  • status - pending, in_progress, completed, failed
  • priority - low, medium, high, urgent
  • assignedTo - User ID or agent name
  • createdAt / updatedAt - Timestamps
  • completedAt - When task finished

Using the API

Create a AgentTask

'use client';

import { trpc } from '~/lib/trpc/client';

export function CreateTaskButton() {
  const createTask = trpc.agentTasks.create.useMutation();
  
  async function handleCreate() {
    await createTask.mutateAsync({
      title: 'Analyze customer feedback',
      description: 'Use AI to categorize 1000 support tickets',
      priority: 'high',
      assignedTo: 'ai-agent-1',
    });
    
    alert('AgentTask created!');
  }
  
  return <button onClick={handleCreate}>Create AI AgentTask</button>;
}

List All Tasks

export function TasksList() {
  const { data: agentTasks, isLoading } = trpc.agentTasks.list.useQuery();
  
  if (isLoading) return <div>Loading agentTasks...</div>;
  
  return (
    <div>
      {agentTasks?.map((task) => (
        <div key={task.id}>
          <h3>{task.title}</h3>
          <p>Status: {task.status}</p>
          <p>Priority: {task.priority}</p>
        </div>
      ))}
    </div>
  );
}

Update AgentTask Status

export function TaskStatusButton({ taskId }: { taskId: string }) {
  const updateTask = trpc.agentTasks.update.useMutation();
  
  async function markComplete() {
    await updateTask.mutateAsync({
      id: taskId,
      status: 'completed',
      completedAt: new Date(),
    });
  }
  
  return <button onClick={markComplete}>Mark Complete</button>;
}

Delete a AgentTask

export function DeleteTaskButton({ taskId }: { taskId: string }) {
  const deleteTask = trpc.agentTasks.delete.useMutation({
    onSuccess: () => {
      // Refresh task list
      utils.agentTasks.list.invalidate();
    },
  });
  
  async function handleDelete() {
    if (confirm('Delete this task?')) {
      await deleteTask.mutateAsync({ id: taskId });
    }
  }
  
  return <button onClick={handleDelete}>Delete</button>;
}

Building an AI Agent

Example: Email Summarizer Agent

Let's build an agent that summarizes emails!

Create the agent function

// src/lib/agents/email-summarizer.ts
import { OpenAI } from 'openai';

const openai = new OpenAI({
  apiKey: process.env.OPENAI_API_KEY,
});

export async function summarizeEmail(emailContent: string) {
  const response = await openai.chat.completions.create({
    model: 'gpt-4',
    messages: [
      {
        role: 'system',
        content: 'You are an email summarizer. Provide concise summaries.',
      },
      {
        role: 'user',
        content: `Summarize this email:\n\n${emailContent}`,
      },
    ],
  });
  
  return response.choices[0].message.content;
}

Create task processor

// src/lib/agents/task-processor.ts
import { db } from '~/db';
import { agentTasks } from '~/db/schema/agentTasks';
import { eq } from 'drizzle-orm';
import { summarizeEmail } from './email-summarizer';

export async function processEmailTask(taskId: string) {
  // Update status to in_progress
  await db
    .update(agentTasks)
    .set({ status: 'in_progress', updatedAt: new Date() })
    .where(eq(agentTasks.id, taskId));
  
  try {
    // Get task details
    const [task] = await db
      .select()
      .from(agentTasks)
      .where(eq(agentTasks.id, taskId))
      .limit(1);
    
    // Run AI agent
    const summary = await summarizeEmail(task.description || '');
    
    // Mark complete with result
    await db
      .update(agentTasks)
      .set({
        status: 'completed',
        completedAt: new Date(),
        description: `Original:\n${task.description}\n\nSummary:\n${summary}`,
      })
      .where(eq(agentTasks.id, taskId));
    
    return { success: true, summary };
  } catch (error) {
    // Mark failed
    await db
      .update(agentTasks)
      .set({
        status: 'failed',
        description: `Error: ${error.message}`,
      })
      .where(eq(agentTasks.id, taskId));
    
    throw error;
  }
}

Create tRPC endpoint to trigger agent

// src/server/routers/agentTasks.ts
export const tasksRouter = createTRPCRouter({
  // ... existing endpoints ...
  
  processEmail: protectedProcedure
    .input(z.object({ taskId: z.string().uuid() }))
    .mutation(async ({ ctx, input }) => {
      // Queue the task for processing
      await processEmailTask(input.taskId);
      
      return { success: true };
    }),
});

Use in frontend

'use client';

export function ProcessEmailButton({ taskId }: { taskId: string }) {
  const processEmail = trpc.agentTasks.processEmail.useMutation();
  
  async function handleProcess() {
    await processEmail.mutateAsync({ taskId });
    alert('Email processing started!');
  }
  
  return <button onClick={handleProcess}>Summarize with AI</button>;
}

Done! You now have an AI agent that processes agentTasks asynchronously.


Background Processing

For long-running agentTasks, use a job queue:

Option 1: Simple Polling

// Poll every 10 seconds for pending agentTasks
setInterval(async () => {
  const pendingTasks = await db
    .select()
    .from(agentTasks)
    .where(eq(agentTasks.status, 'pending'))
    .limit(10);
  
  for (const task of pendingTasks) {
    await processTask(task.id);
  }
}, 10000);

Option 2: Bull Queue (Production)

pnpm add bull @bull-board/api @bull-board/hono
// src/lib/queue.ts
import Bull from 'bull';

export const taskQueue = new Bull('agentTasks', {
  redis: process.env.REDIS_URL,
});

taskQueue.process(async (job) => {
  const { taskId } = job.data;
  await processTask(taskId);
});

// Add job to queue
export async function queueTask(taskId: string) {
  await taskQueue.add({ taskId });
}

Option 3: Vercel Cron (Serverless)

Create src/app/api/cron/process-agentTasks/route.ts:

export async function GET(request: Request) {
  // Verify cron secret
  const authHeader = request.headers.get('authorization');
  if (authHeader !== `Bearer ${process.env.CRON_SECRET}`) {
    return new Response('Unauthorized', { status: 401 });
  }
  
  // Process pending agentTasks
  const pending = await db
    .select()
    .from(agentTasks)
    .where(eq(agentTasks.status, 'pending'))
    .limit(10);
  
  for (const task of pending) {
    await processTask(task.id);
  }
  
  return Response.json({ processed: pending.length });
}

Configure in vercel.json:

{
  "crons": [{
    "path": "/api/cron/process-agentTasks",
    "schedule": "*/5 * * * *"
  }]
}

AgentTask Notifications

Notify users when agentTasks complete:

async function notifyTaskComplete(taskId: string, userId: string) {
  const [task] = await db.select().from(agentTasks).where(eq(agentTasks.id, taskId));
  
  // Send email (with Resend, SendGrid, etc.)
  await sendEmail({
    to: userId,
    subject: `AgentTask completed: ${task.title}`,
    body: `Your task "${task.title}" has been completed!`,
  });
  
  // Or push notification
  await sendPushNotification({
    userId,
    title: 'AgentTask Complete',
    body: task.title,
  });
}

AgentTask Dashboard

ProductReady includes a dashboard UI at /dashboard. Customize it:

// src/components/dashboard/agentTasks-table.tsx
'use client';

import { trpc } from '~/lib/trpc/client';

export function TasksTable() {
  const { data: agentTasks } = trpc.agentTasks.list.useQuery();
  const deleteTask = trpc.agentTasks.delete.useMutation();
  
  return (
    <table>
      <thead>
        <tr>
          <th>Title</th>
          <th>Priority</th>
          <th>Status</th>
          <th>Created</th>
          <th>Actions</th>
        </tr>
      </thead>
      <tbody>
        {agentTasks?.map((task) => (
          <tr key={task.id}>
            <td>{task.title}</td>
            <td>
              <PriorityBadge priority={task.priority} />
            </td>
            <td>
              <StatusBadge status={task.status} />
            </td>
            <td>{formatDate(task.createdAt)}</td>
            <td>
              <button onClick={() => deleteTask.mutate({ id: task.id })}>
                Delete
              </button>
            </td>
          </tr>
        ))}
      </tbody>
    </table>
  );
}

Advanced Patterns

AgentTask Dependencies

export const agentTasks = pgTable('agentTasks', {
  // ... existing fields ...
  dependsOn: uuid('depends_on').references(() => agentTasks.id),
});

// Only process if dependencies are complete
async function canProcess(taskId: string) {
  const [task] = await db.select().from(agentTasks).where(eq(agentTasks.id, taskId));
  
  if (!task.dependsOn) return true;
  
  const [dependency] = await db
    .select()
    .from(agentTasks)
    .where(eq(agentTasks.id, task.dependsOn));
  
  return dependency.status === 'completed';
}

AgentTask Retries

export const agentTasks = pgTable('agentTasks', {
  // ... existing fields ...
  retries: integer('retries').default(0),
  maxRetries: integer('max_retries').default(3),
});

async function processWithRetry(taskId: string) {
  try {
    await processTask(taskId);
  } catch (error) {
    const [task] = await db.select().from(agentTasks).where(eq(agentTasks.id, taskId));
    
    if (task.retries < task.maxRetries) {
      // Retry
      await db
        .update(agentTasks)
        .set({
          status: 'pending',
          retries: task.retries + 1,
        })
        .where(eq(agentTasks.id, taskId));
    } else {
      // Max retries reached
      await db
        .update(agentTasks)
        .set({ status: 'failed' })
        .where(eq(agentTasks.id, taskId));
    }
  }
}

AgentTask Progress Tracking

export const agentTasks = pgTable('agentTasks', {
  // ... existing fields ...
  progress: integer('progress').default(0),  // 0-100
  progressMessage: text('progress_message'),
});

// Update progress
async function updateProgress(taskId: string, progress: number, message: string) {
  await db
    .update(agentTasks)
    .set({
      progress,
      progressMessage: message,
      updatedAt: new Date(),
    })
    .where(eq(agentTasks.id, taskId));
}

// In your agent
async function longRunningTask(taskId: string) {
  await updateProgress(taskId, 0, 'Starting...');
  
  await step1();
  await updateProgress(taskId, 25, 'Completed step 1');
  
  await step2();
  await updateProgress(taskId, 50, 'Completed step 2');
  
  await step3();
  await updateProgress(taskId, 75, 'Completed step 3');
  
  await step4();
  await updateProgress(taskId, 100, 'Complete!');
}

Best Practices

✅ Do

  • Track task status - Use pending/in_progress/completed/failed
  • Add timestamps - createdAt, updatedAt, completedAt
  • Assign ownership - Know which user/agent owns each task
  • Handle failures gracefully - Log errors, retry when appropriate
  • Clean up old agentTasks - Archive or delete completed agentTasks

❌ Don't

  • Don't block the UI - Use background processing for long agentTasks
  • Don't lose task data - Always persist to database first
  • Don't skip error handling - Catch exceptions and update task status
  • Don't process indefinitely - Set max retries and timeouts

Background Agent Execution with Resume Streams

ProductReady supports resume streams via the AI SDK, enabling true background agent task execution. This means agent tasks can continue running even if the user disconnects or closes their browser.

Breakthrough capability! Resume streams enable agents to work in the background, completing long-running tasks without requiring the user to stay connected.

How It Works

The AI SDK's resume stream feature allows chat sessions to be paused and resumed:

  1. Session Persistence: Each chat session gets a unique ID
  2. Stream Caching: In-progress streams are cached on the server
  3. Automatic Resume: If disconnected, the stream automatically resumes from where it left off
  4. Background Execution: Agent tasks continue running even when the user navigates away

Implementation

ProductReady's useAgentChat hook includes resume support by default:

import { useAgentChat } from '~/hooks/use-agent-chat';

export function AgentChatComponent() {
  const { 
    messages, 
    input, 
    handleInputChange, 
    handleSubmit,
    isLoading,
    reload,        // Resume stream after disconnection
    sessionId,     // Unique session ID for this chat
  } = useAgentChat({
    taskId: 'task-123',
    enableResume: true,  // Enable resume support (default: true)
  });
  
  return (
    <div>
      {/* Chat UI */}
      <div>
        {messages.map(msg => (
          <div key={msg.id}>{msg.content}</div>
        ))}
      </div>
      
      {/* Resume button (appears after disconnection) */}
      {!isLoading && messages.length > 0 && (
        <button onClick={() => reload()}>
          Resume Stream
        </button>
      )}
      
      {/* Input */}
      <input value={input} onChange={handleInputChange} />
      <button onClick={handleSubmit}>Send</button>
    </div>
  );
}

Key Features

1. Automatic Session Management

const chat = useAgentChat({
  enableResume: true,     // Enable resume (default)
  sessionId: 'custom-id', // Optional: provide your own ID
});

// Session ID is automatically generated if not provided
console.log(chat.sessionId); // e.g., "550e8400-e29b-41d4-a716-446655440000"

2. Stream Protocol Configuration

The chat route automatically enables the data stream protocol:

// Server sets resume-compatible headers
response.headers.set("X-Stream-Protocol", "data");
response.headers.set("X-Session-Id", chatId);

3. Background Task Association

Streams are automatically linked to database tasks:

const chat = useAgentChat({
  taskId: '123',        // Associate with existing task
  enableResume: true,
});

// Messages and artifacts are saved to database
// Task continues running in background
// User can close browser and check results later

Use Cases

Long-Running Analysis

// User requests data analysis
const chat = useAgentChat({
  taskId: analysisTaskId,
  enableResume: true,
});

// User sends: "Analyze last month's sales data"
// Agent starts processing (may take 5-10 minutes)
// User can close the page
// Agent continues working in background
// User comes back later to see results

Multi-Step Workflows

// Complex workflow with multiple tool calls
const chat = useAgentChat({
  taskId: workflowTaskId,
  enableResume: true,
});

// Agent makes multiple API calls
// Generates reports
// Creates artifacts
// All saved to database as they complete

Offline Resilience

// User's connection is unstable
const chat = useAgentChat({
  enableResume: true,
});

// Stream can auto-resume after reconnection
// No messages are lost
// Task continues from last checkpoint

Server-Side Caching

The chat route maintains a session cache for resume support:

// In-memory cache (development)
// For production, use Redis or similar distributed cache
const streamCache = new Map<string, { 
  taskId: number; 
  messages: any[] 
}>();

// Cache is automatically cleaned up when task completes

Production Deployment: Replace the in-memory Map with Redis or a distributed cache for multi-instance deployments.

Benefits for AI Agents

  1. True Background Execution: Agents can work without user presence
  2. Reliability: Network issues don't interrupt long-running tasks
  3. Better UX: Users aren't forced to keep browser open
  4. Scalability: Server handles multiple concurrent agent tasks
  5. Persistence: All messages and artifacts saved to database

Architecture Flow

User initiates chat

Generate session ID

Send message with session ID

Server starts streaming response

Cache session (taskId + messages)

Stream begins

[User disconnects] ← Can happen any time

Agent continues in background

Results saved to database

User returns later

Calls reload() with same session ID

Stream resumes from last point

User sees complete results

API Reference

useAgentChat Options

OptionTypeDefaultDescription
taskIdstring-Associate chat with existing task
enableResumebooleantrueEnable stream resumption
sessionIdstringauto-generatedCustom session ID
initialMessagesMessage[][]Starting messages
onFinish(msg: string) => void-Callback when stream completes
onError(error: Error) => void-Error handler

Return Values

PropertyTypeDescription
messagesMessage[]All chat messages
isLoadingbooleanWhether stream is active
reload() => voidResume stream after disconnection
regenerate() => voidRegenerate last response
sessionIdstringCurrent session ID
stop() => voidStop streaming

Next Steps


Resources

  • AgentTask Router: src/server/routers/agentTasks.ts
  • AgentTask Schema: src/db/schema/agentTasks.ts
  • Dashboard: src/app/(dashboard)/dashboard/page.tsx

On this page