MapReduce programming example

From the source code of the WordCount MapReduce program, we can draw the following conclusions:
(1) The program has a main method that starts the job. The Job object stores the information needed to run the program, such as which Mapper class and Reducer class to use:

job.setMapperClass(TokenizerMapper.class);
job.setReducerClass(IntSumReducer.class);

(2) The TokenizerMapper class in the program extends the Mapper class
(3) The IntSumReducer class in the program extends the Reducer class

Summary: the business logic of a MapReduce program is divided into two parts. One part configures the job's run-time information, and the other part implements the business logic itself; the code of the map phase and the reduce phase extends the Mapper class and the Reducer class respectively.
So let's write our own WordCount program now.
According to the above specification, the main structure should look like this:

The main entry point of the MapReduce program, where a Job object is used to manage it:

public static void main(String[] args) throws Exception{
	Job job = Job.getInstance();
	job.setMapperClass(WCMapper.class);
	job.setReducerClass(WCReducer.class);
	job.submit();
}

private static class WCMapper extends Mapper<LongWritable, Text, Text, LongWritable>{
	@Override
	protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongWritable>.Context context)
			throws IOException, InterruptedException{
		//Write the business code of the map task here
	}
}

private static class WCReducer extends Reducer<Text, LongWritable, Text, LongWritable>{
	@Override
	protected void reduce(Text key, Iterable<LongWritable> values, Reducer<Text, LongWritable, Text, LongWritable>.Context context)
			throws IOException, InterruptedException {
		//Fill in the business code of the reduce task here
	}
}

(1) The program written by the user is divided into three parts: the Mapper, the Reducer, and the Driver (the client that submits the MR job)
(2) The Mapper's input data is in the form of KV pairs (the KV types can be customized)
(3) The Mapper's output data is in the form of KV pairs (the KV types can be customized)
(4) The Mapper's business logic is written in the map() method
(5) The map() method is called by the maptask process once for every <K, V> pair
(6) The Reducer's input data type corresponds to the Mapper's output data type, which is also KV
(7) The Reducer's business logic is written in the reduce() method
(8) The reducetask process calls the reduce() method once for each group of <K, V> pairs that share the same K (see the data-flow sketch below)
(9) The user-defined Mapper and Reducer extend their respective parent classes
(10) The whole program needs a Driver to submit it, which is a Job object describing the necessary information
Business logic of WordCount:
(1) The maptask stage performs the word count on each data block. The idea is to convert every word into a key-value pair: for example, the word hello becomes <'hello', 1> and is sent to the reducetask for aggregation
(2) The reducetask stage receives the results of the maptasks and sums them up
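
To make points (5) and (8) above concrete, here is a small, Hadoop-free sketch (an illustration, not part of the original program) that traces the same data flow in plain Java: one map call per input record, a shuffle that groups the emitted pairs by key, and one reduce call per key group.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class WordCountTrace {
	public static void main(String[] args) {
		// Two input records, as a maptask would read them line by line
		List<String> lines = Arrays.asList("hello world", "hello hadoop");

		// "Map phase": emit one <word, 1> pair per word; the "shuffle" groups the pairs by key
		Map<String, List<Integer>> grouped = new TreeMap<>();
		for (String line : lines) {                       // map() would be called once per record
			for (String word : line.split(" ")) {
				grouped.computeIfAbsent(word, k -> new ArrayList<>()).add(1);
			}
		}

		// "Reduce phase": reduce() would be called once per key with all of that key's values
		for (Map.Entry<String, List<Integer>> e : grouped.entrySet()) {
			int sum = 0;
			for (int v : e.getValue()) {
				sum += v;
			}
			System.out.println(e.getKey() + "\t" + sum);  // prints: hadoop 1, hello 2, world 1
		}
	}
}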
The following is the concrete implementation. First, look at the map:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount1 {

	/**
	 * KEYIN refers to the type of the keys read by the framework. With the default InputFormat, the key read in is the starting offset of a line of text, so its type is Long
	 * VALUEIN refers to the type of the values read by the framework. With the default InputFormat, the value read in is the content of a line of text, so its type is String
	 * KEYOUT refers to the type of the keys returned by the user-defined logic method, which is determined by the user's business logic. In this WordCount, the output key is a word, so it is a String
	 * VALUEOUT refers to the type of the values returned by the user-defined logic method, which is determined by the user's business logic. In this WordCount program, the output value is a word count, so it is an Integer
	 * However, the JDK types such as String and Long are inefficient to serialize. To improve serialization efficiency, hadoop provides its own serialization framework
	 * Therefore, in a hadoop program, whenever data needs to be serialized (written to disk or sent over the network), data types that implement the hadoop serialization framework must be used:
	 * 
	 * Long -------> LongWritable
	 * String -----> Text
	 * Integer-----> IntWritable
	 * Null--------> NullWritable
	 * @author 201811621209lidengyin
	 *
	 */
	static class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{

		@Override
		protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
				throws IOException, InterruptedException {
			// Split the line into words and emit a <word, 1> pair for each word
			String [] words = value.toString().split(" ");
			for(String word : words) {
				context.write(new Text(word), new IntWritable(1));
			}
		}
		
			
	}
	/**
	 * First, as before, the Reducer class also has input and output types. The input is the processing result of the Map stage, and the output is the final output of the Reduce stage
	 * When the reduce method we wrote is called, each reducetask receives a portion of the data output by all the maptasks of the previous (map) stage
	 * (a key goes to the reducetask whose number equals key.hashCode() % the number of reducetasks; a sketch of this default partitioning follows the class below), so the input type of the reducetask must match the output type of the maptask
	 * 
	 * When the reducetask processes the received kv data, it calls our reduce method as follows:
	 * it groups the data by key (i.e. by whether the received kv pairs have the same key),
	 * then passes the key of the first kv pair of a group to the key parameter of the reduce method, and passes the values of that group of kv pairs to the values parameter of the reduce method as an iterator
	 * @author 201811621209lidengyin
	 *
	 */
	static class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{

		@Override
		protected void reduce(Text key, Iterable<IntWritable> values,
				Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
			// Sum up all the 1s emitted for this word
			int sum = 0;
			for(IntWritable v : values) {
				sum+=v.get();
			}
			context.write(key, new IntWritable(sum));
		}
		
	}
	
	/**
	 * The main method is the entry point of the MapReduce program. A Job object is used to manage the many parameters the program needs to run:
	 * For example, which component to use to read the input data and which to write the results
	 * Which class to use as the business logic class of the map phase and which class to use as the business logic class of the reduce phase
	 * The location of the jar package that contains the WordCount job
	 * And various other required parameters
	 * @param args
	 */
	  public static void main(String[] args) throws Exception {
		    Configuration conf = new Configuration();
		    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
		    if (otherArgs.length < 2) {
		      System.err.println("Usage: wordcount <in> [<in>...] <out>");
		      System.exit(2);
		    }
		    Job job = Job.getInstance(conf, "word count");
		    job.setJarByClass(WordCount1.class);
		    job.setMapperClass(WordCountMapper.class);
		    job.setCombinerClass(WordCountReducer.class);
		    job.setReducerClass(WordCountReducer.class);
		    job.setOutputKeyClass(Text.class);
		    job.setOutputValueClass(IntWritable.class);
		    for (int i = 0; i < otherArgs.length - 1; ++i) {
		      FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
		    }
		    FileOutputFormat.setOutputPath(job,
		      new Path(otherArgs[otherArgs.length - 1]));
		    System.exit(job.waitForCompletion(true) ? 0 : 1);
		  }

}
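
The javadoc above mentions that a key goes to the reducetask whose number equals key.hashCode() % the number of reducetasks. As an illustrative sketch (not part of the original post), the following standalone class shows that default partitioning logic, which is essentially what Hadoop's built-in HashPartitioner does; the job above relies on it implicitly, so it never needs to be set explicitly:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Illustrative only: mimics the behaviour of the default HashPartitioner for this job's map output types
public class WordCountPartitioner extends Partitioner<Text, IntWritable> {

	@Override
	public int getPartition(Text key, IntWritable value, int numReduceTasks) {
		// Mask off the sign bit so the result is non-negative, then take the modulus
		return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
	}
}

Because all <'hello', 1> pairs hash to the same partition, they all arrive at the same reducetask, which is what makes the summation in WordCountReducer correct when there is more than one reducetask.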

  1. MapReduce run modes and debugging
    Run modes:
    Local run mode: the program runs locally from the Eclipse development environment, which is convenient for debugging and testing
    Point 1: the MapReduce program is submitted to the LocalJobRunner and runs locally as a single process
    Point 2: the input and output data can be on the local file system or on HDFS
    Point 3: how do you get local execution? Leave the cluster configuration files out of your MapReduce program (it is essentially determined by the parameters mapreduce.framework.name and yarn.resourcemanager.hostname)
    Point 4: to run locally you have to do one more thing: set up Hadoop for Eclipse, i.e. configure the HADOOP_HOME environment variable locally and add it to the Path
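
As a rough sketch (the values shown are typical defaults, not taken from the original post), forcing local execution usually comes down to setting these parameters on the Configuration at the top of the main method shown above:

Configuration conf = new Configuration();
// Use the LocalJobRunner: the job runs as a single local process instead of being submitted to YARN
conf.set("mapreduce.framework.name", "local");
// Read and write the local file system; point this at an hdfs:// URI to use HDFS instead
conf.set("fs.defaultFS", "file:///");
Job job = Job.getInstance(conf, "word count");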

Cluster run mode: package the program into a jar and submit the job to the cluster to run
Point 1: first, package the code into a jar and upload it to the Linux server
Point 2: use the hadoop jar command to submit it to the YARN cluster, e.g. hadoop jar <your-jar> WordCount1 <input path> <output path>
Point 3: the input data and the output results should be located in the HDFS file system
Point 4: if you need to submit jobs to the cluster directly from Eclipse on Windows, you need to modify the YARNRunner class; this is complex and not recommended
