搜 索

大数据之Hadoop从入门到放弃之MapReduce

  • 20阅读
  • 2023年01月07日
  • 0评论
首页 / AI/大数据 / 正文

时间一晃而过,距离第一篇的大数据之Hadoop从入门到放弃(一)已经半年了,现在就连疫情也放开了,笔者刚从高烧中恢复,发烧的过程中害怕自己会不会挂掉,这不想起了未竟的学习事业,惊坐起继续写文。

前言:从"存得下"到"算得动"

上一篇我们认识了HDFS——一个能存下整个互联网的分布式文件系统。但是,光能存有什么用?就像你有一个装满了书的图书馆,但是没有图书管理员帮你找书、整理书,那这个图书馆就只是一个大号仓库。

所以,这一篇我们来学习如何让大象干活——MapReduce。

先讲一个悲伤的故事:

有一天,老板让我分析一下公司过去5年的用户行为日志,大概10TB。

我说:"好的,我写个Python脚本跑一下。"

3天后,老板问我结果。

我说:"脚本还在跑......"

老板说:"你被开除了。"

这个故事告诉我们:单机处理大数据 = 职业生涯自杀

而MapReduce就是来拯救我们职业生涯的。


一、MapReduce:分而治之的艺术

1.1 核心思想:把大问题拆成小问题

MapReduce的核心思想其实很简单,用四个字就能概括:分而治之

graph LR subgraph 传统思路["❌ 单机硬刚"] A[10TB数据] --> B[一台机器处理] B --> C[处理3天] C --> D[老板发火🔥] end subgraph MR思路["✅ MapReduce分治"] E[10TB数据] --> F[拆成1000份] F --> G[1000台机器并行处理] G --> H[合并结果] H --> I[几分钟搞定✨] end style D fill:#ff0000,color:#fff style I fill:#00ff00

这就像什么呢?想象一下你要数清楚一个图书馆里所有书的总页数:

方案做法耗时
单人方案你一个人一本一本数一辈子
MapReduce方案叫100个人,每人数一个书架(Map),最后把结果加起来(Reduce)一下午

1.2 Map和Reduce到底是什么?

graph TB subgraph MapReduce流程["🔄 MapReduce处理流程"] subgraph Input["📥 输入"] I1[数据块1] I2[数据块2] I3[数据块3] end subgraph Map阶段["🗺️ Map阶段"] M1[Mapper1
处理 + 输出KV对] M2[Mapper2
处理 + 输出KV对] M3[Mapper3
处理 + 输出KV对] end subgraph Shuffle["🔀 Shuffle阶段"] S[排序 + 分组
相同Key放一起] end subgraph Reduce阶段["📊 Reduce阶段"] R1[Reducer1
聚合计算] R2[Reducer2
聚合计算] end subgraph Output["📤 输出"] O1[结果文件1] O2[结果文件2] end I1 --> M1 I2 --> M2 I3 --> M3 M1 --> S M2 --> S M3 --> S S --> R1 S --> R2 R1 --> O1 R2 --> O2 end style M1 fill:#ff6b6b style M2 fill:#ff6b6b style M3 fill:#ff6b6b style S fill:#ffe66d style R1 fill:#4ecdc4 style R2 fill:#4ecdc4

用人话来说:

阶段做什么类比
Map把输入数据转换成Key-Value对图书管理员给每本书贴标签
Shuffle把相同Key的数据聚到一起把相同标签的书放到同一个箱子
Reduce对相同Key的Value做聚合运算统计每个箱子里有多少本书

1.3 一个公式理解MapReduce

map:     (K1, V1)           → list(K2, V2)
reduce:  (K2, list(V2))     → list(K3, V3)

翻译成人话:

  • Map函数:输入一个键值对,输出一堆键值对
  • Reduce函数:输入一个Key和它对应的一堆Value,输出结果

二、经典案例:Word Count(程序员的Hello World)

