The problem with rate limits

You’ve most likely hit the problem of rate-limited APIs at some point. You make a request and it gets refused because you’ve made too many requests in some time period. Fair enough. There are many techniques (you’ll find various approaches elsewhere on this site) to deal with this, but usually it’s after the event: you try something, it fails, you wait a bit, you try again, and repeat till it works. Some APIs have the decency to tell you how long to wait before trying again (Vimeo is a good example of this). However, this can all be a real pain when

  • The requests you are making are asynchronous and uncontrollable
  • You are using a paid-for API and have to pay for usage even if rejected
  • There are multiple layers to the rate limiting – for example a per-minute rate and perhaps a monthly limit too – so retries to defeat the per-minute rate still count against your monthly usage
  • You are using pubsub, which redelivers another attempt just as you are trying to deal with the previous rate limit failure, so you process that one too, and it all spirals into a recursive set of activities that achieve nothing at all
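That reactive pattern typically looks something like the sketch below (fetchWithRetry and doRequest are hypothetical names, and it assumes a fetch-style response with a status and headers):

const delay = (ms) => new Promise(resolve => setTimeout(resolve, ms));

// the usual reactive approach: try, fail, wait, retry
const fetchWithRetry = async (doRequest, maxRetries = 5) => {
  for (let attempt = 0; attempt < maxRetries; attempt++) {
    const response = await doRequest();
    if (response.status !== 429) return response;
    // use the Retry-After header if the API supplies one, otherwise back off exponentially
    const retryAfter = Number(response.headers.get('retry-after')) || 2 ** attempt;
    await delay(retryAfter * 1000);
  }
  throw new Error('still rate limited after all retries');
};

It works, but only after the failed call has already happened – which is exactly what the rest of this post tries to avoid.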

An approach

Being able to queue requests and feed them in at a controlled rate is a great way to avoid rate limit failures before they happen, rather than dealing with them after they’ve happened. My go-to for all matters asynchronous is Sindre Sorhus, who has some awesome repositories for this kind of stuff. In this case, I’m using p-queue as the basis for an asynchronous queuing solution.
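For context, a bare p-queue setup (before any of the extras described below) is only a few lines – someRateLimitedCall here is a stand-in for whatever you are throttling:

const { default: PQueue } = require('p-queue');

// at most one task running at a time, and at most 5 task starts in any 60 second window
const queue = new PQueue({ concurrency: 1, intervalCap: 5, interval: 60 * 1000 });

// each added function should return a promise; the queue feeds them in at the allowed rate
queue.add(() => someRateLimitedCall())
  .then(result => console.log('done', result));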

The problem

My app and its back end system use the Google Video Intelligence API to analyze films uploaded by users, firstly to catalogue their content for searching, and secondly to disambiguate them. They may be copies or edits of films uploaded by others, so I need a way of matching films that are similar to each other to avoid duplicating metadata. An analyzed film carries searchable information about what was detected and when, which expands out into timestamped detail. This allows not only direct navigation to the points in the film where that content was detected, but also makes the film content itself searchable. Finally, along with object tracking information, these results form part of the disambiguation process for newly loaded films.
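As an illustration only – a simplified shape rather than the exact response schema – the kind of label data that ends up indexed looks something like this:

// illustrative only: a simplified slice of label detection output for one detected entity
const exampleLabel = {
  entity: { description: 'mountain bike' },
  categoryEntities: [{ description: 'bicycle' }],
  segments: [{
    segment: { startTimeOffset: '12.4s', endTimeOffset: '31.7s' },
    confidence: 0.91
  }]
};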

The Video Intelligence API

The Video Intelligence API and its workflow tick all the boxes as a problem API for rate limiting (a stripped-down sketch of the call itself follows this list):
  • The processing runs as a long-running API task – a black box – that either works or doesn’t. If it fails due to a 429 (the per-minute rate limit), the run is wasted: you have to start again – and pay again. At over $1 a minute, this can really mount up.
  • Pubsub requests could be arriving at any time. If there are multiple failures, they may be delivered multiple times, making things even worse.
  • Everything is asynchronous
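Stripped right down, the long-running call looks something like this sketch using the @google-cloud/video-intelligence client (gcsUri and the feature list are placeholders – the real processor wraps this with its own error handling):

const videoIntelligence = require('@google-cloud/video-intelligence');
const client = new videoIntelligence.VideoIntelligenceServiceClient();

const analyze = async (gcsUri) => {
  // kick off the long-running operation - this is where a 429 wastes the whole run
  const [operation] = await client.annotateVideo({
    inputUri: gcsUri,
    features: ['LABEL_DETECTION', 'SHOT_CHANGE_DETECTION']
  });
  // wait for the black box to finish (or fail)
  const [result] = await operation.promise();
  return result.annotationResults;
};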

Queuing

Here’s where a queue comes in handy. The p-queue code almost worked straight out of the box, except that I also needed deduplication to discard the duplicate pubsub requests that arrive when pubsub gets impatient waiting for a message ack. The characteristics I need are

  • Limit the number of runs in a given interval
  • Limit the number of concurrent runs
  • Deduplicate requests already queued up to do the same thing, whether because of a failure retry or because the same film was submitted multiple times
  • Introduce logging

pq

I made a wrapper for p-queue to add the extra stuff I needed. Here’s the code

const {
  default: PQueue
} = require('p-queue');


// this is needed to avoid quota problems with vint
// just do one at a time
// options to not add it if it's already in the queue
const pq = ((ns) => {
  ns.init = (options) => {
    ns.options = options || {};
    ns.tracker = new Set();
    ns.queue = new PQueue({
      ...options,
      concurrency: ns.options.concurrency || 1
    });

    if (ns.options.logEvents) {
      ns.queue.on('idle', () => {
        console.log(`....queue is now empty`);
      });
    }
    console.log('....initialized queuing system');
  };

  ns.add = (fn, options) => {
    // first check to see if we are avoiding duplication
    const {
      digest,
      skipIsError,
      log
    } = options || {};

    const addedToQueue = new Date().getTime();
    // the idea is to only run things that are not in the queue already - the digest differentiates them
    // (if dedup is set and a digest is supplied)
    if (!ns.options.dedup || !digest || !ns.tracker.has(digest)) {
      
      // this marks that this item is already queued or running
      if (digest) ns.tracker.add(digest);
      if (log) console.log(`....adding ${digest || 'item'} to queue at position ${ns.queue.size + 1}`);
      // now add to the queue proper
      return ns.queue.add(fn)
        .then(result => {
          const finishedAt = new Date().getTime();
          const elapsed = finishedAt - addedToQueue;
          const mess = `....${digest || 'item'} completed ${elapsed} ms after being added to queue`;
          if (log) console.log(mess);

          const rob = {
            digest,
            result,
            finishedAt,
            addedToQueue,
            elapsed,
            error: null,
            skipped: false
          };

          // all done so remove this digest marker
          if (digest) ns.tracker.delete(digest);
          return rob;
        })

        .catch(err => {
          console.log('... queue detected an error for', digest, err);
          if (digest) ns.tracker.delete(digest);
          return Promise.reject(err);
        });
    } else {
      const error = `${digest} already in queue ... skipped`;
      if (log) console.log(error);

      // if skip is being treated as an error signal it, otherwise resolve it but with a skipped flag
      return skipIsError ? Promise.reject({
        error,
        skipped: true,
        digest
      }) : Promise.resolve({
        error,
        skipped: true,
        digest
      });
    }
  };

  return ns;
})({});
module.exports = {
  pq
};

Of note are the use of a digest to identify a queue insertion so that duplicates can be detected, and the ability to treat a duplicate either as a cause for concern or as part of normal operation. In the Video Intelligence processor itself, it’s a straightforward asynchronous queue with single concurrency: when one item finishes, the next starts.

Initialize it like this

  // start the queue
  pq.init({
    logEvents: true,
    log: true,
    dedup: true
  });

Add items to the queue like this, passing a digest that uniquely identifies what this request is doing and with which film, to be used as a duplicate detector. When the queue item (action()) finally resolves, the pubsub message is acked (according to the returned consume property), and it’s all over. If the item is skipped it means it’s already in the queue, so we don’t tell pubsub to stop sending messages, in case the queued version subsequently fails.

  return pq.add (()=>action(), {
    // no point in doing the same film again if it's already in the queue - might be caused by multiple pubsub timeouts
    digest: hasher({
      filmMasterID,
      features: features || 'none',
      uploadVideoFile
    })
  }).then(r=>{
    if(r.skipped) {
      console.debug(
`....already running ${pack.filmName} (${pack.filmID}/${pack.filmMasterID}) - skipping`);    
    } else {
     console.debug(
`finally ${pack.filmName} (${pack.filmID}/${pack.filmMasterID}) dequeued at ${r.finishedAt.toString()} after ${r.elapsed/1000}s`);   
    }
    return {
      consume: !r.skipped
    };
  });
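The hasher above isn’t part of pq – any stable digest of the identifying fields will do. A minimal stand-in (hypothetical, not necessarily the one the app uses) could be:

const crypto = require('crypto');

// hypothetical stand-in for hasher: a stable digest of the identifying fields
// (relies on the properties being supplied in a consistent order)
const hasher = (obj) =>
  crypto.createHash('sha256').update(JSON.stringify(obj)).digest('hex');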

Bulk processing

Normally the occasional film needs processing, but you may want to run some operation that analyzes thousands of films. In that case, we don’t want to leave it to the processor to handle duplicates and queuing, because pubsub will be going crazy waiting for its messages to be consumed while they all sit in the queue waiting to be processed one by one. Instead, we need a queue for the queue – one that only provokes an analysis request according to some schedule. We can use the same pq module to accomplish this.

This time we want to allow at most ‘intervalCap’ runs in any ‘interval’. In my case, this turns out to be 1 every 120 seconds.

  pq.init({
    logEvents: true,
    log: true,
    dedup: true,
    intervalCap: options.intervalCap,
    // expects milliseconds
    interval: options.interval * 1000
  });

The task this time is to send a request to pubsub to process another film. The processor will either do it right away or add it to its own queue – but because the bulk updater is itself throttling requests, they will arrive in an orderly enough way for the processor to deal with them tidily.

  return pq.add (()=>task(), {
    digest: f.filmMasterID,
  })
  .then(result=>{
    console.log('..executed ' + f.filmMasterID);
  });
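For completeness, the task itself is just a publish. Here’s a sketch with the @google-cloud/pubsub client (the topic name is a placeholder), where task() above would be something like () => publishFilm(f):

const { PubSub } = require('@google-cloud/pubsub');

const pubsub = new PubSub();
// hypothetical topic name - the film processor described earlier subscribes to it
const topic = pubsub.topic('vi-analysis-requests');

// publish a request to analyze one film
const publishFilm = (f) =>
  topic.publish(Buffer.from(JSON.stringify({ filmMasterID: f.filmMasterID })));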

Running bulk

Just to finish this topic, all the stages, including my back end database and GraphQL API, run as Kubernetes deployments, but the bulk processing submitter itself can also run as a Kubernetes job – which gets it off your desktop. My bulk processor is a Node app, so I can create an image, push it to the container registry and kick it off.
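Building and pushing the image is standard Docker (assuming a Dockerfile at the root of the bulk processor project and Docker authenticated against the GCP container registry):

docker build -t gcr.io/fid-prod/fid-vibulk-kp .
docker push gcr.io/fid-prod/fid-vibulk-kp

The job itself is then kicked off with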

kubectl create job fid-vibulk-kp --image=gcr.io/fid-prod/fid-vibulk-kp

And that’s it – handling asynchronous tasks and avoiding rate limits before they happen.
