# Demos and architectures

URL: https://developers.cloudflare.com/queues/demos/

import { ExternalResources, GlossaryTooltip, ResourcesBySelector } from "~/components"

Learn how you can use Queues within your existing application and architecture.

## Demos

Explore the following demo applications for Queues.

## Reference architectures

Explore the following reference architectures that use Queues:

---

# Get started

URL: https://developers.cloudflare.com/queues/get-started/

import { Render, PackageManagers, WranglerConfig } from "~/components";

Cloudflare Queues is a flexible messaging queue that allows you to queue messages for asynchronous processing. By following this guide, you will create your first queue, a Worker to publish messages to that queue, and a consumer Worker to consume messages from that queue.

## Prerequisites

To use Queues, you will need:

## 1. Create a Worker project

You will access your queue from a Worker, the producer Worker. You must create at least one producer Worker to publish messages onto your queue. If you are using [R2 Bucket Event Notifications](/r2/buckets/event-notifications/), then you do not need a producer Worker.

To create a producer Worker, run:

This will create a new directory, which will include both a `src/index.ts` Worker script and a [`wrangler.jsonc`](/workers/wrangler/configuration/) configuration file. After you create your Worker, you will create a queue to access.

Move into the newly created directory:

```sh
cd producer-worker
```

## 2. Create a queue

To use queues, you need to create at least one queue to publish messages to and consume messages from.

To create a queue, run:

```sh
npx wrangler queues create <MY-QUEUE-NAME>
```

Choose a name that is descriptive and relates to the types of messages you intend to use this queue for. Descriptive queue names look like: `debug-logs`, `user-clickstream-data`, or `password-reset-prod`.

Queue names must be 1 to 63 characters long.
Queue names cannot contain special characters other than dashes (`-`), and must start and end with a letter or number. You cannot change your queue name after you have set it. After you create your queue, you will set up your producer Worker to access it.

## 3. Set up your producer Worker

To expose your queue to the code inside your Worker, you need to connect your queue to your Worker by creating a binding. [Bindings](/workers/runtime-apis/bindings/) allow your Worker to access resources, such as Queues, on the Cloudflare developer platform.

To create a binding, open your newly generated `wrangler.jsonc` file and add the following:

```toml
[[queues.producers]]
queue = "MY-QUEUE-NAME"
binding = "MY_QUEUE"
```

Replace `MY-QUEUE-NAME` with the name of the queue you created in [step 2](/queues/get-started/#2-create-a-queue). Next, replace `MY_QUEUE` with the name you want for your `binding`. The binding must be a valid JavaScript variable name. This is the variable you will use to reference this queue in your Worker.

### Write your producer Worker

You will now configure your producer Worker to create messages to publish to your queue. Your producer Worker will:

1. Take a request it receives from the browser.
2. Transform the request to JSON format.
3. Write the request directly to your queue.

In your Worker project directory, open the `src` folder and add the following to your `index.ts` file:

```ts null {8}
export default {
  async fetch(request, env, ctx): Promise<Response> {
    let log = {
      url: request.url,
      method: request.method,
      headers: Object.fromEntries(request.headers),
    };
    await env.MY_QUEUE.send(log);
    return new Response('Success!');
  },
} satisfies ExportedHandler<Env>;
```

Replace `MY_QUEUE` with the name you have set for your binding from your `wrangler.jsonc` file.

Also add the queue to the `Env` interface in `index.ts`:

```ts null {2}
export interface Env {
  MY_QUEUE: Queue;
}
```

If this write fails, your Worker will return an error (raise an exception).
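As a sketch of how you might guard that write with `try...catch`, the helper below wraps a send and converts a thrown exception into an error response. The `QueueLike` interface and `safeSend` function are illustrative stand-ins (not part of the Workers API); in a real Worker you would wrap the `env.MY_QUEUE.send()` call directly inside your `fetch` handler.

```typescript
// "QueueLike" is a hypothetical stand-in for the Workers Queue binding,
// so this sketch can be exercised outside the Workers runtime.
interface QueueLike<T> {
  send(message: T): Promise<void>;
}

// Attempt the queue write; on failure, return a custom error response
// instead of letting the exception propagate.
async function safeSend<T>(queue: QueueLike<T>, message: T): Promise<Response> {
  try {
    await queue.send(message);
    return new Response("Success!");
  } catch (err) {
    console.error("Queue send failed:", err);
    return new Response("Failed to enqueue message", { status: 500 });
  }
}
```

This makes the failure path explicit: the browser receives an HTTP `500` with a readable message rather than an uncaught exception.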
If this write works, it will return `Success` with an HTTP `200` status code to the browser. In a production application, you would likely use a [`try...catch`](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Statements/try...catch) statement to catch the exception and handle it directly (for example, return a custom error or even retry).

### Publish your producer Worker

With your Wrangler file and `index.ts` file configured, you are ready to publish your producer Worker. To publish your producer Worker, run:

```sh
npx wrangler deploy
```

You should see output that resembles the below, with a `*.workers.dev` URL by default.

```
Uploaded <WORKER-NAME> (0.76 sec)
Published <WORKER-NAME> (0.29 sec)
  https://<WORKER-NAME>.<SUBDOMAIN>.workers.dev
```

Copy your `*.workers.dev` subdomain and paste it into a new browser tab. Refresh the page a few times to start publishing requests to your queue. Your browser should return the `Success` response after writing the request to the queue each time.

You have built a queue and a producer Worker to publish messages to the queue. You will now create a consumer Worker to consume the messages published to your queue. Without a consumer Worker, the messages will stay on the queue until they expire, which defaults to four (4) days.

## 4. Create your consumer Worker

A consumer Worker receives messages from your queue. When the consumer Worker receives your queue's messages, it can write them to another source, such as a logging console or storage objects.

In this guide, you will create a consumer Worker and use it to log and inspect the messages with [`wrangler tail`](/workers/wrangler/commands/#tail). You will create your consumer Worker in the same Worker project that you created your producer Worker.

:::note
Queues also supports [pull-based consumers](/queues/configuration/pull-consumers/), which allows any HTTP-based client to consume messages from a queue. This guide creates a push-based consumer using Cloudflare Workers.
:::

To create a consumer Worker, open your `index.ts` file and add the following `queue` handler to your existing `fetch` handler:

```ts null {11}
export default {
  async fetch(request, env, ctx): Promise<Response> {
    let log = {
      url: request.url,
      method: request.method,
      headers: Object.fromEntries(request.headers),
    };
    await env.MY_QUEUE.send(log);
    return new Response('Success!');
  },
  async queue(batch, env): Promise<void> {
    let messages = JSON.stringify(batch.messages);
    console.log(`consumed from our queue: ${messages}`);
  },
} satisfies ExportedHandler<Env>;
```

Replace `MY_QUEUE` with the name you have set for your binding from your `wrangler.jsonc` file.

Every time messages are published to the queue, your consumer Worker's `queue` handler (`async queue`) is called and is passed one or more messages. In this example, your consumer Worker transforms the queue's JSON formatted message into a string and logs that output. In a real world application, your consumer Worker can be configured to write messages to object storage (such as [R2](/r2/)), write to a database (like [D1](/d1/)), further process messages before calling an external API (such as an [email API](/workers/tutorials/)), or write to a data warehouse with your legacy cloud provider.

When performing asynchronous tasks from within your consumer handler, use `waitUntil()` to ensure the response of the function is handled. Other asynchronous methods are not supported within the scope of this method.

### Connect the consumer Worker to your queue

After you have configured your consumer Worker, you are ready to connect it to your queue.

Each queue can only have one consumer Worker connected to it. If you try to connect multiple consumers to the same queue, you will encounter an error when attempting to publish that Worker.

To connect your queue to your consumer Worker, open your Wrangler file and add this to the bottom:

```toml
[[queues.consumers]]
queue = "MY-QUEUE-NAME" # Required: this should match the name of the queue you created in step 2.
# If you misspell the name, you will receive an error when attempting to publish your Worker.
max_batch_size = 10 # optional: defaults to 10
max_batch_timeout = 5 # optional: defaults to 5 seconds
```

Replace `MY-QUEUE-NAME` with the queue you created in [step 2](/queues/get-started/#2-create-a-queue).

In your consumer Worker, you are using queues to auto batch messages using the `max_batch_size` option and the `max_batch_timeout` option. The consumer Worker will receive messages in batches of `10` or every `5` seconds, whichever happens first.

`max_batch_size` (defaults to 10) helps to reduce the number of times your consumer Worker needs to be called. Instead of being called for every message, it will only be called after 10 messages have entered the queue.

`max_batch_timeout` (defaults to 5 seconds) helps to reduce wait time. If the producer Worker is sending fewer than 10 messages to the queue, the consumer Worker will be called every 5 seconds to receive any messages that are waiting in the queue.

### Publish your consumer Worker

With your Wrangler file and `index.ts` file configured, publish your consumer Worker by running:

```sh
npx wrangler deploy
```

## 5. Read messages from your queue

After you set up the consumer Worker, you can read messages from the queue.

Run `wrangler tail` to start waiting for your consumer to log the messages it receives:

```sh
npx wrangler tail
```

With `wrangler tail` running, open the Worker URL you opened in [step 3](/queues/get-started/#3-set-up-your-producer-worker).

You should receive a `Success` message in your browser window. If you receive a `Success` message, refresh the URL a few times to generate messages and push them onto the queue.

With `wrangler tail` running, your consumer Worker will start logging the requests generated by refreshing. If you refresh fewer than 10 times, it may take a few seconds for the messages to appear because the batch timeout is configured for 5 seconds.
After 5 seconds, messages should arrive in your terminal.

If you get errors when you refresh, check that the queue name you created in [step 2](/queues/get-started/#2-create-a-queue) and the queue you referenced in your Wrangler file are the same. You should ensure that your producer Worker is returning `Success` and is not returning an error.

By completing this guide, you have now created a queue, a producer Worker that publishes messages to that queue, and a consumer Worker that consumes those messages from it.

## Related resources

- Learn more about [Cloudflare Workers](/workers/) and the applications you can build on Cloudflare.

---

# Glossary

URL: https://developers.cloudflare.com/queues/glossary/

import { Glossary } from "~/components"

Review the definitions for terms used across Cloudflare's Queues documentation.

---

# Overview

URL: https://developers.cloudflare.com/queues/

import { CardGrid, Description, Feature, LinkTitleCard, Plan, RelatedProduct } from "~/components"

Send and receive messages with guaranteed delivery and no charges for egress bandwidth.

Cloudflare Queues integrates with [Cloudflare Workers](/workers/) and enables you to build applications that can [guarantee delivery](/queues/reference/delivery-guarantees/), [offload work from a request](/queues/reference/how-queues-works/), [send data from Worker to Worker](/queues/configuration/configure-queues/), and [buffer or batch data](/queues/configuration/batching-retries/).

***

## Features

Cloudflare Queues allows you to batch, retry, and delay messages.

Redirect your messages when a delivery failure occurs.

Configure pull-based consumers to pull from a queue over HTTP from infrastructure outside of Cloudflare Workers.

***

## Related products

Cloudflare R2 Storage allows developers to store large amounts of unstructured data without the costly egress bandwidth fees associated with typical cloud storage services.
Cloudflare Workers allows developers to build serverless applications and deploy instantly across the globe for exceptional performance, reliability, and scale.

***

## More resources

Learn about pricing.

Learn about Queues limits.

Try Cloudflare Queues, which can run on your local machine.

Follow @CloudflareDev on Twitter to learn about product announcements, and what is new in Cloudflare Workers.

Connect with the Workers community on Discord to ask questions, show what you are building, and discuss the platform with other developers.

Learn how to configure Cloudflare Queues using Wrangler.

Learn how to use JavaScript APIs to send and receive messages to a Cloudflare Queue.

---

# Batching, Retries and Delays

URL: https://developers.cloudflare.com/queues/configuration/batching-retries/

import { WranglerConfig } from "~/components";

## Batching

When configuring a [consumer Worker](/queues/reference/how-queues-works#consumers) for a queue, you can also define how messages are batched as they are delivered.

Batching can:

1. Reduce the total number of times your consumer Worker needs to be invoked (which can reduce costs).
2. Allow you to batch messages when writing to an external API or service (reducing writes).
3. Disperse load over time, especially if your producer Workers are associated with user-facing activity.

There are two ways to configure how messages are batched. You configure batching when connecting your consumer Worker to a queue.

- `max_batch_size` - the maximum size of a batch delivered to a consumer (defaults to 10 messages).
- `max_batch_timeout` - the _maximum_ amount of time the queue will wait before delivering a batch to a consumer (defaults to 5 seconds).

:::note[Batch size configuration]
Both `max_batch_size` and `max_batch_timeout` work together. Whichever limit is reached first will trigger the delivery of a batch.
:::

For example, a `max_batch_size = 30` and a `max_batch_timeout = 10` means that if 30 messages are written to the queue, the consumer will deliver a batch of 30 messages. However, if it takes longer than 10 seconds for those 30 messages to be written to the queue, then the consumer will get a batch of messages that contains however many messages were on the queue at the time (somewhere between 1 and 29, in this case).

:::note[Empty queues]
When a queue is empty, a push-based (Worker) consumer's `queue` handler will not be invoked until there are messages to deliver. A queue does not attempt to push empty batches to a consumer and thus does not invoke unnecessary reads. [Pull-based consumers](/queues/configuration/pull-consumers/) that attempt to pull from a queue, even when empty, will incur a read operation.
:::

When determining what size and timeout settings to configure, you will want to consider latency (how long can you wait to receive messages?), overall batch size (when writing to external systems), and cost (fewer-but-larger batches).

### Batch settings

The following batch-level settings can be configured to adjust how Queues delivers batches to your configured consumer.

| Setting                                   | Default     | Minimum   | Maximum      |
| ----------------------------------------- | ----------- | --------- | ------------ |
| Maximum Batch Size `max_batch_size`       | 10 messages | 1 message | 100 messages |
| Maximum Batch Timeout `max_batch_timeout` | 5 seconds   | 0 seconds | 60 seconds   |

## Explicit acknowledgement and retries

You can acknowledge individual messages within a batch by explicitly acknowledging each message as it is processed. Messages that are explicitly acknowledged will not be re-delivered, even if your queue consumer fails on a subsequent message and/or fails to return successfully when processing a batch.
- Each message can be acknowledged as you process it within a batch, which avoids the entire batch being re-delivered if your consumer throws an error during batch processing.
- Acknowledging individual messages is useful when you are calling external APIs, writing messages to a database, or otherwise performing non-idempotent (state changing) actions on individual messages.

To explicitly acknowledge a message as delivered, call the `ack()` method on the message.

```ts title="index.ts"
export default {
  async queue(batch: MessageBatch, env: Env, ctx: ExecutionContext) {
    for (const msg of batch.messages) {
      // TODO: do something with the message
      // Explicitly acknowledge the message as delivered
      msg.ack();
    }
  },
};
```

You can also call `retry()` to explicitly force a message to be redelivered in a subsequent batch. This is referred to as "negative acknowledgement". This can be particularly useful when you want to process the rest of the messages in that batch without throwing an error that would force the entire batch to be redelivered.

```ts title="index.ts"
export default {
  async queue(batch: MessageBatch, env: Env, ctx: ExecutionContext) {
    for (const msg of batch.messages) {
      // TODO: do something with the message that fails
      msg.retry();
    }
  },
};
```

You can also acknowledge or negatively acknowledge messages at a batch level with `ackAll()` and `retryAll()`. Calling `ackAll()` on the batch of messages (`MessageBatch`) delivered to your consumer Worker has the same behaviour as a consumer Worker that successfully returns (does not throw an error).

Note that calls to `ack()`, `retry()` and their `ackAll()` / `retryAll()` equivalents follow the below precedence rules:

- If you call `ack()` on a message, subsequent calls to `ack()` or `retry()` are silently ignored.
- If you call `retry()` on a message and then call `ack()`, the `ack()` is ignored. The first method call wins in all cases.
- If you call either `ack()` or `retry()` on a single message, and then either of `ackAll()` or `retryAll()` on the batch, the call on the single message takes precedence. That is, the batch-level call does not apply to that message (or messages, if multiple calls were made).

## Delivery failure

When a message fails to be delivered, the default behaviour is to retry delivery three times before marking the delivery as failed. You can set `max_retries` (defaults to 3) when configuring your consumer, but in most cases we recommend leaving this as the default.

Messages that reach the configured maximum retries will be deleted from the queue, or, if a [dead-letter queue](/queues/configuration/dead-letter-queues/) (DLQ) is configured, written to the DLQ instead.

:::note
Each retry counts as an additional read operation per [Queues pricing](/queues/platform/pricing/).
:::

When a single message within a batch fails to be delivered, the entire batch is retried, unless you have [explicitly acknowledged](#explicit-acknowledgement-and-retries) a message (or messages) within that batch. For example, if a batch of 10 messages is delivered, but the 8th message fails to be delivered, all 10 messages will be retried and thus redelivered to your consumer in full.

:::caution[Retried messages and consumer concurrency]
Retrying messages with `retry()` or calling `retryAll()` on a batch will **not** cause the consumer to autoscale down if consumer concurrency is enabled. Refer to [Consumer concurrency](/queues/configuration/consumer-concurrency/) to learn more.
:::

## Delay messages

When publishing messages to a queue, or when [marking a message or batch for retry](#explicit-acknowledgement-and-retries), you can choose to delay messages from being processed for a period of time. Delaying messages allows you to defer tasks until later, and/or respond to backpressure when consuming from a queue.
For example, if an upstream API you are calling returns an `HTTP 429: Too Many Requests`, you can delay messages to slow down how quickly you are consuming them before they are re-processed.

Messages can be delayed by up to 12 hours.

:::note
Configuring delivery and retry delays via the `wrangler` CLI or when [developing locally](/queues/configuration/local-development/) requires `wrangler` version `3.38.0` or greater. Use `npx wrangler@latest` to always use the latest version of `wrangler`.
:::

### Delay on send

To delay a message or batch of messages when sending to a queue, you can provide a `delaySeconds` parameter when sending a message.

```ts
// Delay a singular message by 600 seconds (10 minutes)
await env.YOUR_QUEUE.send(message, { delaySeconds: 600 });

// Delay a batch of messages by 300 seconds (5 minutes)
await env.YOUR_QUEUE.sendBatch(messages, { delaySeconds: 300 });

// Do not delay this message.
// If there is a global delay configured on the queue, ignore it.
await env.YOUR_QUEUE.sendBatch(messages, { delaySeconds: 0 });
```

You can also configure a default, global delay on a per-queue basis by passing `--delivery-delay-secs` when creating a queue via the `wrangler` CLI:

```sh
# Delay all messages by 5 minutes as a default
npx wrangler queues create $QUEUE-NAME --delivery-delay-secs=300
```

### Delay on retry

When [consuming messages from a queue](/queues/reference/how-queues-works/#consumers), you can choose to [explicitly mark messages to be retried](#explicit-acknowledgement-and-retries). Messages can be retried and delayed individually, or as an entire batch.
To delay an individual message within a batch:

```ts title="index.ts"
export default {
  async queue(batch: MessageBatch, env: Env, ctx: ExecutionContext) {
    for (const msg of batch.messages) {
      // Mark for retry and delay a singular message
      // by 3600 seconds (1 hour)
      msg.retry({ delaySeconds: 3600 });
    }
  },
};
```

To delay a batch of messages:

```ts title="index.ts"
export default {
  async queue(batch: MessageBatch, env: Env, ctx: ExecutionContext) {
    // Mark for retry and delay a batch of messages
    // by 600 seconds (10 minutes)
    batch.retryAll({ delaySeconds: 600 });
  },
};
```

You can also choose to set a default retry delay for any messages that are retried due to either implicit failure or when calling `retry()` explicitly. This is set at the consumer level, and is supported in both push-based (Worker) and pull-based (HTTP) consumers.

Delays can be configured via the `wrangler` CLI:

```sh
# Push-based consumers
# Delay any messages that are retried by 60 seconds (1 minute) by default.
npx wrangler@latest queues consumer worker add $QUEUE-NAME $WORKER_SCRIPT_NAME --retry-delay-secs=60

# Pull-based consumers
# Delay any messages that are retried by 60 seconds (1 minute) by default.
npx wrangler@latest queues consumer http add $QUEUE-NAME --retry-delay-secs=60
```

Delays can also be configured in the [Wrangler configuration file](/workers/wrangler/configuration/#queues) with the `delivery_delay` setting for producers (when sending) and/or the `retry_delay` (when retrying) per-consumer:

```toml title="wrangler.toml"
[[queues.producers]]
binding = "<BINDING_NAME>"
queue = "<QUEUE-NAME>"
delivery_delay = 60 # delay every message delivery by 1 minute

[[queues.consumers]]
queue = "my-queue"
retry_delay = 300 # delay any retried message by 5 minutes before re-attempting delivery
```

If you use both the `wrangler` CLI and the [Wrangler configuration file](/workers/wrangler/configuration/) to change the settings associated with a queue or a queue consumer, the most recent configuration change will take effect.

Refer to the [Queues REST API documentation](/api/resources/queues/subresources/consumers/methods/get/) to learn how to configure message delays and retry delays programmatically.

### Message delay precedence

Messages can be delayed by default at the queue level, or per-message (or batch).

- Per-message/batch delay settings take precedence over queue-level settings.
- Setting `delaySeconds: 0` on a message when sending or retrying will ignore any queue-level delays and cause the message to be delivered in the next batch.
- A message sent or retried with a `delaySeconds` longer than a queue's shorter default delay will still respect the message-level setting.

### Apply a backoff algorithm

You can apply a backoff algorithm to increasingly delay messages based on the current number of attempts to deliver the message. Each message delivered to a consumer includes an `attempts` property that tracks the number of delivery attempts made.
For example, to generate an [exponential backoff](https://en.wikipedia.org/wiki/Exponential_backoff) for a message, you can create a helper function that calculates this for you:

```ts
const calculateExponentialBackoff = (
  attempts: number,
  baseDelaySeconds: number,
) => {
  return baseDelaySeconds ** attempts;
};
```

In your consumer, you then pass the value of `msg.attempts` and your desired base delay to the helper, and use its result as the `delaySeconds` when calling `retry()` on an individual message:

```ts title="index.ts"
const BASE_DELAY_SECONDS = 30;

export default {
  async queue(batch: MessageBatch, env: Env, ctx: ExecutionContext) {
    for (const msg of batch.messages) {
      // Mark for retry with a delay that grows exponentially
      // with the number of delivery attempts
      msg.retry({
        delaySeconds: calculateExponentialBackoff(
          msg.attempts,
          BASE_DELAY_SECONDS,
        ),
      });
    }
  },
};
```

## Related

- Review the [JavaScript API](/queues/configuration/javascript-apis/) documentation for Queues.
- Learn more about [How Queues Works](/queues/reference/how-queues-works/).
- Understand the [metrics available](/queues/observability/metrics/) for your queues, including backlog and delayed message counts.

---

# Configure Queues

URL: https://developers.cloudflare.com/queues/configuration/configure-queues/

import { WranglerConfig, Type } from "~/components";

Cloudflare Queues can be configured using [Wrangler](/workers/wrangler/install-and-update/), the command-line interface for Cloudflare's Developer Platform, which includes [Workers](/workers/), [R2](/r2/), and other developer products.

Each producer and consumer Worker has a [Wrangler configuration file](/workers/wrangler/configuration/) that specifies environment variables, triggers, and resources, such as a queue. To enable Worker-to-resource communication, you must set up a [binding](/workers/runtime-apis/bindings/) in your Worker project's Wrangler file.

Use the options below to configure your queue.
:::note
The options below are specific to queues; refer to the Wrangler documentation for a full reference of the [Wrangler configuration file](/workers/wrangler/configuration/).
:::

## Queue configuration

The following queue-level settings can be configured using Wrangler:

```sh
npx wrangler queues update <QUEUE-NAME> --delivery-delay-secs 60 --message-retention-period-secs 3000
```

* `--delivery-delay-secs`
  * How long a published message is delayed for, before it is delivered to consumers.
  * Must be between 0 and 43200 (12 hours).
  * Defaults to 0.
* `--message-retention-period-secs`
  * How long messages are retained on the queue.
  * Must be between 60 and 1209600 (14 days).
  * Defaults to 345600 (4 days).

## Producer Worker configuration

A producer is a [Cloudflare Worker](/workers/) that writes to one or more queues. A producer can accept messages over HTTP, asynchronously write messages when handling requests, and/or write to a queue from within a [Durable Object](/durable-objects/). Any Worker can write to a queue.

To produce to a queue, set up a binding in your Wrangler file. These options should be used when a Worker wants to send messages to a queue.

```toml
[[queues.producers]]
queue = "my-queue"
binding = "MY_QUEUE"
```

* `queue`
  * The name of the queue.
* `binding`
  * The name of the binding, which is a JavaScript variable.

## Consumer Worker configuration

To consume messages from one or more queues, set up a binding in your Wrangler file. These options should be used when a Worker wants to receive messages from a queue.

```toml
[[queues.consumers]]
queue = "my-queue"
max_batch_size = 10
max_batch_timeout = 30
max_retries = 10
dead_letter_queue = "my-queue-dlq"
```

Refer to [Limits](/queues/platform/limits) to review the maximum values for each of these options.

* `queue`
  * The name of the queue.
* `max_batch_size`
  * The maximum number of messages allowed in each batch.
  * Defaults to `10` messages.
* `max_batch_timeout`
  * The maximum number of seconds to wait until a batch is full.
  * Defaults to `5` seconds.
* `max_retries`
  * The maximum number of retries for a message, if it fails or [`retryAll()`](/queues/configuration/javascript-apis/#messagebatch) is invoked.
  * Defaults to `3` retries.
* `dead_letter_queue`
  * The name of another queue to send a message to if it fails processing at least `max_retries` times.
  * If a `dead_letter_queue` is not defined, messages that repeatedly fail processing will eventually be discarded.
  * If there is no queue with the specified name, it will be created automatically.
* `max_concurrency`
  * The maximum number of concurrent consumers allowed to run at once. Leaving this unset means that the number of invocations will scale to the [currently supported maximum](/queues/platform/limits/).
  * Refer to [Consumer concurrency](/queues/configuration/consumer-concurrency/) for more information on how consumers autoscale, particularly when messages are retried.

## Pull-based consumers

A queue can have an HTTP-based consumer that pulls from the queue. This consumer can be any HTTP-speaking service that can communicate over the Internet. Review [Pull consumers](/queues/configuration/pull-consumers/) to learn how to configure a pull-based consumer.

---

# Consumer concurrency

URL: https://developers.cloudflare.com/queues/configuration/consumer-concurrency/

import { WranglerConfig } from "~/components";

Consumer concurrency allows a [consumer Worker](/queues/reference/how-queues-works/#consumers) processing messages from a queue to automatically scale out horizontally to keep up with the rate that messages are being written to a queue.

In many systems, the rate at which you write messages to a queue can easily exceed the rate at which a single consumer can read and process those same messages. This is often because your consumer might be parsing message contents, writing to storage or a database, or making third-party (upstream) API calls.
Note that queue producers are always scalable, up to the [maximum supported messages-per-second](/queues/platform/limits/) (per queue) limit.

## Enable concurrency

By default, all queues have concurrency enabled. Queue consumers will automatically scale up [to the maximum concurrent invocations](/queues/platform/limits/) as needed to manage a queue's backlog and/or error rates.

## How concurrency works

After processing a batch of messages, Queues will check to see if the number of concurrent consumers should be adjusted. The number of concurrent consumers invoked for a queue will autoscale based on several factors, including:

- The number of messages in the queue (backlog) and its rate of growth.
- The ratio of failed (versus successful) invocations. A failed invocation is when your `queue()` handler throws an uncaught exception instead of returning `void` (nothing).
- The value of `max_concurrency` set for that consumer.

Where possible, Queues will optimize for keeping your backlog from growing exponentially, in order to minimize scenarios where the backlog of messages in a queue grows to the point that they would reach the [message retention limit](/queues/platform/limits/) before being processed.

:::note[Consumer concurrency and retried messages]
[Retrying messages with `retry()`](/queues/configuration/batching-retries/#explicit-acknowledgement-and-retries) or calling `retryAll()` on a batch will **not** count as a failed invocation.
:::

### Example

If you are writing 100 messages/second to a queue with a single concurrent consumer that takes 5 seconds to process a batch of 100 messages, the number of messages in-flight will continue to grow at a rate faster than your consumer can keep up.
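As a back-of-the-envelope check of this scenario (a sketch only; Queues' real autoscaler also weighs error rates and backlog growth), each consumer invocation clears `batchSize` messages every `batchSeconds`, so the steady-state number of concurrent consumers is roughly the write rate divided by a single consumer's throughput:

```typescript
// Rough steady-state estimate; the function name and formula are
// illustrative, not Queues' actual autoscaling algorithm.
function estimateSteadyStateConsumers(
  writesPerSecond: number,
  batchSize: number,
  batchSeconds: number,
): number {
  // A single consumer invocation handles batchSize messages per batchSeconds.
  const perConsumerThroughput = batchSize / batchSeconds; // messages/second
  return Math.ceil(writesPerSecond / perConsumerThroughput);
}

// The scenario above: 100 messages/second written, batches of 100
// taking 5 seconds each. One consumer only clears 20 messages/second,
// so roughly 5 concurrent consumers are needed to keep up.
console.log(estimateSteadyStateConsumers(100, 100, 5)); // 5
```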
In this scenario, Queues will notice the growing backlog and will scale the number of concurrent consumer Worker invocations up to a steady state of (approximately) five (5) until the rate of incoming messages decreases, the consumer processes messages faster, or the consumer begins to generate errors.

### Why are my consumers not autoscaling?

If your consumers are not autoscaling, there are a few likely causes:

- `max_concurrency` has been set to 1.
- Your consumer Worker is returning errors rather than processing messages. Inspect your consumer to make sure it is healthy.
- A batch of messages is being processed. Queues checks if it should autoscale consumers only after processing an entire batch of messages, so it will not autoscale while a batch is being processed. Consider reducing batch sizes or refactoring your consumer to process messages faster.

## Limit concurrency

:::caution[Recommended concurrency setting]
Cloudflare recommends leaving the maximum concurrency unset, which will allow your queue consumer to scale up as much as possible. Setting a fixed number means that your consumer will only ever scale up to that maximum, even as Queues increases the maximum supported invocations over time.
:::

If you have a workflow that is limited by an upstream API and/or system, you may prefer for your backlog to grow, trading off increased overall latency in order to avoid overwhelming an upstream system.

You can configure the concurrency of your consumer Worker in two ways:

1. Set concurrency settings in the Cloudflare dashboard.
2. Set concurrency settings via the [Wrangler configuration file](/workers/wrangler/configuration/).

### Set concurrency settings in the Cloudflare dashboard

To configure the concurrency settings for your consumer Worker from the dashboard:

1. Log in to the [Cloudflare dashboard](https://dash.cloudflare.com) and select your account.
2. Select **Workers & Pages** > **Queues**.
3. Select your queue > **Settings**.
4.
Select **Edit Consumer** under Consumer details.
5. Set **Maximum consumer invocations** to a value between `1` and `250`. This value represents the maximum number of concurrent consumer invocations available to your queue. To remove a fixed maximum value, select **auto (recommended)**.

Note that if you are writing messages to a queue faster than you can process them, messages may eventually reach the [maximum retention period](/queues/platform/limits/) set for that queue. Individual messages that reach that limit will expire from the queue and be deleted.

### Set concurrency settings in the [Wrangler configuration file](/workers/wrangler/configuration/)

:::note
Ensure you are using the latest version of [wrangler](/workers/wrangler/install-and-update/). Configuring the maximum concurrency of a queue consumer is only supported in wrangler [`2.13.0`](https://github.com/cloudflare/workers-sdk/releases/tag/wrangler%402.13.0) or greater.
:::

To set a fixed maximum number of concurrent consumer invocations for a given queue, configure `max_concurrency` in your Wrangler file:

```toml
[[queues.consumers]]
queue = "my-queue"
max_concurrency = 1
```

To remove the limit, remove the `max_concurrency` setting from the `[[queues.consumers]]` configuration for a given queue and run `npx wrangler deploy` to push your configuration update.

{/* Not yet available but will be very soon

### wrangler CLI

```sh
# where `N` is a positive integer between 1 and 250
wrangler queues consumer update --max-concurrency=N
```

To remove the limit and allow Queues to scale your consumer to the maximum number of invocations, call `consumer update` without any flags:

```sh
# Call update without passing a flag to allow concurrency to scale to the maximum
wrangler queues consumer update
```

*/}

## Billing

When multiple consumer Workers are invoked, each Worker invocation incurs [CPU time costs](/workers/platform/pricing/#workers).
- If you intend to process all messages written to a queue, _the effective overall cost is the same_, even with concurrency enabled.
- Enabling concurrency simply brings those costs forward, and can help prevent messages from reaching the [message retention limit](/queues/platform/limits/).

Billing for consumers follows the [Workers standard usage model](/workers/platform/pricing/#example-pricing), meaning a developer is billed for the request and for the CPU time used in the request.

### Example

A consumer Worker that takes 2 seconds to process a batch of messages will incur the same overall costs to process 50 million (50,000,000) messages, whether it does so concurrently (faster) or individually (slower).

---

# Dead Letter Queues

URL: https://developers.cloudflare.com/queues/configuration/dead-letter-queues/

import { WranglerConfig } from "~/components";

A Dead Letter Queue (DLQ) is a common concept in a messaging system, and represents where messages are sent when a delivery failure occurs with a consumer after `max_retries` is reached. A Dead Letter Queue is like any other queue, and can be produced to and consumed from independently.

With Cloudflare Queues, a Dead Letter Queue is defined within your [consumer configuration](/queues/configuration/configure-queues/). Messages are delivered to the DLQ when they reach the configured retry limit for the consumer. Without a DLQ configured, messages that reach the retry limit are deleted permanently.
For example, the following consumer configuration would send messages to our DLQ named `"my-other-queue"` after retrying delivery (by default, 3 times):

```toml
[[queues.consumers]]
queue = "my-queue"
dead_letter_queue = "my-other-queue"
```

You can also configure a DLQ when creating a consumer from the command-line using `wrangler`:

```sh
wrangler queues consumer add $QUEUE_NAME $SCRIPT_NAME --dead-letter-queue=$NAME_OF_OTHER_QUEUE
```

To process messages placed on your DLQ, you need to [configure a consumer](/queues/configuration/configure-queues/) for that queue as you would with any other queue. Messages delivered to a DLQ without an active consumer will persist for four (4) days before being deleted from the queue.

---

# Configuration

URL: https://developers.cloudflare.com/queues/configuration/

import { DirectoryListing } from "~/components"

---

# Local Development

URL: https://developers.cloudflare.com/queues/configuration/local-development/

Queues support local development workflows using [Wrangler](/workers/wrangler/install-and-update/), the command-line interface for Workers. Wrangler runs the same version of Queues as Cloudflare runs globally.

## Prerequisites

To develop locally with Queues, you will need:

- [Wrangler v3.1.0](https://blog.cloudflare.com/wrangler3/) or later.
- Node.js version of `18.0.0` or later. Consider using a Node version manager like [Volta](https://volta.sh/) or [nvm](https://github.com/nvm-sh/nvm) to avoid permission issues and change Node versions.
- If you are new to Queues and/or Cloudflare Workers, refer to the [Queues tutorial](/queues/get-started/) to install `wrangler` and deploy your first Queue.
## Start a local development session Open your terminal and run the following commands to start a local development session: ```sh # Confirm we are using wrangler v3.1.0+ wrangler --version ``` ```sh output ⛅️ wrangler 3.1.0 ``` Start a local dev session ```sh # Start a local dev session: npx wrangler dev ``` ```sh output ------------------ wrangler dev now uses local mode by default, powered by 🔥 Miniflare and 👷 workerd. To run an edge preview session for your Worker, use wrangler dev --remote ⎔ Starting local server... [mf:inf] Ready on http://127.0.0.1:8787/ ``` Local development sessions create a standalone, local-only environment that mirrors the production environment Queues runs in so you can test your Workers _before_ you deploy to production. Refer to the [`wrangler dev` documentation](/workers/wrangler/commands/#dev) to learn more about how to configure a local development session. ## Known Issues Wrangler does not yet support running separate producer and consumer Workers bound to the same Queue locally. To develop locally with Queues, you can temporarily put your consumer's `queue()` handler in the same Worker as your producer, so the same Worker acts as both a producer and consumer. Wrangler also does not yet support `wrangler dev --remote`. --- # JavaScript APIs URL: https://developers.cloudflare.com/queues/configuration/javascript-apis/ import { Type } from "~/components"; Cloudflare Queues is integrated with [Cloudflare Workers](/workers). To send and receive messages, you must use a Worker. A Worker that can send messages to a Queue is a producer Worker, while a Worker that can receive messages from a Queue is a consumer Worker. It is possible for the same Worker to be a producer and consumer, if desired. In the future, we expect to support other APIs, such as HTTP endpoints to send or receive messages. To report bugs or request features, go to the [Cloudflare Community Forums](https://community.cloudflare.com/c/developers/workers/40). 
To give feedback, go to the [`#queues`](https://discord.cloudflare.com) Discord channel.

## Producer

These APIs allow a producer Worker to send messages to a Queue.

An example of writing a single message to a Queue:

```ts
type Environment = {
  readonly MY_QUEUE: Queue;
};

export default {
  async fetch(req: Request, env: Environment): Promise<Response> {
    await env.MY_QUEUE.send({
      url: req.url,
      method: req.method,
      headers: Object.fromEntries(req.headers),
    });
    return new Response('Sent!');
  },
};
```

The Queues API also supports writing multiple messages at once:

```ts
const sendResultsToQueue = async (results: Array<unknown>, env: Environment) => {
  const batch: MessageSendRequest[] = results.map((value) => ({
    body: JSON.stringify(value),
  }));
  await env.queue.sendBatch(batch);
};
```

### `Queue`

A binding that allows a producer to send messages to a Queue.

```ts
interface Queue<Body = unknown> {
  send(body: Body, options?: QueueSendOptions): Promise<void>;
  sendBatch(messages: Iterable<MessageSendRequest<Body>>, options?: QueueSendBatchOptions): Promise<void>;
}
```

* `send(body: unknown, options?: { contentType?: QueuesContentType })`
  * Sends a message to the Queue. The body can be any type supported by the [structured clone algorithm](https://developer.mozilla.org/en-US/docs/Web/API/Web_Workers_API/Structured_clone_algorithm#supported_types), as long as its size is less than 128 KB.
  * When the promise resolves, the message is confirmed to be written to disk.
* `sendBatch(body: Iterable<MessageSendRequest>)`
  * Sends a batch of messages to the Queue. Each item in the provided [Iterable](https://www.typescriptlang.org/docs/handbook/iterators-and-generators.html) must be supported by the [structured clone algorithm](https://developer.mozilla.org/en-US/docs/Web/API/Web_Workers_API/Structured_clone_algorithm#supported_types). A batch can contain up to 100 messages, though items are limited to 128 KB each, and the total size of the array cannot exceed 256 KB.
  * When the promise resolves, the messages are confirmed to be written to disk.
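Since `sendBatch()` accepts at most 100 messages per call, a larger result set has to be split into multiple batches before publishing. The helper below is an illustrative sketch only: the `toBatches` name and the locally redeclared `MessageSendRequest` shape are specific to this example, not part of the Queues API surface.

```typescript
// Illustrative sketch: chunk an arbitrary list of payloads into
// sendBatch()-sized groups of at most 100 messages each.
// MessageSendRequest is redeclared locally so this sketch is self-contained.
type MessageSendRequest = { body: string };

function toBatches(payloads: unknown[], maxBatch = 100): MessageSendRequest[][] {
  const batches: MessageSendRequest[][] = [];
  for (let i = 0; i < payloads.length; i += maxBatch) {
    batches.push(
      payloads.slice(i, i + maxBatch).map((value) => ({ body: JSON.stringify(value) })),
    );
  }
  return batches;
}

// 250 payloads are split into groups of 100, 100 and 50; each group could then
// be published with `await env.MY_QUEUE.sendBatch(batch)`.
const batches = toBatches(Array.from({ length: 250 }, (_, i) => ({ n: i })));
console.log(batches.map((b) => b.length)); // [ 100, 100, 50 ]
```

Note that this only enforces the per-batch message count; the 128 KB per-message and 256 KB per-batch size limits would still need to be checked separately.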
### `MessageSendRequest`

A wrapper type used for sending message batches.

```ts
type MessageSendRequest<Body = unknown> = {
  body: Body;
  options?: QueueSendOptions;
};
```

* `body`
  * The body of the message.
  * The body can be any type supported by the [structured clone algorithm](https://developer.mozilla.org/en-US/docs/Web/API/Web_Workers_API/Structured_clone_algorithm#supported_types), as long as its size is less than 128 KB.
* `options`
  * Options to apply to the current message, including content type and message delay settings.

### `QueueSendOptions`

Optional configuration that applies when sending a message to a queue.

* `contentType`
  * The explicit content type of a message so it can be previewed correctly with the [List messages from the dashboard](/queues/examples/list-messages-from-dash/) feature. Optional argument.
  * As of now, this option is for internal use. In the future, `contentType` will be used by alternative consumer types to explicitly mark messages as serialized so they can be consumed in the desired type.
  * See [QueuesContentType](#queuescontenttype) for possible values.
* `delaySeconds`
  * The number of seconds to [delay a message](/queues/configuration/batching-retries/) for within the queue, before it can be delivered to a consumer.
  * Must be an integer between 0 and 43200 (12 hours). Setting this value to zero will explicitly prevent the message from being delayed, even if there is a global (default) delay at the queue level.

### `QueueSendBatchOptions`

Optional configuration that applies when sending a batch of messages to a queue.

* `delaySeconds`
  * The number of seconds to [delay messages](/queues/configuration/batching-retries/) for within the queue, before they can be delivered to a consumer.
  * Must be a positive integer.

### `QueuesContentType`

A union type containing valid message content types.

```ts
// Default: json
type QueuesContentType = "text" | "bytes" | "json" | "v8";
```

* Use `"json"` to send a JavaScript object that can be JSON-serialized.
This content type can be previewed from the [Cloudflare dashboard](https://dash.cloudflare.com). The `json` content type is the default. * Use `"text"` to send a `String`. This content type can be previewed with the [List messages from the dashboard](/queues/examples/list-messages-from-dash/) feature. * Use `"bytes"` to send an `ArrayBuffer`. This content type cannot be previewed from the [Cloudflare dashboard](https://dash.cloudflare.com) and will display as Base64-encoded. * Use `"v8"` to send a JavaScript object that cannot be JSON-serialized but is supported by [structured clone](https://developer.mozilla.org/en-US/docs/Web/API/Web_Workers_API/Structured_clone_algorithm#supported_types) (for example `Date` and `Map`). This content type cannot be previewed from the [Cloudflare dashboard](https://dash.cloudflare.com) and will display as Base64-encoded. :::note The default content type for Queues changed to `json` (from `v8`) to improve compatibility with pull-based consumers for any Workers with a [compatibility date](/workers/configuration/compatibility-flags/#queues-send-messages-in-json-format) after `2024-03-18`. ::: If you specify an invalid content type, or if your specified content type does not match the message content's type, the send operation will fail with an error. ## Consumer These APIs allow a consumer Worker to consume messages from a Queue. To define a consumer Worker, add a `queue()` function to the default export of the Worker. This will allow it to receive messages from the Queue. By default, all messages in the batch will be acknowledged as soon as all of the following conditions are met: 1. The `queue()` function has returned. 2. If the `queue()` function returned a promise, the promise has resolved. 3. Any promises passed to `waitUntil()` have resolved. 
If the `queue()` function throws, or the promise returned by it or any of the promises passed to `waitUntil()` were rejected, then the entire batch will be considered a failure and will be retried according to the consumer's retry settings.

:::note
`waitUntil()` is the only supported method to run tasks (such as logging or metrics calls) that resolve after a queue handler has completed. Promises that have not resolved by the time the queue handler returns may not complete and will not block completion of execution.
:::

```ts
export default {
  async queue(
    batch: MessageBatch,
    env: Environment,
    ctx: ExecutionContext
  ): Promise<void> {
    for (const message of batch.messages) {
      console.log('Received', message);
    }
  },
};
```

The `env` and `ctx` fields are as [documented in the Workers documentation](/workers/reference/migrate-to-module-workers/).

Or alternatively, a queue consumer can be written using the (deprecated) service worker syntax:

```js
addEventListener('queue', (event) => {
  event.waitUntil(handleMessages(event));
});
```

In service worker syntax, `event` provides the same fields and methods as `MessageBatch`, as defined below, in addition to [`waitUntil()`](https://developer.mozilla.org/en-US/docs/Web/API/ExtendableEvent/waitUntil).

:::note
When performing asynchronous tasks in your queue handler that iterates through messages, use an asynchronous version of iterating through your messages. For example, `for (const m of batch.messages)` or `await Promise.all(batch.messages.map(work))` allow for waiting for the results of asynchronous calls. `batch.messages.forEach()` does not.
:::

### `MessageBatch`

A batch of messages that are sent to a consumer Worker.

```ts
interface MessageBatch<Body = unknown> {
  readonly queue: string;
  readonly messages: Message<Body>[];
  ackAll(): void;
  retryAll(options?: QueueRetryOptions): void;
}
```

* `queue`
  * The name of the Queue that belongs to this batch.
* `messages`
  * An array of messages in the batch.
Ordering of messages is best effort -- not guaranteed to be exactly the same as the order in which they were published. If you are interested in guaranteed FIFO ordering, please [email the Queues team](mailto:queues@cloudflare.com).
* `ackAll()`
  * Marks every message as successfully delivered, regardless of whether your `queue()` consumer handler returns successfully or not.
* `retryAll(options?: QueueRetryOptions)`
  * Marks every message to be retried in the next batch.
  * Supports an optional `options` object.

### `Message`

A message that is sent to a consumer Worker.

```ts
interface Message<Body = unknown> {
  readonly id: string;
  readonly timestamp: Date;
  readonly body: Body;
  readonly attempts: number;
  ack(): void;
  retry(options?: QueueRetryOptions): void;
}
```

* `id`
  * A unique, system-generated ID for the message.
* `timestamp`
  * A timestamp when the message was sent.
* `body`
  * The body of the message.
  * The body can be any type supported by the [structured clone algorithm](https://developer.mozilla.org/en-US/docs/Web/API/Web_Workers_API/Structured_clone_algorithm#supported_types), as long as its size is less than 128 KB.
* `attempts`
  * The number of times the consumer has attempted to process this message. Starts at 1.
* `ack()`
  * Marks a message as successfully delivered, regardless of whether your `queue()` consumer handler returns successfully or not.
* `retry(options?: QueueRetryOptions)`
  * Marks a message to be retried in the next batch.
  * Supports an optional `options` object.

### `QueueRetryOptions`

Optional configuration when marking a message or a batch of messages for retry.

```ts
interface QueueRetryOptions {
  delaySeconds?: number;
}
```

* `delaySeconds`
  * The number of seconds to [delay a message](/queues/configuration/batching-retries/) for within the queue, before it can be delivered to a consumer.
  * Must be a positive integer.
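As a sketch of how the per-message APIs above compose, the following handler acknowledges each successfully processed message and retries failures with a delay that grows with `attempts`. The `process()` callback and the `handleBatch` helper are hypothetical stand-ins for your own logic, and a minimal `QueueMessage` interface is declared locally so the sketch is self-contained.

```typescript
// Minimal local stand-in for the Message interface documented above.
interface QueueMessage<Body> {
  readonly body: Body;
  readonly attempts: number;
  ack(): void;
  retry(options?: { delaySeconds?: number }): void;
}

// Hypothetical helper: ack successes individually, retry failures with a
// delay proportional to how many times delivery has been attempted.
async function handleBatch<Body>(
  messages: QueueMessage<Body>[],
  process: (body: Body) => Promise<void>,
): Promise<void> {
  for (const message of messages) {
    try {
      await process(message.body);
      message.ack(); // successful: never redeliver this message
    } catch {
      // 10s delay after the first attempt, 20s after the second, and so on.
      message.retry({ delaySeconds: 10 * message.attempts });
    }
  }
}
```

Because each message is acknowledged individually, a single failing message no longer forces the whole batch to be retried.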
---

# Pull consumers

URL: https://developers.cloudflare.com/queues/configuration/pull-consumers/

import { WranglerConfig } from "~/components";

A pull-based consumer allows you to pull from a queue over HTTP from any environment and/or programming language outside of Cloudflare Workers. A pull-based consumer can be useful when your message consumption rate is limited by upstream infrastructure or long-running tasks.

## How to choose between push and pull consumers

Deciding whether to configure a push-based consumer or a pull-based consumer will depend on how you are using your queues, as well as the configuration of infrastructure upstream from your queue consumer.

- **Starting with a [push-based consumer](/queues/reference/how-queues-works/#consumers) is the easiest way to get started and consume from a queue**. A push-based consumer runs on Workers, and by default, will automatically scale up and consume messages as they are written to the queue.
- Use a pull-based consumer if you need to consume messages from existing infrastructure outside of Cloudflare Workers, and/or where you need to carefully control how fast messages are consumed. A pull-based consumer must explicitly make a call to pull (and then acknowledge) messages from the queue, only when it is ready to do so.

You can remove and attach a new consumer on a queue at any time, allowing you to change from a pull-based to a push-based consumer if your requirements change.

:::note[Retrieve an API bearer token]
To configure a pull-based consumer, create [an API token](/fundamentals/api/get-started/create-token/) with both the `queues#read` and `queues#write` permissions. A consumer must be able to write to a queue to acknowledge messages.
:::

To configure a pull-based consumer and receive messages from a queue, you need to:

1. Enable HTTP pull for the queue.
2. Create a valid authentication token for the HTTP client.
3. Pull message batches from the queue.
4. Acknowledge and/or retry messages within a batch.
## 1. Enable HTTP pull

You can enable HTTP pull or change a queue from push-based to pull-based via the [Wrangler configuration file](/workers/wrangler/configuration/), the `wrangler` CLI, or via the [Cloudflare dashboard](https://dash.cloudflare.com/).

### Wrangler configuration file

An HTTP consumer can be configured in the [Wrangler configuration file](/workers/wrangler/configuration/) by setting `type = "http_pull"` in the consumer configuration:

```toml
[[queues.consumers]]
# Required
queue = "QUEUE-NAME"
type = "http_pull"
# Optional
visibility_timeout_ms = 5000
max_retries = 5
dead_letter_queue = "SOME-OTHER-QUEUE"
```

Omitting the `type` property will default the queue to push-based.

### wrangler CLI

You can enable a pull-based consumer on any existing queue by using the `wrangler queues consumer http` sub-commands and providing a queue name.

```sh
npx wrangler queues consumer http add $QUEUE-NAME
```

If you have an existing push-based consumer, you will need to remove that first. `wrangler` will return an error if you attempt to call `consumer http add` on a queue with an existing consumer configuration:

```sh
wrangler queues consumer worker remove $QUEUE-NAME $SCRIPT_NAME
```

:::note
If you remove the Worker consumer with `wrangler` but do not delete the `[[queues.consumers]]` configuration from your [Wrangler configuration file](/workers/wrangler/configuration/), subsequent deployments of your Worker will fail when they attempt to add a conflicting consumer configuration. Ensure you remove the consumer configuration first.
:::

## 2. Consumer authentication

HTTP pull consumers require an [API token](/fundamentals/api/get-started/create-token/) with the `com.cloudflare.api.account.queues_read` and `com.cloudflare.api.account.queues_write` permissions.

Both read _and_ write are required as a pull-based consumer needs to write to the queue state to acknowledge the messages it receives. Consuming messages mutates the queue.
API tokens are presented as Bearer tokens in the `Authorization` header of an HTTP request in the format `Authorization: Bearer $YOUR_TOKEN_HERE`. The following example shows how to pass an API token using the `curl` HTTP client:

```bash
curl "https://api.cloudflare.com/client/v4/accounts/${CF_ACCOUNT_ID}/queues/${QUEUE_ID}/messages/pull" \
  --header "Authorization: Bearer ${QUEUES_TOKEN}" \
  --header "Content-Type: application/json" \
  --data '{ "visibility_timeout": 10000, "batch_size": 2 }'
```

You may authenticate and run multiple concurrent pull-based consumers against a single queue, noting that all consumers will share the same [rate limit](/queues/platform/limits/) against the Cloudflare API.

### Create API tokens

To create an API token:

1. Go to the API tokens page of the [Cloudflare dashboard](https://dash.cloudflare.com/profile/api-tokens/).
2. Select **Create Token**.
3. Scroll to the bottom of the page and select **Create Custom Token**.
4. Give the token a name. For example, `queue-pull-token`.
5. Under the **Permissions** section, choose **Account** and then **Queues**. Ensure you have selected **Edit** (read+write).
6. (Optional) Select **All accounts** (default) or a specific account to scope the token to.
7. Select **Continue to summary** and then **Create token**.

You will need to note the token down: it will only be displayed once.

## 3.
Pull messages To pull a message, make a HTTP POST request to the [Queues REST API](/api/resources/queues/subresources/messages/methods/pull/) with a JSON-encoded body that optionally specifies a `visibility_timeout` and a `batch_size`, or an empty JSON object (`{}`): ```ts // POST /accounts/${CF_ACCOUNT_ID}/queues/${QUEUE_ID}/messages/pull with the timeout & batch size let resp = await fetch( `https://api.cloudflare.com/client/v4/accounts/${CF_ACCOUNT_ID}/queues/${QUEUE_ID}/messages/pull`, { method: "POST", headers: { "content-type": "application/json", authorization: `Bearer ${QUEUES_API_TOKEN}`, }, // Optional - you can provide an empty object '{}' and the defaults will apply. body: JSON.stringify({ visibility_timeout_ms: 6000, batch_size: 50 }), }, ); ``` This will return an array of messages (up to the specified `batch_size`) in the below format: ```json { "success": true, "errors": [], "messages": [], "result": { "message_backlog_count": 10, "messages": [ { "body": "hello", "id": "1ad27d24c83de78953da635dc2ea208f", "timestamp_ms": 1689615013586, "attempts": 2, "metadata":{ "CF-sourceMessageSource":"dash", "CF-Content-Type":"json" }, "lease_id": "eyJhbGciOiJkaXIiLCJlbmMiOiJBMjU2Q0JDLUhTNTEyIn0..NXmbr8h6tnKLsxJ_AuexHQ.cDt8oBb_XTSoKUkVKRD_Jshz3PFXGIyu7H1psTO5UwI.smxSvQ8Ue3-ymfkV6cHp5Va7cyUFPIHuxFJA07i17sc" }, { "body": "world", "id": "95494c37bb89ba8987af80b5966b71a7", "timestamp_ms": 1689615013586, "attempts": 2, "metadata":{ "CF-sourceMessageSource":"dash", "CF-Content-Type":"json" }, "lease_id": "eyJhbGciOiJkaXIiLCJlbmMiOiJBMjU2Q0JDLUhTNTEyIn0..QXPgHfzETsxYQ1Vd-H0hNA.mFALS3lyouNtgJmGSkTzEo_imlur95EkSiH7fIRIn2U.PlwBk14CY_EWtzYB-_5CR1k30bGuPFPUx1Nk5WIipFU" } ] } } ``` Pull consumers follow a "short polling" approach: if there are messages available to be delivered, Queues will return a response immediately with messages up to the configured `batch_size`. If there are no messages to deliver, Queues will return an empty response. 
Queues does not hold an open connection (often referred to as "long polling") if there are no messages to deliver. :::note The [`pull`](/api/resources/queues/subresources/messages/methods/pull/) and [`ack`](/api/resources/queues/subresources/messages/methods/ack/) endpoints use the new `/queues/queue_id/messages/{action}` API format, as defined in the Queues API documentation. The undocumented `/queues/queue_id/{action}` endpoints are not supported and will be deprecated as of June 30th, 2024. ::: Each message object has five fields: 1. `body` - this may be base64 encoded based on the [content-type the message was published as](#content-types). 2. `id` - a unique, read-only ephemeral identifier for the message. 3. `timestamp_ms` - when the message was published to the queue in milliseconds since the [Unix epoch](https://en.wikipedia.org/wiki/Unix_time). This allows you to determine how old a message is by subtracting it from the current timestamp. 4. `attempts` - how many times the message has been attempted to be delivered in full. When this reaches the value of `max_retries`, the message will not be re-delivered and will be deleted from the queue permanently. 5. `lease_id` - the encoded lease ID of the message. The `lease_id` is used to explicitly acknowledge or retry the message. The `lease_id` allows your pull consumer to explicitly acknowledge some, none or all messages in the batch or mark them for retry. If messages are not acknowledged or marked for retry by the consumer, then they will be marked for re-delivery once the `visibility_timeout` is reached. A `lease_id` is no longer valid once this timeout has been reached. You can configure both `batch_size` and `visibility_timeout` when pulling from a queue: - `batch_size` (defaults to 5; max 100) - how many messages are returned to the consumer in each pull. 
- `visibility_timeout` (defaults to 30 seconds; max 12 hours) - defines how long the consumer has to explicitly acknowledge messages delivered in the batch based on their `lease_id`. Once this timeout expires, messages are assumed unacknowledged and queued for re-delivery.

### Concurrent consumers

You may have multiple HTTP clients pulling from the same queue concurrently: each client will receive a unique batch of messages and retain the "lease" on those messages up until the `visibility_timeout` expires, or until those messages are marked for retry.

Messages marked for retry will be put back into the queue and can be delivered to any consumer. Messages are _not_ tied to a specific consumer, as consumers do not have an identity, and to avoid a slow or stuck consumer from holding up the processing of messages in a queue.

Multiple consumers can be useful in cases where you have multiple upstream resources (for example, GPU infrastructure), where you want to autoscale based on the [backlog](/queues/observability/metrics/) of a queue, and/or cost.

## 4. Acknowledge messages

Messages pulled by a consumer need to be either acknowledged or marked for retry.
To acknowledge and/or mark messages to be retried, make an HTTP `POST` request to the `/ack` endpoint of your queue per the [Queues REST API](/api/resources/queues/subresources/messages/methods/ack/) by providing an array of `lease_id` objects to acknowledge and/or retry:

```ts
// POST /accounts/${CF_ACCOUNT_ID}/queues/${QUEUE_ID}/messages/ack with the lease_ids
let resp = await fetch(
  `https://api.cloudflare.com/client/v4/accounts/${CF_ACCOUNT_ID}/queues/${QUEUE_ID}/messages/ack`,
  {
    method: "POST",
    headers: {
      "content-type": "application/json",
      authorization: `Bearer ${QUEUES_API_TOKEN}`,
    },
    // If you have no messages to retry, you can specify an empty array - retries: []
    body: JSON.stringify({
      acks: [
        { lease_id: "lease_id1" },
        { lease_id: "lease_id2" },
        { lease_id: "etc" },
      ],
      retries: [{ lease_id: "lease_id4" }],
    }),
  },
);
```

You may optionally specify the number of seconds to delay a message for when marking it for retry by providing a `{ lease_id: string, delay_seconds: number }` object in the `retries` array:

```json
{
  "acks": [
    { "lease_id": "lease_id1" },
    { "lease_id": "lease_id2" },
    { "lease_id": "lease_id3" }
  ],
  "retries": [{ "lease_id": "lease_id4", "delay_seconds": 600 }]
}
```

Additionally:

- You should provide every `lease_id` in the request to the `/ack` endpoint if you are processing those messages in your consumer. If you do not acknowledge a message, it will be marked for re-delivery (put back in the queue).
- You can optionally mark messages to be retried: for example, if there is an error processing the message or you have upstream resource pressure. Explicitly marking a message for retry will place it back into the queue immediately, instead of waiting for a (potentially long) `visibility_timeout` to be reached.
- You can make multiple calls to the `/ack` endpoint as you make progress through a batch of messages, but we recommend grouping acknowledgements to avoid hitting [API rate limits](/queues/platform/limits/).
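Putting steps 3 and 4 together, a single pull-process-ack cycle might look like the sketch below. The endpoint paths and request bodies follow the examples above; `processBody()` is a hypothetical stand-in for your own message handling, and the `fetchImpl` parameter is an assumption made purely so the loop can be exercised without the live API.

```typescript
// Sketch of one pull-process-ack cycle against the endpoints shown above.
// The account and queue IDs are folded into `baseUrl`; `processBody` is a
// hypothetical stand-in for your own message handling.
type PulledMessage = { body: string; lease_id: string };

async function pullAndAck(
  baseUrl: string, // e.g. https://api.cloudflare.com/client/v4/accounts/<ACCOUNT_ID>/queues/<QUEUE_ID>
  token: string,
  processBody: (body: string) => Promise<void>,
  fetchImpl: typeof fetch = fetch,
): Promise<{ acked: number; retried: number }> {
  const headers = {
    "content-type": "application/json",
    authorization: `Bearer ${token}`,
  };

  // Step 3: pull a batch and take a lease on each message.
  const pullResp = await fetchImpl(`${baseUrl}/messages/pull`, {
    method: "POST",
    headers,
    body: JSON.stringify({ batch_size: 10, visibility_timeout_ms: 30000 }),
  });
  const { result } = (await pullResp.json()) as {
    result: { messages: PulledMessage[] };
  };

  const acks: { lease_id: string }[] = [];
  const retries: { lease_id: string }[] = [];
  for (const message of result.messages) {
    try {
      await processBody(message.body);
      acks.push({ lease_id: message.lease_id });
    } catch {
      retries.push({ lease_id: message.lease_id });
    }
  }

  // Step 4: acknowledge successes and retry failures in a single call.
  await fetchImpl(`${baseUrl}/messages/ack`, {
    method: "POST",
    headers,
    body: JSON.stringify({ acks, retries }),
  });
  return { acked: acks.length, retried: retries.length };
}
```

A long-running consumer would wrap this in a loop, sleeping briefly when `result.messages` comes back empty, since Queues short-polls rather than holding the connection open.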
Queues aims to be permissive when it comes to lease IDs: if a consumer acknowledges a message by its lease ID _after_ the visibility timeout is reached, Queues will still accept that acknowledgment. If the message was delivered to another consumer during the intervening period, it will also be able to acknowledge the message without an error.

## Content types

:::caution
When attaching a pull-based consumer to a queue, you should ensure that messages are sent with only a `text`, `bytes` or `json` [content type](/queues/configuration/javascript-apis/#queuescontenttype). The default content type is `json`.

Pull-based consumers cannot decode the `v8` content type as it is specific to the Workers runtime.
:::

When publishing to a queue that has an external consumer, you should be aware that certain content types may be encoded in a way that allows them to be safely serialized within a JSON object. For both the `json` and `bytes` content types, this means that they will be base64-encoded ([RFC 4648](https://datatracker.ietf.org/doc/html/rfc4648)). The `text` type will be sent as a plain UTF-8 encoded string. Your consumer will need to decode the `json` and `bytes` types before operating on the data.

## Next steps

- Review the [REST API documentation](/api/resources/queues/subresources/consumers/methods/create/) and schema for Queues.
- Learn more about [how to make API calls](/fundamentals/api/how-to/make-api-calls/) to the Cloudflare API.
- Understand [what limits apply](/queues/platform/limits/) when consuming and writing to a queue.

---

# Examples

URL: https://developers.cloudflare.com/queues/examples/

import { ListExamples } from "~/components";

---

# List and acknowledge messages from the dashboard

URL: https://developers.cloudflare.com/queues/examples/list-messages-from-dash/

## List messages from the dashboard

Listing messages from the dashboard allows you to debug Queues or queue producers without a consumer Worker.
Fetching a batch of messages to preview will not acknowledge or retry the message or affect its position in the queue. The queue can still be consumed normally by a consumer Worker. To list messages in the dashboard: 1. Log in to the [Cloudflare dashboard](https://dash.cloudflare.com) and select your account. 2. Select **Workers & Pages** > **Queues**. 3. Select the queue to preview messages from. 4. Select the **Messages** tab. 5. Select **Queued Messages**. 6. Select a maximum batch size of messages to fetch. The size can be a number from 1 to 100. If a consumer Worker is configured, this defaults to your consumer Worker's maximum batch size. ![A form to configure how many messages are listed at a time, with a number input showing '10'](~/assets/images/queues/examples/fetch-message-batch-size.png) 7. Select **List messages**. 8. When the list of messages loads, select the blue arrow to the right of each row to expand the message preview. ![A table showing two previewed messages, one text and one JSON, both with some placeholder text](~/assets/images/queues/examples/fetched-messages.png) This will preview a batch of messages currently in the Queue. ## Acknowledge messages from the dashboard Acknowledging messages from the [Cloudflare dashboard](https://dash.cloudflare.com) will permanently remove them from the queue, with equivalent behavior as `ack()` in a Worker. 1. Select the checkbox to the left of each row to select the message for acknowledgement, or select the checkbox in the table header to select all messages. 2. Select **Acknowledge messages**. 3. Confirm you want to acknowledge the messages, and select **Acknowledge messages**. This will remove the selected messages from the queue and prevent consumers from processing them further. Refer to the [Get Started guide](/queues/get-started/) to learn how to process and acknowledge messages from a queue in a Worker. 
---

# Publish to a Queue via HTTP

URL: https://developers.cloudflare.com/queues/examples/publish-to-a-queue-over-http/

import { WranglerConfig } from "~/components";

The following example shows you how to publish messages to a queue from any HTTP client, using a shared secret to securely authenticate the client.

This allows you to write to a Queue from any service or programming language that supports HTTP, including Go, Rust, Python or even a Bash script.

### Prerequisites

- A [queue created](/queues/get-started/#2-create-a-queue) via the [Cloudflare dashboard](https://dash.cloudflare.com) or the [wrangler CLI](/workers/wrangler/install-and-update/).
- A [configured **producer** binding](/queues/configuration/configure-queues/#producer-worker-configuration) in the Cloudflare dashboard or Wrangler file.

Configure your Wrangler file as follows:

```toml
name = "my-worker"

[[queues.producers]]
queue = "my-queue"
binding = "YOUR_QUEUE"
```

### 1. Create a shared secret

Before you deploy the Worker, you need to create a [secret](/workers/configuration/secrets/) that you can use as a shared secret. A shared secret is a secret that both the client uses to authenticate and the server (your Worker) matches against for authentication.

:::caution

Do not commit secrets to source control. You should use [`wrangler secret`](/workers/configuration/secrets/) to store API keys and authentication tokens securely.

:::

To generate a cryptographically secure secret, you can use the `openssl` command-line tool and `wrangler secret` to create a hex-encoded string that can be used as the shared secret:

```sh
openssl rand -hex 32
# This will output a 64 character long hex string
```

Copy this string and paste it into the prompt for `wrangler secret`:

```sh
npx wrangler secret put QUEUE_AUTH_SECRET
```

```sh output
✨ Success! Uploaded secret QUEUE_AUTH_SECRET
```

This secret will also need to be used by the client application writing to the queue: ensure you store it securely.

### 2. Create the Worker

The following Worker script:

1. Authenticates the client using a shared secret.
2. Validates that the payload is JSON.
3. Publishes the payload to the queue.

```ts
interface Env {
  YOUR_QUEUE: Queue;
  QUEUE_AUTH_SECRET: string;
}

export default {
  async fetch(req, env): Promise<Response> {
    // Authenticate that the client has the correct auth key
    if (env.QUEUE_AUTH_SECRET === "") {
      return Response.json(
        { err: "application not configured" },
        { status: 500 },
      );
    }

    // Return an HTTP 403 (Forbidden) if the auth key is invalid/incorrect/misconfigured
    let authToken = req.headers.get("Authorization") || "";
    let encoder = new TextEncoder();
    // Securely compare our secret with the auth token provided by the client
    try {
      if (
        !crypto.subtle.timingSafeEqual(
          encoder.encode(env.QUEUE_AUTH_SECRET),
          encoder.encode(authToken),
        )
      ) {
        return Response.json(
          { err: "invalid auth token provided" },
          { status: 403 },
        );
      }
    } catch (e) {
      return Response.json(
        { err: "invalid auth token provided" },
        { status: 403 },
      );
    }

    // Optional: Validate the payload is JSON
    // In a production application, we may more robustly validate the payload
    // against a schema using a library like 'zod'
    let messages;
    try {
      messages = await req.json();
    } catch (e) {
      // Return an HTTP 400 (Bad Request) if the payload isn't JSON
      return Response.json({ err: "payload not valid JSON" }, { status: 400 });
    }

    // Publish to the Queue
    try {
      await env.YOUR_QUEUE.send(messages);
    } catch (e: any) {
      console.log(`failed to send to the queue: ${e}`);
      // Return an HTTP 500 (Internal Error) if our publish operation fails
      return Response.json({ error: e.message }, { status: 500 });
    }

    // Return an HTTP 200 if the send succeeded!
    return Response.json({ success: true });
  },
} satisfies ExportedHandler<Env>;
```

To deploy this Worker:

```sh
npx wrangler deploy
```

### 3. Send a test message

To make sure you successfully authenticate and write a message to your queue, use `curl` on the command line:

```sh
# Make sure to replace the placeholder with your shared secret
curl -H "Authorization: pasteyourkeyhere" "https://YOUR_WORKER.YOUR_ACCOUNT.workers.dev" --data '{"messages": [{"msg":"hello world"}]}'
```

```sh output
{"success":true}
```

This will issue an HTTP POST request, and if successful, return an HTTP 200 with a `success: true` response body.

- If you receive an HTTP 403, this is because the `Authorization` header is invalid, or you did not configure a secret.
- If you receive an HTTP 400, the request body was not valid JSON.
- If you receive an HTTP 500, this is either because you did not correctly configure a shared secret for your Worker, or you attempted to send an invalid message to your queue. You can use [`wrangler tail`](/workers/observability/logs/real-time-logs/) to debug the output of `console.log`.

---

# Use Queues to store data in R2

URL: https://developers.cloudflare.com/queues/examples/send-errors-to-r2/

import { WranglerConfig } from "~/components";

The following Worker will catch JavaScript errors and send them to a queue. The same Worker will receive those errors in batches and store them to a log file in an R2 bucket.
```toml
name = "my-worker"

[[queues.producers]]
queue = "my-queue"
binding = "ERROR_QUEUE"

[[queues.consumers]]
queue = "my-queue"
max_batch_size = 100
max_batch_timeout = 30

[[r2_buckets]]
bucket_name = "my-bucket"
binding = "ERROR_BUCKET"
```

```ts
type Environment = {
  readonly ERROR_QUEUE: Queue<Error>;
  readonly ERROR_BUCKET: R2Bucket;
};

export default {
  async fetch(req, env): Promise<Response> {
    try {
      return doRequest(req);
    } catch (error: any) {
      await env.ERROR_QUEUE.send(error);
      return new Response(error.message, { status: 500 });
    }
  },
  async queue(batch, env): Promise<void> {
    let file = '';
    for (const message of batch.messages) {
      const error = message.body;
      file += error.stack || error.message || String(error);
      file += '\r\n';
    }
    await env.ERROR_BUCKET.put(`errors/${Date.now()}.log`, file);
  },
} satisfies ExportedHandler<Environment, Error>;

function doRequest(request: Request): Response {
  if (Math.random() > 0.5) {
    return new Response('Success!');
  }
  throw new Error('Failed!');
}
```

---

# Send messages from the dashboard

URL: https://developers.cloudflare.com/queues/examples/send-messages-from-dash/

Sending messages from the dashboard allows you to debug Queues or queue consumers without a producer Worker.

To send messages from the dashboard:

1. Log in to the [Cloudflare dashboard](https://dash.cloudflare.com) and select your account.
2. Select **Workers & Pages** > **Queues**.
3. Select the queue to send a message to.
4. Select the **Messages** tab.
5. Select **Send message**.
6. Enter your message. You can choose your message content type by selecting the **Text** or **JSON** tabs. Alternatively, select the **Upload a file** button or drag a file over the textbox to upload a file as a message.
7. Select **Send message**.

Your message will be sent to the queue.

Refer to the [Get Started guide](/queues/get-started/) to learn how to send messages to a queue from a Worker.
---

# Use Queues from Durable Objects

URL: https://developers.cloudflare.com/queues/examples/use-queues-with-durable-objects/

import { WranglerConfig } from "~/components";

The following example shows you how to write a Worker script to publish to [Cloudflare Queues](/queues/) from within a [Durable Object](/durable-objects/).

Prerequisites:

- A [queue created](/queues/get-started/#2-create-a-queue) via the Cloudflare dashboard or the [wrangler CLI](/workers/wrangler/install-and-update/).
- A [configured **producer** binding](/queues/configuration/configure-queues/#producer-worker-configuration) in the Cloudflare dashboard or Wrangler file.
- A [Durable Object namespace binding](/workers/wrangler/configuration/#durable-objects).

Configure your Wrangler file as follows:

```toml
name = "my-worker"

[[queues.producers]]
queue = "my-queue"
binding = "YOUR_QUEUE"

[durable_objects]
bindings = [
  { name = "YOUR_DO_CLASS", class_name = "YourDurableObject" }
]

[[migrations]]
tag = "v1"
new_classes = ["YourDurableObject"]
```

The following Worker script:

1. Creates a Durable Object stub, or retrieves an existing one based on a userId.
2. Passes request data to the Durable Object.
3. Publishes to a queue from within the Durable Object.

The `constructor()` in the Durable Object makes your `Environment` available (in scope) on `this.env` to the [`fetch()` handler](/durable-objects/best-practices/create-durable-object-stubs-and-send-requests/) in the Durable Object.

```ts
interface Env {
  YOUR_QUEUE: Queue;
  YOUR_DO_CLASS: DurableObjectNamespace;
}

export default {
  async fetch(req, env): Promise<Response> {
    // Assume each Durable Object is mapped to a userId in a query parameter
    // In a production application, this will be a userId defined by your application
    // that you validate (and/or authenticate) first.
    let url = new URL(req.url);
    let userIdParam = url.searchParams.get("userId");

    if (userIdParam) {
      // Create (or get) a Durable Object based on that userId.
      let durableObjectId = env.YOUR_DO_CLASS.idFromName(userIdParam);
      // Get a "stub" that allows you to call that Durable Object
      let durableObjectStub = env.YOUR_DO_CLASS.get(durableObjectId);

      // Pass the request to that Durable Object and await the response
      // This invokes the constructor once on your Durable Object class (defined further down)
      // on the first initialization, and the fetch method on each request.
      // We pass the original Request to the Durable Object's fetch method
      let response = await durableObjectStub.fetch(req);

      // This would return "wrote to queue", but you could return any response.
      return response;
    }

    return new Response("userId must be provided", { status: 400 });
  },
} satisfies ExportedHandler<Env>;

export class YourDurableObject implements DurableObject {
  constructor(private state: DurableObjectState, private env: Env) {}

  async fetch(req: Request): Promise<Response> {
    // Error handling elided for brevity.
    // Publish to your queue
    await this.env.YOUR_QUEUE.send({
      id: this.state.id.toString() // Write the ID of the Durable Object to your queue
      // Write any other properties to your queue
    });
    return new Response("wrote to queue");
  }
}
```

---

# Metrics

URL: https://developers.cloudflare.com/queues/observability/metrics/

You can view the metrics for a Queue on your account via the [Cloudflare dashboard](https://dash.cloudflare.com). Navigate to **Storage & Databases** > **Queues** > **your Queue** and under the **Metrics** tab you'll be able to view line charts describing the number of messages processed by final outcome, the number of messages in the backlog, and other important indicators.

The metrics displayed in the Cloudflare dashboard charts are all pulled from Cloudflare's GraphQL Analytics API. You can access the metrics programmatically. The Queues metrics are split across three different nodes under `viewer` > `accounts`.
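As a sketch of accessing these metrics programmatically, the following builds a request against the GraphQL Analytics API endpoint (`https://api.cloudflare.com/client/v4/graphql`). The API token, account tag, and queue ID are placeholders you must supply, and the query mirrors the backlog example in the next section; treat this as an illustration rather than a complete client:

```typescript
// Sketch: build a request to the Cloudflare GraphQL Analytics API for the
// average queue backlog. Token and IDs are placeholders (assumptions), and
// the request is only constructed here -- sending it is left to the caller.
const GRAPHQL_ENDPOINT = "https://api.cloudflare.com/client/v4/graphql";

const BACKLOG_QUERY = `
query QueueBacklog($accountTag: string!, $queueId: string!, $datetimeStart: Time!, $datetimeEnd: Time!) {
  viewer {
    accounts(filter: {accountTag: $accountTag}) {
      queueBacklogAdaptiveGroups(
        limit: 10000
        filter: { queueId: $queueId, datetime_geq: $datetimeStart, datetime_leq: $datetimeEnd }
      ) {
        avg { messages bytes }
      }
    }
  }
}`;

function buildBacklogRequest(
  apiToken: string,
  accountTag: string,
  queueId: string,
  datetimeStart: string,
  datetimeEnd: string,
): { url: string; init: { method: string; headers: Record<string, string>; body: string } } {
  return {
    url: GRAPHQL_ENDPOINT,
    init: {
      method: "POST",
      headers: {
        Authorization: `Bearer ${apiToken}`,
        "Content-Type": "application/json",
      },
      body: JSON.stringify({
        query: BACKLOG_QUERY,
        variables: { accountTag, queueId, datetimeStart, datetimeEnd },
      }),
    },
  };
}

// Usage: const { url, init } = buildBacklogRequest(token, account, queue, start, end);
// const result = await (await fetch(url, init)).json();
```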
Refer to [Explore the GraphQL schema](/analytics/graphql-api/getting-started/explore-graphql-schema/) to learn how to navigate a GraphQL schema and discover which data are available. To learn more about the GraphQL Analytics API, refer to [GraphQL Analytics API](/analytics/graphql-api/).

## Write GraphQL queries

The following examples show how to explore your Queues metrics.

### Get average Queue backlog over time period

```graphql
query QueueBacklog($accountTag: string!, $queueId: string!, $datetimeStart: Time!, $datetimeEnd: Time!) {
  viewer {
    accounts(filter: {accountTag: $accountTag}) {
      queueBacklogAdaptiveGroups(
        limit: 10000
        filter: {
          queueId: $queueId
          datetime_geq: $datetimeStart
          datetime_leq: $datetimeEnd
        }
      ) {
        avg {
          messages
          bytes
        }
      }
    }
  }
}
```

### Get average consumer concurrency by hour

```graphql
query QueueConcurrencyByHour($accountTag: string!, $queueId: string!, $datetimeStart: Time!, $datetimeEnd: Time!) {
  viewer {
    accounts(filter: {accountTag: $accountTag}) {
      queueConsumerMetricsAdaptiveGroups(
        limit: 10000
        filter: {
          queueId: $queueId
          datetime_geq: $datetimeStart
          datetime_leq: $datetimeEnd
        }
        orderBy: [datetimeHour_DESC]
      ) {
        avg {
          concurrency
        }
        dimensions {
          datetimeHour
        }
      }
    }
  }
}
```

### Get message operations by minute

```graphql
query QueueMessageOperationsByMinute($accountTag: string!, $queueId: string!, $datetimeStart: Time!, $datetimeEnd: Time!)
{
  viewer {
    accounts(filter: {accountTag: $accountTag}) {
      queueMessageOperationsAdaptiveGroups(
        limit: 10000
        filter: {
          queueId: $queueId
          datetime_geq: $datetimeStart
          datetime_leq: $datetimeEnd
        }
        orderBy: [datetimeMinute_DESC]
      ) {
        count
        sum {
          bytes
        }
        dimensions {
          datetimeMinute
        }
      }
    }
  }
}
```

---

# Observability

URL: https://developers.cloudflare.com/queues/observability/

import { DirectoryListing } from "~/components"

---

# Audit Logs

URL: https://developers.cloudflare.com/queues/platform/audit-logs/

[Audit logs](/fundamentals/setup/account/account-security/review-audit-logs/) provide a comprehensive summary of changes made within your Cloudflare account, including those made to Queues. This functionality is always enabled.

## Viewing audit logs

To view audit logs for your Queue:

1. Log in to the [Cloudflare dashboard](https://dash.cloudflare.com/?account=audit-log) and select your account.
2. Go to **Manage Account** > **Audit Log**.

For more information on how to access and use audit logs, refer to [Review audit logs](/fundamentals/setup/account/account-security/review-audit-logs/).

## Logged operations

The following configuration actions are logged:
| Operation              | Description                                                          |
| ---------------------- | -------------------------------------------------------------------- |
| CreateQueue            | Creation of a new queue.                                              |
| DeleteQueue            | Deletion of an existing queue.                                        |
| UpdateQueue            | Updating the configuration of a queue.                                |
| AttachConsumer         | Attaching a consumer, including HTTP pull consumers, to the queue.    |
| RemoveConsumer         | Removing a consumer, including HTTP pull consumers, from the queue.   |
| UpdateConsumerSettings | Changing Queues consumer settings.                                    |
---

# Changelog

URL: https://developers.cloudflare.com/queues/platform/changelog/

import { ProductReleaseNotes } from "~/components";

{/* */}

---

# Platform

URL: https://developers.cloudflare.com/queues/platform/

import { DirectoryListing } from "~/components"

---

# Limits

URL: https://developers.cloudflare.com/queues/platform/limits/

import { Render } from "~/components"

| Feature                                       | Limit                                                         |
| --------------------------------------------- | ------------------------------------------------------------- |
| Queues                                        | 10,000 per account                                             |
| Message size                                  | 128 KB <sup>1</sup>                                            |
| Message retries                               | 100                                                            |
| Maximum consumer batch size                   | 100 messages                                                   |
| Maximum messages per `sendBatch` call         | 100 (or 256 KB in total)                                       |
| Maximum batch wait time                       | 60 seconds                                                     |
| Per-queue message throughput                  | 5,000 messages per second <sup>2</sup>                         |
| Message retention period <sup>3</sup>         | 14 days                                                        |
| Per-queue backlog size <sup>4</sup>           | 25 GB                                                          |
| Concurrent consumer invocations               | 250 (push-based only)                                          |
| Consumer duration (wall clock time)           | 15 minutes <sup>5</sup>                                        |
| Consumer CPU time                             | 30 seconds                                                     |
| `visibilityTimeout` (pull-based queues)       | 12 hours                                                       |
| `delaySeconds` (when sending or retrying)     | 12 hours                                                       |
| Requests to the Queues API (incl. pulls/acks) | [1200 requests / 5 mins](/fundamentals/api/reference/limits/)  |

<sup>1</sup> 1 KB is measured as 1000 bytes. Messages can include up to \~100 bytes of internal metadata that counts towards total message limits.

<sup>2</sup> Exceeding the maximum message throughput will cause the `send()` and `sendBatch()` methods to throw an exception with a `Too Many Requests` error until your producer falls below the limit.

<sup>3</sup> Messages in a queue that reach the maximum message retention are deleted from the queue. Queues does not delete messages in the same queue that have not reached this limit.

<sup>4</sup> Individual queues that reach this limit will receive a `Storage Limit Exceeded` error when calling `send()` or `sendBatch()` on the queue.

<sup>5</sup> Refer to [Workers limits](/workers/platform/limits/#cpu-time).
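As noted above, exceeding per-queue throughput causes `send()` and `sendBatch()` to throw until the producer falls below the limit, so producers should be prepared to retry. One approach is an exponential backoff; the `sendWithBackoff` helper and its delay schedule below are illustrative, not part of the Queues API:

```typescript
// Sketch: retry a queue send with exponential backoff when send() throws,
// e.g. due to a Too Many Requests error. Helper and delays are assumptions,
// not a Queues API -- the queue parameter only needs a send() method.
async function sendWithBackoff<Body>(
  queue: { send(body: Body): Promise<void> },
  body: Body,
  maxAttempts = 4,
): Promise<void> {
  for (let attempt = 0; attempt < maxAttempts; attempt++) {
    try {
      await queue.send(body);
      return;
    } catch (e) {
      if (attempt === maxAttempts - 1) throw e; // out of attempts: surface the error
      // Back off 100 ms, 200 ms, 400 ms, ... before retrying.
      await new Promise((resolve) => setTimeout(resolve, 100 * 2 ** attempt));
    }
  }
}
```

In a Worker, a queue binding satisfies the `{ send }` shape, so a call like `await sendWithBackoff(env.MY_QUEUE, message)` would apply the same pattern.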
---

# Pricing

URL: https://developers.cloudflare.com/queues/platform/pricing/

import { Render } from "~/components"

## Examples

If an application writes, reads and deletes (consumes) one million messages a day (in a 30 day month), and each message is less than 64 KB in size, the estimated bill for the month would be:

|                     | Total Usage           | Free Usage | Billed Usage | Price      |
| ------------------- | --------------------- | ---------- | ------------ | ---------- |
| Standard operations | 3 \* 30 \* 1,000,000  | 1,000,000  | 89,000,000   | $35.60     |
|                     | (write, read, delete) |            |              |            |
| **TOTAL**           |                       |            |              | **$35.60** |

An application that writes, reads and deletes (consumes) 100 million \~127 KB messages (each message counts as two 64 KB chunks) per month would have an estimated bill resembling the following:

|                     | Total Usage                  | Free Usage | Billed Usage | Price       |
| ------------------- | ---------------------------- | ---------- | ------------ | ----------- |
| Standard operations | 2 \* 3 \* 100 \* 1,000,000   | 1,000,000  | 599,000,000  | $239.60     |
|                     | (2x ops for > 64KB messages) |            |              |             |
| **TOTAL**           |                              |            |              | **$239.60** |

---

# Delivery guarantees

URL: https://developers.cloudflare.com/queues/reference/delivery-guarantees/

Delivery guarantees define how strongly a messaging system enforces the delivery of messages it processes.

As you make stronger guarantees about message delivery, the system needs to perform more checks and acknowledgments to ensure that messages are delivered, or maintain state to ensure a message is only delivered the specified number of times. This increases the latency of the system and reduces the overall throughput of the system. Each message may require additional internal acknowledgements, and an equivalent number of additional roundtrips, before it can be considered delivered.

* **Queues provides *at least once* delivery by default** in order to optimize for reliability.
* This means that messages are guaranteed to be delivered at least once, and in rare occasions, may be delivered more than once.
* For the majority of applications, this is the right balance between not losing any messages and minimizing end-to-end latency, as exactly once delivery incurs additional overheads in any messaging system.

In cases where processing the same message more than once would introduce unintended behaviour, generate a unique ID when writing the message to the queue and use that as the primary key on database inserts and/or as an idempotency key to de-duplicate the message after processing.

For example, using this idempotency key as the ID in an upstream email API or payment API will allow those services to reject the duplicate on your behalf, without you having to carry additional state in your application.

---

# How Queues Works

URL: https://developers.cloudflare.com/queues/reference/how-queues-works/

import { WranglerConfig } from "~/components";

Cloudflare Queues is a flexible messaging queue that allows you to queue messages for asynchronous processing. Message queues are great at decoupling components of applications, like the checkout and order fulfillment services for an e-commerce site. Decoupled services are easier to reason about, deploy, and implement, allowing you to ship features that delight your customers without worrying about synchronizing complex deployments. Queues also allow you to batch and buffer calls to downstream services and APIs.

There are four major concepts to understand with Queues:

1. [Queues](#what-is-a-queue)
2. [Producers](#producers)
3. [Consumers](#consumers)
4. [Messages](#messages)

## What is a queue

A queue is a buffer or list that automatically scales as messages are written to it, and allows a consumer Worker to pull messages from that same queue.

Queues are designed to be reliable, and messages written to a queue should never be lost once the write succeeds.
Similarly, messages are not deleted from a queue until the [consumer](#consumers) has successfully consumed the message.

Queues does not guarantee that messages will be delivered to a consumer in the same order in which they are published.

Developers can create multiple queues. Creating multiple queues can be useful to:

* Separate different use-cases and processing requirements: for example, a logging queue vs. a password reset queue.
* Horizontally scale your overall throughput (messages per second) by using multiple queues to scale out.
* Configure different batching strategies for each consumer connected to a queue.

For most applications, a single producer Worker per queue, with a single consumer Worker consuming messages from that queue allows you to logically separate the processing for each of your queues.

## Producers

A producer is the term for a client that is publishing or producing messages onto a queue. A producer is configured by [binding](/workers/runtime-apis/bindings/) a queue to a Worker and writing messages to the queue by calling that binding.

For example, if we bound a queue named `my-first-queue` to a binding of `MY_FIRST_QUEUE`, messages can be written to the queue by calling `send()` on the binding:

```ts
type Environment = {
  readonly MY_FIRST_QUEUE: Queue;
};

export default {
  async fetch(req, env, context): Promise<Response> {
    let message = {
      url: req.url,
      method: req.method,
      headers: Object.fromEntries(req.headers),
    };

    await env.MY_FIRST_QUEUE.send(message); // This will throw an exception if the send fails for any reason
    return new Response('Sent!', { status: 200 });
  },
} satisfies ExportedHandler<Environment>;
```

:::note

You can also use [`context.waitUntil()`](/workers/runtime-apis/context/#waituntil) to send the message without blocking the response. Note that because `waitUntil()` is non-blocking, any errors raised from the `send()` or `sendBatch()` methods on a queue will be implicitly ignored.

:::

A queue can have multiple producer Workers.
For example, you may have multiple producer Workers writing events or logs to a shared queue based on incoming HTTP requests from users. There is no limit to the total number of producer Workers that can write to a single queue.

Additionally, multiple queues can be bound to a single Worker. That single Worker can decide which queue to write to (or write to multiple) based on any logic you define in your code.

### Content types

Messages published to a queue can be published in different formats, depending on what interoperability is needed with your consumer. The default content type is `json`, which means that any object that can be passed to `JSON.stringify()` will be accepted.

To explicitly set the content type or specify an alternative content type, pass the `contentType` option to the `send()` method of your queue:

```ts
type Environment = {
  readonly MY_FIRST_QUEUE: Queue;
};

export default {
  async fetch(req, env): Promise<Response> {
    let message = {
      url: req.url,
      method: req.method,
      headers: Object.fromEntries(req.headers),
    };

    try {
      await env.MY_FIRST_QUEUE.send(message, { contentType: "json" }); // "json" is the default
    } catch (e) {
      // Catch cases where send fails, including due to a mismatched content type
      console.log(e)
      return Response.json({"msg": e}, { status: 500 })
    }

    return new Response('Sent!', { status: 200 });
  },
} satisfies ExportedHandler<Environment>;
```

To only accept simple strings when writing to a queue, set `{ contentType: "text" }` instead:

```ts
try {
  // This will throw an exception (error) if you try to pass a non-string to the queue, such as a
  // native JavaScript object or ArrayBuffer.
  await env.MY_FIRST_QUEUE.send("hello there", { contentType: "text" }); // explicitly set 'text'
} catch (e) {
  console.log(e)
  return Response.json({"msg": e}, { status: 500 })
}
```

The [`QueuesContentType`](/queues/configuration/javascript-apis/#queuescontenttype) API documentation describes how each format is serialized to a queue.

## Consumers

Queues supports two types of consumer:

1. A [consumer Worker](/queues/configuration/configure-queues/), which is push-based: the Worker is invoked when the queue has messages to deliver.
2. An [HTTP pull consumer](/queues/configuration/pull-consumers/), which is pull-based: the consumer calls the queue endpoint over HTTP to receive and then acknowledge messages.

A queue can only have one type of consumer configured.

### Create a consumer Worker

A consumer is the term for a client that is subscribing to or *consuming* messages from a queue. In its most basic form, a consumer is defined by creating a `queue` handler in a Worker:

```ts
export default {
  async queue(batch: MessageBatch, env: Environment): Promise<void> {
    // Do something with messages in the batch
    // i.e. write to R2 storage, D1 database, or POST to an external API
    // You can also iterate over each message in the batch by looping over batch.messages
  },
};
```

You then connect that consumer to a queue with `wrangler queues consumer add <queue-name> <script-name>` or by defining a `[[queues.consumers]]` configuration in your [Wrangler configuration file](/workers/wrangler/configuration/) manually:

```toml
[[queues.consumers]]
queue = "<queue-name>"
max_batch_size = 100 # optional
max_batch_timeout = 30 # optional
```

Importantly, each queue can only have one active consumer. This allows Cloudflare Queues to achieve at least once delivery and minimize the risk of duplicate messages beyond that.

:::note[Best practice]

Configure a single consumer per queue. This both logically separates your queues, and ensures that errors (failures) in processing messages from one queue do not impact your other queues.

:::

Notably, you can use the same consumer with multiple queues. The queue handler that defines your consumer Worker will be invoked by the queues it is connected to.

* The `MessageBatch` that is passed to your `queue` handler includes a `queue` property with the name of the queue the batch was read from.
* This can reduce the amount of code you need to write, and allow you to process messages based on the name of your queues.

For example, a consumer configured to consume messages from multiple queues would resemble the following:

```ts
export default {
  async queue(batch: MessageBatch, env: Environment): Promise<void> {
    // MessageBatch has a `queue` property we can switch on
    switch (batch.queue) {
      case 'log-queue':
        // Write the batch to R2
        break;
      case 'debug-queue':
        // Write the message to the console or to another queue
        break;
      case 'email-reset':
        // Trigger a password reset email via an external API
        break;
      default:
        // Handle messages we haven't mentioned explicitly (write a log, push to a DLQ)
    }
  },
};
```

### Remove a consumer

To remove a queue from your project, run `wrangler queues consumer remove <queue-name> <script-name>` and then remove the desired queue from the `[[queues.consumers]]` section in your Wrangler file.

### Pull consumers

A queue can have an HTTP-based consumer that pulls from the queue, instead of messages being pushed to a Worker. This consumer can be any HTTP-speaking service that can communicate over the Internet. Review the [pull consumer guide](/queues/configuration/pull-consumers/) to learn how to configure a pull-based consumer for a queue.

## Messages

A message is the object you are producing to and consuming from a queue. Any JSON serializable object can be published to a queue. For most developers, this means either simple strings or JSON objects. You can explicitly [set the content type](#content-types) when sending a message.

Messages themselves can be [batched when delivered to a consumer](/queues/configuration/batching-retries/). By default, messages within a batch are treated as all or nothing when determining retries. If the last message in a batch fails to be processed, the entire batch will be retried.
You can also choose to [explicitly acknowledge](/queues/configuration/batching-retries/) messages as they are successfully processed, and/or mark individual messages to be retried.

---

# Reference

URL: https://developers.cloudflare.com/queues/reference/

import { DirectoryListing } from "~/components"

---

# Tutorials

URL: https://developers.cloudflare.com/queues/tutorials/

import { ListTutorials } from "~/components"

---

# Handle rate limits of external APIs

URL: https://developers.cloudflare.com/queues/tutorials/handle-rate-limits/

import { Render, PackageManagers, WranglerConfig } from "~/components";

This tutorial explains how to use Queues to handle rate limits of external APIs by building an application that sends email notifications using [Resend](https://www.resend.com/). However, you can use this pattern to handle rate limits of any external API.

Resend is a service that allows you to send emails from your application via an API. Resend has a default [rate limit](https://resend.com/docs/api-reference/introduction#rate-limit) of two requests per second. You will use Queues to handle the rate limit of Resend.

## Prerequisites

4. Sign up for [Resend](https://resend.com/) and generate an API key by following the guide on the [Resend documentation](https://resend.com/docs/dashboard/api-keys/introduction).
5. Additionally, you will need access to Cloudflare Queues.

## 1. Create a new Workers application

To get started, create a Worker application using the [`create-cloudflare` CLI](https://github.com/cloudflare/workers-sdk/tree/main/packages/create-cloudflare). Open a terminal window and run the following command:

Then, go to your newly created directory:

```sh frame="none"
cd resend-rate-limit-queue
```

## 2. Set up a Queue

You need to create a Queue and a binding to your Worker.
Run the following command to create a Queue named `rate-limit-queue`:

```sh title="Create a Queue"
npx wrangler queues create rate-limit-queue
```

```sh output
Creating queue rate-limit-queue.
Created queue rate-limit-queue.
```

### Add Queue bindings to your [Wrangler configuration file](/workers/wrangler/configuration/)

In your Wrangler file, add the following:

```toml
[[queues.producers]]
binding = "EMAIL_QUEUE"
queue = "rate-limit-queue"

[[queues.consumers]]
queue = "rate-limit-queue"
max_batch_size = 2
max_batch_timeout = 10
max_retries = 3
```

Setting `max_batch_size` to two on the consumer is important because the Resend API has a default rate limit of two requests per second. With this configuration, the queue delivers messages in batches of two. If fewer than two messages are available, the queue waits up to 10 seconds (`max_batch_timeout`) to collect another message before delivering whatever it has. For more information, refer to the [Batching, Retries and Delays documentation](/queues/configuration/batching-retries).

Your final Wrangler file should look similar to the example below.

```toml title="wrangler.toml"
#:schema node_modules/wrangler/config-schema.json
name = "resend-rate-limit-queue"
main = "src/index.ts"
compatibility_date = "2024-09-09"
compatibility_flags = ["nodejs_compat"]

[[queues.producers]]
binding = "EMAIL_QUEUE"
queue = "rate-limit-queue"

[[queues.consumers]]
queue = "rate-limit-queue"
max_batch_size = 2
max_batch_timeout = 10
max_retries = 3
```

## 3. Add bindings to environment

Add the bindings to the environment interface in `worker-configuration.d.ts`, so TypeScript correctly types the bindings. Type the queue as `Queue`. Refer to the following step for instructions on how to change this type.

```ts title="worker-configuration.d.ts"
interface Env {
  EMAIL_QUEUE: Queue;
}
```

## 4. Send message to the queue

The application will send a message to the queue when the Worker receives a request. For simplicity, you will send the email address as a message to the queue. A new message will be sent to the queue with a delay of one second.

```ts title="src/index.ts"
export default {
  async fetch(req: Request, env: Env): Promise<Response> {
    try {
      await env.EMAIL_QUEUE.send(
        { email: await req.text() },
        { delaySeconds: 1 },
      );
      return new Response("Success!");
    } catch (e) {
      return new Response("Error!", { status: 500 });
    }
  },
};
```

This Worker accepts requests to any subpath and forwards the request body. It expects the request body to contain only an email address. In production, you should check that the request is a `POST` request. You should also avoid sending such sensitive information (email) directly to the queue. Instead, you can send a message to the queue that contains a unique identifier for the user. Then, your consumer queue can use the unique identifier to look up the email address in a database and use that to send the email.

## 5. Process the messages in the queue

After the message is sent to the queue, it will be processed by the consumer Worker. The consumer Worker will process the message and send the email.

Since you have not configured Resend yet, you will log the message to the console. After you configure Resend, you will use it to send the email.
Add the `queue()` handler as shown below:

```ts title="src/index.ts" ins={1-3,17-28}
interface Message {
	email: string;
}

export default {
	async fetch(req: Request, env: Env): Promise<Response> {
		try {
			await env.EMAIL_QUEUE.send(
				{ email: await req.text() },
				{ delaySeconds: 1 },
			);
			return new Response("Success!");
		} catch (e) {
			return new Response("Error!", { status: 500 });
		}
	},
	async queue(batch: MessageBatch<Message>, env: Env): Promise<void> {
		for (const message of batch.messages) {
			try {
				console.log(message.body.email);
				// After configuring Resend, you can send email
				message.ack();
			} catch (e) {
				console.error(e);
				message.retry({ delaySeconds: 5 });
			}
		}
	},
};
```

For now, the `queue()` handler above logs the email address to the console and acknowledges the message. If processing fails, the message is retried. The `delaySeconds` is set to five seconds so the message is not retried too quickly.

To test the application, run the following command:

```sh title="Start the development server"
npm run dev
```

Use the following cURL command to send a request to the application:

```sh title="Test with a cURL request"
curl -X POST -d "test@example.com" http://localhost:8787/
```

```sh output
[wrangler:inf] POST / 200 OK (2ms)
QueueMessage {
  attempts: 1,
  body: { email: 'test@example.com' },
  timestamp: 2024-09-12T13:48:07.236Z,
  id: '72a25ff18dd441f5acb6086b9ce87c8c'
}
```

## 6. Set up Resend

To call the Resend API, you need to configure the Resend API key. Create a `.dev.vars` file in the root of your project and add the following:

```txt title=".dev.vars"
RESEND_API_KEY='your-resend-api-key'
```

Replace `your-resend-api-key` with your actual Resend API key. Next, update the `Env` interface in `worker-configuration.d.ts` to include the `RESEND_API_KEY` variable.
```ts title="worker-configuration.d.ts" ins={3}
interface Env {
	EMAIL_QUEUE: Queue;
	RESEND_API_KEY: string;
}
```

Lastly, install the [`resend` package](https://www.npmjs.com/package/resend) using the following command:

```sh title="Install Resend"
npm install resend
```

You can now use the `RESEND_API_KEY` variable in your code.

## 7. Send email with Resend

In your `src/index.ts` file, import the Resend package and update the `queue()` handler to send the email.

```ts title="src/index.ts" ins={1,21,26-40} del={24,41}
import { Resend } from "resend";

interface Message {
	email: string;
}

export default {
	async fetch(req: Request, env: Env): Promise<Response> {
		try {
			await env.EMAIL_QUEUE.send(
				{ email: await req.text() },
				{ delaySeconds: 1 },
			);
			return new Response("Success!");
		} catch (e) {
			return new Response("Error!", { status: 500 });
		}
	},
	async queue(batch: MessageBatch<Message>, env: Env): Promise<void> {
		// Initialize Resend
		const resend = new Resend(env.RESEND_API_KEY);
		for (const message of batch.messages) {
			try {
				console.log(message.body.email);
				// send email
				const sendEmail = await resend.emails.send({
					from: "onboarding@resend.dev",
					to: [message.body.email],
					subject: "Hello World",
					html: "Sending an email from Worker!",
				});
				// check if the email failed
				if (sendEmail.error) {
					console.error(sendEmail.error);
					message.retry({ delaySeconds: 5 });
				} else {
					// if success, ack the message
					message.ack();
				}
				message.ack();
			} catch (e) {
				console.error(e);
				message.retry({ delaySeconds: 5 });
			}
		}
	},
};
```

The `queue()` handler will now send the email using the Resend API. It also checks whether sending the email failed and, if so, retries the message.
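The handler always retries with a fixed five-second delay. As an optional refinement (an assumption, not part of this tutorial), you could scale the delay with the message's delivery attempt count, which Queues exposes as `message.attempts`:

```typescript
// Hypothetical helper (not in the tutorial): compute an exponential
// backoff delay from the delivery attempt count. The first retry waits
// `baseSeconds`, the second twice that, and so on, capped at `maxSeconds`.
function retryDelaySeconds(
	attempts: number,
	baseSeconds = 5,
	maxSeconds = 300,
): number {
	return Math.min(baseSeconds * 2 ** (attempts - 1), maxSeconds);
}
```

Inside the `catch` block, you would then call `message.retry({ delaySeconds: retryDelaySeconds(message.attempts) })`, so repeatedly failing messages back off instead of hammering the Resend API.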
The final script is included below:

```ts title="src/index.ts"
import { Resend } from "resend";

interface Message {
	email: string;
}

export default {
	async fetch(req: Request, env: Env): Promise<Response> {
		try {
			await env.EMAIL_QUEUE.send(
				{ email: await req.text() },
				{ delaySeconds: 1 },
			);
			return new Response("Success!");
		} catch (e) {
			return new Response("Error!", { status: 500 });
		}
	},
	async queue(batch: MessageBatch<Message>, env: Env): Promise<void> {
		// Initialize Resend
		const resend = new Resend(env.RESEND_API_KEY);
		for (const message of batch.messages) {
			try {
				// send email
				const sendEmail = await resend.emails.send({
					from: "onboarding@resend.dev",
					to: [message.body.email],
					subject: "Hello World",
					html: "Sending an email from Worker!",
				});
				// check if the email failed
				if (sendEmail.error) {
					console.error(sendEmail.error);
					message.retry({ delaySeconds: 5 });
				} else {
					// if success, ack the message
					message.ack();
				}
			} catch (e) {
				console.error(e);
				message.retry({ delaySeconds: 5 });
			}
		}
	},
};
```

To test the application, start the development server using the following command:

```sh title="Start the development server"
npm run dev
```

Use the following cURL command to send a request to the application:

```sh title="Test with a cURL request"
curl -X POST -d "delivered@resend.dev" http://localhost:8787/
```

On the Resend dashboard, you should see that the email was sent to the provided email address.

## 8. Deploy your Worker

To deploy your Worker, run the following command:

```sh title="Deploy your Worker"
npx wrangler deploy
```

Lastly, add the Resend API key using the following command:

```sh title="Add the Resend API key"
npx wrangler secret put RESEND_API_KEY
```

Enter the value of your API key. Your API key will be added to your project, and you can use the `RESEND_API_KEY` variable in your code.

You have successfully created a Worker which can send emails using the Resend API while respecting rate limits. To test your Worker, you could use the following cURL request.
Replace `<YOUR_WORKER_URL>` with the URL of your deployed Worker.

```bash title="Test with a cURL request"
curl -X POST -d "delivered@resend.dev" <YOUR_WORKER_URL>
```

Refer to the [GitHub repository](https://github.com/harshil1712/queues-rate-limit) for the complete code for this tutorial. If you are using [Hono](https://hono.dev/), you can refer to the [Hono example](https://github.com/harshil1712/resend-rate-limit-demo).

## Related resources

- [How Queues works](/queues/reference/how-queues-works/)
- [Queues Batching and Retries](/queues/configuration/batching-retries/)
- [Resend](https://resend.com/docs/)

---

# Build a web crawler with Queues and Browser Rendering

URL: https://developers.cloudflare.com/queues/tutorials/web-crawler-with-browser-rendering/

import { Render, PackageManagers, WranglerConfig } from "~/components";

This tutorial explains how to build and deploy a web crawler with Queues, [Browser Rendering](/browser-rendering/), and [Puppeteer](/browser-rendering/platform/puppeteer/). Puppeteer is a high-level library used to automate interactions with Chrome/Chromium browsers. On each submitted page, the crawler will find the number of links to `cloudflare.com` and take a screenshot of the site, saving results to [Workers KV](/kv/). You can use Puppeteer to request all images on a page, save the colors used on a site, and more.

## Prerequisites

## 1. Create new Workers application

To get started, create a Worker application using the [`create-cloudflare` CLI](https://github.com/cloudflare/workers-sdk/tree/main/packages/create-cloudflare). Open a terminal window and run the following command:

Then, move into your newly created directory:

```sh
cd queues-web-crawler
```

## 2. Create KV namespace

We need to create a KV store. This can be done through the Cloudflare dashboard or the Wrangler CLI. For this tutorial, we will use the Wrangler CLI.
```sh
npx wrangler kv namespace create crawler_links
npx wrangler kv namespace create crawler_screenshots
```

```sh output
🌀 Creating namespace with title "web-crawler-crawler-links"
✨ Success!
Add the following to your configuration file in your kv_namespaces array:
[[kv_namespaces]]
binding = "crawler_links"
id = "<GENERATED_NAMESPACE_ID>"

🌀 Creating namespace with title "web-crawler-crawler-screenshots"
✨ Success!
Add the following to your configuration file in your kv_namespaces array:
[[kv_namespaces]]
binding = "crawler_screenshots"
id = "<GENERATED_NAMESPACE_ID>"
```

### Add KV bindings to the [Wrangler configuration file](/workers/wrangler/configuration/)

Then, in your Wrangler file, add the following with the values generated in the terminal:

```toml
kv_namespaces = [
  { binding = "CRAWLER_SCREENSHOTS_KV", id = "<GENERATED_NAMESPACE_ID>" },
  { binding = "CRAWLER_LINKS_KV", id = "<GENERATED_NAMESPACE_ID>" }
]
```

## 3. Set up Browser Rendering

Now, you need to set up your Worker for Browser Rendering. In your current directory, install Cloudflare’s [fork of Puppeteer](/browser-rendering/platform/puppeteer/) and also [robots-parser](https://www.npmjs.com/package/robots-parser):

```sh
npm install @cloudflare/puppeteer --save-dev
npm install robots-parser
```

Then, add a Browser Rendering binding. Adding a Browser Rendering binding gives the Worker access to a headless Chromium instance you will control with Puppeteer.

```toml
browser = { binding = "CRAWLER_BROWSER" }
```

## 4. Set up a Queue

Now, we need to set up the Queue.

```sh
npx wrangler queues create queues-web-crawler
```

```txt title="Output"
Creating queue queues-web-crawler.
Created queue queues-web-crawler.
```

### Add Queue bindings to wrangler.toml

Then, in your Wrangler file, add the following:

```toml
[[queues.consumers]]
queue = "queues-web-crawler"
max_batch_timeout = 60

[[queues.producers]]
queue = "queues-web-crawler"
binding = "CRAWLER_QUEUE"
```

Adding the `max_batch_timeout` of 60 seconds to the consumer queue is important because Browser Rendering has a limit of two new browsers per minute per account. With this timeout, the consumer waits up to a minute to collect queue messages into a batch, keeping the Worker under the browser invocation limit.

Your final Wrangler file should look similar to the one below.

```toml
#:schema node_modules/wrangler/config-schema.json
name = "web-crawler"
main = "src/index.ts"
compatibility_date = "2024-07-25"
compatibility_flags = ["nodejs_compat"]

kv_namespaces = [
  { binding = "CRAWLER_SCREENSHOTS_KV", id = "<GENERATED_NAMESPACE_ID>" },
  { binding = "CRAWLER_LINKS_KV", id = "<GENERATED_NAMESPACE_ID>" }
]

browser = { binding = "CRAWLER_BROWSER" }

[[queues.consumers]]
queue = "queues-web-crawler"
max_batch_timeout = 60

[[queues.producers]]
queue = "queues-web-crawler"
binding = "CRAWLER_QUEUE"
```

## 5. Add bindings to environment

Add the bindings to the environment interface in `src/index.ts`, so TypeScript correctly types the bindings. Type the queue as `Queue`. The following step will show you how to change this type.

```ts
import { BrowserWorker } from "@cloudflare/puppeteer";

export interface Env {
	CRAWLER_QUEUE: Queue;
	CRAWLER_SCREENSHOTS_KV: KVNamespace;
	CRAWLER_LINKS_KV: KVNamespace;
	CRAWLER_BROWSER: BrowserWorker;
}
```

## 6. Submit links to crawl

Add a `fetch()` handler to the Worker to submit links to crawl.

```ts
type Message = {
	url: string;
};

export interface Env {
	CRAWLER_QUEUE: Queue<Message>;
	// ... etc.
}

export default {
	async fetch(req, env): Promise<Response> {
		await env.CRAWLER_QUEUE.send({ url: await req.text() });
		return new Response("Success!");
	},
} satisfies ExportedHandler<Env>;
```

This accepts requests to any subpath and forwards the request's body to be crawled.
It expects the request body to contain only a URL. In production, you should check that the request is a `POST` request and contains a well-formed URL in its body. This has been omitted for simplicity.

## 7. Crawl with Puppeteer

Add a `queue()` handler to the Worker to process the links you send.

```ts
import puppeteer from "@cloudflare/puppeteer";
import robotsParser from "robots-parser";

async queue(batch: MessageBatch<Message>, env: Env): Promise<void> {
	let browser: puppeteer.Browser | null = null;
	try {
		browser = await puppeteer.launch(env.CRAWLER_BROWSER);
	} catch {
		batch.retryAll();
		return;
	}

	for (const message of batch.messages) {
		const { url } = message.body;

		let isAllowed = true;
		try {
			const robotsTextPath = new URL(url).origin + "/robots.txt";
			const response = await fetch(robotsTextPath);
			const robots = robotsParser(robotsTextPath, await response.text());
			isAllowed = robots.isAllowed(url) ?? true; // respect robots.txt!
		} catch {}
		if (!isAllowed) {
			message.ack();
			continue;
		}

		// TODO: crawl!
		message.ack();
	}

	await browser.close();
},
```

This is a skeleton for the crawler. It launches the Puppeteer browser and iterates through the Queue's received messages. It fetches the site's `robots.txt` and uses `robots-parser` to check that this site allows crawling. If crawling is not allowed, the message is `ack`'ed, removing it from the Queue. If crawling is allowed, you can continue to crawl the site. The `puppeteer.launch()` call is wrapped in a `try...catch` so the whole batch can be retried if the browser launch fails, which may happen if you exceed the per-account limit on new browsers.
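To see roughly what `robots-parser` does on the crawler's behalf, here is a deliberately simplified allow-check written from scratch for illustration. It only honors `Disallow:` prefix rules under a `User-agent: *` group; the real package also handles `Allow:` rules, wildcards, and specific user agents, so use it rather than this sketch.

```typescript
// Simplified robots.txt check for illustration only. Honors only
// `Disallow:` prefix rules in the `User-agent: *` group; `robots-parser`
// (used in this tutorial) handles far more of the real protocol.
function isAllowedByRobots(robotsTxt: string, path: string): boolean {
	let inStarGroup = false;
	for (const raw of robotsTxt.split("\n")) {
		const line = raw.split("#")[0].trim(); // strip comments
		if (/^user-agent:/i.test(line)) {
			inStarGroup = line.slice("user-agent:".length).trim() === "*";
		} else if (inStarGroup && /^disallow:/i.test(line)) {
			const rule = line.slice("disallow:".length).trim();
			// an empty Disallow rule means "allow everything"
			if (rule !== "" && path.startsWith(rule)) return false;
		}
	}
	return true;
}
```

The `isAllowed(url) ?? true` fallback in the handler above plays the same role as this function's default `return true`: when no rule matches (or `robots.txt` is absent), crawling is treated as allowed.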
```ts
type Result = {
	numCloudflareLinks: number;
	screenshot: ArrayBuffer;
};

const crawlPage = async (url: string): Promise<Result> => {
	const page = await (browser as puppeteer.Browser).newPage();

	await page.goto(url, {
		waitUntil: "load",
	});

	const numCloudflareLinks = await page.$$eval("a", (links) => {
		links = links.filter((link) => {
			try {
				return new URL(link.href).hostname.includes("cloudflare.com");
			} catch {
				return false;
			}
		});
		return links.length;
	});

	await page.setViewport({
		width: 1920,
		height: 1080,
		deviceScaleFactor: 1,
	});

	return {
		numCloudflareLinks,
		screenshot: ((await page.screenshot({ fullPage: true })) as Buffer).buffer,
	};
};
```

This helper function opens a new page in Puppeteer and navigates to the provided URL. `numCloudflareLinks` uses Puppeteer's `$$eval` (equivalent to `document.querySelectorAll`) to find the number of links to a `cloudflare.com` page. Checking if the link's `href` is to a `cloudflare.com` page is wrapped in a `try...catch` to handle cases where `href`s may not be URLs. Then, the function sets the browser viewport size and takes a screenshot of the full page. The screenshot is returned as a `Buffer` so it can be converted to an `ArrayBuffer` and written to KV.

To enable recursive crawling of links, add a snippet after checking the number of Cloudflare links to send messages recursively from the queue consumer to the queue itself. Recursing too deep, as is possible with crawling, will cause a Durable Object `Subrequest depth limit exceeded.` error. If one occurs, it is caught, but the links are not retried.

```ts null {3-14}
// const numCloudflareLinks = await page.$$eval("a", (links) => { ...

await page.$$eval("a", async (links) => {
	const urls: MessageSendRequest<Message>[] = links.map((link) => {
		return {
			body: {
				url: link.href,
			},
		};
	});
	try {
		await env.CRAWLER_QUEUE.sendBatch(urls);
	} catch {} // do nothing, likely hit subrequest limit
});

// await page.setViewport({ ...
```

Then, in the `queue` handler, call `crawlPage` on the URL.
```ts null {8-23}
// in the `queue` handler:
// ...
if (!isAllowed) {
	message.ack();
	continue;
}

try {
	const { numCloudflareLinks, screenshot } = await crawlPage(url);
	const timestamp = new Date().getTime();
	const resultKey = `${encodeURIComponent(url)}-${timestamp}`;
	await env.CRAWLER_LINKS_KV.put(resultKey, numCloudflareLinks.toString(), {
		metadata: { date: timestamp },
	});
	await env.CRAWLER_SCREENSHOTS_KV.put(resultKey, screenshot, {
		metadata: { date: timestamp },
	});
	message.ack();
} catch {
	message.retry();
}

// ...
```

This snippet saves the results from `crawlPage` into the appropriate KV namespaces. If an unexpected error occurs, the message is retried and resent to the queue.

Saving the timestamp of the crawl in KV helps you avoid crawling too frequently. Add a snippet before checking `robots.txt` to check KV for a crawl within the last hour. This lists all KV keys beginning with the same URL (crawls of the same page) and checks whether any crawl happened within the last hour. If one did, the message is `ack`'ed and not retried.

```ts null {12-23}
type KeyMetadata = {
	date: number;
};

// in the `queue` handler:
// ...
for (const message of batch.messages) {
	const sameUrlCrawls = await env.CRAWLER_LINKS_KV.list({
		prefix: `${encodeURIComponent(url)}`,
	});

	let shouldSkip = false;
	for (const key of sameUrlCrawls.keys) {
		if (timestamp - (key.metadata as KeyMetadata)?.date < 60 * 60 * 1000) {
			// if crawled in last hour, skip
			message.ack();
			shouldSkip = true;
			break;
		}
	}
	if (shouldSkip) {
		continue;
	}

	let isAllowed = true;
	// ...
```

The final script is included below.
```ts
import puppeteer, { BrowserWorker } from "@cloudflare/puppeteer";
import robotsParser from "robots-parser";

type Message = {
	url: string;
};

export interface Env {
	CRAWLER_QUEUE: Queue<Message>;
	CRAWLER_SCREENSHOTS_KV: KVNamespace;
	CRAWLER_LINKS_KV: KVNamespace;
	CRAWLER_BROWSER: BrowserWorker;
}

type Result = {
	numCloudflareLinks: number;
	screenshot: ArrayBuffer;
};

type KeyMetadata = {
	date: number;
};

export default {
	async fetch(req: Request, env: Env): Promise<Response> {
		// util endpoint for testing purposes
		await env.CRAWLER_QUEUE.send({ url: await req.text() });
		return new Response("Success!");
	},
	async queue(batch: MessageBatch<Message>, env: Env): Promise<void> {
		const crawlPage = async (url: string): Promise<Result> => {
			const page = await (browser as puppeteer.Browser).newPage();

			await page.goto(url, {
				waitUntil: "load",
			});

			const numCloudflareLinks = await page.$$eval("a", (links) => {
				links = links.filter((link) => {
					try {
						return new URL(link.href).hostname.includes("cloudflare.com");
					} catch {
						return false;
					}
				});
				return links.length;
			});

			// to crawl recursively - uncomment this!
			/*await page.$$eval("a", async (links) => {
				const urls: MessageSendRequest<Message>[] = links.map((link) => {
					return {
						body: {
							url: link.href,
						},
					};
				});
				try {
					await env.CRAWLER_QUEUE.sendBatch(urls);
				} catch {} // do nothing, might've hit subrequest limit
			});*/

			await page.setViewport({
				width: 1920,
				height: 1080,
				deviceScaleFactor: 1,
			});

			return {
				numCloudflareLinks,
				screenshot: ((await page.screenshot({ fullPage: true })) as Buffer)
					.buffer,
			};
		};

		let browser: puppeteer.Browser | null = null;
		try {
			browser = await puppeteer.launch(env.CRAWLER_BROWSER);
		} catch {
			batch.retryAll();
			return;
		}

		for (const message of batch.messages) {
			const { url } = message.body;
			const timestamp = new Date().getTime();
			const resultKey = `${encodeURIComponent(url)}-${timestamp}`;

			const sameUrlCrawls = await env.CRAWLER_LINKS_KV.list({
				prefix: `${encodeURIComponent(url)}`,
			});

			let shouldSkip = false;
			for (const key of sameUrlCrawls.keys) {
				if (timestamp - (key.metadata as KeyMetadata)?.date < 60 * 60 * 1000) {
					// if crawled in last hour, skip
					message.ack();
					shouldSkip = true;
					break;
				}
			}
			if (shouldSkip) {
				continue;
			}

			let isAllowed = true;
			try {
				const robotsTextPath = new URL(url).origin + "/robots.txt";
				const response = await fetch(robotsTextPath);
				const robots = robotsParser(robotsTextPath, await response.text());
				isAllowed = robots.isAllowed(url) ?? true; // respect robots.txt!
			} catch {}
			if (!isAllowed) {
				message.ack();
				continue;
			}

			try {
				const { numCloudflareLinks, screenshot } = await crawlPage(url);
				await env.CRAWLER_LINKS_KV.put(
					resultKey,
					numCloudflareLinks.toString(),
					{ metadata: { date: timestamp } },
				);
				await env.CRAWLER_SCREENSHOTS_KV.put(resultKey, screenshot, {
					metadata: { date: timestamp },
				});
				message.ack();
			} catch {
				message.retry();
			}
		}

		await browser.close();
	},
};
```

## 8.
Deploy your Worker

To deploy your Worker, run the following command:

```sh
npx wrangler deploy
```

You have successfully created a Worker which can submit URLs to a queue for crawling and save results to Workers KV. To test your Worker, you could use the following cURL request to take a screenshot of this documentation page. Replace `<YOUR_WORKER_URL>` with the URL of your deployed Worker.

```bash title="Test with a cURL request"
curl <YOUR_WORKER_URL> \
  -H "Content-Type: application/json" \
  -d 'https://developers.cloudflare.com/queues/tutorials/web-crawler-with-browser-rendering/'
```

Refer to the [GitHub repository for the complete tutorial](https://github.com/cloudflare/queues-web-crawler), including a front end deployed with Pages to submit URLs and view crawler results.

## Related resources

- [How Queues works](/queues/reference/how-queues-works/)
- [Queues Batching and Retries](/queues/configuration/batching-retries/)
- [Browser Rendering](/browser-rendering/)
- [Puppeteer Examples](https://github.com/puppeteer/puppeteer/tree/main/examples)

---