The first two articles in this series (Basics and Advanced) introduced the basic usage and principles of streams. This article looks at streams from the application side and introduces how to program with pipelines. The main contents include:
- The pipe concept
- Browserify's pipeline design
- Gulp's pipeline design
- Comparison of the two pipeline designs
- Example
## The pipe concept

A "pipe" is a combination of multiple Stream objects connected in the form of `a.pipe(b)`.
Suppose there are two Transforms, `bold` and `red`, which bold and redden certain keywords in a text stream, respectively. To make text both bold and red at the same time:
```js
// source: input stream
// dest: output destination
source.pipe(bold).pipe(red).pipe(dest)
```
`bold.pipe(red)` can be regarded as a pipeline: the input stream is transformed by `bold` and then `red` before being output.
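For concreteness, here is a minimal sketch of what a Transform like `bold` could look like, assuming `through2` and markdown-style `**` markers; the factory name and keyword list are purely illustrative, and keywords split across chunk boundaries are ignored for simplicity:

```js
var through = require('through2')

// Hypothetical factory for a `bold` Transform:
// wraps each given keyword in `**` markers
function makeBold(keywords) {
  return through(function (buf, enc, next) {
    var text = buf.toString()
    keywords.forEach(function (word) {
      text = text.split(word).join('**' + word + '**')
    })
    // Push the transformed text downstream
    next(null, text)
  })
}

var bold = makeBold(['stream', 'pipeline'])
```

A `red` Transform could be built the same way, for instance by wrapping keywords in ANSI color codes or HTML tags.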
But if this bold-and-red feature is useful in many scenarios, the way we would rather use it is:
```js
// source: input stream
// dest: output destination
// pipeline: bold and red
source.pipe(pipeline).pipe(dest)
```
Here `pipeline` encapsulates `bold.pipe(red)`, and logically it is also called a pipeline. Its implementation can be simplified to:
```js
var Duplex = require('stream').Duplex

var pipeline = new Duplex()
var streams = pipeline._streams = [bold, red]

// The underlying write logic: write data to the first Stream in the pipeline (bold)
pipeline._write = function (buf, enc, next) {
  streams[0].write(buf, enc, next)
}

// The underlying read logic: read data from the last Stream in the pipeline (red)
pipeline._read = function () {
  var buf
  var reads = 0
  var r = streams[streams.length - 1]
  // Empty the cache
  while ((buf = r.read()) !== null) {
    pipeline.push(buf)
    reads++
  }
  if (reads === 0) {
    // The cache was empty; wait for new data to arrive
    r.once('readable', function () {
      pipeline._read()
    })
  }
}

// Connect the individual Streams (equivalent to `bold.pipe(red)` here)
streams.reduce(function (r, next) {
  r.pipe(next)
  return next
})
```
When data is written to the pipeline, it goes directly into `bold`, flows on to `red`, and is finally read out of `red` when the pipeline is read.
If you need to add an underline Stream in the middle, you can:
```js
pipeline._streams.splice(1, 0, underline)
bold.unpipe(red)
bold.pipe(underline).pipe(red)
```
If you want to replace red with green, you can:
```js
// Remove red
pipeline._streams.pop()
bold.unpipe(red)
// Add green
pipeline._streams.push(green)
bold.pipe(green)
```
As you can see, every link of this pipeline can be modified.
stream-splicer encapsulates exactly this logic and provides methods such as `splice`, `push`, and `pop`, so the pipeline can be modified like an array:
```js
var splicer = require('stream-splicer')
var pipeline = splicer([bold, red])
// Add underline in the middle
pipeline.splice(1, 0, underline)
// Remove red
pipeline.pop()
// Add green
pipeline.push(green)
```
labeled-stream-splicer builds on this and adds the ability to address links by name instead of by index:
```js
var splicer = require('labeled-stream-splicer')
var pipeline = splicer([
  'bold', bold,
  'red', red,
])
// Add underline before `red`
pipeline.splice('red', 0, underline)
// Remove `bold`
pipeline.splice('bold', 1)
```
Since the pipeline itself, like each of its links, is also a Stream object, pipelines can be nested:
```js
var splicer = require('labeled-stream-splicer')
var pipeline = splicer([
  'style', [ bold, red ],
  'insert', [ comma ],
])

pipeline.get('style')      // Get the sub-pipeline: [bold, red]
  .splice(1, 0, underline) // Add underline
```
## Browserify's pipeline design

An introduction to Browserify's features can be found in substack/browserify-handbook. The core of its implementation lies in the design of its pipeline:
```js
var splicer = require('labeled-stream-splicer')
var pipeline = splicer.obj([
  // Record the data entering the pipeline; when the pipeline is rebuilt,
  // the recorded data is written back in directly.
  // Used in situations like watch, where bundling happens multiple times.
  'record', [ this._recorder() ],
  // Dependency resolution and preprocessing
  'deps', [ this._mdeps ],
  // Process JSON files
  'json', [ this._json() ],
  // Remove the BOM at the beginning of a file
  'unbom', [ this._unbom() ],
  // Remove the `#!` line at the beginning of a file
  'unshebang', [ this._unshebang() ],
  // Syntax check
  'syntax', [ this._syntax() ],
  // Sort to keep the bundle output stable
  'sort', [ depsSort(dopts) ],
  // Deduplicate modules with identical content
  'dedupe', [ this._dedupe() ],
  // Convert ids from file paths to numbers, to avoid exposing system path information
  'label', [ this._label(opts) ],
  // Emit a `dep` event for each module
  'emit-deps', [ this._emitDeps() ],
  'debug', [ this._debug(opts) ],
  // Pack the modules into the bundle
  'pack', [ this._bpack ],
  // Room for further custom processing
  'wrap', [],
])
```
Each module is represented by a `row` object, defined as follows:
```js
{
  // Unique id of the module
  id: id,
  // The file path of the module
  file: '/path/to/file',
  // Module content
  source: '',
  // Module dependencies
  deps: {
    // `require(expr)`
    expr: id,
  }
}
```
All stages before `wrap`, with the exception of `pack`, process a stream of such objects and output a stream of the same kind. Some add information to the row, some transform it, and some just read it and pass it on. A row's `source` and `deps` are generally filled in during the `deps` stage.
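As an illustration, a stage is just an object-mode Transform over rows. A minimal hypothetical stage that logs each module's file path and forwards the row unchanged might look like this sketch, assuming `through2`:

```js
var through = require('through2')

// Hypothetical pass-through stage over the row stream
function printFiles() {
  return through.obj(function (row, enc, next) {
    // Inspect the row, then forward it unchanged
    console.log('module:', row.file)
    next(null, row)
  })
}

// It could be spliced into the pipeline like any other link, e.g.:
// b.pipeline.get('deps').push(printFiles())
```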
Below is an example of modifying the Browserify pipeline.
```js
var Transform = require('stream').Transform

// Create a Transform object from a transform and a flush function
function through(write, end) {
  return Transform({
    transform: write,
    flush: end,
  })
}

// `b` is the Browserify instance
// This plugin prints the bundling time and size
function log(b) {
  // When watch triggers a rebundle, the whole pipeline is rebuilt,
  // so the modification must be applied again
  b.on('reset', reset)
  // Modify the current pipeline
  reset()

  function reset() {
    var time = null
    var bytes = 0
    b.pipeline.get('record').on('end', function () {
      // Timing starts when the record stage ends
      time = Date.now()
    })

    // `wrap` is the last stage; append a Transform there to record the end time
    b.pipeline.get('wrap').push(through(write, end))

    function write(buf, enc, next) {
      // Accumulate the output size
      bytes += buf.length
      this.push(buf)
      next()
    }

    function end(next) {
      // Bundling time
      var delta = Date.now() - time
      b.emit('time', delta)
      b.emit('bytes', bytes)
      b.emit('log', bytes + ' bytes written ('
        + (delta / 1000).toFixed(2) + ' seconds)'
      )
      this.push(null)
      next()
    }
  }
}

var fs = require('fs')
var browserify = require('browserify')
var b = browserify(opts)
// Apply the plugin
b.plugin(log)
b.bundle().pipe(fs.createWriteStream('bundle.js'))
```
In fact, `b.plugin(log)` simply executes `log(b)`.
Inside a plugin, any link of `b.pipeline` can be modified. Browserify therefore keeps only the essential functionality in its core and leaves the rest to plugins, such as watchify and factor-bundle.
In addition to this plugin mechanism, Browserify also provides a transform mechanism: through `b.transform(transform)`, you can add Transforms that preprocess file content. Preprocessing happens in the `deps` stage: when a module file's content is read, it first passes through these Transforms, and only then is dependency analysis done. babelify and envify work this way.
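A transform is a function that takes a file path and returns a Transform over that file's content. The following is a hedged, envify-like sketch, assuming `through2`; the replacement logic is purely illustrative, not envify's actual implementation:

```js
var through = require('through2')

// Minimal envify-like transform sketch: replaces `process.env.NODE_ENV`
// with a string literal before dependency analysis runs
function env(file) {
  var chunks = []
  return through(function (buf, enc, next) {
    // Buffer the whole file first
    chunks.push(buf)
    next()
  }, function (next) {
    var source = Buffer.concat(chunks).toString()
    this.push(source.replace(
      /process\.env\.NODE_ENV/g,
      JSON.stringify(process.env.NODE_ENV)
    ))
    next()
  })
}

// Applied with: b.transform(env)
```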
## Gulp's pipeline design

Gulp's core logic is divided into two parts: task scheduling and file processing. Task scheduling is based on orchestrator, while file processing is based on vinyl-fs.
Similar to the module definition (`row`) supplied by Browserify, vinyl-fs provides a file definition: the vinyl object.
Browserify's pipeline processes a stream of `row`s; Gulp's pipeline processes a stream of vinyl objects:
```js
gulp.task('scripts', ['clean'], function() {
  // Minify and copy all JavaScript (except vendor scripts)
  // with sourcemaps all the way down
  return gulp.src(paths.scripts)
    .pipe(sourcemaps.init())
    .pipe(coffee())
    .pipe(uglify())
    .pipe(concat('all.min.js'))
    .pipe(sourcemaps.write())
    .pipe(gulp.dest('build/js'));
});
```
The pipeline created in a task starts with `gulp.src` and ends with `gulp.dest`, with several other Transforms (plugins) in between.
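Such a plugin is just an object-mode Transform over vinyl files. A minimal sketch, assuming `through2` and buffered (non-streaming) file contents; the banner plugin itself is hypothetical:

```js
var through = require('through2')

// Hypothetical gulp plugin: prepends a banner comment to each file
function banner(text) {
  return through.obj(function (file, enc, next) {
    if (file.isBuffer()) {
      file.contents = Buffer.concat([
        Buffer.from('/* ' + text + ' */\n'),
        file.contents,
      ])
    }
    // Forward the (possibly modified) vinyl file
    next(null, file)
  })
}

// gulp.src(paths.scripts).pipe(banner('built with gulp')).pipe(gulp.dest('build/js'))
```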
Comparing this with Browserify's pipeline, you can see that Browserify assembles one complete, fully functional pipeline, whereas Gulp itself only provides the means to create a vinyl stream and to write a vinyl stream back to disk; everything in between is entirely up to the user. This is because a task is free to do anything: file processing is merely the common case, and even `gulp.src` and `gulp.dest` are optional.
## Comparison of the two pipeline designs

Both Browserify and Gulp rely on the pipeline concept to implement their plugin mechanisms.
Browserify defines the data structure of a module and provides a default pipeline for processing streams of such data; plugins can modify the pipeline's structure to customize the processing.
Gulp also defines the data structure of a file, but it only provides interfaces for producing and consuming streams of this data; assembling the processing pipeline out of plugins is left entirely to the user.
When the processing requirements are concrete, you can do as Browserify does: build a basic processing pipeline and offer a plugin mechanism on top of it. When the pipeline must support arbitrary functionality, you can do as Gulp does: provide only the abstraction of the data stream.
## Example

This section implements a tool that automatically generates a changelog for a Git repository; the complete code can be found in ezchangelog.
The input of ezchangelog is the text stream produced by `git log`; the output defaults to a markdown-formatted text stream, but can be modified into any custom format.
Example input:
```
commit 9c5829ce45567bedccda9beb7f5de17574ea9437
Author: zoubin <zoubin04@gmail.com>
Date:   Sat Nov 7 18:42:35 2015 +0800

    CHANGELOG

commit 3bf9055b732cc23a9c14f295ff91f48aed5ef31a
Author: zoubin <zoubin04@gmail.com>
Date:   Sat Nov 7 18:41:37 2015 +0800

    4.0.3

commit 87abe8e12374079f73fc85c432604642059806ae
Author: zoubin <zoubin04@gmail.com>
Date:   Sat Nov 7 18:41:32 2015 +0800

    fix readme
    add more tests
```
Example output:
```
* [[`9c5829c`](https://github.com/zoubin/ezchangelog/commit/9c5829c)] CHANGELOG

## [v4.0.3](https://github.com/zoubin/ezchangelog/commit/3bf9055) (2015-11-07)

* [[`87abe8e`](https://github.com/zoubin/ezchangelog/commit/87abe8e)] fix readme
  add more tests
```
What is needed, in fact, is a pipeline like this:
```js
source.pipe(pipeline).pipe(dest)
```
It can be divided into two stages:

- parse: parse commit information out of the input text stream
- format: transform the commit stream into a text stream
By default, to produce the markdown in the example, each commit's sha1, date, message, and tag status need to be parsed out. A commit is defined in the following format:
```js
{
  commit: {
    // commit sha1
    long: '3bf9055b732cc23a9c14f295ff91f48aed5ef31a',
    short: '3bf9055',
  },
  committer: {
    // commit date
    date: new Date('Sat Nov 7 18:41:37 2015 +0800'),
  },
  // raw message lines
  messages: ['', '    4.0.3', ''],
  // raw headers before the messages
  headers: [
    ['Author', 'zoubin <zoubin04@gmail.com>'],
    ['Date', 'Sat Nov 7 18:41:37 2015 +0800'],
  ],
  // the first non-empty message line
  subject: '4.0.3',
  // other message lines
  body: '',
  // git tag
  tag: 'v4.0.3',
  // link to the commit. opts.baseUrl should be specified.
  url: 'https://github.com/zoubin/ezchangelog/commit/3bf9055',
}
```
So we have:
```js
var splicer = require('labeled-stream-splicer')
pipeline = splicer.obj([
  'parse', [
    // Split the input by line
    'split', split(),
    // Create the commit object; parse out the sha1 and date
    'commit', commit(),
    // Parse out the tag
    'tag', tag(),
    // Parse out the url
    'url', url({ baseUrl: opts.baseUrl }),
  ],
  'format', [
    // Render the commits as markdown text
    'markdownify', markdownify(),
  ],
])
```
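To give a feel for these links, here is a minimal sketch of what a link like `tag` could look like, assuming `through2`; the semver heuristic is an assumption for illustration, not ezchangelog's actual implementation:

```js
var through = require('through2')

function tag() {
  return through.obj(function (ci, enc, next) {
    // Hypothetical heuristic: treat a semver-like subject as a release tag
    if (/^\d+\.\d+\.\d+$/.test(ci.subject)) {
      ci.tag = 'v' + ci.subject
    }
    next(null, ci)
  })
}
```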
This covers the basic functionality. Now wrap it up and provide a plugin mechanism:
```js
function Changelog(opts) {
  opts = opts || {}
  this._options = opts
  // Create the pipeline
  this.pipeline = splicer.obj([
    'parse', [
      'split', split(),
      'commit', commit(),
      'tag', tag(),
      'url', url({ baseUrl: opts.baseUrl }),
    ],
    'format', [
      'markdownify', markdownify(),
    ],
  ])
  // Apply plugins
  ;[].concat(opts.plugin).filter(Boolean).forEach(function (p) {
    this.plugin(p)
  }, this)
}

Changelog.prototype.plugin = function (p, opts) {
  if (Array.isArray(p)) {
    opts = p[1]
    p = p[0]
  }
  // Execute the plugin function to modify the pipeline
  p(this, opts)
  return this
}
```
This implementation offers two ways to apply a plugin: pass it in via the options, or create an instance and then call its `plugin` method. The two are essentially the same.
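In other words, both of the following are equivalent (`myPlugin` standing in for any plugin function):

```js
// Way 1: pass plugins in through the options
var c1 = new Changelog({ plugin: [myPlugin] })

// Way 2: create the instance first, then call `plugin`
var c2 = new Changelog().plugin(myPlugin)
```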
For ease of use, it can also be wrapped in a simple factory function:
```js
function changelog(opts) {
  return new Changelog(opts).pipeline
}
```
Thus, it can be used as follows:
```js
source.pipe(changelog()).pipe(dest)
```
This is very close to our expectations.
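For instance, wired up to standard input and output, it could be used like this sketch (file streams would work the same way):

```js
// Usage: git log | node make-changelog.js > CHANGELOG.md
process.stdin
  .pipe(changelog())
  .pipe(process.stdout)
```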
Now let's develop a plugin to modify the default rendering method.
```js
var through = require('through2')

function customFormatter(c) {
  // `c` is the `Changelog` instance

  // Add a transform that parses the author
  c.pipeline.get('parse').push(through.obj(function (ci, enc, next) {
    // Parse the author name from e.g. 'zoubin <zoubin04@gmail.com>'
    ci.committer.author = ci.headers[0][1].split(/\s+/)[0]
    next(null, ci)
  }))

  // Replace the original rendering
  c.pipeline.get('format').splice('markdownify', 1, through.obj(function (ci, enc, next) {
    var sha1 = ci.commit.short
    sha1 = '[`' + sha1 + '`](' + c._options.baseUrl + sha1 + ')'
    var date = ci.committer.date.toISOString().slice(0, 10)
    next(null, '* ' + sha1 + ' ' + date + ' @' + ci.committer.author + '\n')
  }))
}

source
  .pipe(changelog({
    baseUrl: 'https://github.com/zoubin/ezchangelog/commit/',
    plugin: [customFormatter],
  }))
  .pipe(dest)
```
With the same input, the output will be:
```
* [`9c5829c`](https://github.com/zoubin/ezchangelog/commit/9c5829c) 2015-11-07 @zoubin
* [`3bf9055`](https://github.com/zoubin/ezchangelog/commit/3bf9055) 2015-11-07 @zoubin
* [`87abe8e`](https://github.com/zoubin/ezchangelog/commit/87abe8e) 2015-11-07 @zoubin
```
As you can see, by building itself around a modifiable pipeline, ezchangelog keeps its own logic simple while leaving plenty of room for customization.