How does FlinkX read from and write to ClickHouse?

Introduction: This article describes the process and related parameters of reading and writing ClickHouse with FlinkX. The content focuses on the following three questions. Download the FlinkX plug-in:

https://github.com/DTStack/flinkx

  1. Which ClickHouse versions does FlinkX support?
  2. What parameters do the ClickHouse reader and writer plug-ins accept?
  3. What do the ClickHouse reader and writer parameters mean?

ClickHouse read

1. Plug-in name

Name: clickhousereader

2. Supported data source versions

ClickHouse 19.x and above

3. Parameter description

「jdbcUrl」

  • Description: JDBC connection string for the relational database; an example URL format is shown below
  • JDBC URL reference: the ClickHouse JDBC official documentation
  • Required: Yes
  • Default: None
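
For reference, a ClickHouse JDBC URL typically takes the following form (host, port and database are placeholders; 8123 is ClickHouse's default HTTP port, as used in the configuration examples below):

jdbc:clickhouse://<host>:8123/<database>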

「username」

  • Description: user name of the data source
  • Required: Yes
  • Default: None

「password」

  • Description: password for the specified user name of the data source
  • Required: Yes
  • Default: None

「where」

  • Description: filter condition. The reader plug-in splices SQL from the specified column, table and where condition, and extracts data with that SQL. In real business scenarios, the data of the current day is often synchronized, so the where condition can be specified as something like gmt_create > time (see the example below).
  • Note: the where condition cannot be specified as limit 10; limit is not a legal SQL where clause.
  • Required: No
  • Default: None
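
For instance, a where condition that restricts synchronization to a time range might look like the following (gmt_create is a hypothetical column name):

"where": "gmt_create > '2022-05-20 00:00:00'"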

「splitPk」

  • Description: specify this parameter when the channel in the speed configuration is greater than 1. The reader plug-in splices SQL according to the concurrency number and the field specified by this parameter, so that each concurrent channel reads different data and the reading rate improves (see the sketch below). It is recommended to use the table's primary key as the split field, since its values are usually evenly distributed and the resulting splits are less prone to data skew. At present, splitPk only supports splitting on integer fields; other types such as floating point, string and date are not supported, and FlinkX will report an error if an unsupported type is specified. If channel is greater than 1 but this parameter is not configured, the task will be set as failed.
  • Required: No
  • Default: None
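
As a conceptual sketch of the splitting described above, with channel set to 3 and splitPk set to id, the concurrent reads are roughly of the following form (assuming a modulo-based split; the exact SQL generated by FlinkX may differ):

SELECT id, user_id, name FROM tableTest WHERE id % 3 = 0
SELECT id, user_id, name FROM tableTest WHERE id % 3 = 1
SELECT id, user_id, name FROM tableTest WHERE id % 3 = 2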

「fetchSize」

  • Description: the number of records read per batch.
  • Note: do not set this value too large, otherwise the read may time out and the task will fail.
  • Required: No
  • Default: 1000

「queryTimeOut」

  • Description: query timeout, in seconds.
  • Note: when the data volume is large, or when querying from a view or with a custom SQL query, you can specify the timeout through this parameter.
  • Required: No
  • Default: 1000

「customSql」

  • Description: user-defined query statement. If selecting specified fields alone cannot meet the requirements, you can specify the query SQL through this parameter; it can be an arbitrarily complex query statement. Notes: only query statements may be used, otherwise the task will fail; the fields returned by the query statement must correspond strictly to the fields in the column list; when this parameter is specified, the table specified in connection does not take effect; when this parameter is specified, column must list concrete field information and cannot be replaced by *. See the example below.
  • Required: No
  • Default: None
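
For example, a customSql query paired with a matching column list might look like the following (the query and field names are illustrative):

"customSql": "select id, name from tableTest where id > 100",
"column": [
    { "name": "id", "type": "bigint" },
    { "name": "name", "type": "varchar" }
]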

「column」

  • Description: the field to be read.
  • Format: three formats are supported

1. Read all fields. If there are many fields, you can use the following form:

"column":["*"]

2. Specify only the field name:

"column":["id","name"]

3. Specify detailed field information:

"column": [{
    "name": "col",
    "type": "datetime",
    "format": "yyyy-MM-dd hh:mm:ss",
    "value": "value"
}]

Attribute description:

  1. name: field name
  2. type: field type; it can differ from the field type in the database, and the program will perform a type conversion
  3. format: if the field is a time string, you can specify the time format, and the field will be converted to a date type when returned
  4. value: if the specified field does not exist in the database, an error will be reported; if the specified field exists and its value is null, this value will be returned as the default value
  • Required: Yes
  • Default: None

「polling」

  • Description: whether interval polling is enabled. When enabled, data is pulled from the database periodically at the polling interval. To enable interval polling, you also need to configure the parameters pollingInterval and increColumn; the parameter startLocation is optional. If startLocation is not configured, the maximum value of the increment field is queried from the database as the polling start position when the task starts. A minimal parameter snippet is shown below.
  • Required: No
  • Default: false
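
Based on the description above, a minimal set of polling-related parameters might look like the following (values are illustrative; see also configuration example 5 below):

"polling": true,
"pollingInterval": 5000,
"increColumn": "id",
"startLocation": "20"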

「pollingInterval」

  • Description: polling interval, the interval between pulls of data from the database. The default is 5000 milliseconds.
  • Required: No
  • Default: 5000

「requestAccumulatorInterval」

  • Description: the interval between sending query accumulator requests.
  • Required: No
  • Default: 2

Configuration example

1. Basic configuration

{
  "job": {
    "content": [{
      "reader": {
        "parameter" : {
          "column" : [ {
            "name" : "id",
            "type" : "bigint",
            "key" : "id"
          }, {
            "name" : "user_id",
            "type" : "bigint",
            "key" : "user_id"
          }, {
            "name" : "name",
            "type" : "varchar",
            "key" : "name"
          } ],
          "username" : "username",
          "password" : "password",
          "connection" : [ {
            "jdbcUrl" : [ "jdbc:clickhouse://0.0.0.1:8123/dtstack" ],
            "table" : [ "tableTest" ]
          } ],
          "where": "id > 1",
          "splitPk": "id",
          "fetchSize": 1000,
          "queryTimeOut": 1000,
          "customSql": "",
          "requestAccumulatorInterval": 2
        },
        "name" : "clickhousereader"
      },
      "writer": {
        "name": "streamwriter",
        "parameter": {
          "print": true
        }
      }
    }],
    "setting": {
      "speed": {
        "channel": 1,
        "bytes": 0
      },
      "errorLimit": {
        "record": 100
      }
    }
  }
}

2. Multichannel

{
  "job": {
    "content": [{
      "reader": {
        "parameter" : {
          "column" : [ {
            "name" : "id",
            "type" : "bigint",
            "key" : "id"
          }, {
            "name" : "user_id",
            "type" : "bigint",
            "key" : "user_id"
          }, {
            "name" : "name",
            "type" : "varchar",
            "key" : "name"
          } ],
          "username" : "username",
          "password" : "password",
          "connection" : [ {
            "jdbcUrl" : [ "jdbc:clickhouse://0.0.0.1:8123/dtstack" ],
            "table" : [ "tableTest" ]
          } ],
          "where": "id > 1",
          "splitPk": "id",
          "fetchSize": 1000,
          "queryTimeOut": 1000,
          "customSql": "",
          "requestAccumulatorInterval": 2
        },
        "name" : "clickhousereader"
      },
      "writer": {
        "name": "streamwriter",
        "parameter": {
          "print": true
        }
      }
    }],
    "setting": {
      "speed": {
        "channel": 3,
        "bytes": 0
      },
      "errorLimit": {
        "record": 100
      }
    }
  }
}

3. Specify customSql

{
  "job": {
    "content": [{
      "reader": {
        "parameter" : {
          "column" : [ {
            "name" : "id",
            "type" : "bigint",
            "key" : "id"
          }, {
            "name" : "user_id",
            "type" : "bigint",
            "key" : "user_id"
          }, {
            "name" : "name",
            "type" : "varchar",
            "key" : "name"
          } ],
          "username" : "username",
          "password" : "password",
          "connection" : [ {
            "jdbcUrl" : [ "jdbc:clickhouse://0.0.0.1:8123/dtstack" ],
            "table" : [ "tableTest" ]
          } ],
          "where": "id > 1",
          "splitPk": "id",
          "fetchSize": 1000,
          "queryTimeOut": 1000,
          "customSql": "select id from tableTest",
          "requestAccumulatorInterval": 2
        },
        "name" : "clickhousereader"
      },
      "writer": {
        "name": "streamwriter",
        "parameter": {
          "print": true
        }
      }
    }],
    "setting": {
      "speed": {
        "channel": 1,
        "bytes": 0
      },
      "errorLimit": {
        "record": 100
      }
    }
  }
}

4. Specify startLocation for incremental synchronization

{
  "job": {
    "content": [{
      "reader": {
        "parameter" : {
          "column" : [ {
            "name" : "id",
            "type" : "bigint",
            "key" : "id"
          }, {
            "name" : "user_id",
            "type" : "bigint",
            "key" : "user_id"
          }, {
            "name" : "name",
            "type" : "varchar",
            "key" : "name"
          } ],
          "username" : "username",
          "password" : "password",
          "connection" : [ {
            "jdbcUrl" : [ "jdbc:clickhouse://0.0.0.1:8123/dtstack" ],
            "table" : [ "tableTest" ]
          } ],
          "where": "id > 1",
          "splitPk": "id",
          "fetchSize": 1000,
          "queryTimeOut": 1000,
          "customSql": "",
          "increColumn": "id",
          "startLocation": "20",
          "requestAccumulatorInterval": 2
        },
        "name" : "clickhousereader"
      },
      "writer": {
        "name": "streamwriter",
        "parameter": {
          "print": true
        }
      }
    }],
    "setting": {
      "speed": {
        "channel": 1,
        "bytes": 0
      },
      "errorLimit": {
        "record": 100
      }
    }
  }
}

5. Interval polling

{
  "job": {
    "content": [{
      "reader": {
        "parameter" : {
          "column" : [ {
            "name" : "id",
            "type" : "bigint",
            "key" : "id"
          }, {
            "name" : "user_id",
            "type" : "bigint",
            "key" : "user_id"
          }, {
            "name" : "name",
            "type" : "varchar",
            "key" : "name"
          } ],
          "username" : "username",
          "password" : "password",
          "connection" : [ {
            "jdbcUrl" : [ "jdbc:clickhouse://0.0.0.1:8123/dtstack" ],
            "table" : [ "tableTest" ]
          } ],
          "where": "id > 1",
          "splitPk": "id",
          "fetchSize": 1000,
          "queryTimeOut": 1000,
          "customSql": "",
          "requestAccumulatorInterval": 2,
          "polling": true,
          "pollingInterval": 3000
        },
        "name" : "clickhousereader"
      },
      "writer": {
        "name": "streamwriter",
        "parameter": {
          "print": true
        }
      }
    }],
    "setting": {
      "speed": {
        "channel": 1,
        "bytes": 0
      },
      "errorLimit": {
        "record": 100
      }
    }
  }
}

ClickHouse write

1. Plug-in name

Name: clickhousewriter

2. Supported data source versions

ClickHouse 19.x and above

3. Parameter description

「jdbcUrl」

  • Description: JDBC connection string for the relational database
  • Required: Yes
  • Default: None

「username」

  • Description: user name of the data source
  • Required: Yes
  • Default: None

「password」

  • Description: password for the specified user name of the data source
  • Required: Yes
  • Default: None

「column」

  • Description: the fields to be written to the destination table, separated by commas. For example: "column": ["id","name","age"].
  • Required: Yes
  • Default: None

「preSql」

  • Description: a group of standard SQL statements that will be executed before data is written to the destination table
  • Required: No
  • Default: None

「postSql」

  • Description: a group of standard SQL statements that will be executed after data is written to the destination table (see the example below)
  • Required: No
  • Default: None
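
For example, preSql and postSql each take an array of SQL statements; the statements below are illustrative only (tableTest and tableTest_bak are placeholder table names):

"preSql": ["truncate table tableTest_bak"],
"postSql": ["insert into tableTest_bak select * from tableTest"]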

「table」

  • Description: the name of the destination table. At present, only single table configuration is supported, and multiple tables will be supported later
  • Required: Yes
  • Default: None

「writeMode」

  • Description: controls how data is written to the destination table, using the insert into statement; only the insert operation is supported
  • Required: Yes
  • All options: insert
  • Default: insert

「batchSize」

  • Description: the number of records submitted in one batch. A larger value can greatly reduce the number of network interactions between FlinkX and the database and improve overall throughput, but setting it too large may cause the FlinkX process to run out of memory (OOM)
  • Required: No
  • Default: 1024
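
Configuration example

The original article does not include a clickhousewriter configuration example. The following is a minimal sketch modeled on the reader examples above, pairing clickhousereader with clickhousewriter; the writer's connection layout (jdbcUrl as a plain string inside connection) follows common FlinkX writer configurations and may differ between FlinkX versions, and names such as tableTest_copy are placeholders.

{
  "job": {
    "content": [{
      "reader": {
        "name": "clickhousereader",
        "parameter": {
          "username": "username",
          "password": "password",
          "column": ["id", "name"],
          "connection": [{
            "jdbcUrl": ["jdbc:clickhouse://0.0.0.1:8123/dtstack"],
            "table": ["tableTest"]
          }]
        }
      },
      "writer": {
        "name": "clickhousewriter",
        "parameter": {
          "username": "username",
          "password": "password",
          "column": ["id", "name"],
          "writeMode": "insert",
          "batchSize": 1024,
          "preSql": [],
          "postSql": [],
          "connection": [{
            "jdbcUrl": "jdbc:clickhouse://0.0.0.1:8123/dtstack",
            "table": ["tableTest_copy"]
          }]
        }
      }
    }],
    "setting": {
      "speed": {
        "channel": 1,
        "bytes": 0
      },
      "errorLimit": {
        "record": 100
      }
    }
  }
}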

The source of this article is as follows; interested readers can check the original text:
https://www.aboutyun.com/forum.php?mod=viewthread&tid=29271

More Flink technical questions can be discussed in the DingTalk group.

Original link: https://developer.aliyun.com/article/770821?

