02. Implementation principle and source code analysis of Flink Client

## Implementation principle of Flink Client

Before we interpret the source code of Flink Client, we must first understand its implementation principle.

### Main functions of Flink Client

We can see that the Flink Client has three main responsibilities: first, running the Application; second, operating on and managing tasks; and third, managing the cluster from the client side.

### Main components of Flink Client

  • ContextEnvironment: builds the different execution environments
  • PipelineExecutor: executes the StreamGraph to generate the JobGraph
  • ClusterDescriptor: handles connections to the different cluster types

### Running the Application code

When we submit our code to run, the client mainly does three things:

1. Load the ClusterClientFactory through the ClusterClientServiceLoader to create a cluster.

2. Create the ContextEnvironment through the ContextEnvironmentFactory to build the StreamExecutionEnvironment.

3. Build the PackagedProgram to obtain the environment required for the Flink task to run. The client then calls the main() method of our code through reflection; inside it, execute() is invoked on the StreamExecutionEnvironment to obtain the StreamGraph. The StreamGraph is handed to the PipelineExecutor, which generates the JobGraph object and finally submits the JobGraph to the cluster.
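The reflective invocation in step 3 can be illustrated with plain JDK reflection. The UserJob class and its arguments below are invented for illustration; this is only a sketch of the mechanism, not Flink's actual PackagedProgram implementation:

```java
import java.lang.reflect.Method;

public class ReflectiveMainDemo {
    // stand-in for a user's entry class; in Flink this would come from the job jar
    public static class UserJob {
        public static void main(String[] args) {
            System.out.println("user main invoked with " + args.length + " arg(s)");
        }
    }

    public static void main(String[] args) throws Exception {
        // the client looks up the entry class by name and invokes its
        // static main(String[]) via reflection, roughly like this:
        Class<?> entryClass = Class.forName("ReflectiveMainDemo$UserJob");
        Method mainMethod = entryClass.getMethod("main", String[].class);
        mainMethod.invoke(null, (Object) new String[] {"--input", "/tmp/data"});
    }
}
```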

### ExecutionEnvironment classification

## Flink Client source code analysis

With the answers to the questions above, we now have a general picture of how the Flink Client submits jobs. Next, let's step into the source code and see how this is implemented.


When a user runs a task from the client by executing the flink run script, the main() method of the CliFrontend class is invoked, so let's take a close look at main().

    /** Submits the job based on the arguments. */
    public static void main(final String[] args) {
        EnvironmentInformation.logEnvironmentInfo(LOG, "Command Line Client", args);

        // 1. find the configuration directory (from the FLINK_CONF_DIR environment
        // variable, or the conf / ../conf directory relative to the working directory)
        final String configurationDirectory = getConfigurationDirectoryFromEnv();

        // 2. parse the flink-conf.yaml file there and load its properties,
        // plus any given dynamic properties, into the Configuration
        final Configuration configuration =
                GlobalConfiguration.loadConfiguration(configurationDirectory);

        // 3. load the custom command lines to initialize custom command line parameters
        final List<CustomCommandLine> customCommandLines =
                loadCustomCommandLines(configuration, configurationDirectory);

        int retCode = 31;
        try {
            // 4. construct the CliFrontend, assigning the attributes of the
            // CliFrontend class that the following steps rely on
            final CliFrontend cli = new CliFrontend(configuration, customCommandLines);

            SecurityUtils.install(new SecurityConfiguration(cli.configuration));
            // 5. call the parseAndRun method to execute the task
            retCode = SecurityUtils.getInstalledContext().runSecured(() -> cli.parseAndRun(args));
        } catch (Throwable t) {
            final Throwable strippedThrowable =
                    ExceptionUtils.stripException(t, UndeclaredThrowableException.class);
            LOG.error("Fatal error while running command line interface.", strippedThrowable);
        } finally {
            System.exit(retCode);
        }
    }

### Building the CliFrontend object

We can see that the first three steps load the configuration, and the fourth step constructs the CliFrontend object from that configuration. Let's take a closer look at the CliFrontend constructor and its fields.

    public CliFrontend(Configuration configuration, List<CustomCommandLine> customCommandLines) {
        this(configuration, new DefaultClusterClientServiceLoader(), customCommandLines);
    }

    public CliFrontend(
            Configuration configuration,
            ClusterClientServiceLoader clusterClientServiceLoader,
            List<CustomCommandLine> customCommandLines) {
        // the configuration holds the properties from flink-conf.yaml
        this.configuration = checkNotNull(configuration);
        // initialize the custom command line parameters
        this.customCommandLines = checkNotNull(customCommandLines);
        // initialize the clusterClientServiceLoader, which uses SPI to load
        // ClusterClientFactory implementations such as
        // org.apache.flink.client.deployment.StandaloneClientFactory
        this.clusterClientServiceLoader = checkNotNull(clusterClientServiceLoader);

        FileSystem.initialize(
                configuration, PluginUtils.createPluginManagerFromRootFolder(configuration));

        this.customCommandLineOptions = new Options();

        for (CustomCommandLine customCommandLine : customCommandLines) {
            customCommandLine.addGeneralOptions(customCommandLineOptions);
            customCommandLine.addRunOptions(customCommandLineOptions);
        }

        // initialize the client timeout
        this.clientTimeout = configuration.get(ClientOptions.CLIENT_TIMEOUT);
        // initialize the default parallelism
        this.defaultParallelism = configuration.getInteger(CoreOptions.DEFAULT_PARALLELISM);
    }

We can see that the CliFrontend constructor creates a DefaultClusterClientServiceLoader(), a class that implements the ClusterClientServiceLoader interface. That interface defines two methods: getClusterClientFactory and getApplicationModeTargetNames.

Let's take a further look at the implementation of the getClusterClientFactory method in the DefaultClusterClientServiceLoader class.

We can see that getClusterClientFactory() loads the ClusterClientFactory implementations through SPI; the ClusterClientFactory interface defines everything the Flink client needs in order to create a cluster, preparing for cluster creation.
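To make the selection step concrete, here is a small self-contained sketch of the "pick exactly one compatible factory" logic. The Factory interface and the target names below are stand-ins invented for illustration; in Flink the candidates come from ServiceLoader.load() and compatibility is decided against the configured execution target:

```java
import java.util.List;
import java.util.stream.Collectors;

public class FactoryLookupDemo {
    // stand-in for Flink's ClusterClientFactory SPI interface
    interface Factory {
        boolean isCompatibleWith(String executionTarget);
        String name();
    }

    public static void main(String[] args) {
        // in Flink these candidates are discovered via SPI;
        // here we hard-code stand-ins to show the selection logic only
        List<Factory> candidates = List.of(
                factory("remote"), factory("yarn-session"), factory("kubernetes-session"));

        String target = "remote"; // corresponds to the configured execution target
        List<Factory> compatible = candidates.stream()
                .filter(f -> f.isCompatibleWith(target))
                .collect(Collectors.toList());

        // exactly one factory must match, otherwise the configuration is ambiguous
        if (compatible.size() != 1) {
            throw new IllegalStateException("expected exactly one factory, got " + compatible.size());
        }
        System.out.println("selected factory: " + compatible.get(0).name());
    }

    static Factory factory(String target) {
        return new Factory() {
            public boolean isCompatibleWith(String t) { return target.equals(t); }
            public String name() { return target; }
        };
    }
}
```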

### Calling the parseAndRun method to execute the task

There is a switch statement in the parseAndRun method that dispatches to different actions according to the action type given on the command line. Here we focus on the run method.
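The dispatch can be sketched roughly as follows. Only a few action names are shown, and the printed descriptions are illustrative rather than Flink's actual output:

```java
public class DispatchDemo {
    public static void main(String[] args) {
        // parseAndRun takes the first argument as the action name and
        // dispatches on it, roughly like this:
        String action = args.length > 0 ? args[0] : "run";
        switch (action) {
            case "run":
                System.out.println("-> run(params): submit and execute a job");
                break;
            case "list":
                System.out.println("-> list(params): list running/scheduled jobs");
                break;
            case "cancel":
                System.out.println("-> cancel(params): cancel a running job");
                break;
            default:
                System.out.println("unknown action: " + action);
        }
    }
}
```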

### The run method

In the run method, two objects, ProgramOptions and Configuration, are used to build a PackagedProgram that executes the program.

    /**
     * Executes the run action.
     *
     * @param args Command line arguments for the run action.
     */
    protected void run(String[] args) throws Exception {
        LOG.info("Running 'run' command.");

        final Options commandOptions = CliFrontendParser.getRunCommandOptions();
        final CommandLine commandLine = getCommandLine(commandOptions, args, true);

        // evaluate help flag
        if (commandLine.hasOption(HELP_OPTION.getOpt())) {
            CliFrontendParser.printHelpForRun(customCommandLines);
            return;
        }

        final CustomCommandLine activeCommandLine =
                validateAndGetActiveCommandLine(checkNotNull(commandLine));

        // 1. the program options
        final ProgramOptions programOptions = ProgramOptions.create(commandLine);

        final List<URL> jobJars = getJobJarAndDependencies(programOptions);
        // 2. the effective configuration
        final Configuration effectiveConfiguration =
                getEffectiveConfiguration(activeCommandLine, commandLine, programOptions, jobJars);

        LOG.debug("Effective executor configuration: {}", effectiveConfiguration);
        // 3. build the PackagedProgram (it packages all necessary information,
        // including runtime parameters and program configuration, into one object)
        try (PackagedProgram program = getPackagedProgram(programOptions, effectiveConfiguration)) {
            // 4. execute the user code
            executeProgram(effectiveConfiguration, program);
        }
    }
### ProgramOptions properties

The properties in ProgramOptions describe all the basic information Flink needs when executing a program.


We can see that the PackagedProgram describes the environment required for the Flink task to run; with these attributes in place, the task can be executed.


What we want to point out here is that Flink's class loading mechanism differs from the default mechanism provided by the Java virtual machine.

Let's review the JVM's default class loading mechanism (parent delegation): when a class loader receives a loading request, it does not load the class itself by default but delegates the request to its parent, all the way up to the BootStrap ClassLoader at the top. Only when a parent loader cannot load the class does the child loader below it attempt to load it.
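This delegation chain is easy to observe with a couple of lines of plain Java: application classes report the application class loader, while core classes such as java.lang.String were loaded by the bootstrap loader, which getClassLoader() reports as null:

```java
public class DelegationDemo {
    public static void main(String[] args) {
        // classes on the application classpath are loaded by the
        // system/application class loader, which is non-null
        ClassLoader appLoader = DelegationDemo.class.getClassLoader();
        System.out.println("app class loader: " + (appLoader != null));

        // delegation pushed java.lang.String all the way up to the
        // bootstrap loader, which is represented as null
        System.out.println("String loader is bootstrap: " + (String.class.getClassLoader() == null));
    }
}
```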

If Flink used this kind of loading mechanism, a problem could arise: a Flink cluster runs the Flink framework code with all of its dependencies, and a complex user application may pull in many dependencies of its own, so classes with the same fully qualified name are almost inevitable. Once such a class has been loaded by a parent loader, the user's version will never be loaded again, and the user's program will report an error.

### Flink's class loading mechanism

Flink can be configured with different class loading mechanisms in flink-conf.yaml (child-first by default):

classloader.resolve-order: parent-first
classloader.resolve-order: child-first

Let's take a direct look at the loadClassWithoutExceptionHandling method of the ChildFirstClassLoader class:

    @Override
    protected Class<?> loadClassWithoutExceptionHandling(String name, boolean resolve)
            throws ClassNotFoundException {

        // first, check whether the class has already been loaded
        Class<?> c = findLoadedClass(name);

        if (c == null) {
            // classes matching alwaysParentFirstPatterns are still loaded parent-first
            for (String alwaysParentFirstPattern : alwaysParentFirstPatterns) {
                if (name.startsWith(alwaysParentFirstPattern)) {
                    return super.loadClassWithoutExceptionHandling(name, resolve);
                }
            }

            try {
                // the user's class is loaded directly by this loader,
                // not delegated to the parent first
                c = findClass(name);
            } catch (ClassNotFoundException e) {
                // let URLClassLoader handle it, which eventually delegates to the parent
                c = super.loadClassWithoutExceptionHandling(name, resolve);
            }
        } else if (resolve) {
            resolveClass(c);
        }

        return c;
    }

A brief description is as follows:

  • Call the findLoadedClass() method to check whether the class corresponding to the fully qualified name has already been loaded; if not, continue;
  • Check whether the class to be loaded starts with a prefix in the alwaysParentFirstPatterns collection; if so, call the corresponding method of the parent class and load it parent-first;
  • If the class matches none of the alwaysParentFirstPatterns prefixes, call findClass() to look up the class definition in the user code (this method has a default implementation in URLClassLoader); if it is not found there, fall back to the parent loader.
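The three steps above can be sketched as a minimal child-first loader. This is a simplified illustration, not Flink's actual ChildFirstClassLoader (which also overrides resource lookup and has more careful exception handling):

```java
import java.net.URL;
import java.net.URLClassLoader;

public class ChildFirstDemo {
    // simplified sketch of a child-first class loader
    static class ChildFirstClassLoader extends URLClassLoader {
        private final String[] alwaysParentFirstPatterns;

        ChildFirstClassLoader(URL[] urls, ClassLoader parent, String[] parentFirst) {
            super(urls, parent);
            this.alwaysParentFirstPatterns = parentFirst;
        }

        @Override
        protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
            synchronized (getClassLoadingLock(name)) {
                Class<?> c = findLoadedClass(name);              // 1. already loaded?
                if (c == null) {
                    for (String p : alwaysParentFirstPatterns) { // 2. parent-first prefixes
                        if (name.startsWith(p)) {
                            return super.loadClass(name, resolve);
                        }
                    }
                    try {
                        c = findClass(name);                     // 3. child (user jars) first
                    } catch (ClassNotFoundException e) {
                        c = super.loadClass(name, resolve);      //    fall back to the parent
                    }
                }
                if (resolve) {
                    resolveClass(c);
                }
                return c;
            }
        }
    }

    public static void main(String[] args) throws Exception {
        // no user jars on the child path here, so everything resolves via the parent
        ChildFirstClassLoader loader = new ChildFirstClassLoader(
                new URL[0], ChildFirstDemo.class.getClassLoader(), new String[] {"java."});
        Class<?> cls = loader.loadClass("java.util.ArrayList");
        System.out.println("loaded by parent chain: " + (cls == java.util.ArrayList.class));
    }
}
```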

That completes Flink's class loading mechanism.

### executeProgram executes the user code

After the PackagedProgram is built, execute the user code.

    // --------------------------------------------------------------------------------------------
    //  Interaction with programs and JobManager
    // --------------------------------------------------------------------------------------------

    protected void executeProgram(final Configuration configuration, final PackagedProgram program)
            throws ProgramInvocationException {
        ClientUtils.executeProgram(
                new DefaultExecutorServiceLoader(), configuration, program, false, false);
    }

Let's look at the ClientUtils.executeProgram() method.

Here is a classic usage of the context class loader:

First, the context class loader is switched to the UserCodeClassLoader, and that loader is used to load the code invoked from the main method.

After execution, the original context class loader is restored. As the program runs, each class is loaded on demand by the ClassLoader, which can be observed by debugging the loadClass() method of the ClassLoader class.
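The switch-and-restore pattern can be demonstrated with plain Java; the anonymous loader below is a stand-in for Flink's user-code class loader (a real one would point at the job jar URLs):

```java
public class ContextClassLoaderDemo {
    public static void main(String[] args) {
        Thread current = Thread.currentThread();
        ClassLoader original = current.getContextClassLoader();
        // stand-in for the user-code class loader
        ClassLoader userCodeLoader = new ClassLoader(original) {};
        try {
            // switch so that libraries resolving classes through the context
            // class loader see the user code
            current.setContextClassLoader(userCodeLoader);
            System.out.println("switched: " + (current.getContextClassLoader() == userCodeLoader));
            // ... the user's main() would run here ...
        } finally {
            // always restore the previous loader, even if the user code throws
            current.setContextClassLoader(original);
        }
        System.out.println("restored: " + (current.getContextClassLoader() == original));
    }
}
```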





Tags: Big Data flink

Posted by statrat on Tue, 17 May 2022 00:17:30 +0300