Using Flink broadcast streams correctly, and a record of a Flink checkpoint failure

Recently, on a project, I ran into a scenario where a relatively small, rarely changing table had to serve as a dimension table to be matched against a real-time stream. The table lives in MySQL, and my first reaction was to read it and broadcast it.

Inelegant use of broadcast streams

The brief code is as follows:

val env = StreamExecutionEnvironment.getExecutionEnvironment
// Read the MySQL table, map each row to a Poi, and broadcast it.
val broadcastMysql = env.addSource(new SourceFromMySQL).map(x => Poi(x._1, x._2))
  .broadcast(new MapStateDescriptor[String, Poi]("broadcast-state",
    BasicTypeInfo.STRING_TYPE_INFO, TypeInformation.of(new TypeHint[Poi] {})))
val kafkaDs = env.addSource(new FlinkKafkaConsumer011("flinkTest", new JSONKeyValueDeserializationSchema(false), properties))
// The process function that does the matching is omitted here.
val resultDs = kafkaDs.connect(broadcastMysql).process()

Written this way, the job ran without problems in local tests, and the stream data matched the dimension table data. But once I felt everything was fine and deployed it to the cluster, a problem showed up.

Checkpoint failure caused by using a broadcast stream

When I ran the job on the cluster, I found the following error:

Checkpoint triggering task XXX of job XXX is not in state RUNNING but
FINISHED instead. Aborting checkpoint.

I then checked the Checkpoints page in the Flink web UI. There was nothing there either, even though I had clearly enabled checkpointing. Why were no checkpoints completing? After repeated investigation, I finally identified the problem:
Because the code above reads a bounded stream from MySQL and broadcasts it, the source task is marked as FINISHED once the program has finished reading the table.

I then went to the source code. In the triggerCheckpoint method of org.apache.flink.runtime.checkpoint.CheckpointCoordinator it is not hard to find:

// check if all tasks that we need to trigger are running.
// if not, abort the checkpoint
Execution[] executions = new Execution[tasksToTrigger.length];
for (int i = 0; i < tasksToTrigger.length; i++) {
    Execution ee = tasksToTrigger[i].getCurrentExecutionAttempt();
    if (ee == null) {
        LOG.info("Checkpoint triggering task {} of job {} is not being executed at the moment. Aborting checkpoint.",
            tasksToTrigger[i].getTaskNameWithSubtaskIndex(), job);
        throw new CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
    } else if (ee.getState() == ExecutionState.RUNNING) {
        executions[i] = ee;
    } else {
        LOG.info("Checkpoint triggering task {} of job {} is not in state {} but {} instead. Aborting checkpoint.",
            tasksToTrigger[i].getTaskNameWithSubtaskIndex(), job, ExecutionState.RUNNING, ee.getState());
        throw new CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
    }
}

We can see that the state of each Execution is checked when a checkpoint is triggered. If any of them is not RUNNING, the coordinator logs "Checkpoint triggering task {} of job {} is not in state {} but {} instead. Aborting checkpoint." and the checkpoint trigger ends there.

So it was the way I used the broadcast stream that put one Execution into the FINISHED state, and that is why the checkpoints failed.
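To make the check concrete, here is a toy model in plain Scala. It is not Flink code: CheckpointGate, Task, State and canTrigger are made-up names for illustration, and the model keeps only the rule from the snippet above, namely that a single task which is not RUNNING aborts the whole checkpoint.

```scala
// Toy model of the coordinator's pre-trigger check.
// None of these types are Flink classes; they are assumptions for illustration.
object CheckpointGate {
  sealed trait State
  case object Running extends State
  case object Finished extends State

  final case class Task(name: String, state: State)

  // Right(()) when every task is Running; otherwise Left with an abort
  // message for the first task that is in any other state.
  def canTrigger(tasks: Seq[Task]): Either[String, Unit] =
    tasks.find(_.state != Running) match {
      case Some(t) =>
        Left(s"task ${t.name} is not in state RUNNING but ${t.state.toString.toUpperCase} instead")
      case None =>
        Right(())
    }
}
```

With a finished MySQL source next to a running Kafka source, canTrigger returns a Left, which corresponds to the aborted checkpoint seen on the cluster.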

Elegant use of broadcast streams

To fix the problem at its root, we still want to use a broadcast stream, but the checkpoints must not fail. So the task behind the broadcast stream has to stay in the RUNNING state.
Therefore, what we usually broadcast is an unbounded stream. For example, the rule information sits in one Kafka topic, the detailed events are read in real time from another Kafka topic, and the two are matched.
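If the dimension table has to stay in MySQL, another way to keep the source task running is a source that never returns from run() and re-reads the table at a fixed interval. The following is only a sketch under assumptions: PeriodicMySQLSource is a hypothetical replacement for SourceFromMySQL, the query and column names are made up, and the rows are assumed to be (String, String) pairs as in the first snippet.

```scala
import java.sql.DriverManager
import org.apache.flink.streaming.api.functions.source.SourceFunction

// Hypothetical long-running source: it re-reads the table every intervalMs
// milliseconds, so run() does not return and the task stays RUNNING.
class PeriodicMySQLSource(url: String, user: String, password: String, intervalMs: Long)
    extends SourceFunction[(String, String)] {

  @volatile private var running = true

  override def run(ctx: SourceFunction.SourceContext[(String, String)]): Unit = {
    while (running) {
      val conn = DriverManager.getConnection(url, user, password)
      try {
        // Assumed table and column names; adjust to the real schema.
        val rs = conn.createStatement().executeQuery("SELECT id, name FROM poi")
        while (rs.next()) {
          ctx.collect((rs.getString(1), rs.getString(2)))
        }
      } finally {
        conn.close() // also releases the statement and result set
      }
      Thread.sleep(intervalMs)
    }
  }

  // Called by Flink when the job is cancelled; ends the loop in run().
  override def cancel(): Unit = running = false
}
```

A side effect of this design is that changes to the table reach the broadcast state after at most one interval. Rows deleted from MySQL are not removed from the state unless the process function handles that itself.

For the Kafka variant described above, the sources and the broadcast are set up like this: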

// Rules and events come from two separate Kafka topics, so both sources are unbounded.
val rule = env.addSource(new FlinkKafkaConsumer011(topic1, new SimpleStringSchema(), kafkaPro1))
val event = env.addSource(new FlinkKafkaConsumer011(topic2, new SimpleStringSchema(), kafkaPro2))
val ruleBroadcast = rule.broadcast(new MapStateDescriptor[String, String](
  "broadcast-state", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO))
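The snippet stops at the broadcast, so here is a minimal sketch of the matching step that could follow it. The names RuleMatching and MatchFunction are hypothetical, and so are the data formats: a rule is assumed to arrive as "key,value" text, and an event is assumed to match when its whole text equals a rule key. Real payloads will need their own parsing.

```scala
import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction
import org.apache.flink.util.Collector

object RuleMatching {
  // Same name and types as the descriptor passed to rule.broadcast(...).
  val ruleDescriptor = new MapStateDescriptor[String, String](
    "broadcast-state", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO)

  // IN1 = event, IN2 = rule, OUT = (event, matched rule value).
  class MatchFunction extends BroadcastProcessFunction[String, String, (String, String)] {

    // Called for every element of the non-broadcast (event) stream.
    override def processElement(
        event: String,
        ctx: BroadcastProcessFunction[String, String, (String, String)]#ReadOnlyContext,
        out: Collector[(String, String)]): Unit = {
      // Assumption: the event text itself is the lookup key.
      val matched = ctx.getBroadcastState(ruleDescriptor).get(event)
      if (matched != null) out.collect((event, matched))
    }

    // Called for every element of the broadcast (rule) stream.
    override def processBroadcastElement(
        rule: String,
        ctx: BroadcastProcessFunction[String, String, (String, String)]#Context,
        out: Collector[(String, String)]): Unit = {
      // Assumption: rules look like "key,value"; anything else is ignored.
      val parts = rule.split(",", 2)
      if (parts.length == 2) ctx.getBroadcastState(ruleDescriptor).put(parts(0), parts(1))
    }
  }
}
```

It would be wired in with something like val resultDs = event.connect(ruleBroadcast).process(new RuleMatching.MatchFunction), with org.apache.flink.streaming.api.scala._ imported so that the output TypeInformation is available. Events that arrive before their rule has been broadcast find nothing in the state and are dropped in this sketch.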

Tags: Big Data flink

Posted by omanush on Sat, 21 May 2022 23:43:47 +0300