AI Agents & Tasks
Build and manage AI agents with ProductReady's agent task management system
AI Agents & Tasks
ProductReady includes a production-ready task management system designed for AI agents. Perfect for:
- 🤖 AI automation workflows
- 📋 Background job processing
- 🎯 User-assigned agentTasks
- 🔄 Async operations tracking
Production Ready: Database schema, tRPC API, and dashboard UI included out of the box!
What's Included
- ✅ Database schema - Tasks with priority, status, and assignments
- ✅ tRPC API - Full CRUD operations
- ✅ Dashboard UI - AgentTask list and creation form
- ✅ Demo mode - Try without database setup
- ✅ Type-safe - End-to-end TypeScript
Quick Start
View Existing Tasks
- Start dev server:
pnpm dev - Login to dashboard:
http://localhost:3000/dashboard - You'll see the agentTasks dashboard!
Or try demo mode (no login): http://localhost:3000/dashboard?demo=1
Database Schema
Tasks are stored in PostgreSQL:
// src/db/schema/agentTasks.ts
export const agentTasks = pgTable('agentTasks', {
id: uuid('id').defaultRandom().primaryKey(),
title: text('title').notNull(),
description: text('description'),
status: text('status').notNull().default('pending'),
priority: text('priority').notNull().default('medium'),
assignedTo: text('assigned_to'),
createdAt: timestamp('created_at').defaultNow().notNull(),
updatedAt: timestamp('updated_at').defaultNow().notNull(),
completedAt: timestamp('completed_at'),
});Fields:
title- AgentTask name (required)description- AgentTask details (optional)status-pending,in_progress,completed,failedpriority-low,medium,high,urgentassignedTo- User ID or agent namecreatedAt/updatedAt- TimestampscompletedAt- When task finished
Using the API
Create a AgentTask
'use client';
import { trpc } from '~/lib/trpc/client';
export function CreateTaskButton() {
const createTask = trpc.agentTasks.create.useMutation();
async function handleCreate() {
await createTask.mutateAsync({
title: 'Analyze customer feedback',
description: 'Use AI to categorize 1000 support tickets',
priority: 'high',
assignedTo: 'ai-agent-1',
});
alert('AgentTask created!');
}
return <button onClick={handleCreate}>Create AI AgentTask</button>;
}List All Tasks
export function TasksList() {
const { data: agentTasks, isLoading } = trpc.agentTasks.list.useQuery();
if (isLoading) return <div>Loading agentTasks...</div>;
return (
<div>
{agentTasks?.map((task) => (
<div key={task.id}>
<h3>{task.title}</h3>
<p>Status: {task.status}</p>
<p>Priority: {task.priority}</p>
</div>
))}
</div>
);
}Update AgentTask Status
export function TaskStatusButton({ taskId }: { taskId: string }) {
const updateTask = trpc.agentTasks.update.useMutation();
async function markComplete() {
await updateTask.mutateAsync({
id: taskId,
status: 'completed',
completedAt: new Date(),
});
}
return <button onClick={markComplete}>Mark Complete</button>;
}Delete a AgentTask
export function DeleteTaskButton({ taskId }: { taskId: string }) {
const deleteTask = trpc.agentTasks.delete.useMutation({
onSuccess: () => {
// Refresh task list
utils.agentTasks.list.invalidate();
},
});
async function handleDelete() {
if (confirm('Delete this task?')) {
await deleteTask.mutateAsync({ id: taskId });
}
}
return <button onClick={handleDelete}>Delete</button>;
}Building an AI Agent
Example: Email Summarizer Agent
Let's build an agent that summarizes emails!
Create the agent function
// src/lib/agents/email-summarizer.ts
import { OpenAI } from 'openai';
const openai = new OpenAI({
apiKey: process.env.OPENAI_API_KEY,
});
export async function summarizeEmail(emailContent: string) {
const response = await openai.chat.completions.create({
model: 'gpt-4',
messages: [
{
role: 'system',
content: 'You are an email summarizer. Provide concise summaries.',
},
{
role: 'user',
content: `Summarize this email:\n\n${emailContent}`,
},
],
});
return response.choices[0].message.content;
}Create task processor
// src/lib/agents/task-processor.ts
import { db } from '~/db';
import { agentTasks } from '~/db/schema/agentTasks';
import { eq } from 'drizzle-orm';
import { summarizeEmail } from './email-summarizer';
export async function processEmailTask(taskId: string) {
// Update status to in_progress
await db
.update(agentTasks)
.set({ status: 'in_progress', updatedAt: new Date() })
.where(eq(agentTasks.id, taskId));
try {
// Get task details
const [task] = await db
.select()
.from(agentTasks)
.where(eq(agentTasks.id, taskId))
.limit(1);
// Run AI agent
const summary = await summarizeEmail(task.description || '');
// Mark complete with result
await db
.update(agentTasks)
.set({
status: 'completed',
completedAt: new Date(),
description: `Original:\n${task.description}\n\nSummary:\n${summary}`,
})
.where(eq(agentTasks.id, taskId));
return { success: true, summary };
} catch (error) {
// Mark failed
await db
.update(agentTasks)
.set({
status: 'failed',
description: `Error: ${error.message}`,
})
.where(eq(agentTasks.id, taskId));
throw error;
}
}Create tRPC endpoint to trigger agent
// src/server/routers/agentTasks.ts
export const tasksRouter = createTRPCRouter({
// ... existing endpoints ...
processEmail: protectedProcedure
.input(z.object({ taskId: z.string().uuid() }))
.mutation(async ({ ctx, input }) => {
// Queue the task for processing
await processEmailTask(input.taskId);
return { success: true };
}),
});Use in frontend
'use client';
export function ProcessEmailButton({ taskId }: { taskId: string }) {
const processEmail = trpc.agentTasks.processEmail.useMutation();
async function handleProcess() {
await processEmail.mutateAsync({ taskId });
alert('Email processing started!');
}
return <button onClick={handleProcess}>Summarize with AI</button>;
}Done! You now have an AI agent that processes agentTasks asynchronously.
Background Processing
For long-running agentTasks, use a job queue:
Option 1: Simple Polling
// Poll every 10 seconds for pending agentTasks
setInterval(async () => {
const pendingTasks = await db
.select()
.from(agentTasks)
.where(eq(agentTasks.status, 'pending'))
.limit(10);
for (const task of pendingTasks) {
await processTask(task.id);
}
}, 10000);Option 2: Bull Queue (Production)
pnpm add bull @bull-board/api @bull-board/hono// src/lib/queue.ts
import Bull from 'bull';
export const taskQueue = new Bull('agentTasks', {
redis: process.env.REDIS_URL,
});
taskQueue.process(async (job) => {
const { taskId } = job.data;
await processTask(taskId);
});
// Add job to queue
export async function queueTask(taskId: string) {
await taskQueue.add({ taskId });
}Option 3: Vercel Cron (Serverless)
Create src/app/api/cron/process-agentTasks/route.ts:
export async function GET(request: Request) {
// Verify cron secret
const authHeader = request.headers.get('authorization');
if (authHeader !== `Bearer ${process.env.CRON_SECRET}`) {
return new Response('Unauthorized', { status: 401 });
}
// Process pending agentTasks
const pending = await db
.select()
.from(agentTasks)
.where(eq(agentTasks.status, 'pending'))
.limit(10);
for (const task of pending) {
await processTask(task.id);
}
return Response.json({ processed: pending.length });
}Configure in vercel.json:
{
"crons": [{
"path": "/api/cron/process-agentTasks",
"schedule": "*/5 * * * *"
}]
}AgentTask Notifications
Notify users when agentTasks complete:
async function notifyTaskComplete(taskId: string, userId: string) {
const [task] = await db.select().from(agentTasks).where(eq(agentTasks.id, taskId));
// Send email (with Resend, SendGrid, etc.)
await sendEmail({
to: userId,
subject: `AgentTask completed: ${task.title}`,
body: `Your task "${task.title}" has been completed!`,
});
// Or push notification
await sendPushNotification({
userId,
title: 'AgentTask Complete',
body: task.title,
});
}AgentTask Dashboard
ProductReady includes a dashboard UI at /dashboard. Customize it:
// src/components/dashboard/agentTasks-table.tsx
'use client';
import { trpc } from '~/lib/trpc/client';
export function TasksTable() {
const { data: agentTasks } = trpc.agentTasks.list.useQuery();
const deleteTask = trpc.agentTasks.delete.useMutation();
return (
<table>
<thead>
<tr>
<th>Title</th>
<th>Priority</th>
<th>Status</th>
<th>Created</th>
<th>Actions</th>
</tr>
</thead>
<tbody>
{agentTasks?.map((task) => (
<tr key={task.id}>
<td>{task.title}</td>
<td>
<PriorityBadge priority={task.priority} />
</td>
<td>
<StatusBadge status={task.status} />
</td>
<td>{formatDate(task.createdAt)}</td>
<td>
<button onClick={() => deleteTask.mutate({ id: task.id })}>
Delete
</button>
</td>
</tr>
))}
</tbody>
</table>
);
}Advanced Patterns
AgentTask Dependencies
export const agentTasks = pgTable('agentTasks', {
// ... existing fields ...
dependsOn: uuid('depends_on').references(() => agentTasks.id),
});
// Only process if dependencies are complete
async function canProcess(taskId: string) {
const [task] = await db.select().from(agentTasks).where(eq(agentTasks.id, taskId));
if (!task.dependsOn) return true;
const [dependency] = await db
.select()
.from(agentTasks)
.where(eq(agentTasks.id, task.dependsOn));
return dependency.status === 'completed';
}AgentTask Retries
export const agentTasks = pgTable('agentTasks', {
// ... existing fields ...
retries: integer('retries').default(0),
maxRetries: integer('max_retries').default(3),
});
async function processWithRetry(taskId: string) {
try {
await processTask(taskId);
} catch (error) {
const [task] = await db.select().from(agentTasks).where(eq(agentTasks.id, taskId));
if (task.retries < task.maxRetries) {
// Retry
await db
.update(agentTasks)
.set({
status: 'pending',
retries: task.retries + 1,
})
.where(eq(agentTasks.id, taskId));
} else {
// Max retries reached
await db
.update(agentTasks)
.set({ status: 'failed' })
.where(eq(agentTasks.id, taskId));
}
}
}AgentTask Progress Tracking
export const agentTasks = pgTable('agentTasks', {
// ... existing fields ...
progress: integer('progress').default(0), // 0-100
progressMessage: text('progress_message'),
});
// Update progress
async function updateProgress(taskId: string, progress: number, message: string) {
await db
.update(agentTasks)
.set({
progress,
progressMessage: message,
updatedAt: new Date(),
})
.where(eq(agentTasks.id, taskId));
}
// In your agent
async function longRunningTask(taskId: string) {
await updateProgress(taskId, 0, 'Starting...');
await step1();
await updateProgress(taskId, 25, 'Completed step 1');
await step2();
await updateProgress(taskId, 50, 'Completed step 2');
await step3();
await updateProgress(taskId, 75, 'Completed step 3');
await step4();
await updateProgress(taskId, 100, 'Complete!');
}Best Practices
✅ Do
- Track task status - Use pending/in_progress/completed/failed
- Add timestamps - createdAt, updatedAt, completedAt
- Assign ownership - Know which user/agent owns each task
- Handle failures gracefully - Log errors, retry when appropriate
- Clean up old agentTasks - Archive or delete completed agentTasks
❌ Don't
- Don't block the UI - Use background processing for long agentTasks
- Don't lose task data - Always persist to database first
- Don't skip error handling - Catch exceptions and update task status
- Don't process indefinitely - Set max retries and timeouts
Background Agent Execution with Resume Streams
ProductReady supports resume streams via the AI SDK, enabling true background agent task execution. This means agent tasks can continue running even if the user disconnects or closes their browser.
Breakthrough capability! Resume streams enable agents to work in the background, completing long-running tasks without requiring the user to stay connected.
How It Works
The AI SDK's resume stream feature allows chat sessions to be paused and resumed:
- Session Persistence: Each chat session gets a unique ID
- Stream Caching: In-progress streams are cached on the server
- Automatic Resume: If disconnected, the stream automatically resumes from where it left off
- Background Execution: Agent tasks continue running even when the user navigates away
Implementation
ProductReady's useAgentChat hook includes resume support by default:
import { useAgentChat } from '~/hooks/use-agent-chat';
export function AgentChatComponent() {
const {
messages,
input,
handleInputChange,
handleSubmit,
isLoading,
reload, // Resume stream after disconnection
sessionId, // Unique session ID for this chat
} = useAgentChat({
taskId: 'task-123',
enableResume: true, // Enable resume support (default: true)
});
return (
<div>
{/* Chat UI */}
<div>
{messages.map(msg => (
<div key={msg.id}>{msg.content}</div>
))}
</div>
{/* Resume button (appears after disconnection) */}
{!isLoading && messages.length > 0 && (
<button onClick={() => reload()}>
Resume Stream
</button>
)}
{/* Input */}
<input value={input} onChange={handleInputChange} />
<button onClick={handleSubmit}>Send</button>
</div>
);
}Key Features
1. Automatic Session Management
const chat = useAgentChat({
enableResume: true, // Enable resume (default)
sessionId: 'custom-id', // Optional: provide your own ID
});
// Session ID is automatically generated if not provided
console.log(chat.sessionId); // e.g., "550e8400-e29b-41d4-a716-446655440000"2. Stream Protocol Configuration
The chat route automatically enables the data stream protocol:
// Server sets resume-compatible headers
response.headers.set("X-Stream-Protocol", "data");
response.headers.set("X-Session-Id", chatId);3. Background Task Association
Streams are automatically linked to database tasks:
const chat = useAgentChat({
taskId: '123', // Associate with existing task
enableResume: true,
});
// Messages and artifacts are saved to database
// Task continues running in background
// User can close browser and check results laterUse Cases
Long-Running Analysis
// User requests data analysis
const chat = useAgentChat({
taskId: analysisTaskId,
enableResume: true,
});
// User sends: "Analyze last month's sales data"
// Agent starts processing (may take 5-10 minutes)
// User can close the page
// Agent continues working in background
// User comes back later to see resultsMulti-Step Workflows
// Complex workflow with multiple tool calls
const chat = useAgentChat({
taskId: workflowTaskId,
enableResume: true,
});
// Agent makes multiple API calls
// Generates reports
// Creates artifacts
// All saved to database as they completeOffline Resilience
// User's connection is unstable
const chat = useAgentChat({
enableResume: true,
});
// Stream can auto-resume after reconnection
// No messages are lost
// Task continues from last checkpointServer-Side Caching
The chat route maintains a session cache for resume support:
// In-memory cache (development)
// For production, use Redis or similar distributed cache
const streamCache = new Map<string, {
taskId: number;
messages: any[]
}>();
// Cache is automatically cleaned up when task completesProduction Deployment: Replace the in-memory Map with Redis or a distributed cache for multi-instance deployments.
Benefits for AI Agents
- True Background Execution: Agents can work without user presence
- Reliability: Network issues don't interrupt long-running tasks
- Better UX: Users aren't forced to keep browser open
- Scalability: Server handles multiple concurrent agent tasks
- Persistence: All messages and artifacts saved to database
Architecture Flow
User initiates chat
↓
Generate session ID
↓
Send message with session ID
↓
Server starts streaming response
↓
Cache session (taskId + messages)
↓
Stream begins
↓
[User disconnects] ← Can happen any time
↓
Agent continues in background
↓
Results saved to database
↓
User returns later
↓
Calls reload() with same session ID
↓
Stream resumes from last point
↓
User sees complete resultsAPI Reference
useAgentChat Options
| Option | Type | Default | Description |
|---|---|---|---|
taskId | string | - | Associate chat with existing task |
enableResume | boolean | true | Enable stream resumption |
sessionId | string | auto-generated | Custom session ID |
initialMessages | Message[] | [] | Starting messages |
onFinish | (msg: string) => void | - | Callback when stream completes |
onError | (error: Error) => void | - | Error handler |
Return Values
| Property | Type | Description |
|---|---|---|
messages | Message[] | All chat messages |
isLoading | boolean | Whether stream is active |
reload | () => void | Resume stream after disconnection |
regenerate | () => void | Regenerate last response |
sessionId | string | Current session ID |
stop | () => void | Stop streaming |
Related Documentation
- AI SDK Resume Streams - Official AI SDK documentation
- Gaia Agent SDK - ProductReady's AI agent framework
- tRPC API Guide - Build custom agent endpoints
Next Steps
- tRPC API Guide - Build more API endpoints
- Database Guide - Advanced schema patterns
- Background Jobs - Production job queues
- Testing - Test your agents
Resources
- AgentTask Router:
src/server/routers/agentTasks.ts - AgentTask Schema:
src/db/schema/agentTasks.ts - Dashboard:
src/app/(dashboard)/dashboard/page.tsx