如果说学编程要写Hello World,那学MapReduce就必须写Word Count。

2.1 问题描述

统计一堆文本文件中,每个单词出现了多少次。

输入

文件1: Hello World Hello Hadoop
文件2: Hello MapReduce World
文件3: Hadoop MapReduce Hadoop

期望输出

Hadoop    3
Hello     3
MapReduce 2
World     2

2.2 MapReduce是怎么做的?

graph TB subgraph 输入["📄 输入文件"] F1["文件1: Hello World Hello Hadoop"] F2["文件2: Hello MapReduce World"] F3["文件3: Hadoop MapReduce Hadoop"] end subgraph Map阶段["🗺️ Map阶段:拆分单词,输出(word, 1)"] M1["Mapper1输出:
(Hello, 1)
(World, 1)
(Hello, 1)
(Hadoop, 1)"] M2["Mapper2输出:
(Hello, 1)
(MapReduce, 1)
(World, 1)"] M3["Mapper3输出:
(Hadoop, 1)
(MapReduce, 1)
(Hadoop, 1)"] end subgraph Shuffle["🔀 Shuffle:相同Key聚合"] S1["(Hadoop, [1,1,1])"] S2["(Hello, [1,1,1])"] S3["(MapReduce, [1,1])"] S4["(World, [1,1])"] end subgraph Reduce阶段["📊 Reduce阶段:求和"] R1["(Hadoop, 3)"] R2["(Hello, 3)"] R3["(MapReduce, 2)"] R4["(World, 2)"] end F1 --> M1 F2 --> M2 F3 --> M3 M1 --> Shuffle M2 --> Shuffle M3 --> Shuffle Shuffle --> R1 Shuffle --> R2 Shuffle --> R3 Shuffle --> R4 style M1 fill:#ff6b6b style M2 fill:#ff6b6b style M3 fill:#ff6b6b style Shuffle fill:#ffe66d style R1 fill:#4ecdc4 style R2 fill:#4ecdc4 style R3 fill:#4ecdc4 style R4 fill:#4ecdc4

2.3 代码实现

// Mapper类
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();
    
    @Override
    protected void map(LongWritable key, Text value, Context context) 
            throws IOException, InterruptedException {
        // key: 行偏移量(没啥用)
        // value: 一行文本
        
        String line = value.toString();
        String[] words = line.split("\\s+");  // 按空格拆分
        
        for (String w : words) {
            word.set(w);
            context.write(word, one);  // 输出 (word, 1)
        }
    }
}

// Reducer类
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    
    private IntWritable result = new IntWritable();
    
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // key: 单词
        // values: [1, 1, 1, ...]
        
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        result.set(sum);
        context.write(key, result);  // 输出 (word, count)
    }
}

// Driver类(主程序)
public class WordCount {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "word count");
        
        job.setJarByClass(WordCount.class);
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);
        
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

🤔 灵魂拷问:写这么多代码就为了统计单词?用Python一行就能搞定啊!

答:是的,但Python一行能处理10TB数据吗?


三、Shuffle:MapReduce的灵魂(也是最难的部分)

如果说Map和Reduce是MapReduce的两条腿,那Shuffle就是它的腰——没有腰,腿再粗也站不稳。

3.1 Shuffle到底干了什么?

Shuffle是Map阶段和Reduce阶段之间的数据传输过程,它负责:

  1. 把Map的输出按Key分区(决定去哪个Reducer)
  2. 把数据排序(按Key排序)
  3. 把相同Key的数据聚合在一起
  4. 把数据从Mapper所在节点传输到Reducer所在节点
