Flink learning from 0 to 1 - Chapter 5 Flink stream processing API

1. Flink stream processing API

1.1 Environment

1.1.1 getExecutionEnvironment

Creates an execution environment that represents the context of the currently executing program. If the program is invoked standalone, this method returns a local execution environment; if the program was submitted to a cluster from the command-line client, it returns the cluster's execution environment. In other words, getExecutionEnvironment decides which environment to return based on how the program is being run, which makes it the most commonly used way to create an execution environment.

val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

If the parallelism is not set, the value configured in flink-conf.yaml is used; the default is 1.

parallelism.default: 1

1.1.2 createLocalEnvironment

Returns a local execution environment; the default parallelism needs to be specified when calling.

val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironment(1)

1.1.3 createRemoteEnvironment

Returns a cluster execution environment, submitting the Jar to a remote server. The JobManager's host and port need to be specified when calling, along with the Jar package to run in the cluster.

val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createRemoteEnvironment("jobManagerHost", 6123, "jarPath")

1.2 Source

1.2.1 reading data from collection

import org.apache.flink.streaming.api.scala._

case class SensorReading(id: String, timestamp: Long, temperature: Double)

object Sensor {

  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val stream: DataStream[SensorReading] = env.fromCollection(List(
      SensorReading("sensor_1", 1547718199, 35.80018327300259),
      SensorReading("sensor_6", 1547718201, 15.402984393403084),
      SensorReading("sensor_7", 1547718202, 6.720945201171228),
      SensorReading("sensor_10", 1547718205, 38.101067604893444)
    ))
    stream.print("Read data from collection")
    env.execute("collection source")
  }
}

1.2.2 reading data from files
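
readTextFile reads a file line by line as a DataStream[String], which can then be parsed. A minimal sketch (the file path and the comma-separated layout are assumptions; env is the execution environment from 1.1):

```scala
// Sketch: read a text file and parse each line into a SensorReading.
// "/path/sensor.txt" and the comma-separated format are assumptions.
val fileStream: DataStream[SensorReading] = env
  .readTextFile("/path/sensor.txt")
  .map(line => {
    val fields = line.split(",")
    SensorReading(fields(0).trim, fields(1).trim.toLong, fields(2).trim.toDouble)
  })
fileStream.print("Read data from file")
```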


1.2.3 using a kafka message queue as the source

kafka dependency needs to be added
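
A possible Maven dependency for the Kafka 0.11 connector used below (a sketch — the `_2.11` Scala suffix and the 1.7.2 version are assumptions and must match your own Flink and Scala versions):

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
    <version>1.7.2</version>
</dependency>
```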


Specific implementation:

kafka consumer configuration

def getPro(): Properties = {
    val props = new Properties
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, broker_list)
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "flink_consumer_group")
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
    // Maximum number of records returned per poll; if a single message can exceed 1 MB,
    // it is recommended to set this to 1
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1000")
    // Set these slightly larger than the size of a single message
    props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, "1024")
    props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "1024")
    props
}

// Initialize the environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
// Get the kafka configuration
val properties = getPro()
// Create the kafka consumer
val consumer = new FlinkKafkaConsumer011(topic, new SimpleStringSchema, properties)
// Specify where to start consuming from kafka (group offsets are the default)
consumer.setStartFromGroupOffsets()
// Add the data source
val dataStream: DataStream[String] = env.addSource(consumer)

1.2.4 custom Source

In addition to the sources above, we can also define a custom source. All that is needed is to pass in a SourceFunction, which is called as follows:

val stream = env.addSource(new MySensorSource())

package com.flink.scala

import org.apache.flink.streaming.api.functions.source.SourceFunction

import scala.collection.immutable
import scala.util.Random

class MySensorSource extends SourceFunction[SensorReading]{

  var running: Boolean = true
  override def run(ctx: SourceFunction.SourceContext[SensorReading]): Unit = {
    // Initialize a random number generator
    val rand = new Random()
    // Initialize 10 sensors with random base temperatures
    var curTemp: immutable.IndexedSeq[(String, Double)] = 1.to(10).map(
      i => ("sensor_" + i, 65 + rand.nextGaussian() * 20)
    )
    while (running) {
      // Update temperature values with a small random walk
      curTemp = curTemp.map(
        t => (t._1, t._2 + rand.nextGaussian())
      )

      // Get current timestamp
      val curTime = System.currentTimeMillis()

      // Emit one reading per sensor
      curTemp.foreach(
        t => ctx.collect(SensorReading(t._1, curTime, t._2))
      )

      // Emit roughly every 100 ms
      Thread.sleep(100)
    }
  }

  override def cancel(): Unit = {
    running = false
  }
}

1.3 Transform operator

1.3.1 map

val streamMap = stream.map(x => (x, 1))

1.3.2 flatMap

The function signature of flatMap is: def flatMap[A, B](as: List[A])(f: A ⇒ List[B]): List[B]
For example, flatMap(List(1, 2, 3))(i ⇒ List(i, i))
yields List(1, 1, 2, 2, 3, 3),
and List("a,b", "c,d").flatMap(line ⇒ line.split(","))
yields List(a, b, c, d).

val streamFlatMap = stream.flatMap(
    x => x.split(",")
)
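
The collection-level behavior described above can be verified with plain Scala lists (ordinary List.flatMap, not the Flink operator):

```scala
object FlatMapDemo {
  def main(args: Array[String]): Unit = {
    // Each element expands into a list; the lists are concatenated
    val doubled = List(1, 2, 3).flatMap(i => List(i, i))
    println(doubled) // List(1, 1, 2, 2, 3, 3)

    // Split each string and flatten the pieces into a single list
    val tokens = List("a,b", "c,d").flatMap(line => line.split(","))
    println(tokens) // List(a, b, c, d)
  }
}
```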

1.3.3 Filter

val streamFilter = stream.filter(
    x => x == 1
)

1.3.4 keyBy

DataStream → KeyedStream: logically splits a stream into disjoint partitions; each partition contains elements with the same key. Internally, this is implemented with hash partitioning.
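
The hash-based assignment can be sketched in plain Scala (an illustrative analogy, not Flink's actual partitioner): the partition index is derived from the key's hash, so all elements with the same key land in the same partition.

```scala
object KeyByDemo {
  // Illustrative partitioner: same key => same hash => same partition
  def partitionOf(key: String, parallelism: Int): Int =
    math.abs(key.hashCode % parallelism)

  def main(args: Array[String]): Unit = {
    val readings = List(("sensor_1", 35.8), ("sensor_6", 15.4), ("sensor_1", 36.1))
    // Both "sensor_1" readings end up in the same group
    val partitioned = readings.groupBy { case (id, _) => partitionOf(id, 4) }
    partitioned.foreach { case (p, group) => println(s"partition $p -> $group") }
  }
}
```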

1.3.5 rolling aggregation

These operators can aggregate each substream of a KeyedStream.

  • sum()
  • min()
  • max()
  • minBy()
  • maxBy()

Note: the difference between max and maxBy is that max returns the maximum value of the specified field in the stream, while maxBy returns the element that carries the maximum value. The same distinction applies to min and minBy.
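
The distinction can be illustrated with plain Scala collections (an analogy; Flink's rolling versions emit a result per element, but the max-vs-maxBy difference is the same):

```scala
object MaxVsMaxByDemo {
  case class Reading(id: String, temperature: Double)

  def main(args: Array[String]): Unit = {
    val readings = List(
      Reading("sensor_1", 35.8),
      Reading("sensor_6", 15.4),
      Reading("sensor_7", 38.1)
    )

    // max-like: only the maximum of the aggregated field itself
    println(readings.map(_.temperature).max) // 38.1

    // maxBy-like: the whole element carrying the maximum value
    println(readings.maxBy(_.temperature)) // Reading(sensor_7,38.1)
  }
}
```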

1.3.6 Reduce

KeyedStream → DataStream: an aggregation operation on a grouped data stream that combines the current element with the result of the last aggregation to produce a new value. The returned stream contains the result of every aggregation step, not just the final result of the last aggregation.

val stream = env.readTextFile("/data.txt")
  .map( data => {
    val dataArray = data.split(",")
    SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble)
  })
  .keyBy("id")
  .reduce( (x, y) => SensorReading(x.id, x.timestamp + 1, y.temperature) )
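
Because the returned stream contains every intermediate aggregation, reduce behaves like a running fold; in plain Scala the output sequence can be sketched with scanLeft (an analogy, not the Flink operator):

```scala
object RollingReduceDemo {
  def main(args: Array[String]): Unit = {
    val temps = List(1.0, 2.0, 3.0, 4.0)
    // scanLeft keeps every intermediate result, like the output of a rolling reduce
    val rolling = temps.tail.scanLeft(temps.head)((acc, t) => acc + t)
    println(rolling) // List(1.0, 3.0, 6.0, 10.0)
  }
}
```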

1.3.7 split and select

1. split

DataStream → SplitStream: splits a DataStream into two or more DataStreams according to some characteristic.

2. select

SplitStream → DataStream: gets one or more DataStreams from a SplitStream.

Requirement: split the sensor data into two streams by temperature (with 30 degrees as the boundary).

val splitStream = stream.split( sensorData => {
    if (sensorData.temperature > 30) Seq("high") else Seq("low")
})
val high = splitStream.select("high")
val low = splitStream.select("low")
val all = splitStream.select("high", "low")
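
On a bounded collection, the effect of this split corresponds to a plain partition with the same 30-degree predicate (an analogy in ordinary Scala):

```scala
object SplitDemo {
  def main(args: Array[String]): Unit = {
    val temps = List(35.8, 15.4, 6.7, 38.1)
    // Same 30-degree boundary as the split example above
    val (high, low) = temps.partition(_ > 30)
    println(high) // List(35.8, 38.1)
    println(low)  // List(15.4, 6.7)
  }
}
```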

1.3.8 connect and coMap

Connect **DataStream, DataStream → ConnectedStreams**: connects two data streams while preserving their types. After the two data streams are connected, they are only placed in the same stream; their internal data and forms remain unchanged, and the two streams stay independent of each other.

CoMap/CoFlatMap **ConnectedStreams → DataStream**: acts on ConnectedStreams with the same role as map and flatMap, applying a map or flatMap separately to each stream in the ConnectedStreams.

val warning = high.map(
    sensorData => (sensorData.id, sensorData.temperature)
)
val connected = warning.connect(low)
val coMap = connected.map(
    warningData => (warningData._1, warningData._2, "warning"),
    lowData => (lowData.id, "healthy")
)

1.3.9 union

Union **DataStream → DataStream**: unions two or more DataStreams to produce a new DataStream containing all their elements.

// Print after merging
val unionStream: DataStream[StartUpLog] = appStoreStream.union(otherStream)
unionStream.print("union:::")

The difference between connect and union:

  1. The types of the two streams must be the same before a union, while connect allows
    different types, which can then be unified in the subsequent coMap.
  2. connect can only operate two streams, and union can operate multiple streams.

1.4 supported data types

The Flink streaming application handles the flow of events represented by data objects. So inside Flink, we need to be able to handle these objects. They need to be serialized and deserialized in order to transmit them over the network; Or read them from the state backend, checkpoints, and savepoints. To do this effectively, Flink needs to know exactly what type of data the application is processing. Flink uses the concept of type information to represent data types and generates specific serializers, deserializers, and comparators for each data type.

Flink also has a type extraction system that analyzes the input and return types of functions to obtain type information automatically, and from it the serializers and deserializers. However, in some cases, such as lambda functions or generic types, type information must be provided explicitly for the application to work properly or to improve its performance.

Flink supports all common data types in Java and Scala. The most widely used types are the following.

1.4.1 basic data type

Flink supports all Java and Scala basic data types, such as Int, Double, Long, and String.

val numbers: DataStream[Long] = env.fromElements(1L, 2L, 3L, 4L)

1.4.2 Java and Scala tuples

val persons: DataStream[(String, Integer)] = env.fromElements(
    ("Adam", Integer.valueOf(17)),
    ("Sarah", Integer.valueOf(23))
)
persons.filter(p => p._2 > 18)

1.4.3 Scala case classes

case class Person(name: String, age: Int)
persons.filter(p => p.age > 18)

1.4.4 Java simple object

public class Person {
    public String name;
    public int age;
    public Person() {}
    public Person(String name, int age) {
        this.name = name;
        this.age = age;
    }
}
1.4.5 others (Arrays, List, Map, Enum, etc.)

Flink also supports some special purpose types in Java and Scala, such as ArrayList, HashMap, Enum and so on.

1.5 implementing UDF functions - finer grained control flow

1.5.1 function classes

Flink exposes the interfaces of all UDF functions (implemented as interfaces or abstract classes). For example, MapFunction, FilterFunction, ProcessFunction and so on.

The following example implements the FilterFunction interface:

class FlinkFilter extends FilterFunction[String] {
    override def filter(value: String): Boolean = {
        value.contains("flink")
    }
}
val filterStream = stream.filter(new FlinkFilter)

You can also implement functions as anonymous classes

val filterStream = stream.filter(
    new RichFilterFunction[String] {
        override def filter(value: String): Boolean = {
            value.contains("flink")
        }
    }
)

The string "flink" that we filter for can also be passed in as a parameter:

val filterStream = stream.filter(new KeywordFilter("flink"))

class KeywordFilter(keyWord: String) extends FilterFunction[String] {
    override def filter(value: String): Boolean = {
        value.contains(keyWord)
    }
}

1.5.2 anonymous functions

val filterStream = stream.filter(_.contains("flink"))

1.5.3 rich functions

"Rich function" is a function class interface provided by the DataStream API; all Flink function classes have rich versions. They differ from regular functions in that they can obtain the context of the runtime environment and have lifecycle methods, so they can implement more complex functionality.

  • RichMapFunction
  • RichFlatMapFunction
  • RichFilterFunction
  • ...

Rich Function has a concept of life cycle. Typical life cycle methods are:

  • The open() method is the initialization method of a Rich Function; it is called once before an operator such as map or filter starts processing data.
  • The close() method is the last method called in the life cycle to do some cleaning.
  • The getRuntimeContext() method provides some information about the RuntimeContext of the function, such as the parallelism of the function execution, the name of the task, and the state.
package com.flink.scala

import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector

class MyFlatMap extends RichFlatMapFunction[Int, (Int, Int)] {
  var subTaskIndex = 0

  override def open(config: Configuration): Unit = {
    subTaskIndex = getRuntimeContext.getIndexOfThisSubtask
    // TODO: initialization work can be done here, such as establishing an HDFS connection
  }

  override def flatMap(in: Int, collector: Collector[(Int, Int)]): Unit = {
    if (in % 2 == subTaskIndex) {
      collector.collect((subTaskIndex, in))
    }
  }

  override def close(): Unit = {
    // TODO: cleanup work can be done here, such as closing the HDFS connection
  }
}

1.6 Sink

Flink has no foreach method like Spark's that lets users iterate over the data directly; all external output operations are instead performed through a Sink. The final output of a job is completed as follows:

stream.addSink(new MySink())

Flink officially provides sink connectors for a number of frameworks; for other external systems, a custom sink needs to be implemented.

1.6.1 Sink Kafka

pom add dependency:
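
A possible Maven dependency providing the FlinkKafkaProducer011 used below (a sketch — the `_2.11` Scala suffix and the 1.7.2 version are assumptions and must match your own Flink and Scala versions):

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
    <version>1.7.2</version>
</dependency>
```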


Add sink to main function:

stream.addSink(new FlinkKafkaProducer011[String]("localhost:9092", "sinkTopic", new SimpleStringSchema()))


Alternatively, the producer can be configured explicitly:

def getProp: Properties = {
    val props = new Properties
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    props.put(ProducerConfig.CLIENT_ID_CONFIG, "car_kafka")
    props.put(ProducerConfig.ACKS_CONFIG, "-1")
    props.put(ProducerConfig.RETRIES_CONFIG, "3")
    // Number of bytes of messages cached per partition batch, to improve throughput
    props.put(ProducerConfig.BATCH_SIZE_CONFIG, "4096")
    // Maximum time each message stays in the cache, to control latency
    props.put(ProducerConfig.LINGER_MS_CONFIG, "10000")
    // When the total size of all cached messages exceeds this value, sending is triggered
    // regardless of the batch.size and linger.ms limits. The default is 32 MB.
    props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "40960")
    props
}

val producerConfig = getProp
stream.addSink(new FlinkKafkaProducer011[String]("sinkTopic", new SimpleStringSchema(), producerConfig))

1.6.2 Sink Redis

pom add dependency:
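
A possible dependency for the Redis connector (a sketch — the connector is commonly pulled in from Apache Bahir, and the groupId, `_2.11` suffix, and version are assumptions):

```xml
<dependency>
    <groupId>org.apache.bahir</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
    <version>1.0</version>
</dependency>
```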


Define a redis Mapper class that specifies the command to use when saving data to redis:

class MyRedisMapper() extends RedisMapper[SensorReading] {

    /** Define the redis command used when saving the data. */
    override def getCommandDescription: RedisCommandDescription = {
        new RedisCommandDescription(RedisCommand.HSET, "aaaa")
    }

    /** Define the key saved to redis. */
    override def getKeyFromData(t: SensorReading): String = t.id

    /** Define the value saved to redis. */
    override def getValueFromData(t: SensorReading): String = t.temperature.toString
}

Called in the main function:

val conf = new FlinkJedisPoolConfig.Builder().setHost("localhost").setPort(6379).build()

stream.addSink(new RedisSink[SensorReading](conf,new MyRedisMapper))

1.6.3 Sink Elasticsearch

pom add dependency:

<!-- flink-connector-elasticsearch6 -->
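
A possible dependency matching the comment above (a sketch — the `_2.11` suffix and the 1.7.2 version are assumptions and must match your own Flink and Scala versions):

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch6_2.11</artifactId>
    <version>1.7.2</version>
</dependency>
```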

Call in the main function:

package com.flink.scala

import java.util

import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink
import org.apache.http.HttpHost
import org.elasticsearch.client.Requests

object ElasticSearchSinkTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream = env.fromCollection(List(
      SensorReading("sensor_1", 1547718199, 35.80018327300259),
      SensorReading("sensor_6", 1547718201, 15.402984393403084),
      SensorReading("sensor_7", 1547718202, 6.720945201171228),
      SensorReading("sensor_10", 1547718205, 38.101067604893444)
    ))

    val httpHosts = new util.ArrayList[HttpHost]()
    httpHosts.add(new HttpHost("localhost", 9200))
    val builder = new ElasticsearchSink.Builder[SensorReading](httpHosts, new ElasticsearchSinkFunction[SensorReading]() {
      override def process(t: SensorReading, runtimeContext: RuntimeContext, requestIndexer: RequestIndexer): Unit = {
        println("saving data: " + t)
        val json = new util.HashMap[String, String]()
        json.put("data", t.toString)
        // Index and type names here are illustrative
        val indexRequest = Requests.indexRequest().index("sensor").`type`("readingData").source(json)
        requestIndexer.add(indexRequest)
        println("saved successfully")
      }
    })
    stream.addSink(builder.build())

    env.execute("es sink")
  }
}

1.6.4 JDBC custom sink

pom add dependency:
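
A possible JDBC driver dependency (a sketch — the version is an assumption and should match your MySQL server):

```xml
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.44</version>
</dependency>
```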


Custom mysql sink:

package com.flink.scala

import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}

class MysqlSink extends RichSinkFunction[SensorReading] {

  var conn : Connection = _
  var insertStmt : PreparedStatement = _
  var updateStmt: PreparedStatement = _

  override def open(parameters: Configuration): Unit = {
    val url = "jdbc:mysql://localhost:3306/db"
    val username = "root"
    val password = "123456"
    conn = DriverManager.getConnection(url,username,password)
    val insertSql = "insert into temperatures (sensor, temp) values (?,?)"
    insertStmt = conn.prepareStatement(insertSql)
    val updateSql = "update temperatures set temp = ? where sensor = ?"
    updateStmt = conn.prepareStatement(updateSql)
  }

  override def invoke(value: SensorReading, context: SinkFunction.Context[_]): Unit = {
    // Try to update the existing row first
    updateStmt.setDouble(1, value.temperature)
    updateStmt.setString(2, value.id)
    updateStmt.execute()
    // If no row was updated, insert a new one
    if (updateStmt.getUpdateCount == 0) {
      insertStmt.setString(1, value.id)
      insertStmt.setDouble(2, value.temperature)
      insertStmt.execute()
    }
  }

  override def close(): Unit = {
    insertStmt.close()
    updateStmt.close()
    conn.close()
  }
}

Add in the main method:

dataStream.addSink(new MysqlSink())

Tags: Big Data flink

Posted by abhishekphp6 on Wed, 11 May 2022 16:00:44 +0300