The Basics of Node.js Streams

Sandeep Panda
Sandeep Panda
Published in
·Updated:

Share this article

The Basics of Node.js Streams
SitePoint Premium
Stay Relevant and Grow Your Career in Tech
  • Premium Results
  • Publish articles on SitePoint
  • Daily curated jobs
  • Learning Paths
  • Discounts to dev tools

7 Day Free Trial. Cancel Anytime.

Node.js is asynchronous and event driven in nature. As a result, it’s very good at handling I/O bound tasks. If you are working on an app that performs I/O operations, you can take advantage of the streams available in Node.js. So, let’s explore Streams in detail and understand how they can simplify I/O.

Key Takeaways

  • Node.js streams, which are asynchronous and event-driven, can simplify I/O operations by efficiently handling data in smaller, manageable chunks.
  • Streams can be categorized as Readable, Writable, Duplex (both readable and writable) or Transform (modifying data as it passes through).
  • The ‘pipe()‘ function is a useful tool in Node.js streams, allowing data to be read from a source and written to a destination without manually managing the data flow.
  • Modern Node.js provides utilities like ‘stream.pipeline()‘ and ‘stream.finished()‘ along with Promise-based APIs for better error handling and flow control.
  • Streams can be used with async/await patterns for cleaner, more maintainable code.

What are Streams

Streams in Node.js are inspired by Unix pipes and provide a mechanism to read data from a source and pipe it to a destination in a streaming fashion.

Simply put, a stream is nothing but an EventEmitter and implements some specials methods. Depending on the methods implemented, a stream becomes Readable, Writable, Duplex, or Transform. Readable streams let you read data from a source while writable streams let you write data to a destination.

If you have already worked with Node.js, you may have come across streams. For example, in a Node.js based HTTP server, request is a readable stream and response is a writable stream. You might have used fs module which lets you work with both readable and writable file streams.

Let’s understand the different types of streams. In this article, we will focus primarily on readable and writable streams, but will also briefly cover Duplex and Transform streams.

Readable Stream

A readable stream lets you read data from a source. The source can be anything. It can be a simple file on your file system, a buffer in memory or even another stream. As streams are EventEmitters, they emit several events at various points. We will use these events to work with the streams.

Reading From Streams

The best way to read data from a stream is to listen to data event and attach a callback. When a chunk of data is available, the readable stream emits a data event and your callback executes. Take a look at the following snippet:

// Traditional callback approach
const fs = require('fs');
const readableStream = fs.createReadStream('file.txt');
let data = '';

readableStream.on('data', function(chunk) {
  data += chunk;
});

readableStream.on('end', function() {
  console.log(data);
});

// Error handling
readableStream.on('error', (err) => {
  console.error('Error reading stream:', err);
});

The function call fs.createReadStream() gives you a readable stream. Initially, the stream is in a static state. As soon as you listen to data event and attach a callback it starts flowing. After that, chunks of data are read and passed to your callback. The stream implementor decides how often data event is emitted. For example, an HTTP request may emit a data event once a few KB of data are read. When you are reading data from a file you may decide you emit data event once a line is read.

When there is no more data to read (end is reached), the stream emits an end event. In the above snippet, we listen to this event to get notified when the end is reached.

With modern ECMAScript features, we can rewrite this using async/await:

const fs = require('fs');
const { Readable } = require('stream');
const { promisify } = require('util');

// Convert stream.on('end') to a Promise
const streamToString = async (stream) => {
  const chunks = [];
  
  for await (const chunk of stream) {
    chunks.push(typeof chunk === 'string' ? chunk : chunk.toString());
  }
  
  return chunks.join('');
};

async function readFile() {
  try {
    const readableStream = fs.createReadStream('file.txt');
    const content = await streamToString(readableStream);
    console.log(content);
  } catch (err) {
    console.error('Error reading file:', err);
  }
}

readFile();

 Here, we’re using several newer JavaScript features:

  1. The for await...of loop allows us to iterate over async iterables (like streams in Node.js)
  2. We’re creating a streamToString helper function that collects all chunks from a stream and returns a Promise that resolves to the full string
  3. We wrap everything in a try/catch block for proper error handling
  4. This approach is more linear and easier to read than the event-based approach

Now there are two modes a Readable stream can operate in:

1. Flowing mode – Data is read automatically and provided as quickly as possible through events
2. Paused mode – You must explicitly call read() to get data chunks repeatedly until every chunk of data has been read.

