
Efficient Massive Inserts into ArangoDB with Node.js

Estimated reading time: 3 minutes

Nothing performs faster than arangoimport and arangorestore for bulk loading or massive inserts into ArangoDB. However, if you need to do additional processing on each row before it is inserted, this post will help with that type of workload.

If the data source is a streaming solution (such as Kafka, Spark, Flink, etc.) and the data needs to be transformed before being inserted into ArangoDB, this solution provides insight into that scenario as well.

Let’s delve into the specifics.

  • ArangoDB is v3.6, and we assume it has already been installed (I am using a single instance on macOS Catalina; the architecture and performance of the installation are outside the scope of this blog).
  • NodeJS should also be installed. The minimum version for this exercise is v10, for its great capability of Async Iteration ("for await (let chunk of readableStream) { … }") over the chunks of a stream. The version used here is v12.13.1.

A good explanation for Async Iteration and the motivation behind it is found in this article.

The essence of it is that we need an awaitable promise inside the loop: a regular (non-awaited) for-loop would flood the system with promises faster than they can be resolved, and the NodeJS heap would eventually blow up. There is also an assumption that you have a reasonably good understanding of the async / await construct in NodeJS.
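To make the difference concrete, here is a minimal sketch of the two approaches; postOneDocument is a hypothetical stand-in for the real insert function built later in this post:

const fs = require('fs');

// Hypothetical stand-in for the single-document HTTP insert built below.
function postOneDocument(chunk) {
    return new Promise(resolve => setImmediate(() => resolve(chunk.length)));
}

// Anti-pattern: a plain 'data' handler fires one promise per chunk and never
// waits for any of them, so on a large file unresolved promises pile up on the heap:
//     readStream.on('data', chunk => { postOneDocument(chunk); });

// Awaited async iteration: the loop pauses until each insert resolves, so only
// one request is in flight at a time and memory usage stays flat.
(async function () {
    const readStream = fs.createReadStream('./datafiles/lines_file_100k.csv', 'utf8');
    for await (const chunk of readStream) {
        await postOneDocument(chunk);
    }
})();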

Preparations:

Create a directory for your NodeJS Application, and another directory under it called datafiles. To keep the demo simple, the data source is a CSV file with only two fields per line:


1,Line: 1
2,Line: 2
…
n,Line: n

I named mine lines_file_100k.csv; as the name suggests, it contains 100k rows. (If you feel adventurous, you can expand this to as many rows as desired; we ran insert tests with some 4.2 million rows while NodeJS memory stayed below 35 MB.)

Below is the bash script used to create the data source file:


	#!/bin/bash
	counter=1
	while [ $counter -le 100000 ]
	do
		echo $counter',Line: '$counter
		((counter++))
	done

Place the script in the datafiles directory created above and enter the following commands at a terminal prompt:


	> chmod +x bash_script.sh
	> ./bash_script.sh > lines_file_100k.csv

In ArangoDB, we created the database `db0` with user `db0_user`, password `db0_pass`, and a document-type collection `lines_test`.
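If you prefer the command line over the web UI, the same setup can be done from arangosh; a rough sketch (run while connected to the _system database as a privileged user):

// In arangosh, connected to _system as root (or another privileged user):
db._createDatabase('db0', {}, [ { username: 'db0_user', passwd: 'db0_pass', active: true } ]);
db._useDatabase('db0');
db._create('lines_test'); // document-type collection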

Now for the NodeJS preparation: you will need some modules set up in your Application directory. For that, copy the package.json file below into the NodeJS Application directory (the parent of datafiles) and then enter the following command at a terminal prompt:


	> sudo npm install 

The package.json file:


{
  "name": "arango_blogs",
  "version": "1.0.0",
  "description": "Sample Blog Code",
  "main": "arango_raw_stream_tests.js",
  "dependencies": {
    "async": "^3.1.0"
  },
  "devDependencies": {},
  "scripts": {
    "test": "echo \"Error: no test specified\" && exit 1"
  },
  "author": "",
  "license": "ISC"
}

Your environment is all set now. You should see the `node_modules` directory under your NodeJS Application directory. We are now ready to start programming.

Caveat: The Basic Authentication for the HTTP POST calls used in this exercise is not advisable for production systems unless the communication is over secure channels.

The premise is to use the bare bones ArangoDB Web API so that it can be translated to the programming language of your choice. The interactive ArangoDB HTTP API documentation can be found at (Rest API tab):

http://ip_of_your_installation:8529/_db/_system/_admin/aardvark/index.html#support

Of course, the excellent NodeJS driver for ArangoDB – arangojs – could also be used for this exercise. In fact, the HTTP POST function doing the actual insert here is a tiny, simplified subset of what that driver provides.
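For comparison, a rough sketch of the same single-document insert going through arangojs (assuming arangojs v6, the driver generation current for ArangoDB 3.6) might look like this:

const { Database, aql } = require('arangojs');

const db = new Database({ url: 'http://127.0.0.1:8529' });
db.useDatabase('db0');
db.useBasicAuth('db0_user', 'db0_pass');

// Under the hood the driver issues the same /_api/cursor call we build by hand below.
async function insertLine(lineNo, lineText) {
    const cursor = await db.query(aql`
        INSERT { _key: ${lineNo}, content: ${lineText} } INTO lines_test
    `);
    return cursor.all();
}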

Create your NodeJS file with the same name used in the “main” section defined in the package.json file above:


	arango_raw_stream_tests.js

First, we Promisify the HTTP POST function executing the actual document insert:


function postFuncWithAwaitableSetTimeout(  argOptions, argPostBody, argNrRecursiveCalls ){
    return new Promise ((resolve, reject) => {
        // create your post request here
        post_req.on('error', function (err) {
            reject(err);
        });
        post_req.on('response', res => {
            // …
            res.on('end', () => {
                setTimeout( function() { resolve( 'Full response of POST here' ); }, 0 );
            }); 
        });
    }); 
}

Now we have our asynchronous HTTP POST "await"-capable and callable from inside an async function.

Next, we create the awaitable NodeJS stream processing part:


let readStream = fs.createReadStream('./datafiles/lines_file_100k.csv', 'utf8');

(async function() {
    for await (let chunk of readStream) {
        // Process your chunk here: aggregate it, split it, parse it into lines, transform values, etc.
        // …
        // Call the Promisified HTTP POST function described above
        await postFuncWithAwaitableSetTimeout( /* … */ );
    }
})();

We now have the complete picture of the code: an awaitable chunk of stream that can be processed and transformed, then passed on to an awaitable HTTP POST function.

Data is processed in order, as it is received from the stream.

Here is the code in its entirety (arango_raw_stream_tests.js):

const fs = require('fs');
const async = require('async');
const http = require('http');

const dbName = 'db0';
const arangoURL = 'http://127.0.0.1';
const arangoPort = 8529;
const arangoURLFor_HTTPRequestsOnly = '127.0.0.1';

let upsQueryBindVrbls = `INSERT { _key: @lineNo, content: @lineText } INTO lines_test`;
const auth = 'Basic ' + Buffer.from('db0_user:db0_pass').toString('base64');
// hostname must not contain 'http://'; it can only be the plain hostname.
// enable 'x-arango-async': 'store' below for fire-and-forget inserts; leave it commented out to see each response
const options = {
    hostname: arangoURLFor_HTTPRequestsOnly,
    port: arangoPort,
    path: '/_db/' + dbName + '/_api/cursor',
    method: 'POST',
    headers: {
        'Authorization': auth,
        'Content-Type': 'application/json'
        /*, 'x-arango-async': 'store' */
    }
}; 

let counter = 0;
let leftOver = '';
// The stream is created in paused mode; the async iteration below consumes
// it one chunk at a time via the stream's async iterator.
let readStream = fs.createReadStream('./datafiles/lines_file_100k.csv', 'utf8');
(async function() {
    for await (let chunk of readStream) {
        //console.log('chunk: ' + chunk);
        chunk = leftOver + chunk;
        let tempLinesArray = chunk.split('\n');
        let localArr;
        leftOver = tempLinesArray[tempLinesArray.length - 1];
        for(let idx = 0; idx < (tempLinesArray.length - 1); idx++ ){
            counter++;
            localArr = tempLinesArray[idx].split(',');

            // Do your additional data processing here on localArr[0] and / or localArr[1] as desired

            let postDataForArango = {
                "batchSize": 1,
                "count": true,
                "memoryLimit": 0,
                "options": {
                    "failOnWarning": true,
                    "fullCount": true
                },
                "query": upsQueryBindVrbls,
                "ttl": 0,
                "bindVars": {
                    "lineNo": localArr[0],
                    "lineText": localArr[1]
                }
            };
            let strPostData = JSON.stringify(postDataForArango);
            options.headers['Content-Length'] = Buffer.byteLength(strPostData); // byte length, not character count
            try {
                let awaitResult = await postFuncWithAwaitableSetTimeout(  options, strPostData, 0 );
                console.log('counter -> awaitResult right after the call: ' + counter + ' -> ' + JSON.stringify(awaitResult) );
                console.log();

                let errIndx = awaitResult.indexOf('errorMessage');
                if ( errIndx > -1 ){
                    //errLogger.write( moment().format() + ' ' + awaitResult.substring( errIndx ) + '\n' );
                    console.log('Arango error: ' + counter + ' -> ' + awaitResult.substring( errIndx ) + '\n' );
                }
            }
            catch(err){
                console.log('promiseResp error (on data): ' + err);
            };
        }
    }
    // All chunks are in. If the file ends without a trailing newline, the
    // left-over is the final line; if it ends with one (as the bash script's
    // output does), the left-over is empty and must be skipped.
    if (leftOver.length > 0) {
        counter++;
        let localArr = leftOver.split(',');
        console.log(counter + '-> ' + localArr[0] + ' / ' + localArr[1]);
        let postDataForArango = {
            "batchSize": 1,
            "count": true,
            "memoryLimit": 0,
            "options": {
                "failOnWarning": true,
                "fullCount": true
            },
            "query": upsQueryBindVrbls,
            "ttl": 0,
            "bindVars": {
                "lineNo": localArr[0],
                "lineText": localArr[1]
            }
        };
        let strPostData = JSON.stringify(postDataForArango);
        options.headers['Content-Length'] = Buffer.byteLength(strPostData);
        try {
            let awaitResult = await postFuncWithAwaitableSetTimeout( options, strPostData, 0 );
            console.log('counter -> awaitResult right after the call: ' + counter + ' -> ' + JSON.stringify(awaitResult) );
            console.log();

            let errIndx = awaitResult.indexOf('errorMessage');
            if ( errIndx > -1 ){
                //errLogger.write( moment().format() + ' ' + awaitResult.substring( errIndx ) + '\n' );
                console.log('Arango error: ' + counter + ' -> ' + awaitResult.substring( errIndx ) + '\n' );
            }
        }
        catch(err){
            console.log('promiseResp error (on data): ' + err);
        }
    }
})();
function postFuncWithAwaitableSetTimeout(  argOptions, argPostBody, argNrRecursiveCalls ){
    return new Promise ((resolve, reject) => {
        let allData = '';
        let post_req = http.request(argOptions);
        post_req.end( argPostBody );
        post_req.on('error', function (err) {
            //console.log('Arango Post Error from postFunc: ' + err.toString() );
            reject(err);
        });
        post_req.on('response', res => {
            res.setEncoding('utf8');
            res.on('data', function (chunk) {
                allData += chunk;
            });
            res.on('error', function (err) { // transport errors; Arango-level errors arrive in the body, handled after 'end'
                reject(err);
            });
            res.on('end', () => {
                setTimeout( function() { resolve(  'Full resp: ' + allData  ); }, 0   );
            }); // end of res.on('end',  () => {
        }); // end of post_req.on('response', res => {
    }); // end of new Promise( ...)
}
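To run the test, invoke the script from the NodeJS Application directory:

	> node arango_raw_stream_tests.js

Afterwards, a quick sanity check from arangosh (a sketch; the count assumes a clean run over the 100k-line file):

db._useDatabase('db0');
db.lines_test.count(); // should report 100000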


Comments

  1. Vitaliy on February 4, 2020 at 12:59 pm

    I’m wondering why you’re not using the arangodb js driver?

  2. Bo Udubasa on February 5, 2020 at 9:24 am

    As mentioned in the article, arangojs is a great, solid, enterprise-grade driver which abstracts the low-level calls showcased in this article. The purpose of using barebones RESTful calls here is partly academic, partly expedient. If one wishes to implement such functionality (CRUD) in ArangoDB for a programming language for which there is no driver yet, one can do so. One could also design one's own driver for such a language using this example as a starting point. Ultimately, if CRUD operations were all that were needed, one can use the NodeJS native implementation of HTTP / HTTPS without installing additional modules, and the memory of the NodeJS application will be spared a bit by bypassing the need to load arangojs modules whose functionality might never be used. If there is enough interest we might follow up with the Java equivalent examples.
