Learning from: https://www.cnblogs.com/ShadowFiend/p/11951948.html
Overview
WordCount (word counting) has long been the classic introductory example for big data. Below, we implement Flink's WordCount in both Java and Scala.
The environment is IDEA + Maven + Flink; the POM file and a summary of the relevant technical points are attached at the end of the post.
Java implementation of the Flink batch version
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class WordCountBatchByJava {

    public static void main(String[] args) throws Exception {
        // Create the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // Load or create the source data
        DataSet<String> text = env.fromElements("this a book", "i love china", "i am chinese");
        // Transform the data
        DataSet<Tuple2<String, Integer>> ds = text.flatMap(new LineSplitter()).groupBy(0).sum(1);
        // Output the data to the destination
        ds.print();
        // Execute the job
        // Because this is a batch job, print() already calls execute() internally,
        // so it must not be called again here; calling it would raise an error
        //env.execute("Flink Batch Word Count By Java");
    }

    static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String line, Collector<Tuple2<String, Integer>> collector) throws Exception {
            for (String word : line.split(" ")) {
                collector.collect(new Tuple2<>(word, 1));
            }
        }
    }
}
The output of running the program is as follows:
(a,1)
(am,1)
(love,1)
(china,1)
(this,1)
(i,2)
(book,1)
(chinese,1)
Java implementation of the Flink streaming version
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class WordCountStreamingByJava {

    public static void main(String[] args) throws Exception {
        // Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set up the socket data source
        DataStreamSource<String> source = env.socketTextStream("192.168.1.111", 9999, "\n");
        // Transform the data
        DataStream<WordWithCount> dataStream = source.flatMap(new FlatMapFunction<String, WordWithCount>() {
            @Override
            public void flatMap(String line, Collector<WordWithCount> collector) throws Exception {
                for (String word : line.split(" ")) {
                    collector.collect(new WordWithCount(word, 1));
                }
            }
        }).keyBy("word")                                  // group by the "word" field
          .timeWindow(Time.seconds(2), Time.seconds(2))   // a 2-second window sliding every 2 seconds
          .sum("count");                                  // sum the counts within each window
        // Output the data to the destination
        dataStream.print();
        // Execute the job
        env.execute("Flink Streaming Word Count By Java");
    }

    public static class WordWithCount {
        public String word;
        public int count;

        public WordWithCount() {
        }

        public WordWithCount(String word, int count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return "WordWithCount{" +
                    "word='" + word + '\'' +
                    ", count=" + count +
                    '}';
        }
    }
}
Start a shell window, listen on port 9999 with nc, and enter some data:
[root@spark111 flink-1.6.2]# nc -l 9999
Shandong Tianjin Beijing Hebei Henan Shandong Shanghai Beijing
Shandong Hainan Qinghai Tibet Sichuan Hainan
The output in IDEA is as follows (the number before each record is the index of the parallel subtask that produced it):
4> WordWithCount{word='Beijing', count=2}
1> WordWithCount{word='Shanghai', count=1}
5> WordWithCount{word='Tianjin', count=1}
4> WordWithCount{word='Henan', count=1}
7> WordWithCount{word='Shandong', count=2}
3> WordWithCount{word='Hebei', count=1}
------------------------ (this separator was added manually to distinguish the results of the two time windows) ------------------------
8> WordWithCount{word='Hainan', count=2}
8> WordWithCount{word='Sichuan', count=1}
7> WordWithCount{word='Shandong', count=1}
1> WordWithCount{word='Tibet', count=1}
5> WordWithCount{word='Qinghai', count=1}
Scala implementation of the Flink batch version
import org.apache.flink.api.scala._

object WordCountBatchByScala {

  def main(args: Array[String]): Unit = {
    // Get the execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment
    // Load the data source
    val source = env.fromElements("china is the best country", "beijing is the capital of china")
    // Transform the data
    val ds = source.flatMap(_.split(" ")).map((_, 1)).groupBy(0).sum(1)
    // Output to the destination
    ds.print()
    // Execute the job
    // Because this is a batch job, print() already calls execute() internally,
    // so it must not be called again here; calling it would raise an error
    //env.execute("Flink Batch Word Count By Scala")
  }
}
The output of running the program is as follows:
(is,2)
(beijing,1)
(the,2)
(china,2)
(country,1)
(of,1)
(best,1)
(capital,1)
Scala implementation of the Flink streaming version
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object WordCountStreamingByScala {

  def main(args: Array[String]): Unit = {
    // Get the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Load or create the data source
    val source = env.socketTextStream("192.168.1.111", 9999, '\n')
    // Transform the data
    val dataStream = source.flatMap(_.split(" "))
      .map((_, 1))
      .keyBy(0)
      .timeWindow(Time.seconds(2), Time.seconds(2))
      .sum(1)
    // Output to the destination
    dataStream.print()
    // Execute the job
    env.execute("Flink Streaming Word Count By Scala")
  }
}
Start a shell window, listen on port 9999 with nc, and enter some words:
[root@spark111 flink-1.6.2]# nc -l 9999
time is passed
what is the time?
time is nine
time passed again
The output of running the program is as follows:
4> (what,1)
5> (time,1)
8> (is,2)
5> (time?,1)
8> (passed,1)
5> (the,1)
------------------------ (this separator was added manually to distinguish the results of the two time windows) ------------------------
8> (is,1)
5> (time,2)
8> (passed,1)
7> (nine,1)
6> (again,1)
POM file
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.ssrs</groupId>
    <artifactId>flinkdemo</artifactId>
    <version>1.0</version>

    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <encoding>UTF-8</encoding>
        <scala.version>2.11.12</scala.version>
        <scala.binary.version>2.11</scala.binary.version>
        <hadoop.version>2.8.4</hadoop.version>
        <flink.version>1.6.1</flink.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
    </dependencies>
</project>
Summary
- The task flow of a Flink program is as follows (a minimal skeleton is sketched after this list):
  ① Obtain the execution environment
  ② Load or create the data source
  ③ Transform the data
  ④ Output to the destination (sink)
  ⑤ Execute
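To make the flow concrete, here is a minimal batch skeleton of the five steps (an illustration added to this post, not code from Flink's documentation; the sample data and the /tmp/skeleton-out path are made-up placeholders):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class TaskFlowSkeleton {
    public static void main(String[] args) throws Exception {
        // 1. Obtain the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // 2. Load or create the data source
        DataSet<String> source = env.fromElements("hello", "flink");
        // 3. Transform the data
        DataSet<Integer> lengths = source.map(new MapFunction<String, Integer>() {
            @Override
            public Integer map(String value) {
                return value.length();
            }
        });
        // 4. Output to the destination (sink); the path is a hypothetical example
        lengths.writeAsText("/tmp/skeleton-out");
        // 5. Execute (required here because writeAsText, unlike print, is lazy)
        env.execute("Task Flow Skeleton");
    }
}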
- In batch processing, if the output destination is the print() method (likewise count() and collect()), env.execute() must not be called afterwards, because these methods already call execute() internally. If it is called anyway, the results are still correct, but the following error is also printed:
Exception in thread "main" java.lang.RuntimeException: No new data sinks have been defined since the last execution. The last execution refers to the latest call to 'execute()', 'count()', 'collect()', or 'print()'.
    at org.apache.flink.api.java.ExecutionEnvironment.createProgramPlan(ExecutionEnvironment.java:940)
    at org.apache.flink.api.java.ExecutionEnvironment.createProgramPlan(ExecutionEnvironment.java:922)
    at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:85)
    at com.ssrs.WordCountBatchByJava.main(WordCountBatchByJava.java:27)
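As an added illustration of this point, a minimal sketch showing that count() and collect() trigger execution just as print() does:

import java.util.List;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class EagerSinkSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<String> data = env.fromElements("a", "b", "b");
        // Both calls below run the job immediately, just like print()
        long total = data.count();
        List<String> elements = data.collect();
        System.out.println(total + " elements: " + elements);
        // Calling env.execute() at this point would raise the RuntimeException shown above
    }
}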
- If the batch code instead writes its output with writeAsCsv, writeAsText, or similar methods, env.execute() must be called afterwards, as sketched below;
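A minimal sketch of this case, assuming a made-up output path /tmp/wordcount-csv:

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class WriteSinkSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple2<String, Integer>> counts = env.fromElements(
                new Tuple2<>("book", 1), new Tuple2<>("china", 2));
        // writeAsCsv only declares the sink; nothing has run yet
        counts.writeAsCsv("/tmp/wordcount-csv");
        // execute() is required here, otherwise the job never runs
        env.execute("Flink Batch Write As Csv");
    }
}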
- ExecutionEnvironment is used to obtain the execution environment for batch processing, while StreamExecutionEnvironment is used for stream processing;
- Batch transformations produce a DataSet, while stream transformations produce a DataStream; the sketch below illustrates both of the last two points.
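A hypothetical snippet (added for illustration) putting the last two points side by side:

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class EnvironmentComparison {
    public static void main(String[] args) throws Exception {
        // Batch: ExecutionEnvironment, transformations yield a DataSet
        ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<String> batchData = batchEnv.fromElements("batch", "data");
        batchData.print(); // executes the batch job immediately

        // Streaming: StreamExecutionEnvironment, transformations yield a DataStream
        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> streamData = streamEnv.fromElements("stream", "data");
        streamData.print(); // only declares the sink
        streamEnv.execute("Streaming Side"); // required to actually run the streaming job
    }
}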