Writable stream

Could you please clarify this statement in node.js ?

Transform stream is the duplex stream which is used in modifying the data (eg zip creation).

modifying the data ? …a writable stream can do that also … is not it ? how it is different then ? any example please to understand the difference.

It can also read, so you can pipe a read stream to a transform stream to a write stream. E.g. if you have

const { createReadStream, createWriteStream } = require('fs')

const read = createReadStream('./example.txt')
const write = createWriteStream('./example.txt.bak')

read.pipe(write)

but you want to modify the data before writing it to example.txt.bak, you can interpose a transform stream:

const { createReadStream, createWriteStream } = require('fs')
const { Transform } = require('stream')

const charShifter = new Transform({
  transform (chunk, enc, callback) {
    const shifted = Array
      .from(chunk, code => String.fromCharCode(code + 1))
      .join('')

    this.push(shifted)
    callback()
  }
})

const read = createReadStream('./example.txt')
const write = createWriteStream('./example.txt.bak')

read.pipe(charShifter).pipe(write)
1 Like

What is callback in this context? Is it just something you need to call to signal that processing has ended for the chunk, or does it have any further use?

Logging it to the console produces:

function (er, data) {
  return afterTransform(stream, er, data);
}

Yes, if you don’t call it, the transform stream will just hang there and block the pipe. As for the er parameter, that’s if something went wrong (error first and all); and then you can pipe some additional data (or simply the data itself). Example:

const { Readable, Transform } = require('stream')

const reverseStream = ([...chars]) => new Readable({
  read () {
    if (chars.length) {
      this.push(chars.pop())
    } else {
      this.push(null)
    }
  }
})

const passStream = new Transform({
  transform (chunk, enc, callback) {
    this.push(chunk)
    callback()
  }
})

reverseStream('foo')
  .pipe(passStream)
  .pipe(process.stdout)

This will output “oof”, but without calling the callback it would just pipe through “o” and not read any further data.

(You might actually also write that simple passStream like this:)

const passStream = new Transform({
  transform (chunk, enc, callback) {
    callback(null, chunk)
  }
})
1 Like

Excellent explanation. Thank you.

It seems like callback() is similar to next() when working with middleware.

1 Like

In fact you’ll sometimes see it called like that. :-) It behaves slightly differently though in that a middleware callback typically directly invokes the next middleware, while the next stream in the pipe will consume the data as soon as we .push() it. Instead, it’s a success (or failure) signal when we’re finished processing the current chunk, and ready (or not) to receive the next one… so omitting the callback in the above example would still pass through the first chunk.

1 Like

This topic was automatically closed 91 days after the last reply. New replies are no longer allowed.