June 11, 2019

Handling Large Datasets with Node.js

Giuseppe Di Vincenzo
@gdiv_
Verkehrsmonitor Dashboard
Last year we worked on a dashboard prototype that visualizes data publicly released by the BASt (German Federal Highway Research Institute).
The research institute uses a network of sensors to keep track of the number of vehicles traveling on German highways (Autobahn and Bundesstraße).
The data is collected hourly and released yearly. This represented one of the main challenges we had to face: handling large CSV files (~3 GB for each year of measurements) containing a huge number of data points (24 hours × 365 days × 1744 measuring stations × 2 directions ≈ 30,554,880 estimated data points).
The same challenge also affected the way we designed the system that serves the data to the client and, of course, the way we designed the interactive data visualization. The backend was implemented as a node.js HTTP API server, whereas the frontend uses libraries like D3 and Leaflet to visualize the data.
We had to figure out how to download, extract, transform, import, aggregate, and serve such an amount of data to the client automatically and efficiently.
The tool we developed can help citizens, journalists, reporters, researchers, and environmental planners to get an overview and easily analyze the data provided by BASt. Take a look for yourself here:

Extract, transform, load data

This article covers the first steps of the development process of our prototype. After a conception phase we soon started collecting the data we wanted to import. We were aware that this process might take a long time, but perhaps underestimated the problems we would face. If you want to dig into the source code of the ETL pipeline beyond the development process described here, you can find it on GitHub.

ETL

ETL is an acronym for Extract, Transform, Load and describes the process of collecting data from a specific source, converting it into the required format, and importing it into the destination database. For the whole process we used JavaScript, in particular node.js, together with MongoDB.

Extract

The source of our data is a collection of .zip files served via HTTP, so we had to write a client to download the archives and unzip the contained CSV files so they can be processed in the next step.

Download the archives

We configured the list of files to download in the config.json file and passed each entry to the download() function.
The download() function is an asynchronous function that issues an HTTP GET request and writes the response to disk, unzipping the body content if necessary.
To do that, we create a request of type stream using axios. If the requested file has the .zip extension, we pipe the response through unzip; otherwise we write the stream to disk using node.js' native fs.createWriteStream(path[, options]) function.
const fs = require('fs');
const path = require('path');
const axios = require('axios');
const unzip = require('unzip');

async function download(target) {
  return new Promise(async (resolve, reject) => {
    const { url, out, name } = target;
    // Request the file as a stream instead of buffering the whole body
    const request = {
      url,
      responseType: 'stream',
    };
    try {
      const response = await axios(request);
      let output;
      if (url.endsWith('.zip')) {
        // Extract the archive on the fly into the output directory
        output = unzip.Extract({ path: out });
      } else {
        // Plain files are written to disk as they arrive
        const file = path.resolve(out, name);
        output = fs.createWriteStream(file);
      }
      response.data
        .pipe(output)
        .on('finish', resolve)
        .on('error', reject);
    } catch (err) {
      reject(err);
    }
  });
}
With this approach we don't need to save the .zip files persistently: only small chunks are held in memory until they have been processed, after which the memory is freed by the garbage collector. In other words, we extract them on the fly.
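To drive the downloads we can simply loop over the configured targets. A minimal sketch, assuming config.json exposes them under a targets key (the actual key name in our config may differ):
const config = require('./config.json');

async function downloadAll() {
  // Download (and, for archives, extract on the fly) each configured target in sequence
  for (const target of config.targets) {
    console.log(`Downloading ${target.url} ...`);
    await download(target);
  }
}

downloadAll().catch((err) => {
  console.error('Download failed:', err);
  process.exit(1);
});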

Transform

At this point we need to parse the CSV files and map them to our data structure, so our next stream pipeline looks like this:
stream.pipe(csvStream).pipe(format).pipe(merge);

Parse the CSV

The first step is to set up a stream using csv-parse:
const csv = require('csv-parse');

// Parse semicolon-separated rows into objects, using the first line as column names
const csvStream = csv({
  columns: true,
  delimiter: ';',
  trim: true,
});
This allows us to read the CSV file, passing it through memory and, again, without writing it to disk.

Map data to Database structure

Next we apply a transform function to the stream using stream-transform, so that we can convert strings to numbers and rename fields:
// from config.json
transform: (data) => {
  return {
    nr: Number(data.DZ_Nr),
    name: data.DZ_Name,
    land: data.Land_Code,
    roadid: Number(data.Str_Nr),
    type: data.Str_Kl,
    // The source data uses a decimal comma for coordinates
    lat: Number(data.Koor_WGS84_N.replace(',', '.')),
    lng: Number(data.Koor_WGS84_E.replace(',', '.')),
    letter: data.Str_Zus,
  };
};
We can also parse dates or even perform calculations on our results:
// from config.json
transform: (data) => {
  return {
    ...
    // Combine the date and hour columns into a single Date object
    date: moment(data.Datum, "YYMMDD").hours(data.Stunde).toDate(),
    // Derive a truck-only count by subtracting buses from the Lkw column
    truck_1: data.Lkw_R1 - data.Bus_R1,
    ...
  };
}
It turned out to be convenient to keep these mapping functions in the config.json file, so that they can be changed easily.
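The format stream used in the pipeline above is then just such a mapping function wrapped with stream-transform. A minimal sketch, assuming target is the current entry read from the config:
const transformer = require('stream-transform');

// Wrap the mapping function from the config entry into a transform stream
const format = transformer(target.transform);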

Append additional data

The same approach can also be used to append additional data to the processed entry:
const merge = transformer((data) => {
  const found = stations.find((s) => parseInt(s.DZ_Nr) === data.nr);
  return Object.assign(data, target.stations.transform(found));
});
In this case we look for an object with the same numeric ID in the stations array and merge it into the entry that is currently being processed.
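The stations array itself is just the parsed station metadata held in memory. A sketch of one way to load it with csv-parse's synchronous API, assuming the station list is a local CSV file (the path is hypothetical):
const fs = require('fs');
const parseSync = require('csv-parse/lib/sync');

// The station metadata is small enough to be loaded into memory for lookups
const stations = parseSync(fs.readFileSync('data/stations.csv'), {
  columns: true,
  delimiter: ';',
  trim: true,
});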

Load

The last step of our ETL process is to load the data into the database. Although the data is obviously suited to a time series database (such as InfluxDB), we preferred MongoDB's NoSQL engine, taking advantage, for example, of its easy-to-use geospatial support.

Import data into MongoDB

In order to save the results to MongoDB we just need to configure our connection to the database and then use the stream object provided by StreamToMongoDB:
const { StreamToMongoDB } = require('stream-to-mongo-db');

const dbPort = config.db.port || 27017;
const dbHost = config.db.host || 'localhost';
const dbName = config.db.name || 'test';
const dbURL = `mongodb://${dbHost}:${dbPort}/`;
const dbConfig = { dbURL, collection, dbName };
const db = new StreamToMongoDB(dbConfig).stream;
Now we can finally stream the results directly to the database:
stream
  .pipe(csvStream)
  .pipe(format)
  .pipe(merge)
  .pipe(db);
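Since the import runs asynchronously, it can be useful to wait until the last stream has finished before processing the next file. One way to do this, mirroring the Promise wrapper in download() above (a sketch, assuming we are inside an async function):
await new Promise((resolve, reject) => {
  stream
    .pipe(csvStream)
    .pipe(format)
    .pipe(merge)
    .pipe(db)
    // 'finish' fires once the last write to MongoDB has completed
    .on('finish', resolve)
    .on('error', reject);
});
Note that .on('error') here only catches errors from the last stream in the chain; node's stream.pipeline() helper forwards errors from every stream if that becomes an issue.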

Aggregate

MongoDB offers a great aggregation framework, which at this point can easily be used to group and aggregate our results.
The stages of an aggregation pipeline can be written in JavaScript, and an example query looks like this:
await collection.aggregate([{
  $group: {
    _id: {
      year: { $year: "$date" },
      month: { $month: "$date" },
      day: { $dayOfMonth: "$date" },
      roadid: '$roadid',
      ...
    },
    total_1: { $sum: '$total_1' },
    total_2: { $sum: '$total_2' }
  }
}]);
In this pipeline stage we first group the data points in our collection by day and by road, and then sum all total_1 and total_2 values, so that we can figure out how many vehicles passed over each road on each day of the year.
An additional $project stage could be used in order to change the structure of our result object:
{
  $project: {
    date: {
      $dateFromParts: {
        year: '$_id.year',
        month: '$_id.month',
        day: '$_id.day'
      }
    },
    total_1: 1,
    total_2: 1
  }
}
Here we reassemble the date from the _id fields we used for grouping in the previous stage. $project can also be used to select or deselect specific fields to keep or discard; in this case we are telling MongoDB to keep the total_1 and total_2 fields.
In case you want to store the result documents to a new collection, simply add an $out pipeline stage:
{
  $out: 'by_day'
}
Also consider using the allowDiskUse option when working with large collections.
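In the node.js driver, aggregate() returns a cursor, so the pipeline is actually executed when the cursor is consumed, for example via toArray(); allowDiskUse is passed as an option. A sketch, with pipeline standing in for the stages shown above:
// allowDiskUse lets MongoDB spill large $group stages to disk instead of failing
const results = await collection
  .aggregate(pipeline, { allowDiskUse: true })
  .toArray();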

Conclusion

🚗🚗🚚🚗🚚🚗🚗🚗🚗🚚🚗🚚🚗🚗🚗🚗🚗🚗
Working with such a large dataset taught us the advantages of using streams, but also their downsides: for example, you are often forced to write intermediate results to disk (especially when working with large datasets) so that you don't have to repeat the whole process from the beginning in case of a failure.
Overall, we are really happy that we were able to implement the whole process using only JavaScript, which also powers the API server and the data visualization itself.