graph TB subgraph MapTask["🗺️ Map Task (Map端)"] subgraph MapOutput["Map输出"] MO[Map函数输出] end subgraph Buffer["环形缓冲区 (100MB)"] B1[收集KV对] end subgraph Spill["溢写过程"] SP1[1. 分区 Partition] SP2[2. 排序 Sort] SP3[3. 可选:Combiner] SP4[4. 溢写到磁盘] end subgraph Merge["合并"] MG[合并所有溢写文件
生成最终文件] end MO --> B1 B1 --> |"达到80%阈值"|SP1 SP1 --> SP2 SP2 --> SP3 SP3 --> SP4 SP4 --> MG end subgraph ReduceTask["📊 Reduce Task (Reduce端)"] subgraph Copy["复制阶段"] CP[从各个Map Task
拉取属于自己的数据] end subgraph MergeSort["合并排序"] MS[合并所有数据
归并排序] end subgraph ReducePhase["Reduce阶段"] RD[调用Reduce函数
输出结果] end CP --> MS MS --> RD end MG --> |"HTTP传输"|CP style Buffer fill:#ffe66d style Spill fill:#ff6b6b style Copy fill:#4ecdc4

3.2 Map端Shuffle详解

sequenceDiagram participant Map as Map函数 participant Buffer as 环形缓冲区
(100MB) participant Disk as 本地磁盘 participant Reducer as Reducer Note over Map,Buffer: 阶段1:写入缓冲区 Map->>Buffer: 输出(K,V)对 Map->>Buffer: 继续输出... Note over Buffer: 缓冲区使用达到80% Note over Buffer,Disk: 阶段2:溢写(Spill) Buffer->>Buffer: 1. 根据Key计算分区号 Buffer->>Buffer: 2. 按(分区,Key)排序 Buffer->>Buffer: 3. 执行Combiner(可选) Buffer->>Disk: 4. 溢写到磁盘文件 Note over Map,Disk: 继续处理,可能多次溢写 Map->>Buffer: 继续输出... Buffer->>Disk: 再次溢写... Note over Disk: 阶段3:合并(Merge) Disk->>Disk: 合并所有溢写文件
生成一个分区有序文件 Note over Disk,Reducer: 阶段4:等待Reducer来拉取 Reducer->>Disk: HTTP请求获取数据 Disk->>Reducer: 返回对应分区的数据

关键点解释

环形缓冲区(100MB)

  • Map输出先写到内存缓冲区,不是直接写磁盘
  • 默认100MB,可以通过mapreduce.task.io.sort.mb配置
  • 为什么是"环形"?写满一圈可以继续覆盖已溢写的部分

溢写阈值(80%)

  • 缓冲区用到80%就开始溢写,不等用满
  • 这样可以边写边溢,提高效率
  • 参数:mapreduce.map.sort.spill.percent

分区(Partition)

  • 决定每条记录去哪个Reducer
  • 默认用Key的HashCode对Reducer数量取模
  • 可以自定义Partitioner实现特殊分区逻辑
// 默认分区逻辑
public class HashPartitioner<K, V> extends Partitioner<K, V> {
    public int getPartition(K key, V value, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}

3.3 Reduce端Shuffle详解

flowchart TB subgraph Reduce端Shuffle["📥 Reduce端Shuffle"] subgraph Copy["复制阶段 Copy"] C1[启动多个线程] C2[从各个Map Task拉取数据] C3[先放内存,内存不够放磁盘] end subgraph Merge["合并阶段 Merge"] M1[内存中的数据] M2[磁盘上的数据] M3[归并排序合并] end subgraph Reduce["Reduce阶段"] R1[对每个Key调用reduce函数] R2[输出最终结果] end C1 --> C2 --> C3 C3 --> M1 C3 --> M2 M1 --> M3 M2 --> M3 M3 --> R1 --> R2 end style Copy fill:#4ecdc4 style Merge fill:#ffe66d style Reduce fill:#ff6b6b

Copy阶段

