Analysis of MapReduce job submission to YARN


Related classes

Configuration

Configures the job; if nothing is set, the default configuration is used.

Job

Encapsulates the running information of a job.

Cluster

An object representing the connection between the local client and the cluster (ResourceManager) and its file system; it internally encapsulates the JobRunner and the runtime file system information. In local mode the runner is a LocalJobRunner, in cluster mode a YARNRunner.

ClientProtocolProvider

Used to create the JobRunner. In local mode it is a LocalClientProtocolProvider; in cluster mode it is a YarnClientProtocolProvider.

ServiceLoader

Used to load the ClientProtocolProvider.

InputFormat

Performs the logical splitting of the job input.

JobSubmitter

Submits the job; this includes obtaining the jobId, building the submission directory path, and uploading the job files (the jar, the split information files, and job.xml).

Submission process

New job
  • Create a new Configuration object and use its set() method to set the job's runtime parameters and the parameters required to connect to the Hadoop cluster.

  • Create a new Job object and pass the Configuration object into it.

  • Set the Job's runtime parameters: the local jar location, the fully qualified map and reduce class names, the input and output paths, and so on.

Job start
  • Start the job by calling Job#waitForCompletion() or Job#submit() on the Job object; a minimal driver sketch covering these steps follows below.
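
For reference, here is a minimal driver sketch of the steps above (not from the original post). It uses the identity Mapper and Reducer classes only to stay self-contained, and the fs.defaultFS address is a hypothetical placeholder.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SubmitDemoDriver {
  public static void main(String[] args) throws Exception {
    // 1. New Configuration; set job parameters and the cluster connection parameters
    Configuration conf = new Configuration();
    conf.set("mapreduce.framework.name", "yarn");      // cluster mode; omit for local mode
    conf.set("fs.defaultFS", "hdfs://namenode:8020");  // hypothetical NameNode address

    // 2. New Job, passing in the Configuration
    Job job = Job.getInstance(conf, "submit-demo");

    // 3. Runtime parameters: local jar, map/reduce classes, output types, input/output paths
    job.setJarByClass(SubmitDemoDriver.class);
    job.setMapperClass(Mapper.class);    // identity mapper, only to keep the sketch self-contained
    job.setReducerClass(Reducer.class);  // identity reducer
    job.setOutputKeyClass(LongWritable.class);
    job.setOutputValueClass(Text.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    // 4. Start the job; waitForCompletion() calls submit() and then polls for completion
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}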
Job#submit
  • First, confirm that the job state is DEFINE; then call setUseNewAPI() to decide whether to use the new API (the new API is used by default).

  • Call Job#connect() to connect to the cluster; it first checks whether the Cluster object already exists, and if it does not, it ultimately calls Cluster#initialize() to create a Cluster object representing the connection between the local client and the cluster.

  • Create a JobSubmitter object for submitting jobs.

  • Use the JobSubmitter object created above to call JobSubmitter#submitJobInternal(), which submits the job to the cluster.

Cluster#initialize()
  • Use a ServiceLoader to load the ClientProtocolProvider implementations and iterate over them with a foreach loop; the ServiceLoader first yields a LocalClientProtocolProvider object.

  • Call the ClientProtocolProvider#create() method to try to create the JobRunner object.

  • Check whether mapreduce.framework.name in the Configuration object is set to yarn. If it is not, the LocalClientProtocolProvider directly creates a LocalJobRunner object and returns it; if it is set to yarn, null is returned and the foreach loop moves on to the next provider.

  • The second iteration of the foreach loop yields the YarnClientProtocolProvider; its ClientProtocolProvider#create() method returns a YARNRunner object and connects to the ResourceManager. A standalone sketch of this selection loop follows below.
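
For reference, a standalone sketch (not from the original post) of this selection loop, assuming the MapReduce client jars that register the providers through META-INF/services are on the classpath and, for yarn mode, a reachable ResourceManager:

import java.io.IOException;
import java.util.ServiceLoader;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
import org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider;

public class ProviderProbe {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    conf.set("mapreduce.framework.name", "yarn"); // the default "local" would pick LocalJobRunner

    // Mirrors the loop in Cluster#initialize()
    for (ClientProtocolProvider provider :
         ServiceLoader.load(ClientProtocolProvider.class)) {
      ClientProtocol runner = provider.create(conf); // null when this provider does not match
      if (runner != null) {
        System.out.println(provider.getClass().getSimpleName()
            + " -> " + runner.getClass().getSimpleName());
        break;
      }
    }
  }
}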

JobSubmitter#submitJobInternal()
  • First, check whether the output path meets the requirements; then check whether the parameter mapreduce.application.framework.path is configured (see the official documentation for this parameter).
  • Obtain a jobId from YARN and generate the submission directory path; copy the job jar file and its crc checksum file to the submission directory.
  • Call the JobSubmitter#writeSplits() method. When the new API is used, it ultimately calls JobSubmitter#writeNewSplits() to write the split information files to the submission directory.
  • That method in turn calls InputFormat#getSplits() to obtain the split information. Because the default InputFormat is TextInputFormat, which inherits from FileInputFormat, the getSplits() method analyzed below is FileInputFormat#getSplits().
  • After the split information is obtained, the splits are sorted in descending order of size.
  • Write the split information files and job.xml to the submission directory.
  • Submit the job.
FileInputFormat#getSplits()
  • Obtain the minimum and maximum split sizes (in bytes), then obtain the information of all files in the input directory. Splitting is purely logical: the split files record how the source files are divided; the source files themselves are never physically cut.

  • Traverse the input files:

  1. Get the block information of the file in the corresponding file system.

  2. Call TextInputFormat#isSplitable() to determine whether the file can be split under the current configuration. If no compression codec is configured, the file can be split; if one is configured, the file can be split only if the codec implements SplittableCompressionCodec.

  3. If the file cannot be split, the whole file is wrapped in a single large logical split and returned.

  4. If splitting is possible, obtain the file's block size from the file system and compute the split size from the block size and the configured minimum/maximum split sizes with the formula Math.max(minSize, Math.min(maxSize, blockSize)).

  5. Split in a loop: before each iteration, check whether the remaining bytes divided by the split size is greater than 1.1. If it is, the loop continues; otherwise, the remainder is added directly as the final split. A standalone sketch with concrete numbers follows below.
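
For concreteness, a standalone sketch (not Hadoop source) that mirrors this loop with a hypothetical 260 MB file, a 128 MB block size, and the default minimum/maximum split sizes. It yields two splits, 128 MB and 132 MB, because 132/128 ≈ 1.03 is below the 1.1 threshold:

public class SplitSizeDemo {
  private static final double SPLIT_SLOP = 1.1; // same constant as FileInputFormat

  public static void main(String[] args) {
    long minSize = 1L;                 // default of mapreduce.input.fileinputformat.split.minsize
    long maxSize = Long.MAX_VALUE;     // default of mapreduce.input.fileinputformat.split.maxsize
    long blockSize = 128L * 1024 * 1024;
    long length = 260L * 1024 * 1024;  // hypothetical file length

    // Math.max(minSize, Math.min(maxSize, blockSize)) == blockSize with the defaults
    long splitSize = Math.max(minSize, Math.min(maxSize, blockSize));

    long bytesRemaining = length;
    while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
      System.out.printf("split at offset %d, length %d%n", length - bytesRemaining, splitSize);
      bytesRemaining -= splitSize;
    }
    if (bytesRemaining != 0) {
      // 132 MB remain: 132/128 is about 1.03 <= 1.1, so the tail becomes one final split
      System.out.printf("split at offset %d, length %d%n", length - bytesRemaining, bytesRemaining);
    }
  }
}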

Partial code

Job#submit()
// Confirm the job state is DEFINE
ensureState(JobState.DEFINE);
// Use the new API by default
setUseNewAPI();
// -------------------------------- Connect to the cluster --------------------------------
connect();
// getClient() returns the JobRunner (ClientProtocol) object
JobSubmitter submitter = 
      getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
    public JobStatus run() throws IOException, InterruptedException, 
    ClassNotFoundException {
        // -------------------------------- Submit the job --------------------------------
      return submitter.submitJobInternal(Job.this, cluster);
    }
  });
// Set the job state to RUNNING
state = JobState.RUNNING;
Cluster#initialize()
// Iterate over the ClientProtocolProvider implementations loaded by the ServiceLoader
for (ClientProtocolProvider provider : frameworkLoader) {
    // Returns a LocalJobRunner in local mode,
    // a YARNRunner in cluster (yarn) mode
    clientProtocol = provider.create(conf);
}
JobSubmitter#submitJobInternal()
// Add the path of the MapReduce framework archive to conf
addMRFrameworkToDistributedCache(conf);
// Obtain a new jobId
JobID jobId = submitClient.getNewJobID();
// Build the job submission directory path under the staging area
Path submitJobDir = new Path(jobStagingArea, jobId.toString());
// Copy the job jar file and its crc checksum file to the submission directory
copyAndConfigureFiles(job, submitJobDir);
// Compute the splits and write the split information files to the submission directory
int maps = writeSplits(job, submitJobDir);
// Write job.xml to the submission directory
writeConf(conf, submitJobFile);
JobSubmitter#writeNewSplits()
// Get the configured InputFormat object; the default is TextInputFormat
InputFormat<?, ?> input =
  ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
// Get the split information
List<InputSplit> splits = input.getSplits(job);
// Convert to an array (T extends InputSplit in the enclosing generic method)
T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);
// sort the splits into order based on size, so that the biggest
// go first
Arrays.sort(array, new SplitComparator());
// Write the split information to the submission directory
JobSplitWriter.createSplitFiles(jobSubmitDir, conf, 
				jobSubmitDir.getFileSystem(conf), array);
FileInputFormat#getSplits()
// ---------------------------------------- Split size in bytes ----------------------------------------
// The minimum split size can be set in the job conf (mapreduce.input.fileinputformat.split.minsize); the default is 1L
long minSize = Math.max(getFormatMinSplitSize()/* 1L */, getMinSplitSize(job));
// The maximum split size can be set in the job conf (mapreduce.input.fileinputformat.split.maxsize); the default is Long.MAX_VALUE
long maxSize = getMaxSplitSize(job);
// Get the information of all files in the input directory
List<FileStatus> files = listStatus(job);
List<InputSplit> splits = new ArrayList<InputSplit>();
// Traverse the files
for (FileStatus file : files) {
    Path path = file.getPath();
    long length = file.getLen();
    // Get the block locations of the file
    BlockLocation[] blkLocations;
    if (file instanceof LocatedFileStatus) {
      // Block locations are already carried by the LocatedFileStatus
      blkLocations = ((LocatedFileStatus) file).getBlockLocations();
    } else {
      // Otherwise query the file system (e.g. HDFS) for them
      FileSystem fs = path.getFileSystem(job.getConfiguration());
      blkLocations = fs.getFileBlockLocations(file, 0, length);
    }
    // If the current configuration supports splitting this file
    if (isSplitable(job, path)) {
      // Get the block size of the file
      long blockSize = file.getBlockSize();
      // Compute the split size with Math.max(minSize, Math.min(maxSize, blockSize))
      long splitSize = computeSplitSize(blockSize, minSize, maxSize);
      long bytesRemaining = length;
      // Write split information;
      // only cut a full split while bytesRemaining / splitSize > SPLIT_SLOP
      while (((double) bytesRemaining) / splitSize > SPLIT_SLOP/* 1.1 */) {
        int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
        splits.add(makeSplit(path, length - bytesRemaining, splitSize,
                    blkLocations[blkIndex].getHosts(),
                    blkLocations[blkIndex].getCachedHosts()));
        bytesRemaining -= splitSize;
      }
      // The remainder becomes the final split
      if (bytesRemaining != 0) {
        int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
        splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
                    blkLocations[blkIndex].getHosts(),
                    blkLocations[blkIndex].getCachedHosts()));
      }
    }
    // If the current configuration does not support splitting, treat the entire file as one split
    else { // not splitable
      splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
                  blkLocations[0].getCachedHosts()));
    }
}
TextInputFormat#isSplitable()
// Get the compression codec for the file (matched by file extension)
final CompressionCodec codec =
  new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
// getCodec() returns null when the file is not compressed; uncompressed files are splittable
if (null == codec) {
  return true;
}
// A compressed file is splittable only if its codec implements SplittableCompressionCodec
return codec instanceof SplittableCompressionCodec;
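
For illustration, a small sketch (not from the original post) that applies the same check to a few common extensions, assuming the default codec-by-extension mappings: plain text has no codec, gzip's codec is not splittable, and bzip2's codec implements SplittableCompressionCodec.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.SplittableCompressionCodec;

public class SplittabilityCheck {
  public static void main(String[] args) {
    CompressionCodecFactory factory = new CompressionCodecFactory(new Configuration());
    for (String name : new String[] {"input.txt", "input.gz", "input.bz2"}) {
      CompressionCodec codec = factory.getCodec(new Path(name)); // null for uncompressed files
      boolean splittable = (codec == null) || (codec instanceof SplittableCompressionCodec);
      System.out.println(name + " -> splittable: " + splittable);
    }
  }
}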

Summary of points to note

Setting the minimum and maximum split sizes
  1. mapreduce.input.fileinputformat.split.minsize sets the minimum value (default 1); mapreduce.input.fileinputformat.split.maxsize sets the maximum value (default Long.MAX_VALUE). A configuration sketch follows below.
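
A small configuration sketch (not from the original post); the 256 MB and 64 MB values are arbitrary examples. Either set the keys on the Configuration before creating the Job, or use the FileInputFormat helpers, which write the same keys:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class SplitSizeConfigDemo {
  public static void main(String[] args) throws Exception {
    // Option 1: set the key on the Configuration before the Job copies it
    Configuration conf = new Configuration();
    conf.setLong("mapreduce.input.fileinputformat.split.minsize", 256L * 1024 * 1024); // 256 MB

    Job job = Job.getInstance(conf, "split-size-demo");

    // Option 2: the FileInputFormat helpers write the same keys into the job.
    // Raise the minimum above the block size to get splits larger than a block:
    FileInputFormat.setMinInputSplitSize(job, 256L * 1024 * 1024);
    // Or lower the maximum below the block size to get splits smaller than a block:
    // FileInputFormat.setMaxInputSplitSize(job, 64L * 1024 * 1024);
  }
}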
Splits
  1. Splitting is purely logical: the split files record how the source files are divided; the source files themselves are never physically cut.
  2. The isSplitable() method is called to decide whether a file can be split. In TextInputFormat#isSplitable(), a file is splittable if no compression codec is configured, or if the configured codec implements SplittableCompressionCodec.
  3. The split size is calculated as Math.max(minSize, Math.min(maxSize, blockSize)). Therefore, to increase splitSize, set minSize to a value larger than blockSize; to decrease splitSize, set maxSize to a value smaller than blockSize.
  4. By default, splitSize == blockSize.
  5. Splitting loop: before each split is cut, if the remaining unprocessed size of the file divided by splitSize is greater than 1.1, another split is cut (note: although 1.1 rather than 1 is used for the check, each split is still splitSize, not splitSize*1.1); otherwise the remainder is added directly as the final split.
  6. The splits are sorted in descending order of size.
