Dataflow Programming with Straw


Dataflow is a programming model that has been around since the dawn of computing. Although dataflow programming has languished in obscurity for much of that time, it’s finding new life due to the current explosion of web-scale real-time services, and is a natural fit for many of the engineering challenges these present.

Dataflow is a simple concept. Small nodes of code receive input, process it, and output results. Nodes are connected together, outputs to inputs, forming a processing topology. Using dataflow, complex problems become easier to reason about, systems are easier to scale and make resilient, and you can make better use of your computing resources.

Straw is a Node.js framework that lets you implement dataflow processing in your app. Straw came about for processing real-time futures market data, and can process around 4,000 messages a second on a fairly modest system. In production it has processed many billions of messages.

This article will introduce you to Straw by showing you how to mine the Twitter Firehose for data from tweets. Because the Firehose is a continuous stream of messages, it’s ideal for processing with Straw. We’ll be using the free public version that only has a small percentage of all tweets. Even so, it will be plenty.

Introduction

In Straw, you define a topology of nodes. Each node has an input, and zero or more outputs. Nodes can receive messages. When a node receives a message, it processes it with a user-provided function. That function can output messages that will be received by any connected nodes.

We are going to write several nodes: one to consume the raw data from the Firehose and extract the bits we are interested in, one to send each of those bits to a node that analyses them, and the actual analysis nodes. From there, we’ll push out the data to an Express server and over WebSockets to our client-side visualization.

There are a bunch of things to look at, so you should install the demo app, Haystack, on your local machine. Once you understand how the parts fit together, you should expand on this basic demo: fork the repo and see how full-featured you can make it. You will need to have Redis installed. You will also need Bower, which can be installed using the following command.
npm install -g bower
Once all of the prerequisite software is installed, clone Haystack using the following commands.
git clone https://github.com/simonswain/haystack
cd haystack
npm install
bower install
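Before running the demo, it may help to see the node model described above in miniature. The following is a minimal sketch in plain JavaScript, assuming nothing about Straw itself: createPipe and createNode are hypothetical helpers invented for this illustration, and everything runs in a single process with no Redis involved.

```javascript
// A toy, in-memory model of the node-and-pipe idea. This is NOT Straw's API:
// createPipe and createNode are hypothetical helpers written for this sketch.

// A pipe keeps a list of subscribers and hands each message to all of them.
function createPipe() {
  var subscribers = [];
  return {
    subscribe: function (fn) { subscribers.push(fn); },
    publish: function (msg) {
      subscribers.forEach(function (fn) { fn(msg); });
    }
  };
}

// A node reads from one input pipe, runs a user-provided function on each
// message, and lets that function emit results to an output pipe.
function createNode(input, output, handler) {
  input.subscribe(function (msg) {
    handler(msg, function emit(result) { output.publish(result); });
  });
}

var raw = createPipe();
var words = createPipe();
var lengths = createPipe();
var results = [];

// First node: split each sentence into words (one output message per word).
createNode(raw, words, function (sentence, emit) {
  sentence.split(' ').forEach(function (word) { emit(word); });
});

// Second node: turn each word into its length.
createNode(words, lengths, function (word, emit) {
  emit(word.length);
});

// A plain subscriber at the end of the chain collects the output.
lengths.subscribe(function (n) { results.push(n); });

raw.publish('small nodes of code');
console.log(results); // [ 5, 5, 2, 4 ]
```

Each handler only knows about the message it was given and the emit function, which is what makes nodes easy to rearrange. Straw applies the same idea across separate processes.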

Running Firehose

To access the Twitter Firehose you will need to obtain API credentials by creating an app on Twitter. This will let your local copy of Haystack connect to Twitter’s API and stream in raw data from the Firehose. The app you create will only require read permissions. Once created, visit the API Keys tab and copy the values. Haystack comes with a sample config file. Copy it, and enter your credentials from Twitter:
exports.twitter = {
  consumer_key: '{put yours here}',
  consumer_secret: '{put yours here}',
  access_token_key: '{put yours here}',
  access_token_secret: '{put yours here}'
}
Your local copy of Haystack should be ready to go now. Haystack has two parts — the Straw topology for dataflow, and the Express server for the web front end. To run it, you’ll have to have two separate shells open. First, open a shell and run the topology using the following command.
node run
You should see some output as the topology starts up, then a list of @usernames as tweets come in. Next, open another shell and run the Express server using this command:
node server.js
Next, visit the site at http://localhost:3000. You will see a screen with a world map pinging in tweets as they happen, a histogram of languages, and the top hashtags. This will all be updating in real time.

[Screenshot: Haystack]

Examining the Straw Topology

Let’s have a look at the dataflow and the code that makes it happen. run.js boots up our Straw topology. When we create our topology, we pass it an object describing the nodes we want and how they are connected together. The following fragment shows that the consume-firehose node has an output connected to a pipe called raw-tweets, and a node called route-tweets receives its input from that pipe. This means any messages output by consume-firehose will be passed to route-tweets, and so on through the topology. We also pass the API details for Twitter into the node so it knows what credentials to use. You can pass anything extra you want into a node.
var topo = new straw.topology({
  'consume-firehose': {
    'node': __dirname + '/nodes/consume-firehose.js',
    'output': 'raw-tweets',
    'twitter': config.twitter
  },
  'route-tweets': {
    'node': __dirname + '/nodes/route-tweets.js',
    'input': 'raw-tweets',
    'outputs': {
      'geo': 'client-geo',
      'lang': 'lang',
      'text': 'text'
    }
  },
...
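One way to check how a definition like the fragment above wires up is to derive the connections from the pipe names. The sketch below is a hypothetical helper, not something Straw provides: listConnections is an invented name, and the catch-langs entry (reading from the lang pipe, writing to client-langs) is an assumption added for illustration, since the rest of the real definition is elided.

```javascript
// Hypothetical helper, not part of Straw: list which node feeds which,
// based only on the pipe names in a topology definition object.
function listConnections(definition) {
  var readers = {}; // pipe name -> names of nodes that read from it

  Object.keys(definition).forEach(function (name) {
    var input = definition[name].input;
    if (input) {
      readers[input] = (readers[input] || []).concat(name);
    }
  });

  var connections = [];
  Object.keys(definition).forEach(function (name) {
    var node = definition[name];
    // A node may declare a single `output` or a map of named `outputs`.
    var pipes = [];
    if (node.output) { pipes.push(node.output); }
    if (node.outputs) {
      Object.keys(node.outputs).forEach(function (key) {
        pipes.push(node.outputs[key]);
      });
    }
    pipes.forEach(function (pipe) {
      (readers[pipe] || []).forEach(function (reader) {
        connections.push(name + ' -> ' + pipe + ' -> ' + reader);
      });
    });
  });
  return connections;
}

// Same shape as the fragment above, with file paths and credentials omitted.
// The 'catch-langs' entry is an assumed example, not copied from run.js.
var definition = {
  'consume-firehose': { output: 'raw-tweets' },
  'route-tweets': {
    input: 'raw-tweets',
    outputs: { geo: 'client-geo', lang: 'lang', text: 'text' }
  },
  'catch-langs': { input: 'lang', output: 'client-langs' }
};

var connections = listConnections(definition);
console.log(connections);
// [ 'consume-firehose -> raw-tweets -> route-tweets',
//   'route-tweets -> lang -> catch-langs' ]
```

Pipes with no reader in the definition, such as client-geo here, simply do not appear in the list.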
By convention we store the code for our nodes in the nodes directory. We need to specify the absolute path to each node, so we use our script’s __dirname variable to generate this. You might notice that the consume-firehose node has no input. This is because it is the node that introduces messages into the topology. Also notice that route-tweets has three outputs. This enables it to selectively send messages to different downstream nodes. A simplified version of the consume-firehose node looks like this:
// nodes/consume-firehose.js
var straw = require('straw');
var Twitter = require('twitter');

module.exports = straw.node.extend({
  initialize: function(opts, done) {
    this.twit = new Twitter(opts.twitter);
    process.nextTick(done);
  },
  run: function(done) {
    var self = this;

    this.twit.stream('statuses/sample', function(stream) {
      stream.on('data', function(data) {
        // process data then output it
        self.output(data);           
      });
    });

    done(false);
  }
});
There are two methods here. initialize() is called when the node is first created. It creates our Twitter client using the credentials we passed in. The second method, run(), is called when the topology starts up and binds a callback on incoming tweets that outputs a message into our topology (via the raw-tweets pipe we created previously). route-tweets is a good example of a simple node:
var straw = require('straw');

module.exports = straw.node.extend({
  initialize: function(opts, done) {
    var self = this;

    process.nextTick(done);
  },
  process: function(x, done) {
    var self = this;

    if (x.hasOwnProperty('geo') && x.geo && x.geo.hasOwnProperty('type') && x.geo.type == 'Point') {
      console.log('@' + x.user.screen_name);
      self.output('geo', x.geo.coordinates);
    }

    self.output('lang', x.lang);

    self.output('text', {
      lang: x.lang,
      text: x.text
    });

    done();
  }
});
The process() method is called whenever a message arrives. It examines the message (which is basically a tweet and its metadata in JSON) and outputs parts of it to the outputs we set up. Not all tweets contain geolocation data, so we check to see if it is present, and do a sneaky console.log() to give a rough idea of activity in our topology. The destructured tweets get routed to a few different nodes for processing. Straw runs each node in a separate Unix process, so effectively this downstream work happens concurrently. Since Redis is being used for communication, you could run your nodes on separate machines if you wanted to.
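Because the routing rules in route-tweets are plain data checks, they can be exercised without Twitter or a running topology. The sketch below is a hypothetical refactor, not code from Haystack: routeTweet is an invented name, and it returns the output name and payload pairs that process() would otherwise pass to self.output(). The sample tweets are made up and contain only the fields the node reads.

```javascript
// Hypothetical refactor for testing: the same routing rules as process(),
// expressed as a pure function that returns [outputName, payload] pairs
// instead of calling self.output().
function routeTweet(tweet) {
  var routed = [];

  // Only tweets carrying a point location are sent to the 'geo' output.
  if (tweet.geo && tweet.geo.type === 'Point') {
    routed.push(['geo', tweet.geo.coordinates]);
  }

  // Every tweet contributes to the language and text outputs.
  routed.push(['lang', tweet.lang]);
  routed.push(['text', { lang: tweet.lang, text: tweet.text }]);
  return routed;
}

// Made-up sample tweets with only the fields the node looks at.
var withGeo = routeTweet({
  lang: 'en',
  text: 'hello',
  geo: { type: 'Point', coordinates: [-33.86, 151.21] }
});
var withoutGeo = routeTweet({ lang: 'ja', text: 'konnichiwa', geo: null });

console.log(withGeo.length);    // 3
console.log(withoutGeo.length); // 2
```

Keeping the decision logic separate from the output calls is one possible design choice. It makes the rules easy to unit test while the node itself stays a thin wrapper.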

The catch-langs Node

We could be receiving a huge volume of incoming data. We will be pushing updates in near real-time to our web-based clients, but we don’t want to bombard them with every message that comes in. catch-langs solves this problem by counting up incoming languages, and then periodically emitting total counts. When this node is run, it sets up an interval to control the emitter:
run: function(done) {
  var self = this;
  var fn = function() {
    self.ping();
  };

  this.timer = setInterval(fn, this.opts.interval);
  done(false);
}
When messages come in, we bump up the count for that language, and flag that the counts have changed:
process: function(x, done) {
  var self = this;

  if (!this.langs.hasOwnProperty(x)) {
    this.langs[x] = 0;
  }

  this.langs[x]++;
  this.total++;
  this.changed = true;
  done();
}
Every time the interval timer fires, if our counts have changed we emit our totals:
ping: function() {
  var self = this;
  var msg;

  if (!this.changed) {
    return;
  }

  this.changed = false;
  msg = {};
  _.each(this.langs, function(x, i) {
    msg[i] = (x / self.total);
  });
  this.output(msg);
}
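The three methods above amount to a count-then-emit pattern: accumulate on every message, publish only on a timer and only when something changed. As a rough, stand-alone sketch of that pattern outside Straw, the following uses hypothetical names (createLangCounter, add, flush) and replaces Underscore's _.each with Object.keys so it runs without dependencies.

```javascript
// Stand-alone sketch of the count-then-emit pattern. The names here
// (createLangCounter, add, flush) are hypothetical and not Straw's API.
function createLangCounter(emit) {
  var langs = {};
  var total = 0;
  var changed = false;

  return {
    // Called for every incoming language code.
    add: function (lang) {
      langs[lang] = (langs[lang] || 0) + 1;
      total++;
      changed = true;
    },
    // Called on a timer. Emits each language's share of the total,
    // but only if something has arrived since the last emit.
    flush: function () {
      if (!changed) { return false; }
      changed = false;
      var msg = {};
      Object.keys(langs).forEach(function (lang) {
        msg[lang] = langs[lang] / total;
      });
      emit(msg);
      return true;
    }
  };
}

var emitted = [];
var counter = createLangCounter(function (msg) { emitted.push(msg); });

['en', 'en', 'ja', 'es'].forEach(function (lang) { counter.add(lang); });
counter.flush(); // emits { en: 0.5, ja: 0.25, es: 0.25 }
counter.flush(); // nothing new has arrived, so nothing is emitted

console.log(emitted.length); // 1
console.log(emitted[0]);     // { en: 0.5, ja: 0.25, es: 0.25 }
```

In a long-running process, flush would be driven by something like setInterval(counter.flush, 1000), which mirrors what the run() method does with this.opts.interval.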

The Express Server

So far we’ve consumed the data from Twitter, pulled it apart, and obtained some metrics from it. To get the data to our end users we have to extract it from the topology, send it over a WebSocket, and display it. This is where the Express-based server comes in. Take a look at server.js. This is a pretty standard, minimal Express app. It uses Socket.IO as an easy way to deliver data in real time. You might also want to look at SockJS as a more modern alternative. The interesting part of server.js is its use of a Straw feature called a Tap. A Tap lets us tap into a topology so we can stream data from it. If you look at our topology definition in run.js, you’ll see there are pipes for client-langs and a few other nodes, but no consumers attached to them. These are there for our Tap to hook into. Inside server.js we have code like this (simplified a bit here):
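var straw = require('straw');

// Tap into the client-langs pipe of the running topology.
var langsTap = new straw.tap({
  'input': 'client-langs'
});

// Most recent language counts, kept in its own variable so the
// Tap reference above is not overwritten by the first message.
var latestLangs;

langsTap.on('message', function(msg) {
  latestLangs = msg;
  // io is the Socket.IO server set up elsewhere in server.js
  io.sockets.emit('langs', msg);
});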
This includes the Straw library, creates a new Tap from it that is connected to the client-langs pipe, and binds an event handler that will be called whenever a message is received on that pipe. When a message is received, we push it out using Socket.IO. On the client side, things are very simple. See the following code in public/js/haystack.js.
var socket = io.connect('http://localhost:3000');

socket.on('langs', function (data) {
  // do something
});
Whenever a message is received, a callback is fired on the client with whatever payload we provided in data. This is used to draw our histogram of languages, counts of hashtags, and geolocated pings on the map.
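The "do something" step depends on the visualization, but for the language histogram it usually starts with shaping the payload. The following is only a sketch of one way to do that, assuming the langs message is an object mapping language codes to their share of all tweets (as emitted by catch-langs). topLanguages is a hypothetical helper, not a function from Haystack.

```javascript
// Hypothetical helper: turn a langs message ({ code: share, ... }) into
// the top `limit` rows for a histogram, largest share first.
function topLanguages(langs, limit) {
  return Object.keys(langs)
    .map(function (code) {
      return { lang: code, percent: Math.round(langs[code] * 100) };
    })
    .sort(function (a, b) { return b.percent - a.percent; })
    .slice(0, limit);
}

// Made-up sample payload in the shape catch-langs emits.
var rows = topLanguages({ en: 0.5, ja: 0.25, es: 0.15, pt: 0.1 }, 3);
console.log(rows);
// [ { lang: 'en', percent: 50 },
//   { lang: 'ja', percent: 25 },
//   { lang: 'es', percent: 15 } ]
```

Inside the socket.on('langs', ...) callback, the resulting rows could then be handed to whatever charting code draws the bars.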

Conclusion

Wrapping up, Haystack is a compact example of how to use dataflow to process incoming messages. It provides a good example of how Straw is being used in the real world. Because every node is run in its own process, it’s possible to scale it without too much trouble. But I think the bigger benefit is how easy it makes it to decompose your problem into small, simple steps. Feel free to use Haystack as a foundation for your application. Haystack is easy to extend by adding more processing nodes and visualizations for them. Fork the repo, and if you come up with something cool, send a pull request. Let’s see how comprehensive we can make this.

Frequently Asked Questions (FAQs) about Dataflow Programming

What is the main difference between dataflow programming and traditional programming?

In traditional programming, the control flow of the program is explicit. The programmer determines the exact order in which operations will be executed. In contrast, dataflow programming is based on the concept of data dependencies. The execution order of operations is determined by the availability of input data. This allows for greater parallelism and concurrency, as operations that do not depend on each other can be executed simultaneously.
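To illustrate the idea of execution order following data availability, here is a toy evaluator in plain JavaScript. It is a hypothetical sketch written for this answer (runDataflow is an invented name, unrelated to Straw): each operation lists the values it needs and runs as soon as all of them exist, regardless of the order in which the operations were declared.

```javascript
// Toy data-driven evaluator (hypothetical, for illustration only).
// Each operation names its inputs and runs once all of them have values.
function runDataflow(operations, initialValues) {
  var values = Object.assign({}, initialValues);
  var pending = Object.keys(operations);
  var order = [];
  var progressed = true;

  // Keep sweeping over pending operations until none can make progress.
  while (pending.length > 0 && progressed) {
    progressed = false;
    pending = pending.filter(function (name) {
      var op = operations[name];
      var ready = op.inputs.every(function (input) {
        return Object.prototype.hasOwnProperty.call(values, input);
      });
      if (!ready) { return true; } // keep waiting for inputs
      values[name] = op.fn.apply(null, op.inputs.map(function (input) {
        return values[input];
      }));
      order.push(name);
      progressed = true;
      return false; // done, remove from pending
    });
  }
  return { values: values, order: order };
}

// Declared "out of order" on purpose: total depends on sum and product.
var result = runDataflow({
  total:   { inputs: ['sum', 'product'], fn: function (s, p) { return s + p; } },
  sum:     { inputs: ['a', 'b'], fn: function (a, b) { return a + b; } },
  product: { inputs: ['a', 'b'], fn: function (a, b) { return a * b; } }
}, { a: 2, b: 3 });

console.log(result.order);        // [ 'sum', 'product', 'total' ]
console.log(result.values.total); // 11
```

Here sum and product do not depend on each other, so a real dataflow runtime would be free to run them in parallel. This single-threaded sketch only shows that the ordering falls out of the dependencies.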

How does dataflow programming improve efficiency?

Dataflow programming can significantly improve efficiency by allowing for parallel execution of operations. This is particularly beneficial in applications that require heavy computation, such as image processing or machine learning. By executing operations as soon as their input data is available, dataflow programming can make better use of system resources and reduce overall execution time.

What are some common applications of dataflow programming?

Dataflow programming is widely used in areas that require high-performance computation. This includes fields like machine learning, image and signal processing, and scientific computing. It is also used in event-driven programming, such as in graphical user interfaces or server applications, where the flow of the program is determined by external events.

What are the challenges in implementing dataflow programming?

While dataflow programming offers many benefits, it also presents some challenges. One of the main challenges is managing data dependencies. This requires careful design to ensure that operations are executed in the correct order. Another challenge is handling concurrency, as operations can be executed simultaneously. This can lead to issues such as race conditions, where the outcome of the program depends on the relative timing of operations.

How does dataflow programming relate to functional programming?

Dataflow programming and functional programming are both declarative programming paradigms, meaning they focus on what the program should accomplish, rather than how it should do it. In functional programming, functions are pure and do not have side effects, which makes it easier to reason about the program. Similarly, in dataflow programming, operations are independent and can be executed in any order, as long as their data dependencies are satisfied.

Can dataflow programming be used with other programming paradigms?

Yes, dataflow programming can be combined with other programming paradigms. For example, it can be used in an object-oriented programming context, where operations are methods on objects. It can also be used in a procedural programming context, where operations are procedures or functions.

What tools are available for dataflow programming?

There are many tools available for dataflow programming, ranging from general-purpose programming languages to specialized libraries and frameworks. Some examples include TensorFlow for machine learning, Apache Flink for stream processing, and LabVIEW for graphical programming.

How does dataflow programming handle errors?

Error handling in dataflow programming can be more complex than in traditional programming, due to the concurrent execution of operations. However, many dataflow programming systems provide mechanisms for error handling, such as exception propagation, where an error in one operation can cause dependent operations to be cancelled.

What is the future of dataflow programming?

With the increasing demand for high-performance computation, dataflow programming is likely to become more important in the future. Advances in hardware, such as multi-core processors and GPUs, make it possible to execute more operations in parallel, which is where dataflow programming shines. Furthermore, the rise of big data and machine learning applications, which require processing large amounts of data, is driving the adoption of dataflow programming.

How can I learn more about dataflow programming?

There are many resources available for learning about dataflow programming. This includes online tutorials, books, and academic papers. Additionally, many dataflow programming tools provide extensive documentation and examples, which can be a great way to learn by doing.

Simon Swain

Simon has been building the web since the days of Netscape 1.0. Recently he has been engineering real time data capture and delivery systems. On the side he produces Techno, lifts weights and researches computer arcana.