  • Reducer通过HTTP从各个Map Task拉取属于自己分区的数据
  • 默认5个并行复制线程(mapreduce.reduce.shuffle.parallelcopies
  • 数据先放内存,内存放不下才写磁盘

Merge阶段

  • 把从各个Mapper拉取的数据合并
  • 使用归并排序,保证最终数据按Key有序
  • 合并后相同Key的数据就挨在一起了

3.4 Shuffle的性能杀手

graph TB subgraph 性能问题["⚠️ Shuffle性能杀手"] A[数据倾斜] --> A1[某个Key数据特别多
一个Reducer累死
其他Reducer闲着] B[过多小文件] --> B1[大量溢写文件
磁盘IO爆炸] C[网络传输] --> C1[Map到Reduce跨节点传输
带宽成为瓶颈] D[内存不足] --> D1[频繁溢写磁盘
GC频繁] end style A fill:#ff6b6b style B fill:#ff6b6b style C fill:#ff6b6b style D fill:#ff6b6b

四、Combiner:Map端的小Reducer

4.1 Combiner是什么?

Combiner是一个在Map端运行的Mini Reducer,用来在数据传输前做一次本地聚合,减少网络传输量。

graph TB subgraph 没有Combiner["❌ 没有Combiner"] M1["Mapper1输出:
(Hello,1)(Hello,1)(Hello,1)"] --> |"传输3条"|R1[Reducer] M2["Mapper2输出:
(Hello,1)(Hello,1)"] --> |"传输2条"|R1 Note1[网络传输5条记录] end subgraph 有Combiner["✅ 有Combiner"] M3["Mapper1输出:
(Hello,1)(Hello,1)(Hello,1)"] --> C1[Combiner] C1 --> |"(Hello,3)"|R2[Reducer] M4["Mapper2输出:
(Hello,1)(Hello,1)"] --> C2[Combiner] C2 --> |"(Hello,2)"|R2 Note2[网络传输2条记录] end style Note1 fill:#ff6b6b style Note2 fill:#4ecdc4

4.2 Combiner的使用条件

重要:不是所有场景都能用Combiner!

graph TB subgraph 可以用["✅ 可以用Combiner"] A1[求和 Sum] --> A1D["(a,1)(a,1)→(a,2)→最终(a,5)
结果正确"] A2[求最大值 Max] --> A2D["(a,3)(a,7)→(a,7)→最终(a,10)
结果正确"] A3[求最小值 Min] --> A3D["同上"] end subgraph 不能用["❌ 不能用Combiner"] B1[求平均值 Avg] --> B1D["(a,1)(a,3)→avg=2
(a,5)(a,7)→avg=6
合并avg=(2+6)/2=4
实际avg=(1+3+5+7)/4=4
碰巧对了,但换个数就错"] B2[求中位数] --> B2D["局部中位数无法合并"] end style A1 fill:#4ecdc4 style A2 fill:#4ecdc4 style A3 fill:#4ecdc4 style B1 fill:#ff6b6b style B2 fill:#ff6b6b

判断标准:Combiner函数必须满足结合律和交换律

用数学表示:combine(combine(a,b), c) = combine(a, combine(b,c))

4.3 代码中使用Combiner

// 在Driver中添加一行即可
job.setCombinerClass(WordCountReducer.class);  // 通常Combiner逻辑和Reducer一样

五、MapReduce完整执行流程

让我们把所有知识串起来,看看一个MapReduce作业从提交到完成的完整流程:

sequenceDiagram participant Client as 客户端 participant RM as ResourceManager participant NM as NodeManager participant AM as ApplicationMaster participant MT as Map Task participant RT as Reduce Task participant HDFS as HDFS Note over Client,HDFS: 阶段1:作业提交 Client->>Client: 1. 检查输入输出路径 Client->>HDFS: 2. 计算分片(InputSplit) Client->>HDFS: 3. 上传作业资源(JAR、配置) Client->>RM: 4. 提交作业 Note over RM,AM: 阶段2:作业初始化 RM->>NM: 5. 分配Container给AM NM->>AM: 6. 启动ApplicationMaster AM->>HDFS: 7. 获取分片信息 AM->>AM: 8. 创建Map/Reduce Task Note over AM,MT: 阶段3:Map阶段 AM->>RM: 9. 申请Map Task资源 RM->>NM: 10. 分配Container NM->>MT: 11. 启动Map Task MT->>HDFS: 12. 读取输入分片 MT->>MT: 13. 执行Map函数 MT->>MT: 14. Shuffle写(分区、排序、溢写) MT->>AM: 15. 报告完成 Note over AM,RT: 阶段4:Reduce阶段 AM->>RM: 16. 申请Reduce Task资源 RM->>NM: 17. 分配Container NM->>RT: 18. 启动Reduce Task RT->>MT: 19. 拉取Map输出(Shuffle读) RT->>RT: 20. 合并排序 RT->>RT: 21. 执行Reduce函数 RT->>HDFS: 22. 写入最终结果 RT->>AM: 23. 报告完成 Note over Client,HDFS: 阶段5:作业完成 AM->>RM: 24. 作业完成,释放资源 Client->>Client: 25. 获取作业状态,结束

5.1 InputSplit vs Block

很多人分不清InputSplit和Block的区别:

graph TB subgraph 概念对比["📊 InputSplit vs Block"] subgraph Block["Block (物理概念)"] B1[HDFS存储的基本单位] B2[默认128MB] B3[实际存在于磁盘] end subgraph Split["InputSplit (逻辑概念)"] S1[MapReduce的输入单位] S2[默认等于Block大小] S3[只是逻辑划分,不实际存储] S4[可以跨Block] end end subgraph 关系["🔗 关系"] R1["通常 1 Split ≈ 1 Block"] R2["1个Split对应1个Map Task"] R3["Split可以自定义大小"] end style Block fill:#ff6b6b style Split fill:#4ecdc4

5.2 Map Task数量计算

Map Task数量 ≈ 输入数据总大小 / Split大小

例如:

  • 输入数据:1GB
  • Block/Split大小:128MB
  • Map Task数量 ≈ 1024MB / 128MB = 8个

5.3 Reduce Task数量

Reduce Task数量由你指定:

job.setNumReduceTasks(3);  // 设置3个Reducer

如何选择Reduce数量?

Reduce数量效果
太少单个Reducer负担重,可能OOM
太多启动开销大,产生太多小文件
经验值0.95 (集群节点数 每节点最大Container数)

六、MapReduce编程进阶

6.1 自定义数据类型

Hadoop的Key和Value必须实现特定接口:

graph TB subgraph 接口要求["📋 数据类型接口要求"] K[Key类型] --> K1[必须实现Writable] K --> K2[必须实现Comparable] K --> K3["即:WritableComparable接口"] V[Value类型] --> V1[必须实现Writable] end style K fill:#ff6b6b style V fill:#4ecdc4

自定义Writable示例

public class UserBehavior implements Writable {
    private String userId;
    private String action;
    private long timestamp;
    
    // 序列化
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(userId);
        out.writeUTF(action);
        out.writeLong(timestamp);
    }
    
    // 反序列化
    @Override
    public void readFields(DataInput in) throws IOException {
        userId = in.readUTF();
        action = in.readUTF();
        timestamp = in.readLong();
    }
    
    // Getter/Setter省略...
}

6.2 自定义Partitioner

当默认的Hash分区不满足需求时:

public class ProvincePartitioner extends Partitioner<Text, FlowBean> {
    
    @Override
    public int getPartition(Text key, FlowBean value, int numPartitions) {
        String phone = key.toString();
        String province = phone.substring(0, 3);  // 手机号前3位
        
        switch (province) {
            case "136": return 0;
            case "137": return 1;
            case "138": return 2;
            case "139": return 3;
            default: return 4;
        }
    }
}

// Driver中设置
job.setPartitionerClass(ProvincePartitioner.class);
job.setNumReduceTasks(5);  // 必须和分区数一致!

6.3 多文件输出

// 使用MultipleOutputs实现多文件输出
public class MultiOutputReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    
    private MultipleOutputs<Text, IntWritable> multipleOutputs;
    
    @Override
    protected void setup(Context context) {
        multipleOutputs = new MultipleOutputs<>(context);
    }
    
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) 
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        
        // 根据Key输出到不同文件
        if (key.toString().startsWith("A")) {
            multipleOutputs.write("A", key, new IntWritable(sum));
        } else {
            multipleOutputs.write("Other", key, new IntWritable(sum));
        }
    }
    
    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        multipleOutputs.close();
    }
}

