Qottle recipe for managing asynchronous queues with duplicates
Qottle is a queue for asynchronous tasks with prioritization, rate limit, and concurrency support. It can also detect and reject duplicate requests. This recipe shows how to manage duplicates in an asynchronous queue, using a simulated pubsub scenario.
Managing pubsub messages
Pubsub is a great way to orchestrate your services, but often you’ll get duplicates. Say you get a message to process something: you won’t want to ack that message (and therefore stop it sending reminders) until you’ve successfully processed it. On the other hand, you don’t want to run it again if you already have it queued or have already run it. Of course this won’t work if you have multiple instances of your service, but let’s stick to the simple case for now.
If you provide a key (perhaps derived from a hash of the parameters to your service) when you add an item to Qottle, and enable skipDuplicates, Qottle will not add the item to the queue if the same key is already queued or active. Instead, the addition request resolves (or rejects, if you have errorOnDuplicates set). With the sticky option enabled, it also checks all finished items for duplicates.
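One way to derive such a key is sketched below. The keyFromParams helper is hypothetical and not part of Qottle; it assumes the parameters are a plain object and only sorts top-level property names, so nested objects would need more care.

```javascript
const { createHash } = require("node:crypto");

// Hypothetical helper (not part of Qottle): build a stable key from a plain
// object of parameters by sorting top-level property names before hashing.
const keyFromParams = (params) => {
  const sorted = Object.keys(params)
    .sort()
    .map((name) => [name, params[name]]);
  return createHash("sha256").update(JSON.stringify(sorted)).digest("hex");
};

// The same parameters in a different order give the same key
const a = keyFromParams({ id: 42, action: "resize" });
const b = keyFromParams({ action: "resize", id: 42 });
// Different parameters give a different key
const c = keyFromParams({ action: "resize", id: 43 });

console.log(a === b); // true
console.log(a === c); // false
```

The resulting string could then be passed as the key option when adding to the queue, in the same way the examples in this recipe pass a key.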
Here’s a simulation, using 2 queues: one playing the pub role, and another the sub role.
Initialize a queue to simulate sending messages from a pub service. Don’t start it right away, as we want to first populate it and get the sub queue ready to go.
Pubsub simulation
Set up the pub sim queue
const pub = new Qottle({ immediate: false, concurrent: 8, name: 'pub' });
Populate it with a bunch of messages to be sent at random times, randomly provoking some duplicate keys amongst them.
const ps = Promise.all(
  Array.from(new Array(20)).map((f, i, a) =>
    pub.add(
      () => {
        return pub.timer(Math.floor(1000 * Math.random()));
      },
      // cause some duplicates to happen
      { key: Math.floor(a.length * Math.random()) }
    )
  )
);
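Because the keys are random, the number of duplicates changes on every run. If you want to check which keys were repeated, a small counting helper like this sketch would do it. Both countKeys and the sample keys are made up for illustration and have nothing to do with Qottle itself.

```javascript
// Count how often each key occurs; anything above 1 is a duplicate.
const countKeys = (keys) =>
  keys.reduce(
    (counts, key) => counts.set(key, (counts.get(key) || 0) + 1),
    new Map()
  );

// Hypothetical keys, standing in for the random ones generated above
const sampleKeys = [3, 7, 3, 12, 7, 3];

// Keep only the [key, count] pairs that occur more than once
const duplicates = [...countKeys(sampleKeys)].filter(([, n]) => n > 1);
console.log(duplicates); // [ [ 3, 3 ], [ 7, 2 ] ]
```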
Now create a subscription queue and start it. We’ll use sticky to skip anything we’ve ever seen before.
const q = new Qottle({
  skipDuplicates: true,
  sticky: true,
  name: 'sub'
});
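To make the difference sticky makes a little more concrete, here is a toy model of the duplicate check as described in this recipe. It is not Qottle’s code and the ToyDedupe class is invented for illustration: it just tracks keys that are queued or active, plus keys that have finished, and only consults the finished ones when sticky is on.

```javascript
// A toy model of duplicate detection. This is NOT Qottle's implementation,
// just an illustration of the behavior the recipe describes.
class ToyDedupe {
  constructor({ sticky = false } = {}) {
    this.sticky = sticky;
    this.pending = new Set(); // keys that are queued or active
    this.finished = new Set(); // keys that have completed
  }

  // returns true if the key is accepted, false if it is a duplicate
  accept(key) {
    const seen =
      this.pending.has(key) || (this.sticky && this.finished.has(key));
    if (!seen) this.pending.add(key);
    return !seen;
  }

  // mark a key as completed
  finish(key) {
    this.pending.delete(key);
    this.finished.add(key);
  }
}

const plain = new ToyDedupe();
const sticky = new ToyDedupe({ sticky: true });
for (const d of [plain, sticky]) {
  d.accept("a"); // first time: accepted
  d.finish("a"); // ... and completed
}

console.log(plain.accept("a")); // true: finished items are not checked
console.log(sticky.accept("a")); // false: sticky remembers finished items
```

In both cases a key that is still queued or active is rejected; sticky only changes what happens once the item has finished.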
In this sim, the subscriber will just wait a random amount of time; this is where you’d handle the service request in your live subscription.
const dealWithSub = ({ entry }) => q.timer(Math.floor(2000 * Math.random()));
pub.on("finish", ({ entry }) => {
  q.add(dealWithSub, {
    key: entry.key,
  })
    .then(({ entry, result, error }) => {
      if (entry.skipped) {
        // .. the entry was not processed as it was a duplicate
      } else {
        // .. the entry was processed and the result passed here
      }
      return result;
    })
    .catch(({ entry, error }) => {
      // handle the error
    });
});
pub.startQueue();
Real life pubsub structure
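const q = new Qottle({
  skipDuplicates: true,
  sticky: true,
  name: 'sub'
});
// somehow() and doTheThing() stand in for your own decoding and processing code
message.on((msg) => {
  const decodedMessage = somehow(msg);
  q.add(() => doTheThing(decodedMessage), { key: decodedMessage.hash })
    // ack when the entry was skipped as a duplicate
    .then(({ entry }) => (entry.skipped ? msg.ack() : null))
    .catch((error) => {
      msg.nack();
    });
});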
A full version of this example is in test.js at the Qottle repo.
See the Qottle documentation for more information. The Qottle source code is on GitHub, and Qottle can be installed with npm or yarn.