Big data Hadoop learning -- the PageRank page ranking algorithm

1, Algorithm description

PageRank is an algorithm that ranks web pages by importance. Some basic concepts:

1. Inbound link: that is, a vote. A hyperlink from a web page to another page counts as an inbound link of that other page, which is equivalent to voting for it;

2. Number of inbound links: the more inbound links (votes) a web page receives from other web pages, the more important it is considered;

3. Inbound link quality: that is, the voting weight. The quality of an inbound link is determined by the page that casts the vote. Every page starts with the same initial value, which can be set to 1, and splits it evenly among its hyperlinks, so the more hyperlinks a page contains, the lower the weight of each vote.

4. Damping coefficient d: a constant defined by Larry Page. It represents the share of page visits that happen by following a hyperlink rather than by other means, and it makes the calculated page weight more realistic. The damping coefficient set by Google is 0.85.

5. Formula for the page weight (pr value): pr = (1-d)/n + d*sum(tr), where d is the damping coefficient, n is the total number of web pages, and tr is a voting weight received by the page.

6. After enough rounds of iterative calculation, the difference between the old and new pr values of each web page approaches 0. At that point, the pr values reflect the relative importance of the web pages.

 

As an example, take four web pages A, B, C and D. A links to B and D, B links to C, C links to A and B, and D links to B and C.

So A receives half a vote from C; B receives half a vote each from A, C and D; C receives a full vote from B and half a vote from D; and D receives half a vote from A. Expressed as formulas:

             tr(A) = 1/2 * pr(C)

             tr(B) = 1/2 * pr(A) + 1/2 * pr(C) + 1/2 * pr(D)

             tr(C) = pr(B) + 1/2 * pr(D)

             tr(D) = 1/2 * pr(A)

With every pr initialized to 1, pages B and C receive the largest voting weight in the first round (1.5 each). The pr value of each page is then calculated with the formula pr = (1-d)/n + d*sum(tr), where sum(tr) is the page's total from the formulas above. This calculation is repeated round after round; the closer the old and new pr values of each page become, the closer the result is to the converged ranking.
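The iteration for this four-page example can be sketched in plain Java without Hadoop. This is a minimal sketch rather than the implementation used later in this post: the class name PageRankIteration, the method names, and the stop rule (largest per-page change below epsilon) are assumptions made for illustration.

```java
import java.util.Arrays;

/** In-memory sketch of the iteration; class and method names are illustrative. */
class PageRankIteration {
    static final double D = 0.85; // damping coefficient

    /** One round: pr = (1-d)/n + d*sum(tr), using the four tr formulas above. */
    static double[] step(double[] pr) {
        double a = pr[0], b = pr[1], c = pr[2], d = pr[3];
        double[] tr = {
            0.5 * c,                     // tr(A)
            0.5 * a + 0.5 * c + 0.5 * d, // tr(B)
            b + 0.5 * d,                 // tr(C)
            0.5 * a                      // tr(D)
        };
        double[] next = new double[4];
        for (int i = 0; i < 4; i++) {
            next[i] = (1 - D) / 4 + D * tr[i];
        }
        return next;
    }

    /** Iterates from pr = 1 until no pr value changes by more than epsilon. */
    static double[] iterate(double epsilon, int maxRounds) {
        double[] pr = {1.0, 1.0, 1.0, 1.0};
        for (int round = 0; round < maxRounds; round++) {
            double[] next = step(pr);
            double maxDiff = 0.0;
            for (int i = 0; i < 4; i++) {
                maxDiff = Math.max(maxDiff, Math.abs(next[i] - pr[i]));
            }
            pr = next;
            if (maxDiff < epsilon) {
                break;
            }
        }
        return pr;
    }

    public static void main(String[] args) {
        // Prints the pr values of A, B, C and D in that order
        System.out.println(Arrays.toString(iterate(1e-9, 1000)));
    }
}
```

Because every page in this graph has at least one outgoing link, each round passes on all of the voting weight, and the four pr values sum to approximately 1 once the iteration has converged.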

 

2, MapReduce analysis

1, The mapper processes each input record and outputs the following two types of data:

1. The list of pages that the current page links to, together with the pr value of the current page;

2. The vote that the current page casts for each page in its hyperlink list, together with the weight of that vote.

2, The reducer takes the web page as the key, sums the voting weights received by each web page, then calculates the new pr value, and finally outputs the results of this round of calculation.

3, The calculation is iterated for multiple rounds; whether to stop is decided by comparing the difference between the old and new pr values against a set threshold.
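The data flow of one round can be illustrated with an in-memory sketch that uses ordinary Java collections instead of Hadoop. The class MapReduceRoundSketch and its methods are hypothetical names chosen for this example. Only the vote records are modeled; the first record type, which carries the link list forward to the next round, is not needed here because the links stay in memory.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

/** In-memory sketch of one map/reduce round; names are illustrative, no Hadoop involved. */
class MapReduceRoundSketch {
    /** Map phase: each page splits its pr evenly over its links and emits (target, vote). */
    static List<Map.Entry<String, Double>> map(Map<String, String[]> links, Map<String, Double> pr) {
        List<Map.Entry<String, Double>> votes = new ArrayList<>();
        for (Map.Entry<String, String[]> e : links.entrySet()) {
            String[] targets = e.getValue();
            if (targets.length == 0) {
                continue; // a page without links casts no votes
            }
            double vote = pr.get(e.getKey()) / targets.length;
            for (String target : targets) {
                votes.add(new AbstractMap.SimpleEntry<String, Double>(target, vote));
            }
        }
        return votes;
    }

    /** Reduce phase: group the votes by page, sum them, then apply the pr formula. */
    static Map<String, Double> reduce(List<Map.Entry<String, Double>> votes, Set<String> pages, double d) {
        Map<String, Double> trSum = new TreeMap<>();
        for (String page : pages) {
            trSum.put(page, 0.0);
        }
        for (Map.Entry<String, Double> vote : votes) {
            trSum.merge(vote.getKey(), vote.getValue(), Double::sum);
        }
        int n = pages.size();
        Map<String, Double> next = new TreeMap<>();
        for (Map.Entry<String, Double> e : trSum.entrySet()) {
            next.put(e.getKey(), (1 - d) / n + d * e.getValue());
        }
        return next;
    }

    /** Runs one round on the four-page example, starting from pr = 1 for every page. */
    static Map<String, Double> oneRound() {
        Map<String, String[]> links = new LinkedHashMap<>();
        links.put("A", new String[] {"B", "D"});
        links.put("B", new String[] {"C"});
        links.put("C", new String[] {"A", "B"});
        links.put("D", new String[] {"B", "C"});
        Map<String, Double> pr = new LinkedHashMap<>();
        for (String page : links.keySet()) {
            pr.put(page, 1.0);
        }
        return reduce(map(links, pr), links.keySet(), 0.85);
    }

    public static void main(String[] args) {
        System.out.println(oneRound());
    }
}
```

In a real job the grouping by key is done by the framework's shuffle between the two phases; here the merge call inside reduce plays that role.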

 

3, MapReduce implementation

test data

A B D
B C
C A B
D B C

 

client

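import java.io.BufferedReader;
import java.io.InputStreamReader;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Formula for the pr value of a page: pr = (1-d)/n + d*sum(tr)
 * d: damping coefficient, 0.85
 * n: total number of pages, here the number of records in the input file
 * tr: voting weight the page receives from other pages
 */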
public class PageRankDriver {
	/**Threshold for the average pr difference per page, used to decide whether to stop iterating*/
	private static final double LIMIT = 0.001;
	private static final double D = 0.85;

	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration(true);
		//Cross-platform submission. If the client submits from Windows, set this to true for compatibility. The default is false
		conf.set("mapreduce.app-submission.cross-platform", "true");
		//Where the program runs: "local" for local execution, "yarn" for distributed execution. A distributed job must be packaged as a jar
		conf.set("mapreduce.framework.name", "local");

		//The most original website data
		String inputFile = "/test/pagerank/input/page_data.txt";
		FileSystem fs = FileSystem.get(conf);
		//Set total number of websites
		conf.setInt("pageNum", readFileLineNum(fs, inputFile));
		//Set damping coefficient
		conf.setDouble("zuniNumber", D);

		int i = 0;
		while (true) {
			i++;
			try {
				//Setting a custom configuration and reading it in the mapreduce program is equivalent to passing in an input parameter
				conf.setInt("runCount", i);

				//Separator for the input format class: the initial file uses a space, later result files use a tab
				String splitStr = " ";
				if (i != 1) {
					inputFile = "/test/pagerank/output" + (i - 1) + "/part-r-00000";
					splitStr = "\t";
				}
				Path input = new Path(inputFile);
				if (!fs.exists(input)) {
					System.out.println("File: " + inputFile + " does not exist");
					break;
				}

				//Set the separator of the input format class KeyValueTextInputFormat. Each line is split only at the first separator. The default is \t
				conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", splitStr);

				Job job = Job.getInstance(conf);
				job.setJobName("pagerank-" + i);
				job.setJarByClass(PageRankDriver.class);
				//To submit a distributed job from the local machine, configure yarn above and also set the full path of the jar package
				// job.setJar("/a/c/d.jar");

				//Set the input format class. KeyValueTextInputFormat will divide a line according to tab characters. You can set the separator through parameters
				job.setInputFormatClass(KeyValueTextInputFormat.class);

				job.setMapperClass(PageRankMapper.class);
				job.setMapOutputKeyClass(Text.class);
				job.setMapOutputValueClass(NetPage.class);

				job.setReducerClass(PageRankReducer.class);

				FileInputFormat.addInputPath(job, input);
				Path output = new Path("/test/pagerank/output" + i);
				FileOutputFormat.setOutputPath(job, output);

				boolean flag = job.waitForCompletion(true);
				if (flag) {
					System.out.println("success.");
					//Read the counter from the job: the sum over all pages of the pr differences, each scaled by 1000
					long sum = job.getCounters().findCounter(MyCounter.MY).getValue();
					System.out.println("Sum of pr differences (x1000): " + sum);
					//Undo the 1000x scaling and average over the number of pages
					double avgd = sum / (1000.0 * conf.getInt("pageNum", 1));
					if (avgd < LIMIT) {
						break;
					}
				}
			} catch (Exception e) {
				e.printStackTrace();
			}
		}
	}

	/**
	 * Counts the lines in a file; each line describes one page
	 *
	 * @param fs file system that holds the file
	 * @param file path of the file
	 * @return number of lines in the file
	 * @throws Exception if the file cannot be opened or read
	 */
	public static int readFileLineNum(FileSystem fs, String file) throws Exception {
		//try-with-resources closes the reader and the underlying stream
		try (BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(new Path(file))))) {
			int i = 0;
			while (reader.readLine() != null) {
				i++;
			}
			return i;
		}
	}
}

The client needs to run the MapReduce job in a loop. It sets a threshold for the difference in pr values, and the job maintains a counter that accumulates that difference. When the value derived from the counter falls below the threshold, the iteration stops.

The client also needs to pass a parameter that tells the program which iteration it is in, because in the first iteration the pages have no pr value yet and 1 is used by default.
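The stop condition can be sketched on its own in plain Java. This is a minimal sketch under stated assumptions: ConvergenceCheck, toCounterUnits and converged are hypothetical names, and SCALE mirrors the factor of 1000 used in this post. The scaling exists because a counter holds whole numbers only.

```java
/** Sketch of the counter-based stop condition; names are illustrative. */
class ConvergenceCheck {
    /** Counters hold whole numbers, so each difference is scaled before it is added. */
    static final int SCALE = 1000;

    /** Converts one page's pr change into counter units; the cast truncates toward zero. */
    static long toCounterUnits(double oldPr, double newPr) {
        return (long) (Math.abs(newPr - oldPr) * SCALE);
    }

    /** True when the average per-page difference is below the limit. */
    static boolean converged(long counterSum, int pageNum, double limit) {
        double avgDiff = counterSum / (double) (pageNum * SCALE);
        return avgDiff < limit;
    }

    public static void main(String[] args) {
        long sum = toCounterUnits(1.0, 0.4625) + toCounterUnits(1.0, 1.3125);
        System.out.println(sum + " -> converged: " + converged(sum, 4, 0.001));
    }
}
```

With a scale of 1000, any per-page change smaller than 0.001 truncates to 0 counter units, so a larger scale would be needed to detect finer differences.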

 

Mapper

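import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class PageRankMapper extends Mapper<Text, Text, Text, NetPage> {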
	/**Current page*/
	private final NetPage currPage = new NetPage(0);
	/**Associated web page*/
	private final NetPage relPage = new NetPage(1);
	private final Text mkey = new Text();


	@Override
	protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
		//Input records come in two forms:
		//round 1:       key: A        value: B D
		//later rounds:  key: A:0.5    value: B D
		//Read the custom parameter set by the client
		final int runCount = context.getConfiguration().getInt("runCount", 1);

		if (runCount == 1) {
			currPage.setPage(key.toString());
		} else {
			StringTokenizer st = new StringTokenizer(key.toString(), ":");
			currPage.setPage(st.nextToken());
			currPage.setPr(Double.parseDouble(st.nextToken()));
		}
		currPage.setRelationPage(value.toString());
		mkey.set(currPage.getPage());
		//First record type: the current page's link list together with its pr value
		context.write(mkey, currPage);

		String[] links = currPage.getRelationPage();
		if (links != null && links.length > 0) {
			//The current page splits its pr evenly among the pages it links to
			double avgPr = currPage.getPr() / links.length;
			for (int i = 0; i < links.length; i++) {
				relPage.setPage(links[i]);
				relPage.setPr(avgPr);
				mkey.set(links[i]);
				//Second record type: the vote cast by the current page for a linked page, with its weight
				context.write(mkey, relPage);
			}
		}

	}
}
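The two input shapes handled by the mapper can be tried out without Hadoop. The sketch below is hypothetical: InputRecordSketch, splitFirst and parse are names invented for this example. splitFirst imitates splitting a line at the first separator only, and the default pr of 1.0 in round 1 is taken from the description in this post.

```java
import java.util.Arrays;

/** Sketch of parsing the two input shapes; plain Java, no Hadoop types. */
class InputRecordSketch {
    final String page;
    final double pr;
    final String[] links;

    InputRecordSketch(String page, double pr, String[] links) {
        this.page = page;
        this.pr = pr;
        this.links = links;
    }

    /** Splits a line at the first separator only: key before it, value after it. */
    static String[] splitFirst(String line, char sep) {
        int idx = line.indexOf(sep);
        if (idx < 0) {
            return new String[] {line, ""};
        }
        return new String[] {line.substring(0, idx), line.substring(idx + 1)};
    }

    /**
     * @param key      "A" in round 1, "A:0.5" in later rounds
     * @param value    space-separated link list, for example "B D"
     * @param runCount iteration number, starting at 1
     */
    static InputRecordSketch parse(String key, String value, int runCount) {
        String page = key;
        double pr = 1.0; // assumed default while no pr value exists yet
        if (runCount > 1) {
            int idx = key.lastIndexOf(':');
            if (idx < 0) {
                throw new IllegalArgumentException("expected page:pr but got " + key);
            }
            page = key.substring(0, idx);
            pr = Double.parseDouble(key.substring(idx + 1));
        }
        String trimmed = value.trim();
        String[] links = trimmed.isEmpty() ? new String[0] : trimmed.split("\\s+");
        return new InputRecordSketch(page, pr, links);
    }

    public static void main(String[] args) {
        String[] kv = splitFirst("A:0.5\tB D", '\t');
        InputRecordSketch r = parse(kv[0], kv[1], 2);
        System.out.println(r.page + " " + r.pr + " " + Arrays.toString(r.links));
    }
}
```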

 

Reducer

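import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class PageRankReducer extends Reducer<Text, NetPage, Text, Text> {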
	private final Text rkey = new Text();
	private final Text rval = new Text();

	@Override
	protected void reduce(Text key, Iterable<NetPage> values, Context context) throws IOException, InterruptedException {
		//Input values also come in two forms (fields: page, pr, link list, page type):
		//key A 		value A		1.0 	B C 	0
		//key A			value A 	0.5     		1
		NetPage currPage = new NetPage();
		double trSum = 0.0;
		for (NetPage page : values) {
			if (page.getPageType() == 0) {
				//Caution: do not keep a reference to the loop variable for use outside the loop.
				//The reduce iterator does not create a new object for each value; it reuses one object and overwrites its fields, so the fields must be copied
				// currPage = page;
				currPage.setPage(page.getPage());
				currPage.setPr(page.getPr());
				currPage.setRelationPage(page.getRelationPage());
				currPage.setPageType(page.getPageType());
			} else {
				//Accumulate the voting weight received by this page
				trSum += page.getPr();
			}
		}
		//Get the total number of websites
		int pageNum = context.getConfiguration().getInt("pageNum", 1);
		//Get damping coefficient, default 0.85
		double d = context.getConfiguration().getDouble("zuniNumber", 0.85);
		//Calculate the new pr value according to the formula
		double newPr = (1 - d) / pageNum + d * trSum;
		//Calculate the difference between the new pr value and the old pr value
		double c = newPr - currPage.getPr();

		//Counters hold long values, and casting a double to an integer truncates it (0.8 -> 0),
		//so scale the difference by 1000 first; otherwise small differences would count as 0 and the iteration would stop too early
		long j = Math.abs((long) (c * 1000));

		//Add this page's scaled difference to the counter. increment() accumulates over all pages, whereas setValue() would overwrite the running total
		context.getCounter(MyCounter.MY).increment(j);

		rkey.set(currPage.getPage() + ":" + newPr);
		rval.set(currPage.arrToStr());
		context.write(rkey, rval);
	}
}
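The NetPage value class used by the mapper and reducer is not shown in this post. The following is a hypothetical stand-in, written in plain Java so that it runs without Hadoop; the field layout, the default pr of 1.0 and the serialization order are all assumptions. In a real job the class would implement org.apache.hadoop.io.Writable, whose write and readFields methods have the same shape as the ones below.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical stand-in for the NetPage value class; not the author's code. */
class NetPageSketch {
    private int pageType;                 // 0 = page record with link list, 1 = vote
    private String page = "";
    private double pr = 1.0;              // assumed default for the first round
    private String[] relationPage = new String[0];

    NetPageSketch() {}
    NetPageSketch(int pageType) { this.pageType = pageType; }

    int getPageType() { return pageType; }
    void setPageType(int pageType) { this.pageType = pageType; }
    String getPage() { return page; }
    void setPage(String page) { this.page = page; }
    double getPr() { return pr; }
    void setPr(double pr) { this.pr = pr; }
    String[] getRelationPage() { return relationPage; }

    /** Accepts a space-separated link list such as "B D". */
    void setRelationPage(String spaceSeparated) {
        String s = spaceSeparated.trim();
        relationPage = s.isEmpty() ? new String[0] : s.split("\\s+");
    }

    /** Copies the array so that later changes to the argument do not leak in. */
    void setRelationPage(String[] links) {
        relationPage = links == null ? new String[0] : links.clone();
    }

    String arrToStr() { return String.join(" ", relationPage); }

    /** Same shape as Writable.write(DataOutput). */
    void write(DataOutput out) throws IOException {
        out.writeInt(pageType);
        out.writeUTF(page);
        out.writeDouble(pr);
        out.writeUTF(arrToStr());
    }

    /** Same shape as Writable.readFields(DataInput); overwrites every field. */
    void readFields(DataInput in) throws IOException {
        pageType = in.readInt();
        page = in.readUTF();
        pr = in.readDouble();
        setRelationPage(in.readUTF());
    }

    /** Serializes a page and reads it back into a fresh object. */
    static NetPageSketch roundTrip(NetPageSketch in) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            in.write(new DataOutputStream(bytes));
            NetPageSketch out = new NetPageSketch();
            out.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
            return out;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Because readFields overwrites every field of an existing object, a framework can refill one instance for each value it hands out, which is why the reducer copies the fields instead of keeping the reference.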

 

Output result set

A:0.4091210396728514	B D
B:0.6997982913818357	C
C:0.7920743121337889	A B
D:0.2304549036865234	B C

 

See Gitee for the complete code and test data: Hadoop test portal

Posted by mblack0508 on Sat, 21 May 2022 16:13:37 +0300