Last year we worked on a dashboard prototype that visualizes data publicly released by the BASt (German Federal Highway Research Institute).
The research institute uses a network of sensors to count the vehicles travelling on German highways (Autobahn and Bundesstraße).
The data is collected hourly and released yearly. This was one of the main challenges we had to face: handling large CSV files (~3 GB for each year of measurements) containing a huge number of data points (24 hours × 365 days × 1744 measuring stations × 2 directions = 30,554,880 estimated data points per year).
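The estimate can be reproduced with a few lines of JavaScript. This is only a back-of-the-envelope sketch: the variable names are ours, and it assumes that every station reports both directions for every hour of a non-leap year.

```javascript
// Back-of-the-envelope estimate of hourly data points per year
const hoursPerDay = 24;
const daysPerYear = 365; // assumes a non-leap year
const measuringStations = 1744;
const directions = 2;

const dataPoints = hoursPerDay * daysPerYear * measuringStations * directions;
console.log(dataPoints); // 30554880
```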
The same challenge also affected the way we designed the system that serves the data to the client and, of course, the interactive data visualization itself. The backend is a node.js HTTP API server, whereas the frontend uses libraries like D3 and Leaflet to visualize the data.
We had to find out how to download, extract, transform, import, aggregate and serve this amount of data to the client automatically and efficiently.
The tool we developed can help citizens, journalists and reporters, or researchers and environmental planners to get an overview of the data provided by BASt and analyze it easily. Take a look yourself here:
Extract, transform, load data
This article covers the first steps in the development of our prototype. After a conception phase we soon started to collect the data we wanted to import. We were already aware that this process might take a long time, but we probably underestimated the problems we would face. If you want to dig into the source code and the development process of the ETL pipeline, you can find the code on GitHub.
ETL
ETL is the acronym for Extract, Transform, Load: the process of collecting data from a specific source, converting it into the required format and importing it into the destination database. For the whole process we used JavaScript, in particular node.js and MongoDB.
Extract
The source of our data is a collection of .zip files served via HTTP, so we had to write a client that downloads the files, saves them locally and unzips the CSV files for processing in the next step.
Download the archives
We configured the list of files to download in the config.json file and passed each entry to the download() function.
The download() function is an asynchronous function that issues an HTTP GET request and writes the response to disk, unzipping the body content first if necessary.
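const fs = require('fs');
const path = require('path');
const axios = require('axios');
const unzip = require('unzip'); // assumed package name; any module exposing Extract({ path }) works

async function download(target) {
  const { url, out, name } = target;
  // Request the body as a stream so it is never fully buffered in memory
  const response = await axios({ url, responseType: 'stream' });
  let output;
  if (url.endsWith('.zip')) {
    // Extract the archive on-the-fly into the output directory
    output = unzip.Extract({ path: out });
  } else {
    const file = path.resolve(out, name);
    output = fs.createWriteStream(file);
  }
  // Resolve once everything is written, reject on read or write errors
  return new Promise((resolve, reject) => {
    response.data.on('error', reject);
    response.data
      .pipe(output)
      .on('finish', resolve)
      .on('error', reject);
  });
}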
With this approach we don't need to store the .zip files persistently. They are only partially loaded into memory while they are being processed, and that memory is later freed by the garbage collector. In other words, we are extracting them on-the-fly.
Transform
At this point we need to parse the CSV files and map them to our data structure, so our next stream pipeline looks like this:
stream.pipe(csvStream).pipe(format).pipe(merge);
Parse the CSV
The first step is to set up a stream using csv-parse:
const csvStream = csv({
  columns: true,
  delimiter: ';',
  trim: true,
});
This lets us read the CSV file by passing it through memory, again without writing it to disk.
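To make the effect of these three options more concrete, here is a deliberately simplified stand-in written in plain JavaScript. It is not csv-parse itself: it works on a whole string instead of a stream and ignores quoted fields and escaping. The function name parseCsv and the sample values are made up for illustration; only the column names come from the mapping used in this article.

```javascript
// Simplified stand-in for { columns: true, delimiter: ';', trim: true }.
// Unlike a real CSV parser it does not handle quoted fields or escaping.
function parseCsv(text, delimiter = ';') {
  const lines = text.split(/\r?\n/).filter((line) => line.trim() !== '');
  // trim: true -> strip whitespace around every cell
  const [header, ...rows] = lines.map((line) =>
    line.split(delimiter).map((cell) => cell.trim())
  );
  // columns: true -> use the first line as keys for every record
  return rows.map((cells) =>
    Object.fromEntries(header.map((column, i) => [column, cells[i]]))
  );
}

// Hypothetical sample row
const sample = 'DZ_Nr; DZ_Name; Land_Code\n1234; Example Station; BY\n';
const records = parseCsv(sample);
console.log(records);
```

Every value is still a string at this point, which is why the next step converts the fields explicitly.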
Map data to Database structure
Next we apply a transform function to the stream using stream-transform, so that we can convert strings to numbers and rename fields:
transform: (data) => {
  return {
    nr: Number(data.DZ_Nr),
    name: data.DZ_Name,
    land: data.Land_Code,
    roadid: Number(data.Str_Nr),
    type: data.Str_Kl,
    // Coordinates use a decimal comma, which Number() cannot parse directly
    lat: Number(data.Koor_WGS84_N.replace(',', '.')),
    lng: Number(data.Koor_WGS84_E.replace(',', '.')),
    letter: data.Str_Zus,
  };
},
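One detail worth keeping in mind with this kind of conversion is that Number('') evaluates to 0, so a blank field would silently turn into a valid-looking number. We have not checked whether the BASt files actually contain blank coordinates, so take the following as a defensive sketch only. The helper name parseDecimal and the sample values are our own.

```javascript
// Converts a string with a decimal comma into a number.
// Returns null for blank or non-numeric input instead of 0 or NaN.
function parseDecimal(value) {
  if (typeof value !== 'string' || value.trim() === '') return null;
  const number = Number(value.trim().replace(',', '.'));
  return Number.isNaN(number) ? null : number;
}

console.log(parseDecimal('48,1372')); // 48.1372
console.log(parseDecimal(''));        // null
```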
We can also parse dates or even perform calculations on the values:
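transform: data => {
  return {
    ...
    // Datum holds the day as YYMMDD, Stunde the hour of the measurement
    date: moment(data.Datum, "YYMMDD").hours(data.Stunde).toDate(),
    // Both fields are strings; the - operator coerces them to numbers
    truck_1: data.Lkw_R1 - data.Bus_R1,
    ...
  };
}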
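For readers who want to see what the date conversion does without pulling in moment, here is a rough equivalent using only the built-in Date object. It is a sketch under two assumptions that the original code does not make: all years are 2000 or later (moment applies its own pivot rule for two-digit years), and malformed input should yield null. The function name parseMeasurementDate is ours.

```javascript
// Builds a local-time Date from a YYMMDD string and an hour value.
function parseMeasurementDate(datum, stunde) {
  const match = /^(\d{2})(\d{2})(\d{2})$/.exec(datum);
  if (!match) return null;
  const [, yy, mm, dd] = match.map(Number);
  // Assumption: two-digit years always belong to the 21st century
  return new Date(2000 + yy, mm - 1, dd, Number(stunde));
}

const date = parseMeasurementDate('170131', '5');
console.log(date.getFullYear(), date.getMonth() + 1, date.getDate(), date.getHours());
```

Like the moment call above, this interprets the values in the local timezone of the machine running the import.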
We found it convenient to keep these mapping functions in the config.json file, so that they can be changed easily.
Append additional data
The same approach can also be used to append additional data to the processed entry:
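const merge = transformer((data) => {
  // Look up the station with the same numeric ID as the current entry
  const found = stations.find((s) => parseInt(s.DZ_Nr, 10) === data.nr);
  // Pass the entry through unchanged if no matching station exists
  if (!found) return data;
  return Object.assign(data, target.stations.transform(found));
});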
In this case we try to find an object with the same numeric ID in the stations array and merge it into the entry that is currently being processed.
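Since the lookup runs once for every data point, scanning the stations array each time can add up. One possible optimization, which is not part of our pipeline as described here, is to index the stations by ID once and then use a constant-time Map lookup. The station records and the mergeStation helper below are hypothetical and only illustrate the idea.

```javascript
// Hypothetical station records, shaped like rows parsed from the station CSV
const stations = [
  { DZ_Nr: '1234', DZ_Name: 'Example A' },
  { DZ_Nr: '5678', DZ_Name: 'Example B' },
];

// Build the index once, keyed by the numeric station ID
const stationsByNr = new Map(
  stations.map((s) => [parseInt(s.DZ_Nr, 10), s])
);

// Merge station details into an entry, or return it unchanged if unknown
function mergeStation(entry) {
  const found = stationsByNr.get(entry.nr);
  return found ? { ...entry, name: found.DZ_Name } : entry;
}

console.log(mergeStation({ nr: 5678, total_1: 10 }));
```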
Load
The last step of our ETL process is to load the data into the database. Even though the data is an obvious fit for a time series database (such as InfluxDB), we preferred the NoSQL engine provided by MongoDB, taking advantage, for example, of its easy-to-use geospatial support.
Import data into MongoDB
To save the results to MongoDB we just need to configure the connection to the database and then use the stream object provided by StreamToMongoDB:
const dbPort = config.db.port || 27017;
const dbHost = config.db.host || 'localhost';
const dbName = config.db.name || 'test';
const dbURL = `mongodb://${dbHost}:${dbPort}/`;
const dbConfig = { dbURL, collection, dbName };
const db = new StreamToMongoDB(dbConfig).stream;
Now we can finally stream the results directly to the database:
  stream
    .pipe(csvStream)
    .pipe(format)
    .pipe(merge)
+   .pipe(db)
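The shape of such a pipeline can be tried out with Node core modules alone. In the sketch below an in-memory array stands in for the database stream and a plain Transform stands in for the stream-transform step, so none of this is the code of our importer; createSink and run are hypothetical names. It uses pipeline() from stream/promises instead of chained .pipe() calls, because pipeline() rejects when any stage fails.

```javascript
const { Readable, Transform, Writable } = require('stream');
const { pipeline } = require('stream/promises');

// In-memory sink standing in for the database stream
function createSink(store) {
  return new Writable({
    objectMode: true,
    write(doc, encoding, callback) {
      store.push(doc);
      callback();
    },
  });
}

async function run(rows) {
  const store = [];
  // Object-mode transform that renames fields and converts strings to numbers
  const format = new Transform({
    objectMode: true,
    transform(row, encoding, callback) {
      callback(null, {
        nr: Number(row.DZ_Nr),
        truck_1: row.Lkw_R1 - row.Bus_R1,
      });
    },
  });
  // Resolves when the sink has finished, rejects if any stage errors
  await pipeline(Readable.from(rows), format, createSink(store));
  return store;
}

// Hypothetical sample row
run([{ DZ_Nr: '1234', Lkw_R1: '12', Bus_R1: '2' }]).then(console.log);
```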
Aggregate
MongoDB offers a great aggregation framework, which at this point can easily be used to group and aggregate our results.
The stages of an aggregation pipeline can in fact be written in JavaScript. An example query looks like this:
await collection.aggregate([{
  $group: {
    _id: {
      year: { $year: "$date" },
      month: { $month: "$date" },
      day: { $dayOfMonth: "$date" },
      roadid: '$roadid',
      ...
    },
    total_1: { $sum: '$total_1'},
    total_2: { $sum: '$total_2'}
  }
}]);
In this pipeline stage we first group the data points in our collection by day and by road and then sum all total_1 and total_2 values, so that we can figure out how many vehicles travelled on each road on each day of the year.
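If the $group syntax is new to you, it may help to see roughly the same logic in plain JavaScript. The sketch below only illustrates the semantics on a small in-memory array; it is not how MongoDB executes the stage. It covers only the four _id fields shown above and assumes numeric totals. The function name groupByDayAndRoad and the sample documents are made up.

```javascript
function groupByDayAndRoad(docs) {
  const groups = new Map();
  for (const doc of docs) {
    // $year, $month and $dayOfMonth work in UTC unless a timezone is given
    const _id = {
      year: doc.date.getUTCFullYear(),
      month: doc.date.getUTCMonth() + 1, // $month is 1-based
      day: doc.date.getUTCDate(),
      roadid: doc.roadid,
    };
    const key = JSON.stringify(_id);
    if (!groups.has(key)) groups.set(key, { _id, total_1: 0, total_2: 0 });
    const group = groups.get(key);
    group.total_1 += doc.total_1;
    group.total_2 += doc.total_2;
  }
  return [...groups.values()];
}

// Hypothetical hourly documents for one road
const daily = groupByDayAndRoad([
  { date: new Date('2017-01-31T05:00:00Z'), roadid: 9, total_1: 100, total_2: 80 },
  { date: new Date('2017-01-31T06:00:00Z'), roadid: 9, total_1: 50, total_2: 20 },
  { date: new Date('2017-02-01T05:00:00Z'), roadid: 9, total_1: 7, total_2: 3 },
]);
console.log(daily);
```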
An additional $project stage could be used to change the structure of our result object:
{
  $project: {
    date: {
      $dateFromParts: {
        year: '$_id.year',
        month: '$_id.month',
        day: '$_id.day'
      }
    },
    total_1: 1,
    total_2: 1
  }
}
Here we reassemble the date from the parts stored in the _id field we used for grouping in the previous step. $project can also be used to select specific fields to keep or discard. In this case we are telling MongoDB to keep the total_1 and total_2 fields.
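In plain JavaScript terms, the projection does roughly the following to each grouped document. This is again an illustration rather than MongoDB code, and projectGroup is a hypothetical name. Two details are easy to miss: $dateFromParts builds a UTC date unless a timezone is passed, and $project keeps the _id field unless it is explicitly excluded with _id: 0.

```javascript
function projectGroup(group) {
  const { year, month, day } = group._id;
  return {
    _id: group._id, // kept by default, exclude it with _id: 0
    date: new Date(Date.UTC(year, month - 1, day)), // midnight UTC
    total_1: group.total_1,
    total_2: group.total_2,
  };
}

// Hypothetical grouped document
const projected = projectGroup({
  _id: { year: 2017, month: 1, day: 31, roadid: 9 },
  total_1: 150,
  total_2: 100,
});
console.log(projected.date.toISOString()); // 2017-01-31T00:00:00.000Z
```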
If you want to store the resulting documents in a new collection, simply add an $out pipeline stage. Also consider using the allowDiskUse option when working with large collections.
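Put together, a pipeline with both additions could look like the sketch below. The collection name daily_totals is a placeholder of ours, and the $group stage is shortened to the fields shown earlier. The aggregate call is left as a comment because it needs a live database connection.

```javascript
// 'daily_totals' is a hypothetical name for the target collection
const pipeline = [
  {
    $group: {
      _id: {
        year: { $year: '$date' },
        month: { $month: '$date' },
        day: { $dayOfMonth: '$date' },
        roadid: '$roadid',
      },
      total_1: { $sum: '$total_1' },
      total_2: { $sum: '$total_2' },
    },
  },
  // $out has to be the last stage; it replaces the target collection
  { $out: 'daily_totals' },
];

// Lets stages write temporary files instead of failing on the memory limit
const options = { allowDiskUse: true };

// With the official Node.js driver the pipeline could then be run as:
// await collection.aggregate(pipeline, options).toArray();
```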
Conclusion
🚗🚗🚚🚗🚚🚗🚗🚗🚗🚚🚗🚚🚗🚗🚗🚗🚗🚗
Working with such a large data set taught us the advantages of using streams, but also their downsides. For example, you are often forced to write intermediate results to disk (especially when working with large data sets) so that you don't have to repeat the whole process from the beginning in case of a failure.
Overall, we are really happy that we were able to implement the whole process using JavaScript only, which also powers the API server and the data visualization itself.