Table API and Flink SQL - Chapter 2 API call

2.1 basic program structure

The program structure of Table API and SQL is similar to that of streaming processing; It can also be roughly considered that there are several steps: first create the execution environment, and then define source, transform and sink.
The specific operation process is as follows:
val tableEnv = ... // Execution environment for creating tables
// Create a table for reading data
// Register a table to output the calculation results
// Through the Table API query operator, a result table is obtained
val result = tableEnv.from("inputTable").select(...)
// Get a result table through SQL query statement
val sqlResult = tableEnv.sqlQuery("SELECT ... FROM inputTable ...")
// Write the result table to the output table

2.2 create table environment

The simplest way to create a table environment is to directly create it by calling the Create method based on the stream processing execution environment:
val tableEnv = StreamTableEnvironment.create(env)


Table environment is the core concept of integrating table API & SQL in flink. It is responsible for:
⚫ Register catalog
⚫ Register in internal catalog
⚫ Execute SQL query
⚫ Register user-defined functions
⚫ Convert a DataStream or DataSet to a table
⚫ Save a reference to the ExecutionEnvironment or StreamExecutionEnvironment
When creating TableEnv, you can pass in an additional EnvironmentSettings or TableConfig parameter, which can be used to configure some features of TableEnvironment.
For example, configure the old version of Flink streaming query
val settings = EnvironmentSettings.newInstance()
 .useOldPlanner() // Use the old version of planner
 .inStreamingMode() // Stream processing mode
val tableEnv = StreamTableEnvironment.create(env, settings)

Based on the old batch environment (Flink batch query):

val batchEnv = ExecutionEnvironment.getExecutionEnvironment
val batchTableEnv = BatchTableEnvironment.create(batchEnv)


blink streaming query:
val bsSettings = EnvironmentSettings.newInstance()
val bsTableEnv = StreamTableEnvironment.create(env, bsSettings)
Blink version based batch environment (blink batch query):
val bbSettings = EnvironmentSettings.newInstance()
val bbTableEnv = TableEnvironment.create(bbSettings)

2.3 register in Catalog

Concept (Table 1.2)

TableEnvironment can register the Catalog and can be based on the Catalog registry. It maintains a map between Catalog table tables.
A Table is specified by an "identifier" and consists of three parts: Catalog name, database name and object name (Table name). If no directory or database is specified, the current default value is used.
Tables can be conventional (Table) or virtual (View). General tables can generally be used to describe external data, such as files, database tables or message queue data, or can be directly converted from DataStream. A View can be created from an existing Table, usually a result of a table API or SQL query.

2.3.2 connect to file system (Csv format)

Connect the external system, register in the Catalog, and directly call tableenv Connect() is OK, and the parameters in it should be passed
Enter a ConnectorDescriptor, that is, the connector descriptor. For the connector of the file system, flink
It has been provided internally, which is called file system ().
The code is as follows:
.connect( new FileSystem().path("sensor.txt")) // Define table data source, external connection
 .withFormat(new OldCsv()) // Defines the formatting method after reading data from an external system
 .withSchema( new Schema()
 .field("id", DataTypes.STRING())
 .field("timestamp", DataTypes.BIGINT())
 .field("temperature", DataTypes.DOUBLE())
 ) // Define table structure
 .createTemporaryTable("inputTable") // Create temporary table


This is an older version of the csv format descriptor. Because it is non-standard and is not universal for interfacing with external systems, it will be discarded and replaced by a new format descriptor conforming to RFC-4180 standard in the future. The new descriptor is called Csv(), but it is not directly provided by flick. It needs to introduce the dependency flick csv:


The code is very similar. Just change OldCsv in withFormat to Csv.

2.3.3 connection to Kafka

