Streams in Node.js, part 1: Basic concepts

When I started with Node.js, I came to it with experience in a lot of different programming environments, from Objective-C to C# to Bash. Each of these has some notion of processing a large data set by operating on a little bit at a time, and I expected to find something similar in Node. However, given the way Node embraces asynchrony, I also expected it to look quite different.

What I found was actually more straightforward than I’d expected. In a typical stream metaphor, you have sources which produce data, filters which modify data, and sinks which consume data. In Node.js, these are represented by three classes from the stream module: Readable, Transform and Writable. Each of them is very simple to subclass to create your own, and the result is a very nicely factored set of classes.
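To make the three roles concrete, here is a rough sketch in plain JavaScript (rather than the CoffeeScript used below, so that it runs directly under Node) that wires a source, a filter and a sink together with stream.pipeline. It assumes Node 12.3 or later for Readable.from, and it uses the constructor-option shorthand (transform and write functions passed to the constructor) instead of subclasses; the words and the uppercasing are only illustrative.

```javascript
const { Readable, Transform, Writable, pipeline } = require('stream');

// Source: Readable.from turns any iterable into a Readable.
const source = Readable.from(['streams ', 'are ', 'fun']);

// Filter: upper-cases each chunk as it passes through.
const filter = new Transform({
  transform(chunk, encoding, callback) {
    callback(null, chunk.toString().toUpperCase());
  },
});

// Sink: collects every chunk it receives.
const received = [];
const sink = new Writable({
  write(chunk, encoding, callback) {
    received.push(chunk.toString());
    callback();
  },
});

// pipeline() connects source -> filter -> sink and reports the first error, if any.
pipeline(source, filter, sink, (err) => {
  if (err) {
    console.error('pipeline failed:', err);
  } else {
    console.log(received.join('')); // STREAMS ARE FUN
  }
});
```

pipeline is used here instead of chained .pipe() calls because it forwards errors to a single callback and destroys every stream in the chain if one of them fails.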

Overriding Readable

As the “source” part of the stream metaphor, Readable subclasses are expected to provide data. Any Readable can have data pushed into it manually by calling its push method. Adding new data triggers the appropriate events (such as 'data' or 'readable'), which carry the data downstream to any consumers.
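Pushing by hand is easy to see with a bare Readable. A minimal sketch in JavaScript: the read option is a no-op, so the only data that ever arrives is what gets pushed manually (feed is just an illustrative name).

```javascript
const { Readable } = require('stream');

// A bare Readable whose read() does nothing: data only arrives via push().
const feed = new Readable({ read() {} });

const seen = [];
feed.on('data', (chunk) => seen.push(chunk.toString()));
feed.on('end', () => console.log(seen.join(''))); // hello world

// push() can be called from anywhere, e.g. from another event source's callback.
feed.push('hello ');
feed.push('world');
feed.push(null); // no more data; 'end' follows once everything has been consumed
```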

When making your own Readable, you override the pseudo-private _read(size) method. The stream machinery calls it whenever it determines that more data is needed from your class (the size argument is only a hint). You then do whatever you have to do to get the data, and finish by calling push to make it available to the underlying stream machinery.
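Since _read only has to call push eventually, the data can come from an asynchronous source. A minimal sketch, assuming a hypothetical fetchPage(n) lookup (simulated with setTimeout) that stands in for something like a database or HTTP call; PagedSource is likewise an illustrative name. Node does not call _read again until the pending call has pushed something, so only one lookup is in flight at a time.

```javascript
const { Readable } = require('stream');

// Hypothetical asynchronous lookup (think database or HTTP call), simulated
// with setTimeout. Resolves to an array of items, or to null when no pages remain.
function fetchPage(n) {
  const pages = [['a', 'b'], ['c'], ['d', 'e']];
  return new Promise((resolve) => {
    setTimeout(() => resolve(n < pages.length ? pages[n] : null), 10);
  });
}

class PagedSource extends Readable {
  constructor() {
    super({ objectMode: true }); // each push() delivers one item, not raw bytes
    this.page = 0;
  }

  // Called by the stream machinery when it wants more data. `size` is only
  // advisory; this source ignores it and pushes a whole page at a time.
  _read(size) {
    fetchPage(this.page++).then(
      (items) => {
        if (items === null) {
          this.push(null); // no more pages: end the stream
        } else {
          for (const item of items) this.push(item);
        }
      },
      (err) => this.destroy(err) // report lookup failures as an 'error' event
    );
  }
}

// Usage: async iteration pulls items, which drives calls to _read.
(async () => {
  for await (const item of new PagedSource()) console.log(item);
})();
```

One caveat of this design: if a page came back empty, nothing would be pushed and the stream would stall, so a real implementation would fetch the next page in that case.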

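You don’t have to match the requested size exactly: push simply buffers whatever you give it, and multiple calls to push are fine. However, push returns false once the internal buffer reaches the stream’s highWaterMark, which is a signal to stop pushing until _read is called again. When you’re done, you just push null to end the stream.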
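The return value of push is what lets a source cooperate with backpressure: it returns false once the buffer holds highWaterMark items (or bytes). A minimal sketch of a source that stops at that point and resumes on the next _read call; CountingSource, its limit parameter, and the highWaterMark of 4 are illustrative choices.

```javascript
const { Readable } = require('stream');

// Emits the integers 1..limit, pushing as many as the buffer will accept per _read.
class CountingSource extends Readable {
  constructor(limit) {
    // objectMode with a small highWaterMark so the backpressure kicks in quickly.
    super({ objectMode: true, highWaterMark: 4 });
    this.limit = limit;
    this.next = 1;
  }

  _read() {
    while (this.next <= this.limit) {
      // push() returns false once 4 items are buffered: stop and wait for _read.
      if (!this.push(this.next++)) return;
    }
    this.push(null); // everything has been pushed
  }
}

// Usage: prints 1 through 10.
new CountingSource(10).on('data', (n) => console.log(n));
```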

Here’s a simple Readable (in CoffeeScript) which returns words from a given sentence:

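{Readable} = require 'stream'

# Emits the words of a sentence, one word per _read call
class Source extends Readable
    constructor: (sentence)->
        super()    # initialize the Readable's internal state first
        @words = sentence.split ' '
        @index = 0
    _read: ->
        if @index < @words.length
            @push @words[@index++]    # advance, or _read would repeat forever
        else
            @push null    # no more words: end the stream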

Overriding Writable

The Writable provides the “sink” part of the stream metaphor. To create one of your own, you only need to override the _write(chunk, encoding, callback) method. The chunk argument is the data itself (typically a Buffer with some bytes in it). If chunk is a String, the encoding argument names its character encoding (such as 'utf8'); if chunk is a Buffer, encoding is just the placeholder 'buffer' and can be ignored. Finally, you are expected to call callback when you’re finished (passing it an error if something went wrong).
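Calling callback with an Error is how a _write implementation reports failure; the stream then emits an 'error' event, and stream.pipeline passes that error to its final callback. A minimal sketch, assuming a made-up rule that empty strings are invalid (StrictSink is a hypothetical name, and objectMode is used so strings arrive unconverted):

```javascript
const { Readable, Writable, pipeline } = require('stream');

// Hypothetical sink that rejects empty strings and records everything else.
class StrictSink extends Writable {
  constructor() {
    super({ objectMode: true });
    this.lines = [];
  }

  _write(chunk, encoding, callback) {
    if (chunk === '') {
      callback(new Error('empty line')); // failure: the stream emits 'error'
      return;
    }
    this.lines.push(chunk);
    callback(); // success: ready for the next chunk
  }
}

const sink = new StrictSink();
pipeline(Readable.from(['one', '', 'three']), sink, (err) => {
  console.log(err ? `failed: ${err.message}` : 'done', sink.lines);
});
```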

Overriding Writable is about as easy as it gets. Your _write method is called whenever new data arrives, and you just need to deal with it as you like. The only slight complexity is that, depending upon how you set up the stream, you may get a Buffer, a String, or (in object mode) a plain JavaScript object, so you may need to be ready to handle several input types. Here’s a simple example which accepts any type of data and writes it to the console:
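Which of those types reaches _write depends on the options the Writable was created with. This small probe (probe is a hypothetical helper) records what _write receives, as observed in current Node releases: by default a written String is converted to a Buffer and encoding is 'buffer'; with decodeStrings: false the String arrives as-is along with its encoding name; and with objectMode: true any JavaScript value passes through untouched.

```javascript
const { Writable } = require('stream');

// Records the type and encoding that _write actually receives for one write().
function probe(options, value) {
  const seen = [];
  const sink = new Writable({
    ...options,
    write(chunk, encoding, callback) {
      const kind = Buffer.isBuffer(chunk) ? 'Buffer' : typeof chunk;
      seen.push({ kind, encoding });
      callback();
    },
  });
  sink.write(value); // _write runs synchronously here, so `seen` is filled on return
  return seen;
}

console.log(probe({}, 'hi'));                       // kind 'Buffer', encoding 'buffer'
console.log(probe({ decodeStrings: false }, 'hi')); // kind 'string', encoding 'utf8'
console.log(probe({ objectMode: true }, { n: 1 }).map((s) => s.kind)); // 'object'
```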

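{Writable} = require 'stream'

class Sink extends Writable

    _write: (chunk, encoding, callback)->
        if Buffer.isBuffer chunk
            # For Buffers, encoding is just 'buffer'; decode the bytes as UTF-8
            text = chunk.toString 'utf8'
        else if typeof(chunk) is 'string'
            text = chunk
        else
            # Any other value can only arrive in object mode
            text = String chunk

        console.log text
        callback()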

Overriding Transform

A Transform fits between a source and a sink, and allows you to transform the data in any way you like.  For example, you might have a stream of binary data flowing through a Transform which compresses the data, or you might have a text stream flowing through a Transform which capitalizes all the letters.
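Node’s own compression streams are Transforms of exactly this kind. A short round-trip sketch: a buffer is compressed by zlib.createGzip(), decompressed again by zlib.createGunzip(), and the result is compared with the original. The repeated input text is only illustrative.

```javascript
const { Readable, Transform, Writable, pipeline } = require('stream');
const zlib = require('zlib');

const input = Buffer.from('hello '.repeat(1000)); // repetitive, so it compresses well
const gzip = zlib.createGzip();     // Transform: raw bytes in, gzip bytes out
const gunzip = zlib.createGunzip(); // Transform: gzip bytes in, raw bytes out
console.log(gzip instanceof Transform); // true

const chunks = [];
const sink = new Writable({
  write(chunk, encoding, callback) {
    chunks.push(chunk);
    callback();
  },
});

pipeline(Readable.from([input]), gzip, gunzip, sink, (err) => {
  if (err) throw err;
  console.log(Buffer.concat(chunks).equals(input)); // true: the round trip is lossless
});
```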

Transforms don’t actually have to output data each time they receive data, however. For instance, you could have a Transform which breaks up an incoming binary stream into lines of text by buffering raw data until a full line has been received, and only at that point emitting the string as a result. In fact, you could even have a Transform which merely counts the lines, and only emits a single integer when the end of the stream is reached.
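A sketch of both ideas, under a few assumptions: the input is UTF-8 text, lines end in '\n', and the class names LineSplitter and LineCounter are made up. The piece that makes the line counter possible is _flush(callback), an optional Transform method that the stream calls after the last input chunk, giving the Transform a chance to push whatever it still holds. StringDecoder keeps multi-byte characters intact when they are split across chunks.

```javascript
const { Readable, Transform } = require('stream');
const { StringDecoder } = require('string_decoder');

// Turns a byte stream into one string per line (lines end in '\n').
class LineSplitter extends Transform {
  constructor() {
    // Bytes in, strings out: each pushed line stays a separate chunk.
    super({ readableObjectMode: true });
    this.decoder = new StringDecoder('utf8'); // keeps split multi-byte chars intact
    this.pending = ''; // text received but not yet terminated by '\n'
  }

  _transform(chunk, encoding, callback) {
    this.pending += this.decoder.write(chunk);
    const lines = this.pending.split('\n');
    this.pending = lines.pop(); // the last piece may be an incomplete line
    for (const line of lines) this.push(line);
    callback();
  }

  // Called once after the last chunk: emit a final line that lacked a '\n'.
  _flush(callback) {
    this.pending += this.decoder.end();
    if (this.pending !== '') this.push(this.pending);
    callback();
  }
}

// Swallows lines and emits a single number when the input ends.
class LineCounter extends Transform {
  constructor() {
    super({ objectMode: true });
    this.count = 0;
  }

  _transform(line, encoding, callback) {
    this.count += 1; // nothing is pushed here
    callback();
  }

  _flush(callback) {
    this.push(this.count);
    callback();
  }
}

// Usage: the input is deliberately chunked in the middle of a line.
Readable.from(['first\nsec', 'ond\nthird'])
  .pipe(new LineSplitter())
  .pipe(new LineCounter())
  .on('data', (count) => console.log(count)); // 3
```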

Fortunately, creating your own Transform is nearly the same as writing a class which implements both Readable and Writable. The differences are that instead of overriding the _write(chunk, encoding, callback) method, you override the _transform(chunk, encoding, callback) method, and instead of overriding the _read method to gather data in preparation for calling push, you simply call push from within your _transform method.
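Because a Transform is both a Writable and a Readable, you can write into one end and read from the other without any pipes at all. This sketch also uses a shorthand that Node’s Transform supports: passing the output as the second argument to callback, which has the same effect as calling push and then callback(). The doubler name and the numbers are illustrative.

```javascript
const { Transform } = require('stream');

// Doubles each number written to it (objectMode so numbers pass through as-is).
const doubler = new Transform({
  objectMode: true,
  transform(n, encoding, callback) {
    // Same effect as: this.push(n * 2); callback();
    callback(null, n * 2);
  },
});

// Writable side: feed numbers in.
doubler.write(1);
doubler.write(2);
doubler.end(3);

// Readable side: read the results out.
doubler.on('data', (n) => console.log(n)); // logs 2, then 4, then 6
```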

Here’s a small example of a transform which capitalizes letters:

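{Transform} = require 'stream'

class Capitalizer extends Transform
    _transform: (chunk, encoding, callback)->
        # Strings are converted to Buffers by default (encoding 'buffer'), so decode as UTF-8
        text = if Buffer.isBuffer(chunk) then chunk.toString('utf8') else chunk
        @push text.toUpperCase()
        callback()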

✧✧✧

All this is very interesting, but hardly unique to the Node.js platform. Where things get really interesting is when you start dealing with Object streams. I’ll talk more about those in a future post.
