Consumer concurrency
Consumer concurrency allows a consumer Worker processing messages from a queue to automatically scale out horizontally to keep up with the rate at which messages are written to the queue.
In many systems, the rate at which you write messages to a queue can easily exceed the rate at which a single consumer can read and process those same messages. This is often because your consumer might be parsing message contents, writing to storage or a database, or making third-party (upstream) API calls.
Note that queue producers are always scalable, up to the maximum supported messages-per-second (per queue) limit.
By default, all queues have concurrency enabled. Queue consumers will automatically scale up to the maximum concurrent invocations as needed to manage a queue's backlog and/or error rates.
After processing a batch of messages, Queues will check to see if the number of concurrent consumers should be adjusted. The number of concurrent consumers invoked for a queue will autoscale based on several factors, including:
- The number of messages in the queue (backlog) and its rate of growth.
- The ratio of failed (versus successful) invocations. A failed invocation is when your queue() handler throws an uncaught exception instead of returning void (nothing).
- The value of max_concurrency set for that consumer.
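The difference between a failed and a successful invocation can be sketched as follows. This is a minimal illustration, not the runtime's own types: the Message and MessageBatch interfaces are simplified local stand-ins, and processBody, fragileQueueHandler, and resilientQueueHandler are hypothetical names. It assumes per-message ack() and retry() calls are available on each message.

```typescript
// Simplified local stand-ins for the Queues runtime types (assumption:
// only the members used in this sketch are declared here).
interface Message<T> {
  readonly body: T;
  ack(): void;   // mark this message as successfully processed
  retry(): void; // ask for this message to be delivered again
}

interface MessageBatch<T> {
  readonly messages: readonly Message<T>[];
}

// Hypothetical per-message work: throws on an empty body to simulate an error.
async function processBody(body: string): Promise<void> {
  if (body.length === 0) {
    throw new Error("empty message body");
  }
}

// Lets exceptions escape: one bad message makes the whole invocation fail.
async function fragileQueueHandler(batch: MessageBatch<string>): Promise<void> {
  for (const message of batch.messages) {
    await processBody(message.body);
  }
}

// Catches per-message errors and always resolves to void, so the handler
// itself never ends with an uncaught exception.
async function resilientQueueHandler(batch: MessageBatch<string>): Promise<void> {
  for (const message of batch.messages) {
    try {
      await processBody(message.body);
      message.ack();
    } catch (_error) {
      message.retry();
    }
  }
}
```

In a Worker, one of these functions would be exposed as the queue() handler on the module's default export. Only the first style produces failed invocations of the kind described above.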
Where possible, Queues will optimize for keeping your backlog from growing exponentially, in order to minimize scenarios where the backlog of messages in a queue grows to the point that they would reach the message retention limit before being processed.
If you are writing 100 messages/second to a queue with a single concurrent consumer that takes 5 seconds to process a batch of 100 messages, the number of messages in-flight will continue to grow at a rate faster than your consumer can keep up.
In this scenario, Queues will notice the growing backlog and will scale the number of concurrent consumer Worker invocations up to a steady state of (approximately) five (5) until the rate of incoming messages decreases, the consumer processes messages faster, or the consumer begins to generate errors.
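The arithmetic behind the figure of five can be checked with a short calculation. This is a back-of-the-envelope sketch, not how Queues itself decides to scale; the function names are hypothetical, and it assumes every consumer sustains the same throughput.

```typescript
// Messages a single consumer invocation can process per second.
function consumerThroughput(batchSize: number, secondsPerBatch: number): number {
  return batchSize / secondsPerBatch;
}

// Smallest number of concurrent consumers whose combined throughput
// matches or exceeds the incoming write rate.
function consumersNeeded(
  writeRatePerSecond: number,
  batchSize: number,
  secondsPerBatch: number,
): number {
  const perConsumer = consumerThroughput(batchSize, secondsPerBatch);
  return Math.ceil(writeRatePerSecond / perConsumer);
}

// Net backlog growth in messages per second (zero or negative means
// the backlog is no longer growing).
function backlogGrowthPerSecond(
  writeRatePerSecond: number,
  consumers: number,
  batchSize: number,
  secondsPerBatch: number,
): number {
  return writeRatePerSecond - consumers * consumerThroughput(batchSize, secondsPerBatch);
}

// The scenario from the text: 100 messages/second written,
// batches of 100 messages, 5 seconds per batch.
const perConsumerRate = consumerThroughput(100, 5);      // 20 messages/second
const growthWithOne = backlogGrowthPerSecond(100, 1, 100, 5); // 80 messages/second
const steadyState = consumersNeeded(100, 100, 5);        // 5 consumers
```

A single consumer handles 20 messages per second against 100 written, so the backlog grows by 80 messages per second until roughly five consumers run concurrently.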
If your consumers are not autoscaling, there are a few likely causes:
- max_concurrency has been set to 1.
- Your consumer Worker is returning errors rather than processing messages. Inspect your consumer to make sure it is healthy.
- A batch of messages is being processed. Queues checks if it should autoscale consumers only after processing an entire batch of messages, so it will not autoscale while a batch is being processed. Consider reducing batch sizes or refactoring your consumer to process messages faster.
If you have a workflow that is limited by an upstream API and/or system, you may prefer for your backlog to grow, trading off increased overall latency in order to avoid overwhelming an upstream system.
You can configure the concurrency of your consumer Worker in two ways:
- Set concurrency settings in the Cloudflare dashboard
- Set concurrency settings via wrangler.toml
To configure the concurrency settings for your consumer Worker from the dashboard:
- Log in to the Cloudflare dashboard and select your account.
- Select Workers & Pages > Queues.
- Select your queue > Settings.
- Select Edit Consumer under Consumer details.
- Set Maximum consumer invocations to a value between 1 and 250. This value represents the maximum number of concurrent consumer invocations available to your queue.
To remove a fixed maximum value, select auto (recommended).
Note that if you are writing messages to a queue faster than you can process them, messages may eventually reach the maximum retention period set for that queue. Individual messages that reach that limit will expire from the queue and be deleted.
To set a fixed maximum number of concurrent consumer invocations for a given queue, set max_concurrency in the [[queues.consumers]] configuration for that queue in your wrangler.toml file.
To remove the limit, remove the max_concurrency setting from the [[queues.consumers]] configuration for a given queue and run npx wrangler deploy to push your configuration update.
When multiple consumer Workers are invoked, each Worker invocation incurs CPU time costs.
- If you intend to process all messages written to a queue, the effective overall cost is the same, even with concurrency enabled.
- Enabling concurrency simply brings those costs forward, and can help prevent messages from reaching the message retention limit.
Billing for consumers follows the Workers standard usage model, meaning you are billed for the request and for the CPU time used in the request.
A consumer Worker that takes 2 seconds to process a batch of messages will incur the same overall costs to process 50 million (50,000,000) messages, whether it does so concurrently (faster) or individually (slower).
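The claim that concurrency changes the elapsed time but not the total cost can be illustrated with a rough calculation. This is a sketch under stated assumptions: the batch size of 100 is hypothetical (the text does not give one), the 2 seconds per batch is treated as the billable time, batches are assumed to spread evenly across consumers, and no actual prices are modelled.

```typescript
interface Workload {
  totalMessages: number;
  batchSize: number;       // assumption: not stated in the text
  secondsPerBatch: number; // assumption: treated as billable time per batch
}

// Number of batches needed to drain the workload.
function totalBatches(w: Workload): number {
  return Math.ceil(w.totalMessages / w.batchSize);
}

// Total processing time across all invocations; independent of concurrency.
function totalProcessingSeconds(w: Workload): number {
  return totalBatches(w) * w.secondsPerBatch;
}

// Elapsed time when batches are spread evenly over `concurrency` consumers.
function wallClockSeconds(w: Workload, concurrency: number): number {
  return totalProcessingSeconds(w) / concurrency;
}

const workload: Workload = {
  totalMessages: 50000000,
  batchSize: 100,
  secondsPerBatch: 2,
};

const billedSeconds = totalProcessingSeconds(workload); // 1,000,000 seconds in total
const elapsedSerial = wallClockSeconds(workload, 1);    // 1,000,000 seconds elapsed
const elapsedWithTen = wallClockSeconds(workload, 10);  // 100,000 seconds elapsed
```

Under these assumptions the total processing time is the same at any concurrency level; only the elapsed time shrinks as more consumers run in parallel.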