时间一晃而过,距离第一篇的大数据之Hadoop从入门到放弃(一)已经半年了,现在就连疫情也放开了,笔者刚从高烧中恢复,发烧的过程中害怕自己会不会挂掉,这不想起了未竟的学习事业,惊坐起继续写文。
前言:从"存得下"到"算得动"
上一篇我们认识了HDFS——一个能存下整个互联网的分布式文件系统。但是,光能存有什么用?就像你有一个装满了书的图书馆,但是没有图书管理员帮你找书、整理书,那这个图书馆就只是一个大号仓库。
所以,这一篇我们来学习如何让大象干活——MapReduce。
先讲一个悲伤的故事:
有一天,老板让我分析一下公司过去5年的用户行为日志,大概10TB。
我说:"好的,我写个Python脚本跑一下。"
3天后,老板问我结果。
我说:"脚本还在跑......"
老板说:"你被开除了。"
这个故事告诉我们:单机处理大数据 = 职业生涯自杀。
而MapReduce就是来拯救我们职业生涯的。
一、MapReduce:分而治之的艺术
1.1 核心思想:把大问题拆成小问题
MapReduce的核心思想其实很简单,用四个字就能概括:分而治之。
这就像什么呢?想象一下你要数清楚一个图书馆里所有书的总页数:
| 方案 | 做法 | 耗时 |
|---|---|---|
| 单人方案 | 你一个人一本一本数 | 一辈子 |
| MapReduce方案 | 叫100个人,每人数一个书架(Map),最后把结果加起来(Reduce) | 一下午 |
1.2 Map和Reduce到底是什么?
处理 + 输出KV对] M2[Mapper2
处理 + 输出KV对] M3[Mapper3
处理 + 输出KV对] end subgraph Shuffle["🔀 Shuffle阶段"] S[排序 + 分组
相同Key放一起] end subgraph Reduce阶段["📊 Reduce阶段"] R1[Reducer1
聚合计算] R2[Reducer2
聚合计算] end subgraph Output["📤 输出"] O1[结果文件1] O2[结果文件2] end I1 --> M1 I2 --> M2 I3 --> M3 M1 --> S M2 --> S M3 --> S S --> R1 S --> R2 R1 --> O1 R2 --> O2 end style M1 fill:#ff6b6b style M2 fill:#ff6b6b style M3 fill:#ff6b6b style S fill:#ffe66d style R1 fill:#4ecdc4 style R2 fill:#4ecdc4
用人话来说:
| 阶段 | 做什么 | 类比 |
|---|---|---|
| Map | 把输入数据转换成Key-Value对 | 图书管理员给每本书贴标签 |
| Shuffle | 把相同Key的数据聚到一起 | 把相同标签的书放到同一个箱子 |
| Reduce | 对相同Key的Value做聚合运算 | 统计每个箱子里有多少本书 |
1.3 一个公式理解MapReduce
map: (K1, V1) → list(K2, V2)
reduce: (K2, list(V2)) → list(K3, V3)翻译成人话:
- Map函数:输入一个键值对,输出一堆键值对
- Reduce函数:输入一个Key和它对应的一堆Value,输出结果
二、经典案例:Word Count(程序员的Hello World)
如果说学编程要写Hello World,那学MapReduce就必须写Word Count。
2.1 问题描述
统计一堆文本文件中,每个单词出现了多少次。
输入:
文件1: Hello World Hello Hadoop
文件2: Hello MapReduce World
文件3: Hadoop MapReduce Hadoop期望输出:
Hadoop 3
Hello 3
MapReduce 2
World 22.2 MapReduce是怎么做的?
(Hello, 1)
(World, 1)
(Hello, 1)
(Hadoop, 1)"] M2["Mapper2输出:
(Hello, 1)
(MapReduce, 1)
(World, 1)"] M3["Mapper3输出:
(Hadoop, 1)
(MapReduce, 1)
(Hadoop, 1)"] end subgraph Shuffle["🔀 Shuffle:相同Key聚合"] S1["(Hadoop, [1,1,1])"] S2["(Hello, [1,1,1])"] S3["(MapReduce, [1,1])"] S4["(World, [1,1])"] end subgraph Reduce阶段["📊 Reduce阶段:求和"] R1["(Hadoop, 3)"] R2["(Hello, 3)"] R3["(MapReduce, 2)"] R4["(World, 2)"] end F1 --> M1 F2 --> M2 F3 --> M3 M1 --> Shuffle M2 --> Shuffle M3 --> Shuffle Shuffle --> R1 Shuffle --> R2 Shuffle --> R3 Shuffle --> R4 style M1 fill:#ff6b6b style M2 fill:#ff6b6b style M3 fill:#ff6b6b style Shuffle fill:#ffe66d style R1 fill:#4ecdc4 style R2 fill:#4ecdc4 style R3 fill:#4ecdc4 style R4 fill:#4ecdc4
2.3 代码实现
// Mapper类
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
// key: 行偏移量(没啥用)
// value: 一行文本
String line = value.toString();
String[] words = line.split("\\s+"); // 按空格拆分
for (String w : words) {
word.set(w);
context.write(word, one); // 输出 (word, 1)
}
}
}
// Reducer类
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
// key: 单词
// values: [1, 1, 1, ...]
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result); // 输出 (word, count)
}
}
// Driver类(主程序)
public class WordCount {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}🤔 灵魂拷问:写这么多代码就为了统计单词?用Python一行就能搞定啊!
答:是的,但Python一行能处理10TB数据吗?
三、Shuffle:MapReduce的灵魂(也是最难的部分)
如果说Map和Reduce是MapReduce的两条腿,那Shuffle就是它的腰——没有腰,腿再粗也站不稳。
3.1 Shuffle到底干了什么?
Shuffle是Map阶段和Reduce阶段之间的数据传输过程,它负责:
- 把Map的输出按Key分区(决定去哪个Reducer)
- 把数据排序(按Key排序)
- 把相同Key的数据聚合在一起
- 把数据从Mapper所在节点传输到Reducer所在节点
生成最终文件] end MO --> B1 B1 --> |"达到80%阈值"|SP1 SP1 --> SP2 SP2 --> SP3 SP3 --> SP4 SP4 --> MG end subgraph ReduceTask["📊 Reduce Task (Reduce端)"] subgraph Copy["复制阶段"] CP[从各个Map Task
拉取属于自己的数据] end subgraph MergeSort["合并排序"] MS[合并所有数据
归并排序] end subgraph ReducePhase["Reduce阶段"] RD[调用Reduce函数
输出结果] end CP --> MS MS --> RD end MG --> |"HTTP传输"|CP style Buffer fill:#ffe66d style Spill fill:#ff6b6b style Copy fill:#4ecdc4
3.2 Map端Shuffle详解
(100MB) participant Disk as 本地磁盘 participant Reducer as Reducer Note over Map,Buffer: 阶段1:写入缓冲区 Map->>Buffer: 输出(K,V)对 Map->>Buffer: 继续输出... Note over Buffer: 缓冲区使用达到80% Note over Buffer,Disk: 阶段2:溢写(Spill) Buffer->>Buffer: 1. 根据Key计算分区号 Buffer->>Buffer: 2. 按(分区,Key)排序 Buffer->>Buffer: 3. 执行Combiner(可选) Buffer->>Disk: 4. 溢写到磁盘文件 Note over Map,Disk: 继续处理,可能多次溢写 Map->>Buffer: 继续输出... Buffer->>Disk: 再次溢写... Note over Disk: 阶段3:合并(Merge) Disk->>Disk: 合并所有溢写文件
生成一个分区有序文件 Note over Disk,Reducer: 阶段4:等待Reducer来拉取 Reducer->>Disk: HTTP请求获取数据 Disk->>Reducer: 返回对应分区的数据
关键点解释
环形缓冲区(100MB):
- Map输出先写到内存缓冲区,不是直接写磁盘
- 默认100MB,可以通过
mapreduce.task.io.sort.mb配置 - 为什么是"环形"?写满一圈可以继续覆盖已溢写的部分
溢写阈值(80%):
- 缓冲区用到80%就开始溢写,不等用满
- 这样可以边写边溢,提高效率
- 参数:
mapreduce.map.sort.spill.percent
分区(Partition):
- 决定每条记录去哪个Reducer
- 默认用Key的HashCode对Reducer数量取模
- 可以自定义Partitioner实现特殊分区逻辑
// 默认分区逻辑
public class HashPartitioner<K, V> extends Partitioner<K, V> {
public int getPartition(K key, V value, int numReduceTasks) {
return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
}3.3 Reduce端Shuffle详解
Copy阶段:
- Reducer通过HTTP从各个Map Task拉取属于自己分区的数据
- 默认5个并行复制线程(
mapreduce.reduce.shuffle.parallelcopies) - 数据先放内存,内存放不下才写磁盘
Merge阶段:
- 把从各个Mapper拉取的数据合并
- 使用归并排序,保证最终数据按Key有序
- 合并后相同Key的数据就挨在一起了
3.4 Shuffle的性能杀手
一个Reducer累死
其他Reducer闲着] B[过多小文件] --> B1[大量溢写文件
磁盘IO爆炸] C[网络传输] --> C1[Map到Reduce跨节点传输
带宽成为瓶颈] D[内存不足] --> D1[频繁溢写磁盘
GC频繁] end style A fill:#ff6b6b style B fill:#ff6b6b style C fill:#ff6b6b style D fill:#ff6b6b
四、Combiner:Map端的小Reducer
4.1 Combiner是什么?
Combiner是一个在Map端运行的Mini Reducer,用来在数据传输前做一次本地聚合,减少网络传输量。
(Hello,1)(Hello,1)(Hello,1)"] --> |"传输3条"|R1[Reducer] M2["Mapper2输出:
(Hello,1)(Hello,1)"] --> |"传输2条"|R1 Note1[网络传输5条记录] end subgraph 有Combiner["✅ 有Combiner"] M3["Mapper1输出:
(Hello,1)(Hello,1)(Hello,1)"] --> C1[Combiner] C1 --> |"(Hello,3)"|R2[Reducer] M4["Mapper2输出:
(Hello,1)(Hello,1)"] --> C2[Combiner] C2 --> |"(Hello,2)"|R2 Note2[网络传输2条记录] end style Note1 fill:#ff6b6b style Note2 fill:#4ecdc4
4.2 Combiner的使用条件
重要:不是所有场景都能用Combiner!
结果正确"] A2[求最大值 Max] --> A2D["(a,3)(a,7)→(a,7)→最终(a,10)
结果正确"] A3[求最小值 Min] --> A3D["同上"] end subgraph 不能用["❌ 不能用Combiner"] B1[求平均值 Avg] --> B1D["(a,1)(a,3)→avg=2
(a,5)(a,7)→avg=6
合并avg=(2+6)/2=4
实际avg=(1+3+5+7)/4=4
碰巧对了,但换个数就错"] B2[求中位数] --> B2D["局部中位数无法合并"] end style A1 fill:#4ecdc4 style A2 fill:#4ecdc4 style A3 fill:#4ecdc4 style B1 fill:#ff6b6b style B2 fill:#ff6b6b
判断标准:Combiner函数必须满足结合律和交换律。
用数学表示:combine(combine(a,b), c) = combine(a, combine(b,c))
4.3 代码中使用Combiner
// 在Driver中添加一行即可
job.setCombinerClass(WordCountReducer.class); // 通常Combiner逻辑和Reducer一样五、MapReduce完整执行流程
让我们把所有知识串起来,看看一个MapReduce作业从提交到完成的完整流程:
5.1 InputSplit vs Block
很多人分不清InputSplit和Block的区别:
5.2 Map Task数量计算
Map Task数量 ≈ 输入数据总大小 / Split大小例如:
- 输入数据:1GB
- Block/Split大小:128MB
- Map Task数量 ≈ 1024MB / 128MB = 8个
5.3 Reduce Task数量
Reduce Task数量由你指定:
job.setNumReduceTasks(3); // 设置3个Reducer如何选择Reduce数量?
| Reduce数量 | 效果 |
|---|---|
| 太少 | 单个Reducer负担重,可能OOM |
| 太多 | 启动开销大,产生太多小文件 |
| 经验值 | 0.95 (集群节点数 每节点最大Container数) |
六、MapReduce编程进阶
6.1 自定义数据类型
Hadoop的Key和Value必须实现特定接口:
自定义Writable示例:
public class UserBehavior implements Writable {
private String userId;
private String action;
private long timestamp;
// 序列化
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(userId);
out.writeUTF(action);
out.writeLong(timestamp);
}
// 反序列化
@Override
public void readFields(DataInput in) throws IOException {
userId = in.readUTF();
action = in.readUTF();
timestamp = in.readLong();
}
// Getter/Setter省略...
}6.2 自定义Partitioner
当默认的Hash分区不满足需求时:
public class ProvincePartitioner extends Partitioner<Text, FlowBean> {
@Override
public int getPartition(Text key, FlowBean value, int numPartitions) {
String phone = key.toString();
String province = phone.substring(0, 3); // 手机号前3位
switch (province) {
case "136": return 0;
case "137": return 1;
case "138": return 2;
case "139": return 3;
default: return 4;
}
}
}
// Driver中设置
job.setPartitionerClass(ProvincePartitioner.class);
job.setNumReduceTasks(5); // 必须和分区数一致!6.3 多文件输出
// 使用MultipleOutputs实现多文件输出
public class MultiOutputReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
private MultipleOutputs<Text, IntWritable> multipleOutputs;
@Override
protected void setup(Context context) {
multipleOutputs = new MultipleOutputs<>(context);
}
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
// 根据Key输出到不同文件
if (key.toString().startsWith("A")) {
multipleOutputs.write("A", key, new IntWritable(sum));
} else {
multipleOutputs.write("Other", key, new IntWritable(sum));
}
}
@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
multipleOutputs.close();
}
}七、MapReduce调优指南
7.1 调优参数速查表
7.2 数据倾斜问题
这是MapReduce最常见的性能问题:
加随机前缀打散示例:
// Map阶段:给热点Key加随机前缀
String newKey = random.nextInt(100) + "_" + originalKey;
context.write(new Text(newKey), value);
// 第一次Reduce:局部聚合
// 第二次MapReduce:去掉前缀,最终聚合7.3 小文件问题(又是它!)
// 使用CombineTextInputFormat
job.setInputFormatClass(CombineTextInputFormat.class);
CombineTextInputFormat.setMaxInputSplitSize(job, 128 * 1024 * 1024); // 128MB八、MapReduce的局限性
说了这么多优点,也要客观看看MapReduce的缺点:
延迟太高] B[编程模型受限] --> B1[只有Map和Reduce两个算子
复杂逻辑要写多个Job串联] C[中间结果写磁盘] --> C1[Shuffle大量磁盘IO
性能瓶颈] D[不支持迭代计算] --> D1[机器学习算法需要多次迭代
每次都要读写HDFS] E[学习成本高] --> E1[Java代码写一堆
不如SQL来得爽] end subgraph 替代方案["🚀 后来者"] F[Spark] --> F1[内存计算,比MR快10-100倍] G[Flink] --> G1[流批一体,真正的实时] H[Hive/Presto] --> H1[SQL on Hadoop,简单易用] end style A fill:#ff6b6b style B fill:#ff6b6b style C fill:#ff6b6b style D fill:#ff6b6b style E fill:#ff6b6b style F fill:#4ecdc4 style G fill:#4ecdc4 style H fill:#4ecdc4
🤔 那还学MapReduce干嘛?
- 它是分布式计算的基础思想,Spark/Flink都借鉴了它
- 很多老系统还在用,你得能维护
- 面试要考(这才是重点!)
九、本篇小结
十、写在最后
到这里,MapReduce的核心内容就讲完了。我们学会了如何让大象干活,虽然这头大象干活的方式有点笨——每次干完都要把中间结果写到磁盘上,就像一个每写一行代码就要按一次Ctrl+S的程序员。
但不管怎么说,MapReduce开创了分布式计算的新纪元。在它之前,处理海量数据是少数巨头的特权;在它之后,任何公司只要有一堆便宜的PC服务器,就能处理PB级的数据。
下一篇,我们将学习YARN——Hadoop的资源管理系统,以及Hadoop生态圈的其他成员。
预告一下第三篇的内容:
- YARN架构详解
- ResourceManager、NodeManager、ApplicationMaster
- YARN调度策略
- Hadoop生态圈全景图(Hive、HBase、Spark、Flink...)
如果你现在觉得脑子有点乱,别担心,这很正常。当年我学Shuffle的时候,画了不下20遍流程图才搞明白。
最后送你一句话:
"MapReduce虽然慢,但它能跑完;你的Python脚本虽然快,但它会OOM。"
下一篇见!🐘
本文作者:一个曾经以为Shuffle是洗牌的程序员
上次翻车原因:Reducer设成1个,跑了一整夜
下期预告:《大数据之Hadoop从入门到放弃(三)——YARN:给大象找一群小伙伴》
附录:MapReduce面试高频题
MapReduce的执行流程是什么?
Input → Split → Map → Shuffle(Partition, Sort, Spill, Merge) → Copy → Merge → Reduce → Output
Shuffle过程做了什么?
Map端:分区、排序、溢写、合并;Reduce端:复制、合并、排序
Combiner和Reducer的区别?
Combiner在Map端本地运行,用于预聚合减少网络传输;Reducer在Reduce端运行,做最终聚合。Combiner必须满足结合律。
如何解决数据倾斜?
增加Reducer数量、使用Combiner、自定义Partitioner、对热点Key加随机前缀打散、过滤异常Key
MapReduce和Spark的区别?
MR中间结果写磁盘,Spark基于内存;MR只有Map和Reduce两个算子,Spark有丰富的算子;MR不支持迭代计算,Spark支持。
InputSplit和Block的关系?
Block是HDFS物理存储单位,InputSplit是MR逻辑划分单位。通常1个Split对应1个Block,1个Split对应1个Map Task。