Qottle

If you are using an API you often need to queue tasks that are asynchronous and constrained by other factors such as rate limits of the API. Qottle is a general purpose queue manager that helps to overcome these complications. Here are some of the scenarios Qottle is intended to handle.

  • Repeated calls to an API that limits the number of calls in a given time period.
  • API calls that insist on a minimum delay between each call.
  • Calls that are limited to a certain number at the same time.
  • Queuing both asynchronous and synchronous functions with a limit on how many can run simultaneously.
  • Allowing entries to be added to a queue, respecting priorities.

Another common problem is how to avoid running the same thing multiple times. A good example is services orchestrated by pub/sub, where repeated messages can arrive about a task that hasn’t yet completed. Qottle can automatically skip duplicate tasks on insertion, so over-prolific pub/sub messages about the same task can be ignored.

These examples all use promise syntax, but obviously you can substitute async/await if you prefer.

Installation

If you are using Node or webpack, you can get the code from npm

yarn add qottle
-- or --
npm install qottle

or you can get the source code from github to host yourself – you’ll only need src/qottle.js

Usage

const Qottle = require('qottle');

You can add actions that need to be performed, and the queue's concurrent setting determines how many of them may run at the same time. For example, a single-threaded queue might be initialized like this

const q = new Qottle({ concurrent: 1 })

And add items to it like this

q.add (action, options)

For example, with concurrent set to 1, these items will be run one after another

q.add (()=> doSomething()) 
q.add (()=> doSomethingElse())

Each entry is resolved as a promise, whether or not the original action was a promise –

q.add (()=> console.log('Im running')).then(()=> console.log('ive said im starting')) 
q.add (()=> doAnAsyncThing()).then(({result}) => console.log('the result was', result)) 
q.add (()=> console.log('its all over'))

Or perhaps

Promise.all([ 
  q.add (()=> console.log('Im running')).then(()=> console.log('ive said im starting')),
  q.add (()=> doAnAsyncThing()),
  q.add (()=> console.log('its all over')) 
 ]).then(([,pack]) => { console.log('the result was', pack.result) })
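
To make the idea of a concurrency limit concrete, here is a minimal sketch of a queue that runs at most `concurrent` actions at a time and resolves each entry as a promise, whether the action was synchronous or asynchronous. It illustrates the technique only and is not Qottle's source; the `TinyQueue` class and everything inside it are hypothetical.

```javascript
// Illustrative only: a tiny promise queue with a concurrency cap.
// TinyQueue is a hypothetical name, not part of Qottle.
class TinyQueue {
  constructor({ concurrent = Infinity } = {}) {
    this.concurrent = concurrent;
    this.active = 0;    // how many actions are running right now
    this.waiting = [];  // actions that have not started yet
  }

  // Returns a promise that resolves with { result } once the action has run.
  add(action) {
    return new Promise((resolve, reject) => {
      this.waiting.push({ action, resolve, reject });
      this._next();
    });
  }

  // Start as many waiting actions as the concurrency cap allows.
  _next() {
    while (this.active < this.concurrent && this.waiting.length) {
      const { action, resolve, reject } = this.waiting.shift();
      this.active++;
      // Promise.resolve().then(...) treats sync and async actions alike
      Promise.resolve()
        .then(() => action())
        .then((result) => resolve({ result }), reject)
        .finally(() => {
          this.active--;
          this._next();
        });
    }
  }
}

// With concurrent: 1 the second action cannot start until the first has finished.
const order = [];
const tiny = new TinyQueue({ concurrent: 1 });
const done = Promise.all([
  tiny.add(() => new Promise((r) => setTimeout(() => { order.push('slow'); r('a'); }, 20))),
  tiny.add(() => { order.push('fast'); return 'b'; }),
]);
```

Raising `concurrent` to 2 in this sketch would let the synchronous action finish first, since nothing would make it wait for the timer.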

Skipping duplicates

If you are running something like pub/sub, you often get requests to do something you already know about. You are not ready to ack them yet, but you don’t want to add them to the queue again either. By providing a key for each entry, usually a digest of its parameters, you can add things to the queue only if you don’t already know about them.

const q = new Qottle({ skipDuplicates: true })

then add entries to the queue with a key

q.add (() => onlyDoOnce (), { key: someId })

qottle will skip any add requests with duplicate keys. Normally a duplicate key only applies to items that are either active or in the queue – not completed items. You can set the option ‘sticky’ to mean you want qottle to keep a record of all keys it has processed in this instance.
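
The difference between the default behaviour and ‘sticky’ can be sketched with a small key tracker. This is an assumption about how such bookkeeping might look, not Qottle's implementation; `KeyTracker`, `tryAdd` and `finish` are hypothetical names.

```javascript
// Illustrative only: deciding whether an add should be skipped, by key.
// KeyTracker is a hypothetical helper, not part of Qottle.
class KeyTracker {
  constructor({ sticky = false } = {}) {
    this.sticky = sticky;
    this.pending = new Set(); // keys that are queued or active
    this.history = new Set(); // keys already finished (only filled when sticky)
  }

  // Returns true if the key is new and is now registered, false if it is a duplicate.
  tryAdd(key) {
    if (this.pending.has(key) || this.history.has(key)) return false;
    this.pending.add(key);
    return true;
  }

  // Call when the work for a key finishes.
  finish(key) {
    this.pending.delete(key);
    if (this.sticky) this.history.add(key);
  }
}

const plain = new KeyTracker();
const first = plain.tryAdd('msg-1');       // true: new key
const repeat = plain.tryAdd('msg-1');      // false: still queued or active
plain.finish('msg-1');
const afterFinish = plain.tryAdd('msg-1'); // true: finished keys are forgotten

const stickyTracker = new KeyTracker({ sticky: true });
stickyTracker.tryAdd('msg-1');
stickyTracker.finish('msg-1');
const stickyRepeat = stickyTracker.tryAdd('msg-1'); // false: finished keys are remembered
```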

Rate limiting

qottle can help avoid rate limit problems with APIs by applying various techniques for staying within rate limits, such as limiting the number of calls over a given period. See the options section for how this works.

Options

Most options can be applied when the queue is initialized, then individually overridden when an entry is added to the queue.

option | default | purpose
concurrent | Infinity | how many items can be run at once
skipDuplicates | false | enables duplicate skipping, where items with the same key are not added to the queue more than once
sticky | false | whether to keep a record of finished items, as well as active or queued items, for skipping duplicates
immediate | true | whether to start the queue whenever something is in it, or to wait for it to be explicitly started
priority | 100 | the order to do things in. Lower values happen before higher values. Where priorities are the same, the order of insertion applies
log | false | whether to log console info on starting and finishing items
rateLimited | false | whether rate limiting management is required
rateLimitPeriod | 60000 | how long (ms) to measure rate limiting over
rateLimitDelay | 0 | how long (ms) to wait between starting each concurrent item
rateLimitMax | 1 | how many items may be started within one rateLimitPeriod; this is an additional constraint to the concurrent value
rateLimitMinWait | 100 | if a delay is required, qottle will calculate how much time is left in the rateLimitPeriod and wait that long before attempting to run. This is the minimum period (ms) to wait before trying again. Can be useful where the rate limited API time is slightly out of sync with your client
catchError | false | normally a run error will be returned to the add function for you to catch. If catchError is set to true, then qottle will catch the error and pass it to the .then() of add(). See later for examples
errorOnDuplicate | false | when adding to a queue, if skipDuplicates is enabled and a duplicate is detected, the entry will resolve with entry.skipped set to true. If you’d rather it treated a duplicate as an error, set errorOnDuplicate to true
name | qottle | can be useful if you have multiple queues and logging enabled, as the log includes the queue name
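
As a rough illustration of how queue-level options, per-entry overrides and priority could fit together, consider the sketch below. The default values are copied from the table above; `resolveOptions` and `byPriority` are hypothetical helpers, and the merge order is an assumption rather than Qottle's documented behaviour.

```javascript
// Illustrative only: queue-level defaults overridden per entry, then ordering by priority.
// Helper names are hypothetical; default values come from the options table.
const defaults = { concurrent: Infinity, priority: 100, catchError: false, log: false };

// Later spreads win, so entry options override queue options, which override defaults.
const resolveOptions = (queueOptions = {}, entryOptions = {}) =>
  ({ ...defaults, ...queueOptions, ...entryOptions });

// Lower priority values come first; ties keep insertion order
// (Array.prototype.sort is stable in modern JavaScript engines).
const byPriority = (entries) => [...entries].sort((a, b) => a.priority - b.priority);

const queueOptions = { catchError: true };
const entries = [
  { name: 'routine', ...resolveOptions(queueOptions) },
  { name: 'urgent', ...resolveOptions(queueOptions, { priority: 1 }) },
  { name: 'also routine', ...resolveOptions(queueOptions) },
];
const runOrder = byPriority(entries).map((e) => e.name);
// runOrder is ['urgent', 'routine', 'also routine']
```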

Events

In addition to the promise resolutions, events can also be triggered. For example

q.on("finish", ({entry}) => { console.log(`${entry.key} is finished and ran for ${entry.runTime}ms`) });

eventName | triggered on
empty | queue is empty
error | there’s been an error for an item
finish | an item has finished
skip | an item has been skipped as it had a duplicate key
start | an item has started
startqueue | the queue has started
stopqueue | the queue has been stopped; stopped queues still accept additions
ratewait | an entry is waiting for an opportunity to run but can’t, as it would violate a rate limit rule
add | an entry is added

Event payload

The payload for each event varies by event type, but contains some or all of the following properties

property | content
entry | details of the execution and options for an item
error | details of an error
waitTime | how long a rate limit constraint will wait before trying again
result | the final result returned from the action

Entry object

The entry object contains all the options applied plus various other info. It is passed as an argument to every action in the queue – for example

q.add (({entry}) => { console.log('im executing something for entry', entry.id) })

It also arrives as an argument to most events

q.on('start', ({entry}) => { console.log('entry just started', entry.id) })

and as part of the completed result of a queue item

q.add (({entry}) => { console.log('im executing something for entry', entry.id) })
.then(({entry, result})=> { console.log('entry', entry.id,'gave me this result', result) })

or

q.on('finish', ({entry, result}) => { console.log('entry', entry.id,'gave me this result', result) })

Most methods and events return an Entry object that looks like this.

property | content
…options | all the options mentioned earlier
status | ‘finish’, ‘error’, ‘queued’, ‘active’
queuedAt | timestamp when first added
startedAt | timestamp when started to run
finishedAt | timestamp when finished run
elapsed | ms from time queued to time finished
runTime | ms it spent actually running
id | a unique id
waitTime | total ms it spent waiting to run because of a rate limit constraint
waitStartedAt | if forced to wait because of a rate limit constraint, this is when it started waiting
waitFinishedAt | if forced to wait because of a rate limit constraint, this is when it finished waiting and started running
waitUntil | if entry is in the process of waiting, this is when it will try again
attempts | how many times it tried to start
action | the function it ran
skipped | whether the entry was skipped. Skipped items resolve successfully, but don’t run and have this property set to true
error | the error if one was thrown. Most useful with catchError: true

Methods

There are no property getters; everything is a method. Where the return value is ‘self’, you can chain methods.

method | purpose | returns
add(action: function, options: object) | add to the queue | a promise of { result: any, error: Error, entry: Entry }
stopQueue() | stop the queue running anything else | self
startQueue() | start the queue; items can be added to the queue whether it’s started or not | self
isStarted() | check if the queue is started | boolean
clear() | clear any unstarted queued items | self
clearSticky() | clear all items from the sticky history | self
clearRateLimitHistory() | clear rate limit history to avoid any outstanding constraint | self
remove(entry: Entry) | remove an entry from the queued items by passing the entry object over | entry
clearListeners(eventName: string) | clear all the listeners for a given event name. ‘all’ as the eventName will clear all listeners | self
on(eventName: string, listener: function) | add a listener to be executed when a given eventName triggers | Listener
off(listener: Listener) | pass over the Listener returned from .on to remove a listener | Listener
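
The on/off pairing, where .on hands back a Listener that .off later accepts, can be pictured with a small registry like the one below. This is only a sketch of the pattern; `TinyEmitter`, its `emit` method and the shape of the returned handle are hypothetical and say nothing about what Qottle's Listener object actually contains.

```javascript
// Illustrative only: an on/off registry where on() returns a handle that off() accepts.
// TinyEmitter and the handle shape are hypothetical.
class TinyEmitter {
  constructor() {
    this.listeners = new Map(); // eventName -> Set of handles
  }

  on(eventName, listener) {
    const handle = { eventName, listener };
    if (!this.listeners.has(eventName)) this.listeners.set(eventName, new Set());
    this.listeners.get(eventName).add(handle);
    return handle;
  }

  off(handle) {
    const set = this.listeners.get(handle.eventName);
    if (set) set.delete(handle);
    return handle;
  }

  emit(eventName, payload) {
    for (const { listener } of this.listeners.get(eventName) || []) listener(payload);
  }
}

const emitter = new TinyEmitter();
const seen = [];
const handle = emitter.on('finish', ({ entry }) => seen.push(entry.id));
emitter.emit('finish', { entry: { id: 'a' } });
emitter.off(handle);
emitter.emit('finish', { entry: { id: 'b' } }); // no listener left, nothing recorded
// seen is ['a']
```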

Rate limiting

A key capability of this queue is dealing with rate limiting. A queue can be set up to throttle calls, often to deal with APIs that have rate limits. This is over and above the ‘concurrent’ constraint, which manages how many queue items can be executed at the same time.

Let’s say you have an API that allows 10 calls per minute, and you don’t mind if they all run simultaneously.

const q = new Qottle({ rateLimited: true, rateLimitPeriod: 60 * 1000, rateLimitMax: 10 })

Then you can simply add requests to the queue and the queue will submit them according to that rule.

Promise.all ([ 
  q.add (()=>getSome()),
  q.add (()=>getSomeMore()) 
 ]).then (results=> { console.log('all the results', results) })
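
One way to picture a "no more than N calls per period" rule is as a sliding window over the start times of recent calls. The sketch below computes how long a new call would have to wait under that model, including the rateLimitMinWait floor described in the options. It is an assumption about the general technique, not Qottle's algorithm; `waitBeforeNextCall` is a hypothetical helper and all times are in ms.

```javascript
// Illustrative only: how long to wait before the next call may start,
// given the start times of earlier calls. The helper name is hypothetical.
const waitBeforeNextCall = ({ now, startTimes, rateLimitPeriod, rateLimitMax, rateLimitMinWait = 100 }) => {
  // only calls that started inside the current window count against the limit
  const recent = startTimes.filter((t) => now - t < rateLimitPeriod).sort((a, b) => a - b);
  if (recent.length < rateLimitMax) return 0;
  // a slot frees up once this call has aged out of the window
  const blocking = recent[recent.length - rateLimitMax];
  const timeLeft = blocking + rateLimitPeriod - now;
  // never retry sooner than the minimum wait
  return Math.max(timeLeft, rateLimitMinWait);
};

const tenCalls = Array.from({ length: 10 }, (_, i) => i * 1000); // started at 0, 1000, ... 9000
const limits = { rateLimitPeriod: 60 * 1000, rateLimitMax: 10 };

const waitFull = waitBeforeNextCall({ now: 30000, startTimes: tenCalls, ...limits });             // 30000
const waitFree = waitBeforeNextCall({ now: 30000, startTimes: tenCalls.slice(0, 3), ...limits }); // 0
const waitShort = waitBeforeNextCall({ now: 59950, startTimes: tenCalls, ...limits });            // 100
```

In the last case only 50 ms remain in the window, so the minimum wait of 100 ms applies instead.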

Another API constraint might be a minimum time between individual calls, say 20 seconds, with only 1 request being processed at a time.

const q = new Qottle({ rateLimited: true, rateLimitPeriod: 60 * 1000, rateLimitMax: 10, concurrent: 1, rateLimitDelay: 20 * 1000 })

Then you can simply add requests to the queue and the queue will submit them according to that rule.

Promise.all ([ 
  q.add (()=>getSome()),
  q.add (()=>getSomeMore()) 
 ]).then (results=> { console.log('all the results', results) })
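
The minimum-gap rule is simpler than the per-period limit: it only needs to know when the previous call started. A minimal sketch, assuming times in ms and a hypothetical `delayBeforeStart` helper that is not part of Qottle:

```javascript
// Illustrative only: enforcing a minimum gap between the starts of consecutive calls.
// delayBeforeStart is a hypothetical helper; times are in ms.
const delayBeforeStart = ({ now, lastStartedAt, rateLimitDelay }) =>
  lastStartedAt === undefined ? 0 : Math.max(0, lastStartedAt + rateLimitDelay - now);

const rateLimitDelay = 20 * 1000;
const noPrevious = delayBeforeStart({ now: 12000, rateLimitDelay });                      // 0
const tooSoon = delayBeforeStart({ now: 12000, lastStartedAt: 5000, rateLimitDelay });    // 13000
const longEnough = delayBeforeStart({ now: 30000, lastStartedAt: 5000, rateLimitDelay }); // 0
```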

Error handling

By default you’ll deal with errors like this (all queue items are converted to async)

q.add (()=>something())
.then (({result, entry})=>{ /* handle the result */ })
.catch(({error, entry})=> { /* handle the error */ })

However, you can ask qottle to catch thrown errors.

const q = new Qottle({ catchError: true })

Then any errors will be resolved (rather than rejected)

q.add (()=>something())
.then (({result, entry, error})=>{ /* handle the result, or check for error */ })
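
The two styles can be compared side by side with a small stand-in for the queue. The `runAction` helper below is hypothetical and only mimics the resolve-versus-reject behaviour described here; it is not how Qottle is implemented and it omits the entry object.

```javascript
// Illustrative only: the two ways an action's failure can be surfaced.
// runAction is a hypothetical helper, not Qottle's implementation.
const runAction = (action, { catchError = false } = {}) =>
  Promise.resolve()
    .then(() => action())
    .then(
      (result) => ({ result }),
      (error) => {
        if (catchError) return { error }; // resolve, carrying the error along
        throw { error };                  // reject, for .catch() to pick up
      }
    );

const failing = () => { throw new Error('boom'); };

// Default: the failure arrives in .catch()
const rejected = runAction(failing)
  .then(() => 'resolved')
  .catch(({ error }) => `caught: ${error.message}`);

// catchError: true: the failure arrives in .then() alongside result
const resolved = runAction(failing, { catchError: true })
  .then(({ result, error }) => (error ? `resolved with error: ${error.message}` : result));
```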

Error event

Irrespective of the catchError option, q.on('error', …) will always fire on an error, and q.on('finish', …) will only trigger on a successful finish.

q.on('error', ({entry,error})=> { /* will always trigger on an error */ })
q.on('finish', ({entry,result})=> { /* will not trigger on an error */ })

Examples

See the test.js for many examples

Recipes

Continuous rate limited polling

Managing duplicates

Repos

Github

npm

Apps script id: 1J_Oci2WG7wQ4KffCg8XOs-HID9gaIjEcNFlidE_7gwiJnrgQw24ozKct

Apps script library script 

Apps script github