
Implementation principle of Flink Client
Before we dig into the source code of the Flink Client, we first need to understand its implementation principles.
Main functions of Flink Client
We can see that the Flink Client has three main responsibilities: first, running the application; second, operating and managing tasks; and third, managing the cluster from the client side.
## Main components of Flink Client
ContextEnvironment: builds the different execution environments
PipelineExecutor: executes the pipeline (StreamGraph) and generates the JobGraph
ClusterDescriptor: describes the connections to different clusters
Running the application code
When we submit our code to run, the client mainly does three things:
1. Load the ClusterClientFactory through the ClusterClientServiceLoader, in order to create (or connect to) a cluster.
2. Create the ContextEnvironment through the ContextEnvironmentFactory to build the StreamExecutionEnvironment.
3. Build the PackagedProgram to obtain the environment required for the Flink task to run, then call the main() method of our code through reflection. The main() method calls execute() on the StreamExecutionEnvironment, which builds the StreamGraph and submits it to the PipelineExecutor; the executor generates the JobGraph and finally submits it to the cluster. A minimal user program that triggers this flow is sketched after this list.
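For orientation, here is a minimal, hypothetical user program (the class name, job name, and sample data are made up): calling execute() on the StreamExecutionEnvironment is what kicks off the client-side flow described above.

```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class WordCountJob {
    public static void main(String[] args) throws Exception {
        // getExecutionEnvironment() returns the environment that the client prepared
        // before invoking this main() via reflection (step 2 above).
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements("hello flink", "hello client")
                .flatMap((String line, Collector<String> out) -> {
                    for (String word : line.split(" ")) {
                        out.collect(word);
                    }
                })
                .returns(Types.STRING)
                .print();

        // execute() builds the StreamGraph and hands it to the PipelineExecutor,
        // which turns it into a JobGraph and submits it to the cluster.
        env.execute("word-count");
    }
}
```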
ExecutionEnvironment classification
# Flink Client source code analysis
From the overview above, we now have a general understanding of how the Flink Client submits jobs. Next, let's go into the source code and see how this is implemented.
CliFrontend
When a user submits a task from the client, running the flink shell script calls the main() method of the CliFrontend class, so let's take a closer look at the main() method.
```java
/** Submits the job based on the arguments. */
public static void main(final String[] args) {
    EnvironmentInformation.logEnvironmentInfo(LOG, "Command Line Client", args);

    // 1. find the configuration directory (from the environment variable, or the conf / ../conf
    //    directory of the current directory), parse flink-conf.yaml, and put the properties
    //    into the Configuration
    final String configurationDirectory = getConfigurationDirectoryFromEnv();

    // 2. load the global configuration and add the given dynamic property configuration
    final Configuration configuration =
            GlobalConfiguration.loadConfiguration(configurationDirectory);

    // 3. load the custom command lines (initialize custom command line parameters)
    final List<CustomCommandLine> customCommandLines =
            loadCustomCommandLines(configuration, configurationDirectory);

    int retCode = 31;
    try {
        // 4. initialize CliFrontend: the constructor assigns the attributes of the
        //    CliFrontend class that are used by the subsequent execution
        final CliFrontend cli = new CliFrontend(configuration, customCommandLines);

        SecurityUtils.install(new SecurityConfiguration(cli.configuration));

        // 5. call the parseAndRun method to execute the task
        retCode = SecurityUtils.getInstalledContext().runSecured(() -> cli.parseAndRun(args));
    } catch (Throwable t) {
        final Throwable strippedThrowable =
                ExceptionUtils.stripException(t, UndeclaredThrowableException.class);
        LOG.error("Fatal error while running command line interface.", strippedThrowable);
        strippedThrowable.printStackTrace();
    } finally {
        System.exit(retCode);
    }
}
```
Building CliFrontend objects
We can see that the first three steps load the configuration, and the fourth step constructs the CliFrontend object from that configuration. Let's take a closer look at the properties of CliFrontend.
```java
public CliFrontend(Configuration configuration, List<CustomCommandLine> customCommandLines) {
    this(configuration, new DefaultClusterClientServiceLoader(), customCommandLines);
}

public CliFrontend(
        Configuration configuration,
        ClusterClientServiceLoader clusterClientServiceLoader,
        List<CustomCommandLine> customCommandLines) {
    // initialize the configuration, i.e. the properties from flink-conf.yaml
    this.configuration = checkNotNull(configuration);
    // initialize the custom command line parameters
    this.customCommandLines = checkNotNull(customCommandLines);
    // initialize the clusterClientServiceLoader; factories such as
    // org.apache.flink.client.deployment.StandaloneClientFactory are loaded via SPI
    this.clusterClientServiceLoader = checkNotNull(clusterClientServiceLoader);

    FileSystem.initialize(
            configuration, PluginUtils.createPluginManagerFromRootFolder(configuration));

    this.customCommandLineOptions = new Options();

    for (CustomCommandLine customCommandLine : customCommandLines) {
        customCommandLine.addGeneralOptions(customCommandLineOptions);
        customCommandLine.addRunOptions(customCommandLineOptions);
    }

    // initialize the client timeout
    this.clientTimeout = configuration.get(ClientOptions.CLIENT_TIMEOUT);
    // initialize the default parallelism
    this.defaultParallelism = configuration.getInteger(CoreOptions.DEFAULT_PARALLELISM);
}
```
We can see that when the CliFrontend is constructed, a DefaultClusterClientServiceLoader is created; this class implements the ClusterClientServiceLoader interface, which defines two methods: getClusterClientFactory and getApplicationModeTargetNames.
Let's take a further look at the implementation of the getClusterClientFactory method in the DefaultClusterClientServiceLoader class.
We can see that getClusterClientFactory() discovers ClusterClientFactory implementations through Java's SPI mechanism; the ClusterClientFactory interface is responsible for creating everything the Flink client needs to communicate with a cluster, preparing for the creation of the cluster. A generic sketch of the SPI mechanism is shown below.
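For readers unfamiliar with SPI, here is a generic, self-contained illustration of the mechanism (the GreeterFactory interface and its compatibility check are made-up stand-ins for ClusterClientFactory, not Flink's actual code):

```java
import java.util.ServiceLoader;

// Generic illustration of the Java SPI mechanism: implementations of an interface are
// listed in META-INF/services/<fully.qualified.InterfaceName> on the classpath, and
// ServiceLoader discovers them at runtime.
public final class SpiDemo {

    /** A hypothetical service interface, playing the role of ClusterClientFactory. */
    public interface GreeterFactory {
        boolean isCompatibleWith(String target); // compatibility check, analogous to Flink's factories
        String greet();
    }

    /** Iterates over all discovered implementations and returns the first compatible one. */
    public static GreeterFactory findCompatible(String target) {
        ServiceLoader<GreeterFactory> loader = ServiceLoader.load(GreeterFactory.class);
        for (GreeterFactory factory : loader) {
            if (factory.isCompatibleWith(target)) {
                return factory;
            }
        }
        throw new IllegalStateException("No compatible factory found for target: " + target);
    }
}
```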
Call the parseAndRun method to execute the task
There is a switch-case in the parseAndRun method that dispatches to a different handler depending on the action given on the command line. Here we focus on the run method; an abbreviated sketch of the dispatch is shown below.
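An abbreviated sketch of what this dispatch looks like (only a few actions shown, error handling simplified; not the complete method):

```java
// Abbreviated sketch of the dispatch performed by parseAndRun:
// the first argument is the action name, the rest are that action's parameters.
final String action = args[0];
final String[] params = Arrays.copyOfRange(args, 1, args.length);

switch (action) {
    case "run":
        run(params);       // submit and run a job
        return 0;
    case "list":
        list(params);      // list running and scheduled jobs
        return 0;
    case "cancel":
        cancel(params);    // cancel a running job
        return 0;
    case "savepoint":
        savepoint(params); // trigger or dispose of a savepoint
        return 0;
    default:
        System.out.printf("\"%s\" is not a valid action.%n", action);
        return 1;
}
```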
run method
In the run method, two objects, ProgramOptions and Configuration, are used to build a PackagedProgram to execute the program.
```java
/**
 * Executions the run action.
 *
 * @param args Command line arguments for the run action.
 */
protected void run(String[] args) throws Exception {
    LOG.info("Running 'run' command.");

    final Options commandOptions = CliFrontendParser.getRunCommandOptions();
    final CommandLine commandLine = getCommandLine(commandOptions, args, true);

    // evaluate help flag
    if (commandLine.hasOption(HELP_OPTION.getOpt())) {
        CliFrontendParser.printHelpForRun(customCommandLines);
        return;
    }

    final CustomCommandLine activeCommandLine =
            validateAndGetActiveCommandLine(checkNotNull(commandLine));

    // 1. program options
    final ProgramOptions programOptions = ProgramOptions.create(commandLine);

    final List<URL> jobJars = getJobJarAndDependencies(programOptions);

    // 2. configuration
    final Configuration effectiveConfiguration =
            getEffectiveConfiguration(activeCommandLine, commandLine, programOptions, jobJars);

    LOG.debug("Effective executor configuration: {}", effectiveConfiguration);

    // 3. build the packaged program (a packaged program packages all necessary information,
    //    including runtime parameters and program configuration, into one object)
    try (PackagedProgram program = getPackagedProgram(programOptions, effectiveConfiguration)) {
        // 4. execute user code
        executeProgram(effectiveConfiguration, program);
    }
}
```
ProgramOptions properties
The properties in ProgramOptions describe all the basic information Flink needs to execute a program; a rough, paraphrased sketch of these fields is shown below.
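As a rough orientation only (the field names below are paraphrased for illustration, not copied verbatim from the class):

```java
import java.net.URL;
import java.util.List;

import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;

// Paraphrased sketch of the kind of information ProgramOptions carries.
public class ProgramOptionsSketch {
    private String jarFilePath;                          // path of the user's job jar
    private String entryPointClass;                      // main class to invoke via reflection
    private String[] programArgs;                        // arguments passed to the user's main()
    private List<URL> classpaths;                        // additional classpath entries
    private int parallelism;                             // requested default parallelism
    private boolean detachedMode;                        // whether the client detaches after submission
    private SavepointRestoreSettings savepointSettings;  // savepoint to restore from, if any
}
```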
PackagedProgram
We can see that the PackagedProgram describes the environment required for the Flink task to run; with these attributes in place, the task can be executed. A small, hedged example of assembling one is shown below.
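To make this concrete, a PackagedProgram can be assembled roughly like this (a minimal sketch assuming the builder-style API; the jar path, class name, and arguments are hypothetical):

```java
import java.io.File;
import java.util.Collections;

import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;

public class PackagedProgramExample {
    public static void main(String[] args) throws Exception {
        // All paths and names below are made up for illustration.
        PackagedProgram program =
                PackagedProgram.newBuilder()
                        .setJarFile(new File("/path/to/word-count.jar"))
                        .setEntryPointClassName("com.example.WordCountJob")
                        .setArguments("--input", "/tmp/in", "--output", "/tmp/out")
                        .setUserClassPaths(Collections.emptyList())
                        .setConfiguration(new Configuration())
                        .setSavepointRestoreSettings(SavepointRestoreSettings.none())
                        .build();

        // The packaged program now knows the entry point and how to load the user code.
        System.out.println("Entry point: " + program.getMainClassName());
    }
}
```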
URLClassLoader
The point we want to make here is that Flink's class loading mechanism differs from the default class loading mechanism provided by the Java virtual machine.
Let's recall the JVM's default class loading mechanism (parent delegation): when a class loader receives a load request, it does not load the class itself at first but delegates the request to its parent loader, all the way up to the bootstrap class loader at the top. Only when a parent loader cannot load the class does the child loader below it attempt to load it. A simplified sketch is shown below.
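For comparison with the child-first loader shown later, here is a simplified sketch of parent-first delegation, modeled on java.lang.ClassLoader#loadClass (synchronization and some bootstrap details are omitted; this is not the verbatim JDK code):

```java
// Simplified sketch of parent-first (delegation) class loading.
protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
    // 1. Has this loader already loaded the class?
    Class<?> c = findLoadedClass(name);
    if (c == null) {
        try {
            // 2. Always delegate to the parent loader first
            //    (the chain ends at the bootstrap class loader).
            if (getParent() != null) {
                c = getParent().loadClass(name);
            }
        } catch (ClassNotFoundException ignored) {
            // The parent hierarchy could not find the class.
        }
        if (c == null) {
            // 3. Only then does this loader try to find the class itself.
            c = findClass(name);
        }
    }
    if (resolve) {
        resolveClass(c);
    }
    return c;
}
```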
If Flink used this loading mechanism, a problem could arise: a Flink cluster runs the Flink framework code, which includes Flink's many dependencies, and a complex user application may bring in many dependencies of its own, so classes with the same name are bound to appear. When a user class is to be loaded, once a class with that name has already been loaded by a parent class loader it will not be loaded again, and the user's program will fail.
Flink's class loading mechanism
Flink can be configured with different class loading strategies in flink-conf.yaml (child-first by default):
```yaml
classloader.resolve-order: parent-first
classloader.resolve-order: child-first
```
Let's take a direct look at the loadClassWithoutExceptionHandling method of the ChildFirstClassLoader class:
```java
@Override
protected Class<?> loadClassWithoutExceptionHandling(String name, boolean resolve)
        throws ClassNotFoundException {

    // First, check whether the class has already been loaded
    Class<?> c = findLoadedClass(name);

    if (c == null) {
        // Classes matching alwaysParentFirstPatterns should still be loaded by the parent first
        for (String alwaysParentFirstPattern : alwaysParentFirstPatterns) {
            if (name.startsWith(alwaysParentFirstPattern)) {
                return super.loadClassWithoutExceptionHandling(name, resolve);
            }
        }

        try {
            // The user's class is not delegated to the parent class loader,
            // but loaded directly by this loader
            c = findClass(name);
        } catch (ClassNotFoundException e) {
            // Let URLClassLoader do it, which will eventually delegate to the parent
            c = super.loadClassWithoutExceptionHandling(name, resolve);
        }
    } else if (resolve) {
        resolveClass(c);
    }
    return c;
}
```
A brief description is as follows:
- Call the findLoadedClass() method to check whether the class with the given fully qualified name has already been loaded; if not, continue;
- Check whether the class to be loaded starts with one of the prefixes in the alwaysParentFirstPatterns collection; if so, call the corresponding method of the parent class and load it parent-first;
- If the class does not match the alwaysParentFirstPatterns set, call the findClass() method to locate and obtain the class definition from the user code (this method has a default implementation in URLClassLoader); if it is not found there, fall back to the parent loader.
That, in a nutshell, is Flink's class loading mechanism.
executeProgram executes user code
After the PackagedProgram is built, execute the user code.
```java
// --------------------------------------------------------------------------------------------
//  Interaction with programs and JobManager
// --------------------------------------------------------------------------------------------

protected void executeProgram(final Configuration configuration, final PackagedProgram program)
        throws ProgramInvocationException {
    ClientUtils.executeProgram(
            new DefaultExecutorServiceLoader(), configuration, program, false, false);
}
```
Let's look at the ClientUtils.executeProgram() method.
It contains a classic usage of the context class loader (ContextClassLoader):
First, the context class loader is switched to the user-code class loader, and that class loader is used to load the classes referenced by the main method;
After execution, the context class loader is switched back. While the program runs, every class it needs is loaded through this class loader, which you can observe by debugging the loadClass() method of the ClassLoader class. A generic sketch of this swap pattern is shown below.
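Here is a minimal, generic sketch of the swap pattern (not the literal ClientUtils.executeProgram implementation; the helper class and method names are made up):

```java
// Minimal sketch of the "swap the context class loader" pattern described above.
public final class ContextClassLoaderSwap {

    /** Runs the given action with the supplied (e.g. user-code) class loader as the context loader. */
    public static void runWithClassLoader(ClassLoader userCodeClassLoader, Runnable invokeUserMain) {
        final Thread currentThread = Thread.currentThread();
        // Remember the previous context class loader so it can be restored afterwards.
        final ClassLoader previousClassLoader = currentThread.getContextClassLoader();
        try {
            // 1. Switch the context class loader to the user-code class loader.
            currentThread.setContextClassLoader(userCodeClassLoader);
            // 2. Invoke the user's main() (e.g. via reflection); classes referenced by the
            //    user code are now resolved through the user-code class loader.
            invokeUserMain.run();
        } finally {
            // 3. Always switch the context class loader back.
            currentThread.setContextClassLoader(previousClassLoader);
        }
    }
}
```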
StreamGraph
JobGraph
ExecutionGraph
### Execution