Introduction to Node.js Streams

Scalability. Big Data. Real Time. These are some of the challenges that a web application has to face in the modern World Wide Web. This is where Node.js and its non-blocking I/O model come into play. This article will introduce you to one of Node’s most powerful APIs for data-intensive computing: streams.

Why Use Streams?

Let’s consider the following example:

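var http = require('http')
  , fs = require('fs')
  ;

var server = http.createServer(function (req, res) {
  // readFile() loads the whole file into memory before the callback runs
  fs.readFile(__dirname + '/data.txt', function (err, data) {
    if (err) {
      res.statusCode = 500;
      return res.end('Could not read data.txt');
    }
    res.end(data);
  });
});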

server.listen(8000);

This code works, but Node.js buffers the entire contents of data.txt in memory before sending any of it back to the client. As the number of client requests increases, your application could start to consume a lot of memory. In addition, clients have to wait until the server has read the entire file, which increases latency.

Let’s have a look at another example:

var http = require('http')
  , fs = require('fs')
  ;

var server = http.createServer(function (req, res) {
  var stream = fs.createReadStream(__dirname + '/data.txt');
  stream.pipe(res);
});
server.listen(8000);

Here, to overcome the scalability issues, we use the streams API. Using the stream object ensures that data.txt is sent to clients one chunk at a time as it is read from the disk, so the server does not buffer the whole file and clients do not have to wait for it to be read completely.
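To make the idea of chunks more concrete, here is a minimal sketch that reads a small file as a stream and records the size of every chunk. The chunkSizes() helper and the demo file in the temp directory are assumptions made for this illustration; the highWaterMark option sets the maximum chunk size and is kept tiny here only so that several chunks show up.

```javascript
var fs = require('fs');
var os = require('os');
var path = require('path');

// Reads a file as a stream and reports the size of every chunk received.
// chunkSize is passed as highWaterMark, the maximum number of bytes per chunk.
function chunkSizes(file, chunkSize, callback) {
  var sizes = [];
  fs.createReadStream(file, { highWaterMark: chunkSize })
    .on('data', function (chunk) { sizes.push(chunk.length); })
    .on('error', callback)
    .on('end', function () { callback(null, sizes); });
}

// Hypothetical demo file, created in the temp directory for this sketch only
var demoFile = path.join(os.tmpdir(), 'streams-demo-chunks-' + process.pid + '.txt');
fs.writeFileSync(demoFile, '0123456789');

chunkSizes(demoFile, 4, function (err, sizes) {
  if (err) throw err;
  console.log(sizes); // no chunk is larger than 4 bytes
});
```

With the default settings the chunks are much larger, but the principle is the same: the server only ever holds one chunk of the file at a time.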

What are Streams?

Streams can be defined as a continuous flow of data that can be manipulated asynchronously as data comes in (or goes out). In Node.js, streams can be readable or writable. A readable stream is an EventEmitter object that emits a data event every time a chunk of data is received. In our previous example, a readable stream was used to pipe the content of a file down to an HTTP client. When the stream reaches the end of our file it emits an end event, indicating that no more data events will occur. In addition, a readable stream can be paused and resumed.
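The data and end events, together with pause() and resume(), can be tried out on an in-memory stream. The following is a minimal sketch rather than a recommended pattern: the readAll() helper is a name invented for this example, and the 10 ms delay only simulates slow processing of each chunk.

```javascript
var stream = require('stream');

// Collects every chunk of a readable stream and passes the joined text
// to callback once the stream emits its end event.
function readAll(readable, callback) {
  var chunks = [];
  readable.setEncoding('utf8');
  readable.on('data', function (chunk) {
    chunks.push(chunk);
    // Pause while we "process" the chunk, then ask for more data
    readable.pause();
    setTimeout(function () { readable.resume(); }, 10);
  });
  readable.on('end', function () {
    callback(chunks.join(''));
  });
}

// An in-memory readable stream fed by hand; push(null) marks the end
var source = new stream.Readable({ read: function () {} });
source.push('Hello, ');
source.push('streams!');
source.push(null);

readAll(source, function (text) {
  console.log(text);
});
```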

Writable streams, on the other hand, accept streams of data. This type of stream inherits from the EventEmitter object too, and implements two methods: write() and end(). The first method writes data to a buffer and returns true if more data can be written right away, or false if the buffer is full (in this case the data is still queued and sent out later, and the stream emits a drain event once it is ready for more). The end() method simply indicates that the stream is finished.
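The return value of write() is easy to observe with a writable stream whose buffer limit is very small. This is a sketch under stated assumptions: the 4-byte highWaterMark and the received array exist only for the demonstration and are not part of the upload application.

```javascript
var stream = require('stream');

var received = [];

// A writable stream with a deliberately tiny buffer limit (4 bytes)
var writable = new stream.Writable({
  highWaterMark: 4,
  write: function (chunk, encoding, callback) {
    received.push(chunk.toString());
    callback(); // signal that this chunk has been handled
  }
});

var first = writable.write('ab');      // 2 bytes, below the limit
var second = writable.write('cdefgh'); // 6 bytes, reaches the limit

console.log(first, second); // prints: true false

writable.once('drain', function () {
  // The buffer has emptied, so writing is safe again; here we just finish
  writable.end();
});
```

Note that the second chunk is not lost even though write() returned false. The return value is only a hint that the producer should wait for drain before writing more.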

Your First Streams Application

Let’s take a closer look at streams. To do so we are going to build a simple file upload application. First of all, we need to build a client that reads a file using a readable stream and pipes the data to a specific destination. At the other end of the pipe we’ll implement a server that saves the uploaded data using a writable stream.

Let’s start with the client. We begin by importing the HTTP and file system modules.

var http = require('http')
  , fs = require('fs');

Then, we define our HTTP request.

var options = {
  host: 'localhost'
  , port: 8000
  , path: '/'
  , method: 'POST'
};
var req = http.request(options, function(res) {
  console.log(res.statusCode);
});

Now that we have our request, we create a readable stream that reads the file and pipes the content to the request object.

var readStream = fs.createReadStream(__dirname + "/in.txt");
readStream.pipe(req);

By default, pipe() calls the end() method of our request once the stream has finished reading all of the data, so we do not have to close the connection with the server ourselves. We only log a message when the file has been closed.

readStream.on('close', function () {
  console.log("I finished.");
});
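By default, pipe() calls end() on its destination when the source emits its end event. A minimal sketch that checks this with in-memory streams, without any network connection, could look like the following. The pipeAndWait() helper and the stream names are invented for this example.

```javascript
var stream = require('stream');

// Pipes source into destination and calls back once destination has finished.
function pipeAndWait(source, destination, callback) {
  destination.on('finish', callback);
  source.pipe(destination); // pipe() will call destination.end() for us
}

var source = new stream.PassThrough();
var chunks = [];
var destination = new stream.Writable({
  write: function (chunk, encoding, callback) {
    chunks.push(chunk.toString());
    callback();
  }
});

pipeAndWait(source, destination, function () {
  console.log('finished with: ' + chunks.join(''));
});

// We only end the source; the destination is never ended explicitly
source.end('some data');
```

If the destination has to stay open, for example to receive data from a second source, pipe() accepts { end: false } as its second argument.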

The Server

As we did for the client, we start by importing the Node.js modules.

var http = require('http')
  , fs = require('fs');

To let our client application upload files we have to create a new web server object. For each request, we create a new writable stream that saves the data into a text file. As data comes from the request object, the server writes it to our stream, which flushes its buffer to the output file.

var server = http.createServer(function (req, res) {
  // One write stream per request: a stream cannot be written to after end()
  var writeStream = fs.createWriteStream(__dirname + "/out.txt");
  req.on('data', function (data) {
    writeStream.write(data);
  });
  req.on('end', function() {
    writeStream.end();
    res.statusCode = 200;
    res.end("OK");
  });
});
server.listen(8000);

Please note that the req and res objects passed to the createServer() callback are a readable stream and a writable stream, respectively. We can listen for the data event, and send the result back to the client once all the processing is over.
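Because req is a readable stream, it can also be piped straight into the file. The sketch below is one possible variation, not the only way to write the server: createUploadServer(), the temp-file target, and the use of port 0 (which lets the operating system pick a free port) are assumptions made so that the example runs on its own and uploads a short string to itself.

```javascript
var http = require('http');
var fs = require('fs');
var os = require('os');
var path = require('path');

// Creates a server that streams every request body into targetFile.
function createUploadServer(targetFile) {
  return http.createServer(function (req, res) {
    var writeStream = fs.createWriteStream(targetFile);
    req.pipe(writeStream); // pipe() pauses req whenever writeStream is full
    writeStream.on('finish', function () {
      res.statusCode = 200;
      res.end('OK');
    });
    writeStream.on('error', function () {
      res.statusCode = 500;
      res.end('Upload failed');
    });
  });
}

// Hypothetical target path, used for this sketch only
var target = path.join(os.tmpdir(), 'streams-demo-upload-' + process.pid + '.txt');
var server = createUploadServer(target);

server.listen(0, function () {
  var options = { port: server.address().port, method: 'POST', agent: false };
  var req = http.request(options, function (res) {
    console.log(res.statusCode);
    res.resume(); // discard the response body
    res.on('end', function () { server.close(); });
  });
  req.end('hello upload');
});
```

Compared with calling write() inside a data listener, pipe() also respects the return value of write(), so a slow disk slows down the upload instead of filling up memory.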

Conclusion

This article has introduced one of the most powerful tools of Node.js, the streams API. In the coming weeks, we will dive deeper into the world of streams, exploring all the different types built into Node.js, and third-party streams too.


  • suprsidr

    How would one pipe the output of a server-side exec to, say, ffmpeg? The output can be quite long, so piping to res would be desirable. I’ve tried using child_process.exec without success; I get "has no method ‘pipe’".

  • hughsk

    Maybe this?


    var proc = child_process.spawn('ffmpeg'
    , ['-i', 'video.avi', '-f', 'avi', 'pipe:1']
    , { cwd: process.cwd() }
    )

    proc.stdout.pipe(res)

    http://ffmpeg.org/ffmpeg.html#pipe

  • agebrock

    The length matters ;)

    request(remoteSource.tar.gz)
    .pipe(gunzip)
    .pipe(untar)
    .pipe(node-expat)
    .pipe(fs.createWriteStream(localTarget))
    .on("end",….)

    // remoteSource.tar.gz contained more than 70,000 XML files.
    // The first implementation was done with cluster and ran up to 8 child processes.
    // They did not even have a chance to complete the task in double the time…

    Since then I don’t leave the house without a streamable snippet in my pocket…

    /**
    * @see https://github.com/joyent/node/blob/master/lib/stream.js
    * @author Christoph Hagenbrock
    */
    var
    stream = require("stream"),
    util = require("util");

    function Streamable(){
    this.writable = true;
    this.readable = true;
    }

    util.inherits(Streamable, stream.Stream);
    module.exports = Streamable;