In the link kafka connector of kafka, version 1.10 already provides the support of Table API. We can directly pass in a class called kafka in the connect method, which is the connector descriptor of kafka connector.
 new Kafka()
 .version("0.11") // Define the version of kafka
 .topic("sensor") // Subject definition
 .property("zookeeper.connect", "localhost:2181")
 .property("bootstrap.servers", "localhost:9092") )
 .withFormat(new Csv())
 .withSchema(new Schema()
 .field("id", DataTypes.STRING())
 .field("timestamp", DataTypes.BIGINT())
 .field("temperature", DataTypes.DOUBLE())
Of course, it can also be connected to ElasticSearch, MySql, HBase, Hive and other external systems. The implementation method is basically similar.

2.4 table query

Using the connector of the external system, we can read and write data and register in the Catalog of the environment. Next, you can perform query transformation on the table.
Flink provides us with two query methods: Table API and SQL.

2.4.1 call of table API

Table API is a query API integrated in Scala and Java languages. Unlike SQL, the query of table API is not represented by string, but is called step by step in the host language.
The Table API is based on the Table class representing a "Table" and provides a complete set of operation processing method APIs. These methods will return a new Table object, which represents the result of applying the transformation operation to the input Table. Some relational conversion operations can be composed of multiple method calls to form a chain call structure. For example, Table select(…). filter(...), where select (...) represents the field specified in the selection Table, and filter(...) represents the filter criteria.
The implementation in the code is as follows:
val sensorTable: Table = tableEnv.from("inputTable")
val resultTable: Table = senorTable
.select("id, temperature")
.filter("id ='sensor_1'")

2.4.2 SQL query

Flink's SQL integration is based on Apache compute, which implements the SQL standard. In Flink, use regular words
String to define SQL query statements. The result of SQL query is a new Table.
The code implementation is as follows:
val resultSqlTable: Table = tableEnv.sqlQuery("select id, temperature from 
inputTable where id ='sensor_1'")


val resultSqlTable: Table = tableEnv.sqlQuery(
 |select id, temperature
 |from inputTable
 |where id = 'sensor_1'


Of course, aggregation can also be added. For example, we can count the number of temperature data of each sensor
val aggResultTable = sensorTable
.select('id, 'id.count as 'count)
Implementation of SQL:
val aggResultSqlTable = tableEnv.sqlQuery("select id, count(id) as cnt from
inputTable group by id")


Here, the fields specified in the Table API are preceded by a single quotation mark ', which is the writing method of Expression type defined in the Table API, which can easily represent the fields in a table.
Fields can be directly enclosed in double quotation marks, or in the form of half side single quotation mark + field name. The latter form is generally used in later codes

2.5 converting DataStream to table

Flink allows us to convert a Table to a DataStream: Based on a DataStream, we can first stream the data source, then map it into a sample class, and then convert it into a Table. The column fields of Table are the fields in the sample class, so there is no need to define schema.

2.5.1 code expression

The implementation in the code is very simple. Use tableenv directly Just use fromdatastream(). The default converted Table # schema corresponds to the field definitions in DataStream one by one, or can be specified separately. This allows us to change the order of fields, rename, or select only some fields, which is equivalent to a map operation (or a select operation of the Table API).
The codes are as follows:
val inputStream: DataStream[String] = env.readTextFile("sensor.txt")
val dataStream: DataStream[SensorReading] = inputStream
 .map(data => {
 val dataArray = data.split(",")
 SensorReading(dataArray(0), dataArray(1).toLong, dataArray(2).toDouble)
val sensorTable: Table = tableEnv.fromDataStream(dataStream)
val sensorTable2 = tableEnv.fromDataStream(dataStream, 'id, 'timestamp as 'ts)

2.5.2 correspondence between data type and Table schema

In the example in the previous section, the correspondence between the data type in the DataStream and the Schema of the table is name based mapping according to the field name in the sample class, so you can also rename it with as. Another way of correspondence is to directly map according to the position of the field. In the process of correspondence, you can directly specify a new field name.
Name based correspondence:
val sensorTable = tableEnv.fromDataStream(dataStream, 'timestamp as 'ts, 'id
as 'myId, 'temperature)
Location based correspondence:
val sensorTable = tableEnv.fromDataStream(dataStream, 'myId, 'ts)
Flink's DataStream and dataset APIs support multiple types.
Composite types, such as tuples (built-in Scala and Java tuples), POJO s, Scala case classes, and Flink's Row class
Type, which allows nested data structures with multiple fields, which can be accessed in the expression of Table. Other classes
Type, it is regarded as atomic type.
For tuple type and atomic type, it is generally better to use position correspondence; If you have to use a name, it is also possible:
Tuple type. The default names are "_1" and "_2"; For atomic type, the default name is "f0".

2.6. Create Temporary View

The first way to create a temporary view is to convert it directly from DataStream. Similarly, the corresponding field can be converted directly; You can also specify corresponding fields during conversion.
The code is as follows:
tableEnv.createTemporaryView("sensorView", dataStream)
tableEnv.createTemporaryView("sensorView", dataStream, 'id, 'temperature, 
'timestamp as 'ts)
In addition, of course, you can also create views based on Table:
tableEnv.createTemporaryView("sensorView", sensorTable)
The Schema of View and Table are identical. In fact, in the Table API, View and Table can be considered equivalent.
2.7. Output table
The output of the table is realized by writing data into the TableSink. TableSink is a general interface that can support different file formats, storage databases and message queues.
The most direct way to output a Table is through Table The insertinto () method writes a Table into the registered TableSink.
2.7.1 output to file
The code is as follows:
// Register output table
 new FileSystem().path("...\\resources\\out.txt")
) // Defines the connection to the file system
 .withFormat(new Csv()) // Define formatting method, Csv format
 .withSchema(new Schema()
 .field("id", DataTypes.STRING())
 .field("temp", DataTypes.DOUBLE())
) // Define table structure
 .createTemporaryTable("outputTable") // Create temporary table

2.7.2 Update Mode

In the process of stream processing, the processing of tables is not as simple as the traditional definition.
For Streaming Queries, you need to declare how to perform transformations between (dynamic) tables and external connectors. The type of message exchanged with the external system is specified by the update mode.
There are three update modes in the Flink Table API:
1) Append Mode
In append mode, tables (dynamic tables) and external connectors only exchange Insert messages.
2) Retract Mode
In recall mode, tables and external connectors exchange Add and Retract messages.
⚫ Insert will be encoded as adding message;
⚫ Delete is encoded as a recall message;
⚫ Update is encoded as the recall message of the updated line (previous line) and the updated line (new line)
Add message for.
In this mode, key s cannot be defined, which is completely different from the upsert mode.
3) Upsert mode
In Upsert mode, dynamic tables and external connectors exchange Upsert and Delete messages. This mode requires a unique key through which update messages can be delivered. In order to correctly apply the message, the external connector needs to know the attribute of this unique key.
⚫ Both Insert and Update are encoded as Upsert messages;
⚫ Delete is encoded as delete information.
The main difference between this mode and the Retract mode is that the Update operation is encoded with a single message, so it will be more efficient.

2.7.3 output to Kafka

In addition to exporting to files, you can also export to kafka. We can combine the previous kafka as the input data to build a data pipeline, kafka in and kafka out.
The code is as follows:
// Output to kafka
 new Kafka()
 .property("zookeeper.connect", "localhost:2181")
 .property("bootstrap.servers", "localhost:9092") )
 .withFormat( new Csv() )
 .withSchema( new Schema()
 .field("id", DataTypes.STRING())
 .field("temp", DataTypes.DOUBLE())


2.7.4 output to ElasticSearch

The connector of ElasticSearch can be operated in the up SERT (update+insert) mode, so you can use the key defined by Query to exchange up SERT / delete messages with external systems.
In addition, for append only queries, the connector can also operate in append mode, so that only insert messages can be exchanged with external systems.
At present, the data format supported by es is only Json, while flink itself does not have corresponding support, so it needs to be introduced
The code implementation is as follows:
// Output to es
 new Elasticsearch()
 .host("localhost", 9200, "http")
 .documentType("temp") )
 .inUpsertMode() // Specifies the Upsert mode
 .withFormat(new Json())
 .withSchema( new Schema()
 .field("id", DataTypes.STRING())
 .field("count", DataTypes.BIGINT())

2.7.5 output to MySql

Flink specifically provides a Flink jdbc connector for jdbc connection of Table API. We need to introduce dependency first:


The code implementation of jdbc connection is quite special, because there is no corresponding java/scala class to implement ConnectorDescriptor, so tableenv cannot be directly implemented connect(). However, Flink SQL leaves the interface for executing DDL: tableenv sqlUpdate().
For jdbc table creation, it is naturally suitable to write DDL directly, so our code can be written as follows:
// Output to Mysql
val sinkDDL: String =
 |create table jdbcOutputTable (
 | id varchar(20) not null,
 | cnt bigint not null
 |) with (
 | 'connector.type' = 'jdbc',
 | 'connector.url' = 'jdbc:mysql://localhost:3306/test',
 | 'connector.table' = 'sensor_count',
 | 'connector.driver' = 'com.mysql.jdbc.Driver',
 | 'connector.username' = 'root',
 | 'connector.password' = '123456'


2.8 converting tables to DataStream

Tables can be converted to DataStream or DataSet. In this way, the custom stream or batch program can continue to run on the results of Table API or SQL query.
When converting a table to DataStream or DataSet, you need to specify the generated data type, that is, the data type to convert each Row of the table. Generally, the most convenient conversion type is Row. Of course, because all the field types of the results are explicit, we often use tuple types. As the result of streaming query, the table is dynamically updated. Therefore, to convert this dynamic query into a data stream, it is also necessary to encode the update operation of the table, and then there are different conversion modes.
There are two modes of table to DataStream in the Table API:
⚫ Append Mode
Used in scenarios where the table will only be changed by the Insert operation.
⚫ Retract Mode
For any scene. Some are similar to the Retract mode in the update mode. It has only Insert and Delete operations.
The obtained data will be added with a Boolean type identification bit (the first field returned) to represent the end
Is it new data (Insert) or deleted data (old data, Delete).
The code implementation is as follows:
val resultStream: DataStream[Row] = tableEnv.toAppendStream[Row](resultTable)
val aggResultStream: DataStream[(Boolean, (String, Long))] = 
tableEnv.toRetractStream[(String, Long)](aggResultTable)
Therefore, toAppendStream can be directly used to convert without aggregation operations such as groupby; And if
After aggregation, if there is an update operation, you must generally use toRetractDstream.

2.9 interpretation and implementation of query

The Table API provides a mechanism to Explain the logic of computing tables and optimize query plans. This is through
TableEnvironment.explain (table) method or tableenvironment The explain () method completes.
The explain method returns a string describing three plans:
⚫ Unoptimized logical query plan
⚫ Optimized logical query plan
⚫ Actual implementation plan
We can view the execution plan in the code:
val explaination: String = tableEnv.explain(resultTable)


The interpretation and execution process of Query is generally the same as that of blink planner, but different. On the whole, Query will be expressed as a logical Query plan, and then explained in two steps:
1. Optimize query plan
2. Interpret as a DataStream or DataSet program
The Blink version is batch flow unified, so all queries will only be interpreted as DataStream programs; In addition, in the batch environment TableEnvironment, the Blink version should go to tableenv Execute() executes the call before the explanation begins.

































Tags: flink

Posted by Sentosa on Mon, 16 May 2022 13:15:06 +0300