Take a moment to read these if this is a new topic for you.
The code is on GitHub.
The Orchestration of the stages is the trickiest part to get your head around, but most of the work is done by methods available from the Server namespace.
We’ll use the usual dashboard to refer back to. This is kind of a long read, and the details of my specific application are unimportant, but it does demonstrate each of the scenarios you are likely to encounter.

Getting started

Each Orchestration session begins with a call to .init(), which is kicked off from the client side. An Orchestration namespace must include an init method which creates a work package. Here’s mine. The Job namespace returns a work package tailored for the particular Fusion table I want to process.
It must also return the result of a call to Server.initializeWork, passing the work package you need it to deal with. For the contents of this work package, see Creating a work package.
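As a minimal sketch of the general shape (the Job.getWorkPackage name here is illustrative, not part of the library), an init might look something like this:

// Orchestration namespace - a sketch of the general shape only
var Orchestration = (function (ns) {

  // kicked off from the client side to begin a session
  ns.init = function () {
    // Job.getWorkPackage is a hypothetical helper that builds the work package
    // describing the stages for the Fusion table being processed
    const work = Job.getWorkPackage ();
    // hand the work package over to GasThreader
    return Server.initializeWork (work);
  };

  return ns;

})(Orchestration || {});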
When this is complete, the dashboard will update this stage.

Note that your app will look nothing like mine from now on, but all the principles will be covered in each of the following stage descriptions.

Counting stage

The next stage in my app goes to the Fusion table to see how many rows it contains. Each Orchestration function receives a standard argument (I’ll call it pack, as in the code snippets that follow), where
  • stages – all the info from the work package, plus calculated thread and chunk information
  • stageIndex – the index in the work package
  • chunkIndex – the chunk or thread index. If there is no parallelism (maxThreads = 1), then this will always be 0.
Any arguments for this method will be in stages[stageIndex].instances.arg.
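So the skeleton of any stage function looks roughly like this (countRows is just an illustrative name):

ns.countRows = function (pack) {
  const stage = pack.stages[pack.stageIndex];     // this stage's definition from the work package
  const chunk = stage.chunks[pack.chunkIndex];    // the chunk this thread should work on
  const arg = stage.instances.arg;                // any arguments defined in the work package
  // ... do the work for this chunk ...
};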

Every Orchestration method must end with a call to Server.registerChunk with these arguments
  • stage – the stage being processed (stages[stageIndex])
  • chunk – the chunk being processed (stages[stageIndex].chunks[chunkIndex])
  • result – an array of results from this chunk
  • optionally, a resultCount value – in most cases this can be omitted and the length of the result array will be used. In this case, however, there was only one result – a count of the number of rows in the table. I want to signal to the next stage that it will have that many items to process; this will be used to calculate the optimum number of parallel threads to use to retrieve the data.
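For my counting stage, the body of that skeleton might end something like this (countFusionRows is a stand-in for the app-specific code):

// count the rows in the Fusion table (app-specific helper, not part of GasThreader)
const rowCount = countFusionRows (arg);
// only one result, but signal the real row count so the next stage
// can work out how many parallel threads to use
Server.registerChunk (stage, chunk, [rowCount], rowCount);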

Get data stage

The next stage is to get the data from the database. The count came from the previous stage, and GasThreader has calculated that 24 would be a good number of threads to run in parallel. This means the count from the previous stage has been used to distribute the work among the 24 threads. I have all my app-specific code in the FusionToDb namespace, so it’s not important to look at that. What is important is that it’s being told to retrieve a certain number of rows, starting at a particular position.
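A sketch of the shape of this stage (FusionToDb.getRows is a stand-in name for the app-specific retrieval):

ns.getData = function (pack) {
  const stage = pack.stages[pack.stageIndex];
  const chunk = stage.chunks[pack.chunkIndex];
  // each thread retrieves its own slice of the table
  const result = FusionToDb.getRows (chunk.start, chunk.chunkSize);
  Server.registerChunk (stage, chunk, result);
};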
Each of the 24 threads will receive a different value for
  • chunk.chunkSize
  • chunk.start
so they can all do their work simultaneously. When complete, the dashboard looks like this.

Do not modify stage.instances.arg, as it is part of the stage object that will be carried forward to the next stages. If you do need to fiddle with arg (as I do in this example for demonstration), clone it first; you can use Utils.clone().
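In other words, something like this (the property being tweaked is purely illustrative):

// take a private copy rather than modifying the stage object itself
const arg = Utils.clone (stage.instances.arg);
arg.someOption = 'tweaked for this chunk';   // illustrative only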

As usual, a call to Server.registerChunk records the results of this thread. You can see from the work package below that the reduce stage will be skipped for this stage. This is because 1m rows would use too much memory to deal with all at once, so the next stage will deal with the result data in chunks.

Define fields stage

This stage looks at all the data to find the maximum length of each field and to deduce the best data type to use when creating the SQL table definition. This is fine to do across many threads in parallel too, as I can combine the findings of each thread in a later merge stage.
Of interest here are
  • how to get the previous stage
 const prev = pack.stages[pack.stageIndex -1];
  • how to get data from a previous stage when the reduce operation has been skipped. It may be that the number of threads in this stage is different from the number in the stage from which we’re getting the data, but Server.getSlice takes care of that by scanning across several previous chunks if required.
arg.data = Server.getSlice (prev, chunk.start, chunk.chunkSize + chunk.start);
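Putting those together, the stage looks roughly like this (deriveFieldDefinitions stands in for the app-specific analysis):

ns.defineFields = function (pack) {
  const stage = pack.stages[pack.stageIndex];
  const chunk = stage.chunks[pack.chunkIndex];
  const prev = pack.stages[pack.stageIndex - 1];
  // clone before adding data, as described earlier
  const arg = Utils.clone (stage.instances.arg);
  // pull this chunk's slice of unreduced data from the previous stage
  arg.data = Server.getSlice (prev, chunk.start, chunk.chunkSize + chunk.start);
  // work out maximum lengths and likely data types (app-specific helper)
  const result = deriveFieldDefinitions (arg.data);
  Server.registerChunk (stage, chunk, result);
};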
As usual it ends with a call to Server.registerChunk. Here’s the dashboard now.

Merge fields stage

Now I have 24 different interpretations of the field definitions to consolidate down to one; the details are below but unimportant.
Of interest here is
  • how to get the reduced data (all the chunks combined) from the previous stage

const prev = pack.stages[pack.stageIndex -1];
const reduced = Server.getReduced (prev);
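In outline (mergeFieldDefinitions stands in for the app-specific consolidation):

ns.mergeFields = function (pack) {
  const stage = pack.stages[pack.stageIndex];
  const chunk = stage.chunks[pack.chunkIndex];
  const prev = pack.stages[pack.stageIndex - 1];
  // all the chunks of field definitions from the previous stage, already combined
  const reduced = Server.getReduced (prev);
  // consolidate the 24 interpretations into a single set (app-specific helper)
  const result = [mergeFieldDefinitions (reduced)];
  Server.registerChunk (stage, chunk, result);
};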

As usual it ends with a call to Server.registerChunk. Here’s the dashboard now.

Define table stage

I have enough now to create the first final deliverable – an SQL definition of the table based on the fields definitions derived from scanning the data contents.
Of interest here is
  • how to find a previous stage by name

const dataStage = Server.findStage ("getData", stages);

As usual it ends with a call to Server.registerChunk, but notice that the resultCount is based on the results from a stage a few back – the getData stage. This can be signalled by finding the getData stage as above and passing its resultLength to Server.registerChunk as the last argument.
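So the end of this stage might look something like this (createTableSql stands in for building the actual CREATE TABLE statement):

// find the earlier stages needed, by name
const dataStage = Server.findStage ("getData", pack.stages);
const fieldStage = Server.findStage ("mergeFields", pack.stages);
const fields = Server.getReduced (fieldStage);
// build the SQL table definition from the consolidated field definitions (app-specific helper)
const sql = createTableSql (fields);
// pass on the getData result count so later stages know how many rows to expect
Server.registerChunk (stage, chunk, [sql], dataStage.resultLength);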

SqlInserts stage

Next I need to go back to the actual data and create SQL insert scripts to insert this data into an SQL database in optimized chunks. Note that the data input for this stage is not from the previous stage, but from one a few steps ago.
Of interest here are
  • referencing the dataCount property to find the input stage. The work package for this stage has dataCount set to getData.
  • using the results of more than one previous stage. This stage needs both the data stage (which was not reduced) and the mergeFields stage (which was).
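A sketch of the shape (makeInsertStatements stands in for the app-specific SQL generation):

ns.sqlInserts = function (pack) {
  const stage = pack.stages[pack.stageIndex];
  const chunk = stage.chunks[pack.chunkIndex];
  // the input data comes from the getData stage, a few stages back
  const dataStage = Server.findStage ("getData", pack.stages);
  const data = Server.getSlice (dataStage, chunk.start, chunk.chunkSize + chunk.start);
  // the consolidated field definitions come from the mergeFields stage, which was reduced
  const fields = Server.getReduced (Server.findStage ("mergeFields", pack.stages));
  // build optimized INSERT statements for this chunk (app-specific helper)
  const result = makeInsertStatements (data, fields);
  Server.registerChunk (stage, chunk, result);
};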
This ends with Server.registerChunk, and the 1m records are reduced to 96 SQL insert statements.

Make SQL scripts

Next, the SQL insert statements need to be combined into SQL scripts optimized for the target database and written to Drive.
There’s nothing new here; the stage ends with a call to Server.registerChunk, with just 7 script files created.

Zip script files

Finally, these scripts need to be combined into a single zip file, ready to be downloaded to the target server.
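Apps Script can do this kind of thing directly with Utilities.zip; a minimal sketch, assuming scriptFiles is an array of the Drive files written by the previous stage:

// zip the script files into a single blob and write it back to Drive
const blobs = scriptFiles.map (function (file) {
  return file.getBlob ();
});
DriveApp.createFile (Utilities.zip (blobs, 'sql-scripts.zip'));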

This is the only tweaking you need to do to the client-side code (in red below), and I may improve this in future versions.

