Qottle recipe for managing asynchronous queues with duplicates

Qottle is a queue for asynchronous tasks with support for prioritization, rate limiting, and concurrency. It can also detect and reject duplicate requests. This recipe shows how to manage duplicates in an asynchronous queue, using a simulated pubsub scenario.

Managing pubsub messages

Pubsub is a great way to orchestrate your services, but you’ll often get duplicates. Say you get a message to process something: you won’t want to ack that message (and therefore stop it from sending reminders) until you’ve successfully processed it. On the other hand, you don’t want to run it again if you already have it queued or have already run it. Of course this won’t work if you have multiple instances of your service, as each instance keeps its own queue, but let’s stick to the simple case for now.

If you provide a key (perhaps derived from a hash of the parameters to your service) when you add an item to Qottle, and enable skipDuplicates, Qottle will not add the item to the queue if the same key is already queued or active. Instead, the addition request resolves (or rejects, if you have errorOnDuplicates set). If you have the sticky option enabled, it will check all finished items for duplicates too.
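One way to derive such a key is to hash a canonical form of the parameters. The sketch below is only an illustration: keyFromParams is a hypothetical helper, not part of Qottle, and it assumes a flat object of JSON-serializable parameters running under Node.js.

```javascript
const crypto = require('crypto');

// Hypothetical helper (not part of Qottle): builds a stable key from a flat
// object of parameters. Sorting the property names first makes the key
// independent of the order in which the properties were written.
const keyFromParams = (params) => {
  const canonical = JSON.stringify(
    Object.keys(params).sort().map((k) => [k, params[k]])
  );
  return crypto.createHash('sha256').update(canonical).digest('hex');
};

// The same parameters in a different order give the same key
const keyA = keyFromParams({ id: 42, action: 'resize' });
const keyB = keyFromParams({ action: 'resize', id: 42 });
console.log(keyA === keyB); // true
```

The resulting string could then be passed as the key option when adding the item to the queue. Nested parameter objects would need a recursive canonical form, which this sketch does not attempt.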

Here’s a simulation using 2 queues: one playing the pub role, and the other the sub role.

Initialize a queue to simulate sending messages from a pub service. Don’t start it right away, as we want to first populate it and get the sub queue ready to go.

Pubsub simulation

Set up the pub sim queue

const pub = new Qottle({ immediate: false, concurrent: 8, name: 'pub' });

Populate it with a bunch of messages to be sent at random times, with randomly chosen keys so that some of them are duplicates.


  const ps = Promise.all(
    Array.from(new Array(20)).map((f, i, a) =>
      pub.add(
        () => {
          return pub.timer(Math.floor(1000 * Math.random()));
        },
        // cause some duplicates to happen
        { key: Math.floor(a.length * Math.random()) }
      )
    )
  );
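To get a feel for how many duplicates this kind of random key selection produces, the keys can be counted separately. This is a standalone sketch and does not use Qottle; countDuplicates is a hypothetical helper introduced for illustration. With 20 keys drawn from 20 possible values, about 7 of them repeat an earlier key on average.

```javascript
// Hypothetical helper: counts how many entries repeat a key
// that already appeared earlier in the list
const countDuplicates = (keys) => {
  const seen = new Set();
  return keys.reduce((count, key) => {
    if (seen.has(key)) return count + 1;
    seen.add(key);
    return count;
  }, 0);
};

// Draw 20 keys the same way as the pub simulation does
const keys = Array.from({ length: 20 }, () => Math.floor(20 * Math.random()));
console.log(`${countDuplicates(keys)} of ${keys.length} keys repeat an earlier one`);
```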

Now create a subscription queue and start it. We’ll use sticky to skip anything we’ve ever seen before.

  
  const q = new Qottle({
    skipDuplicates: true,
    sticky: true,
    name: 'sub'
  });
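The effect of combining skipDuplicates with sticky can be pictured with a toy model. This is not Qottle’s implementation, only a sketch of the behavior described earlier; the DuplicateTracker class and its method names are hypothetical.

```javascript
// Toy model, not Qottle's code: remembers keys that are pending (queued or
// active) and keys that have finished. Finished keys only count as
// duplicates when sticky is enabled.
class DuplicateTracker {
  constructor({ sticky = false } = {}) {
    this.sticky = sticky;
    this.pending = new Set();
    this.finished = new Set();
  }

  // Returns true if the key is accepted, false if it would be skipped
  accept(key) {
    if (this.pending.has(key)) return false;
    if (this.sticky && this.finished.has(key)) return false;
    this.pending.add(key);
    return true;
  }

  // Marks a key as done, moving it from pending to finished
  finish(key) {
    this.pending.delete(key);
    this.finished.add(key);
  }
}

const stickyTracker = new DuplicateTracker({ sticky: true });
const results = [
  stickyTracker.accept('a'), // true: first time seen
  stickyTracker.accept('a'), // false: still pending
];
stickyTracker.finish('a');
results.push(stickyTracker.accept('a')); // false: sticky remembers finished keys
console.log(results);
```

Without sticky, the last call would be accepted again, because only queued or active keys are treated as duplicates.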

In this sim, the subscriber will just wait a random amount of time. This is where you’d handle the service request in your live subscription.

  const dealWithSub = ({ entry }) => q.timer(Math.floor(2000 * Math.random()))

The finish event on the simulated pub queue is analogous to the message.on event when using a real pubsub implementation. It triggers when each of the queued items is published, and adds a task to the subscription queue, which then checks for duplicates and executes.

  pub.on("finish", ({ entry }) => {
    q.add(dealWithSub, {
      key: entry.key,
    }).then(({ entry, result, error }) => {
      if (entry.skipped) {
        // .. the entry was not processed as it was a duplicate
      } else {
        // .. the entry was processed and the result passed here
      }
      return result
    }).catch(({ entry, error }) => {
      // handle the error
    })
  })

Finally we can start the pub queue, which will provoke entries in the sub queue.

  pub.startQueue()

Real life pubsub structure

A real life pubsub example would be structured something like this, and could of course include the usual rate limiting and so on as required.

  const q = new Qottle({
    skipDuplicates: true,
    sticky: true,
    name: 'sub'
  });

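  // somehow() and doTheThing() stand in for your own decoding and processing code
  message.on(msg => {
    const decodedMessage = somehow(msg)
    q.add(() => doTheThing(decodedMessage), { key: decodedMessage.hash })
      // ack when the entry was skipped as a duplicate
      .then(({ entry }) => entry.skipped ? msg.ack() : null)
      // processing failed, so nack the message
      .catch((error) => {
        msg.nack()
      })
  })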

A full version of this example is in test.js at the Qottle repo.

See the Qottle documentation for more information. Qottle source code is on GitHub, and it can be installed with npm or yarn.