Dataflow Programming with Straw

Dataflow is a programming model that has been around since the dawn of computing. Although dataflow programming has languished in obscurity for much of that time, it’s finding new life due to the current explosion of web-scale real-time services, and is a natural fit for many of the engineering challenges these present.

Dataflow is a simple concept. Small nodes of code receive input, process it, and output results. Nodes are connected together, outputs to inputs, forming a processing topology. Using dataflow, complex problems become easy to reason about, systems are easier to scale and make resilient, and you can make better use of your computing resources.

Straw is a Node.js framework that lets you implement dataflow processing in your app. Straw came about for processing real-time futures market data, and can process around 4,000 messages a second on a fairly modest system. In production it has processed many billions of messages.

This article will introduce you to Straw by showing you how to mine the Twitter Firehose for data from tweets. Because the Firehose is a continuous stream of messages, it’s ideal for processing with Straw. We’ll be using the free public version that only has a small percentage of all tweets. Even so, it will be plenty.

Introduction

In Straw, you define a topology of nodes. A node can have an input and zero or more outputs. Nodes can receive messages. When a node receives a message, it processes it with a user-provided function. That function can output messages that will be received by any connected nodes.
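To make the idea concrete before looking at Straw itself, here is a minimal in-process sketch of nodes connected output to input. It is not Straw's API: createNode, connect, receive, and emit are hypothetical names invented for this illustration, and everything runs in a single process with no Redis involved.

```javascript
// Minimal in-process sketch of the dataflow idea.
// NOT Straw's API: createNode, connect, receive and emit are hypothetical names.
function createNode(processFn) {
  var node = {
    downstream: [],
    // Deliver a message to this node. processFn may call emit zero or more times.
    receive: function (msg) {
      processFn(msg, function emit(out) {
        node.downstream.forEach(function (next) {
          next.receive(out);
        });
      });
    }
  };
  return node;
}

// Connect the output of one node to the input of another.
function connect(from, to) {
  from.downstream.push(to);
  return to;
}

var results = [];

// Three small nodes: split text into words, keep hashtags, collect them.
var splitWords = createNode(function (text, emit) {
  text.split(/\s+/).forEach(function (word) { emit(word); });
});
var keepHashtags = createNode(function (word, emit) {
  if (word.charAt(0) === '#') { emit(word.toLowerCase()); }
});
var collect = createNode(function (tag) {
  results.push(tag);
});

connect(splitWords, keepHashtags);
connect(keepHashtags, collect);

splitWords.receive('Learning #Dataflow with #Straw');
console.log(results);
```

Each node only knows about the message in front of it, which is what makes a topology like this easy to reason about and rearrange.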

We are going to write several nodes – one to consume the raw data from the Firehose and extract the bits we are interested in, one to send each of those bits to a node that analyses them, and the actual analysis nodes. From there, we’ll push out the data to an Express server and over WebSockets to our client-side visualization. There are a bunch of things to look at, so you should install the demo app, Haystack, on your local machine.

Once you understand how the parts fit together, you should expand on this basic demo — fork the repo and see how full featured you can make it. You will have to have Redis installed. You will also need Bower, which can be installed using the following command.

npm install -g bower

Once all of the prerequisite software is installed, clone Haystack using the following commands.

git clone https://github.com/simonswain/haystack
cd haystack
npm install
bower install

Running Firehose

To access the Twitter Firehose you will need to obtain API credentials by creating an app on Twitter. This will let your local copy of Haystack connect to Twitter’s API and stream in raw data from the Firehose. The app you create will only require read permissions. Once created, visit the API Keys tab and copy the values.

Haystack comes with a sample config file. Copy it, and enter your credentials from Twitter:

exports.twitter = {
  consumer_key: '{put yours here}',
  consumer_secret: '{put yours here}',
  access_token_key: '{put yours here}',
  access_token_secret: '{put yours here}'
}

Your local copy of Haystack should be ready to go now. Haystack has two parts — the Straw topology for dataflow, and the Express server for the web front end. To run it, you’ll have to have two separate shells open. First, open a shell and run the topology using the following command.

node run

You should see some output as the topology starts up, then a list of @usernames as tweets come in. Next, open another shell and run the Express server using this command:

node server.js 

Next, visit the site on http://localhost:3000. You will see a screen with a world map pinging in tweets as they happen, a histogram of languages, and the top hashtags. This will all be updating in real-time.

Haystack Screenshot

Examining the Straw Topology

Let’s have a look at the dataflow and the code that makes it happen. run.js boots up our Straw topology. When we create our topology, we pass it an object describing the nodes we want, and how they are connected together. The following fragment shows that the consume-firehose node has an output connected to a pipe called raw-tweets, and a node called route-tweets receives its input from that pipe. This means any messages output by consume-firehose will be passed to route-tweets, and so on through the topology. We also pass the API details for Twitter in to the node so it knows what credentials to use. You can pass anything extra you want in to a node.

var topo = new straw.topology({
  'consume-firehose': {
    'node': __dirname + '/nodes/consume-firehose.js',
    'output': 'raw-tweets',
    'twitter': config.twitter
  },
  'route-tweets': {
    'node': __dirname + '/nodes/route-tweets.js',
    'input': 'raw-tweets',
    'outputs': {
      'geo': 'client-geo',
      'lang': 'lang',
      'text': 'text'
    }
  },
...

By convention we store the code for our nodes in the nodes directory. We need to specify the absolute path to each node, so we use our script’s __dirname variable to generate this.

You might notice that consume-firehose has no input. This is because it is the node that introduces messages into the topology. Also notice that route-tweets has three outputs. This enables it to selectively send messages to different downstream nodes.

A simplified version of the consume-firehose node looks like this:

// nodes/consume-firehose.js
var straw = require('straw');
var Twitter = require('twitter');

module.exports = straw.node.extend({
  initialize: function(opts, done) {
    this.twit = new Twitter(opts.twitter);
    process.nextTick(done);
  },
  run: function(done) {
    var self = this;

    this.twit.stream('statuses/sample', function(stream) {
      stream.on('data', function(data) {
        // process data then output it
        self.output(data);           
      });
    });

    done(false);
  }
});

There are two methods here. initialize() is called when the node is first created. It creates our Twitter client using the credentials we passed in. The second method, run(), is called when the topology starts up and binds a callback on incoming tweets that outputs a message into our topology (via the raw-tweets pipe we created previously).

route-tweets is a good example of a simple node:

var straw = require('straw');

module.exports = straw.node.extend({
  initialize: function(opts, done) {
    var self = this;

    process.nextTick(done);
  },
  process: function(x, done) {
    var self = this;

    if (x.hasOwnProperty('geo') && x.geo && x.geo.hasOwnProperty('type') && x.geo.type == 'Point') {
      console.log('@' + x.user.screen_name);
      self.output('geo', x.geo.coordinates);
    }

    self.output('lang', x.lang);

    self.output('text', {
      lang: x.lang,
      text: x.text
    });

    done();
  }
});

The process() method is called whenever a message arrives. It examines the message (which is basically a tweet and its metadata in JSON) and sends parts of it to the outputs we set up. Not all tweets contain geolocation data, so we check to see if it is present, and do a sneaky console.log() to give a rough idea of activity in our topology.
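The routing decision can be tried out without Straw or Twitter by pulling it into a plain function. The sketch below is an illustration only: routeTweet is a hypothetical helper, not part of Haystack, and the two tweets are made-up sample objects that carry only the fields the node reads.

```javascript
// Hypothetical helper mirroring the routing logic of route-tweets.
// Returns the messages that would be sent, keyed by output name.
function routeTweet(tweet) {
  var out = {};

  // Only tweets with point geolocation produce a 'geo' message.
  if (tweet.geo && tweet.geo.type === 'Point') {
    out.geo = tweet.geo.coordinates;
  }

  // Every tweet produces a 'lang' and a 'text' message.
  out.lang = tweet.lang;
  out.text = { lang: tweet.lang, text: tweet.text };

  return out;
}

// Made-up sample tweets, reduced to the fields used above.
var withGeo = routeTweet({
  lang: 'en',
  text: 'Hello from Sydney',
  geo: { type: 'Point', coordinates: [-33.87, 151.21] }
});
var withoutGeo = routeTweet({ lang: 'ja', text: 'konnichiwa', geo: null });

console.log(withGeo);
console.log(withoutGeo);
```

Keeping the decision in a pure function like this is one way to unit test a node's behaviour separately from the topology it runs in.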

The destructured tweets get routed to a few different nodes for processing. Straw runs each node in a separate Unix process, so effectively this downstream work happens concurrently. Since Redis is being used for communication, you could run your nodes on separate machines if you wanted to.

The catch-langs Node

We could be receiving a huge volume of incoming data. We will be pushing updates in near real-time to our web-based clients, but we don’t want to bombard them with every message that comes in. catch-langs solves this problem by counting up incoming languages, and then periodically emitting total counts. When this node is run, it sets up an interval to control the emitter:

run: function(done) {
  var self = this;
  var fn = function() {
    self.ping();
  };

  this.timer = setInterval(fn, this.opts.interval);
  done(false);
}

When messages come in, we bump up the count for that language, and flag that the counts have changed:

process: function(x, done) {
  var self = this;

  if (!this.langs.hasOwnProperty(x)) {
    this.langs[x] = 0;
  }

  this.langs[x] ++;
  this.total++;
  this.changed = true;
  done();
}

Every time the interval timer fires, if our counts have changed we emit our totals:

ping: function() {
  var self = this;
  var msg;

  if (!this.changed) {
    return;
  }

  this.changed = false;
  msg = {};
  _.each(this.langs, function(x, i) {
    msg[i] = (x / self.total);
  });
  this.output(msg);
}
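Taken together, the three fragments above amount to a counter plus a "changed" flag. The following standalone sketch shows the same bookkeeping without Straw, timers, or Underscore. The names createLangCounter, add, and snapshot are hypothetical and chosen only for this example.

```javascript
// Standalone sketch of the catch-langs bookkeeping, without Straw or timers.
// createLangCounter, add and snapshot are hypothetical names.
function createLangCounter() {
  var langs = {};
  var total = 0;
  var changed = false;

  return {
    // Count one incoming language code and flag that the totals changed.
    add: function (lang) {
      if (!Object.prototype.hasOwnProperty.call(langs, lang)) {
        langs[lang] = 0;
      }
      langs[lang]++;
      total++;
      changed = true;
    },
    // Return each language's share of the total, or null if nothing
    // has changed since the previous snapshot.
    snapshot: function () {
      if (!changed) {
        return null;
      }
      changed = false;
      var msg = {};
      Object.keys(langs).forEach(function (lang) {
        msg[lang] = langs[lang] / total;
      });
      return msg;
    }
  };
}

var counter = createLangCounter();
['en', 'en', 'ja', 'es'].forEach(function (lang) {
  counter.add(lang);
});

var first = counter.snapshot();  // shares of the four messages counted so far
var second = counter.snapshot(); // nothing changed in between
console.log(first, second);
```

In the real node, the interval timer plays the role of the repeated snapshot() calls, so clients receive at most one update per interval no matter how many tweets arrive.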

The Express Server

So far we’ve consumed the data from Twitter, pulled it apart, and obtained some metrics from it. To get the data to our end users we have to extract it from the topology, send it over a WebSocket, and display it. This is where the Express-based server comes in.

Take a look at server.js. This is a pretty standard, minimal Express app. It uses Socket.IO as an easy way to deliver data in real-time. You might also want to look at SockJS as a more modern alternative.

The interesting part of server.js is its use of a Straw feature called a Tap. A Tap lets us tap into a topology so we can stream data from it. If you look at our topology definition in run.js, you’ll see there are pipes for client-langs and a few other nodes, but no consumers attached to them. These are there for our Tap to hook into.

Inside server.js we have code like this (simplified a bit here):

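var straw = require('straw');

// Tap into the client-langs pipe of the running topology
var langsTap = new straw.tap({
  'input': 'client-langs'
});

// Most recent message received from the tap
var langs = {};

// io is the Socket.IO server set up elsewhere in server.js
langsTap.on('message', function(msg) {
  langs = msg;
  io.sockets.emit('langs', msg);
});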

This includes the Straw library, creates a new Tap from it that is connected to the client-langs pipe, and binds an event handler that will be called whenever a message is received on that pipe. When a message is received, we push it out using Socket.IO. On the client side, things are very simple. See the following code in public/js/haystack.js.

var socket = io.connect('http://localhost:3000');

socket.on('langs', function (data) {
  // do something
});

Whenever a message is received, a callback is fired on the client with whatever payload we provided in data. This is used to draw our histogram of languages, counts of hashtags, and geolocated pings on the map.
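As an illustration of what the "do something" step might involve for the language histogram, the sketch below turns a langs payload into sorted rows ready for drawing. It assumes the payload maps language codes to fractions of the total, as emitted by catch-langs. toHistogramRows is a hypothetical helper, not a function from haystack.js, and the sample payload is made up.

```javascript
// Hypothetical helper: convert a langs payload such as { en: 0.5, ja: 0.25 }
// into rows sorted by share, largest first, limited to the top entries.
function toHistogramRows(langs, limit) {
  return Object.keys(langs)
    .map(function (lang) {
      return { lang: lang, percent: Math.round(langs[lang] * 100) };
    })
    .sort(function (a, b) {
      return b.percent - a.percent;
    })
    .slice(0, limit);
}

// Made-up payload in the shape catch-langs emits.
var rows = toHistogramRows({ ja: 0.25, en: 0.5, es: 0.15, pt: 0.1 }, 3);
console.log(rows);
```

Inside the socket.on('langs', ...) callback, rows like these could be handed to whatever charting code draws the bars.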

Conclusion

Wrapping up, Haystack is a compact example of how to use dataflow to process incoming messages. It provides a good example of how Straw is being used in the real world. Because every node is run in its own process, it’s possible to scale it without too much trouble. But I think the bigger benefit is how easy it makes it to decompose your problem into small, simple steps.

Feel free to use Haystack as a foundation for your application. Haystack is easy to extend by adding more processing nodes and visualizations for them. Fork the repo, and if you come up with something cool, send a pull request. Let’s see how comprehensive we can make this.


  • Simon Swain

    The concept is similar. Dataflow programming has been around for a long time. However, the implementation and approach are different. Straw is server side, and made for engineers. It also runs each node in its own process, which provides many benefits, such as isolating failure and making good use of multicore CPUs.