七、MapReduce调优指南

7.1 调优参数速查表

mindmap root((MapReduce调优)) Map端 mapreduce.task.io.sort.mb Map输出缓冲区大小 默认100MB 增大可减少溢写次数 mapreduce.map.sort.spill.percent 溢写阈值 默认0.8 mapreduce.task.io.sort.factor 合并文件数 默认10 Reduce端 mapreduce.reduce.shuffle.parallelcopies 并行复制线程数 默认5 mapreduce.reduce.shuffle.input.buffer.percent Shuffle内存占比 默认0.7 mapreduce.reduce.input.buffer.percent Reduce内存占比 默认0 通用 mapreduce.job.reduces Reducer数量 mapreduce.map.memory.mb Map内存 mapreduce.reduce.memory.mb Reduce内存

7.2 数据倾斜问题

这是MapReduce最常见的性能问题:

graph TB subgraph 数据倾斜["😱 数据倾斜现象"] A[99个Reducer 1分钟完成] B[1个Reducer 跑了1小时] C[整个Job等这1个] end subgraph 原因["🔍 原因分析"] D[某些Key数据量特别大] E[例如:null值、热门商品、大V用户] end subgraph 解决方案["💡 解决方案"] F[方案1:过滤异常Key] G[方案2:加随机前缀打散] H[方案3:自定义Partitioner] I[方案4:使用Combiner预聚合] J[方案5:增加Reducer数量] end A --> C B --> C D --> B E --> D style B fill:#ff0000,color:#fff

