@idearium/cloudflare-queues
Safely process Cloudflare Queues batches and messages.
Installation
$ yarn add -E @idearium/cloudflare-queues
Beta installation
If you need to install a beta version, you can:
$ yarn add -E @idearium/cloudflare-queues@beta
Usage
Use @idearium/cloudflare-queues
to safely process both batches and individual messages.
Process batches
This shows an example of a Cloudflare Worker that processes the entire batch of messages at once.
import { safeBatchProcess } from '@idearium/cloudflare-queues';
export default {
async queue(messageBatch, env) {
await safeBatchProcess({
process: async (batch) => {
await Promise.all(
batch.messages.map(({ body }) => Promise.resolve(body))
);
},
batch: messageBatch,
});
},
};
Process messages
This shows an example of a Cloudflare Worker that hands off to a function that processes each message individually.
// lib/consume.js
import { safeMessageProcess } from '@idearium/cloudflare-queues';
export const consume = async (queueMessage) =>
safeMessageProcess({
message: queueMessage,
process: async (message) => Promise.resolve(message.body),
});
// worker.js
import { consume } from './lib/consume.js';
export default {
async queue(batch, env) {
await Promise.allSettled(batch.messages.map(consume));
},
};
Functions
You can process entire batches of messages or each individual message. If you process an entire batch and they all fail, the entire batch will be retried. If you process each message individually, only those messages that fail will be retried. Read more about explicit acknowledgement and retries.
safeBatchProcess
safeBatchProcess
expects an object with the following properties:
process
: A function that processes the batch.batch
: The batch to process.
safeBatchProcess
will call the process
function provided to it and pass the batch as the only argument.
safeBatchProcess
returns a promise that resolves when the batch has been processed. It will also call batch.ackAll()
to acknowledge the batch.
If the batch cannot be processed, the promise will reject with an error. It will also call batch.retryAll()
to reject the batch so that all messages in the batch will be retried.
safeMessageProcess
safeMessageProcess
expects an object with the following properties:
process
: A function that processes the message.message
: The message to process.
safeMessageProcess
will call the process
function provided to it and pass the message as the only argument.
safeMessageProcess
returns a promise that resolves when the message has been processed. It will also call message.ack()
to acknowledge the message.
If the message cannot be processed, the promise will reject with an error. It will also call message.retry()
to reject the message so that it will be retried.