Introducing the new ArangoDB Datasource for Apache Spark

We are proud to announce the general availability of ArangoDB Datasource for Apache Spark: a new generation Spark connector for ArangoDB.

Apache Spark is one of the most popular analytics frameworks for large-scale data processing. It is designed to process, in parallel, data that is too large or complex for traditional databases, and it achieves high performance by optimizing query execution, caching data in memory, and controlling data distribution.

It exposes a pluggable DataSource API that can be implemented to integrate external data providers, making their data accessible from Spark DataFrames and Spark SQL.

ArangoDB Spark Datasource is an implementation of DataSource API V2 and enables reading and writing from and to ArangoDB in batch execution mode.

Its typical use cases are:

  • ETL (Extract, Transform, Load) data processing
  • data analytics and machine learning
  • uniform access to data from multiple datasources
  • interactive data exploration with Spark (SQL) shell
  • batch import and export
  • integration of ArangoDB in existing Spark workflows

ArangoDB Datasource is available in different variants, each compatible with a different Spark version. At the time of writing, Spark 2.4 and 3.1 are supported. It works with all ArangoDB versions that have not reached end of life. Although it is implemented in Scala, it can be used from any of the Spark high-level API languages: Java, Scala, Python and R.

Batch Reading

The connector can read data from an ArangoDB collection or from a user provided AQL query.

When reading data from a collection, the reading job is split into many tasks, one for each shard in the source collection. The reading tasks can be executed in parallel in the Spark cluster and the resulting Spark DataFrame has the same number of partitions as the number of shards in the collection.

The reading requests are load balanced across all the available ArangoDB Coordinators. Each query hits only one DB-Server, the one holding the target shard, where it can be executed locally. This partitions the read workload along the sharding of the ArangoDB collection and distributes the reading tasks evenly.

When reading data from an AQL query, the job cannot be partitioned and parallelized on the Spark side. This mode is recommended for executing queries in ArangoDB that produce only a small amount of data to be transferred to Spark.

Mapping

The reading schema can be provided by the user or inferred automatically by the connector, which prefetches a sample of the target collection data. Automatic schema inference can be useful for interactive data exploration, e.g. from the Spark shell. A user-provided schema is nonetheless recommended for production use, since it allows for more detailed control.

Since ArangoDB does not internally enforce a schema for the stored data, the data being read may not match the target Spark SQL type required by the read schema. In this case the connector makes its best effort to translate the data into the correct format, e.g. by applying implicit deserialization casts. If this is not possible, corrupt records are handled according to one of the configurable strategies: drop them, put them in a dedicated column, or make the entire task fail.

The mapping implementation leverages the Spark JSON mapping module, which is based on the Jackson Databind API. In this way the connector reuses the state-of-the-art JSON mapping implementation that already exists in the Spark SQL project and is used by the Spark JSON files datasource. VelocyPack support has been plugged in through jackson-dataformat-velocypack, a Jackson Databind API implementation for VelocyPack, which can therefore be used from within the JSON mapping module. Thanks to this approach, the mapping behavior is identical to that of the Spark JSON files datasource and includes support for all Spark SQL types.
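As a rough illustration of the three strategies, the sketch below builds one set of read options per strategy. The option names `mode` and `columnNameOfCorruptRecord`, their values, and the column name `badRecord` are assumptions borrowed from the conventions of the Spark JSON files datasource; check the connector documentation for the exact names before relying on them.

```scala
// Hypothetical read options for handling corrupt records.
// Option names and values are assumed to follow the Spark JSON files
// datasource conventions; they are not confirmed by this article.
object CorruptRecordOptions {
  // Keep corrupt records and store their raw content in a dedicated column.
  val permissive: Map[String, String] = Map(
    "mode" -> "PERMISSIVE",
    "columnNameOfCorruptRecord" -> "badRecord" // illustrative column name
  )

  // Silently drop corrupt records.
  val dropMalformed: Map[String, String] = Map("mode" -> "DROPMALFORMED")

  // Fail the entire task on the first corrupt record.
  val failFast: Map[String, String] = Map("mode" -> "FAILFAST")
}
```

Any of these maps would be passed to the reader with `.options(...)`, next to the connection options. With the first variant, the read schema would presumably also need a string column named `badRecord` to receive the raw record.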

Filter and Projection Pushdown

Given the lazy nature of DataFrame transformations, Spark can optimize query execution with end-to-end awareness, requesting from the datasources only the subset of data strictly needed to perform the job at hand. This reduces the amount of transferred data and is achieved by pushing filter predicates and column projections down to the datasource, so that only the required fields of the matching documents are returned. ArangoDB Datasource can dynamically generate AQL queries that contain the required filters and projections.

Batch Write

The connector can be used to save DataFrame records as documents in an ArangoDB collection. Write tasks are load balanced across the available ArangoDB Coordinators, and the saved data is distributed in the ArangoDB cluster according to the sharding strategy of the target collection, which may differ from the partitioning of the Spark DataFrame. The number of writing tasks that can run in parallel is equal to the number of partitions in the original Spark DataFrame.

The connector can optionally create new collections, append data to existing ones or overwrite them. Conflicts arising from documents with the same id can be handled with several strategies: updating, replacing, ignoring or throwing exceptions. The Spark Datasource API is designed to offer a transactional behavior of writing tasks. This means that Spark supervises all the writing tasks and makes its best effort to make sure that either all of them are successful or the entire job is aborted, thus trying to roll back the already executed tasks.

Depending on the configuration, the connector can fulfill the transactional API and offer more resilient behavior. For example, if the provided configuration allows for idempotent operations, the connector can retry failing requests or entire tasks, and it is compatible with speculative execution of tasks.

In case of connectivity problems, connections can be failed over to a different Coordinator.
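To see why idempotent operations make retries safe, consider the toy model below. It is plain Scala with an in-memory map standing in for a collection, not connector code, and the names `OverwriteModeSketch`, `writeReplace` and `writeConflict` are made up for this illustration. Writing the same batch twice with replace semantics leaves the collection unchanged, while a strategy that rejects duplicate keys fails on the second attempt.

```scala
import scala.util.Try

// Toy in-memory model of a collection; NOT the connector's implementation.
object OverwriteModeSketch {
  type Doc = Map[String, Any]
  type Collection = Map[String, Doc] // documents indexed by _key

  // Replace semantics: a document with an existing key is overwritten,
  // so applying the same batch again yields the same collection.
  def writeReplace(col: Collection, batch: Seq[(String, Doc)]): Collection =
    batch.foldLeft(col) { case (acc, (key, doc)) => acc + (key -> doc) }

  // Conflict semantics: an existing key is an error,
  // so re-sending an already applied batch fails.
  def writeConflict(col: Collection, batch: Seq[(String, Doc)]): Try[Collection] = Try {
    batch.foldLeft(col) { case (acc, (key, doc)) =>
      require(!acc.contains(key), s"document with _key $key already exists")
      acc + (key -> doc)
    }
  }
}
```

In this model a task that is retried, or executed twice speculatively, is harmless under replace semantics but not under conflict semantics, which is the intuition behind the configuration-dependent behavior described above.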

Usage Example

Next we will see how to use the connector to write to and read from an ArangoDB collection. All the code snippets are extracted from the official ArangoDB Datasource Demo and are based on the IMDB example dataset.

The connector can be imported from Maven Central using the coordinates:

com.arangodb:arangodb-spark-datasource-3.1_2.12:<version>
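For a project built with sbt (an assumption, since the article does not name a build tool), the coordinates above would translate to a dependency line along these lines. The `<version>` placeholder is kept as-is and must be replaced with an actual release.

```scala
// build.sbt fragment (sketch): the Scala version suffix is already part of
// the artifact name, so a single % is used rather than %%.
libraryDependencies += "com.arangodb" % "arangodb-spark-datasource-3.1_2.12" % "<version>"
```

For interactive use, the same coordinates can be given to `spark-shell` through its `--packages` option.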

Given an existing Spark Dataframe moviesDF we can save it to an ArangoDB collection movies in this way:

val options = Map(
	"password" -> "***",
	"endpoints" -> "coordinator1:8529,coordinator2:8529,...",
	"database" -> "demo"
)

val writeOptions = Map(
	"table.shards" -> "9",
	"overwriteMode" -> "replace",
	"table" -> "movies"
)

moviesDF.write
	.format("com.arangodb.spark") 	// (1)
	.options(options)             	// (2)
	.options(writeOptions)        	// (3)
	.save()                       	// (4)

where we perform these steps:

  1. specify the datasource id
  2. provide configuration parameters to establish a connection to ArangoDB
  3. provide configuration parameters for the writing job
  4. perform the writing job

This will also create the collection if it does not exist already.

Now we can read back the data saved in the movies collection. First we create a schema for the data to be read:

import org.apache.spark.sql.types._

val movieSchema = StructType(Array(
	StructField("_key", StringType, nullable = false),
	StructField("description", StringType),
	StructField("genre", StringType),
	StructField("lastModified", TimestampType),
	StructField("releaseDate", DateType),
	StructField("runtime", IntegerType),
	StructField("title", StringType)
))

Then we create a Spark SQL view backed by the ArangoDB collection movies:

spark.read
	.format("com.arangodb.spark")             	// (1)
	.options(options)                         	// (2)
	.option("table", "movies")                	// (3)
	.schema(movieSchema)                      	// (4)
	.load                                     	// (5)
	.createOrReplaceTempView("movies")        	// (6)

In the snippet above we performed the following steps:

  1. specify the datasource id
  2. provide configuration parameters to establish a connection to ArangoDB
  3. provide configuration parameters to identify the source collection
  4. specify the collection schema
  5. create a Dataframe
  6. create the Spark SQL view named movies

From now on, movies is visible to our Spark session, so we can reference it from Spark SQL queries. For example, we can read some data from it:

spark.sql("SELECT * FROM movies")
	.show(10, 200)

This will print to console:

+----+-----------+-----------+-------------------+-----------+-------+---------------------------------+
|_key|description|      genre|       lastModified|releaseDate|runtime|                            title|
+----+-----------+-----------+-------------------+-----------+-------+---------------------------------+
| 399| ...       |     Action|2011-03-13 14:05:06| 1982-05-26|    115|      E.T.: The Extra-Terrestrial|
| 431| ...       |     Action|2011-03-12 23:24:02| 1999-03-31|    136|                       The Matrix|
| 711| ...       |     Action|2011-03-13 14:41:21| 2005-07-15|    106|Charlie and the Chocolate Factory|
| 839| ...       |      Drama|2011-03-13 07:14:12| 1972-12-30|    115|                   State of Siege|
|1234| ...       |      Drama|2011-03-12 20:44:58| 2004-12-15|    137|              Million Dollar Baby|
|1307| ...       |      Crime|2011-03-12 07:40:29| 2001-03-16|    113|                          Memento|
|1506| ...       |Documentary|2011-03-07 22:01:52| 1998-08-02|     94|                       Megacities|
|1817| ...       |  Animation|2011-03-13 14:32:24| 1999-11-26|    134|                Princess Mononoke|
|2088| ...       |     Action|2011-03-13 14:09:42| 1982-12-10|     96|                          48 Hrs.|
|2181| ...       |Documentary|2011-03-07 22:02:12| 1895-12-28|      1| L'Arrivée d'un train à la Ciotat|
+----+-----------+-----------+-------------------+-----------+-------+---------------------------------+

Let’s apply some filtering and column selection to find all action or animation movies about dinosaurs, showing only some fields (title, releaseDate, genre):

spark.sql(
	"""
    	SELECT title, releaseDate, genre
    	FROM movies
    	WHERE genre IN ('Action', 'Animation') 
         AND description LIKE '%dinosaur%'
	""")
	.show(100, 200)

This will show:

+--------------------------------+-----------+---------+
|                           title|releaseDate|    genre|
+--------------------------------+-----------+---------+
|                Cesta do pravěku| 1967-02-11|Animation|
|                        Dinosaur| 2000-05-19|   Action|
|            The Land Before Time| 1988-11-18|Animation|
|   The Lost World: Jurassic Park| 1997-05-22|   Action|
|                Land of the Lost| 2009-06-05|   Action|
|               Jurassic Park III| 2001-07-18|   Action|
|                  The Lost World| 1960-07-13|   Action|
|  We're Back! A Dinosaur's Story| 1993-11-24|Animation|
|                   Jurassic Park| 1993-06-11|   Action|
|Ice Age 3: Dawn of the Dinosaurs| 2009-07-01|Animation|
+--------------------------------+-----------+---------+

The console logs give feedback about the pushed-down filters:

INFO  ArangoScanBuilder:57 - Filters fully applied in AQL:
	IsNotNull(description)
	In(genre, [Action,Animation])
	StringContains(description,dinosaur)

and about the generated AQL:

DEBUG ArangoClient:61 - Executing AQL query: 
	 FOR d IN @@col
	 FILTER `d`.`description` != null 
	 	 AND LENGTH(["Action","Animation"][* FILTER `d`.`genre` == CURRENT]) > 0 
	 	 AND CONTAINS(`d`.`description`, "dinosaur") 
	 RETURN {
         `genre`: `d`.`genre`,
         `releaseDate`: `d`.`releaseDate`,
         `title`: `d`.`title`
	 } 
	 with params: Map(@col -> movies)

The log above is printed once for every query task into which the job is partitioned, that is, as many times as there are shards in the movies collection. Each query is performed against a different shard of the source collection.

Next we would like to perform some aggregate calculation on the data, for example counting the movies by genre. Since the Spark DataFrame API does not push down aggregate clauses before Spark 3.2, we can express the aggregation as a user-provided AQL query that is executed on the database side:
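To make the relation between the pushed-down filters and the generated AQL more concrete, here is a toy translator in plain Scala. It is only a sketch: the types `PushedFilter`, `IsNotNull`, `In` and `StringContains` are local stand-ins that mirror the filter names in the log, and `AqlSketch` is a made-up name, not part of the connector. For brevity it renders the `In` filter with the AQL `IN` operator instead of the array expression shown in the log, and it inlines values where a real implementation might prefer bind parameters.

```scala
// Toy stand-ins for the pushed-down filters named in the log above.
// They mirror Spark's filter names but are NOT Spark's or the connector's classes.
sealed trait PushedFilter
final case class IsNotNull(field: String) extends PushedFilter
final case class In(field: String, values: Seq[String]) extends PushedFilter
final case class StringContains(field: String, value: String) extends PushedFilter

object AqlSketch {
  // Quote a string literal for AQL, escaping backslashes and double quotes.
  private def quote(s: String): String =
    "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\""

  // Reference an attribute of the loop variable d, escaped with backticks.
  private def attr(field: String): String = s"`d`.`$field`"

  // Translate a single filter into an AQL boolean expression.
  def toAql(filter: PushedFilter): String = filter match {
    case IsNotNull(field)             => s"${attr(field)} != null"
    case In(field, values)            => s"${attr(field)} IN [${values.map(quote).mkString(",")}]"
    case StringContains(field, value) => s"CONTAINS(${attr(field)}, ${quote(value)})"
  }

  // Combine filters (joined with AND) and projected fields into one query.
  def query(filters: Seq[PushedFilter], projection: Seq[String]): String = {
    val filterClause =
      if (filters.isEmpty) "" else filters.map(toAql).mkString("FILTER ", " AND ", " ")
    val returnClause =
      if (projection.isEmpty) "RETURN d"
      else projection.map(p => s"`$p`: ${attr(p)}").mkString("RETURN {", ", ", "}")
    s"FOR d IN @@col $filterClause$returnClause"
  }
}
```

Calling `AqlSketch.query` with the three filters from the log and the projection `Seq("genre", "releaseDate", "title")` produces a query with the same overall shape as the logged one: a single `FILTER` clause joining the conditions with `AND`, followed by a `RETURN` object containing only the projected fields.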

spark.read
	.format("com.arangodb.spark")
	.options(options)
	.option("query",
    	"""
    	FOR d IN movies
    	COLLECT genre = d.genre WITH COUNT INTO count
    	SORT count DESC
    	RETURN {genre, count}
    	""")
	.schema(StructType(Array(
    	StructField("genre", StringType),
    	StructField("count", LongType)
	)))
	.load
	.show(3)

which will show:

+---------------+-----+
|          genre|count|
+---------------+-----+
|         Comedy| 3188|
|          Drama| 2690|
|         Action| 2449|
+---------------+-----+

Conclusion

In this article, we saw an architectural overview of ArangoDB Datasource for Apache Spark and some introductory steps to get started with it. Further usage examples can be found in the ArangoDB Datasource Demo project.

The Spark Datasource V2 API is still evolving and is extended with every new Spark release. In Spark 3.2, aggregate push-down capability has been added, and many improvements are yet to come (Data Source V2 improvements).

In future releases of ArangoDB Datasource, we can expect to see support for the Spark 3.2 API, streaming execution mode, and better observability, e.g. task metrics.

Michele Rastelli

Michele is a software engineer with experience in algorithms, web architectures, and data modeling. He has been working as a technical leader for web startups, designing and implementing scalable and resilient microservices applications. He currently works at ArangoDB on the development of the Java driver and frameworks integrations.

2 Comments

  1. quanns on June 20, 2022 at 8:36 am

    Hi Rastelli,
    Does this driver support PySpark? I tried to use it with PySpark and it doesn’t work. I cannot find any documentation on the integration between the arango-spark lib and PySpark.