加随机前缀打散示例

// Map阶段:给热点Key加随机前缀
String newKey = random.nextInt(100) + "_" + originalKey;
context.write(new Text(newKey), value);

// 第一次Reduce:局部聚合
// 第二次MapReduce:去掉前缀,最终聚合

7.3 小文件问题(又是它!)

graph TB subgraph 问题["😰 小文件问题"] A[10000个1MB小文件] B[产生10000个Map Task] C[Map Task启动开销巨大] D[NameNode元数据压力] end subgraph 解决方案["💡 解决方案"] E[CombineTextInputFormat] F[把多个小文件合并成一个Split] G[大幅减少Map Task数量] end A --> B --> C A --> D E --> F --> G style C fill:#ff6b6b style D fill:#ff6b6b style G fill:#4ecdc4
// 使用CombineTextInputFormat
job.setInputFormatClass(CombineTextInputFormat.class);
CombineTextInputFormat.setMaxInputSplitSize(job, 128 * 1024 * 1024);  // 128MB

八、MapReduce的局限性

说了这么多优点,也要客观看看MapReduce的缺点:

graph TB subgraph 局限性["⚠️ MapReduce的局限性"] A[只支持批处理] --> A1[不能做实时计算
延迟太高] B[编程模型受限] --> B1[只有Map和Reduce两个算子
复杂逻辑要写多个Job串联] C[中间结果写磁盘] --> C1[Shuffle大量磁盘IO
性能瓶颈] D[不支持迭代计算] --> D1[机器学习算法需要多次迭代
每次都要读写HDFS] E[学习成本高] --> E1[Java代码写一堆
不如SQL来得爽] end subgraph 替代方案["🚀 后来者"] F[Spark] --> F1[内存计算,比MR快10-100倍] G[Flink] --> G1[流批一体,真正的实时] H[Hive/Presto] --> H1[SQL on Hadoop,简单易用] end style A fill:#ff6b6b style B fill:#ff6b6b style C fill:#ff6b6b style D fill:#ff6b6b style E fill:#ff6b6b style F fill:#4ecdc4 style G fill:#4ecdc4 style H fill:#4ecdc4

