Welcome to my GitHub
https://github.com/zq2599/blog_demos
Content: a categorized summary of all my original articles and their supporting source code, covering Java, Docker, Kubernetes, DevOps, etc.;
Links to the Flink processing function practice series:
- Learn more about the state operations of ProcessFunction (Flink-1.10);
- ProcessFunction;
- KeyedProcessFunction class;
- ProcessAllWindowFunction;
- CoProcessFunction (dual-stream processing);
Overview of this article
This is the fourth article in the Flink processing function practice series. It covers the following two window-related processing functions:
- ProcessAllWindowFunction: processes all elements in each window;
- ProcessWindowFunction: processes all elements of a given key in each window;
About ProcessAllWindowFunction
- ProcessAllWindowFunction is similar to the ProcessFunction class covered in "Flink processing function practice II: ProcessFunction class": both process elements coming from upstream. The difference is that ProcessFunction runs its processElement method once per element, while ProcessAllWindowFunction runs its process method once per window (and that method can iterate over all elements of the window);
- Comparing the class diagrams makes the difference clearer. In the figure below, ProcessFunction is on the left and ProcessAllWindowFunction is on the right:
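To make the invocation difference concrete, here is a plain-Java sketch that does not use Flink at all. `InvocationModel`, `runPerElement` and `runPerWindow` are hypothetical names, and the per-window driver assumes tumbling windows aligned to multiples of the window size with non-negative timestamps; it only imitates how often each style of handler is called, not how Flink actually schedules them.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Consumer;

// Hypothetical, Flink-free model of the two calling styles (NOT the Flink API).
final class InvocationModel {

    // Per-element style, analogous to ProcessFunction.processElement:
    // the handler sees exactly one element per call
    static void runPerElement(List<Long> timestamps, Consumer<Long> handler) {
        for (long ts : timestamps) {
            handler.accept(ts);
        }
    }

    // Per-window style, analogous to ProcessAllWindowFunction.process:
    // elements are buffered per tumbling window, then the handler is called
    // once per window with all of that window's elements
    static void runPerWindow(List<Long> timestamps, long windowSizeMs, Consumer<List<Long>> handler) {
        // TreeMap keeps the windows ordered by their start time
        Map<Long, List<Long>> windows = new TreeMap<>();
        for (long ts : timestamps) {
            long windowStart = ts - (ts % windowSizeMs); // assumes ts >= 0
            windows.computeIfAbsent(windowStart, k -> new ArrayList<>()).add(ts);
        }
        for (List<Long> elements : windows.values()) {
            handler.accept(elements);
        }
    }
}
```

With ten timestamps one second apart starting at 0 ms and a 5-second window, the per-element handler runs ten times while the per-window handler runs twice, each time receiving five elements.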
About ProcessWindowFunction
- ProcessWindowFunction is similar to KeyedProcessFunction in that both process partitioned (keyed) data. However, KeyedProcessFunction runs its processElement method once per element, while ProcessWindowFunction runs its process method once per window (and that method can iterate over all elements of the current key in the current window);
- Comparing the class diagrams makes the differences clearer. In the figure below, KeyedProcessFunction is on the left and ProcessWindowFunction is on the right:
- There is one more difference: the process method of ProcessWindowFunction receives the partition key as an input parameter, whereas the processElement method of KeyedProcessFunction has no such parameter and must call Context.getCurrentKey() to obtain the key;
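The key-access difference can be sketched the same way. In this hypothetical, Flink-free model, `Ctx.getCurrentKey()` only imitates `Context.getCurrentKey()`, `KeyAccessModel` and its handler interfaces are illustrative names, and the per-window driver treats all input as a single window and groups it by key.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, Flink-free model of where the key comes from in each style.
final class KeyAccessModel {

    // Stand-in for a keyed context: the "runtime" sets the current key before each call
    static final class Ctx {
        private String currentKey;

        String getCurrentKey() {
            return currentKey;
        }
    }

    // Per-element style (like KeyedProcessFunction): the key is NOT a parameter,
    // so the handler has to ask the context for it
    interface ElementHandler {
        String processElement(Integer value, Ctx ctx);
    }

    // Per-window style (like ProcessWindowFunction): the key IS a parameter,
    // and all elements of that key's window arrive together
    interface WindowHandler {
        String process(String key, Iterable<Integer> elements);
    }

    static List<String> drivePerElement(String[] keys, Integer[] values, ElementHandler handler) {
        Ctx ctx = new Ctx();
        List<String> out = new ArrayList<>();
        for (int i = 0; i < keys.length; i++) {
            ctx.currentKey = keys[i]; // set by the driver, not by the handler
            out.add(handler.processElement(values[i], ctx));
        }
        return out;
    }

    // Treats all input as one window; groups by key in first-seen order
    static List<String> drivePerWindow(String[] keys, Integer[] values, WindowHandler handler) {
        Map<String, List<Integer>> byKey = new LinkedHashMap<>();
        for (int i = 0; i < keys.length; i++) {
            byKey.computeIfAbsent(keys[i], k -> new ArrayList<>()).add(values[i]);
        }
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, List<Integer>> e : byKey.entrySet()) {
            out.add(handler.process(e.getKey(), e.getValue()));
        }
        return out;
    }
}
```

For the input keys bbb, aaa, bbb, the per-element handler is called three times and reads the key from the context each time, while the per-window handler is called once for bbb (two elements) and once for aaa (one element) with the key handed to it directly.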
Points to note
Take the process method of ProcessAllWindowFunction as an example of a window processing function, as shown in the red box in the figure below. Its input parameter can iterate over all elements of the current window, which means all of those elements are kept in heap memory. Therefore, strictly control the memory footprint of the elements in a window at the design stage, to avoid exhausting the heap memory of the TaskManager node:
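A rough way to apply this advice is to estimate how many elements one window buffers before its process method fires: the rate of elements entering the window multiplied by the window length, times the size of one element. The helper below is a hypothetical back-of-envelope sketch; the bytes-per-element figure is an assumption you have to measure or guess, since real object overhead depends on the JVM and serializer.

```java
// Hypothetical back-of-envelope helper: estimates what one window buffers
// before its process method is called. Not part of Flink.
final class WindowMemoryEstimate {

    // Elements held by one window = arrival rate * window length
    static long bufferedElements(long elementsPerSecond, long windowSeconds) {
        return elementsPerSecond * windowSeconds;
    }

    // Approximate bytes = buffered elements * assumed size of one element
    static long bufferedBytes(long elementsPerSecond, long windowSeconds, long bytesPerElement) {
        return bufferedElements(elementsPerSecond, windowSeconds) * bytesPerElement;
    }
}
```

For the demos in this article (1 element per second, 5-second window) only 5 elements are buffered. At an assumed 100,000 elements per second, a 60-second window and 200 bytes per element, the same arithmetic gives 1,200,000,000 bytes, roughly 1.2 GB, for a single window.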
Next, let's learn ProcessAllWindowFunction and ProcessWindowFunction through hands-on practice;
Version information
- Development environment operating system: MacBook Pro 13-inch, macOS Catalina 10.15.4
- Development tool: IntelliJ IDEA 2019.3.2 (Ultimate Edition)
- JDK: 1.8.0_121
- Maven: 3.3.9
- Flink: 1.9.2
Source download
If you don't want to write the code yourself, the source code of the whole series can be downloaded from GitHub. The addresses and links are listed in the table below (https://github.com/zq2599/blo...):
| Name | Link | Remarks |
|---|---|---|
| Project home page | https://github.com/zq2599/blo... | The project's home page on GitHub |
| Git repository address (https) | https://github.com/zq2599/blo... | Repository address of the project's source code, https protocol |
| Git repository address (ssh) | git@github.com:zq2599/blog_demos.git | Repository address of the project's source code, ssh protocol |
There are multiple folders in this git project. The application for this chapter is in the flinkstudy folder, as shown in the red box below:
ProcessAllWindowFunction in practice
The ProcessAllWindowFunction demo is verified as follows:
- A Tuple2<String, Integer> object is emitted every second; its f0 field alternates between aaa and bbb, and its f1 field is always 1;
- A 5-second tumbling window is used;
- A custom ProcessAllWindowFunction subclass counts the elements in each window and sends the result to the downstream operator;
- The downstream operator prints the statistics;
- Compare the emitted data with the statistics to check that they are consistent;
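Before running the job, the expected counts can be worked out by hand. The plain-Java simulation below is hypothetical (no Flink involved; `AllWindowCountSimulation` is an illustrative name): it assigns each element to a tumbling window by rounding its timestamp down to a multiple of the window size, which matches the zero-offset case for non-negative timestamps. Because the job's first element rarely lands exactly on a window boundary, the first window is usually partial.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical simulation of the planned experiment; no Flink involved.
final class AllWindowCountSimulation {

    // Counts elements per tumbling window, ignoring keys (like timeWindowAll)
    static Map<Long, Integer> countPerWindow(long firstTimestampMs, int elementCount,
                                             long intervalMs, long windowSizeMs) {
        // TreeMap keeps windows ordered by start time
        Map<Long, Integer> counts = new TreeMap<>();
        for (int i = 0; i < elementCount; i++) {
            long ts = firstTimestampMs + i * intervalMs;
            // Tumbling window aligned to multiples of the size (assumes ts >= 0)
            long windowStart = ts - (ts % windowSizeMs);
            counts.merge(windowStart, 1, Integer::sum);
        }
        return counts;
    }
}
```

For example, starting at 3,000 ms with ten elements one second apart gives counts of 2, 5 and 3 for the windows starting at 0, 5,000 and 10,000 ms. In a real run, sleep overhead means the intervals are slightly longer than one second, so the counts can differ a little from this idealized simulation.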
Start coding
- Continue with the flinkstudy project created in "Flink processing function practice II: ProcessFunction class";
- Create a new ProcessAllWindowFunctionDemo class as follows:
package com.bolingcavalry.processwindowfunction;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.text.SimpleDateFormat;
import java.util.Date;

public class ProcessAllWindowFunctionDemo {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Use processing time: windows are driven by the machine's wall clock
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

        // The parallelism is 1
        env.setParallelism(1);

        // Data source: emits one element per second until the job is cancelled
        DataStream<Tuple2<String, Integer>> dataStream = env.addSource(new SourceFunction<Tuple2<String, Integer>>() {

            // Cleared by cancel() so that run() can exit
            private volatile boolean isRunning = true;

            @Override
            public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
                for (int i = 1; isRunning && i < Integer.MAX_VALUE; i++) {
                    // There are only two names: aaa and bbb
                    String name = 0 == i % 2 ? "aaa" : "bbb";

                    // Use the current time as the timestamp
                    long timeStamp = System.currentTimeMillis();

                    // Print the data and its timestamp so the results can be verified
                    System.out.println(String.format("source,%s, %s\n", name, time(timeStamp)));

                    // Emit an element with a timestamp (not used for windowing under processing time)
                    ctx.collectWithTimestamp(new Tuple2<String, Integer>(name, 1), timeStamp);

                    // Wait 1 second before emitting the next element
                    Thread.sleep(1000);
                }
            }

            @Override
            public void cancel() {
                isRunning = false;
            }
        });

        // Split the data into 5-second tumbling windows, then apply ProcessAllWindowFunction
        SingleOutputStreamOperator<String> mainDataStream = dataStream
                // 5-second tumbling window
                .timeWindowAll(Time.seconds(5))
                // Count the elements in the current window, format the count together with
                // the window's start and end time into a string, and send it downstream
                .process(new ProcessAllWindowFunction<Tuple2<String, Integer>, String, TimeWindow>() {
                    @Override
                    public void process(Context context, Iterable<Tuple2<String, Integer>> iterable, Collector<String> collector) throws Exception {
                        int count = 0;

                        // iterable gives access to all data in the current window;
                        // this demo simply counts the elements
                        for (Tuple2<String, Integer> tuple2 : iterable) {
                            count++;
                        }

                        // Format the window's start and end time and the element count into a string
                        String value = String.format("window, %s - %s, %d\n",
                                // Start time of the current window
                                time(context.window().getStart()),
                                // End time of the current window
                                time(context.window().getEnd()),
                                // Number of elements in the current window (all keys together)
                                count);

                        // Send to the downstream operator
                        collector.collect(value);
                    }
                });

        // Print the results; compare them with the source output to verify that
        // ProcessAllWindowFunction processes the whole window across all keys
        mainDataStream.print();

        env.execute("processfunction demo : processallwindowfunction");
    }

    public static String time(long timeStamp) {
        return new SimpleDateFormat("hh:mm:ss").format(new Date(timeStamp));
    }
}
- There are several points to note about ProcessAllWindowFunctionDemo:
a. The timeWindowAll method sets a 5-second tumbling window;
b. In the process method of the anonymous ProcessAllWindowFunction subclass, context.window().getStart() returns the start time of the current window, and context.window().getEnd() returns its end time;
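The values returned by getStart() and getEnd() follow simple arithmetic. The sketch below reproduces, to my knowledge, the formula used by Flink's `TimeWindow.getWindowStartWithOffset(timestamp, offset, windowSize)`; the class `TumblingWindowBounds` is a hypothetical helper, not part of Flink, and the offset is 0 in this article's demos.

```java
// Hypothetical helper mirroring the tumbling-window start formula; not part of Flink.
final class TumblingWindowBounds {

    // Start of the tumbling window that contains `timestamp`
    static long windowStart(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }

    // End is exclusive: the window covers [start, start + windowSize)
    static long windowEnd(long timestamp, long offset, long windowSize) {
        return windowStart(timestamp, offset, windowSize) + windowSize;
    }
}
```

A timestamp of 12,345 ms therefore belongs to the window [10,000, 15,000). Because the end is exclusive, an element at exactly 15,000 ms starts the next window, which is why the printed start of one window equals the printed end of the previous one.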
- After coding, run the ProcessAllWindowFunctionDemo class and check the data, as shown in the figure below. Comparing the elements of one window with the output of ProcessAllWindowFunction shows that the results meet expectations:
- That covers ProcessAllWindowFunction. Next, let's try ProcessWindowFunction;
ProcessWindowFunction in practice
The ProcessWindowFunction demo is verified as follows:
- A Tuple2<String, Integer> object is emitted every second; its f0 field alternates between aaa and bbb, and its f1 field is always 1;
- The stream is partitioned with the f0 field as the key;
- The partitioned data enters a 5-second tumbling window;
- A custom ProcessWindowFunction subclass has two jobs. The first is to count the elements of each key in each window and send the result to the downstream operator;
- The second is to update the running total of elements for the current key and save it in the state backend, to verify that KeyedStream state can be read and written in the processing function;
- The downstream operator prints the statistics;
- Compare the emitted data with the statistics (checking each window and the running totals separately) to see whether they are consistent;
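The expected output for this plan can also be simulated in plain Java before running the job. `KeyedWindowSimulation` below is hypothetical: it reuses the demo's rule (odd i gives bbb, even i gives aaa), assumes one element per second starting at a given timestamp, and uses an in-memory map in place of the state backend to keep each key's running total.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical simulation (no Flink): expected per-(key, window) counts and per-key running totals.
final class KeyedWindowSimulation {

    // Returns lines shaped like "key, windowStart, count, total : N"
    static List<String> simulate(long firstTimestampMs, int elementCount, long windowSizeMs) {
        // windowStart -> key -> count; TreeMaps keep windows and keys sorted
        Map<Long, Map<String, Integer>> perWindow = new TreeMap<>();
        for (int i = 1; i <= elementCount; i++) {
            String name = 0 == i % 2 ? "aaa" : "bbb";      // same rule as the demo source
            long ts = firstTimestampMs + (i - 1) * 1000L;  // one element per second
            long start = ts - (ts % windowSizeMs);         // assumes ts >= 0
            perWindow.computeIfAbsent(start, s -> new TreeMap<>()).merge(name, 1, Integer::sum);
        }

        // Running total per key, standing in for the ValueState in the state backend
        Map<String, Long> totals = new HashMap<>();
        List<String> out = new ArrayList<>();
        for (Map.Entry<Long, Map<String, Integer>> w : perWindow.entrySet()) {
            for (Map.Entry<String, Integer> k : w.getValue().entrySet()) {
                long total = totals.merge(k.getKey(), (long) k.getValue(), Long::sum);
                out.add(k.getKey() + ", " + w.getKey() + ", " + k.getValue() + ", total : " + total);
            }
        }
        return out;
    }
}
```

Starting at 0 ms with ten elements, the windows [0, 5,000) and [5,000, 10,000) produce aaa counts of 2 then 3 (total 5) and bbb counts of 3 then 2 (total 5), which is the same kind of cross-check the demo performs against the totals printed by the source.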
Start coding
- Create a new ProcessWindowFunctionDemo.java:
package com.bolingcavalry.processwindowfunction;

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.text.SimpleDateFormat;
import java.util.Date;

public class ProcessWindowFunctionDemo {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Use processing time: windows are driven by the machine's wall clock
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

        // The parallelism is 1
        env.setParallelism(1);

        // Data source: emits one element per second until the job is cancelled
        DataStream<Tuple2<String, Integer>> dataStream = env.addSource(new SourceFunction<Tuple2<String, Integer>>() {

            // Cleared by cancel() so that run() can exit
            private volatile boolean isRunning = true;

            @Override
            public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
                int aaaNum = 0;
                int bbbNum = 0;

                for (int i = 1; isRunning && i < Integer.MAX_VALUE; i++) {
                    // There are only two names: aaa and bbb
                    String name = 0 == i % 2 ? "aaa" : "bbb";

                    // Update the running totals of aaa and bbb elements
                    if (0 == i % 2) {
                        aaaNum++;
                    } else {
                        bbbNum++;
                    }

                    // Use the current time as the timestamp
                    long timeStamp = System.currentTimeMillis();

                    // Print the data, its timestamp and the running totals so the results can be verified
                    System.out.println(String.format("source,%s, %s, aaa total : %d, bbb total : %d\n",
                            name, time(timeStamp), aaaNum, bbbNum));

                    // Emit an element with a timestamp (not used for windowing under processing time)
                    ctx.collectWithTimestamp(new Tuple2<String, Integer>(name, 1), timeStamp);

                    // Wait 1 second before emitting the next element
                    Thread.sleep(1000);
                }
            }

            @Override
            public void cancel() {
                isRunning = false;
            }
        });

        // Split the data into 5-second tumbling windows per key, then apply ProcessWindowFunction
        SingleOutputStreamOperator<String> mainDataStream = dataStream
                // Use the f0 field of Tuple2 as the key; in this example there are only the keys aaa and bbb
                .keyBy(value -> value.f0)
                // 5-second tumbling window
                .timeWindow(Time.seconds(5))
                // Count the elements of each key in the current window, format the key, the count and
                // the window's start and end time into a string, and send it downstream
                .process(new ProcessWindowFunction<Tuple2<String, Integer>, String, String, TimeWindow>() {

                    // Custom state
                    private ValueState<KeyCount> state;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        // Initialize the state; its name is myState
                        state = getRuntimeContext().getState(new ValueStateDescriptor<>("myState", KeyCount.class));
                    }

                    @Override
                    public void process(String s, Context context, Iterable<Tuple2<String, Integer>> iterable, Collector<String> collector) throws Exception {
                        // Read the myState value of the current key from the state backend
                        KeyCount current = state.value();

                        // If myState has never been assigned, initialize it here
                        if (current == null) {
                            current = new KeyCount();
                            current.key = s;
                            current.count = 0;
                        }

                        int count = 0;

                        // iterable gives access to all data of the current key in the current window;
                        // this demo simply counts the elements
                        for (Tuple2<String, Integer> tuple2 : iterable) {
                            count++;
                        }

                        // Update the total number of elements of the current key
                        current.count += count;

                        // Write the updated state back to the state backend
                        state.update(current);

                        // Format the key, its element count in this window and the window's start and end time into a string
                        String value = String.format("window, %s, %s - %s, %d, total : %d\n",
                                // Current key
                                s,
                                // Start time of the current window
                                time(context.window().getStart()),
                                // End time of the current window
                                time(context.window().getEnd()),
                                // Number of elements of the current key in the current window
                                count,
                                // Total number of occurrences of the current key so far
                                current.count);

                        // Send to the downstream operator
                        collector.collect(value);
                    }
                });

        // Print the results; compare them with the source output to verify the per-key window counts and totals
        mainDataStream.print();

        env.execute("processfunction demo : processwindowfunction");
    }

    public static String time(long timeStamp) {
        return new SimpleDateFormat("hh:mm:ss").format(new Date(timeStamp));
    }

    public static class KeyCount {
        /**
         * Partition key
         */
        public String key;

        /**
         * Total number of elements
         */
        public long count;
    }
}
- There are several points to pay attention to in the above code:
a. The static class KeyCount is the data structure that stores the running total of elements for each key;
b. The timeWindow method sets a 5-second tumbling window;
c. Each Tuple2 element is partitioned with f0 as the key;
d. The open method registers the custom state named myState;
e. In the process method, state.value() reads the state of the current key, and state.update(current) writes the updated state of the current key;
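The read, initialize-on-null, accumulate and write-back pattern used with myState can be tried without a cluster. `FakeKeyedValueState` and `StateUsage` below are hypothetical stand-ins that only imitate the semantics of Flink's keyed `ValueState` (`value()` returns null until `update()` has been called for the current key, and each key has its own slot). It stores a `Long` instead of `KeyCount` to stay short, and it says nothing about how Flink's state backends are actually implemented.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for keyed ValueState: one value slot per key,
// where the "current key" is set before process() runs. Not Flink code.
final class FakeKeyedValueState<T> {
    private final Map<String, T> slots = new HashMap<>();
    private String currentKey;

    void setCurrentKey(String key) {
        this.currentKey = key;
    }

    // Like ValueState.value(): null until update() was called for this key
    T value() {
        return slots.get(currentKey);
    }

    // Like ValueState.update(T): overwrites the value for the current key only
    void update(T newValue) {
        slots.put(currentKey, newValue);
    }
}

final class StateUsage {

    // Mirrors the demo's pattern: read, initialize on null, accumulate, write back
    static long addWindowCount(FakeKeyedValueState<Long> state, String key, int windowCount) {
        state.setCurrentKey(key);
        Long current = state.value();
        if (current == null) {
            current = 0L;
        }
        current += windowCount;
        state.update(current);
        return current;
    }
}
```

Calling `addWindowCount` for aaa with 2, then bbb with 3, then aaa with 3 returns 2, 3 and 5: each key only ever sees its own total, which is what the demo relies on when comparing totals with the source output.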
- Next, run the ProcessWindowFunctionDemo class and check the data, as shown in the figure below. The per-window element counts computed in the process method match what the data source printed, and the totals read from the state backend match the source's accumulated totals:
That completes the hands-on practice with window-related processing functions. If you are also learning Flink's processing functions, I hope this article serves as a useful reference;
You're not alone: Xinchen's original articles are with you all the way
- Java series
- Spring series
- Docker series
- Kubernetes series
- Database + middleware series
- DevOps series
Welcome to follow the official account: programmer Xinchen
Search "programmer Xinchen" on WeChat. I'm Xinchen, and I look forward to exploring the Java world with you