一、MapReduce工作流程概述
MapReduce是用於分散式數據處理的編程模型。它採用了劃分-映射-合併的思想,將大型數據集分成小塊,由多個計算節點並行處理,並將小塊結果合併成一個最終結果。
MapReduce的工作流程包括兩個階段:Map(映射)和Reduce(合併)。
二、Map階段詳解
在(MapReduce中,映射)階段,首先需要對輸入數據進行劃分,將數據劃分為若干份較小的數據塊,每個數據塊交由一個Map任務處理。Map任務將數據塊轉化為一系列鍵值對,並輸出為一個新的鍵值對序列。
// Map示例代碼 public static class Map extends Mapper<Object, Text, Text, IntWritable>{ private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(Object key, Text value, Context context) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(value.toString()); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); context.write(word, one); } } }
三、Shuffle和排序詳解
Shuffle是MapReduce的一個重要步驟,它負責將Map輸出的鍵值對按照鍵進行分組,將同一組內的記錄發給同一個Reduce任務進行處理。
Shuffle完成後,Reduce任務接收到的記錄已經按照鍵值進行了分組,只需要對每個鍵值組進行合併和處理即可。
MapReduce默認的排序方式是根據鍵值對的鍵對記錄進行排序。如果需要自定義排序方式,可以實現WritableComarable介面,並重寫compareTo方法。
四、Reduce階段詳解
Reduce階段的處理對象是Map階段輸出的鍵值對序列,Reduce任務將同一組內的記錄進行合併,形成一個更加小的序列,直至處理完所有的記錄。
// Reduce示例代碼 public static class Reduce extends Reducer<Text,IntWritable,Text,IntWritable> { private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result); } }
五、Combiner優化
為了減少Reduce任務的負擔,在Map輸出的鍵值對序列傳輸到Reduce之前,可以在Map本地進行一些合併操作。這種方式稱之為Combiner。
Combiner可大大減少Reduce任務所需要處理的數據量,從而提高整個MapReduce任務的效率。
// Combiner示例代碼 public static class Reduce extends Reducer<Text,IntWritable,Text,IntWritable> { private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result); } } public static class Comb extends Reducer<Text,IntWritable,Text,IntWritable> { private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result); } }
六、MapReduce作業提交
MapReduce作業提交有兩種方式:命令行和代碼。其中命令行方式為Hadoop自帶的hadoop命令,代碼方式需要先創建一個配置對象,指定Hadoop集群地址、作業名等,然後將MapReduce任務的輸入輸出路徑和類名封裝到一個Job對象中,最後通過job.waitForCompletion方法提交作業。
// MapReduce作業提交示例代碼 Configuration conf = new Configuration(); conf.set("fs.defaultFS", "hdfs://localhost:9000"); Job job = new Job(conf, "word count"); job.setJarByClass(WordCount.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); job.setMapperClass(Map.class); job.setCombinerClass(Combiner.class); job.setReducerClass(Reduce.class); job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); System.exit(job.waitForCompletion(true) ? 0 : 1);
原創文章,作者:ZVJGR,如若轉載,請註明出處:https://www.506064.com/zh-tw/n/371442.html