
The Power of NodeJS Streams and the event-stream Module

Streams are the number one topic in the nodejs community this year, so I decided to get a better understanding of what everybody is talking about by reading through one of the most cited libraries related to streams.

I took a look at Dominic Tarr's event-stream module and do understand now why people are so excited.

It's all about piping and transforming data as is common in functional languages (e.g., Haskell).

As pointed out by James Halliday (aka substack) in his lxjs talk, this approach is also a big part of Unix.

Unix is all about using lots of tools - each of which does only one small thing - and stringing them together to achieve a bigger goal.

In nodejs/javascript land the tools are modules.

Not all of them implement the streaming API, and that is where modules like event-stream come in.

It allows modules that only focus on one aspect to be wrapped in order to expose their functionality via a streaming API.

Additionally, if the outputs of one module don't match the inputs of the other, we can use event-stream's features to transform them so that they do.

Finally, nodejs's core modules support streams (e.g., fs.createReadStream and http.request), which allows us to handle certain tasks via streams from beginning to end.
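
For instance, with nothing but core modules we can gzip a file from beginning to end without ever buffering it entirely in memory (the file names here are made up just for illustration):

var fs   =  require('fs')
  , zlib =  require('zlib');

// read -> compress -> write, all streaming, chunk by chunk
fs.createReadStream('input.txt')
  .pipe(zlib.createGzip())
  .pipe(fs.createWriteStream('input.txt.gz'));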

Below are the notes I took while reading through event-stream and related modules.

If you want to run the examples do the following:

git clone git://github.com/thlorenz/thlorenz.com-blog.git
cd thlorenz.com-blog/event-stream
npm install
cd snippets

Now you can run each snippet with node.

Also, if you have suggestions, additions or corrections, please fork this blog. I'm open to pull requests.

Modules that support event-stream

Event stream uses and exposes a bunch of useful functions that have been split up into separate modules.

This has the advantage that they can be used separately and/or swapped out as needed.

In order to understand the event-stream module we need to look at them first.

through

Under the hood
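
In essence through gives us a stream that is both readable and writable and lets us decide what to do on each write and on end. A minimal sketch of that idea (not the actual module source, which also handles pausing and buffering):

var Stream = require('stream');

function throughSketch (write, end) {
  var stream = new Stream();
  stream.readable = stream.writable = true;

  // everything piped into us goes through the write handler,
  // which decides what (if anything) to emit as 'data'
  stream.write = function (data) {
    write.call(stream, data);
    return true;
  };

  // the end handler typically emits a final 'data' and then 'end'
  stream.end = function (data) {
    if (arguments.length) stream.write(data);
    end.call(stream);
  };

  return stream;
}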

split

Under the hood
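
The gist of split: buffer incoming chunks and re-emit them line by line, holding on to any incomplete last line until more data arrives. A rough sketch of that idea built on through (not the module's actual source):

var through = require('through');

function splitSketch () {
  var soFar = '';
  return through(
    function write (buf) {
      var pieces = (soFar + buf).split('\n');
      soFar = pieces.pop();                  // may be an incomplete line
      pieces.forEach(function (piece) { this.emit('data', piece); }, this);
    }
  , function end () {
      this.emit('data', soFar);              // flush the remainder
      this.emit('end');
    }
  );
}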

Example

We split a file into separate lines and re-emit them one by one with the number of characters prepended.

Additionally we keep track of the number of lines and emit that information when the read stream ends.

var through =  require('through')
  , split   =  require('split')
  , fs      =  require('fs');

function count () {
  var lines = 0
    , nonEmptyLines = 0;

  return through(
    function write (data) {
      lines++;
      data.length && nonEmptyLines++;
      this.emit('data', 'chars: ' + data.length + '\t' + data + '\n');
    }
  , function end () {
      this.emit('data', 'total lines: ' + lines + ' | non empty lines: ' + nonEmptyLines);
      this.emit('end');
    }
  );
}

fs.createReadStream(__filename, { encoding: 'utf-8' })
  .pipe(split())
  .pipe(count())
  .pipe(process.stdout);
Output:
➝  node through
chars: 33 var through =  require('through')
chars: 31   , split   =  require('split')
chars: 29   , fs      =  require('fs');
chars: 0  
chars: 19 function count () {
chars: 15   var lines = 0
chars: 24     , nonEmptyLines = 0;
chars: 0  
chars: 17   return through(
chars: 27     function write (data) {
chars: 14       lines++;
chars: 37       data.length && nonEmptyLines++;
chars: 70       this.emit('data', 'chars: ' + data.length + '\t' + data + '\n');
chars: 5      }
chars: 21   , function end () {
chars: 90       this.emit('data', 'total lines: ' + lines + ' | non empty lines: ' + nonEmptyLines);
chars: 23       this.emit('end');
chars: 5      }
chars: 4    );
chars: 1  }
chars: 0  
chars: 54 fs.createReadStream(__filename, { encoding: 'utf-8' })
chars: 16   .pipe(split())
chars: 16   .pipe(count())
chars: 24   .pipe(process.stdout);
total lines: 25 | non empty lines: 22

map-stream

Under the hood
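
At its core map-stream hands every chunk to an asynchronous mapper and re-emits whatever the mapper calls back with, skipping chunks for which it calls back without a result. A rough sketch of that idea (the real module additionally takes care of ordering, pausing and error handling):

var through = require('through');

function mapSketch (mapper) {
  return through(function write (data) {
    var self = this;
    mapper(data, function (err, mapped) {
      if (err) return self.emit('error', err);
      // calling back without a result filters the chunk out
      if (mapped !== undefined) self.emit('data', mapped);
    });
  });
}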

Example

Similar to the above, except here we filter out empty lines and don't emit the number of lines at the end.

var map   =  require('map-stream')
  , split =  require('split')
  , fs    =  require('fs');

function count () {
  return map(function (data, cb) {
    // ignore empty lines
    data.length ? 
      cb(null, 'chars: ' + data.length + '\t' + data + '\n') : 
      cb();
  });
}

fs.createReadStream(__filename, { encoding: 'utf-8' })
  .pipe(split())
  .pipe(count())
  .pipe(process.stdout);
Output:
➝  node map-stream.js 
chars: 34    var map   =  require('map-stream')
chars: 29      , split =  require('split')
chars: 27      , fs    =  require('fs');
chars: 19    function count () {
chars: 34      return map(function (data, cb) {
chars: 25        // ignore empty lines
chars: 18        data.length ? 
chars: 63          cb(null, 'chars: ' + data.length + '\t' + data + '\n') : 
chars: 11          cb();
chars: 5      });
chars: 1    }
chars: 54    fs.createReadStream(__filename, { encoding: 'utf-8' })
chars: 16      .pipe(split())
chars: 16      .pipe(count())
chars: 24      .pipe(process.stdout);

duplexer

Under the hood
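
duplexer glues a writable and a readable stream together into one duplex stream: writes go into the first, data comes out of the second. A small usage sketch with a child process (grep is just an example command here):

var duplexer =  require('duplexer')
  , cp       =  require('child_process')
  , fs       =  require('fs');

var grep = cp.spawn('grep', [ 'Stream' ]);

// writing to the duplex stream feeds grep's stdin,
// whatever grep writes to stdout is what the duplex stream emits
var duplex = duplexer(grep.stdin, grep.stdout);

fs.createReadStream(__filename)
  .pipe(duplex)
  .pipe(process.stdout);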

pause-stream

from

This module is neither used nor documented inside event-stream.

I only include it here for completeness' sake.

event-stream

Exposes all functions from the modules described above.

It also introduces additional functions. Some of those are implemented using the module functions described above as building blocks.

mapSync

Under the hood
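
mapSync is essentially the synchronous flavor of map: the mapper returns its result directly instead of calling back, and returning undefined filters the chunk out. A sketch of that idea on top of through (not the actual event-stream source):

var through = require('through');

function mapSyncSketch (mapper) {
  return through(function write (data) {
    var mapped = mapper(data);
    if (mapped !== undefined) this.emit('data', mapped);
  });
}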

Example

Exactly the same as the map-stream example above, with the same kind of output.

var mapSync =  require('event-stream').mapSync
  , split   =  require('split')
  , fs      =  require('fs');

function count () {
  return mapSync(function (data) {
    // ignore empty lines
    return data.length ? 
      'chars: ' + data.length + '\t' + data + '\n' : 
      undefined;
  });
}

fs.createReadStream(__filename, { encoding: 'utf-8' })
  .pipe(split())
  .pipe(count())
  .pipe(process.stdout);

Array/String operations

join

Under the hood
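
The idea behind join is simply to emit the separator between any two chunks that pass through. A sketch of that idea (not the actual source):

var through = require('through');

function joinSketch (separator) {
  var first = true;
  return through(function write (data) {
    if (!first) this.emit('data', separator);
    first = false;
    this.emit('data', data);
  });
}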

Example

We first split the data into lines and then join them together, injecting an extra line each time.

var join  =  require('event-stream').join
  , split =  require('split')
  , fs    =  require('fs');

fs.createReadStream(__filename, { encoding: 'utf-8' })
  .pipe(split())
  .pipe(join('\n******\n'))
  .pipe(process.stdout);
Output:
➝  node join
var join  =  require('event-stream').join
******
  , split =  require('split')
******
  , fs    =  require('fs');
[..]

replace

Under the hood
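
Conceptually replace(from, to) is just a split on the search string piped into a join with the replacement. A one-liner sketch of that idea (not necessarily the actual implementation):

var es = require('event-stream');

function replaceSketch (from, to) {
  return es.pipeline(es.split(from), es.join(to));
}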

Example

The below has exactly the same effect as the join example above.

var replace =  require('event-stream').replace
  , fs    =  require('fs');

fs.createReadStream(__filename, { encoding: 'utf-8' })
  .pipe(replace('\n', '\n******\n'))
  .pipe(process.stdout);

JSON converters

parse

Under the hood
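
The gist of parse: a mapSync that runs JSON.parse over each chunk (a sketch; the real implementation also guards against parse errors):

var es = require('event-stream');

function parseSketch () {
  return es.mapSync(function (data) { return JSON.parse(data); });
}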

stringify

Under the hood
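
Likewise stringify boils down to a mapSync that runs JSON.stringify over each chunk and appends a newline (again just a sketch of the idea):

var es = require('event-stream');

function stringifySketch () {
  return es.mapSync(function (obj) { return JSON.stringify(obj) + '\n'; });
}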

Example

This example should give a glimpse of how powerful streams can be.

Notably, we can inject simple transformer functions in order to adapt outputs to the inputs expected by the function that is next in the flow.

This allows us to compose all kinds of small functions that each do one tiny thing in order to achieve quite complex tasks in a highly performant way with the smallest memory footprint possible.

The comments should suffice to show what is going on in the code.

var Stream    =  require('stream')
  , es        =  require('event-stream');

function objectStream () {
  var s = new Stream()
    , objects = 0;

  var iv = setInterval(
      function () {
        s.emit('data', { id: objects, created: new Date() });
        if (++objects === 3) {
            s.emit('end');
            clearInterval(iv);
        }
      }
    , 20);
  return s;
}

function tap () {
  return es.through(
    function write (data) {
      console.log('\n' + data);
      this.emit('data', data);
    }
  );
}

function padId () {
  return es.mapSync(function (obj) {
    obj.id = '000' + obj.id;
    return obj;
  });
}

objectStream()
  .pipe(es.stringify())   // prepare for printing
  .pipe(tap())            // print intermediate result
  .pipe(es.parse())       // convert back to object
  .pipe(padId())          // change it a bit
  .pipe(es.stringify())   // prepare for printing
  .pipe(process.stdout);  // print final result
Output:
➝  node json

{"id":0,"created":"2012-10-01T12:10:13.905Z"}

{"id":"0000","created":"2012-10-01T12:10:13.905Z"}

{"id":1,"created":"2012-10-01T12:10:13.928Z"}

{"id":"0001","created":"2012-10-01T12:10:13.928Z"}

{"id":2,"created":"2012-10-01T12:10:13.950Z"}

{"id":"0002","created":"2012-10-01T12:10:13.950Z"}

readable

Under the hood

Example

Ten Squares shows how to use a plain old callback to pass on the data.

var es = require('event-stream');

function tenSquares (count, cb) {
  return count < 10 ? cb(null, count * count) : this.emit('end');
}

es.readable(tenSquares)
  .pipe(es.stringify())
  .pipe(process.stdout);
Output:
➝  node readable-squares
0
1
4
9
16
25
36
49
64
81

Three Cubes shows how to manually emit the data on the stream and then invoke the callback in order to be called again.

Note how we can emit as many times as we like.

var es = require('event-stream');

function threeCubes (count, cb) {
  if (count < 3) {
    this.emit('data', 'Cubing ' + count);
    this.emit('data', count * count * count);
    this.emit('data', 'OK');
    cb();
  } else {
    this.emit('end');
  }
}

es.readable(threeCubes)
  .pipe(es.stringify())
  .pipe(process.stdout);
Output:
➝  node readable-cubes
"Cubing 0"
0
"OK"
"Cubing 1"
1
"OK"
"Cubing 2"
8
"OK"

readArray

Under the hood

writeArray

Under the hood
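
Conceptually the two array helpers are mirror images of each other: readArray emits each element of an array and then ends, while writeArray collects every chunk and hands the resulting array to a callback once the stream ends. A rough sketch of both ideas (not the actual sources):

var es = require('event-stream');

function readArraySketch (array) {
  var i = 0;
  return es.readable(function (count, cb) {
    return i < array.length ? cb(null, array[i++]) : this.emit('end');
  });
}

function writeArraySketch (done) {
  var collected = [];
  return es.through(
    function write (data) { collected.push(data); }
  , function end () {
      done(null, collected);
      this.emit('end');
    }
  );
}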

Example

We use readArray to generate a stream of values which we then multiply by 10 and pipe into writeArray so we can validate the resulting array.

var es = require('event-stream');

function multiplyByTen (item, cb) {
  // long running async operation ;)
  setTimeout(
      function () { cb(null, item * 10); }
    , 50
  );
}

function validate(err, array) {
  if (!err && array.toString() === '0,10,20,30')
   console.log('OK');
  else 
   console.log('NOT OK');
}

es.readArray([0, 1, 2, 3])        // generate data
  .pipe(es.map(multiplyByTen))    // transform asynchronously 
  .pipe(es.writeArray(validate)); // validate and print result
Output:
➝  node readArray
OK

child

Under the hood
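
Conceptually child wraps a child process by exposing its stdin as the writable side and its stdout as the readable side of one duplex stream, very much like the duplexer example above. A sketch of that idea (the real implementation also deals with the process's exit and errors):

var duplexer = require('duplexer');

function childSketch (proc) {
  // writes go to proc.stdin, proc.stdout provides the 'data' events
  return duplexer(proc.stdin, proc.stdout);
}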

Example

var cp = require('child_process')
  , fs = require('fs')
  , es = require('event-stream');

// same as: > cat thisfile | grep Stream
fs.createReadStream(__filename)
  .pipe(es.child(cp.exec('grep Stream')))
  .pipe(process.stdout);
Output:
➝  node child
// same as: > cat thisfile | grep Stream
fs.createReadStream(__filename)
  .pipe(es.child(cp.exec('grep Stream')))

wait

Under the hood
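
The idea behind wait: buffer every chunk and emit the whole concatenation as a single chunk once the source ends. A sketch of that idea (not the actual source):

var through = require('through');

function waitSketch () {
  var buffered = '';
  return through(
    function write (data) { buffered += data; }
  , function end () {
      this.emit('data', buffered);
      this.emit('end');
    }
  );
}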

Example

We emit an array of characters via readArray and use wait to aggregate them into one string so we can then surround it using mapSync.

var es = require('event-stream');

es.readArray([ 'e', 'l', 'u', 'r', ' ', 's', 'm', 'a', 'e', 'r', 't', 's' ].reverse())
  .pipe(es.wait())
  .pipe(es.mapSync(function (data) { return '"' + data + '!"'; }))
  .pipe(process.stdout);
Output:

I leave this one to the reader to find out :)

pipeline

Under the hood
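
Conceptually pipeline pipes the given streams into each other and returns one duplex stream that writes into the first and reads from the last, so the whole chain can be treated as a single stream. A sketch of that idea (not necessarily the actual implementation):

var duplexer = require('duplexer');

function pipelineSketch () {
  var streams = [].slice.call(arguments);

  // wire them up: s1.pipe(s2).pipe(s3)...
  streams.reduce(function (prev, next) { return prev.pipe(next); });

  // expose the first as the writable and the last as the readable end
  return duplexer(streams[0], streams[streams.length - 1]);
}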

Example

someStream()
  .pipe(parse())         
  .pipe(stringify())      
  .pipe(process.stdout);

// can be rewritten as
es.pipeline(
    someStream()
  , parse()
  , stringify()
  , process.stdout
);

I hope this helps to shed some light on the power and inner workings of streams and associated modules.

Reading through these surely did that for me, so I encourage you to do the same.

Be on the lookout for more posts about streaming, but in the meantime I recommend the following nodejs stream resources: