Scheduled Tasks
Schedule and automate recurring agent tasks with cron expressions or intervals
Scheduled Tasks
ProductReady includes a production-ready scheduled task system designed for automating recurring AI agent workflows. Perfect for:
- β° Recurring automation (hourly reports, daily syncs)
- π Periodic health checks and monitoring
- π Scheduled data processing
- π€ Automated agent task creation
Production Ready: Multi-instance safe with PostgreSQL FOR UPDATE SKIP LOCKED and optional Redis caching!
What's Included
- β Database schema - Scheduled tasks with cron/interval support
- β Cron parser - Lightweight cron expression parser (no dependencies)
- β tRPC API - Full CRUD operations with validation
- β Dashboard UI - User-friendly form for creating scheduled tasks
- β Redis cache (optional) - Performance optimization for large deployments
- β K8s CronJob - Ready-to-deploy Kubernetes manifest
- β Multi-instance safe - Uses PostgreSQL row-level locking
Architecture Overview
βββββββββββββββββββ βββββββββββββββββββ βββββββββββββββββββ
β Cron Trigger ββββββΆβ /api/scheduler ββββββΆβ PostgreSQL β
β (every 1 min) β β /tick β β FOR UPDATE β
βββββββββββββββββββ βββββββββββββββββββ β SKIP LOCKED β
β βββββββββββββββββββ
β
ββββββββΌβββββββ
β Redis β
β (cache) β
βββββββββββββββKey Design Decisions
- External trigger: Triggers processing every minute (K8s CronJob, GitHub Actions, VM cron, etc.)
- PostgreSQL locking:
FOR UPDATE SKIP LOCKEDensures only one instance processes each task - Redis cache (optional): Reduces database queries for high-volume deployments
- Idempotent processing: Safe to retry if tick fails
Quick Start
Enable the scheduled_tasks schema
Run database push to create the required tables:
pnpm db:pushThis creates two tables:
scheduled_tasks- Task configuration and schedulescheduled_task_runs- Execution history
Create your first scheduled task
Use the tRPC API or dashboard UI:
Navigate to /agent/schedule and click "New Schedule":
- Basic Info: Enter schedule name and optional description
- Schedule Type: Choose between:
- Interval: Simple recurring (e.g., every 60 minutes)
- Cron: Advanced schedules with cron expressions (e.g., weekdays at 9 AM)
- One-time: Run once at a specific date/time
- Task Template: Configure the agent task to create when triggered
- Advanced: Optionally set maximum runs limit
The UI includes:
- Live cron expression validation
- Human-readable schedule descriptions
- Common cron presets (hourly, daily, weekdays, etc.)
- Visual feedback for validation errors
// Using tRPC
const task = await trpc.scheduledTasks.create.mutate({
name: "Daily Report",
description: "Generate daily summary report",
taskTemplate: {
title: "Daily Summary Report",
prompt: "Generate a summary of today's activities",
priority: "medium",
},
scheduleType: "cron",
cronExpression: "0 9 * * *", // Every day at 9:00
isEnabled: true,
});Deploy the K8s CronJob
Apply the manifest to your cluster:
kubectl apply -f k8s/scheduler-cronjob.yamlOr for local development, manually trigger:
curl -X POST http://localhost:3000/api/scheduler/tick \
-H "Authorization: Bearer your-cron-secret"Non-K8s Deployment Options
All deployment options do the same thing: invoke the scheduler tick periodically.
- Trigger endpoint:
POST /api/scheduler/tick(requiresAuthorization: Bearer $CRON_SECRET) - Health endpoint:
GET /api/scheduler/tick(status only)
Vercel Cron Jobs are configured in vercel.json:
{
"$schema": "https://openapi.vercel.sh/vercel.json",
"crons": [
{
"path": "/api/scheduler/tick",
"schedule": "* * * * *"
}
]
}Vercel Cron Jobs make HTTP GET requests to the configured path. In ProductReady, GET /api/scheduler/tick is a health endpoint and does not process tasks.
To run scheduled tasks on Vercel, use an external scheduler that can send a POST with an Authorization header (example: GitHub Actions tab), or implement a dedicated GET-based cron trigger endpoint with appropriate protection.
Use GitHub Actions cron to call your deployment (works for Vercel, Fly.io, Render, bare VMs, etc.):
# .github/workflows/scheduler-tick.yml
name: Scheduler Tick
on:
schedule:
- cron: "* * * * *" # every minute (UTC)
workflow_dispatch: {}
jobs:
tick:
runs-on: ubuntu-latest
steps:
- name: Call scheduler tick
run: |
curl -X POST "${{ secrets.SCHEDULER_URL }}/api/scheduler/tick" \
-H "Authorization: Bearer ${{ secrets.CRON_SECRET }}" \
-H "Content-Type: application/json" \
--fail --silent --show-errorSet repository secrets:
SCHEDULER_URL(example:https://your-app.vercel.app)CRON_SECRET
On a traditional VM (Ubuntu/Debian/etc.), use cron to call the tick endpoint:
# Run every minute
* * * * * curl -X POST "https://your-app.example.com/api/scheduler/tick" \
-H "Authorization: Bearer $CRON_SECRET" \
-H "Content-Type: application/json" \
--fail --silent --show-errorSchedule Types
ProductReady supports three schedule types:
Standard 5-field cron expressions:
{
scheduleType: "cron",
cronExpression: "0 9 * * 1-5", // Weekdays at 9:00
}Common patterns:
* * * * *- Every minute0 * * * *- Every hour at :000 9 * * *- Daily at 9:000 9 * * 1-5- Weekdays at 9:000 0 1 * *- Monthly on 1st at 0:00*/15 * * * *- Every 15 minutes
Simple interval in minutes:
{
scheduleType: "interval",
intervalMinutes: 30, // Every 30 minutes
}Best for simple recurring tasks without specific time requirements.
One-time execution at a specific time:
{
scheduleType: "once",
scheduledAt: new Date("2024-12-25T00:00:00Z"),
}Task is disabled after execution.
Database Schema
scheduled_tasks
// src/db/schema/scheduled-tasks.ts
export const scheduledTasks = pgTable("scheduled_tasks", {
id: text("id").primaryKey().$defaultFn(() => generateId("sch")),
name: varchar("name", { length: 255 }).notNull(),
description: text("description"),
// Task template (what to create when triggered)
taskTemplate: jsonb("task_template").$type<TaskTemplate>().notNull(),
// Schedule configuration
scheduleType: varchar("schedule_type", { length: 20 }).notNull(), // cron | interval | once
cronExpression: varchar("cron_expression", { length: 100 }),
intervalMinutes: integer("interval_minutes"),
scheduledAt: timestamp("scheduled_at"),
timezone: varchar("timezone", { length: 50 }).default("UTC"),
// Execution limits
maxRuns: integer("max_runs"), // null = unlimited
isEnabled: boolean("is_enabled").notNull().default(true),
// Tracking
nextRunAt: timestamp("next_run_at"),
lastRunAt: timestamp("last_run_at"),
runCount: integer("run_count").notNull().default(0),
// Relations
spaceId: text("space_id").notNull().references(() => spaces.id),
createdBy: text("created_by").notNull().references(() => users.id),
createdAt: timestamp("created_at").defaultNow().notNull(),
updatedAt: timestamp("updated_at"),
});TaskTemplate Interface
interface TaskTemplate {
title: string; // Required: Task title
prompt?: string; // Optional: AI prompt
initialParts?: unknown[];// Optional: Initial context
priority?: "low" | "medium" | "high" | "urgent";
}tRPC API Reference
scheduledTasks.list
List scheduled tasks for the current space.
const result = await trpc.scheduledTasks.list.query({
limit: 20,
offset: 0,
isEnabled: true, // Optional filter
});
// Returns: { tasks: ScheduledTask[], total: number }scheduledTasks.create
Create a new scheduled task.
const task = await trpc.scheduledTasks.create.mutate({
name: "Weekly Digest",
taskTemplate: { title: "Generate Weekly Digest" },
scheduleType: "cron",
cronExpression: "0 9 * * 1", // Every Monday 9:00
});scheduledTasks.update
Update an existing task.
await trpc.scheduledTasks.update.mutate({
id: "sch_abc123",
name: "Updated Name",
cronExpression: "0 10 * * 1", // Change to 10:00
});scheduledTasks.enable / disable
Toggle task status:
await trpc.scheduledTasks.enable.mutate({ id: "sch_abc123" });
await trpc.scheduledTasks.disable.mutate({ id: "sch_abc123" });scheduledTasks.validateCron
Validate a cron expression before saving:
const result = await trpc.scheduledTasks.validateCron.query({
expression: "0 9 * * *",
});
// Returns: { isValid: true, description: "Daily at 09:00" }scheduledTasks.byId
Get a single task with optional run history:
const task = await trpc.scheduledTasks.byId.query({
id: "sch_abc123",
includeRuns: true, // Include recent execution history
runsLimit: 10, // Max runs to include (1-50)
});
// Returns: { ...task, scheduleDescription, runs: ScheduledTaskRun[] }scheduledTasks.runs
Get paginated run history for a scheduled task:
const history = await trpc.scheduledTasks.runs.query({
scheduledTaskId: "sch_abc123",
limit: 20,
offset: 0,
});
// Returns: { runs: ScheduledTaskRun[], total: number }Each run record contains:
id- Unique run IDscheduledTaskId- Parent task IDstartedAt- When execution startedcompletedAt- When execution finishedstatus- "pending" | "running" | "completed" | "failed"createdTaskId- ID of the agent task createderror- Error message if failed
scheduledTasks.delete
Delete a task and all its run history:
await trpc.scheduledTasks.delete.mutate({ id: "sch_abc123" });
// Returns: { success: true }scheduledTasks.stats
Get processor statistics:
const stats = await trpc.scheduledTasks.stats.query();
// Returns: { totalTasks, enabledTasks, dueTasksCount, recentRuns }Processor: How It Works
The scheduler processor (src/lib/scheduler/processor.ts) is the core execution engine:
1. Global Lock Acquisition
const lockAcquired = await acquireLock("scheduler:tick", 55);
if (!lockAcquired) {
console.log("Another tick is in progress, skipping");
return;
}Prevents overlapping tick executions across instances.
2. Fetch Due Tasks with Row Locking
const dueTasks = await db.execute(sql`
SELECT id, name, task_template, ...
FROM scheduled_tasks
WHERE is_enabled = true
AND next_run_at <= NOW()
AND (max_runs IS NULL OR run_count < max_runs)
ORDER BY next_run_at ASC
LIMIT 10
FOR UPDATE SKIP LOCKED
`);FOR UPDATE- Locks selected rowsSKIP LOCKED- Skips rows locked by other instances
3. Process Each Task
For each due task:
- Create new
agent_taskfromtaskTemplate - Record run in
scheduled_task_runs - Calculate and update
nextRunAt - Increment
runCount - Update Redis cache (if enabled)
4. Automatic Disabling
Tasks are automatically disabled when:
scheduleType === 'once'after executionrunCount >= maxRuns(if maxRuns is set)
Processor Helper Functions
import {
processDueTasks,
enableTask,
disableTask,
getProcessorStats,
getNextRunTime
} from "@/lib/scheduler/processor";
// Enable a task and recalculate next run time
const nextRunAt = await enableTask(taskId);
// Disable a task and clear its schedule
await disableTask(taskId);
// Get scheduler statistics
const stats = await getProcessorStats();
// { totalTasks, enabledTasks, dueTasksCount, recentRuns }
// Get next run time for a specific task
const nextRun = await getNextRunTime(taskId);
// Returns Date or nullK8s Deployment
CronJob Manifest
# k8s/scheduler-cronjob.yaml
apiVersion: batch/v1
kind: CronJob
metadata:
name: productready-scheduler
spec:
schedule: "* * * * *" # Every minute
concurrencyPolicy: Forbid
successfulJobsHistoryLimit: 3
failedJobsHistoryLimit: 3
jobTemplate:
spec:
template:
spec:
containers:
- name: scheduler
image: curlimages/curl:latest
command:
- /bin/sh
- -c
- |
curl -X POST "$INTERNAL_URL/api/scheduler/tick" \
-H "Authorization: Bearer $CRON_SECRET" \
-H "Content-Type: application/json" \
--fail --silent --show-error
env:
- name: INTERNAL_URL
valueFrom:
configMapKeyRef:
name: productready-config
key: INTERNAL_URL
- name: CRON_SECRET
valueFrom:
secretKeyRef:
name: productready-secrets
key: CRON_SECRET
restartPolicy: OnFailureEnvironment Variables
| Variable | Required | Description |
|---|---|---|
CRON_SECRET | Yes | Bearer token for API authentication |
INTERNAL_URL | Yes | Internal service URL (e.g., http://productready:3000) |
REDIS_URL | No | Redis connection URL for caching |
Redis Cache (Optional)
The Redis cache improves performance for large deployments by:
- Caching next run times in a Redis ZSET for fast lookups
- Distributed locking to prevent duplicate processing
- Graceful degradation - works without Redis, just slower
Setup
# Set environment variable
REDIS_URL=redis://localhost:6379Cache Operations
// Add task to cache
await cacheTaskNextRun(taskId, nextRunAt);
// Remove from cache (when disabled/deleted)
await removeFromCache(taskId);
// Acquire distributed lock
const acquired = await acquireLock("scheduler:tick", 55);
// Release lock
await releaseLock("scheduler:tick");Redis is optional. The scheduler works perfectly with just PostgreSQL, but Redis improves performance for high-volume scenarios.
Testing
Integration tests are included in tests/scheduler.test.ts:
# Run scheduler tests
pnpm vitest run tests/scheduler.test.ts
# Run all tests
pnpm testTest Categories
- Cron Parser Unit Tests - Validate cron expression parsing
- Router Integration Tests - Test tRPC CRUD operations
- Redis Cache Tests - Verify cache operations
Prerequisites
- PostgreSQL running (
make db) - Redis running (
make db) - optional - Database seeded (
pnpm db:seed)
Monitoring
Health Check Endpoint
curl http://localhost:3000/api/scheduler/tick \
-H "Authorization: Bearer $CRON_SECRET"
# GET returns health status, POST triggers processingStats API
const stats = await trpc.scheduledTasks.stats.query();
// {
// totalTasks: 10,
// enabledTasks: 8,
// dueTasksCount: 2,
// recentRuns: { total: 50, succeeded: 48, failed: 2 }
// }Logs
Look for [Scheduler] prefixed logs:
[Scheduler] Processing 3 due tasks
[Scheduler] Task sch_abc123 executed successfully
[Scheduler] Task sch_def456 failed: timeout
[Scheduler Redis] ConnectedBest Practices
1. Use Meaningful Names
// β
Good
name: "Daily Sales Report - 9 AM"
name: "Hourly Health Check"
// β Bad
name: "Task 1"
name: "Cron job"2. Set maxRuns for One-Time or Limited Tasks
{
scheduleType: "interval",
intervalMinutes: 5,
maxRuns: 12, // Run 12 times then stop
}3. Include Context in taskTemplate
taskTemplate: {
title: "Weekly Report - Week 42",
prompt: "Generate sales report for the past week",
initialParts: [
{ type: "context", data: { weekNumber: 42 } }
],
}4. Monitor Execution History
Use scheduledTasks.runs to track execution history:
const history = await trpc.scheduledTasks.runs.query({
scheduledTaskId: "sch_abc123",
limit: 20,
});Troubleshooting
Task Not Running
- Check isEnabled:
SELECT is_enabled FROM scheduled_tasks WHERE id = ? - Check nextRunAt: Is it in the past?
- Check maxRuns: Has the limit been reached?
- Check CronJob logs:
kubectl logs -l app=productready-scheduler
Duplicate Executions
If tasks run multiple times:
- Verify
FOR UPDATE SKIP LOCKEDis working - Check Redis lock acquisition logs
- Ensure only one CronJob is running:
kubectl get cronjobs
Performance Issues
- Enable Redis cache
- Reduce batch size in processor
- Add index:
CREATE INDEX idx_scheduled_tasks_next_run ON scheduled_tasks(next_run_at) WHERE is_enabled = true
Dashboard UI
Creating Scheduled Tasks
Navigate to /agent/schedule to access the scheduled tasks dashboard:
- View All Tasks: See a list of all scheduled tasks with their status, schedule, and run history
- Create New: Click "New Schedule" to open the creation dialog
- Enable/Disable: Toggle tasks on/off with a switch
- Delete: Remove tasks via the dropdown menu
UI Features
Schedule Types:
- Interval: Simple recurring execution (e.g., "every 60 minutes")
- Cron: Advanced scheduling with cron expressions
- Common presets: hourly, daily, weekdays at 9 AM, etc.
- Custom expressions with live validation
- Human-readable descriptions shown automatically
- One-time: Schedule for specific date/time
Form Validation:
- Real-time cron expression validation
- Required field indicators
- Visual error feedback
- Smart helpers (e.g., interval shows "β 1 hour" for 60 minutes)
Task Configuration:
- Task title and optional prompt
- Priority levels (low/medium/high/urgent)
- Maximum runs limit (optional)
- Schedule name and description
UI Components
The UI is built with:
CreateScheduledTaskDialog- Main creation form componentScheduledTasksView- List view with enable/disable/delete actions- Real-time validation using
trpc.scheduledTasks.validateCron - Toast notifications for success/error feedback
File Reference
| File | Purpose |
|---|---|
src/db/schema/scheduled-tasks.ts | Task configuration table |
src/db/schema/scheduled-task-runs.ts | Execution history table |
src/lib/scheduler/cron-parser.ts | Cron expression parser |
src/lib/scheduler/redis-cache.ts | Redis cache operations |
src/lib/scheduler/processor.ts | Core task processing |
src/app/api/scheduler/tick/route.ts | K8s CronJob endpoint |
src/server/routers/scheduled-tasks.ts | tRPC router |
src/components/agent/create-scheduled-task-dialog.tsx | Creation form UI |
src/components/agent/scheduled-tasks-view.tsx | List view UI |
k8s/scheduler-cronjob.yaml | Kubernetes deployment |
tests/scheduler.test.ts | Integration tests |