The committed offset metric in the Flink WebUI shows a negative value

1. Problem

After submitting a Flink job, I opened the WebUI to check the offset metric and found that its value was a negative number (as we will see below, the value is -915623761776).

The official documentation defines this metric as: for each partition, the offset that was last successfully committed to Kafka. So a negative value is clearly not normal.

2. Conclusion

Let me give the conclusion (which is also the fix) first: the metric shows this value because checkpointing is not enabled.
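If you want the metric to behave normally, enable checkpointing on the execution environment. A minimal sketch (the 60-second interval is just an example value):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Enable checkpointing so that offsets are committed back to Kafka on each
// completed checkpoint (the FlinkKafkaConsumer commits on checkpoints by
// default once checkpointing is on), which in turn updates the metric.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000); // checkpoint interval in ms; 60s is an arbitrary example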

3. Detailed explanation

But why does this metric become negative when checkpointing (ck for short) is not set? To answer that, we need to dig into the source code.

First, we locate this class:

org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher

/**
 * For each partition, register a new metric group to expose current offsets and committed offsets.
 * Per-partition metric groups can be scoped by user variables {@link KafkaConsumerMetricConstants#OFFSETS_BY_TOPIC_METRICS_GROUP}
 * and {@link KafkaConsumerMetricConstants#OFFSETS_BY_PARTITION_METRICS_GROUP}.
 *
 * <p>Note: this method also registers gauges for deprecated offset metrics, to maintain backwards compatibility.
 *
 * @param consumerMetricGroup The consumer metric group
 * @param partitionOffsetStates The partition offset state holders, whose values will be used to update metrics
 */
private void registerOffsetMetrics(
		MetricGroup consumerMetricGroup,
		List<KafkaTopicPartitionState<KPH>> partitionOffsetStates) {

	for (KafkaTopicPartitionState<KPH> ktp : partitionOffsetStates) {
		MetricGroup topicPartitionGroup = consumerMetricGroup
			.addGroup(OFFSETS_BY_TOPIC_METRICS_GROUP, ktp.getTopic())
			.addGroup(OFFSETS_BY_PARTITION_METRICS_GROUP, Integer.toString(ktp.getPartition()));

		topicPartitionGroup.gauge(CURRENT_OFFSETS_METRICS_GAUGE, new OffsetGauge(ktp, OffsetGaugeType.CURRENT_OFFSET));
		topicPartitionGroup.gauge(COMMITTED_OFFSETS_METRICS_GAUGE, new OffsetGauge(ktp, OffsetGaugeType.COMMITTED_OFFSET));

		legacyCurrentOffsetsMetricGroup.gauge(getLegacyOffsetsMetricsGaugeName(ktp), new OffsetGauge(ktp, OffsetGaugeType.CURRENT_OFFSET));
		legacyCommittedOffsetsMetricGroup.gauge(getLegacyOffsetsMetricsGaugeName(ktp), new OffsetGauge(ktp, OffsetGaugeType.COMMITTED_OFFSET));
	}
}

We can see in the registerOffsetMetrics method:

topicPartitionGroup.gauge(COMMITTED_OFFSETS_METRICS_GAUGE, new OffsetGauge(ktp, OffsetGaugeType.COMMITTED_OFFSET));
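For context, the OffsetGauge registered here reads its value lazily from the partition state every time the metric is sampled. Paraphrased from the same AbstractFetcher class (treat this as a sketch of the relevant part):

private static class OffsetGauge implements Gauge<Long> {

	private final KafkaTopicPartitionState<?> ktp;
	private final OffsetGaugeType gaugeType;

	OffsetGauge(KafkaTopicPartitionState<?> ktp, OffsetGaugeType gaugeType) {
		this.ktp = ktp;
		this.gaugeType = gaugeType;
	}

	@Override
	public Long getValue() {
		switch (gaugeType) {
			case COMMITTED_OFFSET:
				// whatever committedOffset currently holds is what the WebUI shows
				return ktp.getCommittedOffset();
			case CURRENT_OFFSET:
				return ktp.getOffset();
			default:
				throw new RuntimeException("Unknown gauge type: " + gaugeType);
		}
	}
}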

Clearly, the offset to be committed is held inside this ktp object; stepping into it, we find it is of type KafkaTopicPartitionState:

public KafkaTopicPartitionState(KafkaTopicPartition partition, KPH kafkaPartitionHandle) {
	this.partition = partition;
	this.kafkaPartitionHandle = kafkaPartitionHandle;
	this.offset = KafkaTopicPartitionStateSentinel.OFFSET_NOT_SET;
	this.committedOffset = KafkaTopicPartitionStateSentinel.OFFSET_NOT_SET;
}

// and, in KafkaTopicPartitionStateSentinel:
/** Magic number that defines an unset offset. */
public static final long OFFSET_NOT_SET = -915623761776L;

Here we can see that committedOffset, the field backing the metric we care about, is initialized to exactly this negative value.
At this point we understand where the negative value in the WebUI comes from; but why is the offset we commit never assigned to committedOffset? For that, we have to look at the code that commits offsets. Let's continue with this class:

org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase

public final void notifyCheckpointComplete(long checkpointId) throws Exception {
		if (!running) {
			LOG.debug("notifyCheckpointComplete() called on closed source");
			return;
		}

		final AbstractFetcher<?, ?> fetcher = this.kafkaFetcher;
		if (fetcher == null) {
			LOG.debug("notifyCheckpointComplete() called on uninitialized source");
			return;
		}

		if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
			// only one commit operation must be in progress
			if (LOG.isDebugEnabled()) {
				LOG.debug("Consumer subtask {} committing offsets to Kafka/ZooKeeper for checkpoint {}.",
					getRuntimeContext().getIndexOfThisSubtask(), checkpointId);
			}

			try {
				final int posInMap = pendingOffsetsToCommit.indexOf(checkpointId);
				if (posInMap == -1) {
					LOG.warn("Consumer subtask {} received confirmation for unknown checkpoint id {}",
						getRuntimeContext().getIndexOfThisSubtask(), checkpointId);
					return;
				}

				@SuppressWarnings("unchecked")
				Map<KafkaTopicPartition, Long> offsets =
					(Map<KafkaTopicPartition, Long>) pendingOffsetsToCommit.remove(posInMap);

				// remove older checkpoints in map
				for (int i = 0; i < posInMap; i++) {
					pendingOffsetsToCommit.remove(0);
				}

				if (offsets == null || offsets.size() == 0) {
					LOG.debug("Consumer subtask {} has empty checkpoint state.", getRuntimeContext().getIndexOfThisSubtask());
					return;
				}

				fetcher.commitInternalOffsetsToKafka(offsets, offsetCommitCallback);
			} catch (Exception e) {
				if (running) {
					throw e;
				}
				// else ignore exception if we are no longer running
			}
		}
	}

The method above contains a check: only when the commit mode is ON_CHECKPOINTS does it go on to call commitInternalOffsetsToKafka:

 if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS)
//......
fetcher.commitInternalOffsetsToKafka(offsets, offsetCommitCallback);
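Where does offsetCommitMode come from? It is derived from the job configuration; paraphrasing OffsetCommitModes.fromConfiguration(...) in org.apache.flink.streaming.connectors.kafka.config (a sketch, not verbatim source):

public static OffsetCommitMode fromConfiguration(
		boolean enableAutoCommit,
		boolean enableCommitOnCheckpoint,
		boolean enableCheckpointing) {

	if (enableCheckpointing) {
		// checkpointing enabled: commit on completed checkpoints (unless the user disabled it)
		return (enableCommitOnCheckpoint)
				? OffsetCommitMode.ON_CHECKPOINTS
				: OffsetCommitMode.DISABLED;
	} else {
		// no checkpointing: fall back to Kafka's own periodic auto-commit, if enabled
		return (enableAutoCommit)
				? OffsetCommitMode.KAFKA_PERIODIC
				: OffsetCommitMode.DISABLED;
	}
}

Either way, without checkpointing the mode can never be ON_CHECKPOINTS, so the commit path below is never reached.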

Stepping into this method, we find that it delegates to another method, doCommitInternalOffsetsToKafka:

public final void commitInternalOffsetsToKafka(
			Map<KafkaTopicPartition, Long> offsets,
			@Nonnull KafkaCommitCallback commitCallback) throws Exception {
		// Ignore sentinels. They might appear here if snapshot has started before actual offsets values
		// replaced sentinels
		doCommitInternalOffsetsToKafka(filterOutSentinels(offsets), commitCallback);
	}
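filterOutSentinels does what its name says: it drops any entries whose value is still a sentinel (such as OFFSET_NOT_SET) so that they are never committed. Roughly, it looks like this (a paraphrase; treat the details as a sketch):

private Map<KafkaTopicPartition, Long> filterOutSentinels(Map<KafkaTopicPartition, Long> offsets) {
	return offsets.entrySet()
			.stream()
			.filter(entry -> !KafkaTopicPartitionStateSentinel.isSentinel(entry.getValue()))
			.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}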

From there, control reaches the doCommitInternalOffsetsToKafka implementation in org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher:

protected void doCommitInternalOffsetsToKafka(
			Map<KafkaTopicPartition, Long> offsets,
			@Nonnull KafkaCommitCallback commitCallback) throws Exception {

		@SuppressWarnings("unchecked")
		List<KafkaTopicPartitionState<TopicPartition>> partitions = subscribedPartitionStates();

		Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>(partitions.size());

		for (KafkaTopicPartitionState<TopicPartition> partition : partitions) {
			Long lastProcessedOffset = offsets.get(partition.getKafkaTopicPartition());
			if (lastProcessedOffset != null) {
				checkState(lastProcessedOffset >= 0, "Illegal offset value to commit");

				// committed offsets through the KafkaConsumer need to be 1 more than the last processed offset.
				// This does not affect Flink's checkpoints/saved state.
				long offsetToCommit = lastProcessedOffset + 1;

				offsetsToCommit.put(partition.getKafkaPartitionHandle(), new OffsetAndMetadata(offsetToCommit));
				partition.setCommittedOffset(offsetToCommit);
			}
		}

		// (remainder of the method omitted: offsetsToCommit is handed off to the
		// consumer thread, which performs the actual commit against Kafka)
	}

The key line is:

partition.setCommittedOffset(offsetToCommit);

Here committedOffset is finally overwritten with a real offset value, and the truth is revealed.

To summarize: when Flink commits offsets, it first checks the current commit mode:

if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS)

If checkpointing is not enabled, this branch is never taken, so the committedOffset backing the metric is never updated and keeps its initial value of -915623761776L.
If checkpointing is enabled, each completed checkpoint eventually reaches

partition.setCommittedOffset(offsetToCommit);

which updates the metric. That is why the value shown in the WebUI is negative when checkpointing is not set.
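To make the mechanism concrete, here is a minimal, self-contained model of the behavior using simplified stand-in classes (these are illustrative only, not the actual Flink classes):

import java.util.function.Supplier;

public class CommittedOffsetDemo {

	// Same sentinel value as Flink's KafkaTopicPartitionStateSentinel.OFFSET_NOT_SET
	static final long OFFSET_NOT_SET = -915623761776L;

	// Simplified stand-in for KafkaTopicPartitionState
	static class PartitionState {
		private volatile long committedOffset = OFFSET_NOT_SET;
		void setCommittedOffset(long offset) { this.committedOffset = offset; }
		long getCommittedOffset() { return committedOffset; }
	}

	public static void main(String[] args) {
		PartitionState partition = new PartitionState();
		// Stand-in for the committedOffsets gauge: reads the field lazily
		Supplier<Long> committedOffsetsGauge = partition::getCommittedOffset;

		// No checkpoint has completed -> the gauge reports the sentinel
		System.out.println(committedOffsetsGauge.get()); // -915623761776

		// A checkpoint completes -> the commit path calls setCommittedOffset
		partition.setCommittedOffset(42 + 1); // last processed offset + 1
		System.out.println(committedOffsetsGauge.get()); // 43
	}
}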