const fs = require('fs');
const readableStream = fs.createReadStream('file.txt');
let data = '';
let chunk;

readableStream.on('readable', function() {
  while ((chunk = readableStream.read()) != null) {
    data += chunk;
  }
});

readableStream.on('end', function() {
  console.log(data);
});

The read() function reads some data from the internal buffer and returns it. When there is nothing to read, it returns null. So, in the while loop we check for null and terminate the loop. Note that the readable event is emitted when a chunk of data can be read from the stream.

Setting Encoding

By default the data you read from a stream is a Buffer object. If you are reading strings this may not be suitable for you. So, you can set encoding on the stream by calling Readable.setEncoding(), as shown below.

const fs = require('fs');
const readableStream = fs.createReadStream('file.txt');
let data = '';

readableStream.setEncoding('utf8');

readableStream.on('data', function(chunk) {
  data += chunk;
});

readableStream.on('end', function() {
  console.log(data);
});

In the above snippet we set the encoding to utf8. As a result, the data is interpreted as utf8 and passed to your callback as string.

Piping

Piping is a great mechanism in which you can read data from the source and write to destination without managing the flow yourself. Take a look at the following snippet:

const fs = require('fs');
const readableStream = fs.createReadStream('file1.txt');
const writableStream = fs.createWriteStream('file2.txt');

readableStream.pipe(writableStream);

The above snippet makes use of the pipe() function to write the content of file1 to file2. As pipe() manages the data flow for you, you should not worry about slow or fast data flow. This makes pipe() a neat tool to read and write data. You should also note that pipe() returns the destination stream. So, you can easily utilize this to chain multiple streams together. Let’s see how!

However, one limitation of pipe() is that it doesn’t provide good error handling. This is where modern Node.js provides better utilities:

const fs = require('fs');
const { pipeline } = require('stream');
const { promisify } = require('util');

const pipelineAsync = promisify(pipeline);

async function copyFile() {
  try {
    const readableStream = fs.createReadStream('file1.txt');
    const writableStream = fs.createWriteStream('file2.txt');
    
    await pipelineAsync(readableStream, writableStream);
    console.log('File copied successfully');
  } catch (err) {
    console.error('Pipeline failed:', err);
  }
}

copyFile();

Here:

  1. We’re using the pipeline function from the stream module, which automatically handles errors and resource cleanup.
  2. We convert the callback-based pipeline to a Promise using promisify
  3. We can then use async/await for a cleaner flow.
  4. All errors are properly caught in a single try/catch block.
  5. If any stream in the pipeline emits an error, pipeline automatically destroys all streams and calls the callback with the error.

Chaining

Assume that you have an archive and want to decompress it. There are a number of ways to achieve this. But the easiest and cleanest way is to use piping and chaining. Have a look at the following snippet:

const fs = require('fs');
const zlib = require('zlib');

fs.createReadStream('input.txt.gz')
  .pipe(zlib.createGunzip())
  .pipe(fs.createWriteStream('output.txt'));

First, we create a simple readable stream from the file input.txt.gz. Next, we pipe this stream into another stream zlib.createGunzip() to un-gzip the content. Lastly, as streams can be chained, we add a writable stream in order to write the un-gzipped content to the file.

A more robust approach using pipeline:

const fs = require('fs');
const zlib = require('zlib');
const { pipeline } = require('stream');

pipeline(
  fs.createReadStream('input.txt.gz'),
  zlib.createGunzip(),
  fs.createWriteStream('output.txt'),
  (err) => {
    if (err) {
      console.error('Pipeline failed:', err);
    } else {
      console.log('Pipeline succeeded');
    }
  }
);

Here we’re using pipeline with multiple streams:

  1. Unlike pipe() which doesn’t properly forward errors, pipeline handles errors from any stream in the chain.
  2. If any stream in the pipeline fails (like if the file doesn’t exist or the content isn’t valid gzip), the callback receives the error.
  3. Pipeline automatically cleans up resources by destroying all streams if any stream errors.
  4. The last argument is a callback that tells us if the operation succeeded or failed.

Additional Methods

We discussed some of the important concepts in readable streams. Here are some more stream methods you need to know:

  1. Readable.pause() – This method pauses the stream. If the stream is already flowing, it won’t emit data events anymore. The data will be kept in buffer. If you call this on a static (non-flowing) stream, there is no effect and the stream remains paused.
  2. Readable.resume() – Resumes a paused stream.
  3. readable.unpipe() – This removes destination streams from pipe destinations. If an argument is passed, it stops the readable stream from piping into the particular destination stream. Otherwise, all the destination streams are removed.

