Messaging
Nitric provides two common options for asynchronous messaging: Topics for publish/subscribe messaging, where new messages are immediately pushed to subscribers, and Queues for pull messaging, where new messages are put on a queue and must be requested.
Nitric provides unique names for specific message types. Messages sent to a Topic are called Events, while messages sent to a Queue are called Tasks. The structures of these messages are very similar, but the delivery and retry mechanisms are different. The unique naming removes ambiguity when working with these messages in code and makes their intention clear.
Topics & Events
Topics and Events provide a scalable, decoupled way to communicate between functions and containers.
Topics
A topic is a named target where events can be published. You can think of a topic as a subject that your functions discuss with each other.
Topics are awesome for allowing serverless functions to communicate in a stateless, scalable and highly decoupled way.
Events
Events are messages that can be published on a topic. You can think of an event as a notification that something new has happened. Events have the following properties:
- id (string, optional): Unique id to apply to the event. If an id isn't provided, a version 4 UUID will be auto-generated instead.
- payload (Map<string, any>, required): Payload to be sent with the event.
- payloadType (string, optional): The type of payload supplied.
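The event properties can be sketched as a TypeScript type. This is only an illustration: EventInput and withDefaultId are hypothetical names, not part of the Nitric SDK, and the id default is reproduced here with Node's randomUUID(), which generates version 4 UUIDs.

```typescript
import { randomUUID } from 'node:crypto'

// Hypothetical shape mirroring the documented event properties; not the SDK's own type.
interface EventInput {
  id?: string
  payload: Record<string, unknown>
  payloadType?: string
}

// Fills in a version 4 UUID when the caller does not supply an id.
function withDefaultId(event: EventInput): EventInput & { id: string } {
  return { ...event, id: event.id ?? randomUUID() }
}

// No id supplied: one is generated.
const generated = withDefaultId({ payload: { email: 'new.user@example.com' } })

// An explicit id is kept as-is.
const explicit = withDefaultId({
  id: 'user-created-42',
  payload: { email: 'new.user@example.com' },
})
```

Supplying your own id is useful when subscribers de-duplicate on it, because a retried publish can then reuse the same id.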
Subscriptions
A subscription is something that listens to a topic. You can think of it as a channel that notifies your application when something new arrives on the topic.
Creating a Topic
Nitric allows you to define named topics. When defining a topic, you can request permission for the function to publish to it. If no permissions are requested, the topic reference can still be used to create subscribers.
Here's an example of how to create a topic with permissions to publish events.
import { topic } from '@nitric/sdk'
const userCreatedTopic = topic('user-created').for('publishing')
Publishing an event
To send an event to a topic and notify all subscribers, use the publish() method on the topic reference. The function must have permissions to publish to the topic.
The below example publishes an event to a topic called user-created.
import { topic } from '@nitric/sdk'
const userCreatedTopic = topic('user-created').for('publishing')
await userCreatedTopic.publish({
  payload: {
    email: 'new.user@example.com',
  },
})
Subscribing to a topic
To execute a function when new events are published you can set up subscribers. The delay between publishing an event and a subscriber being executed is usually only a few milliseconds. This makes subscribers perfect for responding to events as they happen.
The below code shows a subscription that responds to events when new users are created.
import { topic } from '@nitric/sdk'
const userCreatedTopic = topic('user-created')
userCreatedTopic.subscribe(async (ctx) => {
  // Extract data from the event payload for processing
  const { email } = ctx.req.json()
  // sendWelcomeEmail is a placeholder for your own email logic
  await sendWelcomeEmail(email)
})
Limitations on Publishing and Subscribing
Nitric won't allow you to request publishing access and set up a subscriber in the same function.
The limitation exists to protect you from infinite loops in deployed functions where a function calls itself indirectly via a topic. These sorts of mistakes can lead to large unintentional cloud charges - something you probably want to avoid.
// This is invalid: the function requests publishing access and subscribes to the same topic
import { topic } from '@nitric/sdk'
const loopTopic = topic('infinite').for('publishing')
loopTopic.subscribe(async (ctx) => {
  await loopTopic.publish({ payload: {} })
})
Reliable event handling
If a subscriber encounters an error or is terminated before it finishes processing an event, what happens? Is the event lost?
Nitric deploys topics to cloud services that support "at-least-once delivery". Events are usually delivered exactly once, in the same order that they're published. However, to prevent lost events, they're sometimes delivered more than once or out of order.
Typically, retries occur when a subscriber doesn't respond successfully, like when unhandled exceptions occur. You'll want to make sure events aren't processed again by accident or partially processed, leaving your system in an unexpected state.
Luckily, building atomic publishers and idempotent subscribers is enough to solve this.
Atomic publishers
Your publishers need to update your database and publish associated events. If a database update fails, the events should never be sent. If the database update succeeds, the events should always publish. The two shouldn't occur independently (i.e. one shouldn't fail while the other succeeds).
One solution to this problem is the Transactional Outbox Pattern.
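As a rough sketch of the idea, the publisher writes the database change and an outbox record in one transaction, and a separate relay publishes the pending outbox records later. Everything here is hypothetical and in-memory: InMemoryDb, createUser and relayOutbox are illustrative names, and a real implementation would rely on your database's transactions and call the topic's publish() method from the relay.

```typescript
interface OutboxEntry {
  id: string
  topic: string
  payload: Record<string, unknown>
  published: boolean
}

interface Tables {
  users: { email: string }[]
  outbox: OutboxEntry[]
}

// In-memory stand-in for a database with transactions (an assumption for illustration).
class InMemoryDb {
  users: { email: string }[] = []
  outbox: OutboxEntry[] = []

  // Applies all writes or none: work runs against copies that are committed together.
  transaction(work: (tx: Tables) => void): void {
    const tx: Tables = { users: [...this.users], outbox: [...this.outbox] }
    work(tx) // if this throws, nothing below runs and nothing is committed
    this.users = tx.users
    this.outbox = tx.outbox
  }
}

const db = new InMemoryDb()

// The user row and its event record are written in the same transaction.
function createUser(email: string): void {
  db.transaction((tx) => {
    tx.users.push({ email })
    tx.outbox.push({
      id: `user-created:${email}`, // stable id, so a retry never gets a new id
      topic: 'user-created',
      payload: { email },
      published: false,
    })
  })
}

// The relay publishes pending records and marks each one only after publish succeeds.
async function relayOutbox(
  publish: (entry: OutboxEntry) => Promise<void>
): Promise<number> {
  let sent = 0
  for (const entry of db.outbox) {
    if (entry.published) continue
    await publish(entry)
    entry.published = true
    sent++
  }
  return sent
}

createUser('new.user@example.com')
```

If the relay crashes after publishing but before marking a record, that event is published again on the next run, which is one reason subscribers still need to be idempotent.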
Idempotent subscribers
Events can be delivered more than once, but they should only be processed once. To do this your subscribers need to identify and disregard duplicate events.
Usually, checking for duplicate payloads or IDs is enough. When you receive an event you've seen before, don't process it. Instead, skip straight to returning a success response from your subscriber.
import { topic } from '@nitric/sdk'
import { isDuplicate } from '../common'
const updates = topic('updates')
updates.subscribe((ctx) => {
  if (isDuplicate(ctx.req)) {
    // already processed, return early so the event counts as handled
    return ctx
  }
  // not a duplicate, process the event
  // ...
})
If you're checking for duplicate IDs, make sure publishers can't resend failed events with new IDs.
You can read more about idempotent subscribers and patterns for implementing them here.
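The isDuplicate helper in the example above is imported from your own code. A minimal sketch of one way to write it follows, assuming events carry a stable id. IncomingEvent, processedIds and handleEvent are hypothetical names, and the in-memory Set is only for illustration: deployed function instances don't share memory, so a real check would use a shared store such as a database.

```typescript
// Hypothetical event shape; only the id is needed for de-duplication.
interface IncomingEvent {
  id: string
  payload: Record<string, unknown>
}

// IDs of events that have been fully processed.
const processedIds = new Set<string>()

function isDuplicate(event: IncomingEvent): boolean {
  return processedIds.has(event.id)
}

const welcomed: string[] = []

function handleEvent(event: IncomingEvent): void {
  if (isDuplicate(event)) {
    // Seen before: skip the work and report success.
    return
  }
  welcomed.push(String(event.payload['email']))
  // Record the id only after processing succeeds, so a failed attempt can be retried.
  processedIds.add(event.id)
}

handleEvent({ id: 'evt-1', payload: { email: 'a@example.com' } })
handleEvent({ id: 'evt-1', payload: { email: 'a@example.com' } }) // redelivery, ignored
handleEvent({ id: 'evt-2', payload: { email: 'b@example.com' } })
```

Recording the id after processing means a crash between the two steps can still lead to a repeat, so the processing step itself should be safe to run again or be committed together with the id.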
Queues
Queues and Tasks provide a scalable, decoupled way for functions and containers to communicate asynchronously.
Queues
Queues are named targets where tasks can be sent. You can think of a queue as a group of related tasks. Unlike events published to a topic, tasks sent to a queue won't automatically trigger functions to process them. Instead, functions receive tasks from the queue by requesting them.
This makes queues awesome for processing work asynchronously, often paired with schedules to support batch workloads, like nightly processes.
Tasks
A task is a form of message that can be sent to a queue. You can think of a task as a collection of data that has been queued for processing at some time in the future. Tasks have the following properties:
- id (string, optional): Unique id to apply to the task. If an id isn't provided, a version 4 UUID will be auto-generated instead.
- payload (T, required): Payload to be sent with the task.
- payloadType (string, optional): The type of payload supplied.
Creating a Queue
Nitric allows you to define named queues. When defining queues, you can give the function permissions for receiving and sending.
Here's an example of how to create a queue with permissions for sending and receiving.
import { queue } from '@nitric/sdk'
const transactionQueue = queue('transactions').for('sending', 'receiving')
Sending Tasks
To send a task to a queue, use the send() method on the queue reference. The function must have permissions to send to the queue.
The below example sends a task to a queue called transactions.
import { queue } from '@nitric/sdk'
const transactionQueue = queue('transactions').for('sending')
await transactionQueue.send({
  payload: {
    message: 'hello world',
  },
})
Tasks can also be sent in batches by providing an array of tasks.
import { queue } from '@nitric/sdk'
const transactionQueue = queue('transactions').for('sending')
await transactionQueue.send([
  {
    payload: {
      message: 'batch task 1',
    },
  },
  {
    payload: {
      message: 'batch task 2',
    },
  },
])
Receiving and Acknowledging Tasks
When you pull tasks from a queue, they are not immediately deleted. Instead, they are leased, which means they are temporarily hidden from other functions until the lease expires.
To ensure proper handling, your code should mark a task as complete after successfully processing it. This action permanently removes the task from the queue.
If a lease expires before a task is marked as complete, the task will reappear in the queue and can be received again. This mechanism prevents tasks from getting lost in case of failures. If your function encounters an error or is terminated before completing a task, the task will automatically reappear in the queue once its lease expires, ready to be processed again.
By following this approach, you can ensure reliable task processing and minimize the chances of losing tasks in failure scenarios.
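The lease behaviour can be illustrated with a small in-memory model. This is not how Nitric or the underlying cloud queue is implemented: LeaseQueue, leaseMs and the explicit now parameter are invented for illustration, and time is passed in by hand so the example is deterministic.

```typescript
interface StoredTask<T> {
  id: string
  payload: T
  leasedUntil: number // time (ms) until which the task is hidden
}

interface LeasedTask<T> {
  id: string
  payload: T
  complete: () => void
}

class LeaseQueue<T> {
  private tasks: StoredTask<T>[] = []
  private nextId = 0
  private leaseMs: number

  constructor(leaseMs: number) {
    this.leaseMs = leaseMs
  }

  send(payload: T): void {
    this.tasks.push({ id: `task-${this.nextId++}`, payload, leasedUntil: 0 })
  }

  // Returns up to `depth` tasks that are not currently leased, and leases them.
  receive(depth: number, now: number): LeasedTask<T>[] {
    const leased: LeasedTask<T>[] = []
    for (const task of this.tasks) {
      if (leased.length >= depth) break
      if (task.leasedUntil > now) continue // still hidden by an active lease
      task.leasedUntil = now + this.leaseMs
      const id = task.id
      leased.push({
        id,
        payload: task.payload,
        // Completing a task removes it from the queue permanently.
        complete: () => {
          this.tasks = this.tasks.filter((t) => t.id !== id)
        },
      })
    }
    return leased
  }
}

const demoQueue = new LeaseQueue<string>(1000)
demoQueue.send('a')
demoQueue.send('b')

const firstPull = demoQueue.receive(10, 0) // both tasks are leased until t=1000
const duringLease = demoQueue.receive(10, 500) // nothing visible while leased
firstPull[0].complete() // 'a' is done and removed
const afterExpiry = demoQueue.receive(10, 1500) // 'b' was never completed, so it reappears
```

In this model the second task is received twice because it was never completed, which mirrors why task processing should tolerate repeats.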
The below example receives up to 10 tasks using the receive() method, and then acknowledges each task as completed using the complete() method. You'll note that the function has permissions to receive tasks.
import { queue } from '@nitric/sdk'
const transactionQueue = queue('transactions').for('receiving')
const tasks = await transactionQueue.receive(10)
for (const task of tasks) {
  // process your task's data
  console.log(task.payload)
  // acknowledge when the task is complete
  await task.complete()
}
Choosing between queues and topics
It's common to ask when to use a queue or a topic. From a publisher's point of view, queues and topics are almost identical. The difference is primarily on the receiver/subscriber side. Topics push new messages to their subscribers, immediately spinning up workers to process the events, while queues rely on the receiver to ask for new messages to process.
For these reasons, we usually default to Topics. Queues are more suitable for batch workloads or situations where there are occasional surges of requests that can be processed at a later time.
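The push/pull difference can be shown with two toy classes. These are simplified in-memory stand-ins rather than Nitric APIs: PushTopic and PullQueue are hypothetical names used only to contrast when the receiving code runs.

```typescript
type Handler = (payload: string) => void

// Push: publishing invokes every subscriber straight away.
class PushTopic {
  private handlers: Handler[] = []

  subscribe(handler: Handler): void {
    this.handlers.push(handler)
  }

  publish(payload: string): void {
    for (const handler of this.handlers) handler(payload)
  }
}

// Pull: sending only stores the message; nothing runs until a receiver asks.
class PullQueue {
  private pending: string[] = []

  send(payload: string): void {
    this.pending.push(payload)
  }

  receive(depth: number): string[] {
    return this.pending.splice(0, depth)
  }
}

const handled: string[] = []

const signups = new PushTopic()
signups.subscribe((payload) => {
  handled.push(payload)
})
signups.publish('signup') // the subscriber has already run by the time this returns

const reports = new PullQueue()
reports.send('report-1')
reports.send('report-2')
const handledBeforePull = handled.length // sending to the queue triggered no work
const batch = reports.receive(10) // work happens only when the receiver pulls
```

The queue side is what makes batch workloads natural: the receiver decides when to pull and how many items to take at once.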