Getting an FTP server running on Kubernetes is a little tricky. FTP uses multiple ports in its negotiation, and you need to make sure that the whole conversation always reaches the same Kubernetes pod. FTP is not a great choice nowadays and SFTP is better, but in this case I needed to build a server that would handle uploads from devices that were not very configurable, so it had to be plain FTP to get going. It’s much better to secure it with TLS if possible.

In addition, I wanted to write more or less directly to cloud storage (for backup) and use pubsub to initiate a workflow based on the uploaded data.

FTP server on Node

For the reasons mentioned earlier, I’m not using a pre-baked server. Instead, I chose https://github.com/trs/ftp-srv, mainly because

  • You can create a custom file system, which allows me to create one based on cloud storage
  • FTP is not secure, so I wanted to lock it down as much as possible – limiting it to only being able to process small uploads, and nothing else.
  • ftp-srv supports TLS, although I’m not yet using it in this proof of concept

The code for this entire project is at https://github.com/brucemcpherson/sensor

Kubernetes cluster

For testing, I’m using a preemptible cluster to keep the costs down. I always find it easier to create Kubernetes resources through gcloud and kubectl rather than using the UI in the cloud console. That way you can easily repeat the steps. So first, let’s get a small cluster up and running in your cloud project

gcloud beta container --project "yourproject" clusters create "sensor" \
--zone "europe-west4-b" \
--username "admin"  \
--machine-type "g1-small" \
--image-type "COS" \
--disk-type "pd-standard" \
--disk-size "100"  \
--preemptible \
--num-nodes "2" \
--enable-autoscaling \
--min-nodes "2" \
--max-nodes "4" \
--enable-autoupgrade \
--enable-autorepair

Set up credentials and check that the cluster looks OK

gcloud container clusters get-credentials sensor --zone europe-west4-b
kubectl config get-contexts 
kubectl get nodes

Assign yourself as admin

kubectl create clusterrolebinding cluster-admin-binding --clusterrole=cluster-admin --user=bruce@mcpher.com

Staging, dev, production

I generally have 3 different environments: one for testing locally, one for testing in Docker and Kubernetes, and a final production version. These will (potentially) each have different parameters and settings, so the first task is to create a settings file (secrets.js) which will be shared between all apps in this project and which partitions the parameters into local, development and production. These are labelled ‘tl’, ‘td’ and ‘tp’. It looks something like this

const settings = {
  tp,
  td,
  tl
}

module.exports = {
  settings
}

Each of tp, td and tl contains settings like the following. I’ll get into the details of these settings when they are needed. I always find it’s best to start with this kind of structure, as it avoids refactoring later and makes it easy to add (or remove) environments.

const td =   {
  pubsub: {
    use: true,
    includeData: true,
    forcePush: false,
    dataArrived: 'xxxxxx-td',
    dataReady: 'xxxxxxxx-td',
    pusher: false,
    idleTime: 60000,
    gcp: {
      info: {
        serviceAccountFile: './private/sensorgcpupload.json'
      },
      get creds () {
        return {
          credentials: require(this.info.serviceAccountFile)
        }
      }
    }
  },
  ftp: {
    tmp: '/tmp/',
    users: require('./private/users.json').td,
    logging: {name: 'td', level: 'warn'},
    gcp: {
      use: true,
      info: {
        serviceAccountFile: './private/sensorgcpupload.json',
        bucketName: 'xxxxxx',
        folderName: 'ftp-dev'
      },
      get creds () {
        return {
          credentials: require(this.info.serviceAccountFile),
          bucketName: this.info.bucketName,
          folderName: this.info.folderName
        }
      }
    },
    instance: {  
      anonymous: false, 
      url: 'ftp://127.0.0.1:18021',
      pasv_url: 'ftp://127.0.0.1',
      pasv_min: 18101,
      pasv_max: 18104,
      greeting: 'dev runmode',
      whitelist
    }
  }
}
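
Since every app looks up its settings by run mode, it can help to fail fast when RUNMODE is mistyped. Here is a minimal sketch, assuming a hypothetical helper called pickSettings and a cut-down stand-in for secrets.js (neither is part of the project's code):

```javascript
// Hypothetical helper (not in the project): select the settings for a run mode
// and fail early if the mode is unknown.
function pickSettings (all, runmode = 'tl') {
  if (!Object.prototype.hasOwnProperty.call(all, runmode)) {
    const known = Object.keys(all).join(', ')
    throw new Error(`unknown run mode "${runmode}" (expected one of: ${known})`)
  }
  return all[runmode]
}

// stand-in settings, much smaller than the real secrets.js
const demoSettings = {
  tl: { ftp: { logging: { name: 'tl', level: 'debug' } } },
  td: { ftp: { logging: { name: 'td', level: 'warn' } } },
  tp: { ftp: { logging: { name: 'tp', level: 'error' } } }
}

console.log(pickSettings(demoSettings, 'td').ftp.logging.level)
```

The default of 'tl' mirrors the fallback used later in the FTP app when RUNMODE is not set.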

Whitelisting

ftp-srv gives the ability to whitelist only certain FTP directives. Since I only want to upload files, and nothing more, this is the minimum I can get away with.

const whitelist = [
  'STOR',
  'PASS',
  'PASV',
  'USER',
  'TYPE',
  'QUIT',
  'ABOR',
  'FEAT'
]
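
To make the idea concrete, here is a rough sketch of what a whitelist check amounts to. It is an illustration only, not ftp-srv's own implementation, and the helper names parseCommand and isAllowed are made up for this example:

```javascript
// Illustration only: not ftp-srv's code, just the idea behind a whitelist.
const whitelist = ['STOR', 'PASS', 'PASV', 'USER', 'TYPE', 'QUIT', 'ABOR', 'FEAT']

// An FTP command line is a verb, optionally followed by a space and an argument.
function parseCommand (line) {
  const [verb, ...rest] = line.trim().split(' ')
  return { directive: verb.toUpperCase(), arg: rest.join(' ') }
}

// A command is accepted only if its verb is on the list.
function isAllowed (line, allowed = whitelist) {
  return allowed.includes(parseCommand(line).directive)
}

console.log(isAllowed('STOR reading.csv')) // true
console.log(isAllowed('DELE reading.csv')) // false
```

Anything a device sends that is not on the list, such as a delete or a download, is simply refused.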

Yarn/npm

Because multiple apps will make up this project, I have a top-level package.json for things that are used in every app, then a specific one for each app.

common package.json

{
  "name": "sensor",
  "version": "1.0.0",
  "main": "index.js",
  "license": "MIT",
  "dependencies": {
    "@google-cloud/pubsub": "^1.7.2",
    "@google-cloud/storage": "^4.7.0",
    "pluck-deep": "^3.0.0"
  }
}

ftp app package.json

{
  "name": "ftp",
  "version": "1.0.0",
  "main": "index.js",
  "license": "MIT",
  "dependencies": {
    "bunyan": "^1.8.12",
    "ftp-srv": "^4.3.1",
    "mime-types": "^2.1.26",
    "through": "^2.3.8"
  }
}

 

Ftp app

I won’t replicate all the code here as it’s on GitHub, but I’ll highlight and explain a few pieces

const { server } = require('./src/server')

// -- THESE ARE shared across all apps in the project
const { libPubSub } = require ('../common/libpubsub')
const { settings:sa } = require ('../common/secrets')

// -- ENVIRONMENT overrides default settings, which are determined by the run mode
// -- LIKE this we can build for each environment with the same source code
const {env} = process
const runmode = env.RUNMODE || 'tl'
const defaultSettings = sa[runmode]
const defaultFtp = defaultSettings.ftp
const defaultInstance = defaultFtp.instance
const settings = {
  runmode,
  version:'1.0.0',
  ...defaultSettings,
  ftp: {
    ...defaultFtp,
    instance: {
      ...defaultInstance,
      url: env.URL || defaultInstance.url,
      pasv_url: env.PASVURL || defaultInstance.pasv_url,
      pasv_min: env.PASVMIN || defaultInstance.pasv_min,
      pasv_max: env.PASVMAX || defaultInstance.pasv_max
    }
  }
}
const instance = settings.ftp.instance

console.log(`${new Date().getTime()}:starting ftp server ${settings.runmode}-${settings.version} on ${instance.url}`)
console.log(`${new Date().getTime()}:passive ports ${instance.pasv_min}-${instance.pasv_max} on ${instance.pasv_url}`)
// -- THIS kicks off the pubsub environment
libPubSub.init ({settings})

// initialize server to run in the specified runmode
server.init ({settings})

// kick it off
server.instance.listen()
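
One detail worth noting is that environment variables always arrive as strings, whereas the default passive port bounds are numbers. The following is a sketch of the same override logic with the ports coerced to integers. The helper names toPort and resolveInstance are hypothetical, and whether ftp-srv needs numeric values here is an assumption on my part:

```javascript
// Hypothetical helper: convert an environment string to a valid port number,
// falling back to the default when it is missing or out of range.
function toPort (value, fallback) {
  const n = Number.parseInt(value, 10)
  return Number.isInteger(n) && n > 0 && n < 65536 ? n : fallback
}

// Same override rules as in the app, but with numeric passive port bounds.
function resolveInstance (defaults, env = process.env) {
  return {
    ...defaults,
    url: env.URL || defaults.url,
    pasv_url: env.PASVURL || defaults.pasv_url,
    pasv_min: toPort(env.PASVMIN, defaults.pasv_min),
    pasv_max: toPort(env.PASVMAX, defaults.pasv_max)
  }
}

// stand-in defaults for the demonstration
const demoDefaults = {
  url: 'ftp://127.0.0.1:18021',
  pasv_url: 'ftp://127.0.0.1',
  pasv_min: 18101,
  pasv_max: 18104
}
console.log(resolveInstance(demoDefaults, { PASVMIN: '18201', PASVMAX: '18204' }))
```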

Event handling

The only client/server events that need handling are login and client-error. Login checks the user/password combination and then goes on to upload the file. When the upload is done, I also need to send a pubsub message (the pubsub code is on GitHub), consisting of some control information and the contents of the file just uploaded.

const { handle } = require('./handle')
const {FtpSrv} = require('ftp-srv')
const bunyan = require('bunyan')
const { libPubSub } = require ('../../common/libpubsub')

const server = {
  // the server instance
  instance: null,

  // call this to kick everything off
  init ({settings}) {
    this.settings = settings
    const options = this.settings.ftp
    
    const instance = new FtpSrv({ 
      ...options.instance,
      log: options.logging ? bunyan.createLogger(options.logging) : null
    })
    this.instance = instance
    console.log(`listening for ftp on ${options.instance.url} (active) ${options.instance.pasv_url} (passive)`)
    
    // custom event handlers
    this.handlers({
      onStorageError ({connection, error, fileName, settings}) {
        // handle a failure uploading
        console.error('failed on upload ', error, fileName)
      
      },
      onStorageFinished ({connection, storageName, settings, fileName, data}) {
        // if we're pubsubbing, now is the time
        const now = new Date().getTime()
        if (settings.pubsub.use) {
          /// the data is a stream buffer, so we'll encode to base64
          libPubSub.publish({
            ob: {
              sentAt: now,
              fileName,
              storageName,
              data: data.toString('base64')
            },
            name: 'dataArrived'
          })
        }
        console.log(`${now}:upload completion detected ${fileName} uploaded to ${storageName}`)

      }
    })
      
  },
  // setup event handlers
  handlers (customHandlers) {
    this.instance.on('login',(data, resolve, reject) =>
      handle.login({settings: this.settings, data, resolve, reject,...customHandlers}))

    this.instance.on('client-error', (connection, context, error) =>
      handle.clientError({settings: this.settings, connection, context, error})
    )
  }
}

module.exports = {
  server
}
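
The handle module itself is in the repository rather than shown here. As a rough idea of what a login handler has to do, the following sketch checks the credentials against the users list and then resolves or rejects. It is a hypothetical stand-in, not the project's handle.login, and makeFs is an assumed factory for building the custom file system:

```javascript
const crypto = require('crypto')

// Compare two secrets via fixed-length digests so timingSafeEqual can be used
function sameSecret (a, b) {
  const digest = s => crypto.createHash('sha256').update(String(s)).digest()
  return crypto.timingSafeEqual(digest(a), digest(b))
}

// Hypothetical stand-in for handle.login (the real one is in the repository).
// users is the array from users.json for the current run mode;
// makeFs is an assumed factory that would build the custom file system.
function login ({ users, data, resolve, reject, makeFs }) {
  const { connection, username, password } = data
  const user = users.find(u => u.name === username)
  if (!user || !sameSecret(user.password, password)) {
    return reject(new Error('invalid username or password'))
  }
  // ftp-srv takes a custom file system from the fs property of the resolved object
  return resolve(makeFs ? { fs: makeFs(connection) } : {})
}

// usage with stand-in callbacks
login({
  users: [{ name: 'bruce', password: 'another password', role: 'boss' }],
  data: { username: 'bruce', password: 'another password' },
  resolve: () => console.log('login accepted'),
  reject: error => console.log('login refused:', error.message)
})
```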

The custom file system

ftp-srv allows the creation of a custom file system. The only method that needs to be overridden is write, which is called to handle streaming of data as it arrives, since no other file operations are whitelisted. Rather than writing it to a file, which is the normal action, I need to stream it to cloud storage. (As it turned out, ftp-srv had a problem that made its stream incompatible with cloud storage streaming, so in practice the file is temporarily written to the container’s local storage, then that file is streamed to cloud storage and finally deleted.)

const {FileSystem} = require('ftp-srv');
const fs = require('fs');
const mime = require('mime-types');
const Storage = require('@google-cloud/storage').Storage;
const through = require('through')
const path = require('path')

const {createWriteStream} = require('fs');

class GcpFileSystem extends FileSystem {
  
  constructor() {
    super(...arguments);
    const [connection, options] = [...arguments] 
    this.settings = options.settings
    this.connection = connection

    // these will get fired when storage is uploaded to
    this.onStorageError = options.onStorageError
    this.onStorageFinished = options.onStorageFinished
  }

  /**
   * get an authorized storage object to stream to
   */
  getStorage () {
    const {creds } = this.settings.ftp.gcp;
    const { credentials, bucketName, folderName } = creds;
    const { projectId } = credentials;
    return {
      storage: new Storage({
        projectId,
        credentials,
      }),
      bucketName,
      folderName
    };
  };

  /**
   * create the cloud storage stream
   * the credentials/bucket name and folder name are in the secrets file
   * @param {string} name the filename to stream to
   */
  storageWriteStream  ({name}) {
    
    const mimeType = mime.lookup(name);
    const fileTarget = this.getFileTarget({name});
    const options = {
      contentType: mimeType
    };

    return {
      stream: fileTarget.file.createWriteStream(options),
      name: fileTarget.file.name,
      bucket: fileTarget.file.bucket.name
    }
  };

/**
 * create the cloud storage file target
 * the credentials/bucket name and filename are in the secrets file
 * @param {string} name the filename to stream from
 */
  getFileTarget  ({name}) {
    // get a storage object
    let {storage, bucketName, folderName} = this.getStorage();
    // handle gs: type names too
    if (name.slice(0, 5) === 'gs://') {
      bucketName = name.replace(/gs:\/\/([^\/]+).*/, '$1');
      name = name.replace('gs://' + bucketName + '/', '');
    }
    name = folderName + '/' + name;
    const bucket = storage.bucket(bucketName);
    // we'll actually be streaming to/from  this file
    return {
      file: bucket.file(name)
    }

  };


  write(fileName, {append = false, start = undefined} = {}) {

    // for now we'll temporarily write it to a unique path
    // then load to storage from there
    // then delete the file
    // because of this https://github.com/trs/ftp-srv/issues/199

    const {settings, onStorageError, onStorageFinished, connection} = this
    const {pubsub, ftp} = settings
    // get a unique name for the file and add the extension
    const fsPath = ftp.tmp + this.getUniqueName() + path.extname(fileName)

    // and a temp stream
    const stream = createWriteStream(fsPath, {flags: !append ? 'w+' : 'a+', start});

    const removeTemp = (f) => {
      // delete the temp file
      fs.unlink(f, err=> {
        if(err) {
          console.log('failed to remove temp file',f)
        } else {
          console.log('temp file', f, 'removed')
        }
      })
    }
    const storageError = (error) => {
      // delete the temp file
      removeTemp(fsPath)
      console.error('stream failure', error)
      if(onStorageError) {
        onStorageError({connection, error, fileName, settings})
      } 
    }
    
    // if this fails then it wasn't able to write the temp file
    stream.once('error', error=> storageError(error))
    
    // temp file was written, now write to storage
    stream.once('close', () => {
      // temp file is created, now stream it to storage
      const ws = this.storageWriteStream ({name:fileName})
      const storageStream = ws.stream
      const readStream = fs.createReadStream(fsPath);
      const storageName = `${ws.bucket}/${ws.name}`
      const before = new Date().getTime()
      console.log(`${before}:started writing ${fileName} to gcp at ${storageName}`)

      // copy file to storage through a pipe so we can catch the data for publishing
      // but only do this if the files are expected to be small
      const streamedData = []
      readStream.pipe(through(
        function write (data) {
          // this is where we'll catch the data
          if (pubsub.includeData) {
            streamedData.push(data)
          }
          this.queue(data) 
        },
        function end (data) {
          this.queue(null) 
        }
      )).pipe(storageStream)


      // This catches any errors that happen while creating the readable stream (usually invalid names)
      readStream.once('error', error => storageError(error))
      storageStream.once('error', error => storageError(error))
      
      // when done, we can resolve the stream
      storageStream.on('finish', function() {
        const now = new Date().getTime()
        console.log(`${now}:finished writing ${fileName} to gcp at ${storageName} (${now-before}ms)`)
        // signal all over
        stream.end()
        // call the thing to do when its over
        if(onStorageFinished) {
          onStorageFinished({connection, storageName, settings, fileName, data: Buffer.concat(streamedData)})
        } 
        // delete the temp file
        removeTemp(fsPath)

      });

    })
    return {
      stream
    };
  }
}
module.exports = {
  GcpFileSystem
}
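
The part of write that copies the data while it is on its way to storage can be looked at in isolation. Here is a small sketch of the same capture-while-piping idea, using Node's built-in Transform stream in place of the through package. The helper name captureThrough is made up for this example:

```javascript
const { Transform } = require('stream')

// Sketch using Node's built-in Transform instead of the `through` package:
// every chunk is remembered (optionally) and passed on unchanged.
function captureThrough ({ keep = true } = {}) {
  const chunks = []
  const stream = new Transform({
    transform (chunk, encoding, callback) {
      if (keep) chunks.push(chunk)
      callback(null, chunk)
    }
  })
  // data() returns everything captured so far as a single Buffer
  return { stream, data: () => Buffer.concat(chunks) }
}

const tap = captureThrough()
tap.stream.write(Buffer.from('temp=21.5\n'))
tap.stream.write(Buffer.from('temp=21.7\n'))
console.log(tap.data().toString('base64'))
```

In the file system above it would sit in the middle of the pipe, along the lines of readStream.pipe(tap.stream).pipe(storageStream). As with the original, holding the whole file in memory is only sensible when uploads are known to be small.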

Deployment

kubectl apply -f deploy.yaml

This is set up to use ports 18101 – 18104 for passive FTP. The number of ports in the range defines how many simultaneous uploads can happen. The range is passed to the FTP app via environment variables. Note the label app: ftptd. The service that exposes the pods of this deployment externally will use it as its selector.

apiVersion: apps/v1
kind: Deployment
metadata:
  name: ftptd
  namespace: default

spec:
  progressDeadlineSeconds: 600
  replicas: 1
  revisionHistoryLimit: 10
  selector:
    matchLabels:
      app: ftptd
  template:
    metadata:
      labels:
        app: ftptd
    spec:
      containers:
      - image: gcr.io/fid-sql/ftp:td
        imagePullPolicy: Always
        name: ftptd
        resources: {}
        ports:
        - containerPort: 18021
          protocol: TCP
        - containerPort: 18022
          protocol: TCP
        - containerPort: 18101
          protocol: TCP
        - containerPort: 18102
          protocol: TCP
        - containerPort: 18103
          protocol: TCP
        - containerPort: 18104
          protocol: TCP         
        env:
        - name: URL
          value: "ftp://0.0.0.0:18021"
        - name: PASVURL
          value: "ftp://to.be.assigned.later-just use 0.0.0.0 for now"
        - name: PASVMIN
          value: "18101"
        - name: PASVMAX
          value: "18104"
        - name: RUNMODE
          value: "td"

The PASVURL environment variable must hold the externally facing IP address that the load balancer service, to be created next, will be assigned. We don’t know it at this point, but once it’s assigned it can be patched in here and the .yaml file reapplied.

The external load balancer

kubectl apply -f svc.yaml

The deployment pods need a service to expose them externally. For simplicity, I’m using the same ports here as the pods, but if required these could be forwarded to different target ports.

apiVersion: v1
kind: Service
metadata:
  name: ftptd
  namespace: default
spec:
  externalTrafficPolicy: Local
  ports:
  - name: port-1
    port: 18021
    protocol: TCP
    targetPort: 18021
  - name: port-2
    port: 18022
    protocol: TCP
    targetPort: 18022
  - name: port-3
    port: 18101
    protocol: TCP
    targetPort: 18101
  - name: port-4
    port: 18102
    protocol: TCP
    targetPort: 18102
  - name: port-5
    port: 18103
    protocol: TCP
    targetPort: 18103
  - name: port-6
    port: 18104
    protocol: TCP
    targetPort: 18104
  selector:
    app: ftptd
  sessionAffinity: ClientIP
  sessionAffinityConfig:
    clientIP:
      timeoutSeconds: 10800
  type: LoadBalancer
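
The port entries are repetitive and follow directly from the control ports and the passive range, so they could be generated rather than typed. A sketch, assuming a hypothetical helper called servicePorts; it prints JSON, which could then be converted or pasted as JSON-style YAML under spec.ports:

```javascript
// Hypothetical generator for the repeated port entries in the Service manifest.
function servicePorts ({ control, pasvMin, pasvMax }) {
  // expand the passive range into individual ports
  const passive = []
  for (let p = pasvMin; p <= pasvMax; p++) passive.push(p)
  // one entry per port, forwarding to the same port number on the pod
  return [...control, ...passive].map((port, i) => ({
    name: `port-${i + 1}`,
    port,
    protocol: 'TCP',
    targetPort: port
  }))
}

const ports = servicePorts({ control: [18021, 18022], pasvMin: 18101, pasvMax: 18104 })
console.log(JSON.stringify(ports, null, 2))
```

Widening the passive range to allow more simultaneous uploads then only means changing pasvMin and pasvMax in one place, as long as the deployment and its environment variables are updated to match.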

Because passive FTP is not stateless, it’s important that a client always connects to the same pod instance. Using sessionAffinity: ClientIP means that second and subsequent connections from the same client IP are routed to the same pod. The server also checks that the address making the passive data connection matches the one on the control connection, so we need to set externalTrafficPolicy: Local to preserve the client’s source IP rather than having it translated to an internal one.
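
The address check can be pictured roughly as follows. This is an illustration of the idea only, not ftp-srv's code, and the helper names normalizeIp and sameClient are made up. If the addresses seen on the two connections differ because one has been translated on the way in, a comparison like this fails and the upload is refused:

```javascript
const net = require('net')

// Node often reports IPv4 peers in IPv4-mapped IPv6 form (::ffff:1.2.3.4),
// so strip that prefix before comparing.
function normalizeIp (address) {
  const s = String(address)
  const mapped = s.toLowerCase().startsWith('::ffff:') ? s.slice(7) : s
  return net.isIP(mapped) === 4 ? mapped : s.toLowerCase()
}

// true when the control and data connections appear to come from the same address
function sameClient (controlAddress, dataAddress) {
  return normalizeIp(controlAddress) === normalizeIp(dataAddress)
}

console.log(sameClient('::ffff:203.0.113.7', '203.0.113.7')) // true
console.log(sameClient('203.0.113.7', '10.4.0.1')) // false
```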

After a little while, kubectl get service will show an actual external IP address for the service (it will show <pending> for a while). Copy this external address into deploy.yaml

- name: PASVURL 
  value: "ftp://the external ip address"

and redo

kubectl apply -f deploy.yaml

Building the app

I prefer to build locally rather than using Cloud Build, as it makes it easier to test the Docker image locally before trying it on the cluster.

This build script takes 2 arguments: the name of the app and the run mode. It builds the image, tags it, pushes it to the cloud container registry, then deletes the matching pods so that they are recreated with the updated image. The images are tagged with the run mode to allow different images to be used in development versus production on the same cluster. You could further enhance the tagging with a version number if necessary, but the tag must match the tagged image name in the deployment YAML.

sh build.sh ftp td

D=$1
R=$2
echo "building $D and deploying $D$R"
echo "set the context to the project"
#need to run this from top of the project 
cd ~/sensor
NODE_ENV=production
P="yourproject"
Z="europe-west4-b"
C="sensor"
echo "starting build $D"
docker build -f containers/$D.dockerfile . --tag gcr.io/$P/$D:$R
docker push gcr.io/$P/$D:$R
echo "delete pods"
kubectl get pods | grep -Po  "^$D$R-[\w]+-[\w]+\s" | while read line; do kubectl delete pod $line ; done
kubectl get pods
kubectl get pods | grep -Po  "^$D$R-[\w]+-[\w]+\s" | while read line; do kubectl logs $line ; done

The Dockerfile

Since the environment variables are set up in the YAML files, this can be very minimal.

FROM nikolaik/python-nodejs
LABEL maintainer="Bruce Mcpherson <bruce@mcpher.com>"
# build docker image for ftp dev mode
WORKDIR /usr/src/td/ftp
COPY package.json .
COPY ./common ./common
COPY ./ftp ./ftp
RUN yarn  --production=true
RUN cd ftp && yarn  --production=true
CMD [ "node", "ftp/index.js" ]

Running locally

If you want to test the Docker image locally (any simple FTP client should do – I’m using ncftp), this command should do it

docker run --env-file env.list -p 18021-18022:18021-18022 -p 18101-18104:18101-18104  gcr.io/fid-sql/ftp:td

with an env.list file of

URL=ftp://0.0.0.0:18021
PASVURL=ftp://0.0.0.0
PASVMIN=18101
PASVMAX=18104
RUNMODE=td

Alternatively, you can run it completely locally with node index

Private files

I haven’t published these, but you need 2 files in common/private

  • A service account file with the capability to write to storage and pubsub, exactly as downloaded from the cloud console. You can specify these through env variables, but I prefer to do it this way
  • A user file with usernames and passwords that looks like this. There are much better ways to handle passwords, but this will do to get started with
{
  "tp":[{
    "name": "johndoe",
    "password": "some password",
    "role": "citizen"
  }],

  "td":[{
    "name": "bruce",
    "password": "another password",
    "role": "boss"
    }]  
}

These are referenced in the gcp and users properties in the settings file.
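
As one example of a better way to handle passwords, the user file could hold salted hashes instead of plain text. The following is a minimal sketch using Node's built-in crypto module. The helper names and the salt:hash storage format are assumptions for illustration, not something the project does:

```javascript
const crypto = require('crypto')

// Hypothetical helper: derive a salted scrypt hash, stored as "salt:hash" in hex
function hashPassword (password, salt = crypto.randomBytes(16)) {
  const hash = crypto.scryptSync(password, salt, 32)
  return `${salt.toString('hex')}:${hash.toString('hex')}`
}

// Hypothetical helper: recompute the hash with the stored salt and compare
function verifyPassword (password, stored) {
  const [saltHex, hashHex] = stored.split(':')
  const expected = Buffer.from(hashHex, 'hex')
  const actual = crypto.scryptSync(password, Buffer.from(saltHex, 'hex'), expected.length)
  return crypto.timingSafeEqual(actual, expected)
}

const stored = hashPassword('another password')
console.log(verifyPassword('another password', stored)) // true
console.log(verifyPassword('wrong password', stored)) // false
```

The password property in the user file would then hold the stored string, and the login check would call verifyPassword instead of comparing plain text.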

Pubsub

In this article, I won’t go into consuming the messages sent, but we need a topic and a subscription, which will probably be different between run modes. These are referenced in the settings file with the abstracted names ‘dataArrived’ and ‘dataReady’. The FTP app publishes to the ‘dataArrived’ topic, and any consumers will subscribe to ‘dataReady’.
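
For orientation, the message body built in onStorageFinished carries the file contents as base64, so a consumer has to decode it again. A sketch of that round trip, assuming the object is serialized as JSON (the article does not show how libPubSub does it) and using made-up helper names:

```javascript
// Fields follow the object built in onStorageFinished; helper names are hypothetical.
function encodePayload ({ fileName, storageName, data, sentAt = Date.now() }) {
  // data is a Buffer, so it is carried as base64 text
  return JSON.stringify({ sentAt, fileName, storageName, data: data.toString('base64') })
}

// What a consumer would do to get the original bytes back
function decodePayload (json) {
  const ob = JSON.parse(json)
  return { ...ob, data: Buffer.from(ob.data, 'base64') }
}

const message = encodePayload({
  fileName: 'reading.csv',
  storageName: 'some-bucket/ftp-dev/reading.csv',
  data: Buffer.from('temp=21.5\n')
})
console.log(decodePayload(message).data.toString())
```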

Setting up topics and subscriptions through the UI can be error-prone and laborious, so here’s a script to bulk delete and recreate the topics and subscriptions needed for this project.

RO="roles/editor"

# service account email
SA="serviceAccount:xxxx@xxxx.iam.gserviceaccount.com"

# project 
P='your project'

# ack deadline
AD=420

# topics
TPA='zzzz-tp'
TDA='zzzz-td'

# subscriptions
TPR='xxxx-tp'
TDR='xxxx-td'

# message duration
MD="60m"
TMD="10m"

# make sure we're on the right project
gcloud config set project $P

# tidy up and remake the things
gcloud pubsub subscriptions delete projects/$P/subscriptions/$TPR
gcloud pubsub subscriptions delete projects/$P/subscriptions/$TDR
gcloud pubsub topics delete projects/$P/topics/$TPA
gcloud pubsub topics delete projects/$P/topics/$TDA

gcloud pubsub topics create $TPA
gcloud pubsub topics create $TDA

gcloud pubsub topics add-iam-policy-binding $TPA --member="$SA" --role=$RO
gcloud pubsub topics add-iam-policy-binding $TDA --member="$SA" --role=$RO

gcloud pubsub subscriptions create $TPR --topic=projects/$P/topics/$TPA --ack-deadline=$AD --expiration-period=never --message-retention-duration=$MD
gcloud pubsub subscriptions add-iam-policy-binding $TPR --member="$SA" --role="$RO"

gcloud pubsub subscriptions create $TDR --topic=projects/$P/topics/$TDA --ack-deadline=$AD --expiration-period=never --message-retention-duration=$MD
gcloud pubsub subscriptions add-iam-policy-binding $TDR --member="$SA" --role="$RO"

Summary

It’s not great to have to deal with FTP nowadays, but there we have it – FTP on Kubernetes.

Source code is here https://github.com/brucemcpherson/sensor