@joyrexus
Last active August 21, 2023 16:59
Node.js streams demystified

A quick overview of the node.js streams interface with basic examples.

This is based on @brycebaril's presentation, Node.js Streams2 Demystified

Overview

Streams are a first-class construct in Node.js for handling data.

Think of them as lazy evaluation applied to data.

There are essentially three major concepts:

  • source - where the data comes from
  • pipeline - where you filter or transform your data as it passes through
  • sink - where your data ultimately goes
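
To make the three concepts concrete, here is a minimal sketch wiring a source, one pipeline stage, and a sink together. The names source, upper, sink, collected, and done are made up for this illustration; Readable.from and stream.pipeline are assumed to be available (they ship with current Node.js releases).

```javascript
const { Readable, Transform, Writable, pipeline } = require("stream")

// source: emits three words, one per chunk
const source = Readable.from(["alpha", "beta", "gamma"])

// pipeline stage: upper-cases each chunk as it passes through
const upper = new Transform({
  objectMode: true,
  transform(chunk, encoding, callback) {
    callback(null, String(chunk).toUpperCase())
  }
})

// sink: collects whatever arrives
const collected = []
const sink = new Writable({
  objectMode: true,
  write(chunk, encoding, callback) {
    collected.push(chunk)
    callback()
  }
})

// pipeline() connects the stages and reports completion or the first error
const done = new Promise((resolve, reject) => {
  pipeline(source, upper, sink, (err) => (err ? reject(err) : resolve(collected)))
})

done.then((result) => console.log(result))
```

Nothing is read from the source until the sink is ready to accept data, which is the "lazy" part of the analogy.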

Benefits of using streams:

  • lazily produce or consume data in buffered chunks
  • evented and non-blocking
  • low memory footprint
  • automatically handle back-pressure
  • Buffers are allocated outside the V8 heap, so buffered data does not count against the V8 heap memory limit
  • most core node.js content sources/sinks are streams already!

Five classes of streams:

  • Readable - sources
  • Writable - sinks
  • Duplex - both source and sink
  • Transform - in-flight stream operations
  • PassThrough - stream spy

Below is a quick overview of Readable, Writable, and Transform streams.
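
As a small illustration of the "stream spy" idea, the sketch below places a PassThrough between a source and a sink and listens in on the chunks. The names spy, seen, received, and sink are invented for the example.

```javascript
const { PassThrough, Readable, Writable } = require("stream")

const spy = new PassThrough()
const seen = []      // what the spy observed
const received = []  // what the sink received

// observe chunks on their way through, without altering them
spy.on("data", (chunk) => seen.push(chunk.toString()))

const sink = new Writable({
  write(chunk, encoding, callback) {
    received.push(chunk.toString())
    callback()
  }
})

Readable.from(["one", "two", "three"]).pipe(spy).pipe(sink)

sink.on("finish", () => {
  console.log("spy saw:  ", seen.join(""))
  console.log("sink got: ", received.join(""))
})
```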

Readable

Use a Readable stream when supplying data as a stream.

Think: spigot/faucet.

How to implement

  1. Subclass stream.Readable.

  2. Implement a _read(size) method.

Methods

_read(size)

  • size is an advisory byte count and can be ignored (especially for objectMode streams)
  • _read(size) must call this.push(chunk) to send a chunk to the consumer

Options

  • highWaterMark number: maximum number of bytes to store in the internal buffer before ceasing to read from the underlying source (default: 16 KB at the time of writing; newer Node.js releases use a larger default)

  • encoding string: if set, buffers will be decoded to strings instead of passing buffers (default: null)

  • objectMode boolean: instead of using buffers/strings, use JavaScript objects (default: false)
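
A minimal sketch of the objectMode option in use: Counter is a hypothetical Readable, written for this example, that pushes plain objects instead of bytes and therefore ignores the size argument.

```javascript
const { Readable } = require("stream")

// Counter (hypothetical) emits { n: 1 } ... { n: limit }, then ends
class Counter extends Readable {
  constructor(limit, options) {
    super({ ...options, objectMode: true }) // chunks are objects, not Buffers
    this.limit = limit
    this.n = 0
  }

  _read() {
    this.n += 1
    if (this.n > this.limit) this.push(null) // null signals end of stream
    else this.push({ n: this.n })
  }
}

const counter = new Counter(3)
const items = []
counter.on("data", (obj) => items.push(obj))
counter.on("end", () => console.log(items))
```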

How to use

  • readable.pipe(target)
  • readable.read(size)
  • readable.on("data", ... )
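
The last two styles can be sketched side by side as follows (pipe is shown in the example files further down). makeSource is a hypothetical helper; Readable.from is assumed to be available in your Node.js version.

```javascript
const { Readable } = require("stream")

// makeSource (hypothetical) returns a fresh three-chunk source
const makeSource = () => Readable.from(["a", "b", "c"])

// paused mode: wait for 'readable', then pull with read() until it returns null
const pulled = []
const paused = makeSource()
paused.on("readable", () => {
  let chunk
  while ((chunk = paused.read()) !== null) pulled.push(chunk)
})
paused.on("end", () => console.log("pulled:", pulled))

// flowing mode: attaching a 'data' listener makes chunks arrive as events
const flowed = []
const flowing = makeSource()
flowing.on("data", (chunk) => flowed.push(chunk))
flowing.on("end", () => console.log("flowed:", flowed))
```

Pulling with read() gives the consumer control over pacing; 'data' events are simpler when every chunk can be handled as soon as it arrives.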

See also

  • stream-spigot - creates readable streams from Arrays or simple functions

Writable

Use a Writable stream when collecting data from a stream.

Think: drain/collect.

How to implement

  1. Subclass stream.Writable.

  2. Implement a _write(chunk, encoding, callback) method.

Methods

_write(chunk, encoding, callback)

  • chunk is the content to write
  • Call callback() when you're done with this chunk

Options

  • highWaterMark number: number of bytes the internal buffer may hold before write() starts returning false (default: 16 KB)

  • decodeStrings boolean: whether to convert strings passed to write() into Buffers before handing them to _write() (default: true)
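
To see what decodeStrings changes, this sketch records the type of chunk that reaches _write under each setting. makeSink and the variable names are invented for the illustration.

```javascript
const { Writable } = require("stream")

// makeSink (hypothetical) records what kind of chunk _write receives
function makeSink(decodeStrings) {
  const kinds = []
  const sink = new Writable({
    decodeStrings,
    write(chunk, encoding, callback) {
      kinds.push(Buffer.isBuffer(chunk) ? "Buffer" : typeof chunk)
      callback()
    }
  })
  return { sink, kinds }
}

const encoded = makeSink(true) // the default
const raw = makeSink(false)

encoded.sink.on("finish", () => console.log("decodeStrings: true  ->", encoded.kinds))
raw.sink.on("finish", () => console.log("decodeStrings: false ->", raw.kinds))

encoded.sink.end("hello")
raw.sink.end("hello")
```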

How to use

  • source.pipe(sink)
  • writable.write(chunk [,encoding] [,callback])
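
pipe handles back-pressure for you; when calling write() by hand you have to respect its return value yourself. The following is a sketch of that pattern, with a deliberately tiny highWaterMark and a simulated slow write. slowSink, writeAll, chunks, and waits are names made up for the example.

```javascript
const { Writable } = require("stream")

// a slow sink with a tiny buffer so back-pressure kicks in immediately
const written = []
const slowSink = new Writable({
  highWaterMark: 4, // bytes; deliberately small for the demo
  write(chunk, encoding, callback) {
    written.push(chunk.toString())
    setImmediate(callback) // pretend the write takes a while
  }
})

const chunks = ["aaaa", "bbbb", "cccc", "dddd"]
let waits = 0 // how many times we had to pause for 'drain'

function writeAll(i = 0) {
  while (i < chunks.length) {
    const ok = slowSink.write(chunks[i++])
    if (!ok) {
      // buffer is full: stop writing and resume once it has drained
      waits += 1
      slowSink.once("drain", () => writeAll(i))
      return
    }
  }
  slowSink.end()
}

slowSink.on("finish", () => console.log(written.join(" "), "| waits:", waits))
writeAll()
```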

See also

  • concat-stream - writable stream that concatenates strings or binary data and calls a callback with the result

Transform

Use a Transform stream when you want to operate on a stream in transit. This is a special kind of Duplex stream whose output is computed from its input.

Think: filter/map.

How to implement

  1. Subclass stream.Transform.
  2. Implement a _transform(chunk, encoding, callback) method.
  3. Optionally implement a _flush(callback) method.

Methods

_transform(chunk, encoding, callback)

Call this.push(something) to forward data to the next consumer. You don't have to push anything; pushing nothing drops the chunk, which is how you filter. You must call callback exactly once per _transform call.

_flush(callback)

When the stream ends, this is your chance to do any cleanup or make last-minute this.push() calls to flush buffered data or finish pending work. Call callback() when done.
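
A line splitter is a typical case where _flush matters, because the final line may not end with a newline. The sketch below is one possible way to write it; Lines and its tail property are hypothetical names, and readableObjectMode is used so that each line comes out as its own chunk.

```javascript
const { Transform } = require("stream")

// Lines (hypothetical) re-chunks arbitrary text input into whole lines
class Lines extends Transform {
  constructor(options) {
    super({ ...options, readableObjectMode: true })
    this.tail = "" // text after the last newline seen so far
  }

  _transform(chunk, encoding, callback) {
    const parts = (this.tail + chunk.toString()).split("\n")
    this.tail = parts.pop() // the last piece may be an incomplete line
    for (const line of parts) this.push(line)
    callback()
  }

  _flush(callback) {
    if (this.tail) this.push(this.tail) // emit whatever is left at end of input
    callback()
  }
}

const lines = new Lines()
const out = []
lines.on("data", (line) => out.push(line))
lines.on("end", () => console.log(out))

lines.write("one\ntw")
lines.write("o\nthree")
lines.end()
```

Without _flush, the trailing "three" would stay in tail and never reach the consumer.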

Options

Superset of Readable and Writable options.

How to use

  • source.pipe(transform).pipe(drain)
  • transform.on("data", ... )

See also

  • through2 - makes it easy to generate Transforms without all the subclassing boilerplate
  • through2-map - Array.prototype.map analog for streams
  • through2-filter - Array.prototype.filter analog for streams
  • through2-reduce - Array.prototype.reduce analog for streams
  • stream reducer demo - showing how to extend a Transform stream to create reducers/accumulators for streamed objects
  • sculpt - a collection of transform stream utilities (all operating in objectMode)
  • pipe-iterators - another collection of functions for iterating over object mode streams

# Readable example (CoffeeScript): a source that serves a string in chunks
Readable = require("stream").Readable

class Source extends Readable
  constructor: (@content, options) ->
    super options

  _read: (size) ->
    if not @content
      @push null   # nothing left: signal end of stream
    else
      @push(@content.slice(0, size))
      @content = @content.slice(size)

s = new Source("The quick brown fox jumps over the lazy dog.")
console.log(chunk.toString()) while chunk = s.read(10)  # print in 10 byte chunks

# The quick
# brown fox
# jumps over
# the lazy
# dog.

s = new Source("How now brown cow?")
s.pipe(process.stdout)

# How now brown cow?

// Readable example (JavaScript): the same source, subclassed with util.inherits
var Readable = require("stream").Readable
var inherits = require("util").inherits

function Source(content, options) {
  Readable.call(this, options)
  this.content = content
}

inherits(Source, Readable)

Source.prototype._read = function (size) {
  if (!this.content) this.push(null) // nothing left: signal end of stream
  else {
    this.push(this.content.slice(0, size))
    this.content = this.content.slice(size)
  }
}

var s = new Source("The quick brown fox jumps over the lazy dog.")
console.log(s.read(10).toString())
console.log(s.read(10).toString())
console.log(s.read(10).toString())
console.log(s.read(10).toString())
console.log(s.read(10).toString())

// The quick
// brown fox
// jumps over
// the lazy
// dog.

var q = new Source("How now brown cow?")
q.pipe(process.stdout)

// How now brown cow?

# Transform example (CoffeeScript): upper-case everything passing through
{Readable, Transform} = require("stream")

class ToUpper extends Transform
  _transform: (data, enc, next) ->
    @push data.toString().toUpperCase()
    next()

# a simple transform stream
tx = new ToUpper

# a simple source stream
rs = new Readable
rs.push 'the quick brown fox jumps over the lazy dog!\n'
rs.push null

rs.pipe(tx)
  .pipe(process.stdout) # THE QUICK BROWN FOX JUMPS OVER THE LAZY DOG!

// Transform example (JavaScript): upper-case everything passing through
var Transform = require("stream").Transform
var inherits = require("util").inherits

function ToUpper (options) {
  Transform.call(this, options)
}

inherits(ToUpper, Transform)

ToUpper.prototype._transform = function (chunk, encoding, callback) {
  var str = chunk.toString().toUpperCase()
  this.push(str)
  callback()
}

// a simple transform stream
var tx = new ToUpper;

// a simple source stream
var Readable = require('stream').Readable;
var rs = new Readable;
rs.push('the quick brown fox ');
rs.push('jumps over the lazy dog.\n');
rs.push(null);

rs.pipe(tx).pipe(process.stdout);

// THE QUICK BROWN FOX JUMPS OVER THE LAZY DOG.

# Writable example (CoffeeScript): a sink that logs each chunk it receives
{Readable, Writable} = require('stream')

class Sink extends Writable
  _write: (data, enc, next) ->
    console.log(data.toString())
    next()

# a simple source stream
source = new Readable
source.push 'the quick brown fox '
source.push 'jumps over the lazy dog.\n'
source.push null

sink = new Sink
source.pipe(sink)

# the quick brown fox
# jumps over the lazy dog.

###
Same example as above except that the source stream passes strings (instead of buffers) and the sink stream doesn't decode the strings to buffers before writing.
###

class Sink extends Writable
  constructor: ->
    super(decodeStrings: false) # don't decode strings

  _write: (data, enc, next) ->
    console.log(data)
    next()

# a simple source stream
source = new Readable(encoding: 'utf8') # buffers will be decoded to strings
source.push 'the quick brown fox '
source.push 'jumps over the lazy dog.\n'
source.push null

sink = new Sink
source.pipe(sink)

# the quick brown fox
# jumps over the lazy dog.

// Writable example (JavaScript): a sink that logs each chunk it receives
var Writable = require("stream").Writable
var inherits = require("util").inherits

function Sink(options) {
  Writable.call(this, options)
}

inherits(Sink, Writable)

Sink.prototype._write = function (chunk, encoding, callback) {
  console.log(chunk.toString())
  callback()
}

// a simple source stream
var Readable = require('stream').Readable;
var source = new Readable;
source.push('the quick brown fox ');
source.push('jumps over the lazy dog.\n');
source.push(null);

var sink = new Sink;
source.pipe(sink);

@ORESoftware

var sink = new Sink(); //?

@Ravenstine

Thanks for this! It's amazing how other guides and documentation on streams overcomplicate the subject. You've distilled it down to the basics very well.

@humoyun

humoyun commented Jan 14, 2018

You have revealed stream API of node