🤔 那还学MapReduce干嘛?

  1. 它是分布式计算的基础思想,Spark/Flink都借鉴了它
  2. 很多老系统还在用,你得能维护
  3. 面试要考(这才是重点!)

九、本篇小结

graph TB subgraph 本篇知识点["📚 第二篇知识点总结"] A[MapReduce核心思想] --> A1[分而治之] A --> A2[Map:转换] A --> A3[Reduce:聚合] B[Shuffle过程] --> B1[Map端:分区→排序→溢写→合并] B --> B2[Reduce端:复制→合并→排序] C[优化技巧] --> C1[Combiner:本地预聚合] C --> C2[自定义Partitioner] C --> C3[CombineInputFormat:合并小文件] D[常见问题] --> D1[数据倾斜] D --> D2[小文件问题] D --> D3[内存不足] E[局限性] --> E1[只支持批处理] E --> E2[中间结果写磁盘] E --> E3[编程模型受限] end style A fill:#ff6b6b style B fill:#ffe66d style C fill:#4ecdc4 style D fill:#ff6b6b style E fill:#95a5a6

十、写在最后

到这里,MapReduce的核心内容就讲完了。我们学会了如何让大象干活,虽然这头大象干活的方式有点笨——每次干完都要把中间结果写到磁盘上,就像一个每写一行代码就要按一次Ctrl+S的程序员。

但不管怎么说,MapReduce开创了分布式计算的新纪元。在它之前,处理海量数据是少数巨头的特权;在它之后,任何公司只要有一堆便宜的PC服务器,就能处理PB级的数据。

下一篇,我们将学习YARN——Hadoop的资源管理系统,以及Hadoop生态圈的其他成员。

预告一下第三篇的内容:

  • YARN架构详解
  • ResourceManager、NodeManager、ApplicationMaster
  • YARN调度策略
  • Hadoop生态圈全景图(Hive、HBase、Spark、Flink...)

如果你现在觉得脑子有点乱,别担心,这很正常。当年我学Shuffle的时候,画了不下20遍流程图才搞明白。

最后送你一句话:

"MapReduce虽然慢,但它能跑完;你的Python脚本虽然快,但它会OOM。"

下一篇见!🐘


本文作者:一个曾经以为Shuffle是洗牌的程序员

上次翻车原因:Reducer设成1个,跑了一整夜

下期预告:《大数据之Hadoop从入门到放弃(三)——YARN:给大象找一群小伙伴》


附录:MapReduce面试高频题

  1. MapReduce的执行流程是什么?

    Input → Split → Map → Shuffle(Partition, Sort, Spill, Merge) → Copy → Merge → Reduce → Output
  2. Shuffle过程做了什么?

    Map端:分区、排序、溢写、合并;Reduce端:复制、合并、排序
  3. Combiner和Reducer的区别?

    Combiner在Map端本地运行,用于预聚合减少网络传输;Reducer在Reduce端运行,做最终聚合。Combiner必须满足结合律。
  4. 如何解决数据倾斜?

    增加Reducer数量、使用Combiner、自定义Partitioner、对热点Key加随机前缀打散、过滤异常Key
  5. MapReduce和Spark的区别?

    MR中间结果写磁盘,Spark基于内存;MR只有Map和Reduce两个算子,Spark有丰富的算子;MR不支持迭代计算,Spark支持。
  6. InputSplit和Block的关系?

    Block是HDFS物理存储单位,InputSplit是MR逻辑划分单位。通常1个Split对应1个Block,1个Split对应1个Map Task。
评论区
暂无评论
avatar