Node.js Stream - In Practice

The first two articles in this series (Basics and Advanced) introduced the basic usage and principles of streams. This article looks at streams from the application side and shows how to program with pipelines. The main contents include:

  1. The concept of a pipeline
  2. Browserify's pipeline design
  3. Gulp's pipeline design
  4. A comparison of the two pipeline design patterns
  5. An example

A "pipeline" refers to a combination of multiple Stream objects connected in the form a.pipe(b).

Suppose there are two Transforms, bold and red, which can bold certain keywords in a text stream and color them red, respectively. To make text both bold and red at the same time:

// source: input stream
// dest: output destination
source.pipe(bold).pipe(red).pipe(dest)
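
To make this concrete, bold and red could be sketched as follows (this is my own assumption for illustration; the text above only names them, and chunk-boundary issues are ignored for brevity):

var Transform = require('stream').Transform

// A hypothetical keyword-highlighting Transform: wraps every occurrence
// of `word` in the given open/close tags. Chunks are assumed not to split
// a keyword across boundaries.
function highlight(word, open, close) {
  return new Transform({
    transform: function (buf, enc, next) {
      next(null, buf.toString().split(word).join(open + word + close))
    }
  })
}

var bold = highlight('Stream', '<b>', '</b>')
var red = highlight('Stream', '<span class="red">', '</span>')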

Here, bold.pipe(red) can be regarded as a pipeline: the input stream is transformed first by bold and then by red before being output.

But if this bold-and-red functionality is needed in many places, we would rather use it like this:

// source: input stream
// dest: output destination
// pipeline: bold and red
source.pipe(pipeline).pipe(dest)

Here pipeline encapsulates bold.pipe(red), and logically it is also called a pipeline. Its implementation can be simplified to:

var Duplex = require('stream').Duplex

var pipeline = new Duplex()
var streams = pipeline._streams = [bold, red]

// The underlying write logic: write data to the first Stream of the pipeline, that is, bold
pipeline._write = function (buf, enc, next) {
  streams[0].write(buf, enc, next)
}

// The underlying read logic: read data from the last Stream (ie red) of the pipeline
pipeline._read = function () {
  var buf
  var reads = 0
  var r = streams[streams.length - 1]
  // Drain r's read buffer
  while ((buf = r.read()) !== null) {
    pipeline.push(buf)
    reads++
  }
  if (reads === 0) {
    // The buffer was empty; wait for new data to arrive
    r.once('readable', function () {
      pipeline._read()
    })
  }
}

// Connect the individual Streams (here equivalent to `bold.pipe(red)`)
streams.reduce(function (r, next) {
  r.pipe(next)
  return next
})

Data written to the pipeline is written directly into bold, then flows through to red; data read from the pipeline is read from red.

If you need to add an underline Stream in the middle, you can:

pipeline._streams.splice(1, 0, underline)
bold.unpipe(red)
bold.pipe(underline).pipe(red)

If you want to replace red with green, you can:

// delete red
pipeline._streams.pop()
bold.unpipe(red)

// add green
pipeline._streams.push(green)
bold.pipe(green)

As you can see, every link of this pipeline can be modified.

stream-splicer encapsulates the logic above further and provides methods such as splice, push, and pop, so that the pipeline can be modified like an array:

var splicer = require('stream-splicer')
var pipeline = splicer([bold, red])
// add underline in the middle
pipeline.splice(1, 0, underline)

// delete red
pipeline.pop()

// add green
pipeline.push(green)

labeled-stream-splicer builds on this and adds the ability to address streams by name instead of by index:

var splicer = require('labeled-stream-splicer')
var pipeline = splicer([
  'bold', bold,
  'red', red,
])

// add underline before `red`
pipeline.splice('red', 0, underline)

// remove `bold`
pipeline.splice('bold', 1)

Since the pipeline itself, like each of its links, is a Stream object, pipelines can be nested:

var splicer = require('labeled-stream-splicer')
var pipeline = splicer([
  'style', [ bold, red ],
  'insert', [ comma ],
])

pipeline.get('style')     // Get pipeline: [bold, red]
  .splice(1, 0, underline) // add underline

An introduction to Browserify's functionality can be found in substack/browserify-handbook. Its core logic lies in the design of its pipeline:

var splicer = require('labeled-stream-splicer')
var pipeline = splicer.obj([
    // Record the data entering the pipeline, so the recorded data can be
    // written back in directly when the pipeline is rebuilt.
    // Used for cases like watch, where bundling happens multiple times.
    'record', [ this._recorder() ],
    // Dependency parsing, preprocessing
    'deps', [ this._mdeps ],
    // Process JSON files
    'json', [ this._json() ],
    // Delete the BOM in front of the file
    'unbom', [ this._unbom() ],
    // Delete the `#!` line in front of the file
    'unshebang', [ this._unshebang() ],
    // Syntax check
    'syntax', [ this._syntax() ],
    // Sort, to keep the bundle output stable
    'sort', [ depsSort(dopts) ],
    // Deduplicate modules with the same content
    'dedupe', [ this._dedupe() ],
    // Convert id from file path to number to avoid exposing system path information
    'label', [ this._label(opts) ],
    // fire a dep event for each module
    'emit-deps', [ this._emitDeps() ],
    // Handle source maps (when opts.debug)
    'debug', [ this._debug(opts) ],
    // Pack the modules into the bundle
    'pack', [ this._bpack ],
    // More custom processing
    'wrap', [],
])

Each module is represented by a row object, defined as follows:

{
  // Unique ID of the module
  id: id,
  // The file path corresponding to the module
  file: '/path/to/file',
  // Module content
  source: '',
  // module dependencies
  deps: {
    // `require(expr)`
    expr: id,
  }
}

Except for pack, every stage before wrap takes a stream of such objects as input and outputs a stream of the same kind: some supplement information in the row, some transform it, and some simply read it and pass it on. Generally, the source and deps fields of a row are parsed out in the deps stage.
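
As an aside, a stage in this pipeline is just an object-mode Transform over rows. A minimal sketch (not part of Browserify; the `size` field is made up for illustration):

var through = require('through2')

// Hypothetical stage: read each row, record the byte size of its source,
// and pass the row on otherwise unchanged.
function size() {
  return through.obj(function (row, enc, next) {
    row.size = Buffer.byteLength(row.source)
    next(null, row)
  })
}

// It could be spliced in like any other stage, e.g.:
// b.pipeline.get('deps').push(size())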

Below is an example of a plugin that modifies the Browserify pipeline.

var Transform = require('stream').Transform
// Create a Transform object
function through(write, end) {
  return Transform({
    transform: write,
    flush: end,
  })
}

// `b` is the Browserify instance
// This plugin prints out the packing time
function log(b) {
  // When watch triggers a rebundle, the entire pipeline is rebuilt, so our modifications must be reapplied
  b.on('reset', reset)
  // Modify the current pipeline
  reset()

  function reset () {
    var time = null
    var bytes = 0
    b.pipeline.get('record').on('end', function () {
      // Timing starts when the record stage ends
      time = Date.now()
    })

    // `wrap` is the last stage; append a Transform to it to record the end time
    b.pipeline.get('wrap').push(through(write, end))
    function write (buf, enc, next) {
      // Cumulative size
      bytes += buf.length
      this.push(buf)
      next()
    }
    function end () {
      // Packing time
      var delta = Date.now() - time
      b.emit('time', delta)
      b.emit('bytes', bytes)
      b.emit('log', bytes + ' bytes written ('
        + (delta / 1000).toFixed(2) + ' seconds)'
      )
      this.push(null)
    }
  }
}

var fs = require('fs')
var browserify = require('browserify')
var b = browserify(opts)
// Apply plugin
b.plugin(log)
b.bundle().pipe(fs.createWriteStream('bundle.js'))

In fact, b.plugin(log) here simply executes log(b).
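
A simplified sketch of the idea (Browserify's actual plugin method does a little more, such as resolving plugin names to modules):

// Simplified: call the plugin function with the instance so that it can
// modify b.pipeline; opts are passed through.
Browserify.prototype.plugin = function (p, opts) {
  p(this, opts)
  return this
}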

Inside a plugin, you can modify any link of b.pipeline. Browserify itself therefore retains only the essential functionality and leaves the rest to plugins, such as watchify and factor-bundle.

In addition to the plugin mechanism above, Browserify also has a transform mechanism: through b.transform(transform) you can add Transforms that preprocess file contents. Preprocessing happens in the deps stage: when a module file's content is read, it is run through these Transforms before dependency analysis is done. Examples include babelify and envify.
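
For illustration, a minimal source transform might look like this (versionify is made up here in the spirit of envify, not a real module):

var through = require('through2')

// Made-up transform: receives the file path and returns a Transform over
// the file's contents that replaces `__VERSION__` with a string literal.
function versionify(file) {
  var source = ''
  return through(function (buf, enc, next) {
    // Buffer the whole file first
    source += buf
    next()
  }, function (next) {
    // Emit the transformed source at the end
    this.push(source.replace(/__VERSION__/g, JSON.stringify('1.0.0')))
    next()
  })
}

b.transform(versionify)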

Gulp's core logic is divided into two parts: task scheduling and file processing. Task scheduling is based on orchestrator, while file processing is based on vinyl-fs.

Similar to the module definition (row) provided by Browserify, vinyl-fs also provides a file definition (the vinyl object).

Browserify's pipeline processes a stream of rows; a Gulp pipeline processes a stream of vinyl objects:

gulp.task('scripts', ['clean'], function() {
  // Minify and copy all JavaScript (except vendor scripts) 
  // with sourcemaps all the way down 
  return gulp.src(paths.scripts)
    .pipe(sourcemaps.init())
    .pipe(coffee())
    .pipe(uglify())
    .pipe(concat('all.min.js'))
    .pipe(sourcemaps.write())
    .pipe(gulp.dest('build/js'));
});

The pipeline created in this task starts with gulp.src and ends with gulp.dest, with several other Transforms (plugins) in between.
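
A typical Gulp plugin is just an object-mode Transform over vinyl files. A minimal sketch (a made-up banner plugin, assuming buffer-mode files):

var through = require('through2')

// Made-up plugin: prepend a banner comment to each file's contents.
function banner(text) {
  return through.obj(function (file, enc, next) {
    if (file.isBuffer()) {
      file.contents = Buffer.concat([
        Buffer.from('/* ' + text + ' */\n'),
        file.contents,
      ])
    }
    next(null, file)
  })
}

// Usage: gulp.src(paths.scripts).pipe(banner('built with gulp')).pipe(gulp.dest('build/js'))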

Comparing this with Browserify's pipeline, we find that Browserify defines one complete, fully functional pipeline, whereas Gulp itself only provides tools for creating a vinyl stream and for writing a vinyl stream back to disk; what happens in the middle of the pipeline is entirely up to the user. This is because a task may do anything: file processing is merely the common case, and even then it is not mandatory to use gulp.src and gulp.dest.

Both Browserify and Gulp rely on the concept of a pipeline to implement their plugin mechanisms.

Browserify defines the data structure of a module, provides a default pipeline for processing streams of such data, and lets plugins modify the structure of that pipeline to customize the processing.

Gulp also defines a data structure for files, but it only provides interfaces for producing and consuming such a data stream; constructing the processing pipeline out of plugins is left entirely to the user.

When the processing requirements are well defined, you can follow Browserify's approach: build a basic processing pipeline and expose a plugin mechanism on top of it. When you need pipelines capable of arbitrary functionality, you can follow Gulp's approach and provide only an abstraction of the data flow.

This section implements a tool that automatically generates a changelog for a Git repository; see ezchangelog for the complete code.

The input to ezchangelog is the text stream produced by git log, and its output defaults to a text stream in markdown format, but it can be modified into any custom format.

Sample input:

commit 9c5829ce45567bedccda9beb7f5de17574ea9437
Author: zoubin <zoubin04@gmail.com>
Date:   Sat Nov 7 18:42:35 2015 +0800

    CHANGELOG

commit 3bf9055b732cc23a9c14f295ff91f48aed5ef31a
Author: zoubin <zoubin04@gmail.com>
Date:   Sat Nov 7 18:41:37 2015 +0800

    4.0.3

commit 87abe8e12374079f73fc85c432604642059806ae
Author: zoubin <zoubin04@gmail.com>
Date:   Sat Nov 7 18:41:32 2015 +0800

    fix readme
    add more tests

Sample output:

* [[`9c5829c`](https://github.com/zoubin/ezchangelog/commit/9c5829c)] CHANGELOG

## [v4.0.3](https://github.com/zoubin/ezchangelog/commit/3bf9055) (2015-11-07)

* [[`87abe8e`](https://github.com/zoubin/ezchangelog/commit/87abe8e)] fix readme

    add more tests

In essence, what we need is such a pipeline:

source.pipe(pipeline).pipe(dest)

It can be divided into two stages:

* parse: parse the commit information from the input text stream
* format: transform the commit stream into a text stream

By default, to produce the markdown in the example, we need to parse out each commit's sha1, date, message, and whether it marks a tag. A commit is defined in the following format:

{
  commit: {
    // commit sha1
    long: '3bf9055b732cc23a9c14f295ff91f48aed5ef31a',
    short: '3bf9055',
  },
  committer: {
    // commit date
    date: new Date('Sat Nov 7 18:41:37 2015 +0800'),
  },
  // raw message lines
  messages: ['', '    4.0.3', ''],
  // raw headers before the messages
  headers: [
    ['Author', 'zoubin <zoubin04@gmail.com>'],
    ['Date', 'Sat Nov 7 18:41:37 2015 +0800'],
  ],
  // the first non-empty message line
  subject: '4.0.3',
  // other message lines
  body: '',
  // git tag
  tag: 'v4.0.3',
  // link to the commit. opts.baseUrl should be specified.
  url: 'https://github.com/zoubin/ezchangelog/commit/3bf9055',
}

So we have:

var splicer = require('labeled-stream-splicer')
var pipeline = splicer.obj([
  'parse', [
    // Split input into lines
    'split', split(),
    // Generate commit object, parse out sha1 and date
    'commit', commit(),
    // Parse out the tag
    'tag', tag(),
    // Parse out the url
    'url', url({ baseUrl: opts.baseUrl }),
  ],
  'format', [
    // Format commits into markdown text
    'markdownify', markdownify(),
  ],
])
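
For illustration, the commit stage could be sketched roughly as follows (a simplification; the real implementation in ezchangelog differs in details such as error handling):

var through = require('through2')

// Consume lines and emit a commit object each time a new `commit <sha1>`
// line starts; headers and message lines are collected along the way.
function commit() {
  var current = null
  return through.obj(function (line, enc, next) {
    line = line.toString()
    var m = /^commit\s+([0-9a-f]{40})/.exec(line)
    if (m) {
      if (current) this.push(current)
      current = { commit: { long: m[1], short: m[1].slice(0, 7) }, headers: [], messages: [] }
    } else if (current) {
      var h = /^(\w+):\s+(.*)$/.exec(line)
      if (h) {
        current.headers.push([h[1], h[2]])
        if (h[1] === 'Date') current.committer = { date: new Date(h[2]) }
      } else {
        current.messages.push(line)
      }
    }
    next()
  }, function (next) {
    // Flush the last commit when the input ends
    if (current) this.push(current)
    next()
  })
}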

So far the basic functionality is in place. Now wrap it up and provide a plugin mechanism.

function Changelog(opts) {
  opts = opts || {}
  this._options = opts
  // Create a pipeline
  this.pipeline = splicer.obj([
    'parse', [
      'split', split(),
      'commit', commit(),
      'tag', tag(),
      'url', url({ baseUrl: opts.baseUrl }),
    ],
    'format', [
      'markdownify', markdownify(),
    ],
  ])

  // Apply plugin
  ;[].concat(opts.plugin).filter(Boolean).forEach(function (p) {
    this.plugin(p)
  }, this)
}

Changelog.prototype.plugin = function (p, opts) {
  if (Array.isArray(p)) {
    opts = p[1]
    p = p[0]
  }
  // Execute the plugin function and modify the pipeline
  p(this, opts)
  return this
}

The above implementation provides two ways of applying plugins: passing them in through the options, or creating an instance and then calling its plugin method. The two are essentially the same.
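
In other words (myPlugin standing for any plugin function):

// Way 1: pass plugins in via the options
var c1 = new Changelog({ plugin: [myPlugin] })

// Way 2: create an instance, then call the method
var c2 = new Changelog()
c2.plugin(myPlugin)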

For ease of use, it can also be wrapped in a simple factory function:

function changelog(opts) {
  return new Changelog(opts).pipeline
}

Thus, it can be used as follows:

source.pipe(changelog()).pipe(dest)

This is very close to our expectations.
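
For example, source could be the stdout of a spawned git log process (a sketch of the wiring; ezchangelog's actual CLI may differ):

var fs = require('fs')
var spawn = require('child_process').spawn

var source = spawn('git', ['log']).stdout
var dest = fs.createWriteStream('CHANGELOG.md')

source.pipe(changelog()).pipe(dest)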

Now let's develop a plugin that modifies the default rendering.

var through = require('through2')

function customFormatter(c) {
  // c is the `Changelog` instance

  // Add a transform that parses the author
  c.pipeline.get('parse').push(through.obj(function (ci, enc, next) {
    // parse the author name from: 'zoubin <zoubin04@gmail.com>'
    ci.committer.author = ci.headers[0][1].split(/\s+/)[0]
    next(null, ci)
  }))

  // replace the original rendering
  c.pipeline.get('format').splice('markdownify', 1, through.obj(function (ci, enc, next) {
    var sha1 = ci.commit.short
    sha1 = '[`' + sha1 + '`](' + c._options.baseUrl + sha1 + ')'
    var date = ci.committer.date.toISOString().slice(0, 10)
    next(null, '* ' + sha1 + ' ' + date + ' @' + ci.committer.author + '\n')
  }))
}

source
  .pipe(changelog({
    baseUrl: 'https://github.com/zoubin/ezchangelog/commit/',
    plugin: [customFormatter],
  }))
  .pipe(dest)

With the same input, the output will be:

* [`9c5829c`](https://github.com/zoubin/ezchangelog/commit/9c5829c) 2015-11-07 @zoubin
* [`3bf9055`](https://github.com/zoubin/ezchangelog/commit/3bf9055) 2015-11-07 @zoubin
* [`87abe8e`](https://github.com/zoubin/ezchangelog/commit/87abe8e) 2015-11-07 @zoubin

As you can see, by building itself around a modifiable pipeline, ezchangelog keeps its own logic simple while offering great room for customization.
