How to make a user-friendly and transactional CSV import in Loopback
Clément Ricateau Pasquino · 10 min read
When you build an app where users need to regularly import data from an external source, you are bound to import CSV files to populate your database.
Often referred to as ETL (Extract, Transform and Load), these features are tricky to implement and often hard to understand for the end user. This article explains how to tackle the main challenges of CSV import and provides a fully functional repo on GitHub to get started quickly on a project!
The first challenge is to make the process user-friendly. If a user uploads a file and the import fails, they need to know why it failed and how to fix the file in order to try again.
The second challenge is to keep the database in a consistent state: if the user imports a file that raises an error at line 50, the first 49 lines should not be written to the database. You don’t want to write anything to the database if an error has been raised during the process. A solution is to use SQL transactions to control when your changes to the database are eventually applied.
Lastly, in order to import large datasets without impacting the user experience, you need to separate the upload process (where the user waits a few seconds for the file to be uploaded) from the import process, which happens in the backend, possibly for several minutes.
In this article, we will explain how to build your own transactional CSV import using the Node framework Loopback and a relational database.
The process lets the user know which cells of their Excel file have failed, and rolls back if an error is raised. We will demonstrate the process with PostgreSQL, but transactions can be used with other connectors in Loopback.
I will also assume that you know the basics of the Loopback framework and CoffeeScript syntax. Let’s say we want to import invoices into our system and we already have an Invoice Loopback model with two properties, invoiceId and amount.
Start by creating an upload remote method in the Invoice model that will be called by your client app with a POST request.
common/models/invoice.coffee
Invoice.upload = (req, callback) ->
  # wait for it
  callback()

Invoice.remoteMethod 'upload',
  accepts: [
    arg: 'req'
    type: 'object'
    http:
      source: 'req'
  ]
  http:
    verb: 'post'
    path: '/upload'
Require the following modules in your Invoice model:
_ = require 'lodash'
async = require 'async'
csv = require 'fast-csv'
fork = require('child_process').fork
fs = require 'fs'
path = require 'path'
loopback = require 'loopback'
In order to separate the file upload from the data processing, we are going to store the file in the filesystem and save the state of the upload (a PENDING status) in the database. As soon as the upload is done, we send an HTTP response to the client so that the user can continue using the app while the data is processed.
Then we start a new Node process using the fork module. It will call a custom import method described below.
Using the fast-csv library, we parse the CSV file and begin a SQL transaction.
We can now proceed with the import and commit the transaction if no error is raised. Otherwise the transaction is rolled back and the database remains in its initial state: it is an all-or-nothing import process. Finally, we delete the file from the filesystem.
Before starting to code the upload method, we need to create a few more models.
Create FileUpload and FileUploadError models that will be used to store the state of the imports (PENDING, SUCCESS, ERROR) and the error list.
A FileUpload has many FileUploadErrors, so let’s use the Loopback hasMany relation.
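Here is a minimal sketch of what these model definitions could look like; the property names match the ones used by the code later in this article, and the relation gives each FileUpload its list of errors:
common/models/file-upload.json
{
  "name": "FileUpload",
  "base": "PersistedModel",
  "properties": {
    "date": {"type": "date"},
    "fileType": {"type": "string"},
    "status": {"type": "string"}
  },
  "relations": {
    "fileUploadErrors": {
      "type": "hasMany",
      "model": "FileUploadError",
      "foreignKey": "fileUploadId"
    }
  }
}
common/models/file-upload-error.json
{
  "name": "FileUploadError",
  "base": "PersistedModel",
  "properties": {
    "line": {"type": "number"},
    "message": {"type": "string"}
  }
}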
- Create a Container model, which will be used by the loopback-component-storage component to create a container. A container is similar to a directory and will be used to store the CSV file uploaded by the user.
- Update server/datasources.json to add the container datasource.
- Create the tables related to the models in your database, and don’t forget to declare your models in server/model-config.json (a minimal sketch follows).
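The model-config.json entries could look like this (an excerpt; keep your existing entries and adapt the public visibility to your needs):
server/model-config.json
{
  "Invoice": {"dataSource": "db", "public": true},
  "FileUpload": {"dataSource": "db", "public": true},
  "FileUploadError": {"dataSource": "db", "public": true},
  "Container": {"dataSource": "container", "public": true}
}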
WARNING: If you use PostgreSQL, update the poolIdleTimeout property of your datasource. Because we do not commit the changes to the database before the end of the process, PostgreSQL sees the connection as idle and raises a timeout error. Set poolIdleTimeout above the maximum time an import should take.
server/datasources.json
{
  "db": {
    ... // Your config,
    "poolIdleTimeout": 1200000
  },
  "container": {
    "name": "container",
    "connector": "loopback-component-storage",
    "provider": "filesystem",
    "root": "tmp"
  }
}
Create a tmp folder at the root of your project that will be used to store the uploaded files.
Now we can start coding! Remember the import method I mentioned? Let’s implement it!
Start by installing the following dependencies: fast-csv, lodash, async, loopback-component-storage
npm install fast-csv lodash async loopback-component-storage --save
The upload method initializes the import process:
Invoice.upload = (req, callback) ->
  Container = Invoice.app.models.Container
  FileUpload = Invoice.app.models.FileUpload
  # Generate a unique name for the container
  containerName = "invoice-#{Math.round(Date.now())}-#{Math.round(Math.random() * 1000)}"
  # async.waterfall runs the functions in series, each one passing its results to the next
  async.waterfall [
    (done) ->
      # Create the container (the directory where the file will be stored)
      Container.createContainer name: containerName, done
    (container, done) ->
      req.params.container = containerName
      # Upload one or more files into the specified container. The request body must use multipart/form-data, which the HTML file input type uses.
      Container.upload req, {}, done
    (fileContainer, done) ->
      # Store the state of the import process in the database
      FileUpload.create
        date: new Date()
        fileType: Invoice.modelName
        status: 'PENDING'
      , (err, fileUpload) ->
        return done err, fileContainer, fileUpload
  ], (err, fileContainer, fileUpload) ->
    return callback err if err
    params =
      fileUpload: fileUpload.id
      root: Invoice.app.datasources.container.settings.root
      container: fileContainer.files.file[0].container
      file: fileContainer.files.file[0].name
    # Launch a forked node process that will handle the import
    fork __dirname + '/../../server/scripts/import-invoices.coffee', [
      JSON.stringify params
    ]
    callback null, fileContainer
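You can then test the endpoint from the command line. The example below assumes the default REST root /api and the plural Invoices; the form field is named file, which matches fileContainer.files.file in the code above:
curl -X POST -F "file=@invoices.csv" http://localhost:3000/api/Invoices/upload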
Create a scripts folder in server and add an import-invoices.coffee file. This script runs in the forked Node process and calls an import method of the Invoice model, described below. It exits to make sure that the Node process is killed when an import is over.
Content of the import-invoices.coffee file:
server = require '../server.coffee'

options = JSON.parse process.argv[2]

# Make sure that the node process is killed when the import process is over.
try
  server.models.Invoice.import options.container, options.file, options, (err) ->
    process.exit if err then 1 else 0
catch err
  process.exit 1
Let’s dive into the import method. It first calls an import_preprocess method that initializes the SQL transaction.
Then it runs import_process and commits, or rolls back if an error was raised.
import_postprocess_success and import_postprocess_error save the FileUpload status depending on the outcome of the import process.
import_clean destroys the uploaded file.
Invoice.import = (container, file, options, callback) ->
  # Initialize a context object that will hold the transaction
  ctx = {}
  # import_preprocess is used to initialize the sql transaction
  Invoice.import_preprocess ctx, container, file, options, (err) ->
    return callback err if err
    Invoice.import_process ctx, container, file, options, (importError) ->
      if importError
        # rollback does not apply the transaction
        async.waterfall [
          (done) ->
            ctx.transaction.rollback done
          (done) ->
            # Do some other stuff to clean up and acknowledge the end of the import
            Invoice.import_postprocess_error ctx, container, file, options, done
          (done) ->
            Invoice.import_clean ctx, container, file, options, done
        ], ->
          return callback importError
      else
        async.waterfall [
          (done) ->
            # The commit applies the changes to the database
            ctx.transaction.commit done
          (done) ->
            # Do some other stuff to clean up and acknowledge the end of the import
            Invoice.import_postprocess_success ctx, container, file, options, done
          (done) ->
            Invoice.import_clean ctx, container, file, options, done
        ], ->
          return callback null
Invoice.import_preprocess = (ctx, container, file, options, callback) ->
  # initialize the SQL transaction
  Invoice.beginTransaction
    isolationLevel: Invoice.Transaction.READ_UNCOMMITTED
  , (err, transaction) ->
    ctx.transaction = transaction
    return callback err
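Note that a Loopback query only joins the transaction if it receives it explicitly in its options: any write performed during the import must pass ctx.transaction, along these lines (a sketch with made-up invoice data):
# The row is only persisted when ctx.transaction.commit is called
Invoice.create { invoiceId: 42, amount: 100 }, { transaction: ctx.transaction }, (err, invoice) ->
  console.log err or invoice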
In the import_process method, we iterate over each line of the CSV file and apply the import_handleLine method that holds the business logic. This is where you will define what to do with your data.
Invoice.import_process = (ctx, container, file, options, callback) ->
  fileContent = []
  filename = path.join Invoice.app.datasources.container.settings.root, container, file
  # Here we fix the delimiter of the csv file to semicolon. You can change it or make it a parameter of the import.
  stream = csv
    delimiter: ';'
    headers: true
  stream.on 'data', (data) ->
    fileContent.push data
  stream.on 'end', ->
    errors = []
    # Iterate over every line of the file
    async.mapSeries [0...fileContent.length], (i, done) ->
      return done() if not fileContent[i]?
      # Import the individual line
      Invoice.import_handleLine ctx, fileContent[i], options, (err) ->
        if err
          errors.push err
          # If an error is raised on a particular line, store it with the FileUploadError model
          # i + 2 is the user-friendly line number the user sees in Excel (1-based, plus the header row)
          Invoice.app.models.FileUploadError.create
            line: i + 2
            message: err.message
            fileUploadId: options.fileUpload
          , -> done()
        else
          done()
    , ->
      return callback errors if errors.length > 0
      return callback()
  fs.createReadStream(filename).pipe stream
Using the next two methods, I save the status of the import in the database. You can use those two methods to add more business logic, for example sending a confirmation email.
Invoice.import_postprocess_success = (ctx, container, file, options, callback) ->
  Invoice.app.models.FileUpload.findById options.fileUpload, (err, fileUpload) ->
    return callback err if err
    fileUpload.status = 'SUCCESS'
    fileUpload.save callback

Invoice.import_postprocess_error = (ctx, container, file, options, callback) ->
  Invoice.app.models.FileUpload.findById options.fileUpload, (err, fileUpload) ->
    return callback err if err
    fileUpload.status = 'ERROR'
    fileUpload.save callback
When the process is over, there is no need to keep the file, so let’s destroy the container to delete the file:
Invoice.import_clean = (ctx, container, file, options, callback) ->
  Invoice.app.models.Container.destroyContainer container, callback
import_handleLine holds the business logic, which we delegate to a LineHandler object:
- Checking the validity of the data in each cell
- Creating or updating data on any model
LineHandler =
  # Method to create/update the invoice from the data of the line
  createInvoice: (req, line, done) ->
    Invoice.findOne
      where:
        invoiceId: line.InvoiceId
    , req, (error, found) ->
      return done error if error
      invoice =
        invoiceId: line.InvoiceId
        amount: line.Amount
      invoice.id = found.id if found
      Invoice.upsert invoice, req, (error, invoice) ->
        if error
          done error, line.InvoiceId
        else
          done null, invoice

  rejectLine: (columnName, cellData, customErrorMessage, callback) ->
    err = new Error "Unprocessable entity in column #{columnName} where data = #{cellData}: #{customErrorMessage}"
    err.status = 422
    callback err

  # Do all the necessary checks to avoid SQL errors and check data integrity
  validate: (line, callback) ->
    if line.InvoiceId is ''
      return @rejectLine 'InvoiceId', line.InvoiceId, 'Missing InvoiceId', callback
    if _.isNaN parseInt line.InvoiceId
      return @rejectLine 'InvoiceId', line.InvoiceId, 'InvoiceId is not a number', callback
    if line.Amount is ''
      return @rejectLine 'Amount', line.Amount, 'Missing Amount', callback
    if _.isNaN parseInt line.Amount
      return @rejectLine 'Amount', line.Amount, 'Amount is not a number', callback
    callback()
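The article does not show how import_handleLine is wired to LineHandler; a minimal sketch could look like this, assuming the transaction is forwarded as the query options so that every write can be rolled back:
Invoice.import_handleLine = (ctx, line, options, callback) ->
  LineHandler.validate line, (err) ->
    return callback err if err
    # Forward the transaction so the upsert is covered by the rollback
    LineHandler.createInvoice { transaction: ctx.transaction }, line, callback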
Conclusion
You are now able to build your own csv import!
In your client app, you can add an HTML input field with a file type. To display the status of the upload, you can poll the FileUpload model every few seconds. Check this cool article on how to make the user wait during a load!
If the import status is ERROR, you can get the error list using the FileUploadError model routes and build a nice UI.
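For instance, with the hasMany relation declared earlier, the errors of a given import should be reachable through the relation route (the id 42 is hypothetical):
curl http://localhost:3000/api/FileUploads/42/fileUploadErrors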
A next step could be to add hints on how to fix the errors in the CSV file to the FileUploadError model using the rejectLine method. We did it on one model but could extend it to multiple models by creating a mixin!
A fully functional example of this process is available on GitHub. Take a look at other cool resources for Loopback on J. Drouet’s GitHub; he also worked on this import process!