在上一篇中,我们深入探讨了MapReduce
编程模型的工作流程,了解了数据是如何由“输入”经过“映射”与“归约”阶段处理,并最终生成所需结果。在本篇中,我们将通过具体实例来分析MapReduce
编程模型的实际应用,并加深对这一模型的理解。
MapReduce编程模型概述
MapReduce
是一个编程模型和关联的实现,用于处理和生成大规模数据集。MapReduce
将计算分为两个主要阶段:Map
阶段和Reduce
阶段。
- Map阶段:将输入数据分解为一系列键值对(key-value pairs)。
- Reduce阶段:将Map阶段输出的中间结果进行合并,并生成最终结果。
实例分析:词频统计
接下来,我们将通过一个简单的案例——词频统计
来具体分析MapReduce
的工作流程。
需求说明
假设我们需要统计一篇文本中每个单词出现的次数。输入数据为一段文本,而输出结果为每个单词及其对应的出现次数。
Map阶段实现
在Map
阶段,我们将每个单词作为键(key),出现的次数作为值(value)。我们首先需要定义一个Mapper
类:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class WordCountMapper extends Mapper<Object, Text, Text, IntWritable> { private final static IntWritable one = new IntWritable(1); private Text word = new Text();
@Override public void map(Object key, Text value, Context context) throws IOException, InterruptedException { String[] words = value.toString().split("\\s+"); for (String w : words) { word.set(w); context.write(word, one); } } }
|
在上述代码中,我们导入了Hadoop的Mapper
类,并重写了map
方法。在这个方法中,文本行被分割成单词,然后我们将每个单词作为key
,并写入对应的value
为1。
Reduce阶段实现
在Reduce
阶段,我们将相同键的值进行归约。即将相同单词的出现次数进行汇总。我们需要定义一个Reducer
类:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> { @Override public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } context.write(key, new IntWritable(sum)); } }
|
在WordCountReducer
中,我们重写了reduce
方法,计算相同单词的出现总次数并输出。
作业的主类
最后,我们需要一个主要的作业类来配置整个MapReduce
作业。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WordCount { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf, "word count"); job.setJarByClass(WordCount.class); job.setMapperClass(WordCountMapper.class); job.setCombinerClass(WordCountReducer.class); job.setReducerClass(WordCountReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } }
|
在这个类中,我们创建了一个MapReduce
作业,设置了相关的\Mapper
和\Reducer
类,并指定输入输出路径。
运行和输出结果
我们可以通过以下命令来运行这个MapReduce
作业:
1
| hadoop jar yourjarfile.jar WordCount input.txt output
|
假设input.txt
文件的内容是:
1 2 3
| hello world hello hadoop hello mapreduce
|
最终输出结果会被保存到output
目录中,格式如下:
1 2 3 4
| hello 3 world 1 hadoop 1 mapreduce 1
|
在这个案例中,我们通过一个具体的MapReduce
实例,深入理解了MapReduce
编程模型的工作原理和具体应用。
小结
通过本篇的实例分析,我们看到了MapReduce
编程模型如何处理数据。在接下来的章节中,我们将继续探索Hadoop生态系统,介绍各种常用工具和技术,让你在大数据处理的世界中游刃有余。