Writable Streams

Writable streams let you write data to a destination. Like readable streams, these are also EventEmitters and emit various events at various points. Let’s see various methods and events available in writable streams.

Writing to Streams

To write data to a writable stream you need to call write() on the stream instance. The following snippet demonstrates this technique.

const fs = require('fs');
const readableStream = fs.createReadStream('file1.txt');
const writableStream = fs.createWriteStream('file2.txt');

readableStream.setEncoding('utf8');

readableStream.on('data', function(chunk) {
  writableStream.write(chunk);
});

The above code is straightforward. It simply reads chunks of data from an input stream and writes to the destination using write(). This function returns a Boolean value indicating if the operation was successful.

The return value of writableStream.write(chunk) indicates whether the internal buffer is ready for more data, which is crucial for handling backpressure:

  • true: The data was successfully written, and you can continue writing more data immediately.
  • false: The internal buffer is full (reaching the highWaterMark limit). It doesn’t mean an error occurred but signals that you should pause writing to prevent overloading the buffer. You should wait for the 'drain' event before resuming writing.

A better approach that handles backpressure:

const fs = require('fs');
const readableStream = fs.createReadStream('file1.txt');
const writableStream = fs.createWriteStream('file2.txt');

readableStream.setEncoding('utf8');

readableStream.on('data', function(chunk) {
  // If write returns false, pause reading until drain event
  const canContinue = writableStream.write(chunk);
  if (!canContinue) {
    readableStream.pause();
  }
});

writableStream.on('drain', function() {
  // Resume reading when the drain event occurs
  readableStream.resume();
});

readableStream.on('end', function() {
  writableStream.end();
});

// Error handling
readableStream.on('error', (err) => {
  console.error('Read error:', err);
  writableStream.end();
});

writableStream.on('error', (err) => {
  console.error('Write error:', err);
});

This example handles backpressure, which is a critical concept in streams:

  1. When write() returns false, it means the internal buffer is full, and we should stop sending more data.
  2. We pause the readable stream to stop receiving data temporarily.
  3. When the writable stream emits ‘drain’, it means the buffer has emptied and we can resume reading.
  4. We’ve also added proper error handling for both streams.
  5. When reading completes, we call end() on the writable stream to signal completion.
  6. This approach prevents memory from growing unbounded when the writer can’t keep up with the reader.

End of Data

When you don’t have more data to write you can simply call end() to notify the stream that you have finished writing. Assuming res is an HTTP response object, you often do the following to send the response to browser:

res.write('Some Data!!');
res.end('Ended.');

When end() is called and every chunk of data has been flushed, a finish event is emitted by the stream. Just note that you can’t write to the stream after calling end(). For example, the following will result in an error.

res.write('Some Data!!');
res.end();
res.write('Trying to write again'); //Error!

Here are some important events related to writable streams:

  1. error – Emitted to indicate that an error has occurred while writing/piping.
  2. pipe – When a readable stream is piped into a writable stream, this event is emitted by the writable stream.
  3. unpipe – Emitted when you call unpipe on the readable stream and stop it from piping into the destination stream.

Duplex and Transform Streams

Duplex streams are readable and writable streams combined. They maintain two separate internal buffers, one for reading and one for writing, which operate independently from each other.

Duplex streams are useful when you need simultaneous but separate input and output streams, such as in network sockets (like TCP).

const { Duplex } = require('stream');

// Create a custom duplex stream
const myDuplex = new Duplex({
  read(size) {
    // Implementation for reading
    this.push(String.fromCharCode(this.currentCharCode++));
    if (this.currentCharCode > 90) {
      this.push(null);
    }
  },
  write(chunk, encoding, callback) {
    // Implementation for writing
    console.log(chunk.toString());
    callback();
  }
});

// Initialize the starting character code
myDuplex.currentCharCode = 65; // ASCII for 'A'

This example creates a custom Duplex stream:

  1. The read() method generates uppercase letters from A to Z (ASCII codes 65-90).
  2. Each time read() is called, it pushes the next letter and increments the counter.
  3. When we reach ‘Z’, we push null to signal the end of the read stream.
  4. The write() method simply logs any data written to the stream to the console.
  5. Duplex streams are useful when you need independent read and write operations in a single stream.

Transform streams are a special type of Duplex stream that can modify or transform the data as it is written and read. Unlike Duplex streams, where the input and output are separate, Transform streams have their output directly related to the input. Typical examples include zlib streams for compression/decompression and crypto streams for encryption/decryption.

