Learn about Node.js Streams

Streams in Node.js are notoriously hard to work with, and even harder to understand.

In the words of Dominic Tarr, "Streams are Node's best and most misunderstood idea." Even Dan Abramov, creator of Redux and core team member of React.js, is afraid of Node streams.

This article will help you understand streams and how to use them. Don't be afraid, you can figure it out!

What is a Stream?

Streams are one of the fundamental concepts that power Node.js applications. They are a data-handling method used to read input or write output sequentially.

Streams are a way to handle reading/writing files, network communications, or any kind of end-to-end information exchange in an efficient way.

What makes streams unique is that instead of a program reading a file into memory all at once, as in the traditional way, streams read chunks of data piece by piece and process the content segment by segment, without keeping it all in memory.

This makes streams very powerful when working with large amounts of data. For example, a file may be larger than the available memory, making it impossible to read the whole file into memory in order to process it. That's where streams come in handy!

Using streams to process smaller chunks of data makes it possible to read larger files.

Take "streaming" services like YouTube or Netflix as an example: these services don't let you download video and audio files immediately. Instead, your browser receives the video in a continuous block stream, so that the recipient can start watching and listening almost immediately.

However, streams are not only about working with media or big data. They also give us the power of "composability" in our code. Designing with composability in mind means several components can be combined in a certain way to produce the same type of result. In Node.js it's possible to compose powerful pieces of code by piping data to and from other, smaller pieces of code, using streams.

Why use streams?

Streams basically provide two major advantages compared with other data-handling methods:

  1. Memory efficiency: you don't need to load large amounts of data into memory before you are able to process it
  2. Time efficiency: it takes far less time to start processing data as soon as you have it, rather than having to wait until the whole payload has been transmitted

There are four types of streams in Node.js:

  1. Writable: streams to which data can be written. For example, fs.createWriteStream() lets us write data to a file using a stream.
  2. Readable: streams from which data can be read. For example, fs.createReadStream() lets us read the contents of a file.
  3. Duplex: streams that are both readable and writable. For example, net.Socket.
  4. Transform: streams that can modify or transform the data as it is written and read. For example, in the case of file compression, you can write compressed data to a file and read decompressed data from it (see the sketch after this list).
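As a rough illustration of a Transform stream (a minimal sketch, not taken from the original article), here is a stream that upper-cases whatever passes through it:

const { Transform } = require('stream');

// A Transform stream modifies each chunk as it passes through
const upperCaseTransform = new Transform({
  transform(chunk, encoding, callback) {
    callback(null, chunk.toString().toUpperCase());
  }
});

// Example usage: pipe stdin through the transform and out to stdout
process.stdin.pipe(upperCaseTransform).pipe(process.stdout);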

If you have already worked with Node.js, you may have come across streams. For example, in a Node.js based HTTP server, request is a readable stream and response is a writable stream. You may have used the fs module, which lets you work with both readable and writable file streams. Whenever you use Express you are using streams to interact with the client, and streams are used in every database connection driver you can work with, because TCP sockets, the TLS stack, and other connections are all based on Node.js streams.

Examples

How do I create a readable stream?

We first require the Readable stream from the stream module, and then we initialize it:

const Stream = require('stream')
const readableStream = new Stream.Readable()

Now that the stream is initialized, you can send data to it:

readableStream.push('ping!')
readableStream.push('pong!')
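This readable stream doesn't do anything on its own yet. As a minimal sketch (not part of the original snippet), you can consume the pushed chunks by attaching a 'data' listener; note that a readable stream created this way also needs a read() implementation, even a no-op one, otherwise Node.js emits a "The _read() method is not implemented" error as soon as the stream is read from:

const Stream = require('stream')

// Providing a no-op read() avoids the "_read() method is not implemented" error
const readableStream = new Stream.Readable({
  read() {}
})

readableStream.on('data', (chunk) => {
  console.log(chunk.toString()) // ping! pong!
})

readableStream.push('ping!')
readableStream.push('pong!')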

Asynchronous iterator

It is highly recommended to use async iterators when working with streams. According to Dr. Axel Rauschmayer, asynchronous iteration is a protocol for retrieving the contents of a data container asynchronously (meaning the current "task" may be paused before retrieving an item). It's also worth mentioning that the stream async iterator implementation uses the 'readable' event internally.

When reading from a readable stream, you can use asynchronous iterators:

import * as fs from 'fs';

async function logChunks(readable) {
  for await (const chunk of readable) {
    console.log(chunk);
  }
}

const readable = fs.createReadStream(
  'tmp/test.txt', {encoding: 'utf8'});
logChunks(readable);

// Output:
// 'This is a test!\n'

You can also collect the contents of a readable stream into a string:

import {Readable} from 'stream';
import * as assert from 'assert';

async function readableToString2(readable) {
  let result = '';
  for await (const chunk of readable) {
    result += chunk;
  }
  return result;
}

const readable = Readable.from('Good morning!', {encoding: 'utf8'});
assert.equal(await readableToString2(readable), 'Good morning!');

Note that we had to use an async function here because we want to return a Promise.

Please keep in mind not to mix async functions with EventEmitter, because currently there is no way to catch a rejection when it is emitted within an event handler, which makes it hard to track down bugs and memory leaks. The current best practice is to always wrap the content of an async function in a try/catch block and handle errors, but this is error prone. This pull request aims to solve this issue once it lands on Node core.

To learn more about Node.js streams via async iteration, check out this great article.

Readable.from(): creating readable streams from iterables

stream.Readable.from(iterable, [options]) is a utility method for creating readable streams out of iterators, which hold the data contained in an iterable. The iterable can be a synchronous or an asynchronous iterable. The options parameter is optional and can, among other things, be used to specify a text encoding.

const { Readable } = require('stream');

async function * generate() {
  yield 'hello';
  yield 'streams';
}

const readable = Readable.from(generate());

readable.on('data', (chunk) => {
  console.log(chunk);
});

Two reading modes

According to the Streams API, readable streams effectively operate in one of two modes: flowing and paused. A readable stream can be in object mode or not, regardless of whether it is in flowing mode or paused mode.

  • In flowing mode, data is read from the underlying system automatically and provided to the application as quickly as possible via events through the EventEmitter interface.
  • In paused mode, the stream.read() method must be called explicitly to read chunks of data from the stream.

In flowing mode, to read data from a stream you can listen to the data event and attach a callback. When a chunk of data is available, the readable stream emits a data event and your callback executes. Take a look at the following snippet:

var fs = require("fs");
var data = '';

var readerStream = fs.createReadStream('file.txt'); //Create a readable stream

readerStream.setEncoding('UTF8'); // Set the encoding to be utf8. 

// Handle stream events --> data, end, and error
readerStream.on('data', function(chunk) {
   data += chunk;
});

readerStream.on('end',function() {
   console.log(data);
});

readerStream.on('error', function(err) {
   console.log(err.stack);
});

console.log("Program Ended");

The function call fs.createReadStream() gives you a readable stream. Initially, the stream is in a static state. As soon as you listen to the data event and attach a callback, it starts flowing. After that, chunks of data are read and passed to your callback. The stream implementor decides how often a data event is emitted. For example, an HTTP request may emit a data event once every few KBs of data are read. When you are reading data from a file, you may decide to emit a data event once a line is read.

When there is no more data to read (the end is reached), the stream emits an end event. In the snippet above, we listen to this event to get notified when the end is reached.

Also, if there is an error, the stream will emit an error event and notify us.

In paused mode, you just need to call read() on the stream instance repeatedly until every chunk of data has been read, as in the following example:

var fs = require('fs');
var readableStream = fs.createReadStream('file.txt');
var data = '';
var chunk;

readableStream.on('readable', function() {
    while ((chunk=readableStream.read()) != null) {
        data += chunk;
    }
});

readableStream.on('end', function() {
    console.log(data)
});

The read() function reads some data from the internal buffer and returns it. When there is nothing to read, it returns null. So, in the while loop, we check for null and terminate the loop. Note that the readable event is emitted when a chunk of data can be read from the stream.

All Readable streams begin in paused mode but can be switched to flowing mode in one of the following ways:

  • Adding a 'data' event handler.
  • Calling the stream.resume() method.
  • Calling the stream.pipe() method to send the data to a Writable.

The Readable can switch back to paused mode using one of the following:

  • If there are no pipe destinations, by calling the stream.pause() method.
  • If there are pipe destinations, by removing all pipe destinations. Multiple pipe destinations may be removed by calling the stream.unpipe() method.

The important concept to remember is that a Readable will not generate data until a mechanism for either consuming or ignoring that data is provided. If the consuming mechanism is disabled or taken away, the Readable will attempt to stop generating data. Adding a 'readable' event handler automatically makes the stream stop flowing, and the data has to be consumed via stream.read(). If the 'readable' event handler is removed, the stream will start flowing again if there is a 'data' event handler.
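As a small sketch of switching between the two modes (assuming a local file.txt exists), you can pause a flowing stream and resume it later:

const fs = require('fs');
const readable = fs.createReadStream('file.txt');

readable.on('data', (chunk) => {
  console.log(`Received ${chunk.length} bytes of data.`);
  readable.pause();               // switch back to paused mode
  console.log('There will be no additional data for 1 second.');
  setTimeout(() => {
    console.log('Now data will start flowing again.');
    readable.resume();            // switch to flowing mode again
  }, 1000);
});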

How do I create a writable stream?

To write data to a writable stream, you need to call write() on the stream instance. As shown in the following example:

var fs = require('fs');
var readableStream = fs.createReadStream('file1.txt');
var writableStream = fs.createWriteStream('file2.txt');

readableStream.setEncoding('utf8');

readableStream.on('data', function(chunk) {
    writableStream.write(chunk);
});

The code above is straightforward. It simply reads chunks of data from an input stream and writes them to the destination using write(). This function returns a boolean indicating whether the operation was successful. If it returns true, the write was successful and you can keep writing more data. If false is returned, it means something went wrong and you can't write anything at the moment. The writable stream will let you know when you can start writing more data by emitting a drain event.

Calling the writable.end() method signals that no more data will be written to the Writable. If provided, the optional callback function is attached as a listener for the 'finish' event.

// Write 'hello, ' and then end with 'world!'.
const fs = require('fs');
const file = fs.createWriteStream('example.txt');
file.write('hello, ');
file.end('world!');
// Writing more now is not allowed!
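As a minimal sketch (not part of the original snippet), the optional callback passed to end() fires once the 'finish' event is emitted:

const fs = require('fs');
const file = fs.createWriteStream('example.txt');

file.write('hello, ');
// The callback passed to end() is attached as a 'finish' listener
file.end('world!', () => {
  console.log('All data has been flushed to example.txt');
});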

You can use a writable stream to read data from a readable stream:

const Stream = require('stream')

// The readable stream needs a read() implementation (a no-op is enough here)
const readableStream = new Stream.Readable({
  read() {}
})

const writableStream = new Stream.Writable({
  // write() is called for every chunk that is piped into the stream
  write(chunk, encoding, next) {
    console.log(chunk.toString())
    next()
  }
})

readableStream.pipe(writableStream)

readableStream.push('ping!')
readableStream.push('pong!')

// Signal that no more data will be pushed; pipe() ends the writable for us
readableStream.push(null)

It is also recommended to use async iterators when writing to a writable stream:

import * as util from 'util';
import * as stream from 'stream';
import * as fs from 'fs';
import * as assert from 'assert';
import {once} from 'events';

const finished = util.promisify(stream.finished); // (A)

async function writeIterableToFile(iterable, filePath) {
  const writable = fs.createWriteStream(filePath, {encoding: 'utf8'});
  for await (const chunk of iterable) {
    if (!writable.write(chunk)) { // (B)
      // Handle backpressure
      await once(writable, 'drain');
    }
  }
  writable.end(); // (C)
  // Wait until done. Throws if there are errors.
  await finished(writable);
}

await writeIterableToFile(
  ['One', ' line of text.\n'], 'tmp/log.txt');
assert.equal(
  fs.readFileSync('tmp/log.txt', {encoding: 'utf8'}),
  'One line of text.\n');

The default version of stream.finished() is callback-based, but it can be turned into a Promise-based version via util.promisify() (line A).

In this example, the following two patterns are used:

Writing to a writable stream while handling backpressure (line B):

if (!writable.write(chunk)) {
  await once(writable, 'drain');
}

Close the writable stream and wait for the write to complete (line C):

writable.end();
await finished(writable);

pipeline()

Piping is a mechanism where we provide the output of one stream as the input to another stream. It is normally used to get data from one stream and pass that stream's output to another stream. There is no limit on piping operations. In other words, piping can be used to process streamed data in multiple steps.

In Node.js 10.x, stream.pipeline() was introduced. It is a module method to pipe between streams, forwarding errors, properly cleaning up, and providing a callback when the pipeline is complete.

Here is an example of using pipeline:

const { pipeline } = require('stream');
const fs = require('fs');
const zlib = require('zlib');

// Using the pipeline API, you can easily integrate a series of streams
// Transmitted together through the pipeline and notified when the pipeline is fully completed.
// A pipeline that effectively compresses huge video files with gzip:

pipeline(
  fs.createReadStream('The.Matrix.1080p.mkv'),
  zlib.createGzip(),
  fs.createWriteStream('The.Matrix.1080p.mkv.gz'),
  (err) => {
    if (err) {
      console.error('Pipeline failed', err);
    } else {
      console.log('Pipeline succeeded');
    }
  }
);

Use pipeline instead of pipe, because pipe is not safe.
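Since Node.js 15, there is also a promise-based version of pipeline in the stream/promises module. Here is a minimal sketch of using it (the file names are made up for illustration):

const { pipeline } = require('stream/promises');
const fs = require('fs');
const zlib = require('zlib');

async function run() {
  // Awaiting pipeline() resolves when the whole chain has finished
  await pipeline(
    fs.createReadStream('archive.tar'),
    zlib.createGzip(),
    fs.createWriteStream('archive.tar.gz')
  );
  console.log('Pipeline succeeded');
}

run().catch((err) => console.error('Pipeline failed', err));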

The stream module

The Node.js stream module provides the foundation upon which all streaming APIs are built.

The stream module is a native module shipped by default in Node.js. Streams are instances of the EventEmitter class, which handles events asynchronously in Node.js. Because of that, streams are inherently event based.

To access the stream module:

const stream = require('stream');

The stream module is useful for creating new types of stream instances. It is usually not necessary to use the stream module to consume streams.
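As a quick sketch of creating a new stream type (the names are made up for illustration), here is a custom readable stream that emits the numbers 1 to 5 and then ends:

const { Readable } = require('stream');

// A custom readable stream built by extending stream.Readable
class Counter extends Readable {
  constructor(options) {
    super(options);
    this.current = 1;
  }

  _read() {
    if (this.current > 5) {
      this.push(null); // signal the end of the stream
    } else {
      this.push(String(this.current++));
    }
  }
}

new Counter().on('data', (chunk) => console.log(chunk.toString()));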

Stream-powered Node.js APIs

Due to their advantages, many Node.js core modules provide native stream handling capabilities. The most notable ones are listed below (a small usage sketch follows the list):

  • net.Socket is the main node API that streams are based on, and it underlies most of the following APIs
  • process.stdin returns a stream connected to stdin
  • process.stdout returns a stream connected to stdout
  • process.stderr returns a stream connected to stderr
  • fs.createReadStream() creates a readable stream to a file
  • fs.createWriteStream() creates a writable stream to a file
  • net.connect() initiates a stream-based connection
  • http.request() returns an instance of the http.ClientRequest class, which is a writable stream
  • zlib.createGzip() compresses data into a stream using gzip (a compression algorithm)
  • zlib.createGunzip() decompresses a gzip stream
  • zlib.createDeflate() compresses data into a stream using deflate (a compression algorithm)
  • zlib.createInflate() decompresses a deflate stream
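For example, here is a tiny sketch combining a few of these stream-powered APIs: read from stdin, gzip the data, and write the compressed bytes to stdout:

const zlib = require('zlib');

// Every part of this chain is a stream: stdin (readable),
// the gzip transform, and stdout (writable)
process.stdin
  .pipe(zlib.createGzip())
  .pipe(process.stdout);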

Streams cheat sheet:

See more: the Node.js streams cheat sheet

Here are some important events related to writable streams (a short sketch follows the list):

  • error – emitted to signal that an error occurred while writing or piping.
  • pipe – emitted by the writable stream when a readable stream is piped into it.
  • unpipe – emitted when you call unpipe on the readable stream and stop piping it into the destination stream.
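As a minimal sketch (not from the original article), here is how these events can be observed on a writable stream:

const { Readable, Writable } = require('stream');

const writable = new Writable({
  write(chunk, encoding, callback) {
    callback();
  }
});

writable.on('pipe', () => console.log('A readable stream was piped in'));
writable.on('unpipe', () => console.log('The readable stream was unpiped'));
writable.on('error', (err) => console.error('Something went wrong:', err));

const readable = Readable.from(['some data']);
readable.pipe(writable);   // triggers the 'pipe' event
readable.unpipe(writable); // triggers the 'unpipe' event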

Conclusion

That's all about the basics of streams. Streams, pipes, and chaining are the core and most powerful features of Node.js. Streams can indeed help you write neat and performant code to perform I/O.

Also, there is a Node.js strategic initiative worth looking forward to, called BOB, aiming to improve Node.js's internal data flows and, hopefully, the public APIs of Node.js streams as well.

Original English article: https://nodesource.com/blog/understanding-streams-in-nodejs

