Debezium specifies binlog location to initialize exception troubleshooting and repair

Abnormal phenomenon

Recently, the project needs to use Debezium to specify the binlog location to read data. When FileDatabaseHistory is configured to save offset, an unrecognized schema exception will appear.

14:52:18.237 [blc-] ERROR io.debezium.connector.mysql.MySqlStreamingChangeEventSource - Encountered change event 'Event{header=EventHeaderV4{timestamp=1652710509000, eventType=TABLE_MAP, serverId=123454, headerLength=19, dataLength=30, nextPosition=5761, flags=0}, data=TableMapEventData{tableId=375, database='etl', table='test', columnTypes=3, 15, columnMetadata=0, 64, columnNullability={0, 1}, eventMetadata=null}}' at offset {transaction_id=null, file=mysql-bin.000001, pos=5641, server_id=123454, event=1} for table etl.test whose schema isn't known to this connector. One possible cause is an incomplete database history topic. Take a new snapshot in this case.
Use the mysqlbinlog tool to view the problematic event: mysqlbinlog --start-position=5712 --stop-position=5761 --verbose mysql-bin.000001
14:52:18.237 [blc-] ERROR io.debezium.connector.mysql.MySqlStreamingChangeEventSource - Error during binlog processing. Last offset stored = {transaction_id=null, file=mysql-bin.000001, pos=5641, server_id=249, event=1}, binlog reader near position = mysql-bin.000001/5712
14:52:18.237 [blc-] ERROR io.debezium.pipeline.ErrorHandler - Producer failure
io.debezium.DebeziumException: Error processing binlog event
Caused by: io.debezium.DebeziumException: Encountered change event for table etl.test whose schema isn't known to this connector

How to reproduce

We configure the engine parameter database History is the FileDatabaseHistory class to save the binlog records we read. Specify its custom DebeziumOffset configuration offset through the flinkovetbackingstore class.

props.setProperty("database.history", FileDatabaseHistory.class.getCanonicalName());
props.setProperty(FlinkOffsetBackingStore.OFFSET_STATE_VALUE, restoredOffsetState);
props.setProperty("", FlinkOffsetBackingStore.class.getCanonicalName());
props.setProperty("snapshot.mode", "schema_only_recovery");

Put the snapshot The mode is configured as full + initial. There is no problem with the incremental mode, but the above exception will occur in the specified location (schema_only_recovery) mode.

Troubleshooting and positioning


Before specifying the location mode (schema_only_recovery), there is a reference to the flink cdc implementation. You can specify its custom DebeziumOffset through the FlinkOffsetBackingStore class. Instead, use database When configuring flinkdatabase history instead of FileDatabaseHistory, specifying the available binlog location will not throw an exception. Careful comparison of the implementation of the two classes is also difficult to see the reason.


  1. Because we know that flinkdatabase history can be executed normally, we use two databasehistories to test and compare the output logs respectively. It is found that flinkdatabase history has an important initialization step

  2. According to the log, you can search the MySQL snapshot changeeventsource class and find the log keyword output part
    "A previous offset indicating a completed"

  3. Following the log execution, we will find that in the upper layer, abstractsnapshotchangeeventsource Execute calls the following of the getSnapshottingTask method. There is a logic to judge skipping snapshot.
    The test will find that the logic behind this method is to read the snapshot and initialize the relevant library tables. In FileDatabaseHistory, the snapshottingtask Shouldskipsnapshot() is true and is skipped directly. And flinkdatabase history is false. Continue to execute below.

  4. So the question goes back to snapshottingtask The logic of shouldskipsnapshot(). If one of the snapshotSchema and snapshotData is true, initialization must be performed.

  5. The assignment of these attributes is through the constructor. Through the test, it is found that the above two kinds of snapshotData are false, only the snapshotSchema is different. And this constructor goes back to our step 1, the databaseschema of the two classes Isstorageinitializationexecuted() is different

  6. MySqlDatabaseSchema.isStorageInitializationExecuted() returns the value of storageinitializationexecuted, and it only calls the assignment in the initializeStorage() method.

  7. However, initializeStorage is called in several places.

  8. Through the breakpoint backtracking method stack, we can find that the call of flinkdatabase history is in MySQL connectortask Andloadhistory. The reason why FileDatabaseHistory is not called is because of schema Historyexists() returns true.

  9. Then check the implementation of historyExists, which is the method rewritten in DatabaseHistory.

  10. The existence of FileDatabaseHistory is actually to judge whether the specified file exists. Here we need to explain that we will still throw an exception when specifying a new file because FileDatabaseHistory The start () method will initialize and generate the corresponding file first, so subsequent calls will return true.

  11. Flinkdatabase history returns the Record list, which is naturally empty before no events are read in.

Combine flinkdatabase history and FileDatabaseHistory to re implement a DatabaseHistory, focusing on changing exists to! this.schemaRecords.isEmpty();

Tags: Database MySQL Big Data binlog debezium

Posted by jasonscherer on Wed, 18 May 2022 01:40:00 +0300