JavaScript - By Sandeep Panda

The Basics of Node.js Streams

Node.js is asynchronous and event-driven by nature, which makes it very good at handling I/O-bound tasks. If you are working on an app that performs I/O operations, you can take advantage of the streams available in Node.js. So, let’s explore streams in detail and understand how they can simplify I/O.

What are Streams

Streams are similar to Unix pipes: they let you easily read data from a source and pipe it to a destination. Simply put, a stream is an EventEmitter that implements some special methods. Depending on the methods implemented, a stream becomes Readable, Writable, or Duplex (both readable and writable). Readable streams let you read data from a source, while writable streams let you write data to a destination.
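As a quick check of the description above, here is a minimal sketch (not part of the original article) that builds one in-memory readable and one in-memory writable stream with the core stream module and confirms that both are EventEmitters. The no-op read and write implementations are placeholders chosen purely for illustration.

```javascript
var EventEmitter = require('events');
var stream = require('stream');

// Placeholder streams: read() produces nothing and write() discards its input.
var readable = new stream.Readable({ read: function () {} });
var writable = new stream.Writable({
  write: function (chunk, encoding, callback) { callback(); }
});

var readableIsEmitter = readable instanceof EventEmitter;
var writableIsEmitter = writable instanceof EventEmitter;

console.log(readableIsEmitter, typeof readable.pipe);  // true function
console.log(writableIsEmitter, typeof writable.write); // true function
```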

If you have already worked with Node.js, you may have come across streams. For example, in a Node.js-based HTTP server, request is a readable stream and response is a writable stream. You might also have used the fs module, which lets you work with both readable and writable file streams.

Now that you know the basics, let’s look at the different types of streams. In this article, we will discuss readable and writable streams. Duplex streams are beyond the scope of this article.

Readable Stream

A readable stream lets you read data from a source. The source can be anything. It can be a simple file on your file system, a buffer in memory or even another stream. As streams are EventEmitters, they emit several events at various points. We will use these events to work with the streams.

Reading From Streams

The simplest way to read data from a stream is to listen for the data event and attach a callback. When a chunk of data is available, the readable stream emits a data event and your callback executes. Take a look at the following snippet:

var fs = require('fs');
var readableStream = fs.createReadStream('file.txt');
var data = '';

readableStream.on('data', function(chunk) {
    data+=chunk;
});

readableStream.on('end', function() {
    console.log(data);
});

The function call fs.createReadStream() gives you a readable stream. Initially, the stream is paused (not flowing). As soon as you attach a callback to the data event, it starts flowing. After that, chunks of data are read and passed to your callback. The stream implementor decides how often the data event is emitted. For example, an HTTP request may emit a data event once a few KB of data have been read. A stream that reads from a file could instead be implemented to emit a data event each time a line has been read.

When there is no more data to read (end is reached), the stream emits an end event. In the above snippet, we listen to this event to get notified when the end is reached.
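The data/end pattern above can be wrapped in a small reusable helper. The sketch below is an illustration rather than the article's own code: the collect() name and its callback(err, text) shape are assumptions, and it uses stream.Readable.from() (available in recent Node.js versions) so that it runs without any file on disk. It also listens for the error event, which the earlier snippet leaves out.

```javascript
var stream = require('stream');

// Collects every chunk from a readable stream and hands the joined
// result to callback(err, text). The helper name is hypothetical.
function collect(readable, callback) {
  var data = '';
  readable.on('data', function (chunk) {
    data += chunk;
  });
  readable.on('end', function () {
    callback(null, data);
  });
  readable.on('error', function (err) {
    callback(err);
  });
}

// Readable.from() builds an in-memory stream, one chunk per array item.
collect(stream.Readable.from(['Hello, ', 'streams!']), function (err, text) {
  if (err) throw err;
  console.log(text); // Hello, streams!
});
```

The same helper should work with fs.createReadStream('file.txt') in place of the in-memory stream.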

There is also another way to read from a stream. You call read() on the stream instance repeatedly until every chunk of data has been read.

var fs = require('fs');
var readableStream = fs.createReadStream('file.txt');
var data = '';
var chunk;

readableStream.on('readable', function() {
    while ((chunk=readableStream.read()) != null) {
        data += chunk;
    }
});

readableStream.on('end', function() {
    console.log(data);
});

The read() function reads some data from the internal buffer and returns it. When there is nothing to read, it returns null. So, in the while loop we check for null and terminate the loop. Note that the readable event is emitted when a chunk of data can be read from the stream.
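read() also accepts an optional size argument, which the article does not cover. The following sketch, under the assumption of a small in-memory source, shows the same readable loop pulling at most three bytes at a time. The readInPieces() helper name is hypothetical.

```javascript
var stream = require('stream');

// Reads a stream in pieces of at most `size` bytes using read(size).
// The helper name and callback shape are hypothetical.
function readInPieces(readable, size, callback) {
  var pieces = [];
  readable.on('readable', function () {
    var chunk;
    while ((chunk = readable.read(size)) !== null) {
      pieces.push(chunk.toString());
    }
  });
  readable.on('end', function () {
    callback(pieces);
  });
}

// An in-memory source holding eight bytes, then the end-of-stream marker.
var source = new stream.Readable({ read: function () {} });
source.push('abcdefgh');
source.push(null);

readInPieces(source, 3, function (pieces) {
  console.log(pieces); // [ 'abc', 'def', 'gh' ]
});
```

Because the stream has already ended, the final read(3) returns the two remaining bytes instead of null.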

Setting Encoding

By default, the data you read from a stream is a Buffer object. If you are reading strings, this may not be suitable for you. So, you can set the encoding on the stream by calling Readable.setEncoding(), as shown below.

var fs = require('fs');
var readableStream = fs.createReadStream('file.txt');
var data = '';

readableStream.setEncoding('utf8');

readableStream.on('data', function(chunk) {
    data+=chunk;
});

readableStream.on('end', function() {
    console.log(data);
});

In the above snippet, we set the encoding to utf8. As a result, the data is interpreted as UTF-8 and passed to your callback as a string.
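One practical reason to set the encoding is that a multi-byte character can be split across two chunks. The sketch below is an illustration with a hand-built in-memory stream, not the article's code: it emits the three UTF-8 bytes of the euro sign in two chunks and compares the result with and without setEncoding(). The readSplitCharacter() helper is hypothetical.

```javascript
var stream = require('stream');

// Emits the three UTF-8 bytes of the euro sign in two separate chunks and
// reports the text a 'data' handler ends up with. Hypothetical helper.
function readSplitCharacter(useSetEncoding, callback) {
  var euro = Buffer.from('\u20ac', 'utf8'); // bytes e2 82 ac
  var readable = new stream.Readable({ read: function () {} });
  var data = '';

  if (useSetEncoding) {
    readable.setEncoding('utf8');
  }
  readable.on('data', function (chunk) {
    data += chunk; // a Buffer chunk is converted with toString() here
  });
  readable.on('end', function () {
    callback(data);
  });

  readable.push(euro.subarray(0, 1)); // first byte only
  readable.push(euro.subarray(1));    // remaining two bytes
  readable.push(null);
}

readSplitCharacter(false, function (text) {
  console.log(text === '\u20ac'); // false: each chunk was decoded on its own
});
readSplitCharacter(true, function (text) {
  console.log(text === '\u20ac'); // true: the decoder joined the bytes
});
```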

Piping

Piping is a great mechanism that lets you read data from a source and write it to a destination without managing the flow yourself. Take a look at the following snippet:

var fs = require('fs');
var readableStream = fs.createReadStream('file1.txt');
var writableStream = fs.createWriteStream('file2.txt');

readableStream.pipe(writableStream);

The above snippet makes use of the pipe() function to write the content of file1 to file2. As pipe() manages the data flow for you, you don’t need to worry about the destination being slower or faster than the source. This makes pipe() a neat tool to read and write data. You should also note that pipe() returns the destination stream. So, you can easily utilize this to chain multiple streams together. Let’s see how!
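pipe() manages the flow of data, but it does not report completion or close the other stream when one side fails. A hedged sketch of one way to handle that is shown here. The copyFile() helper, its callback(err) shape, and the temporary file names are assumptions made for this example.

```javascript
var fs = require('fs');
var os = require('os');
var path = require('path');

// Copies one file to another with pipe() and reports completion or failure
// through callback(err). The helper name is hypothetical.
function copyFile(sourcePath, targetPath, callback) {
  var readableStream = fs.createReadStream(sourcePath);
  var writableStream = fs.createWriteStream(targetPath);
  var finished = false;

  function done(err) {
    if (finished) return;
    finished = true;
    if (err) {
      // pipe() does not tear down the other stream on failure.
      readableStream.destroy();
      writableStream.destroy();
    }
    callback(err);
  }

  readableStream.on('error', done);
  writableStream.on('error', done);
  writableStream.on('finish', function () { done(); });

  readableStream.pipe(writableStream);
}

// Demo in a throwaway temporary directory.
var dir = fs.mkdtempSync(path.join(os.tmpdir(), 'pipe-demo-'));
var from = path.join(dir, 'file1.txt');
var to = path.join(dir, 'file2.txt');
fs.writeFileSync(from, 'copied with pipe()');

copyFile(from, to, function (err) {
  if (err) throw err;
  console.log(fs.readFileSync(to, 'utf8')); // copied with pipe()
});
```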

Chaining

Assume that you have an archive and want to decompress it. There are a number of ways to achieve this. But the easiest and cleanest way is to use piping and chaining. Have a look at the following snippet:

var fs = require('fs');
var zlib = require('zlib');

fs.createReadStream('input.txt.gz')
  .pipe(zlib.createGunzip())
  .pipe(fs.createWriteStream('output.txt'));

First, we create a simple readable stream from the file input.txt.gz. Next, we pipe this stream into another stream zlib.createGunzip() to un-gzip the content. Lastly, as streams can be chained, we add a writable stream in order to write the un-gzipped content to the file.
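A chain built with pipe() has no single place to learn that it finished or failed. As an alternative that the article does not use, the core stream module offers stream.pipeline(), which takes the same streams plus a final callback. The sketch below assumes a hypothetical gunzipFile() helper and creates its own small archive in a temporary directory so it can run on its own.

```javascript
var fs = require('fs');
var os = require('os');
var path = require('path');
var stream = require('stream');
var zlib = require('zlib');

// Decompresses a gzip file; callback(err) fires once, on success or on the
// first error from any stream in the chain. Hypothetical helper name.
function gunzipFile(sourcePath, targetPath, callback) {
  stream.pipeline(
    fs.createReadStream(sourcePath),
    zlib.createGunzip(),
    fs.createWriteStream(targetPath),
    callback
  );
}

// Demo: build a tiny archive in a temporary directory, then decompress it.
var dir = fs.mkdtempSync(path.join(os.tmpdir(), 'gunzip-demo-'));
var archive = path.join(dir, 'input.txt.gz');
var output = path.join(dir, 'output.txt');
fs.writeFileSync(archive, zlib.gzipSync('hello from the archive'));

gunzipFile(archive, output, function (err) {
  if (err) throw err;
  console.log(fs.readFileSync(output, 'utf8')); // hello from the archive
});
```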

Additional Methods

We discussed some of the important concepts in readable streams. Here are some more stream methods you need to know:

  1. Readable.pause() – This method pauses the stream. If the stream is already flowing, it won’t emit data events anymore, and any data that becomes available stays in the internal buffer. If you call this on a stream that is not flowing, it stays paused, and attaching a data listener will not start the flow until resume() is called.
  2. Readable.resume() – Resumes a paused stream.
  3. Readable.unpipe() – Detaches destination streams that were attached with pipe(). If a destination is passed as an argument, only that stream is detached. Otherwise, all destinations are detached.
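To illustrate pause() and resume() from the list above, here is a minimal sketch that handles one chunk at a time by pausing inside the data callback and resuming after a short delay. The readSlowly() helper, its parameters, and the in-memory source are assumptions for this example.

```javascript
var stream = require('stream');

// Handles one chunk at a time: pause on each 'data' event, then resume
// after delayMs. Helper name and parameters are hypothetical.
function readSlowly(readable, delayMs, callback) {
  var chunks = [];
  readable.on('data', function (chunk) {
    chunks.push(chunk);
    readable.pause(); // no further 'data' events until resume()
    setTimeout(function () {
      readable.resume();
    }, delayMs);
  });
  readable.on('end', function () {
    callback(chunks);
  });
}

readSlowly(stream.Readable.from(['one', 'two', 'three']), 10, function (chunks) {
  console.log(chunks); // [ 'one', 'two', 'three' ]
});
```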

Writable Streams

Writable streams let you write data to a destination. Like readable streams, these are also EventEmitters and emit various events at various points. Let’s look at the methods and events available in writable streams.

Writing to Streams

To write data to a writable stream you need to call write() on the stream instance. The following snippet demonstrates this technique.

var fs = require('fs');
var readableStream = fs.createReadStream('file1.txt');
var writableStream = fs.createWriteStream('file2.txt');

readableStream.setEncoding('utf8');

readableStream.on('data', function(chunk) {
    writableStream.write(chunk);
});

The above code is straightforward. It simply reads chunks of data from an input stream and writes them to the destination using write(). This function returns a Boolean value indicating whether you should keep writing. If true, the internal buffer still has room and you can keep writing more data. If false is returned, the chunk has still been accepted, but the internal buffer is full and you should stop writing for the moment. The writable stream will let you know when you can start writing more data by emitting a drain event.
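The return value of write() and the drain event can be combined as in the following sketch. It is one possible way to respect the signal, not the article's code: the writeAll() helper is hypothetical, and the demo destination is an artificially slow in-memory writable with a one-byte buffer so that drain events actually occur.

```javascript
var stream = require('stream');

// Writes every chunk, waiting for 'drain' whenever write() returns false.
// callback(err) runs on 'finish' or 'error'. Hypothetical helper name.
function writeAll(writable, chunks, callback) {
  var index = 0;

  function writeNext() {
    while (index < chunks.length) {
      var canContinue = writable.write(chunks[index]);
      index += 1;
      if (!canContinue) {
        // The chunk was accepted but the buffer is full; wait for it to empty.
        writable.once('drain', writeNext);
        return;
      }
    }
    writable.end();
  }

  writable.on('finish', function () { callback(null); });
  writable.on('error', callback);
  writeNext();
}

// Demo destination: tiny buffer (highWaterMark of 1 byte) and slow writes.
var received = [];
var drainCount = 0;
var slowWritable = new stream.Writable({
  highWaterMark: 1,
  write: function (chunk, encoding, done) {
    received.push(chunk.toString());
    setImmediate(done);
  }
});
slowWritable.on('drain', function () { drainCount += 1; });

writeAll(slowWritable, ['a', 'b', 'c'], function (err) {
  if (err) throw err;
  console.log(received.join(''), drainCount > 0); // abc true
});
```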

End of Data

When you don’t have more data to write, you can simply call end() to notify the stream that you have finished writing. Assuming res is an HTTP response object, you often do the following to send the response to the browser:

res.write('Some Data!!');
res.end('Ended.');

When end() is called and every chunk of data has been flushed, a finish event is emitted by the stream. Just note that you can’t write to the stream after calling end(). For example, the following will result in an error.

res.write('Some Data!!');
res.end();
res.write('Trying to write again'); //Error!
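Both behaviours can be observed without an HTTP server. In the sketch below, res is a stand-in in-memory writable rather than a real HTTP response, and the helper names are hypothetical. The first helper waits for finish, and the second captures the error reported when write() is called after end().

```javascript
var stream = require('stream');

// Creates a writable that collects what it receives (illustration only).
function createCollector(store) {
  return new stream.Writable({
    write: function (chunk, encoding, done) {
      store.push(chunk.toString());
      done();
    }
  });
}

// Mirrors res.write() followed by res.end(); callback(text) runs on 'finish'.
function writeThenEnd(callback) {
  var store = [];
  var res = createCollector(store);
  res.on('finish', function () {
    callback(store.join(''));
  });
  res.write('Some Data!!');
  res.end('Ended.');
}

// Calls write() after end(); callback(err) receives the resulting error.
function writeAfterEnd(callback) {
  var res = createCollector([]);
  res.on('error', callback);
  res.end();
  res.write('Trying to write again');
}

writeThenEnd(function (text) {
  console.log(text); // Some Data!!Ended.
});
writeAfterEnd(function (err) {
  console.log(err.code); // ERR_STREAM_WRITE_AFTER_END
});
```

Without an error listener, the same late write would surface as an uncaught exception.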

Here are some important events related to writable streams:

  1. error – Emitted to indicate that an error has occurred while writing/piping.
  2. pipe – When a readable stream is piped into a writable stream, this event is emitted by the writable stream.
  3. unpipe – Emitted when you call unpipe on the readable stream and stop it from piping into the destination stream.
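The pipe and unpipe events from the list above can be seen with two in-memory streams. This is a minimal sketch under the assumption of a source that never produces data, so the only activity is attaching and detaching the destination. The labels pushed into the events array are made up for the example.

```javascript
var stream = require('stream');

// Records the 'pipe' and 'unpipe' events that a writable emits.
var events = [];
var source = new stream.Readable({ read: function () {} }); // never produces data
var destination = new stream.Writable({
  write: function (chunk, encoding, done) { done(); }
});

destination.on('pipe', function (src) {
  events.push(src === source ? 'pipe from source' : 'pipe');
});
destination.on('unpipe', function (src) {
  events.push(src === source ? 'unpipe from source' : 'unpipe');
});
destination.on('error', function (err) {
  events.push('error: ' + err.message);
});

source.pipe(destination);
source.unpipe(destination);

console.log(events); // [ 'pipe from source', 'unpipe from source' ]
```

Both events pass the source stream to the listener, which is how the handlers above recognise it.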

Conclusion

This was all about the basics of streams. Streams, pipes, and chaining are among the most powerful features in Node.js. If used responsibly, streams can help you write neat and performant I/O code.

Did you like the article? Do let us know what you think via comments.
