I was talking to some CockroachDB developers at an event recently and discovered that many of them were, like me, using ElasticSearch for full-text and complex querying in cases where SQL was not appropriate. One of the topics that came up a lot was the difficulty of keeping the Elastic indices up to date with changes happening in the database. A database with hundreds of tables and lots of cross-referencing is very difficult to synchronize without simply rebuilding the indices from time to time. In this post, I’m not going to share a lot of code, partly because it’s not public, but mainly because much of it is very specific to my project. Instead, I’ll mainly be discussing the technique used to solve the problem.

GraphQL

Luckily, I use GraphQL in front of my CockroachDB database, so this abstraction lets me enhance mutations to include requests to rebuild the affected indices. It’s still fairly complex to figure out which indices a particular change will affect, but the extra abstraction GraphQL gives is a great advantage, especially if all your mutations are created by a generic function they all share.
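To give a flavour of the idea, here’s a minimal sketch of such a generic mutation factory. The names createMutation, buildInsert, runQuery and requestReindex are hypothetical stand-ins (the real project code isn’t public); the point is simply that every generated resolver gets the same post-mutation hook.

    // hypothetical factory - every table's create resolver comes from the same function,
    // so the reindex hook only has to be wired up once
    // runQuery and requestReindex are stand-ins for the project's db and reindexing helpers
    const createMutation = ({ tableName, buildInsert, runQuery, requestReindex }) =>
      (parent, args, context) => {
        const { sql, configs } = buildInsert(tableName, args);
        return runQuery(sql, configs)               // resolves to the newly created rows
          .then(obs => {
            // fire and forget - work out which Elastic indices this change touches
            requestReindex({ tableName, obs }).catch(err => console.error(err));
            return obs;                             // the API response doesn't wait for reindexing
          });
      };

First, some background.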

Searching

This search for beer returns films which feature beer in some way. I can hover over the reasons for selection and find out why – for example, the highlighted film was selected because it is about the product ‘beer’. Others were selected because the film metadata mentions it, someone associated with the film was called Beer, it won an award for beer, an automatic transcription of the audio revealed the word beer being mentioned, or even because a video analysis of the film found a beer-associated image somewhere in it. That’s a whole bunch of reasons for a film to be selected, all driven from a selection of ElasticSearch indices.

In fact, here’s a selection of some of the reasons that a search might pick up a film. The same kind of selection criteria applies to finding people, brands and so on.

 

So if, for example, assigning someone to a film modifies that film’s findability, we need a way to rebuild the film index to take account of the change much sooner than some periodic batch rebuild would. What’s more, we also need to rebuild the people index documents that feature that person, along with anything else that person is associated with.

The database

Behind the scenes in CockroachDB there are, as you’d imagine, tables for products, films, people, brands and so on – but there are also link tables such as personToBrands and filmToPerson to express the relationships between them. Indeed, there are several hundred tables and link tables in the project.

Mutations

All updates to all tables pass through a GraphQL resolver – here’s a little bit of the resolver for creating rows. I won’t bother with all the code – just the part that requests an index rebuild.
 // build the values
    const sql = `INSERT INTO "${tableName}"${sqlColumnText}${sqlValuesText} RETURNING *;`;
    if (verbose) {
      // don't show it all
      console.log('mutating items', configs.length, sql.slice(0,100), configs.slice(0,20));
    }
    // log any pool usage info required
    return connectAndExecute (
      poolClient=>poolClient.query(sql,configs).then(res => res.rows)
    )
    .then(obs=>{
      // this requests any elastic re-indexing
      elReindexing({tableName, obs}).then(er=>{
        // if re-indexing is required there will be some ids
        if (er.ids) {
          // we can simply reuse the reindex mutation
          // and since the return is the same format as mutation params, they can be passed as is
          // the provoke will catch and report any errors so no need to catch here
          ns.provokeElReindex ({
            params: er
          });
        }
      });
      // no need to wait for the pubsub to be issued as it won't be executed right away anyway
      return obs;
    })
    .catch(err =>
      errors.gqlError(err, 'failed to create item(s) in DB', errors.codes.DB)
    );

The piece of interest here is the elReindexing function. Given a tableName and the result of a mutation, its job is to figure out whether any Elastic indices need rebuilding and, if so, which ones and which document ids are affected.

Reindexing

The reindexing function itself is only provoked – it doesn’t run as part of the GraphQL API request, which needs to return its mutation response as soon as possible. Instead, the mutation publishes a pubsub message, which in turn wakes up a Google Cloud Run service that does the selected reindexing.
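The receiving end isn’t shown in this post, but for context: a Cloud Run service subscribed via a pubsub push subscription receives the message as an HTTP POST with a base64-encoded payload. A minimal sketch, assuming Express and a hypothetical doReindex entry point (not the real project code), might look like this.

    // minimal sketch of the Cloud Run side (assumptions: Express, a pubsub push subscription,
    // and a hypothetical doReindex that runs the actual rebuild)
    const express = require('express');
    const app = express();
    app.use(express.json());

    // stand-in for the real rebuild entry point
    const doReindex = bargs => Promise.resolve(console.log('would rebuild', bargs));

    app.post('/', (req, res) => {
      const msg = req.body && req.body.message;
      if (!msg || !msg.data) return res.status(400).send('not a pubsub push message');

      // the published object ({mode, bargs, workType}) arrives base64 encoded in message.data
      const { workType, bargs } = JSON.parse(Buffer.from(msg.data, 'base64').toString());

      const work = workType === 'elb' ? doReindex(bargs) : Promise.resolve();
      work
        .then(() => res.status(204).send())      // ack - the message won't be redelivered
        .catch(err => {
          console.error('reindex failed', err);
          res.status(500).send();                // nack - pubsub will retry
        });
    });

    app.listen(process.env.PORT || 8080);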

Exactly what needs to be reindexed is driven by a definition file related to the ElasticSearch mappings. Here’s the snippet for what happens if a given brand document needs to be updated. From this I know that, in addition to the brand index itself, I also need to reindex the product, company and film indices.

 brand: [{
    type: 'brand',
    idField: 'id',
    gQuery: 'riBrands',
    gPluck: 'brandID',
  }, {
    type: 'product',
    idField: 'ProductToBrands.brandID',
    gPluck: 'BrandToProducts.productID'
  }, {
    type: 'company',
    idField: 'CompanyToBrands.brandID',
    gPluck: 'BrandToCompanies.companyID'
  }, {
    type: 'film',
    idField: 'FilmToBrands.brandID',
    gPluck: 'BrandToFilms.filmID'
  }],

However, we want to update those indices selectively – only the documents directly related to the original brand update. There are two parts:

  • Identify all Elastic documents in any index (identified by the type property) that refer to the brandID being updated, and remove them. The idField points to the Elastic mapping path at which they can be found.

  • Find all data in the database related to the brand just updated. The gQuery property refers to a GraphQL query used to find them, and the gPluck fields describe how to pull out their ids. These documents will be re-indexed.

In principle, these two lists of ids should be the same, but it’s possible that the Elastic documents being deleted don’t exactly match the current state of the database. Here’s a snippet of the code that does this.

    return Promise.all (queries.map(f=>elclient.getDocIds({index:f.index,body:{...f.query,size:maxSize}})))
      .then(r=>{

        // make sure we got them all - if there are more matches than maxsize, this will fail
        r.forEach(f=>{
          if (f.hits.length !== f.total.value) {
            throw new Error(`....hits ${f.hits.length} dont match total ${f.total.value} - rebuild the whole thing`);
          }
        });
        
        //  we can concatenate  the hits and remove dups too
        const hits = [].concat(...r.map(f=>f.hits))
          .filter((f,i,a)=>a.findIndex(g=>g.id===f.id && g.index === f.index) === i);

        // and turn it all into a single bulk request
        console.log('....bulk deleting ', hits.length, 'items', hits);
        const body = hits.flatMap(doc => [ {"delete":{"_id":doc.id, "_index":doc.index}}]);
        
        // these are the rebuild tasks after deletion
        // an object where each property is an index, that contains the list of ids to rebuild
        // because they were found in the document that needs to be deleted
        const deleteTasks = hits.reduce((p,c)=>{
          if (!p[c.index]){
            p[c.index] = {
              index: c.index,
              ids: []
            };
          }
          // turn doc id (type-id-n) into a gql ID - just the id
          p[c.index].ids.push(c.id.replace(/.*?-(\d+).*/,"$1"));
          return p;
        },{});
      
        // we also need to search GQL for hits on the targets being deleted, because
        // the EL index might not be exactly in step so the el docs are the source of truth for
        // what gets deleted, and gql is the source for what needs rebuilt
        // these lists are merged and define the docs that need rebuilding

        const mob = targets.find(f=>f.type === type);
        if (!mob || !mob.gQuery) {
          throw new Error(`couldnt find gQuery - add its name to the reindex map for type ${type}`);
        }
        // kick that off
        console.log('....starting db check for matches');
        const gqPromise =  getAllChunks  ({
          ...load,
          query: mob.gQuery,
          partial: true,
          maxItems: ids.length + 2,
          filters: [{
            valueKey: gidField,
            value: ids
          }]
        });

        // and also the elastic index deletes
        console.log('....starting deletes');
        const bulkDeletes =  body.length ? elclient.bulk({
          body,
          refresh: true
        }) : Promise.resolve(null);

        // when both are done we can get on with the rebuild
        return Promise.all([
          bulkDeletes.then(()=>deleteTasks),
          gqPromise.then(r=>{
            return targets.map(f=>{
              const d = pluckey(r.data,f.gPluck);
              return d && d.length && d[0] ? {
                index: f.index,
                ids: d
              }: null;
            })
            .filter(f=>f)
            .reduce((p,c)=>{
              // key the result by index so it can be merged with the deletion tasks
              if (p[c.index]) {
                throw new Error('unexpected index dup ' + c.index);
              }
              p[c.index] = c;
              return p;
            },{});
          })
        ]).then (r=>{
          const [e,g] = r;
          // now we have the results from both the deletions and the gql search
          // combine and dedup
 
          const combined = targets.reduce((p,c)=> {
            const lids = [];

            // add them together, taking the entries for this target's index from each
            if (e && e[c.index]) Array.prototype.push.apply(lids, e[c.index].ids);
            if (g && g[c.index]) Array.prototype.push.apply(lids, g[c.index].ids);

            // dedup
            if (lids.length) {
              p.push({
                ids: lids.filter((f,i,a)=>a.indexOf(f) === i),
                index: c.index
              });
            }
            return p;
          },[]);

          // now the rebuilding
          console.log('....starting rebuiding');
          return Promise.all(combined.map(f=>{
            // find the matching task definition
            const mob = targets.find(g=>g.index === f.index);
            if (!mob) {
              throw new Error('couldnt find target for ' + f.index);
            }
            return {
              ...load,
              filters: [{
                valueKey: gidField,
                value: f.ids
              }],
              indexName: mob.index, 
              idField: gidField,
              mappings: mob.mappings,
              settings: mob.settings,
              query: mob.query,
              partial: true,
              maxItems: f.ids.length + 2,
            };
          }).map(f=>{
            return chunkyBuild(f);
          }));

        });
      
    });

I won’t go into the details of the helper functions as they are specific to this project, but in summary:

  • getDocIds gets the ids of the Elastic documents that match queries built from the mapping reference file. These are combined into a bulk delete (a hedged sketch of the general shape follows this list).
  • chunkyBuild executes GraphQL queries to get related items from the database, again driven by the same mapping reference file, and then rebuilds the affected indices.
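As an illustration only: the project wraps its Elastic client, but the underlying call in a getDocIds-style helper might look something like the following, assuming the @elastic/elasticsearch v7 client (this is not the real project code).

    // illustrative sketch only (assumption: @elastic/elasticsearch v7 client)
    const { Client } = require('@elastic/elasticsearch');
    const client = new Client({ node: process.env.ES_NODE });

    // return just enough for the caller to compare totals and build a bulk delete
    const getDocIds = ({ index, body }) =>
      client.search({ index, body: { ...body, _source: false } })
        .then(({ body: res }) => ({
          total: res.hits.total,   // { value, relation } in ES7
          hits: res.hits.hits.map(h => ({ id: h._id, index: h._index }))
        }));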

Log file

Here’s what happens during a trial local run of a request to update brand id ‘1’. Notice that it also detected documents in other indices that needed to be rebuilt.

....bulk deleting  11 items [
  { id: 'Brand-1-0', index: 'fidsearcherbrand' },
  { id: 'Product-1-0', index: 'fidsearcherproduct' },
  { id: 'Product-274-0', index: 'fidsearcherproduct' },
  { id: 'Company-2435-0', index: 'fidsearchercompany' },
  { id: 'Film-29-0', index: 'fidsearcherfilm' },
  { id: 'Film-238-0', index: 'fidsearcherfilm' },
  { id: 'Film-444-0', index: 'fidsearcherfilm' },
  { id: 'Film-600-0', index: 'fidsearcherfilm' },
  { id: 'Film-621-0', index: 'fidsearcherfilm' },
  { id: 'Film-860-0', index: 'fidsearcherfilm' },
  { id: 'Film-934-0', index: 'fidsearcherfilm' }
]
....starting db check for matches
....starting gql query chunks riBrands maxItems 3
....starting deletes
....finishing gql query riBrands items 1 took 990
....starting rebuiding
....getting fidsearcherbrand offset 0 maxItems 3
....getting fidsearcherproduct offset 0 maxItems 4
....getting fidsearchercompany offset 0 maxItems 3
....getting fidsearcherfilm offset 0 maxItems 9
....graphql done for fidsearchercompany at offset 0 limit 3 took 789 ms
....graphql done for fidsearcherbrand at offset 0 limit 3 took 883 ms
....graphql done for fidsearcherproduct at offset 0 limit 4 took 1062 ms
....graphql done for fidsearcherfilm at offset 0 limit 9 took 1080 ms
....elastic written to fidsearcherbrand 1 took 890 ms roundtrip 1083 ms
Finishing elastic load of  fidsearcherbrand 1 items
....elastic written to fidsearcherproduct 2 took 963 ms roundtrip 1086 ms
Finishing elastic load of  fidsearcherproduct 2 items
....elastic written to fidsearchercompany 1 took 1544 ms roundtrip 1761 ms
Finishing elastic load of  fidsearchercompany 1 items
....elastic written to fidsearcherfilm 7 took 1215 ms roundtrip 1442 ms
Finishing elastic load of  fidsearcherfilm 7 items

 

Back to the mutation

The exact same reference file can be used to identify what needs to happen when a given table is updated. Since the table names are constructed in a standard way, it’s possible to deduce the affected table and link table names, and thus whether or not any re-indexing is required. For example, if a mutation involves the brandToFilm table, a simple search of the mapping definition file will reveal whether that provokes an update, and the same definition will reveal where to find the id of the document that needs to be redone. The reindexing task has the responsibility of discovering everything else affected, so there’s no need to worry about that in the API. Like this:

 /**
   * 
   * @param {string} tableName the table that's been affected
   * @param {[object]} obs the result of the update
   * @return {object} the ids and type for input to elreindex
   */
  const elReindexing = ({tableName, obs}) => {
    const relMap = ns.relMap[tableName];
    // the idea is to review which fields are interesting for re-indexing
    // by referencing the definition that does the indexing
    // because the indexing processing itself finds everything related
    // all we have to do is find it once

    return new Promise ((resolve, reject) => {
      // if this is mappable we have to find the ids of the type thats changing
      const ids = !relMap ? null :
        // now we have to extract the id from the result of the mutation
        // skip null obs if there are any, and then skip any missing ids
        obs.filter(f=>f).map(f=>{
          const id = f[relMap.idField];
          if (!id) {
            console.log (`warning - missing idfield value ${relMap.idField} in ${JSON.stringify(f)}`);
          } else {
            return id;
          }
        }).filter(f=>f);
      // this instructs which type needs rebuilt and which ids it applies to
      resolve ({
        obs,
        ids: ids && ids.length && ids,
        ...relMap
      });
    });
  };

Provoke reindex

The mutation provokes a pubsub message so that Cloud Run can get on and do the work.

ns.provokeElReindex = rgs => {
    const {params} = rgs;
    // ids are the ids of the items that need to be rebuilt
    // type is something like 'person'
    let {ids, type} = params;
    if (ids && !Array.isArray(ids)) ids = [ids];

    // this will define which subscription gets the message
    const mode = getFidRunMode();

    // these are the same args that are supplied to elastic/bulkcli and bulkcloud
    const bargs = {
      type
    };
    // no ids builds the whole index
    if(ids && ids.length) bargs.ids = ids;

    // publish a pubsub request for this to be run (on cloud run)
    return elps.publish({
      ob: {
        mode,
        bargs,
        workType: 'elb'
      },
      type: 'elb',
    })
    .then (r=> ({
      messageID: r.toString(),
      timeStamp: new Date(),
      ids,
      type
    }))
    .catch(err=> {
      errors.gqlError(err, `failed to publish reindex for ${type}`, errors.codes.DB);
    });

  };

This is actually a mutation resolver being reused – which even makes it possible to provoke a rebuild through GraphiQL if you need to do some repairs or custom rebuilding.
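The same thing can also be kicked off directly from code. Based only on the resolver shape above (the ids here are hypothetical), and recalling that omitting ids rebuilds the whole index for that type:

    // rebuild a couple of specific brand documents (hypothetical ids)
    ns.provokeElReindex({ params: { type: 'brand', ids: ['1', '2'] } })
      .then(r => console.log('queued', r.messageID, r.type, r.ids));

    // no ids - queue a full rebuild of the person index
    ns.provokeElReindex({ params: { type: 'person' } })
      .then(r => console.log('queued full rebuild', r.messageID, r.type));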

 

Summary

As long as you keep table names standard (mine almost are, but they needed a little tweaking for some minor upper and lower camel case issues), all indices will automatically be kept up to date. The API does a one-off preparation to fix those issues and build a faster lookup from my mapping definition file.

    ns.relMap = Object.keys(reindexMap).reduce((p,c)=> {
      // the list of index targets for this type (c is the type key)
      const bob = reindexMap[c];
      bob.forEach (f=>{
        // we can get the tablename from the gpluck field initially
        const {gPluck, type} = f;
        const plucked = gPluck.split('.');
        // default - would apply if this is a main table
        const rob = {
          type,
          tableName: c,
          idField: gidField
        };
        // this would apply if its a link table
        if (plucked.length === 2) {
          // link tables are for example filmToCompany
          // so the current main type + To + the current subtype
          rob.tableName = `${c}To${type.slice(0,1).toUpperCase()}${type.slice(1)}`;
          rob.idField = plucked[1];
        } else if(plucked.length !== 1) {
          // can only handle 1 deep for now - TODO if reindexMap becomes more complicated, this will need improving
          throw new Error('can only expect 1 deep on reindexmap plucked idfield');
        }
        // index this by table name to find it quickly 
        if (!p[rob.tableName]) {
          p[rob.tableName] = rob;
        }
      });
      return p;
    } ,{});

ElasticSearch, Google Cloud Run, Pubsub and GraphQL make a great team!

You can find an Apps Script version of this here.