Flink processing function practice 4: window processing

Welcome to my GitHub

https://github.com/zq2599/blog_demos

Contents: all of my original articles, categorized and summarized, with supporting source code, covering Java, Docker, Kubernetes, DevOps, etc.;

Flink processing function practice series

  1. An in-depth look at state operations in ProcessFunction (Flink-1.10)
  2. ProcessFunction class
  3. KeyedProcessFunction class
  4. ProcessAllWindowFunction (window processing)
  5. CoProcessFunction (dual-stream processing)

Overview of this article

This is the fourth article in the Flink processing function practice series. It covers the following two window-related processing functions:

    1. ProcessAllWindowFunction: processes all elements in each window;
    2. ProcessWindowFunction: processes all elements of a given key in each window;

    About ProcessAllWindowFunction

    1. ProcessAllWindowFunction is similar to the ProcessFunction class covered in "Flink processing function practice II: ProcessFunction class": both process elements coming from upstream. The difference is that ProcessFunction executes its processElement method once per element, whereas ProcessAllWindowFunction executes its process method once per window (and all elements of that window can be traversed inside the method);
    2. Comparing the class diagrams of the two, with ProcessFunction on the left and ProcessAllWindowFunction on the right, makes the difference easier to see;
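
    To make the per-element versus per-window difference concrete, here is a minimal plain-Java simulation with no Flink dependency. The two interfaces are hypothetical stand-ins that only mimic the shape of ProcessFunction#processElement and ProcessAllWindowFunction#process; they are not Flink's API, and the windows are assumed to be already formed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class InvocationSketch {

    // Hypothetical stand-in for ProcessFunction#processElement: one element per call
    public interface PerElementHandler<IN, OUT> {
        void processElement(IN value, List<OUT> out);
    }

    // Hypothetical stand-in for ProcessAllWindowFunction#process: one whole window per call
    public interface PerWindowHandler<IN, OUT> {
        void process(Iterable<IN> elements, List<OUT> out);
    }

    // Calls the per-element handler for every element of every window; returns the call count
    public static <IN, OUT> int runPerElement(List<List<IN>> windows,
                                              PerElementHandler<IN, OUT> handler,
                                              List<OUT> out) {
        int calls = 0;
        for (List<IN> window : windows) {
            for (IN value : window) {
                handler.processElement(value, out);
                calls++;
            }
        }
        return calls;
    }

    // Calls the per-window handler once per window with all of its elements; returns the call count
    public static <IN, OUT> int runPerWindow(List<List<IN>> windows,
                                             PerWindowHandler<IN, OUT> handler,
                                             List<OUT> out) {
        int calls = 0;
        for (List<IN> window : windows) {
            handler.process(window, out);
            calls++;
        }
        return calls;
    }

    public static void main(String[] args) {
        // Ten elements already split into two windows of five
        List<List<String>> windows = Arrays.asList(
                Arrays.asList("bbb", "aaa", "bbb", "aaa", "bbb"),
                Arrays.asList("aaa", "bbb", "aaa", "bbb", "aaa"));

        List<String> echoed = new ArrayList<>();
        int elementCalls = runPerElement(windows, (value, out) -> out.add(value), echoed);

        List<Integer> counts = new ArrayList<>();
        int windowCalls = runPerWindow(windows, (elements, out) -> {
            int count = 0;
            for (String ignored : elements) {
                count++;
            }
            out.add(count);
        }, counts);

        System.out.println(elementCalls + " per-element calls"); // 10 per-element calls
        System.out.println(windowCalls + " per-window calls, counts " + counts); // 2 per-window calls, counts [5, 5]
    }
}
```

    With ten elements in two windows, the per-element handler runs ten times, while the per-window handler runs twice and sees the whole window each time.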

    About ProcessWindowFunction

    1. ProcessWindowFunction is similar to KeyedProcessFunction in that both process partitioned (keyed) data. However, KeyedProcessFunction executes its processElement method once per element, while ProcessWindowFunction executes its process method once per window (all elements of the current key in the current window can be traversed inside the method);
    2. Comparing the class diagrams, with KeyedProcessFunction on the left and ProcessWindowFunction on the right, makes the differences easier to see;

    3. There is one more difference: the process method of ProcessWindowFunction receives the partition key as an input parameter, whereas the processElement method of KeyedProcessFunction has no such parameter and has to call context.getCurrentKey() to obtain the key;
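
    The keyed case can be sketched the same way. Assuming one window's elements are already collected, the hypothetical fireWindow helper below groups them by key (as keyBy would) and calls the handler once per key, passing the key in as a parameter, much like ProcessWindowFunction#process receives its key argument. This is a simplified model for illustration, not Flink's internal implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class KeyedWindowSketch {

    // Hypothetical stand-in for ProcessWindowFunction#process: the key is passed in directly
    public interface KeyedWindowHandler<IN, OUT> {
        void process(String key, Iterable<IN> elements, List<OUT> out);
    }

    // Groups one window's elements by key (like keyBy), then calls the handler once per key.
    // Returns the number of handler calls, i.e. the number of distinct keys in the window.
    public static <IN, OUT> int fireWindow(List<IN> window,
                                           Function<IN, String> keySelector,
                                           KeyedWindowHandler<IN, OUT> handler,
                                           List<OUT> out) {
        Map<String, List<IN>> byKey = new LinkedHashMap<>(); // keeps first-seen key order
        for (IN value : window) {
            byKey.computeIfAbsent(keySelector.apply(value), k -> new ArrayList<>()).add(value);
        }
        for (Map.Entry<String, List<IN>> entry : byKey.entrySet()) {
            handler.process(entry.getKey(), entry.getValue(), out);
        }
        return byKey.size();
    }

    public static void main(String[] args) {
        List<String> window = Arrays.asList("bbb", "aaa", "bbb", "aaa", "bbb");
        List<String> results = new ArrayList<>();
        int calls = fireWindow(window, name -> name, (key, elements, out) -> {
            int count = 0;
            for (String ignored : elements) {
                count++;
            }
            out.add(key + ": " + count);
        }, results);
        System.out.println(calls + " calls: " + results); // 2 calls: [bbb: 3, aaa: 2]
    }
}
```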

    Points to note

    For the process method of a window processing function (taking ProcessAllWindowFunction as an example, marked by the red box in the figure), the input parameters let you traverse all elements of the current window, which means all of those elements are held in heap memory. So please strictly control the memory footprint of the elements in a window at the design stage, to avoid exhausting the heap memory of the TaskManager node;
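
    The memory concern can be illustrated with a rough plain-Java model: a window whose function needs the full Iterable must retain every element until it fires, whereas a count updated as elements arrive retains only a number. Both classes below are hypothetical models, not Flink classes. In Flink itself, windowed streams can also pair an incremental ReduceFunction or AggregateFunction with a window function; the documentation of your Flink version describes the exact overloads.

```java
import java.util.ArrayList;
import java.util.List;

public class WindowMemorySketch {

    // Model of a window whose function needs the full Iterable: every element is kept until it fires
    public static class BufferingWindow<T> {
        private final List<T> buffer = new ArrayList<>();

        public void add(T value) {
            buffer.add(value);
        }

        public int retainedElements() {
            return buffer.size();
        }

        // A process()-style function would iterate the whole buffer here; this one only counts it
        public long fire() {
            long count = 0;
            for (T ignored : buffer) {
                count++;
            }
            return count;
        }
    }

    // Model of an incremental count: only a running counter is kept, not the elements
    public static class IncrementalCountWindow<T> {
        private long count;

        public void add(T value) {
            count++;
        }

        public int retainedElements() {
            return 0;
        }

        public long fire() {
            return count;
        }
    }

    public static void main(String[] args) {
        BufferingWindow<String> buffering = new BufferingWindow<>();
        IncrementalCountWindow<String> incremental = new IncrementalCountWindow<>();
        for (int i = 0; i < 100_000; i++) {
            String element = (i % 2 == 0) ? "aaa" : "bbb";
            buffering.add(element);
            incremental.add(element);
        }
        // Same count from both, but only the buffering model holds on to 100,000 elements
        System.out.println("buffering:   count=" + buffering.fire() + ", retained=" + buffering.retainedElements());
        System.out.println("incremental: count=" + incremental.fire() + ", retained=" + incremental.retainedElements());
    }
}
```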

    Next, let's get hands-on with ProcessAllWindowFunction and ProcessWindowFunction;

    Version information

    1. Operating system: MacBook Pro 13-inch, macOS Catalina 10.15.4
    2. Development tool: IntelliJ IDEA 2019.3.2 (Ultimate Edition)
    3. JDK: 1.8.0_121
    4. Maven: 3.3.9
    5. Flink: 1.9.2

    Source download

    If you don't want to write the code yourself, the source code of the whole series can be downloaded from GitHub. The addresses are listed in the table below (https://github.com/zq2599/blo...):

    Name | Link | Remarks
    Project home | https://github.com/zq2599/blo... | The project's home page on GitHub
    Git repository address (https) | https://github.com/zq2599/blo... | Repository address of the project's source code, HTTPS protocol
    Git repository address (ssh) | git@github.com:zq2599/blog_demos.git | Repository address of the project's source code, SSH protocol

    This Git project contains multiple folders. The application for this chapter is in the flinkstudy folder, as marked by the red box in the figure below:

    How to practice ProcessAllWindowFunction

    ProcessAllWindowFunction will be verified with the following steps:

    1. A Tuple2<String, Integer> object is emitted every second; its f0 field alternates between aaa and bbb, and its f1 field is always 1;
    2. A 5-second tumbling window is set;
    3. A custom subclass of ProcessAllWindowFunction counts the number of elements in each window and sends the result to the downstream operator;
    4. The downstream operator prints the results;
    5. The emitted data is compared with the statistics to check that they are consistent;
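
    Because tumbling windows are aligned to fixed boundaries rather than to the first element, the first window may receive fewer than five elements. The sketch below assumes epoch-aligned windows with start = timestamp - (timestamp - offset) % size, which to our understanding matches how Flink's tumbling assigners compute window starts for non-negative timestamps (this demo uses processing time, but the boundary arithmetic is the same). The timestamps in main are made up for illustration.

```java
import java.util.Map;
import java.util.TreeMap;

public class TumblingWindowSketch {

    // Start of the tumbling window containing the timestamp (non-negative timestamps assumed).
    // Windows are aligned to the epoch plus offset and cover the half-open range [start, start + size).
    public static long windowStart(long timestamp, long offset, long size) {
        return timestamp - (timestamp - offset) % size;
    }

    // Counts how many timestamps fall into each window, keyed by window start
    public static Map<Long, Integer> countPerWindow(long[] timestamps, long size) {
        Map<Long, Integer> counts = new TreeMap<>();
        for (long ts : timestamps) {
            counts.merge(windowStart(ts, 0L, size), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        long size = 5_000L; // 5-second tumbling window, in milliseconds
        // Hypothetical timestamps: one element per second, the first one 3.2 s into its window
        long first = 1_000_003_200L;
        long[] timestamps = new long[12];
        for (int i = 0; i < timestamps.length; i++) {
            timestamps[i] = first + i * 1_000L;
        }
        for (Map.Entry<Long, Integer> e : countPerWindow(timestamps, size).entrySet()) {
            System.out.println("window [" + e.getKey() + ", " + (e.getKey() + size) + ") -> "
                    + e.getValue() + " elements");
        }
    }
}
```

    Here the first element lands 3.2 seconds into its window, so that window gets only 2 elements and the next two windows get 5 each.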

    Start coding

    1. Continue with the flinkstudy project created in "Flink processing function practice II: ProcessFunction class";
    2. Create a new class ProcessAllWindowFunctionDemo with the following content:
    package com.bolingcavalry.processwindowfunction;
    
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.SourceFunction;
    import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.util.Collector;
    import java.text.SimpleDateFormat;
    import java.util.Date;
    
    public class ProcessAllWindowFunctionDemo {
        public static void main(String[] args) throws Exception {
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            // Use processing time: windows follow the system clock, not the element timestamps
            env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
    
            // The parallelism is 1
            env.setParallelism(1);
    
            // Set up the data source: it emits one element per second until the job is cancelled
            DataStream<Tuple2<String,Integer>> dataStream = env.addSource(new SourceFunction<Tuple2<String, Integer>>() {
                // Checked by run() so that cancel() can stop the emitting loop
                private volatile boolean running = true;

                @Override
                public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
                    for(int i=1; running && i<Integer.MAX_VALUE; i++) {
                        // There are only two names: aaa and bbb
                        String name = 0==i%2 ? "aaa" : "bbb";

                        // Use current time as timestamp
                        long timeStamp = System.currentTimeMillis();

                        // Print out the data and timestamp to verify the data
                        System.out.println(String.format("source,%s, %s\n",
                                name,
                                time(timeStamp)));

                        // Emit an element with a timestamp
                        ctx.collectWithTimestamp(new Tuple2<String, Integer>(name, 1), timeStamp);

                        // Wait 1 second before emitting the next element
                        Thread.sleep(1000);
                    }
                }

                @Override
                public void cancel() {
                    running = false;
                }
            });
    
            // Split the data into 5-second tumbling windows, then process each window with ProcessAllWindowFunction
            SingleOutputStreamOperator<String> mainDataStream = dataStream
                    // 5-second tumbling window
                    .timeWindowAll(Time.seconds(5))
                    // Count the elements in the current window, then format the count and the window's start and end time into a string and send it downstream
                    .process(new ProcessAllWindowFunction<Tuple2<String, Integer>, String, TimeWindow>() {
                        @Override
                        public void process(Context context, Iterable<Tuple2<String, Integer>> iterable, Collector<String> collector) throws Exception {
                            int count = 0;
    
                            // iterable can access all data in the current window,
                            // Here is a simple process. Only the number of elements is counted
                            for (Tuple2<String, Integer> tuple2 : iterable) {
                                count++;
                            }
    
                            // Arranges the start and end time of the current window and the number of elements into a string
                            String value = String.format("window, %s - %s, %d\n",
                                    // Start time of the current window
                                    time(context.window().getStart()),
                                    // Current window end time
                                    time(context.window().getEnd()),
                                    // Number of elements in the current window
                                    count);
    
                            // Transmit to downstream operator
                            collector.collect(value);
                        }
                    });
    
            // Print the results; each line summarizes one whole window, across all keys
            mainDataStream.print();
    
            env.execute("processfunction demo : processallwindowfunction");
        }
    
        public static String time(long timeStamp) {
            // HH gives a 24-hour clock; hh would give a 12-hour clock without AM/PM
            return new SimpleDateFormat("HH:mm:ss").format(new Date(timeStamp));
        }
    }
    3. There are several points to note about ProcessAllWindowFunctionDemo:

    a. The timeWindowAll method sets a tumbling window over the whole (non-keyed) stream;

    b. In the process method of the anonymous ProcessAllWindowFunction subclass, context.window().getStart() returns the start time of the current window and context.window().getEnd() returns its end time;

    4. After coding, run the ProcessAllWindowFunctionDemo class and verify the data, as shown in the figure below. Comparing the element details of one window with the output of ProcessAllWindowFunction shows that the result meets expectations:

    5. That covers ProcessAllWindowFunction. Next, let's try ProcessWindowFunction;
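
    Before moving on, one detail about the window bounds printed above: the window returned by context.window() describes a half-open interval, where getStart() is inclusive and getEnd() is exclusive, and Flink's TimeWindow also offers maxTimestamp(), which equals getEnd() - 1. The hypothetical SimpleTimeWindow class below mimics only those accessors for illustration. The time() helper takes the date pattern as a parameter to show that "HH" yields a 24-hour clock while "hh" yields a 12-hour one; UTC is fixed only to make the output deterministic.

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

public class WindowBoundsSketch {

    // Hypothetical stand-in for the window object returned by context.window()
    public static final class SimpleTimeWindow {
        private final long start; // inclusive
        private final long end;   // exclusive

        public SimpleTimeWindow(long start, long end) {
            this.start = start;
            this.end = end;
        }

        public long getStart() { return start; }

        public long getEnd() { return end; }

        // Largest timestamp that still belongs to the window
        public long maxTimestamp() { return end - 1; }

        public boolean contains(long timestamp) {
            return timestamp >= start && timestamp < end;
        }
    }

    // Formats a timestamp with the given pattern, in UTC so the result does not depend on the machine
    public static String time(long timeStamp, String pattern) {
        SimpleDateFormat format = new SimpleDateFormat(pattern);
        format.setTimeZone(TimeZone.getTimeZone("UTC"));
        return format.format(new Date(timeStamp));
    }

    public static void main(String[] args) {
        long thirteenOClock = 13L * 3600 * 1000; // 13:00:00 UTC on 1970-01-01
        SimpleTimeWindow window = new SimpleTimeWindow(thirteenOClock, thirteenOClock + 5_000L);

        System.out.println(time(window.getStart(), "HH:mm:ss") + " - " + time(window.getEnd(), "HH:mm:ss")); // 13:00:00 - 13:00:05
        System.out.println(time(window.getStart(), "hh:mm:ss"));       // 01:00:00 (12-hour clock)
        System.out.println(window.contains(window.getEnd()));          // false: the end is exclusive
        System.out.println(window.maxTimestamp() - window.getStart()); // 4999
    }
}
```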

    How to practice ProcessWindowFunction

    ProcessWindowFunction will be verified with the following steps:

    1. A Tuple2<String, Integer> object is emitted every second; its f0 field alternates between aaa and bbb, and its f1 field is always 1;
    2. The stream is partitioned with the f0 field as the key;
    3. The partitioned data enters 5-second tumbling windows;
    4. A custom subclass of ProcessWindowFunction counts the number of elements of each key in each window and sends the result to the downstream operator;
    5. It also updates the running total of elements for the current key and saves it in the state backend, to verify that the processing function can read and write the state of a KeyedStream;
    6. The downstream operator prints the results;
    7. The emitted data is compared with the statistics (per window and in total) to check that they are consistent;
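
    The expected output of this experiment can be worked out in advance. Assuming exactly one element per second and windows that line up with the first element (real processing-time windows are aligned to the clock, so the first one may be partial), the hypothetical helpers below reproduce the source's naming rule, count each key per window, and accumulate per-key running totals in a map that plays the role of the ValueState.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class ExpectedCountsSketch {

    // Same naming rule as the demo's data source: even i -> "aaa", odd i -> "bbb"
    public static String nameOf(int i) {
        return 0 == i % 2 ? "aaa" : "bbb";
    }

    // Per-key element counts for the elements numbered from..toInclusive
    public static Map<String, Integer> countInWindow(int from, int toInclusive) {
        Map<String, Integer> counts = new TreeMap<>();
        for (int i = from; i <= toInclusive; i++) {
            counts.merge(nameOf(i), 1, Integer::sum);
        }
        return counts;
    }

    // Assumes elements 1..total are cut into consecutive windows of windowSize elements.
    // Returns a snapshot of the per-key running totals after each window.
    public static List<Map<String, Long>> runningTotals(int total, int windowSize) {
        Map<String, Long> totals = new TreeMap<>(); // stand-in for per-key state
        List<Map<String, Long>> snapshots = new ArrayList<>();
        for (int from = 1; from <= total; from += windowSize) {
            int to = Math.min(from + windowSize - 1, total);
            for (Map.Entry<String, Integer> e : countInWindow(from, to).entrySet()) {
                totals.merge(e.getKey(), (long) e.getValue(), Long::sum);
            }
            snapshots.add(new TreeMap<>(totals));
        }
        return snapshots;
    }

    public static void main(String[] args) {
        int total = 10;     // elements 1..10, i.e. ten seconds of data
        int windowSize = 5; // 5 elements per 5-second window
        List<Map<String, Long>> snapshots = runningTotals(total, windowSize);
        for (int w = 0; w < snapshots.size(); w++) {
            int from = 1 + w * windowSize;
            int to = Math.min(from + windowSize - 1, total);
            System.out.println("window " + w + ": " + countInWindow(from, to) + ", totals " + snapshots.get(w));
        }
    }
}
```

    After ten elements, both keys reach a total of 5, which is also what the source's aaaNum and bbbNum counters would print at that point.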

    Start coding

    1. Create a new file ProcessWindowFunctionDemo.java:
    package com.bolingcavalry.processwindowfunction;
    
    
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.SourceFunction;
    import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.util.Collector;
    import java.text.SimpleDateFormat;
    import java.util.Date;
    
    public class ProcessWindowFunctionDemo {
        public static void main(String[] args) throws Exception {
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            // Use processing time: windows follow the system clock, not the element timestamps
            env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
    
            // The parallelism is 1
            env.setParallelism(1);
    
            // Set up the data source: it emits one element per second until the job is cancelled
            DataStream<Tuple2<String,Integer>> dataStream = env.addSource(new SourceFunction<Tuple2<String, Integer>>() {
                // Checked by run() so that cancel() can stop the emitting loop
                private volatile boolean running = true;

                @Override
                public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
                    int aaaNum = 0;
                    int bbbNum = 0;

                    for(int i=1; running && i<Integer.MAX_VALUE; i++) {
                        // There are only two names: aaa and bbb
                        String name = 0==i%2 ? "aaa" : "bbb";

                        // Update the total number of aaa and bbb elements
                        if(0==i%2) {
                            aaaNum++;
                        } else {
                            bbbNum++;
                        }

                        // Use current time as timestamp
                        long timeStamp = System.currentTimeMillis();

                        // Print out the data, its timestamp and the running totals to verify the data
                        System.out.println(String.format("source,%s, %s,    aaa total : %d,    bbb total : %d\n",
                                name,
                                time(timeStamp),
                                aaaNum,
                                bbbNum));

                        // Emit an element with a timestamp
                        ctx.collectWithTimestamp(new Tuple2<String, Integer>(name, 1), timeStamp);

                        // Wait 1 second before emitting the next element
                        Thread.sleep(1000);
                    }
                }

                @Override
                public void cancel() {
                    running = false;
                }
            });
    
            // Split the data of each key into 5-second tumbling windows, then process each window with ProcessWindowFunction
            SingleOutputStreamOperator<String> mainDataStream = dataStream
                    // Take the f0 field of Tuple2 as the key. In this example, there are only aaa and bbb keys
                    .keyBy(value -> value.f0)
                    // 5-second tumbling window
                    .timeWindow(Time.seconds(5))
                    // Count the elements of each key in the current window, then format the key, the count and the window's start and end time into a string and send it downstream
                    .process(new ProcessWindowFunction<Tuple2<String, Integer>, String, String, TimeWindow>() {
    
                        // Custom keyed state holding the running total of the current key
                        private ValueState<KeyCount> state;

                        @Override
                        public void open(Configuration parameters) throws Exception {
                            // Initialize the state, named myState
                            state = getRuntimeContext().getState(new ValueStateDescriptor<>("myState", KeyCount.class));
                        }
    
                        @Override
                        public void process(String s, Context context, Iterable<Tuple2<String, Integer>> iterable, Collector<String> collector) throws Exception {
    
                            // Read the myState value of the current key from the state backend
                            KeyCount current = state.value();
    
                            // If myState has never been assigned, it is initialized here
                            if (current == null) {
                                current = new KeyCount();
                                current.key = s;
                                current.count = 0;
                            }
    
                            int count = 0;
    
                            // iterable can access all data in the current window of the key,
                            // Here is a simple process. Only the number of elements is counted
                            for (Tuple2<String, Integer> tuple2 : iterable) {
                                count++;
                            }
    
                            // Update the total number of elements of the current key
                            current.count += count;
    
                            // Write the updated value back to the state backend
                            state.update(current);
    
                            // Format the current key, its element count in this window, and the window's start and end time into a string
                            String value = String.format("window, %s, %s - %s, %d,    total : %d\n",
                                    // Current key
                                    s,
                                    // Start time of the current window
                                    time(context.window().getStart()),
                                    // Current window end time
                                    time(context.window().getEnd()),
                                    // Number of elements of the current key in the current window
                                    count,
                                    // Total number of current key occurrences
                                    current.count);
    
                            // Transmit to downstream operator
                            collector.collect(value);
                        }
                    });
    
            // Print the results; each line covers one key in one window, plus that key's running total
            mainDataStream.print();
    
            env.execute("processfunction demo : processwindowfunction");
        }
    
        public static String time(long timeStamp) {
            // HH gives a 24-hour clock; hh would give a 12-hour clock without AM/PM
            return new SimpleDateFormat("HH:mm:ss").format(new Date(timeStamp));
        }
    
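        // Public static class with public fields and a default constructor, so Flink can treat it as a POJO
        public static class KeyCount {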
            /**
             * Partition key
             */
            public String key;
    
            /**
             * Total number of elements
             */
            public long count;
        }
    
    }
    2. There are several points to note in the above code:

    a. The static class KeyCount is the data structure that stores the running total of elements for each key;

    b. The timeWindow method sets a 5-second tumbling window;

    c. Each Tuple2 element is partitioned with f0 as the key;

    d. The open method registers the custom state named myState;

    e. In the process method, state.value() gets the state of the current key, and state.update(current) updates the state of the current key;

    3. Next, run the ProcessWindowFunctionDemo class and check the data, as shown in the figure below. The per-window element counts computed in the process method agree with what the data source printed, and the running total read from the state backend agrees with the source's accumulated totals.
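
    The running totals rely on keyed state being scoped to the current key: state.value() returns null the first time a key is seen, and each key has its own slot. The following simplified model of that behaviour is hypothetical (SimpleValueState is not Flink's ValueState, and it stores a Long instead of KeyCount for brevity); onWindow mirrors the read, initialize, add and write-back sequence of the demo's process method.

```java
import java.util.HashMap;
import java.util.Map;

public class KeyedValueStateSketch {

    // Hypothetical, simplified model of keyed ValueState: one slot per key,
    // and value()/update() always act on the key currently being processed.
    public static final class SimpleValueState<T> {
        private final Map<Object, T> slots = new HashMap<>();
        private Object currentKey;

        // In this model the caller sets the key before each window is processed
        public void setCurrentKey(Object key) {
            this.currentKey = key;
        }

        // null until the first update() for the current key
        public T value() {
            return slots.get(currentKey);
        }

        public void update(T value) {
            slots.put(currentKey, value);
        }
    }

    // Mirrors the demo's process(): read state, initialize if null, add the window count, write back
    public static long onWindow(SimpleValueState<Long> state, String key, int countInWindow) {
        state.setCurrentKey(key);
        Long current = state.value();
        if (current == null) {
            current = 0L;
        }
        current += countInWindow;
        state.update(current);
        return current;
    }

    public static void main(String[] args) {
        SimpleValueState<Long> state = new SimpleValueState<>();
        System.out.println(onWindow(state, "bbb", 3)); // 3
        System.out.println(onWindow(state, "aaa", 2)); // 2: aaa has its own slot
        System.out.println(onWindow(state, "aaa", 3)); // 5
        System.out.println(onWindow(state, "bbb", 2)); // 5
    }
}
```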


    This completes the hands-on practice of window processing with Flink's processing functions. If you are also learning Flink's processing functions, I hope this article serves as a useful reference;

    You're not alone: Xinchen's original articles will accompany you all the way

    1. Java series
    2. Spring series
    3. Docker series
    4. Kubernetes series
    5. Database + middleware series
    6. DevOps series

    Welcome to follow my WeChat official account: programmer Xinchen

    Search "programmer Xinchen" on WeChat. I'm Xinchen, and I look forward to exploring the Java world with you


    Posted by sorin1982 on Sat, 07 May 2022 02:43:05 +0300