Flink Table API & SQL Programming Guide

Apache Flink provides two top-level relational APIs: the Table API and SQL, through which Flink unifies batch and stream processing. The Table API is a language-integrated query API for Scala and Java that allows relational operators (such as select, where, and join) to be composed in a very intuitive way. Flink SQL is based on Apache Calcite, which implements standard SQL, so users can process data sets with plain SQL statements. The Table API and SQL are tightly integrated with Flink's DataStream and DataSet APIs and can be converted back and forth; for example, a DataStream or DataSet can be registered as a table and then queried. Note that the Table API and SQL are not yet feature-complete and are still under active development, so not all operations are available through them.

<!-- more -->

Dependencies

Since Flink 1.9, Flink provides two planners for the Table & SQL API: the Blink planner and the old planner, where the old planner is the planner used in versions before 1.9. The Table & SQL API is split into the following modules:

Tip: the old planner is currently recommended for production environments

  • flink-table-common: a common module containing code shared by the old planner and the Blink planner
  • flink-table-api-java: the Table & SQL API for Java, for pure table programs only (in early development, not recommended)
  • flink-table-api-scala: the Table & SQL API for Scala, for pure table programs only (in early development, not recommended)
  • flink-table-api-java-bridge: the Table & SQL API for Java, with DataStream/DataSet API support (recommended)
  • flink-table-api-scala-bridge: the Table & SQL API for Scala, with DataStream/DataSet API support (recommended)
  • flink-table-planner: the planner and runtime; this is the old planner from before Flink 1.9 (recommended)
  • flink-table-planner-blink: the new Blink planner
  • flink-table-runtime-blink: the new Blink runtime
  • flink-table-uber: packages the API modules above plus the old planner into a single jar, flink-table-*.jar, located in the /lib directory
  • flink-table-uber-blink: packages the API modules above plus the Blink modules into a single jar, flink-table-blink-*.jar, located in the /lib directory

Blink planner & old planner

The Blink planner and the old planner differ in many respects. The main differences are listed below:

  • The Blink planner treats batch jobs as a special case of stream processing jobs. Therefore, conversion between Table and DataSet is not supported; a batch job is translated into a DataStream program rather than a DataSet program.
  • The Blink planner does not support BatchTableSource; it uses a bounded StreamTableSource instead.
  • The Blink planner only supports the new Catalog interface and does not support the deprecated ExternalCatalog.
  • The two planners implement FilterableTableSource differently. The old planner pushes predicates down into PlannerExpression (which will be removed in the future), while the Blink planner pushes them down into Expression (which represents a logical tree producing the computation result).
  • Only the Blink planner supports key-value configuration, i.e., setting parameters through a Configuration object.
  • The implementations of PlannerConfig differ between the two planners.
  • The Blink planner optimizes multiple sinks into a single DAG (supported only on TableEnvironment, not on StreamTableEnvironment). The old planner always optimizes each sink into a new DAG, and the DAGs are independent of each other.
  • The old planner does not support catalog statistics; the Blink planner does.
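As a sketch of the key-value configuration point above (Blink planner only): settings can be applied through the Configuration object exposed by the TableConfig. The option keys shown here are taken from the Flink 1.10 configuration reference; verify them against your version.

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

EnvironmentSettings settings = EnvironmentSettings.newInstance()
        .useBlinkPlanner().inStreamingMode().build();
TableEnvironment tableEnv = TableEnvironment.create(settings);

// Key-value configuration is only respected by the Blink planner
Configuration conf = tableEnv.getConfig().getConfiguration();
conf.setString("table.exec.mini-batch.enabled", "true");      // enable mini-batch aggregation
conf.setString("table.exec.mini-batch.allow-latency", "5 s"); // buffer records for up to 5 s
conf.setString("table.exec.mini-batch.size", "5000");         // or until 5000 records arrive
```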

pom dependencies of a Flink Table & SQL program

Depending on the language used, choose one of the following dependencies (Java or Scala):

<!-- java edition -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-api-java-bridge_2.11</artifactId>
  <version>1.10.0</version>
  <scope>provided</scope>
</dependency>
<!-- scala edition -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
  <version>1.10.0</version>
  <scope>provided</scope>
</dependency>

In addition, if you want to run Table API & SQL programs in a local IDE, you need to add one of the following pom dependencies:

<!-- old planner (the planner from before Flink 1.9) -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-planner_2.11</artifactId>
  <version>1.10.0</version>
  <scope>provided</scope>
</dependency>
<!-- new Blink planner -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-planner-blink_2.11</artifactId>
  <version>1.10.0</version>
  <scope>provided</scope>
</dependency>

In addition, if you need to implement a custom format (e.g., for exchanging data with Kafka) or user-defined functions, you need to add the following dependency:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-common</artifactId>
  <version>1.10.0</version>
  <scope>provided</scope>
</dependency>

Table API & SQL programming template

All Table API & SQL programs, whether for batch or stream processing, share the same structure. The general program skeleton is shown below:

// Create a TableEnvironment object and specify the planner and processing mode (batch and streaming)
TableEnvironment tableEnv = ...; 
// Register an input table
tableEnv.connect(...).createTemporaryTable("table1");
// Register an output table
tableEnv.connect(...).createTemporaryTable("outputTable");
// Create a Table object through the query of the Table API
Table tapiResult = tableEnv.from("table1").select(...);
// Create a Table object through the query of SQL query
Table sqlResult  = tableEnv.sqlQuery("SELECT ... FROM table1 ... ");
// Write results to TableSink
tapiResult.insertInto("outputTable");
// Execute the program
tableEnv.execute("java_job");

Note: Table API & SQL queries can be integrated with each other. In addition, Table API & SQL API can be used in DataStream or DataSet to realize the mutual conversion between DataStream, DataSet and Table.

Create TableEnvironment

TableEnvironment is the entry point of a Table API & SQL program. Its main responsibilities are:

  • Register the Table in the internal catalog
  • Register catalog
  • Loading pluggable modules
  • Execute SQL query
  • Register user-defined functions
  • Conversion between DataStream, DataSet and Table
  • Holds references to ExecutionEnvironment and StreamExecutionEnvironment

A Table is always bound to a specific TableEnvironment. Tables of different TableEnvironments cannot be combined (e.g., in join or union operations).

A TableEnvironment is created by calling the static BatchTableEnvironment.create() or StreamTableEnvironment.create() method. In addition, since by default the jars of both planners are on the classpath, the planner to use must be specified explicitly.

// **********************
// FLINK stream processing query
// **********************
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.java.StreamTableEnvironment;

EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();
StreamExecutionEnvironment fsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(fsEnv, fsSettings);
// Or: TableEnvironment fsTableEnv = TableEnvironment.create(fsSettings);

// ******************
// FLINK batch query
// ******************
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.java.BatchTableEnvironment;

ExecutionEnvironment fbEnv = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment fbTableEnv = BatchTableEnvironment.create(fbEnv);

// **********************
// BLINK stream processing query
// **********************
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.java.StreamTableEnvironment;

StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
// Or: TableEnvironment bsTableEnv = TableEnvironment.create(bsSettings);

// ******************
// BLINK batch query
// ******************
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

EnvironmentSettings bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
TableEnvironment bbTableEnv = TableEnvironment.create(bbSettings);

Create tables in a catalog

Temporary and permanent tables

Tables are either temporary or permanent. Permanent tables require a catalog (such as Hive's Metastore) to maintain their metadata. Once created, a permanent table is visible to any Flink session connected to that catalog and exists until it is explicitly dropped. Temporary tables live only within a Flink session: they cannot be accessed by other sessions, they do not belong to any catalog or database, and dropping the database they were registered under does not drop them.
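A minimal sketch of the temporary-table lifecycle described above, in the document's own elided style; the table and view names are illustrative:

```java
// Obtain a TableEnvironment (as in the programming template above)
TableEnvironment tableEnv = ...;
// Register a temporary view; it exists only for this Flink session
Table t = tableEnv.from("source").select(...);
tableEnv.createTemporaryView("tmpView", t);
// ... query "tmpView" ...
// Drop it explicitly, or it simply disappears when the session ends
tableEnv.dropTemporaryView("tmpView");
```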

Create table

Virtual tables

A Table object corresponds to a view (virtual table) in SQL: it encapsulates a logical query plan. It can be registered in a catalog as follows:

// Get a TableEnvironment
TableEnvironment tableEnv = ...; 
// table object, the result set of the query
Table projTable = tableEnv.from("X").select(...);
// Register a table named "projectedTable"
tableEnv.createTemporaryView("projectedTable", projTable);

External data source tables (Connector Tables)

External data sources, such as a MySQL database or a Kafka topic, can be registered as tables:

tableEnvironment
  .connect(...)
  .withFormat(...)
  .withSchema(...)
  .inAppendMode()
  .createTemporaryTable("MyTable")

Identifier attributes of registered tables

A registered table always has a three-part identifier: catalog, database, and table name. Users can set one catalog and one database as the current catalog and current database, which makes those first two parts optional: if they are not specified, the current catalog and current database are used by default.

TableEnvironment tEnv = ...;
tEnv.useCatalog("custom_catalog");    // Set the current catalog
tEnv.useDatabase("custom_database");  // Set the current database
Table table = ...;
// Register a view named exampleView in catalog custom_catalog
// and database custom_database
tEnv.createTemporaryView("exampleView", table);

// Register a view named exampleView in catalog custom_catalog
// and database other_database
tEnv.createTemporaryView("other_database.exampleView", table);

// Register a view named 'View' in catalog custom_catalog and
// database custom_database; 'View' is a reserved keyword, so it
// must be escaped with backticks (``)
tEnv.createTemporaryView("`View`", table);

// Register a view named 'example.View' in catalog custom_catalog
// and database custom_database
tEnv.createTemporaryView("`example.View`", table);

// Register a view named exampleView in catalog other_catalog
// and database other_database
tEnv.createTemporaryView("other_catalog.other_database.exampleView", table);

Query table

Table API

The Table API is a language-integrated query API for Scala and Java. Unlike SQL, queries are not written as standard SQL statements but are composed step by step from operations. The following shows a simple aggregation query using the Table API.

// Get TableEnvironment
TableEnvironment tableEnv = ...;
// Register the Orders table (omitted)

// Query registered tables
Table orders = tableEnv.from("Orders");
// Calculation operation
Table revenue = orders
  .filter("cCountry === 'FRANCE'")
  .groupBy("cID, cName")
  .select("cID, cName, revenue.sum AS revSum");

SQL

Flink SQL is based on Apache Calcite, which implements standard SQL. Example:

// Get TableEnvironment
TableEnvironment tableEnv = ...;

// Register the Orders table (omitted)

// The calculation logic is the same as the Table API above
Table revenue = tableEnv.sqlQuery(
    "SELECT cID, cName, SUM(revenue) AS revSum " +
    "FROM Orders " +
    "WHERE cCountry = 'FRANCE' " +
    "GROUP BY cID, cName"
  );

// Register "RevenueFrance" external output table
// The calculation results are inserted into the "RevenueFrance" table
tableEnv.sqlUpdate(
    "INSERT INTO RevenueFrance " +
    "SELECT cID, cName, SUM(revenue) AS revSum " +
    "FROM Orders " +
    "WHERE cCountry = 'FRANCE' " +
    "GROUP BY cID, cName"
  );

Output table

A table is emitted by writing it to a TableSink. TableSink is a generic interface that supports many file formats (CSV, Parquet, Avro), external storage systems (JDBC, Apache HBase, Apache Cassandra, Elasticsearch), and message queues (Apache Kafka, RabbitMQ).

Batch tables can only be written to a BatchTableSink, while stream tables require an AppendStreamTableSink, RetractStreamTableSink, or UpsertStreamTableSink.

// Get TableEnvironment
TableEnvironment tableEnv = ...;

// Create output table
final Schema schema = new Schema()
    .field("a", DataTypes.INT())
    .field("b", DataTypes.STRING())
    .field("c", DataTypes.LONG());

tableEnv.connect(new FileSystem("/path/to/file"))
    .withFormat(new Csv().fieldDelimiter('|').deriveSchema())
    .withSchema(schema)
    .createTemporaryTable("CsvSinkTable");

// Table of calculation results
Table result = ...
// Output the result table to the registered TableSink
result.insertInto("CsvSinkTable");

Table API & SQL underlying transformation and execution

As mentioned above, Flink provides two kinds of planners, old planner and Blink planner. For different planners, the execution and transformation of the underlying table API & SQL are different.

Old planner

Depending on whether the input is streaming or batch, the Table API & SQL program is translated into a DataStream or DataSet program. A query is internally represented as a logical query plan and is translated in two phases:

  • 1. Logical query plan optimization
  • 2. Convert to DataStream or DataSet program

These two phases are executed only when one of the following operations is triggered:

  • When a table is emitted to a TableSink, e.g., when Table.insertInto() is called
  • When an update query is executed, e.g., when TableEnvironment.sqlUpdate() is called
  • When a table is converted into a DataStream or DataSet

Once these two phases have been executed, the Table API & SQL operations are handled like a regular DataStream or DataSet program, so the translated program runs when StreamExecutionEnvironment.execute() or ExecutionEnvironment.execute() is called.

Blink planner

Whether for a batch job or a stream job, the Blink planner translates the program into a DataStream program at the bottom layer. A query is internally represented as a logical query plan and is translated in two phases:

  • 1. Logical query plan optimization
  • 2. Convert to DataStream program

The translation of a query differs between TableEnvironment and StreamTableEnvironment.

For TableEnvironment, Table API & SQL queries are translated when TableEnvironment.execute() is called, because the TableEnvironment optimizes multiple sinks into a single DAG.

For StreamTableEnvironment, translation happens at the same points as with the old planner.

Integration with the DataStream & DataSet API

For both the old planner and the Blink planner, stream processing programs can be integrated with the DataStream API. Only the old planner can be integrated with the DataSet API: because the Blink planner translates batch jobs into DataStream programs, it cannot be integrated with the DataSet API. Note that the Table/DataSet conversions discussed below therefore apply only to the old planner.

The query of table API & SQL can be easily integrated with DataStream or DataSet programs, and the query of table API & SQL can be embedded into DataStream or DataSet programs. DataStream or DataSet can be converted into table. Conversely, table can also be converted into DataStream or DataSet.

Register temporary tables (Views) from DataStream or DataSet

Tip: a DataStream or DataSet can only be converted into a temporary table (view)

The following demonstrates the transformation of DataStream, which is similar to the transformation of DataSet.

// Get StreamTableEnvironment
StreamTableEnvironment tableEnv = ...; 
DataStream<Tuple2<Long, String>> stream = ...
// Register the DataStream as a view named myTable with fields "f0" and "f1"
tableEnv.createTemporaryView("myTable", stream);
// Register the DataStream as a view named myTable2 with fields "myLong" and "myString"
tableEnv.createTemporaryView("myTable2", stream, "myLong, myString");

Convert DataStream or DataSet to Table object

You can directly convert a DataStream or DataSet into a Table object, and then use the Table API to query. The following demonstrates the transformation of DataStream, which is similar to the transformation of DataSet.

// Get StreamTableEnvironment
StreamTableEnvironment tableEnv = ...; 
DataStream<Tuple2<Long, String>> stream = ...
// Convert the DataStream into a Table object; the default fields are "f0", "f1"
Table table1 = tableEnv.fromDataStream(stream);
// Convert the DataStream into a Table object with fields renamed to "myLong" and "myString"
Table table2 = tableEnv.fromDataStream(stream, "myLong, myString");

Convert table to DataStream or DataSet

When converting a Table into a DataStream or DataSet, you need to specify the target data type. The most convenient type is usually Row; Flink offers several options to choose from, including Row, POJOs, case classes, Tuples, and atomic types.

Convert table to DataStream

The results of a stream processing query are dynamic, so when converting a table to DataStream, you need to specify an update mode. There are two modes: Append Mode and Retract Mode.

  • Append Mode

Use this mode if the dynamic table is only modified by Insert operations, i.e., previously emitted results are never updated. If the table is also updated or deleted, append mode fails with an error.

  • Retract Mode

This mode can always be used. Each record carries a boolean flag that marks insertions and retractions: true marks an inserted row and false marks a retracted row.

// Get StreamTableEnvironment 
StreamTableEnvironment tableEnv = ...; 
// Table with two fields (String name, Integer age)
Table table = ...
// Convert the Table to a DataStream in Append Mode; the row type is Row
DataStream<Row> dsRow = tableEnv.toAppendStream(table, Row.class);
// Convert the Table to a DataStream in Append Mode with a user-defined TypeInformation
TupleTypeInfo<Tuple2<String, Integer>> tupleType = new TupleTypeInfo<>(
  Types.STRING(),
  Types.INT());
DataStream<Tuple2<String, Integer>> dsTuple = 
  tableEnv.toAppendStream(table, tupleType);
// Convert the Table to a DataStream in Retract Mode with row type Row.
// In the resulting DataStream<Tuple2<Boolean, X>>, X is the row data type
// and the Boolean flags the change: true for INSERT, false for DELETE
DataStream<Tuple2<Boolean, Row>> retractStream = 
  tableEnv.toRetractStream(table, Row.class);

Convert table to DataSet

// Get BatchTableEnvironment
BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);
// Table with two fields (String name, Integer age)
Table table = ...
// Convert the table to DataSet and the data type is Row
DataSet<Row> dsRow = tableEnv.toDataSet(table, Row.class);
// Convert the Table to a DataSet of Tuple2<String, Integer> defined via TypeInformation
TupleTypeInfo<Tuple2<String, Integer>> tupleType = new TupleTypeInfo<>(
  Types.STRING(),
  Types.INT());
DataSet<Tuple2<String, Integer>> dsTuple = 
  tableEnv.toDataSet(table, tupleType);

Mapping between Schema and data type of table

There are two ways to map a table's schema to data types: by field position (index) or by field name.

Mapping based on field position

This method maps fields one by one in their defined order. Usage:

// Get StreamTableEnvironment
StreamTableEnvironment tableEnv = ...; 
DataStream<Tuple2<Long, Integer>> stream = ...
// Convert DataStream to a table. The default field names are "f0" and "f1"
Table table = tableEnv.fromDataStream(stream);
// Convert the DataStream into a table, mapping only the first tuple field to the name "myLong"
Table table = tableEnv.fromDataStream(stream, "myLong");
// Convert the DataStream into a table, mapping the first field to "myLong" and the second to "myInt"
Table table = tableEnv.fromDataStream(stream, "myLong, myInt");

Field name based mapping

Name-based mapping supports any data type, including POJOs, and allows the table schema to be defined flexibly: every field is mapped by its name, and "as" can be used to alias fields. For tuples, the first element's field name is f0, the second f1, and so on.

// Get StreamTableEnvironment
StreamTableEnvironment tableEnv = ...; 
DataStream<Tuple2<Long, Integer>> stream = ...
// Convert DataStream to a table. The default field names are "f0" and "f1"
Table table = tableEnv.fromDataStream(stream);
// Convert the DataStream into a table, selecting only the tuple field "f1"
Table table = tableEnv.fromDataStream(stream, "f1");
// Convert DataStream into a table and exchange the order of fields
Table table = tableEnv.fromDataStream(stream, "f1, f0");
// Convert DataStream into a table, exchange the order of fields, and alias "myInt" for f1 and "myLong" for f0
Table table = tableEnv.fromDataStream(stream, "f1 as myInt, f0 as myLong");

Atomic type

Flink treats Integer, Double, String, and similar primitive types as atomic types. A DataStream or DataSet of an atomic type can be converted into a table with a single column whose type matches the atomic type; the column name can be specified.

//Get StreamTableEnvironment
StreamTableEnvironment tableEnv = ...; 
// The data type is atomic type Long
DataStream<Long> stream = ...
// Convert DataStream to a table. The default field name is "f0"
Table table = tableEnv.fromDataStream(stream);
// Convert the DataStream into a table and specify the field name "myLong"
Table table = tableEnv.fromDataStream(stream, "myLong");

Tuple type

A DataStream or DataSet of a Tuple type can be converted into a table whose field names can be reset (fields are mapped one by one by tuple position, and each element gets an alias after conversion). If no names are specified, the default names are used (f0, f1, … in Java and _1, _2, … in Scala). Fields can also be reordered and given aliases.

// Get StreamTableEnvironment
StreamTableEnvironment tableEnv = ...; 
// DataStream of type Tuple2<Long, String>
DataStream<Tuple2<Long, String>> stream = ...
// Convert DataStream into a table. The default field names are "f0", "f1"
Table table = tableEnv.fromDataStream(stream);
// Convert DataStream into a table, and specify the field name as "mylong" and "mystring" (in the order of Tuple elements)
Table table = tableEnv.fromDataStream(stream, "myLong, myString");
// Convert DataStream into a table, specify the field name as "f0", "f1", and exchange order
Table table = tableEnv.fromDataStream(stream, "f1, f0");
// Turn DataStream into a table, select only the second element of Tuple, and specify the field name as "f1"
Table table = tableEnv.fromDataStream(stream, "f1");
// Turn DataStream into a table, specify the alias myString for the second element of Tuple, and specify the field name myLong for the first element
Table table = tableEnv.fromDataStream(stream, "f1 as 'myString', f0 as 'myLong'");

POJO type

When converting a DataStream or DataSet of a POJO type into a table, the POJO's own field names are used by default if no names are specified. Renaming requires referencing fields by their original names; they can be aliased, reordered, or projected down to a subset.

// Get StreamTableEnvironment
StreamTableEnvironment tableEnv = ...; 
//The data type is POJO type of Person, and the fields include "name" and "age"
DataStream<Person> stream = ...
// Convert DataStream into a table. The default field names are "age" and "name"
Table table = tableEnv.fromDataStream(stream);
//  Convert DataStream to a table, specify the alias myAge for the "age" field and myName for the "name" field
Table table = tableEnv.fromDataStream(stream, "age as myAge, name as myName");
//  Turn DataStream into a table and select only one name field
Table table = tableEnv.fromDataStream(stream, "name");
//  Turn DataStream into a table, select only one name field and alias myName
Table table = tableEnv.fromDataStream(stream, "name as myName");

Row type

When converting a Row-typed DataStream or DataSet into a table, fields can be mapped by position or by name; they can also be aliased or projected down to a subset.

// Get StreamTableEnvironment
StreamTableEnvironment tableEnv = ...; 
// DataStream of Row type. Specify two fields "name" and "age" through RowTypeInfo
DataStream<Row> stream = ...
// Convert DataStream into a table. The default field names are original field names "name" and "age"
Table table = tableEnv.fromDataStream(stream);
// Convert DataStream into a table, and specify myName alias for the first field and myAge alias for the second field according to location mapping
Table table = tableEnv.fromDataStream(stream, "myName, myAge");
// Convert DataStream into a table and map according to the field name. Alias the name field myName and the age field myAge
Table table = tableEnv.fromDataStream(stream, "name as myName, age as myAge");
// Convert DataStream into a table, map according to the field name, and select only the name field
Table table = tableEnv.fromDataStream(stream, "name");
// Turn DataStream into a table, map according to the field name, select only the name field, and give an alias "myName"
Table table = tableEnv.fromDataStream(stream, "name as myName");

Query optimization

Old planner

Apache Flink leverages Apache Calcite to optimize and translate queries. Current optimizations include projection and filter push-down, subquery decorrelation, and other kinds of query rewriting. The old planner does not yet optimize the order of joins; joins are executed in the order defined in the query.

By providing a CalciteConfig object, you can adjust the set of optimization rules applied in the different phases. It is created via CalciteConfig.createBuilder() and passed to the TableEnvironment by calling tableEnv.getConfig().setPlannerConfig(calciteConfig).

Blink planner

Apache Flink leverages and extends Apache Calcite to perform sophisticated query optimization, including a series of rule-based and cost-based optimizations, such as:

  • Subquery decorrelation based on Apache Calcite
  • Projection pruning
  • Partition pruning
  • Filter push-down
  • Sub-plan deduplication to avoid redundant computation
  • Special subquery rewriting, in two parts:

    • Converting IN and EXISTS into left semi-joins
    • Converting NOT IN and NOT EXISTS into left anti-joins
  • Optional join reordering, enabled via table.optimizer.join-reorder-enabled

Note: IN / EXISTS / NOT IN / NOT EXISTS are currently supported only in conjunction with subquery rewriting.
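As a sketch of enabling the join-reorder option mentioned above; the option key follows the Flink 1.10 configuration reference, so verify it against your version:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

TableEnvironment tableEnv = TableEnvironment.create(
    EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build());
// Join reordering is disabled by default; enable it explicitly
tableEnv.getConfig().getConfiguration()
    .setString("table.optimizer.join-reorder-enabled", "true");
```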

The query optimizer can make better-informed decisions because it considers not only the plan itself but also statistics available from the data sources and fine-grained per-operator costs (such as IO, CPU, network, and memory).

Advanced users can provide custom optimization rules through a CalciteConfig object, passed to the TableEnvironment via tableEnv.getConfig().setPlannerConfig(calciteConfig).

View execution plan

SQL supports viewing the execution plan of a query through EXPLAIN. The Flink Table API offers the same capability via the explain() method, which returns a string describing three plans:

  1. The abstract syntax tree of the relational query, i.e., the unoptimized logical query plan
  2. The optimized logical query plan
  3. The physical execution plan

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
DataStream<Tuple2<Integer, String>> stream1 = env.fromElements(new Tuple2<>(1, "hello"));
DataStream<Tuple2<Integer, String>> stream2 = env.fromElements(new Tuple2<>(1, "hello"));
Table table1 = tEnv.fromDataStream(stream1, "count, word");
Table table2 = tEnv.fromDataStream(stream2, "count, word");
Table table = table1
  .where("LIKE(word, 'F%')")
  .unionAll(table2);
// View execution plan
String explanation = tEnv.explain(table);
System.out.println(explanation);

The resulting execution plan is:

== Abstract syntax tree ==
LogicalUnion(all=[true])
  LogicalFilter(condition=[LIKE($1, _UTF-16LE'F%')])
    FlinkLogicalDataStreamScan(id=[1], fields=[count, word])
  FlinkLogicalDataStreamScan(id=[2], fields=[count, word])

== Optimized logical execution plan ==
DataStreamUnion(all=[true], union all=[count, word])
  DataStreamCalc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')])
    DataStreamScan(id=[1], fields=[count, word])
  DataStreamScan(id=[2], fields=[count, word])

== Physical execution plan ==
Stage 1 : Data Source
    content : collect elements with CollectionInputFormat

Stage 2 : Data Source
    content : collect elements with CollectionInputFormat

    Stage 3 : Operator
        content : from: (count, word)
        ship_strategy : REBALANCE

        Stage 4 : Operator
            content : where: (LIKE(word, _UTF-16LE'F%')), select: (count, word)
            ship_strategy : FORWARD

            Stage 5 : Operator
                content : from: (count, word)
                ship_strategy : REBALANCE

Summary

This article introduced the Flink Table API & SQL: first its basic concepts, then the dependencies needed to build a Table API & SQL program, followed by the two Flink planners and how to register tables and convert between DataStream, DataSet, and Table. Finally, it covered the query optimizations performed by each planner and gave an example of viewing an execution plan.


Tags: flink

Posted by rtconner on Tue, 24 May 2022 19:33:05 +0300