Fluent asynchronous programming - event loop, Isolate, Stream

Event loop, Isolate

Before we start, we need to understand that Dart is single threaded and that fluent depends on Dart

If you know event loop in js The whole process of asynchronous dart is well understood

Let's look at a piece of code first

import 'dart:async';

Future eventLoop() async{

print('A');

Future((){

print('F');

scheduleMicrotask((){print('H');});

Future((){

print('M');

}).then((_){

print('N');

});

}).then((_){

print('G');

});

Future((){

print('I');

}).then((_){

print('J');

});

scheduleMicrotask(text1);

scheduleMicrotask((){print('D');});

print('B');

}

void text1() {

print('C');

scheduleMicrotask((){print('E');});

Future((){

print('K');

}).then((_){

print('L');

});

}

Do you only see the output

The correct output sequence is A B C D E F G H I J K L M N

eventLoop

1. MicroTask queue

Micro task queue, which is usually added to the queue using the scheduleMicroTask method

This is something you don't have to use most of the time. For example, the scheduleMicroTask() method is only referenced seven times in the whole Flutter source code, so it's best to give priority to using the Event queue

2. Event queue

Asynchronous operations such as I/O, gestures, drawings, timers, streams, futures, etc. will enter the event queue

Using the event queue as much as possible can make the micro task queue shorter and reduce the possibility that the event queue is stuck

Code execution sequence

First of all, we know that dart is single threaded, so the code execution order of dart is:

  1. The synchronization code is executed in sequence
  2. When encountering asynchronous code, enter the corresponding queue first, and then continue to execute the following code
  3. When the synchronization code is executed, first look at the tasks in the MicroTask queue and complete the tasks in the MicroTask queue in turn
  4. After the tasks in MicroTask are executed, look at the tasks in the event queue. A task is listed in the event queue and then executed, and then return to the third step of the cycle until all queues are empty

Isolate

Isolate is the thread in Dart, and the code of fluent runs on root isolate by default

Isolate does not share memory in fluent. Different "isolate" communicate through "message".

import 'dart:async';

import 'dart:io';

import 'dart:isolate';

import 'package:flutter/foundation.dart';

import 'package:flutter/material.dart';

//A common entrance to Flutter application

//The async keyword is used in the main function because the created isolate is asynchronous

void main() async{

runApp(MyApp());

//asyncFibonacci function will create an isolate and return the running result

print(await asyncFibonacci(20));

}

//Here, take the calculation of Fibonacci sequence as an example. The returned value is Future because it is asynchronous

Future<dynamic> asyncFibonacci(int n) async{

//First, create a ReceivePort. Why do you want to create this?

//Because the parameters required to create isolate must have SendPort, which needs ReceivePort to create

final response = new ReceivePort();

//Start creating isolate, isolate The spawn function is isolate The code in dart_ Isolate is a function that we implement ourselves

//_ Isolate is a required parameter to create an isolate.

await Isolate.spawn(_isolate,response.sendPort);

//Get sendPort to send data

final sendPort = await response.first as SendPort;

//ReceivePort to receive the message

final answer = new ReceivePort();

//send data

sendPort.send([n,answer.sendPort]);

//Get data and return

return answer.first;

}

//Parameters required to create an isolate

void _isolate(SendPort initialReplyTo){

final port = new ReceivePort();

//binding

initialReplyTo.send(port.sendPort);

//monitor

port.listen((message){

//Get data and parse

final data = message[0] as int;

final send = message[1] as SendPort;

//Return results

send.send(syncFibonacci(data));

});

}

int syncFibonacci(int n){

return n < 2 ? n : syncFibonacci(n-2) + syncFibonacci(n-1);

}

Because Root isolate is responsible for rendering and UI interaction, what if we have a very time-consuming operation? We know that there is an event loop in the isolate. If a time-consuming task is running all the time, the subsequent UI operations will be blocked. Therefore, if we have time-consuming operations, we should put them in the isolate!

Stream

What is flow?

  • This big machine is StreamController, which is one of the ways to create streams.
  • StreamController has an entry called sink
  • sink can use the add method to put things in. After putting them in, they don't care anymore.
  • When something came in from sink, our machine began to work
  • The StreamController has an exit called stream
  • The machine will throw the product out of the exit after processing, but we don't know when it will come out, so we need to use the listen method to monitor the exit all the time
  • And when multiple items are put in, it will not disturb the order, but first in, first out

Use Stream

StreamController controller = StreamController();

//Monitor the exit of this stream. When data flows out, print the data

StreamSubscription subscription =

controller.stream.listen((data)=>print("$data"));

controller.sink.add(123);

// Output: 123

You need to give a method to the listen function of stream. The input parameter (data) of this method is the result generated after the processing of our StreamController. We listen to the exit and obtain this result (data). You can use a lambda expression here, or any other function

transform

If you need more control over the transformation, use the transform() method. It needs to be used with StreamTransformer.

StreamController<int> controller = StreamController<int>();

final transformer = StreamTransformer<int,String>.fromHandlers(

handleData:(value, sink){

if(value==100){

sink.add("Bingo");

}

else{ sink.addError('I haven't guessed yet. Let's try again');

}

});

controller.stream

.transform(transformer)

.listen(

(data) => print(data),

onError:(err) => print(err));

controller.sink.add(23);

//controller.sink.add(100);

// Output: I haven't guessed it yet. Try again

Streamtransformer < s, t > is the inspector of our stream. He is responsible for receiving the information passed by the stream, then processing it and returning a new stream.

  • S represents the input type of the previous stream. We input a number here, so it is int.
  • T represents the input type of the converted stream. What we add here is a String, so it is String.
  • handleData receives a value and creates a new stream and exposes sink, where we can convert the stream

Type of Stream

  • Single subscription streams
  • "broadcast" streams multi subscription streams

Single subscription stream

StreamController controller = StreamController();

controller.stream.listen((data)=> print(data));

controller.stream.listen((data)=> print(data));

controller.sink.add(123);

// Output: Bad state: Stream has already been listened to A single subscription stream cannot have multiple listeners.

A single subscription stream is allowed to have only one listener in the whole life cycle of the stream

Multi subscription stream

StreamController controller = StreamController();

//Convert a single subscription stream to a broadcast stream

Stream stream = controller.stream.asBroadcastStream();

stream.listen((data)=> print(data));

stream.listen((data)=> print(data));

controller.sink.add(123);

// Output: 123

Broadcast streams allow any number of listeners, and they can generate events regardless of whether there are listeners or not. Therefore, listeners who come in halfway will not receive the previous message.

reference resources Learn more about isolate
Permanent original address Fluent asynchronous programming

Tags: Front-end iOS Android Flutter dart

Posted by WinterDragon on Mon, 16 May 2022 23:21:38 +0300