The Agents SDK provides a built-in queue system that allows you to schedule tasks for asynchronous execution. This is useful for background processing, delayed operations, and managing workloads that do not need immediate execution.

Overview

The queue system is built into the base Agent class. Tasks are stored in a SQLite table and processed automatically in FIFO (First In, First Out) order.

QueueItem type

TypeScript type QueueItem < T > = { id : string ; // Unique identifier for the queued task payload : T ; // Data to pass to the callback function callback : keyof Agent ; // Name of the method to call created_at : number ; // Timestamp when the task was created };

Core methods

Adds a task to the queue for future execution.

TypeScript async queue < T > ( callback : keyof this , payload : T ): Promise < string >

Parameters:

callback - The name of the method to call when processing the task

- The name of the method to call when processing the task payload - Data to pass to the callback method

Returns: The unique ID of the queued task

Example:

JavaScript

JavaScript TypeScript JavaScript class MyAgent extends Agent { async processEmail ( data ) { // Process the email console . log ( `Processing email: ${ data . subject } ` ) ; } async onMessage ( message ) { // Queue an email processing task const taskId = await this . queue ( "processEmail" , { email : "user@example.com" , subject : "Welcome!" , } ) ; console . log ( `Queued task with ID: ${ taskId } ` ) ; } } TypeScript class MyAgent extends Agent { async processEmail ( data : { email : string ; subject : string }) { // Process the email console . log ( `Processing email: ${ data . subject } ` ) ; } async onMessage ( message : string ) { // Queue an email processing task const taskId = await this . queue ( "processEmail" , { email : "user@example.com" , subject : "Welcome!" , } ) ; console . log ( `Queued task with ID: ${ taskId } ` ) ; } }

Removes a specific task from the queue by ID.

TypeScript async dequeue ( id : string ): Promise <void>

Parameters:

id - The ID of the task to remove

Example:

JavaScript

JavaScript TypeScript JavaScript // Remove a specific task await agent . dequeue ( "abc123def" ) ; TypeScript // Remove a specific task await agent . dequeue ( "abc123def" ) ;

Removes all tasks from the queue.

TypeScript async dequeueAll (): Promise <void>

Example:

JavaScript

JavaScript TypeScript JavaScript // Clear the entire queue await agent . dequeueAll () ; TypeScript // Clear the entire queue await agent . dequeueAll () ;

Removes all tasks that match a specific callback method.

TypeScript async dequeueAllByCallback ( callback : string ): Promise <void>

Parameters:

callback - Name of the callback method

Example:

JavaScript

JavaScript TypeScript JavaScript // Remove all email processing tasks await agent . dequeueAllByCallback ( "processEmail" ) ; TypeScript // Remove all email processing tasks await agent . dequeueAllByCallback ( "processEmail" ) ;

Retrieves a specific queued task by ID.

TypeScript async getQueue < T > ( id : string ): Promise < QueueItem < T > | undefined >

Parameters:

id - The ID of the task to retrieve

Returns: The QueueItem with parsed payload or undefined if not found

The payload is automatically parsed from JSON before being returned.

Example:

JavaScript

JavaScript TypeScript JavaScript const task = await agent . getQueue ( "abc123def" ) ; if ( task ) { console . log ( `Task callback: ${ task . callback } ` ) ; console . log ( `Task payload:` , task . payload ) ; } TypeScript const task = await agent . getQueue ( "abc123def" ) ; if ( task ) { console . log ( `Task callback: ${ task . callback } ` ) ; console . log ( `Task payload:` , task . payload ) ; }

Retrieves all queued tasks that match a specific key-value pair in their payload.

TypeScript async getQueues < T > ( key : string , value : string ): Promise < QueueItem < T > [] >

Parameters:

key - The key to filter by in the payload

- The key to filter by in the payload value - The value to match

Returns: Array of matching QueueItem objects

This method fetches all queue items and filters them in memory by parsing each payload and checking if the specified key matches the value.

Example:

JavaScript

JavaScript TypeScript JavaScript // Find all tasks for a specific user const userTasks = await agent . getQueues ( "userId" , "12345" ) ; TypeScript // Find all tasks for a specific user const userTasks = await agent . getQueues ( "userId" , "12345" ) ;

How queue processing works

Validation: When calling queue() , the method validates that the callback exists as a function on the agent. Automatic processing: After queuing, the system automatically attempts to flush the queue. FIFO order: Tasks are processed in the order they were created ( created_at timestamp). Context preservation: Each queued task runs with the same agent context (connection, request, email). Automatic dequeue: Successfully executed tasks are automatically removed from the queue. Error handling: If a callback method does not exist at execution time, an error is logged and the task is skipped. Persistence: Tasks are stored in the cf_agents_queues SQL table and survive agent restarts.

Queue callback methods

When defining callback methods for queued tasks, they must follow this signature:

TypeScript async callbackMethod ( payload : unknown , queueItem : QueueItem ): Promise <void>

Example:

JavaScript

JavaScript TypeScript JavaScript class MyAgent extends Agent { async sendNotification ( payload , queueItem ) { console . log ( `Processing task ${ queueItem . id } ` ) ; console . log ( `Sending notification to user ${ payload . userId } : ${ payload . message } ` , ) ; // Your notification logic here await this . notificationService . send ( payload . userId , payload . message ) ; } async onUserSignup ( userData ) { // Queue a welcome notification await this . queue ( "sendNotification" , { userId : userData . id , message : "Welcome to our platform!" , } ) ; } } TypeScript class MyAgent extends Agent { async sendNotification ( payload : { userId : string ; message : string }, queueItem : QueueItem <{ userId : string ; message : string }>, ) { console . log ( `Processing task ${ queueItem . id } ` ) ; console . log ( `Sending notification to user ${ payload . userId } : ${ payload . message } ` , ) ; // Your notification logic here await this . notificationService . send ( payload . userId , payload . message ) ; } async onUserSignup ( userData : any ) { // Queue a welcome notification await this . queue ( "sendNotification" , { userId : userData . id , message : "Welcome to our platform!" , } ) ; } }

Use cases

Background processing

JavaScript

JavaScript TypeScript JavaScript class DataProcessor extends Agent { async processLargeDataset ( data ) { const results = await this . heavyComputation ( data . datasetId ) ; await this . notifyUser ( data . userId , results ) ; } async onDataUpload ( uploadData ) { // Queue the processing instead of doing it synchronously await this . queue ( "processLargeDataset" , { datasetId : uploadData . id , userId : uploadData . userId , } ) ; return { message : "Data upload received, processing started" }; } } TypeScript class DataProcessor extends Agent { async processLargeDataset ( data : { datasetId : string ; userId : string }) { const results = await this . heavyComputation ( data . datasetId ) ; await this . notifyUser ( data . userId , results ) ; } async onDataUpload ( uploadData : any ) { // Queue the processing instead of doing it synchronously await this . queue ( "processLargeDataset" , { datasetId : uploadData . id , userId : uploadData . userId , } ) ; return { message : "Data upload received, processing started" }; } }

Batch operations

JavaScript

JavaScript TypeScript JavaScript class BatchProcessor extends Agent { async processBatch ( data ) { for ( const item of data . items ) { await this . processItem ( item ) ; } console . log ( `Completed batch ${ data . batchId } ` ) ; } async onLargeRequest ( items ) { // Split large requests into smaller batches const batchSize = 10 ; for ( let i = 0 ; i < items . length ; i += batchSize ) { const batch = items . slice ( i , i + batchSize ) ; await this . queue ( "processBatch" , { items : batch , batchId : `batch- ${ i / batchSize + 1 } ` , } ) ; } } } TypeScript class BatchProcessor extends Agent { async processBatch ( data : { items : any [] ; batchId : string }) { for ( const item of data . items ) { await this . processItem ( item ) ; } console . log ( `Completed batch ${ data . batchId } ` ) ; } async onLargeRequest ( items : any [] ) { // Split large requests into smaller batches const batchSize = 10 ; for ( let i = 0 ; i < items . length ; i += batchSize ) { const batch = items . slice ( i , i + batchSize ) ; await this . queue ( "processBatch" , { items : batch , batchId : `batch- ${ i / batchSize + 1 } ` , } ) ; } } }

Error handling

JavaScript

JavaScript TypeScript JavaScript class RobustAgent extends Agent { async reliableTask ( payload , queueItem ) { try { await this . doSomethingRisky ( payload ) ; } catch ( error ) { console . error ( `Task ${ queueItem . id } failed:` , error ) ; // Optionally re-queue with retry logic if ( payload . retryCount < 3 ) { await this . queue ( "reliableTask" , { ... payload , retryCount : ( payload . retryCount || 0 ) + 1 , } ) ; } } } } TypeScript class RobustAgent extends Agent { async reliableTask ( payload : any , queueItem : QueueItem ) { try { await this . doSomethingRisky ( payload ) ; } catch ( error ) { console . error ( `Task ${ queueItem . id } failed:` , error ) ; // Optionally re-queue with retry logic if ( payload . retryCount < 3 ) { await this . queue ( "reliableTask" , { ... payload , retryCount : ( payload . retryCount || 0 ) + 1 , } ) ; } } } }

Best practices

Keep payloads small: Payloads are JSON-serialized and stored in the database. Idempotent operations: Design callback methods to be safe to retry. Error handling: Include proper error handling in callback methods. Monitoring: Use logging to track queue processing. Cleanup: Regularly clean up completed or failed tasks if needed.

Integration with other features

The queue system works with other Agent SDK features:

State management : Access agent state within queued callbacks.

: Access agent state within queued callbacks. Scheduling : Combine with schedule() for time-based queue processing.

: Combine with for time-based queue processing. Context : Queued tasks maintain the original request context.

: Queued tasks maintain the original request context. Database: Uses the same database as other agent data.

Limitations

Tasks are processed sequentially, not in parallel.

No built-in retry mechanism (implement your own).

No priority system (FIFO only).

Queue processing happens during agent execution, not as separate background jobs.

Queue vs Schedule

Use queue when you want tasks to execute as soon as possible in order. Use schedule when you need tasks to run at specific times or on a recurring basis.

Feature Queue Schedule Execution timing Immediate (FIFO) Specific time or cron Use case Background processing Delayed or recurring tasks Storage cf_agents_queues table cf_agents_schedules table