const { Transform } = require('stream');

// Create a transform stream that converts incoming data to uppercase
const upperCaseTr = new Transform({
  transform(chunk, encoding, callback) {
    // Convert the chunk to uppercase and push it to the read queue
    this.push(chunk.toString().toUpperCase());
    callback();
  }
});

// Use the transform stream
process.stdin
  .pipe(upperCaseTr)
  .pipe(process.stdout);

This Transform stream example:

  1. Creates a transform stream that converts input text to uppercase.
  2. The transform() method takes input chunks, transforms them, and pushes them to the output.
  3. We’re piping from standard input, through our transformer, to standard output.
  4. When you run this code, anything you type will be displayed in uppercase.
  5. Transform streams are ideal for processing or modifying data as it flows through, like parsing JSON, converting encodings, or encrypting data.

Conclusion

This was all about the basics of streams. Streams, pipes, and chaining are the core and most powerful features in Node.js. If used responsibly, streams can indeed help you write neat and performant code to perform I/O. Just make sure to handle stream errors and close streams appropriately to prevent memory leaks.

With the newer additions to the Node.js API like stream.pipeline(), stream.finished(), and Promise-based stream APIs, handling streams has become more robust and easier to work with. When dealing with large amounts of data, streams should be your go-to solution for efficient memory usage and performance.

What are Node.js Streams?

What are Node.js streams?

Node.js streams are a feature of the Node.js standard library that allow you to work with data in a more efficient and scalable way, by processing it in smaller, more manageable chunks, as opposed to loading entire data sets into memory.

What are the main types of Node.js streams?

Node.js streams come in four main types: Readable, Writable, Duplex, and Transform. Readable streams are for reading data, Writable streams are for writing data, Duplex streams allow both reading and writing, and Transform streams modify the data as it passes through.

How do I create a Readable stream in Node.js?

To create a Readable stream, you can use the stream.Readable class provided by Node.js. You can extend this class and implement the _read method to provide data to be read.

What are the common use cases for Readable streams?

Readable streams are useful for reading large files, processing data from external sources like HTTP requests, and handling data in real-time, such as log file monitoring.

How do I create a Writable stream in Node.js?

To create a Writable stream, you can use the stream.Writable class provided by Node.js. You need to implement the _write method to handle data as it’s written to the stream.

What are some common uses of Writable streams?

Writable streams are used for saving data to files, sending data to external services, or processing and filtering data as it’s written.

What is a Duplex stream in Node.js?

A Duplex stream is a combination of a Readable and Writable stream, allowing both reading and writing. It’s useful when you need to transform data while also providing an interface for further data input.

What are Transform streams and when should I use them?

Transform streams are a subclass of Duplex streams that allow data to be modified as it passes through. They are often used for tasks like data compression, encryption, and parsing.

How can I pipe data between streams in Node.js?

You can pipe data between streams using the .pipe() method. For example, you can pipe data from a Readable stream to a Writable stream, allowing for efficient data transfer without manually managing the data flow.

Are there any best practices for working with Node.js streams?

Some best practices include using streams for handling large datasets efficiently, handling errors and backpressure correctly, and using the util.promisify function for working with streams in a more promise-friendly manner.

What are the advantages of using streams.pipeline() over pipe()?

The streams.pipeline() method provides automatic error handling and cleanup of resources when an error occurs, which pipe() doesn’t. It also provides a callback when the operation completes or errors, and has a Promise-based version for use with async/await.

How can I convert traditional callback-based stream operations to use Promises?

You can use the util.promisify() function to convert callback-based stream methods to Promise-based ones. Additionally, Node.js now provides built-in Promise-based APIs for streams in the ‘stream/promises’ module starting from Node.js 15.0.0.

How do I handle backpressure in Node.js streams?

Backpressure occurs when a writable stream can’t keep up with the readable stream providing data. You can handle this by monitoring the return value of the write() method and pausing the readable stream if it returns false, then resuming when the ‘drain’ event is emitted.

Sandeep is the Co-Founder of Hashnode. He loves startups and web technologies.

ColinILearn-Node-JSnode.jsstreams

Share this article

Subscribe to our newsletter

Get the freshest news and resources for developers, designers and digital creators in your inbox each week

© 2000 – 2025 SitePoint Pty. Ltd.
This site is protected by reCAPTCHA and the Google Privacy Policy and Terms of Service apply.