Analysis of MapReduce job submission to YARN
Related classes
Configuration
Holds the job configuration; parameters that are not set explicitly fall back to the defaults.
Job
Encapsulates the running information of a job.
Cluster
Represents the connection between the local client and the cluster (the ResourceManager and the file system); internally it encapsulates the job runner and the runtime file system information. In local mode the runner is a LocalJobRunner, in YARN mode a YARNRunner.
ClientProtocolProvider
Used to create the job runner: LocalClientProtocolProvider in local mode, YarnClientProtocolProvider in cluster (YARN) mode.
ServiceLoader
Used to load the ClientProtocolProvider.
InputFormat
Performs the logical splitting of the job's input.
JobSubmitter
Submits the job; this includes obtaining a jobId, building the submission directory path, and uploading the job's runtime files (the jar, the split information files, and job.xml).
Submission process
New job
- Create a new Configuration object and use its set() method to configure the job's runtime parameters and the parameters needed to connect to the Hadoop cluster.
- Create a new Job object and pass the Configuration object created above into it.
- Set the Job's runtime parameters: the local location of the jar, the fully qualified class names of the map and reduce classes, the input and output paths, and so on.
Job start
- Start the Job by calling Job#waitForCompletion() or Job#submit() on the Job object (a minimal driver sketch follows).
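The two sections above correspond to a typical driver program. Below is a minimal sketch; the class name WordCountDriver is an illustrative choice, and the library classes TokenCounterMapper and IntSumReducer are used only so the example is self-contained (they are not part of the submission analysis itself).

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.map.TokenCounterMapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.reduce.IntSumReducer;

public class WordCountDriver {
    public static void main(String[] args) throws Exception {
        // 1. Create the Configuration; parameters set here override the defaults
        Configuration conf = new Configuration();
        conf.set("mapreduce.framework.name", "yarn"); // submit to YARN rather than running locally

        // 2. Create the Job and pass the Configuration into it
        Job job = Job.getInstance(conf, "word count");

        // 3. Set runtime parameters: jar location, map/reduce classes, output types, paths
        job.setJarByClass(WordCountDriver.class);
        job.setMapperClass(TokenCounterMapper.class);   // library mapper: emits (token, 1)
        job.setReducerClass(IntSumReducer.class);       // library reducer: sums the counts
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));    // input path from the command line
        FileOutputFormat.setOutputPath(job, new Path(args[1]));  // output path from the command line

        // 4. Start the job: waitForCompletion() calls submit() and then waits for it to finish
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}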
Job#submit()
- First, confirm that the job state is DEFINE; then call setUseNewAPI() to decide whether to use the new API (the new API is used by default).
- Call Job#connect() to connect to the cluster: it first checks whether the Cluster object already exists; if not, Cluster#initialize() is eventually called to create a Cluster object representing the connection between the local client and the cluster.
- Create a JobSubmitter object for submitting the job.
- Use that JobSubmitter object to call JobSubmitter#submitJobInternal(), which submits the job to the cluster.
Cluster#initialize()
- Use a ServiceLoader to iterate over the available ClientProtocolProvider implementations in a foreach loop; the ServiceLoader first creates a LocalClientProtocolProvider object.
- Call ClientProtocolProvider#create() to try to create the job runner object.
- create() checks whether mapreduce.framework.name is set to yarn in the Configuration object. If it is not (local mode), the LocalClientProtocolProvider directly creates and returns a LocalJobRunner; if it is set to yarn, null is returned and the foreach loop moves on to the next provider.
- On the second iteration, the loop reaches the YarnClientProtocolProvider; its ClientProtocolProvider#create() method returns a YARNRunner object, which connects to the ResourceManager (a condensed sketch of this selection loop follows).
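The sketch below condenses this selection loop. It is a simplification of Cluster#initialize(), not the Hadoop source itself, and assumes conf is the job's Configuration and that java.util.ServiceLoader and the org.apache.hadoop.mapreduce.protocol classes are imported.

// Simplified sketch of the provider-selection loop described above (not the Hadoop source).
// Each provider inspects mapreduce.framework.name and either builds a job runner
// or returns null, so the loop moves on to the next provider.
ClientProtocol clientProtocol = null;
for (ClientProtocolProvider provider : ServiceLoader.load(ClientProtocolProvider.class)) {
    ClientProtocol candidate = provider.create(conf); // LocalJobRunner, YARNRunner, or null
    if (candidate != null) {
        clientProtocol = candidate; // the first provider that accepts the configuration wins
        break;
    }
}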
JobSubmitter#submitJobInternal()
- First, check that the output path meets the requirements; then check whether the parameter mapreduce.application.framework.path is configured (see the official documentation for this parameter).
- Obtain a jobId from YARN and build the submission directory path; copy the job jar and its .crc checksum file to the submission directory.
- Call JobSubmitter#writeSplits(). If the new API is used, JobSubmitter#writeNewSplits() is eventually called to write the split information files to the submission directory.
- That method in turn calls InputFormat#getSplits() to obtain the split information. Since the default InputFormat is TextInputFormat, the analysis below covers the getSplits() method that TextInputFormat inherits from FileInputFormat.
- Once the split information has been obtained, the splits are sorted in descending order by size.
- Write the split information files and job.xml to the submission directory.
- Submit the job.
FileInputFormat#getSplits()
- Obtain the minimum and maximum split sizes (in bytes), then obtain the status of all files in the input directory. Splitting is purely logical: the split files only record how the source files are divided; the source files themselves are not physically cut.
- Traverse the input files:
- Get the block locations of the file from the corresponding file system.
- Call TextInputFormat#isSplitable() to check whether the file can be split: if no compression codec is configured for the file, it is splittable; if a codec is configured, it is splittable only when the codec is a subclass of SplittableCompressionCodec.
- If the file is not splittable, the whole file is turned into a single large logical split and returned.
- If the file is splittable, get the file's block size from the file system and compute the split size from the block size and the configured minimum and maximum split sizes with the formula Math.max(minSize, Math.min(maxSize, blockSize)).
- Split in a loop: before each cut, check whether the remaining bytes divided by the split size is greater than 1.1. If it is, the loop continues; otherwise the remainder is added directly as the last split (a worked example follows this list).
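A worked example of the formula and the 1.1 rule, assuming a hypothetical 260 MB input file, a 128 MB block size, and the default minimum/maximum split sizes: splitSize = Math.max(1, Math.min(Long.MAX_VALUE, 128 MB)) = 128 MB; before the first cut 260 / 128 ≈ 2.03 > 1.1, so a 128 MB split is produced; before the second cut 132 / 128 ≈ 1.03 ≤ 1.1, so the remaining 132 MB becomes the last split. The file therefore yields two splits rather than three. The same logic as a standalone sketch (not the Hadoop source):

public class SplitSlopDemo {
    public static void main(String[] args) {
        // Defaults: minimum split size 1, maximum split size Long.MAX_VALUE
        long minSize = 1L, maxSize = Long.MAX_VALUE;
        long blockSize = 128L * 1024 * 1024;          // assumed 128 MB block size
        long bytesRemaining = 260L * 1024 * 1024;     // assumed 260 MB input file
        final double SPLIT_SLOP = 1.1;

        // Math.max(minSize, Math.min(maxSize, blockSize)) -> 128 MB
        long splitSize = Math.max(minSize, Math.min(maxSize, blockSize));

        while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
            System.out.println("split: " + splitSize + " bytes");           // one 128 MB split
            bytesRemaining -= splitSize;
        }
        if (bytesRemaining != 0) {
            System.out.println("last split: " + bytesRemaining + " bytes"); // 132 MB
        }
    }
}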
Partial code
Job#submit()
// Confirm the job state
ensureState(JobState.DEFINE);
// Decide whether to use the new API (used by default)
setUseNewAPI();
// -------------------- Connect to the cluster --------------------
connect();
// getClient() returns the job runner (ClientProtocol) object
JobSubmitter submitter = getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
  public JobStatus run() throws IOException, InterruptedException, ClassNotFoundException {
    // -------------------- Submit the job --------------------
    return submitter.submitJobInternal(Job.this, cluster);
  }
});
// Set the job state to RUNNING
state = JobState.RUNNING;
Cluster#initialize()
// Iterate over the loaded ClientProtocolProvider implementations
for (ClientProtocolProvider provider : frameworkLoader) {
  // Returns a LocalJobRunner in local mode,
  // a YARNRunner in cluster (YARN) mode
  clientProtocol = provider.create(conf);
}
JobSubmitter#submitJobInternal()
// Add the path of the MapReduce framework archive to conf
addMRFrameworkToDistributedCache(conf);
// Get a new jobId
JobID jobId = submitClient.getNewJobID();
// Build the job submission directory under the staging area
Path submitJobDir = new Path(jobStagingArea, jobId.toString());
// Copy the job jar and its .crc checksum file to the submission directory
copyAndConfigureFiles(job, submitJobDir);
// Compute the splits and write the split information files to the submission directory
int maps = writeSplits(job, submitJobDir);
// Write job.xml to the submission directory
writeConf(conf, submitJobFile);
JobSubmitter#writeNewSplits()
// Get the configured InputFormat object; the default is TextInputFormat
InputFormat<?, ?> input = ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
// Get the split information
List<InputSplit> splits = input.getSplits(job);
T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);
// sort the splits into order based on size, so that the biggest
// go first
Arrays.sort(array, new SplitComparator());
// Write the split information files to the submission directory
JobSplitWriter.createSplitFiles(jobSubmitDir, conf, jobSubmitDir.getFileSystem(conf), array);
FileInputFormat#getSplits()
// ------------------------- Split size (in bytes) -------------------------
long minSize = Math.max(getFormatMinSplitSize() /* 1L */,
    getMinSplitSize(job) /* minimum split size; configurable in the job conf, default 1L */);
// Maximum split size; configurable in the job conf, default Long.MAX_VALUE
long maxSize = getMaxSplitSize(job);
// Get the status of all files in the input directory
List<FileStatus> files = listStatus(job);
// Traverse the files
for (FileStatus file : files) {
  // Get the block locations of the file
  if (file instanceof LocatedFileStatus) {
    // Locations were already fetched together with the file status
    blkLocations = ((LocatedFileStatus) file).getBlockLocations();
  } else {
    // Otherwise ask the file system (e.g. HDFS) for them
    FileSystem fs = path.getFileSystem(job.getConfiguration());
    blkLocations = fs.getFileBlockLocations(file, 0, length);
  }
  // If the current configuration supports splitting
  if (isSplitable(job, path)) {
    // Get the block size of the file
    long blockSize = file.getBlockSize();
    // Compute the split size with Math.max(minSize, Math.min(maxSize, blockSize))
    long splitSize = computeSplitSize(blockSize, minSize, maxSize);
    long bytesRemaining = length;
    // Cut splits while bytesRemaining / splitSize > SPLIT_SLOP
    while (((double) bytesRemaining) / splitSize > SPLIT_SLOP /* 1.1 */) {
      int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
      splits.add(makeSplit(path, length - bytesRemaining, splitSize,
          blkLocations[blkIndex].getHosts(),
          blkLocations[blkIndex].getCachedHosts()));
      bytesRemaining -= splitSize;
    }
    // Add the remaining bytes as the last split
    if (bytesRemaining != 0) {
      int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
      splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
          blkLocations[blkIndex].getHosts(),
          blkLocations[blkIndex].getCachedHosts()));
    }
  } else {
    // Not splittable: the whole file becomes a single split
    splits.add(makeSplit(path, 0, length,
        blkLocations[0].getHosts(),
        blkLocations[0].getCachedHosts()));
  }
}
TextInputFormat#isSplitable()
// Get the compression codec configured for the file
final CompressionCodec codec =
    new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
// No codec configured: the file is splittable
if (null == codec) {
  return true;
}
// Otherwise the file is splittable only if the codec is a SplittableCompressionCodec
return codec instanceof SplittableCompressionCodec;
Points to note
Setting the minimum and maximum split sizes
- mapreduce.input.fileinputformat.split.minsize sets the minimum split size (default 1); mapreduce.input.fileinputformat.split.maxsize sets the maximum split size (default Long.MAX_VALUE). See the sketch below.
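For example, these values can be set either directly on the configuration or through the static helpers on org.apache.hadoop.mapreduce.lib.input.FileInputFormat. The fragment below is a sketch assuming an existing Job object named job; the minimum and maximum are shown as alternatives, not to be combined.

// Larger splits than a block: raise the minimum above the block size
FileInputFormat.setMinInputSplitSize(job, 256L * 1024 * 1024);
// equivalent to:
// job.getConfiguration().setLong("mapreduce.input.fileinputformat.split.minsize", 256L * 1024 * 1024);

// Smaller splits than a block: lower the maximum below the block size
// FileInputFormat.setMaxInputSplitSize(job, 64L * 1024 * 1024);
// equivalent to:
// job.getConfiguration().setLong("mapreduce.input.fileinputformat.split.maxsize", 64L * 1024 * 1024);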
Splits
- Splitting is purely logical: the split files record how the source files are divided; the source files themselves are never physically cut.
- isSplitable() is called to decide whether the input can be split. In TextInputFormat#isSplitable(), a file is splittable if no compression codec is configured for it, or if the configured codec is a subclass of SplittableCompressionCodec (for example, BZip2Codec is splittable, while GzipCodec is not).
- The split size is computed as Math.max(minSize, Math.min(maxSize, blockSize)). Therefore, to get a splitSize larger than blockSize, raise minSize above blockSize; to get a splitSize smaller than blockSize, lower maxSize below blockSize.
- By default, splitSize == blockSize.
- Splitting loop: before each cut, the remaining size of the file divided by splitSize is compared with 1.1. If it is greater than 1.1, another split is cut (note: although the threshold is 1.1 rather than 1, the split that is cut is still splitSize, not splitSize * 1.1); otherwise the remainder is added directly as the final split.
- The splits are sorted in descending order by size.