How to make a user-friendly and transactional csv import in Loopback

When you build an app where users regularly need to import data from an external source, you will almost certainly end up importing csv files to populate your database.
Often referred to as ETL (Extract, Transform and Load), these features are tricky to implement and often hard for the end user to understand. This article explains how to tackle the main challenges of csv import and provides a fully functional repo on github to get started quickly on a project!

The first challenge is to make the process user-friendly. If a user uploads a file and the import fails, they need to know why it failed and how to fix the file before trying again.

The second challenge is to keep the database in a consistent state: if the user imports a file that causes an error at line 50, the first 49 lines should not be written to the database. You don’t want to write anything to the database if an error has occurred during the process. A solution is to use SQL transactions to control when your changes to the database are actually applied.

Lastly, in order to import large datasets without hurting the user experience, you need to separate the upload process (where the user waits a few seconds for the file to be uploaded) from the import process, which runs in the backend and may take a few minutes.


In this article, we will explain how to build your own transactional csv import using the node framework loopback and a relational database.
The process lets the user know which cells of their file have failed and rolls back if an error is raised. We will demonstrate the process with PostgreSQL, but transactions can be used with other connectors in Loopback.

I will assume that you know the basics of the Loopback framework and coffeescript syntax. Let’s say we want to import invoices into our system and we already have an Invoice loopback model with two properties, invoiceId and amount.

Start by creating an upload remote method in the Invoice model that will be called by your client app with a POST request.

common/models/Invoice.coffee

Invoice.upload = (req, callback) ->
  # wait for it
  callback()

Invoice.remoteMethod 'upload',
  accepts: [
    arg: 'req'
    type: 'object'
    http:
      source: 'req'
  ]
  http:
    verb: 'post'
    path: '/upload'

Add the following modules in your Invoice model:

_        = require 'lodash'
async    = require 'async'
csv      = require 'fast-csv'
fork     = require('child_process').fork
fs       = require 'fs'
path     = require 'path'
loopback = require 'loopback'

In order to separate the file upload from the data processing, we are going to store the file in the filesystem and save the state of the upload in the database (a PENDING status). As soon as the upload is done, we send an HTTP response to the client so that the user can continue using the app while the data is processed.

Then we start a new node process using the fork function of the child_process module. It will call a custom import method described below.
Using the library fast-csv, we parse the csv file and begin a sql transaction.
We can now proceed with the import and commit the transaction if no error is raised. Otherwise the transaction is rolled back and the database remains in its initial state: it is an all-or-nothing import process. Finally, we delete the file from the filesystem.

Before starting to code the upload method, we need to create a few more models.
Create FileUpload and FileUploadError models that will be used to store the state of the imports (PENDING, SUCCESS, ERROR) and the error list.
A FileUpload has many FileUploadError, so let’s use the loopback hasMany relation.

WARNING: If you use PostgreSQL, update the poolIdleTimeout property of your datasource.
Because we do not commit the changes to the database before the end of the process, PostgreSQL sees the connection as idle and raises a timeout error. Set poolIdleTimeout above the maximum time an import should take.

server/datasources.json

{
  "db": {
    ... // Your config,
    "poolIdleTimeout": 1200000
  },
  "container": {
    "name": "container",
    "connector": "loopback-component-storage",
    "provider": "filesystem",
    "root": "tmp"
  }
}

Create a tmp folder at the root of your project that will be used to store the uploaded files.
Now we can start coding! Remember the import method I mentioned? Let’s implement it!
Start by installing the following dependencies: fast-csv, lodash, async and loopback-component-storage.

npm install fast-csv lodash async loopback-component-storage --save

The upload method initializes the import process:

  Invoice.upload = (req, callback) ->
    Container = Invoice.app.models.Container
    FileUpload = Invoice.app.models.FileUpload

    # Generate a unique name for the container
    containerName = "invoice-#{Math.round(Date.now())}-#{Math.round(Math.random() * 1000)}"

    # async.waterfall is like a waterfall of functions applied one after the other
    async.waterfall [
      (done) ->
        # Create the container (the directory where the file will be stored)
        Container.createContainer name: containerName, done
      (container, done) ->
        req.params.container = containerName
        # Upload one or more files into the specified container. The request body must be multipart/form-data (the encoding used for HTML file uploads).
        Container.upload req, {}, done
      (fileContainer, done) ->

        # Store the state of the import process in the database
        FileUpload.create
          date: new Date()
          fileType: Invoice.modelName
          status: 'PENDING'
        , (err, fileUpload) ->
          return done err, fileContainer, fileUpload
    ], (err, fileContainer, fileUpload) ->
      return callback err if err
      params =
        fileUpload: fileUpload.id
        root: Invoice.app.datasources.container.settings.root
        container: fileContainer.files.file[0].container
        file: fileContainer.files.file[0].name

      # Launch a fork node process that will handle the import
      fork __dirname + '/../../server/scripts/import-invoices.coffee', [
        JSON.stringify params
      ]
      callback null, fileContainer

Create a scripts folder in server and add an import-invoices.coffee file. This script is used to launch a forked node process that calls the import method of the Invoice model. It exits explicitly to make sure that the node process is killed when the import is over.

Content of the import-invoices.coffee file:

server = require '../server.coffee'
options = JSON.parse process.argv[2]

# Make sure that the node process is killed when the import process is over.
try
  server.models.Invoice.import options.container, options.file, options, (err) ->
    process.exit if err then 1 else 0
catch err
  # A synchronous exception always means the import failed
  process.exit 1
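
The hand-off between the upload method and this script is just a JSON string passed as a command-line argument. Here is a minimal sketch of that round trip. The helper names buildForkArgs and parseForkArgs are hypothetical, and the sample values are invented for the illustration.

```coffeescript
# Hypothetical helpers illustrating the JSON hand-off through argv.

# Parent side: everything the child needs goes into one JSON argument.
buildForkArgs = (params) ->
  [JSON.stringify params]

# Child side: argv[0] is the executable and argv[1] the script path,
# so the first user argument is argv[2].
parseForkArgs = (argv) ->
  JSON.parse argv[2]

# Sample values invented for the illustration.
params =
  fileUpload: 12
  root: 'tmp'
  container: 'invoice-1-2'
  file: 'invoices.csv'

argv = ['node', 'import-invoices.coffee'].concat buildForkArgs params
options = parseForkArgs argv
console.log options.container, options.file
```

Only values that survive JSON serialization (strings, numbers, plain objects) can travel this way, which is why the upload method passes the FileUpload id rather than the model instance.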

Let’s dive into the import method. It first calls an import_preprocess method that initializes the SQL transaction.
Then it calls the import_process method and commits, or rolls back if there was an error.

import_postprocess_success and import_postprocess_error save the FileUpload status depending on the outcome of the import process.

import_clean destroys the uploaded file.

  Invoice.import = (container, file, options, callback) ->
    # Initialize a context object that will hold the transaction
    ctx = {}

    # The import_preprocess is used to initialize the sql transaction
    Invoice.import_preprocess ctx, container, file, options, (err) ->
      # Without a transaction there is nothing to commit or roll back
      return callback err if err

      Invoice.import_process ctx, container, file, options, (importError) ->
        if importError
          # The rollback discards every change made inside the transaction.
          # async.series is used because each step only needs its callback;
          # async.waterfall would forward the saved FileUpload to the next step.
          async.series [
            (done) ->
              ctx.transaction.rollback done
            (done) ->
              # Do some other stuff to clean and acknowledge the end of the import
              Invoice.import_postprocess_error ctx, container, file, options, done
            (done) ->
              Invoice.import_clean ctx, container, file, options, done
          ], ->
            return callback importError

        else
          async.series [
            (done) ->
              # The commit applies the changes to the database
              ctx.transaction.commit done
            (done) ->
              # Do some other stuff to clean and acknowledge the end of the import
              Invoice.import_postprocess_success ctx, container, file, options, done
            (done) ->
              Invoice.import_clean ctx, container, file, options, done
          ], (seriesError) ->
            # Report a failed commit or cleanup instead of pretending success
            return callback seriesError


  Invoice.import_preprocess = (ctx, container, file, options, callback) ->

    # initialize the SQL transaction
    Invoice.beginTransaction
      isolationLevel: Invoice.Transaction.READ_UNCOMMITTED
    , (err, transaction) ->
      ctx.transaction = transaction
      return callback err
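
The commit-or-rollback decision made in Invoice.import can be isolated in a small helper. The sketch below is an illustration, not part of the original code: runAllOrNothing and makeFakeTransaction are hypothetical names, and the fake transaction only records calls instead of talking to a database. It assumes an object exposing commit(cb) and rollback(cb), which is what LoopBack transactions provide.

```coffeescript
# Hypothetical helper: run `work` against a transaction-like object `tx`
# that exposes commit(cb) and rollback(cb).
runAllOrNothing = (tx, work, callback) ->
  work (workError) ->
    if workError
      # Discard every change, then report the original error
      tx.rollback -> callback workError
    else
      # Make the changes permanent and report a failed commit, if any
      tx.commit (commitError) -> callback commitError

# Stand-in transaction that only records what happened (not a real database).
makeFakeTransaction = ->
  log = []
  {
    log: log
    commit: (cb) ->
      log.push 'commit'
      cb null
    rollback: (cb) ->
      log.push 'rollback'
      cb null
  }

failing = makeFakeTransaction()
runAllOrNothing failing, ((done) -> done new Error 'bad line'), (err) ->
  console.log failing.log, err.message
```

Whatever the work function does, exactly one of commit or rollback is called, which is the property that keeps the database consistent.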

In the import_process method, we iterate over each line of the csv file and apply the import_handleLine method that holds the business logic. This is where you will define what to do with your data.

  Invoice.import_process = (ctx, container, file, options, callback) ->
    fileContent = []
    filename = path.join Invoice.app.datasources.container.settings.root, container, file

    # Here we fix the delimiter of the csv file to semicolon. You can change it or make it a parameter of the import.
    stream = csv
      delimiter: ';'
      headers: true
    stream.on 'data', (data) ->
      fileContent.push data
    stream.on 'end', ->
      errors = []

      # Iterate over every line of the file
      async.mapSeries [0..fileContent.length], (i, done) ->
        return done() if not fileContent[i]?

        #  Import the individual line
        Invoice.import_handleLine ctx, fileContent[i], options, (err) ->
          if err
            errors.push err
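            # If an error is raised on a particular line, store it with the FileUploadError model.
            # No transaction is passed here, so the error record survives the rollback.
            # i + 2 is the real excel user-friendly index of the line
            Invoice.app.models.FileUploadError.create
              line: i + 2
              message: err.message
              fileUploadId: options.fileUpload
            , ->
              # Move on to the next line only once the error has been stored
              done()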
          else
            done()
      , ->
        return callback errors if errors.length > 0
        return callback()
    fs.createReadStream(filename).pipe stream
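
To make the shape of the parsed rows and the i + 2 offset concrete, here is a small self-contained sketch. It deliberately swaps fast-csv for a naive split-based parser (parseSimpleCsv, a hypothetical name) that does not handle quoted fields, so it is only meant to show what headers: true produces and how an array index maps to a line of the file.

```coffeescript
# Naive stand-in for the parser: splits on the delimiter only and
# does not handle quoted fields, unlike a real csv library.
parseSimpleCsv = (text, delimiter = ';') ->
  lines = (l for l in text.split(/\r?\n/) when l.trim() isnt '')
  headers = lines[0].split delimiter
  for line in lines[1..]
    cells = line.split delimiter
    row = {}
    # Each row becomes an object keyed by the header names
    row[header] = (cells[j] ? '') for header, j in headers
    row

# Index 0 is the first data row. It sits on line 2 of the file because
# line 1 holds the headers and spreadsheet rows are numbered from 1.
fileLine = (index) -> index + 2

rows = parseSimpleCsv "InvoiceId;Amount\n1;100\n2;\n"
for row, i in rows
  console.log fileLine(i), row.InvoiceId, row.Amount
```

The second data row has an empty Amount, which is the kind of cell the validation step is expected to reject with a line number the user can find in their spreadsheet.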

Using the next two methods, I save the status of the import in the database. You can use those two methods to add more business logic, for example to send a confirmation email.

  Invoice.import_postprocess_success = (ctx, container, file, options, callback) ->
    Invoice.app.models.FileUpload.findById options.fileUpload, (err, fileUpload) ->
      return callback err if err
      fileUpload.status = 'SUCCESS'
      fileUpload.save callback

  Invoice.import_postprocess_error = (ctx, container, file, options, callback) ->
    Invoice.app.models.FileUpload.findById options.fileUpload, (err, fileUpload) ->
      return callback err if err
      fileUpload.status = 'ERROR'
      fileUpload.save callback

When the process is over, there is no need to keep the file, so let’s destroy the container to delete the file:

  Invoice.import_clean = (ctx, container, file, options, callback) ->
    Invoice.app.models.Container.destroyContainer container, callback

import_handleLine holds the business logic:

  • Checking the validity of the data in each cell
  • Creating or updating data on any model

  LineHandler =
    # Method to create/update the invoice from the data of the line
    createInvoice: (req, line, done) ->
      Invoice.findOne
        where:
          invoiceId: line.InvoiceId
      , req, (error, found) ->
        return done error if error

        invoice =
          invoiceId: line.InvoiceId
          amount: line.Amount
        invoice.id = found.id if found

        Invoice.upsert invoice, req, (error, invoice) ->
          if error
            done error, line.InvoiceId
          else
            done null, invoice

    rejectLine: (columnName, cellData, customErrorMessage, callback) ->
      err = new Error "Unprocessable entity in column #{columnName} where data = #{cellData}: #{customErrorMessage}"
      err.status = 422
      callback err

    # Do all the necessary checks to avoid SQL errors and check data integrity
    validate: (line, callback) ->
      if line.InvoiceId is ''
        return @rejectLine 'InvoiceId', line.InvoiceId, 'Missing InvoiceId', callback
      if _.isNaN parseInt line.InvoiceId
        return @rejectLine 'InvoiceId', line.InvoiceId, 'InvoiceId is not a number', callback
      if line.Amount is ''
        return @rejectLine 'Amount', line.Amount, 'Missing Amount', callback
      if _.isNaN parseInt line.Amount
        return @rejectLine 'Amount', line.Amount, 'Amount is not a number', callback
      callback()
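
The listing above does not show import_handleLine itself. One plausible wiring, given as an assumption rather than the original implementation, is to validate the line first and then create or update the invoice while passing the transaction in the options argument, which is how LoopBack attaches an operation to a transaction. The names makeHandleLine and stubHandler are hypothetical, and the stub replaces LineHandler so the sketch runs without a database.

```coffeescript
# Hypothetical factory: builds an import_handleLine-style function
# from any object exposing validate and createInvoice.
makeHandleLine = (handler) ->
  (ctx, line, options, callback) ->
    handler.validate line, (validationError) ->
      # Invalid lines never reach the database
      return callback validationError if validationError
      # The transaction travels in the options argument so the write
      # is only applied when the transaction is committed
      handler.createInvoice {transaction: ctx.transaction}, line, callback

# Stub handler standing in for LineHandler (no database involved).
stubHandler =
  written: []
  validate: (line, callback) ->
    return callback new Error 'Missing InvoiceId' if line.InvoiceId is ''
    callback()
  createInvoice: (req, line, callback) ->
    @written.push {invoiceId: line.InvoiceId, transaction: req.transaction}
    callback null, line

handleLine = makeHandleLine stubHandler
handleLine {transaction: 'tx-1'}, {InvoiceId: '', Amount: '10'}, {}, (err) ->
  console.log err.message
handleLine {transaction: 'tx-1'}, {InvoiceId: '7', Amount: '10'}, {}, (err) ->
  console.log err, stubHandler.written.length
```

In the Invoice model this could be plugged in as Invoice.import_handleLine = makeHandleLine LineHandler, assuming LineHandler is the object defined above.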

Conclusion

You are now able to build your own csv import!
In your client app, you can add an html input field of type file. To display the status of the upload, you can poll the FileUpload model every few seconds. Check this cool article on how to make the user wait during a load!
If the import status is ERROR, you can get the error list using the FileUploadError model routes and build a nice UI on top of it.
A next step could be to add hints on how to fix the errors in the csv file to the FileUploadError model, using the rejectLine method. We did it on one model, but it could be extended to multiple models by creating a mixin!
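
The polling suggested above could look like the following sketch. It is only an illustration: pollUntilDone is a hypothetical helper, and fetchStatus is assumed to be a function that calls back with (err, status), for example after requesting the FileUpload record from the API. A fake status source is used here so the example runs on its own.

```coffeescript
# Hypothetical poller: fetchStatus(cb) is assumed to call back with
# (err, status), for example by requesting the FileUpload record.
pollUntilDone = (fetchStatus, intervalMs, callback) ->
  check = ->
    fetchStatus (err, status) ->
      return callback err if err
      # PENDING means the forked process is still working
      return setTimeout check, intervalMs if status is 'PENDING'
      callback null, status
  check()

# Fake status source for the illustration: PENDING twice, then SUCCESS.
answers = ['PENDING', 'PENDING', 'SUCCESS']
fakeFetch = (cb) -> cb null, answers.shift()

pollUntilDone fakeFetch, 10, (err, status) ->
  console.log 'import finished with status', status
```

When the final status is ERROR, the client can then request the FileUploadError records attached to that FileUpload.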

A fully functional example of this import is available on github. Take a look at the other cool Loopback resources on the github of J. Drouet, who also worked on this import process!