home shape

A brief introduction to the Pregel module

This post is outdated, please see more recent infos below.

Please see a technical article about our current Pregel integration in our blog, details about the various Pregel algorithms ArangoDB supports in our documentation and a tutorial about Community Detection with real data in our training center.

 

 

Ever since Google introduced Pregel as a system for large-scale graph processing we thought of a way how to enable this feature in ArangoDB. So we set up an ArangoDB cluster, created some huge graphs and started evaluating the concept. We came up with a new ArangoDB module (called pregelRunner) that enables the user to write own algorithms, pass them to ArangoDB and have them executed in a Pregel-like fashion.

This means that the user’s algorithm is executed step wise on each server in the cluster in parallel for all its local vertices. In each step the vertices can send messages to each other to distribute information. These messages can be received by the other vertex in the next step. The algorithm terminates when there are no more active vertices left and no message has been sent.

We started to implement an experimental version of Pregel in ArangoDB. You need to check-out the pregel branch of ArangoDB in order to play with the following examples. Please be advised that the implementation is still in an early phase and very like to change. In this post we will provide a brief introduction to ArangoDB’s Pregel module by guiding you through the implementation of an example algorithm.


An algorithm to detect “connected components”

pregel_connected_componentsOur example algorithm is supposed to solve the connected components problem, which means it identifies all connected sub-graphs within a graph. So first of all we need a graph to work with …

The graph

We will generate a simple graph (called CountryGraph) containing 10 countries and a relation named hasBorderWith which forms 4 connected components:

  1. Germany, Austria, Switzerland
  2. Morocco, Algeria, Tunisia
  3. Brazil, Argentina, Uruguay
  4. Australia

To import this graph save this file, open an ArangoShell and execute the following line:

arangosh [_system]> require("internal").load(PATH_TO_THE_FILE);    

The worker algorithm

First of all the worker algorithm is executed on each vertex in the graph, a cursor on the messages for this vertex and a global object containing step information and user-defined global data. So our algorithm signature has to look like this:

var connectedSetsAlgorithm = function (vertex, message, global) {

So how can an algorithm be designed that is supposed to detect all components? We will implement this in a very straight-forward fashion:

Each component’s identifier is the alphabetically sorted last _key attribute of its vertices. So in step 0 each vertex stores its own key in the result and reaches out to its connected edges. Note that you can not access the vertex’ attributes directly as you are working with a generic wrapper around the original vertex. To access an attribute of the original vertex you need to use the _get(“someAttribute”) method :

switch(global.step) {
 case 0:
   result = {
     value: vertex._get("_key"),
     backwards: []
   }
   sendSender = true;
   break;
}

A vertex can only access its outgoing edges and hence in step 1 remembers the senders of all its received messages in an array to enable backwards communication. Also in step 1 it is already changing its component id based on the incoming messages:

case 1:
  result = vertex._getResult();
  while (message.hasNext()) {
    next = message.next();
    if (result.value < next.data) {
      result.value = next.data;
    }
    result.backwards.push(next.sender);
  }
  break;

So the first 2 steps enable backwards and forward communication, now we just need to run the algorithm until every vertex received its component identifier. Hence in every other step each vertex changes its component identifier in case it receives a new one from his neighbors:

default:
  result = vertex._getResult();
  while (message.hasNext()) {
    next = message.next();
    if (result.value < next.data) {
      result.value = next.data;
      changed = true;
    }
  }

If a vertex does not receive any more messages or does not take on a new component identifier it has to send no messages and will be deactivated. Notice that it could be reactivated in the next step if it would receive a new message from its neighbors:

if (global.step > 1 && ! changed) {
 vertex._deactivate();
 return;
}

Now if the vertex took on a new component identifier we have to store it …

vertex._setResult(result);

… and spread it to its neighbors , both outbound …

while (vertex._outEdges.hasNext()) {
  edge = vertex._outEdges.next();
  message.sendTo(edge._getTarget(), result.value, sendSender);
}

…. and inbound:

for (var i = 0; i < result.backwards.length; ++i) {
  message.sendTo(result.backwards[i], result.value, sendSender);
}

Now the vertex is deactivated and awaits further messages in the next steps.

vertex._deactivate();

The combiner algorithm

Perfect, the worker algorithm is now defined, but we want it to be really fast and reduce unnecessary messages. This is where it can be a good idea to make use of a customized message combiner. In our example one country can receive multiple messages in one step, i.e. Germany might receive a message from Austria and Switzerland. In step 1 Austria sent “Austria” and Switzerland “Switzerland” as their component identifiers. We are only interested in the designated component identifier so we could ignore Austria’s message (because of alphabetical sort order). The Pregel module allows you to define a message combiner with the signature

function(next, old) {}

where next is the next message and old is the combined message. So in our example we would define a combiner that would already filter every unnecessary message and would just send the actual component identifier:

var combiner = function(next, old) {
  if (old == null || next < old) {
    return next;
  }
  return old;
}

As i.e. Germany has 2 neighboring countries, the algorithm would send 1 message instead of 2 using this combiner. So our algorithms are defined, let’s use ArangoDB’s pregelRunner module to execute it.

The pregelRunner module

So at first we require the pregelRunner and create a new Runner instance:

arangosh [_system]> var pregelRunner = require("org/arangodb/pregelRunner");
arangosh [_system]> var runner = new pregelRunner.Runner();

For convenience reasons our defined Pregel algorithm is available in this file. After loading it in the shell the functions can be passed to the runner:

arangosh [_system]> require("internal").load(PATH_TO_THE_FILE);
arangosh [_system]> runner.setWorker(worker);
{ 
  "worker" : function (vertex, message, global) { ... } 
}
arangosh [_system]> runner.setCombiner(combiner);
{ 
  "worker" : function (vertex, message, global) { ... }, 
  "combiner" : function (next, old) { ... } 
}

Now as the runner is configured, we can start the Pregel run on our graph:

arangosh [_system]> runner.start("CountryGraph");
16752523162

The start method returns the unique execution number of the triggered run. Now we can use the runner to retrieve information about the current state of the run …

arangosh [_system]> runner.getStatus();
{ 
 "step" : 5, 
 "state" : "finished" 
}

… and when it finished we can get the name of the result graph:

arangosh [_system]> runner.getResult();
{ 
  "graphName" : "P_16752523162_RESULT_CountryGraph", 
  "state" : "finished" 
}

Note that the Pregel module creates a copy of the original graph and stores any result there. The original graph remains untouched. Now let’s see if the result matches our expectations by loading the vertices of the result graph:

arangosh [_system]> var graph_module = require("org/arangodb/general-graph");
var resultGraph = graph_module._graph("P_16752523162_RESULT_CountryGraph");
arangosh [_system]> resultGraph._vertices().toArray().forEach(function(vertex) {require("internal").print(vertex._key + " is in component " + vertex.result.value); });
Switzerland is in component Switzerland
Germany is in component Switzerland
Austria is in component Switzerland
Algeria is in component Tunisia
Tunisia is in component Tunisia
Morocco is in component Tunisia
Brazil is in component Uruguay
Argentina is in component Uruguay
Uruguay is in component Uruguay
Australia is in component Australia

Perfect, we have identified 4 subgraphs in our graph (Switzerland, Tunisia , Uruguay and Australia). As we enjoyed this moment long enough we can delete the result graph and any meta data:

arangosh [_system]> runner.dropResult();
{ 
  "error" : false, 
  "code" : 200 
}

Now the result graph is gone ..

arangosh [_system]> var resultGraph = graph_module._graph("P_16752523162_RESULT_CountryGraph");
JavaScript exception in file './js/common/modules/org/arangodb/general-graph.js' at 2349,11: [ArangoError 1924: graph not found]
!    throw err;
!          ^

More features

A few already implemented features not used by this example are:

The final Step:

Remember, the algorithm terminates when every vertex has been deactivated and no more messages have been sent, right? Well, sometimes one wants to perform a final step for cleanup or even wants to continue and reactivate some vertices. The optional finalStepAlgorithm solves this issue. Again it is a function with the same signature as the worker algorithm (vertex, message, global) but is called for every deactivated but not deleted vertex in the graph whenever the execution would normally end.

Usage:

arangosh [_system]> runner.setFinalstep(finalStepAlgorithm)

The super step:

Sometimes you want a “super-algorithm” to be executed between steps, f.e in a graph coloring algorithm you need a mechanism that introduces a new color to the algorithm when the last one has been spread. This is where the superStep comes in. The superStep is a function not called on a vertex but in a global context. Hence its signature must be function (globals, stepInfo).

Usage:

arangosh [_system]> runner.setSuperstep(superStepAlgorithm)

Outlook

The pregelRunner module is currently under development and we will add more features to it and make it a mighty tool for large scale graph processing. We already found various problems in graph theory that were unsolvable due to time and memory restrictions for very large graphs but which can be approached with the Pregel system (f.e. all-shortest-paths, graph coloring, minimum spanning trees).

Frank Celler

Frank Celler

Frank is both entrepreneur and backend developer, developing mostly memory databases for two decades. He is the CTO and co-founder of ArangoDB. Try to challenge Frank asking him questions on C, C++ and MRuby. Besides Frank organizes Cologne’s NoSQL group & is an active member of NoSQL community.

Leave a Comment





Get the latest tutorials, blog posts and news: