Installing and using Maxwell in an offline data warehouse

Synchronizing incremental data with Maxwell

(1) Overview

Maxwell monitors data changes (insert, update and delete) in a MySQL database in real time and sends the changed data, in JSON format, to Kafka, Kinesis and other stream processing platforms.

It works by reading the MySQL binary log (binlog) in real time, extracting the change data, and publishing it as JSON to Kafka or another stream processing platform. Official website: http://maxwells-daemon.io/

Field       Explanation
database    Database to which the changed data belongs
table       Table to which the changed data belongs
type        Type of data change
ts          Time of the data change
xid         Transaction ID
commit      Transaction commit flag; can be used to reassemble transactions
data        For insert, the inserted row; for update, the row after modification; for delete, the deleted row
old         For update, the values before modification; contains only the changed fields
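
Because old carries only the changed fields, a consumer that needs the full row as it was before an update has to overlay old on data. The following is a minimal Python sketch of that step, using the field names from the table above. The helper name before_image is hypothetical and not part of Maxwell.

```python
import json
from typing import Optional


def before_image(event: dict) -> Optional[dict]:
    """Return the row as it was before the change (None for an insert)."""
    if event["type"] == "update":
        # "old" holds only the changed columns, so overlay it on "data".
        return {**event["data"], **event.get("old", {})}
    if event["type"] == "delete":
        # For a delete, "data" already is the row that was removed.
        return dict(event["data"])
    return None


# Sample update record in Maxwell's JSON format.
raw = (
    '{"database":"gmall2022","table":"base_trademark","type":"update",'
    '"ts":1650938351,"xid":15362,"commit":true,'
    '"data":{"id":12,"tm_name":"aaa","logo_url":"bbb"},'
    '"old":{"logo_url":null}}'
)
event = json.loads(raw)
print(before_image(event))  # {'id': 12, 'tm_name': 'aaa', 'logo_url': None}
```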

Flink CDC reference: https://blog.csdn.net/qq_44665283/article/details/123674962?spm=1001.2014.3001.5501

(2) Maxwell installation and use

Download address: https://github.com/zendesk/maxwell/releases/download/v1.29.2/maxwell-1.29.2.tar.gz

Note: Maxwell 1.30.0 and above no longer support JDK 1.8.

Upload the installation package to the /opt/yyds/apps directory on the server16 node.

Extract the installation package in /opt/yyds/apps:

[root@server16 apps]# tar -zxvf maxwell-1.29.2.tar.gz

Note: the binlog of the MySQL server is not enabled by default. It must be enabled before Maxwell can synchronize data.

1) Modify the MySQL configuration file /etc/my.cnf

[root@server15 apps]# vim /etc/my.cnf

2) Add the following configuration

[mysqld]

#Server id of this MySQL instance
server-id = 1
#Enable binlog; the value of this parameter is used as the base name of the binlog files
log-bin=mysql-bin
#binlog format; Maxwell requires row
binlog_format=row
#Database for which binlog is enabled; change this to match your environment
binlog-do-db=gmall2022
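
Before restarting MySQL, the edited file can be sanity-checked. The sketch below is one possible check in Python, assuming the file is simple enough for the standard configparser module to read. The function name check_binlog_settings and the inline sample text are illustrative only.

```python
import configparser

# Sample text mirroring the [mysqld] settings above (illustrative only).
SAMPLE_MY_CNF = """
[mysqld]
server-id = 1
log-bin=mysql-bin
binlog_format=row
binlog-do-db=gmall2022
"""


def check_binlog_settings(text: str) -> list:
    """Return a list of problems found in the [mysqld] section (empty if none)."""
    parser = configparser.ConfigParser(
        allow_no_value=True, strict=False, interpolation=None
    )
    parser.read_string(text)
    if not parser.has_section("mysqld"):
        return ["no [mysqld] section"]
    # MySQL accepts '-' and '_' interchangeably in option names.
    opts = {k.replace("-", "_"): v for k, v in parser.items("mysqld")}
    problems = []
    if "server_id" not in opts:
        problems.append("server-id is not set")
    if "log_bin" not in opts:
        problems.append("log-bin is not set")
    if (opts.get("binlog_format") or "").lower() != "row":
        problems.append("binlog_format is not row")
    return problems


print(check_binlog_settings(SAMPLE_MY_CNF))  # []
```

To check the real file, pass the contents of /etc/my.cnf instead of the sample text.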

Restart the MySQL service

[root@server15 apps]# systemctl restart mysqld

Maxwell stores some of the data it needs at runtime in MySQL, including the binlog position it has synchronized up to (Maxwell can resume from where it stopped), so a database and a user must be created for Maxwell in MySQL.

-- Create database
mysql> CREATE DATABASE maxwell;
Query OK, 1 row affected (0.00 sec)

-- Lower the password validation policy so that the simple password below is accepted
mysql> set global validate_password_policy=0;
mysql> set global validate_password_length=4;

-- Create the Maxwell user and grant it the necessary privileges
mysql> CREATE USER 'maxwell'@'%' IDENTIFIED BY 'maxwell';
mysql> GRANT ALL ON maxwell.* TO 'maxwell'@'%';
mysql> GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'maxwell'@'%';

-- Copy the example configuration file
[root@server16 maxwell-1.29.2]# cp config.properties.example config.properties

-- Edit the configuration file
[root@server16 maxwell-1.29.2]# vim config.properties

#Destination for Maxwell output; one of stdout|file|kafka|kinesis|pubsub|sqs|rabbitmq|redis
producer=kafka
#Target Kafka cluster address
kafka.bootstrap.servers=server15:9092,server16:9092,server17:9092
#The target Kafka topic can be configured statically, such as maxwell, or dynamically, such as %{database}_%{table}
kafka_topic=maxwell

#MySQL related configuration
host=server15
user=maxwell
password=maxwell
jdbc_options=useSSL=false&serverTimezone=Asia/Shanghai
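
With a dynamic kafka_topic, each record goes to a topic whose name is built from the record's database and table. The Python sketch below only illustrates that substitution for the %{database} and %{table} placeholders. It is not Maxwell's own code, and the function name expand_topic is hypothetical.

```python
def expand_topic(template: str, database: str, table: str) -> str:
    """Fill the %{database} and %{table} placeholders of a topic template."""
    return template.replace("%{database}", database).replace("%{table}", table)


# A static template contains no placeholders and is returned unchanged.
print(expand_topic("maxwell", "gmall2022", "base_trademark"))
# maxwell

# A dynamic template yields one topic name per database/table pair.
print(expand_topic("%{database}_%{table}", "gmall2022", "base_trademark"))
# gmall2022_base_trademark
```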
-- Start Maxwell
[root@server16 ~]# /opt/yyds/apps/maxwell-1.29.2/bin/maxwell --config /opt/yyds/apps/maxwell-1.29.2/config.properties --daemon
Redirecting STDOUT to /opt/yyds/apps/maxwell-1.29.2/bin/../logs/MaxwellDaemon.out
-- Stop Maxwell
[root@server16 ~]# ps -ef | grep maxwell | grep -v grep | awk '{print $2}' | xargs kill



-- Create the Kafka topic
/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/lib/kafka/bin/kafka-topics.sh --create --replication-factor 2 --partitions 3 --zookeeper server15:2181 --topic maxwell

-- List the Kafka topics
/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/lib/kafka/bin/kafka-topics.sh --list --zookeeper server15:2181


-- Start a Kafka console consumer
/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/lib/kafka/bin/kafka-console-consumer.sh --bootstrap-server server16:9092 --topic maxwell --from-beginning
-- Record produced when a row is inserted
{
    "database":"gmall2022",
    "table":"base_trademark",
    "type":"insert",
    "ts":1650938288,
    "xid":14691,
    "commit":true,
    "data":{
        "id":12,
        "tm_name":"aaa",
        "logo_url":null
    }
}
-- Record produced when the row is updated
{
    "database":"gmall2022",
    "table":"base_trademark",
    "type":"update",
    "ts":1650938351,
    "xid":15362,
    "commit":true,
    "data":{
        "id":12,
        "tm_name":"aaa",
        "logo_url":"bbb"
    },
    "old":{
        "logo_url":null
    }
}
-- Record produced when the row is deleted
{
    "database":"gmall2022",
    "table":"base_trademark",
    "type":"delete",
    "ts":1650938398,
    "xid":15814,
    "commit":true,
    "data":{
        "id":12,
        "tm_name":"aaa",
        "logo_url":"bbb"
    }
}
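
The three records above are enough to rebuild the state of the table downstream. A minimal Python sketch of such a replay follows, assuming the primary key column is id as in the sample data. The function name apply_event and the in-memory dictionary are illustrative and not part of Maxwell or Kafka.

```python
import json


def apply_event(replica: dict, event: dict, key: str = "id") -> None:
    """Apply one Maxwell record to an in-memory copy of the table."""
    row = event["data"]
    if event["type"] == "insert":
        replica[row[key]] = row
    elif event["type"] == "update":
        # If the key column itself changed, "old" carries its previous value.
        old_key = event.get("old", {}).get(key, row[key])
        replica.pop(old_key, None)
        replica[row[key]] = row
    elif event["type"] == "delete":
        replica.pop(row[key], None)


# Shortened versions of the insert, update and delete records shown above.
lines = [
    '{"type":"insert","data":{"id":12,"tm_name":"aaa","logo_url":null}}',
    '{"type":"update","data":{"id":12,"tm_name":"aaa","logo_url":"bbb"},'
    '"old":{"logo_url":null}}',
    '{"type":"delete","data":{"id":12,"tm_name":"aaa","logo_url":"bbb"}}',
]
replica = {}
snapshots = []
for line in lines:
    apply_event(replica, json.loads(line))
    snapshots.append(dict(replica))
    print(replica)
# {12: {'id': 12, 'tm_name': 'aaa', 'logo_url': None}}
# {12: {'id': 12, 'tm_name': 'aaa', 'logo_url': 'bbb'}}
# {}
```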

Incremental data alone is sometimes not enough: we may need the complete data set in the MySQL database, from its history up to the present. In that case, run a full synchronization of the historical data before incremental synchronization, using the maxwell-bootstrap tool. This ensures a complete data set.

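[root@server16 maxwell-1.29.2]# /opt/yyds/apps/maxwell-1.29.2/bin/maxwell-bootstrap --database gmall2022 --table user_info --config /opt/yyds/apps/maxwell-1.29.2/config.properties
-- Format of the records produced by a bootstrap (the sample uses placeholder database and table names)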
{
    "database": "fooDB",
    "table": "barTable",
    "type": "bootstrap-start",
    "ts": 1450557744,
    "data": {}
}
{
    "database": "fooDB",
    "table": "barTable",
    "type": "bootstrap-insert",
    "ts": 1450557744,
    "data": {
        "txt": "hello"
    }
}
{
    "database": "fooDB",
    "table": "barTable",
    "type": "bootstrap-insert",
    "ts": 1450557744,
    "data": {
        "txt": "bootstrap!"
    }
}
{
    "database": "fooDB",
    "table": "barTable",
    "type": "bootstrap-complete",
    "ts": 1450557744,
    "data": {}
}

Note:

1) The first record, of type bootstrap-start, and the last record, of type bootstrap-complete, mark the start and end of the bootstrap and carry no data. Only the records of type bootstrap-insert in between contain data.

2) All records output by one bootstrap have the same ts, which is the start time of the bootstrap.
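
A consumer can use the two marker records to tell when a full snapshot has arrived. The Python sketch below shows one way to do that, assuming the records have already been parsed from JSON into dictionaries. The function name collect_bootstrap is hypothetical.

```python
def collect_bootstrap(events) -> list:
    """Return the rows between bootstrap-start and bootstrap-complete."""
    rows = []
    started = False
    for event in events:
        kind = event["type"]
        if kind == "bootstrap-start":
            # Marker record: begin a fresh snapshot, it carries no data.
            started, rows = True, []
        elif kind == "bootstrap-insert" and started:
            rows.append(event["data"])
        elif kind == "bootstrap-complete" and started:
            # Marker record: the snapshot is complete.
            return rows
    raise ValueError("stream ended before bootstrap-complete")


sample = [
    {"type": "bootstrap-start", "ts": 1450557744, "data": {}},
    {"type": "bootstrap-insert", "ts": 1450557744, "data": {"txt": "hello"}},
    {"type": "bootstrap-insert", "ts": 1450557744, "data": {"txt": "bootstrap!"}},
    {"type": "bootstrap-complete", "ts": 1450557744, "data": {}},
]
print(collect_bootstrap(sample))  # [{'txt': 'hello'}, {'txt': 'bootstrap!'}]
```

Raising an error when the end marker is missing keeps a partial snapshot from being mistaken for a complete one.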

Tags: Big Data

Posted by mass on Tue, 26 Apr 2022 05:27:50 +0300