前言:为什么要学Hive
还记得第一次接触大数据的场景吗?
领导说:"我们有10TB的日志数据需要分析。"
你信心满满地打开MySQL:SELECT * FROM logs WHERE ...
然后...数据库就崩了。
领导又说:"用Hadoop吧。"
你打开MapReduce文档,看到要写几百行Java代码才能实现一个WordCount,当场想辞职。
这时候,Hive出现了——它让你用熟悉的SQL,操作不熟悉的大数据。
"Hive不是让大数据变简单了,而是让写SQL的人也能混进大数据圈子。" —— 某不愿透露姓名的数据工程师
一、Hive是什么
1.1 官方定义
Hive是基于Hadoop的数据仓库工具,可以将结构化的数据文件映射为一张数据库表,并提供类SQL查询功能。
1.2 人话翻译
Hive = SQL翻译器 + 元数据管理
你写SQL,Hive帮你翻译成MapReduce/Tez/Spark任务,然后在Hadoop集群上跑。
flowchart LR
A[你写的SQL] --> B[Hive]
B --> C[翻译成MR/Tez/Spark]
C --> D[Hadoop集群执行]
D --> E[返回结果]
style A fill:#e1f5fe
style B fill:#fff3e0
style E fill:#e8f5e9
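举个例子:开头吐槽的WordCount,用MapReduce要写几百行Java,在Hive里就是一条SQL。下面是一个示意写法(docs表和content字段是假设的,仅用于说明):
-- 假设docs表只有一列content,每行存一段文本
SELECT word, COUNT(*) AS cnt
FROM docs
LATERAL VIEW EXPLODE(SPLIT(content, ' ')) t AS word
GROUP BY word
ORDER BY cnt DESC;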
1.3 Hive的定位
flowchart TB
subgraph 实时处理
direction LR
F[Flink SQL 流处理王者]
P[Presto/Trino 交互式查询]
end
subgraph 批处理
direction LR
H[Hive 批处理稳如狗]
S[Spark SQL 批流一体]
M[MapReduce 上古神器]
end
subgraph 传统数据库
MY[MySQL 小数据首选]
end
批处理 -.->|数据量增大| 实时处理
传统数据库 -.->|数据量增大| 批处理
1.4 Hive不是什么
| Hive是 | Hive不是 |
|---|---|
| 数据仓库工具 | 关系型数据库 |
| 批处理引擎 | 实时查询引擎 |
| OLAP分析 | OLTP事务处理 |
| 适合大数据量 | 适合频繁更新 |
| 高延迟高吞吐 | 低延迟响应 |
灵魂拷问:什么时候用Hive?
- ✅ 数据量大(GB~PB级别)
- ✅ 批量分析、报表统计
- ✅ 对延迟不敏感(秒级~分钟级)
- ❌ 需要毫秒级响应
- ❌ 需要频繁增删改
- ❌ 数据量小(MySQL就够了)
二、Hive架构
2.1 整体架构
flowchart TB
subgraph Client[客户端]
C1[CLI]
C2[JDBC/ODBC]
C3[Web UI]
C4[Beeline]
end
subgraph HiveServer[Hive服务层]
HS[HiveServer2]
D[Driver驱动器]
CP[编译器 Compiler]
OP[优化器 Optimizer]
EX[执行器 Executor]
end
subgraph MetaStore[元数据]
MS[Metastore服务]
DB[(MySQL/PostgreSQL)]
end
subgraph Compute[计算引擎]
MR[MapReduce]
TEZ[Tez]
SPARK[Spark]
end
subgraph Storage[存储层]
HDFS[(HDFS)]
end
C1 & C2 & C3 & C4 --> HS
HS --> D
D --> CP --> OP --> EX
D <--> MS
MS <--> DB
EX --> MR & TEZ & SPARK
MR & TEZ & SPARK --> HDFS
2.2 核心组件详解
2.2.1 Metastore(元数据存储)
Metastore是Hive的灵魂,存储了表的"身份证信息":
📦 Metastore存储内容
├── 数据库信息(名称、位置、属性)
├── 表信息
│ ├── 表名、列名、列类型
│ ├── 表类型(内部表/外部表)
│ ├── 存储位置(HDFS路径)
│ ├── 文件格式(TextFile/ORC/Parquet)
│ └── 分区信息
├── 分区信息
└── 用户权限信息
重要:Metastore只存元数据,真正的数据在HDFS上!
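想直观感受一下元数据长什么样,可以直接连到Metastore背后的MySQL库查一查(示意写法,表名和字段以实际的Metastore schema为准):
-- 在Metastore的MySQL库中查看Hive表清单及其HDFS位置(示意)
SELECT t.TBL_NAME, t.TBL_TYPE, s.LOCATION
FROM TBLS t
JOIN SDS s ON t.SD_ID = s.SD_ID
LIMIT 10;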
2.2.2 Driver(驱动器)
SQL执行的总指挥:
sequenceDiagram
participant C as Client
participant D as Driver
participant CP as Compiler
participant MS as Metastore
participant OP as Optimizer
participant EX as Executor
participant YARN as YARN/计算引擎
C->>D: 1. 提交SQL
D->>CP: 2. 编译SQL
CP->>MS: 3. 获取元数据
MS-->>CP: 4. 返回表结构
CP->>CP: 5. 语法分析、语义分析
CP-->>D: 6. 生成执行计划
D->>OP: 7. 优化执行计划
OP-->>D: 8. 返回优化后的计划
D->>EX: 9. 执行
EX->>YARN: 10. 提交任务
YARN-->>EX: 11. 执行结果
EX-->>D: 12. 返回结果
D-->>C: 13. 展示结果
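想看看编译器、优化器最终产出了什么,可以在查询前加EXPLAIN(输出内容随版本和执行引擎不同而不同,下面仅是示意):
EXPLAIN
SELECT department, COUNT(*) AS cnt
FROM employees
GROUP BY department;
-- 输出大致包含STAGE DEPENDENCIES,以及每个Stage的算子树(TableScan、Group By Operator、Reduce Output Operator等)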
2.3 Metastore三种部署模式
flowchart TB
subgraph 内嵌模式
A1[Hive CLI] --> A2[Metastore]
A2 --> A3[(Derby)]
end
subgraph 本地模式
B1[Hive CLI] --> B2[Metastore]
B2 --> B3[(MySQL)]
end
subgraph 远程模式
C1[Hive CLI 1]
C2[Hive CLI 2]
C3[HiveServer2]
C1 & C2 & C3 --> C4[Metastore Service]
C4 --> C5[(MySQL)]
end
| 模式 | 特点 | 适用场景 |
|---|---|---|
| 内嵌模式 | Derby数据库,单用户 | 学习测试 |
| 本地模式 | 外部数据库,单Metastore | 开发环境 |
| 远程模式 | 独立Metastore服务 | 生产环境 |
三、Hive安装配置
3.1 前置条件
# 检查Java
java -version
# 要求:JDK 1.8+
# 检查Hadoop
hadoop version
# 要求:Hadoop 2.x/3.x
# 检查环境变量
echo $JAVA_HOME
echo $HADOOP_HOME
3.2 安装步骤
# 1. 下载Hive
wget https://dlcdn.apache.org/hive/hive-3.1.3/apache-hive-3.1.3-bin.tar.gz
# 2. 解压
tar -zxvf apache-hive-3.1.3-bin.tar.gz -C /opt/
mv /opt/apache-hive-3.1.3-bin /opt/hive
# 3. 配置环境变量
cat >> ~/.bashrc << 'EOF'
export HIVE_HOME=/opt/hive
export PATH=$PATH:$HIVE_HOME/bin
EOF
source ~/.bashrc
# 4. 解决Guava版本冲突(重要!)
rm $HIVE_HOME/lib/guava-19.0.jar
cp $HADOOP_HOME/share/hadoop/common/lib/guava-27.0-jre.jar $HIVE_HOME/lib/
3.3 配置文件
hive-site.xml
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<!-- Metastore数据库配置 -->
<property>
<name>javax.jdo.option.ConnectionURL</name>
<value>jdbc:mysql://localhost:3306/hive_metastore?createDatabaseIfNotExist=true&amp;useSSL=false&amp;serverTimezone=Asia/Shanghai</value>
</property>
<property>
<name>javax.jdo.option.ConnectionDriverName</name>
<value>com.mysql.cj.jdbc.Driver</value>
</property>
<property>
<name>javax.jdo.option.ConnectionUserName</name>
<value>hive</value>
</property>
<property>
<name>javax.jdo.option.ConnectionPassword</name>
<value>hive123</value>
</property>
<!-- Hive数据仓库位置 -->
<property>
<name>hive.metastore.warehouse.dir</name>
<value>/user/hive/warehouse</value>
</property>
<!-- 显示当前数据库 -->
<property>
<name>hive.cli.print.current.db</name>
<value>true</value>
</property>
<!-- 显示表头 -->
<property>
<name>hive.cli.print.header</name>
<value>true</value>
</property>
<!-- HiveServer2配置 -->
<property>
<name>hive.server2.thrift.port</name>
<value>10000</value>
</property>
<property>
<name>hive.server2.thrift.bind.host</name>
<value>0.0.0.0</value>
</property>
<!-- 执行引擎(mr/tez/spark) -->
<property>
<name>hive.execution.engine</name>
<value>tez</value>
</property>
<!-- 本地模式(小数据自动启用) -->
<property>
<name>hive.exec.mode.local.auto</name>
<value>true</value>
</property>
</configuration>
3.4 初始化与启动
# 1. 上传MySQL驱动
cp mysql-connector-java-8.0.28.jar $HIVE_HOME/lib/
# 2. 初始化元数据库(只需执行一次)
schematool -dbType mysql -initSchema
# 3. 启动Metastore服务(后台运行)
nohup hive --service metastore > /var/log/hive/metastore.log 2>&1 &
# 4. 启动HiveServer2(后台运行)
nohup hive --service hiveserver2 > /var/log/hive/hiveserver2.log 2>&1 &
# 5. 验证服务
netstat -tlnp | grep -E "9083|10000"
# 9083: Metastore端口
# 10000: HiveServer2端口
# 6. 连接Hive
# 方式1:Hive CLI(不推荐,已弃用)
hive
# 方式2:Beeline(推荐)
beeline -u jdbc:hive2://localhost:10000 -n hive
四、Hive数据类型
4.1 基本数据类型
-- 数值类型
TINYINT -- 1字节,-128 ~ 127
SMALLINT -- 2字节,-32768 ~ 32767
INT/INTEGER -- 4字节,-2^31 ~ 2^31-1
BIGINT -- 8字节,-2^63 ~ 2^63-1
FLOAT -- 4字节单精度浮点
DOUBLE -- 8字节双精度浮点
DECIMAL(p,s) -- 高精度小数,p总位数,s小数位数
-- 字符串类型
STRING -- 无限长度字符串(常用)
VARCHAR(n) -- 可变长度,最大n个字符
CHAR(n) -- 固定长度,n个字符
-- 日期时间类型
DATE -- 日期,格式:YYYY-MM-DD
TIMESTAMP -- 时间戳,格式:YYYY-MM-DD HH:MM:SS.fffffffff
INTERVAL -- 时间间隔
-- 布尔类型
BOOLEAN -- true/false
-- 二进制类型
BINARY -- 二进制数据
4.2 复杂数据类型
-- ARRAY:数组,相同类型元素的集合
-- 示例:['apple', 'banana', 'cherry']
CREATE TABLE test_array (
id INT,
fruits ARRAY<STRING>
);
-- 访问:fruits[0] 获取第一个元素
-- MAP:键值对集合
-- 示例:{'name': 'Joey', 'age': '28'}
CREATE TABLE test_map (
id INT,
info MAP<STRING, STRING>
);
-- 访问:info['name'] 获取name的值
-- STRUCT:结构体,不同类型字段的集合
-- 示例:{'name': 'Joey', 'age': 28}
CREATE TABLE test_struct (
id INT,
user_info STRUCT<name:STRING, age:INT, email:STRING>
);
-- 访问:user_info.name 获取name字段
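-- 补充示意:若数据来自文本文件,建复杂类型表时通常还要声明集合/键值分隔符
-- (以下分隔符取值仅为示例,按实际数据格式调整)
CREATE TABLE test_complex_text (
id INT,
fruits ARRAY<STRING>,
info MAP<STRING, STRING>
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
COLLECTION ITEMS TERMINATED BY '_'
MAP KEYS TERMINATED BY ':';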
-- 复合嵌套示例
CREATE TABLE complex_table (
id INT,
name STRING,
scores ARRAY<INT>, -- 成绩数组
attributes MAP<STRING, STRING>, -- 属性映射
address STRUCT<city:STRING, street:STRING>, -- 地址结构
friends ARRAY<STRUCT<name:STRING, age:INT>> -- 朋友列表
);
4.3 类型转换
-- 隐式转换(自动)
-- 小类型 → 大类型:TINYINT → SMALLINT → INT → BIGINT → FLOAT → DOUBLE
SELECT 1 + 1.5; -- INT自动转为DOUBLE
-- 显式转换(CAST)
SELECT CAST('123' AS INT); -- 字符串转整数
SELECT CAST(123 AS STRING); -- 整数转字符串
SELECT CAST('2024-01-15' AS DATE); -- 字符串转日期
SELECT CAST(1705286400 AS TIMESTAMP); -- 时间戳转换
-- 注意:转换失败返回NULL
SELECT CAST('abc' AS INT); -- 返回NULL
五、DDL数据定义
5.1 数据库操作
-- 创建数据库
CREATE DATABASE IF NOT EXISTS mydb
COMMENT '我的数据库'
LOCATION '/user/hive/warehouse/mydb'
WITH DBPROPERTIES ('creator'='joey', 'date'='2024-01-15');
-- 查看数据库
SHOW DATABASES;
SHOW DATABASES LIKE 'my*'; -- 模糊匹配
-- 查看数据库详情
DESCRIBE DATABASE mydb;
DESCRIBE DATABASE EXTENDED mydb; -- 详细信息
-- 切换数据库
USE mydb;
-- 修改数据库
ALTER DATABASE mydb SET DBPROPERTIES ('editor'='claude');
-- 删除数据库
DROP DATABASE IF EXISTS mydb; -- 空数据库
DROP DATABASE IF EXISTS mydb CASCADE; -- 强制删除(包含表)
5.2 表操作
5.2.1 创建表
-- 基础建表语法
CREATE [EXTERNAL] TABLE [IF NOT EXISTS] table_name
(
column1 data_type [COMMENT 'column comment'],
column2 data_type,
...
)
[COMMENT 'table comment']
[PARTITIONED BY (partition_column data_type, ...)]
[CLUSTERED BY (column) [SORTED BY (column)] INTO num_buckets BUCKETS]
[ROW FORMAT row_format]
[STORED AS file_format]
[LOCATION 'hdfs_path']
[TBLPROPERTIES ('key'='value', ...)];
5.2.2 内部表 vs 外部表
flowchart TB
subgraph 内部表[内部表 Managed Table]
A1[创建表] --> A2[Hive管理数据]
A2 --> A3[删除表]
A3 --> A4[元数据删除 + 数据删除]
end
subgraph 外部表[外部表 External Table]
B1[创建表] --> B2[数据独立存在]
B2 --> B3[删除表]
B3 --> B4[只删除元数据 数据保留]
end
style A4 fill:#ffcdd2
style B4 fill:#c8e6c9
-- 内部表(默认)
CREATE TABLE managed_table (
id INT,
name STRING
) STORED AS ORC;
-- 外部表(推荐生产环境使用)
CREATE EXTERNAL TABLE external_table (
id INT,
name STRING
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
LOCATION '/data/external_data/';
-- 查看表类型
DESCRIBE FORMATTED managed_table;
-- Table Type: MANAGED_TABLE 或 EXTERNAL_TABLE
选择建议:
| 场景 | 推荐类型 | 原因 |
|---|---|---|
| 临时数据、中间表 | 内部表 | 方便清理 |
| 原始数据、共享数据 | 外部表 | 安全,数据不会误删 |
| ETL处理结果 | 外部表 | 数据重要 |
| 日志数据 | 外部表 | 数据源独立 |
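如果建表时选错了类型,不必删表重建,可以用ALTER TABLE在内部表和外部表之间切换(示意写法,沿用上面的表名,注意TRUE/FALSE一般需要大写):
-- 内部表转外部表
ALTER TABLE managed_table SET TBLPROPERTIES ('EXTERNAL'='TRUE');
-- 外部表转内部表
ALTER TABLE external_table SET TBLPROPERTIES ('EXTERNAL'='FALSE');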
5.2.3 分区表(重要!)
分区是Hive性能优化的核心手段!
-- 创建分区表
CREATE TABLE orders (
order_id BIGINT,
user_id BIGINT,
product_id BIGINT,
amount DECIMAL(10,2),
order_time TIMESTAMP
)
PARTITIONED BY (dt STRING, hour STRING) -- 按天和小时分区
STORED AS ORC;
-- 分区表的HDFS结构:
-- /user/hive/warehouse/orders/
-- ├── dt=2024-01-15/
-- │ ├── hour=00/
-- │ ├── hour=01/
-- │ └── ...
-- └── dt=2024-01-16/
-- ├── hour=00/
-- └── ...
-- 添加分区
ALTER TABLE orders ADD PARTITION (dt='2024-01-15', hour='00');
-- 删除分区
ALTER TABLE orders DROP PARTITION (dt='2024-01-15', hour='00');
-- 查看分区
SHOW PARTITIONS orders;
-- 动态分区(重要!)
SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict;
INSERT INTO orders PARTITION (dt, hour)
SELECT
order_id, user_id, product_id, amount, order_time,
DATE_FORMAT(order_time, 'yyyy-MM-dd') as dt,
DATE_FORMAT(order_time, 'HH') as hour
FROM source_orders;
5.2.4 分桶表
-- 创建分桶表
CREATE TABLE user_bucket (
user_id BIGINT,
name STRING,
age INT
)
CLUSTERED BY (user_id) INTO 8 BUCKETS -- 按user_id分8个桶
STORED AS ORC;
-- 分桶原理:hash(user_id) % 8 决定数据进入哪个桶
-- 插入数据时需要开启分桶
SET hive.enforce.bucketing=true;
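-- 补充示意:分桶表支持按桶采样,只扫描其中一个桶的数据
SELECT * FROM user_bucket TABLESAMPLE (BUCKET 1 OUT OF 8 ON user_id);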
-- 分桶的好处:
-- 1. 提升JOIN性能(桶对桶JOIN)
-- 2. 提升采样效率
-- 3. 数据更均匀分布
5.3 表操作语法大全
-- 查看表
SHOW TABLES;
SHOW TABLES IN mydb;
SHOW TABLES LIKE '*order*';
-- 查看表结构
DESC table_name;
DESC FORMATTED table_name; -- 详细信息
DESC EXTENDED table_name; -- 扩展信息
-- 查看建表语句
SHOW CREATE TABLE table_name;
-- 修改表名
ALTER TABLE old_name RENAME TO new_name;
-- 添加列
ALTER TABLE table_name ADD COLUMNS (
new_col1 STRING COMMENT '新列1',
new_col2 INT COMMENT '新列2'
);
-- 修改列
ALTER TABLE table_name CHANGE old_col new_col STRING COMMENT '修改后的列';
-- 替换所有列(危险!)
ALTER TABLE table_name REPLACE COLUMNS (
id INT,
name STRING
);
-- 修改表属性
ALTER TABLE table_name SET TBLPROPERTIES ('comment'='新注释');
-- 修改存储格式
ALTER TABLE table_name SET FILEFORMAT ORC;
-- 修改位置
ALTER TABLE table_name SET LOCATION '/new/path/';
-- 删除表
DROP TABLE IF EXISTS table_name;
-- 清空表(保留结构)
TRUNCATE TABLE table_name;
-- 注意:只能清空内部表!
六、DML数据操作
6.1 数据加载
-- 从本地文件加载
LOAD DATA LOCAL INPATH '/local/path/data.txt'
INTO TABLE table_name;
-- 从本地加载并覆盖
LOAD DATA LOCAL INPATH '/local/path/data.txt'
OVERWRITE INTO TABLE table_name;
-- 从HDFS加载(会移动文件!)
LOAD DATA INPATH '/hdfs/path/data.txt'
INTO TABLE table_name;
-- 加载到分区
LOAD DATA LOCAL INPATH '/local/path/data.txt'
INTO TABLE orders PARTITION (dt='2024-01-15', hour='00');
6.2 INSERT语句
-- 插入查询结果
INSERT INTO TABLE target_table
SELECT * FROM source_table;
-- 覆盖插入
INSERT OVERWRITE TABLE target_table
SELECT * FROM source_table;
-- 插入到分区
INSERT INTO TABLE orders PARTITION (dt='2024-01-15', hour='00')
SELECT order_id, user_id, product_id, amount, order_time
FROM source_orders
WHERE DATE_FORMAT(order_time, 'yyyy-MM-dd') = '2024-01-15'
AND DATE_FORMAT(order_time, 'HH') = '00';
-- 动态分区插入
SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict;
INSERT OVERWRITE TABLE orders PARTITION (dt, hour)
SELECT
order_id, user_id, product_id, amount, order_time,
DATE_FORMAT(order_time, 'yyyy-MM-dd'),
DATE_FORMAT(order_time, 'HH')
FROM source_orders;
-- 多表插入(一次扫描,多处输出)
FROM source_table
INSERT OVERWRITE TABLE target1
SELECT col1, col2 WHERE condition1
INSERT OVERWRITE TABLE target2
SELECT col1, col3 WHERE condition2;
-- INSERT VALUES(Hive 0.14+,不推荐用于大数据量)
INSERT INTO TABLE users VALUES
(1, 'Joey', 28),
(2, 'Claude', 3);
6.3 数据导出
-- 导出到本地目录
INSERT OVERWRITE LOCAL DIRECTORY '/local/output/'
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
SELECT * FROM table_name;
-- 导出到HDFS
INSERT OVERWRITE DIRECTORY '/hdfs/output/'
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
SELECT * FROM table_name;
-- 使用Hive命令导出
hive -e "SELECT * FROM mydb.users" > /local/users.txt
-- 导出为指定格式
INSERT OVERWRITE LOCAL DIRECTORY '/local/output/'
STORED AS ORC
SELECT * FROM table_name;
6.4 更新和删除(ACID表)
Hive默认不支持UPDATE/DELETE,需要开启ACID特性:
-- 配置ACID支持
SET hive.support.concurrency=true;
SET hive.enforce.bucketing=true;
SET hive.exec.dynamic.partition.mode=nonstrict;
SET hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
SET hive.compactor.initiator.on=true;
SET hive.compactor.worker.threads=1;
-- 创建ACID表(必须是ORC格式+分桶)
CREATE TABLE acid_table (
id INT,
name STRING,
age INT
)
CLUSTERED BY (id) INTO 4 BUCKETS
STORED AS ORC
TBLPROPERTIES ('transactional'='true');
-- 现在可以UPDATE了
UPDATE acid_table SET age = 30 WHERE id = 1;
-- DELETE也可以了
DELETE FROM acid_table WHERE id = 2;
-- MERGE语句(Hive 2.2+)
MERGE INTO target AS t
USING source AS s
ON t.id = s.id
WHEN MATCHED THEN UPDATE SET name = s.name
WHEN NOT MATCHED THEN INSERT VALUES (s.id, s.name, s.age);
七、DQL数据查询
7.1 基础查询
-- 基本SELECT
SELECT * FROM users;
SELECT id, name, age FROM users;
-- 别名
SELECT
id AS user_id,
name AS user_name,
age * 2 AS double_age
FROM users u;
-- DISTINCT去重
SELECT DISTINCT department FROM employees;
-- WHERE条件
SELECT * FROM users
WHERE age > 18
AND name LIKE 'J%'
AND department IN ('IT', 'HR')
AND create_time BETWEEN '2024-01-01' AND '2024-12-31';
-- LIMIT限制
SELECT * FROM users LIMIT 10;
SELECT * FROM users LIMIT 20, 10; -- Hive 2.0+:跳过前20条,取10条(Hive不支持OFFSET关键字)
-- 排序
SELECT * FROM users ORDER BY age DESC, name ASC;
-- CASE WHEN
SELECT
name,
age,
CASE
WHEN age < 18 THEN '未成年'
WHEN age < 60 THEN '成年'
ELSE '老年'
END AS age_group
FROM users;
7.2 聚合查询
-- 聚合函数
SELECT
COUNT(*) AS total_count, -- 总数
COUNT(DISTINCT user_id) AS uv, -- 去重计数
SUM(amount) AS total_amount, -- 求和
AVG(amount) AS avg_amount, -- 平均值
MAX(amount) AS max_amount, -- 最大值
MIN(amount) AS min_amount -- 最小值
FROM orders;
-- GROUP BY分组
SELECT
department,
COUNT(*) AS emp_count,
AVG(salary) AS avg_salary
FROM employees
GROUP BY department;
-- HAVING过滤分组
SELECT
department,
COUNT(*) AS emp_count
FROM employees
GROUP BY department
HAVING COUNT(*) > 10;
-- GROUPING SETS(多维分析)
SELECT
department,
job_title,
COUNT(*) AS cnt
FROM employees
GROUP BY department, job_title
GROUPING SETS (
(department, job_title), -- 按部门+职位
(department), -- 只按部门
() -- 总计
);
-- CUBE(所有组合)
SELECT department, job_title, COUNT(*)
FROM employees
GROUP BY CUBE (department, job_title);
-- 等价于 GROUPING SETS ((dept, job), (dept), (job), ())
-- ROLLUP(层级汇总)
SELECT department, job_title, COUNT(*)
FROM employees
GROUP BY ROLLUP (department, job_title);
-- 等价于 GROUPING SETS ((dept, job), (dept), ())
7.3 JOIN连接
flowchart LR
subgraph JOIN类型
A[INNER JOIN 交集]
B[LEFT JOIN 左表全部+右表匹配]
C[RIGHT JOIN 右表全部+左表匹配]
D[FULL JOIN 并集]
E[CROSS JOIN 笛卡尔积]
F[LEFT SEMI JOIN 左表存在于右表]
end
-- INNER JOIN
SELECT
o.order_id,
o.amount,
u.name
FROM orders o
INNER JOIN users u ON o.user_id = u.id;
-- LEFT JOIN
SELECT
u.name,
COUNT(o.order_id) AS order_count
FROM users u
LEFT JOIN orders o ON u.id = o.user_id
GROUP BY u.name;
-- 多表JOIN
SELECT
o.order_id,
u.name AS user_name,
p.name AS product_name
FROM orders o
JOIN users u ON o.user_id = u.id
JOIN products p ON o.product_id = p.id;
-- LEFT SEMI JOIN(类似IN子查询,但更高效)
SELECT u.*
FROM users u
LEFT SEMI JOIN orders o ON u.id = o.user_id;
-- 等价于:SELECT * FROM users WHERE id IN (SELECT user_id FROM orders)
-- CROSS JOIN(笛卡尔积,慎用!)
SELECT * FROM table1 CROSS JOIN table2;
-- Map-side JOIN(小表广播,重要优化!)
SET hive.auto.convert.join=true;
SET hive.mapjoin.smalltable.filesize=25000000; -- 25MB以下自动启用
SELECT /*+ MAPJOIN(small_table) */
b.id, s.name
FROM big_table b
JOIN small_table s ON b.id = s.id;
7.4 子查询
-- WHERE子查询
SELECT * FROM users
WHERE id IN (SELECT user_id FROM orders WHERE amount > 1000);
-- FROM子查询
SELECT dept, avg_salary
FROM (
SELECT department AS dept, AVG(salary) AS avg_salary
FROM employees
GROUP BY department
) t
WHERE avg_salary > 10000;
-- 相关子查询
SELECT * FROM users u
WHERE EXISTS (
SELECT 1 FROM orders o
WHERE o.user_id = u.id AND o.amount > 1000
);
-- WITH子句(CTE,推荐!)
WITH
dept_stats AS (
SELECT department, AVG(salary) AS avg_salary
FROM employees
GROUP BY department
),
high_salary_dept AS (
SELECT department FROM dept_stats WHERE avg_salary > 10000
)
SELECT e.*
FROM employees e
JOIN high_salary_dept h ON e.department = h.department;
7.5 窗口函数(重点!)
窗口函数是Hive的杀手锏,解决"组内排名"、"累计计算"等问题。
-- 窗口函数语法
function_name(args) OVER (
[PARTITION BY column1, ...]
[ORDER BY column2, ...]
[ROWS/RANGE BETWEEN ... AND ...]
)
-- 排名函数
SELECT
user_id,
department,
salary,
ROW_NUMBER() OVER (PARTITION BY department ORDER BY salary DESC) AS rn, -- 行号,不重复
RANK() OVER (PARTITION BY department ORDER BY salary DESC) AS rnk, -- 排名,有间隔
DENSE_RANK() OVER (PARTITION BY department ORDER BY salary DESC) AS drnk, -- 排名,无间隔
NTILE(4) OVER (PARTITION BY department ORDER BY salary DESC) AS quartile -- 分组
FROM employees;
-- 聚合窗口函数
SELECT
order_date,
amount,
SUM(amount) OVER (ORDER BY order_date) AS cumulative_sum, -- 累计和
AVG(amount) OVER (ORDER BY order_date ROWS BETWEEN 6 PRECEDING AND CURRENT ROW) AS moving_avg_7d, -- 7日移动平均
SUM(amount) OVER (PARTITION BY user_id) AS user_total -- 用户总额
FROM orders;
-- 偏移函数
SELECT
order_date,
amount,
LAG(amount, 1, 0) OVER (ORDER BY order_date) AS prev_amount, -- 上一行
LEAD(amount, 1, 0) OVER (ORDER BY order_date) AS next_amount, -- 下一行
FIRST_VALUE(amount) OVER (PARTITION BY user_id ORDER BY order_date) AS first_order, -- 第一个值
LAST_VALUE(amount) OVER (PARTITION BY user_id ORDER BY order_date
ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS last_order -- 最后一个值
FROM orders;
-- 实战:取每个部门薪资TOP 3
SELECT * FROM (
SELECT
department,
name,
salary,
ROW_NUMBER() OVER (PARTITION BY department ORDER BY salary DESC) AS rn
FROM employees
) t
WHERE rn <= 3;
-- 实战:计算环比增长率
SELECT
month,
revenue,
LAG(revenue) OVER (ORDER BY month) AS prev_revenue,
(revenue - LAG(revenue) OVER (ORDER BY month)) / LAG(revenue) OVER (ORDER BY month) * 100 AS growth_rate
FROM monthly_revenue;
7.6 窗口边界详解
-- ROWS BETWEEN 语法
ROWS BETWEEN start_point AND end_point
-- start_point / end_point 可选值:
-- UNBOUNDED PRECEDING : 从分区第一行开始
-- n PRECEDING : 前n行
-- CURRENT ROW : 当前行
-- n FOLLOWING : 后n行
-- UNBOUNDED FOLLOWING : 到分区最后一行
-- 示例
SUM(amount) OVER (ORDER BY date ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) -- 累计到当前
SUM(amount) OVER (ORDER BY date ROWS BETWEEN 2 PRECEDING AND 2 FOLLOWING) -- 前后各2行
AVG(amount) OVER (ORDER BY date ROWS BETWEEN 6 PRECEDING AND CURRENT ROW) -- 7日移动平均
八、文件格式与压缩
8.1 文件格式对比
flowchart TB
subgraph 行式存储
A[TextFile] --> A1[人类可读 压缩率低 查询慢]
B[SequenceFile] --> B1[二进制 可分割 压缩一般]
end
subgraph 列式存储
C[ORC] --> C1[压缩率高 查询快 Hive原生]
D[Parquet] --> D1[压缩率高 查询快 通用格式]
end
style C fill:#c8e6c9
style D fill:#c8e6c9
| 格式 | 存储方式 | 压缩率 | 查询性能 | 适用场景 |
|---|---|---|---|---|
| TextFile | 行式 | 低 | 慢 | 原始数据导入 |
| SequenceFile | 行式 | 中 | 中 | 中间结果存储 |
| ORC | 列式 | 高 | 快 | Hive首选 |
| Parquet | 列式 | 高 | 快 | 跨平台首选 |
| Avro | 行式 | 中 | 中 | Schema演进 |
8.2 ORC格式详解
-- 创建ORC表
CREATE TABLE orc_table (
id BIGINT,
name STRING,
amount DECIMAL(10,2)
)
STORED AS ORC
TBLPROPERTIES (
'orc.compress'='ZLIB', -- 压缩方式:NONE/ZLIB/SNAPPY/LZ4
'orc.compress.size'='262144', -- 压缩块大小
'orc.stripe.size'='67108864', -- Stripe大小(64MB)
'orc.row.index.stride'='10000', -- 索引步长
'orc.create.index'='true' -- 创建索引
);
-- 将TextFile转为ORC
INSERT OVERWRITE TABLE orc_table
SELECT * FROM text_table;
-- 查看ORC文件信息
hive --orcfiledump /user/hive/warehouse/orc_table/000000_0
8.3 Parquet格式
-- 创建Parquet表
CREATE TABLE parquet_table (
id BIGINT,
name STRING,
amount DECIMAL(10,2)
)
STORED AS PARQUET
TBLPROPERTIES (
'parquet.compression'='SNAPPY' -- 压缩方式:UNCOMPRESSED/SNAPPY/GZIP/LZO
);
8.4 压缩配置
-- 开启中间结果压缩
SET hive.exec.compress.intermediate=true;
SET mapreduce.map.output.compress=true;
SET mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.SnappyCodec;
-- 开启最终输出压缩
SET hive.exec.compress.output=true;
SET mapreduce.output.fileoutputformat.compress=true;
SET mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.SnappyCodec;
| 压缩格式 | 压缩率 | 压缩速度 | 是否可分割 | 推荐场景 |
|---|---|---|---|---|
| GZIP | 高 | 慢 | 否 | 冷数据归档 |
| Snappy | 中 | 快 | 否 | 热数据查询 |
| LZ4 | 中 | 最快 | 否 | 实时处理 |
| ZLIB | 高 | 中 | 否 | ORC默认 |
| LZO | 中 | 快 | 是 | 需要可分割 |
| BZIP2 | 最高 | 最慢 | 是 | 极致压缩 |
九、Hive函数大全
9.1 字符串函数
-- 长度和截取
LENGTH('hello') -- 5
SUBSTR('hello', 2, 3) -- 'ell'(从第2位取3个字符)
SUBSTRING('hello', 2) -- 'ello'(从第2位到结尾)
-- 拼接
CONCAT('hello', ' ', 'world') -- 'hello world'
CONCAT_WS(',', 'a', 'b', 'c') -- 'a,b,c'
-- 大小写
UPPER('hello') -- 'HELLO'
LOWER('HELLO') -- 'hello'
INITCAP('hello world') -- 'Hello World'
-- 去空格
TRIM(' hello ') -- 'hello'
LTRIM(' hello') -- 'hello'
RTRIM('hello ') -- 'hello'
-- 替换
REPLACE('hello', 'l', 'L') -- 'heLLo'
REGEXP_REPLACE('hello123', '[0-9]', '') -- 'hello'
TRANSLATE('hello', 'el', 'ip') -- 'hippo'
-- 分割
SPLIT('a,b,c', ',') -- ['a', 'b', 'c']
SPLIT('a,b,c', ',')[0] -- 'a'
-- 正则
REGEXP_EXTRACT('hello123world', '([a-z]+)([0-9]+)([a-z]+)', 2) -- '123'
-- 查找
INSTR('hello', 'l') -- 3(第一次出现位置)
LOCATE('l', 'hello') -- 3
-- 填充
LPAD('hi', 5, '0') -- '000hi'
RPAD('hi', 5, '0') -- 'hi000'
-- JSON解析
GET_JSON_OBJECT('{"name":"joey"}', '$.name') -- 'joey'
9.2 日期函数
-- 当前日期时间
CURRENT_DATE() -- '2024-01-15'
CURRENT_TIMESTAMP() -- '2024-01-15 10:30:00'
UNIX_TIMESTAMP() -- 1705286400(当前时间戳)
-- 日期转换
TO_DATE('2024-01-15 10:30:00') -- '2024-01-15'
DATE_FORMAT('2024-01-15', 'yyyyMMdd') -- '20240115'
FROM_UNIXTIME(1705286400) -- '2024-01-15 00:00:00'
UNIX_TIMESTAMP('2024-01-15', 'yyyy-MM-dd') -- 1705248000
-- 日期计算
DATE_ADD('2024-01-15', 7) -- '2024-01-22'
DATE_SUB('2024-01-15', 7) -- '2024-01-08'
DATEDIFF('2024-01-15', '2024-01-01') -- 14
MONTHS_BETWEEN('2024-03-01', '2024-01-01') -- 2.0
ADD_MONTHS('2024-01-15', 2) -- '2024-03-15'
-- 日期提取
YEAR('2024-01-15') -- 2024
MONTH('2024-01-15') -- 1
DAY('2024-01-15') -- 15
HOUR('2024-01-15 10:30:45') -- 10
MINUTE('2024-01-15 10:30:45') -- 30
SECOND('2024-01-15 10:30:45') -- 45
WEEKOFYEAR('2024-01-15') -- 3
DAYOFWEEK('2024-01-15') -- 2(周一=2)
-- 日期截断
TRUNC('2024-01-15', 'MM') -- '2024-01-01'(月初)
TRUNC('2024-01-15', 'YYYY') -- '2024-01-01'(年初)
-- 最后一天
LAST_DAY('2024-01-15') -- '2024-01-31'
NEXT_DAY('2024-01-15', 'MON') -- '2024-01-22'(下个周一)
9.3 数学函数
-- 取整
ROUND(3.1415, 2) -- 3.14
FLOOR(3.9) -- 3
CEIL(3.1) -- 4
TRUNC(3.1415, 2) -- 3.14(截断,Hive 2.3+支持对数值截断)
-- 数学运算
ABS(-10) -- 10
POWER(2, 10) -- 1024
SQRT(16) -- 4.0
LOG(10, 100) -- 2.0
LN(2.718) -- ≈1
EXP(1) -- 2.718...
-- 随机
RAND() -- 0~1之间的随机数
RAND(42) -- 指定种子的随机数
-- 最大最小
GREATEST(1, 2, 3) -- 3
LEAST(1, 2, 3) -- 1
-- 取模
MOD(10, 3) -- 1
PMOD(-10, 3) -- 2(正取模)
-- 进制转换
CONV('1010', 2, 10) -- '10'(二进制转十进制)
HEX(255) -- 'FF'
UNHEX('FF') -- 二进制数据
9.4 条件函数
-- IF
IF(score >= 60, '及格', '不及格')
-- CASE WHEN
CASE
WHEN score >= 90 THEN 'A'
WHEN score >= 80 THEN 'B'
WHEN score >= 60 THEN 'C'
ELSE 'D'
END
-- COALESCE(返回第一个非NULL值)
COALESCE(col1, col2, 'default')
-- NVL(NULL值替换)
NVL(col, 'default')
-- NULLIF(相等则返回NULL)
NULLIF(a, b) -- 如果a=b返回NULL,否则返回a
-- ISNULL / ISNOTNULL
ISNULL(col)
ISNOTNULL(col)
-- DECODE(类似CASE;注意:开源Hive并没有Oracle风格的多参数DECODE,内置的decode(bin, charset)只用于字符集解码,建议用CASE WHEN改写)
DECODE(status, 1, '启用', 2, '禁用', '未知')
9.5 集合函数
-- ARRAY操作
ARRAY(1, 2, 3) -- 创建数组
ARRAY_CONTAINS(arr, 2) -- 是否包含
SIZE(arr) -- 数组大小
SORT_ARRAY(arr) -- 排序
ARRAY_DISTINCT(arr) -- 去重(Hive 2.3+)
-- MAP操作
MAP('a', 1, 'b', 2) -- 创建Map
MAP_KEYS(m) -- 获取所有key
MAP_VALUES(m) -- 获取所有value
SIZE(m) -- Map大小
-- 集合运算
ARRAY_UNION(arr1, arr2) -- 并集
ARRAY_INTERSECT(arr1, arr2) -- 交集
ARRAY_EXCEPT(arr1, arr2) -- 差集
-- 展开(炸裂)
-- EXPLODE:将数组/Map展开为多行
SELECT EXPLODE(ARRAY(1, 2, 3)); -- 输出3行
-- LATERAL VIEW:配合EXPLODE使用
SELECT
id,
tag
FROM users
LATERAL VIEW EXPLODE(tags) t AS tag;
-- POSEXPLODE:带位置的展开
SELECT
id,
pos,
tag
FROM users
LATERAL VIEW POSEXPLODE(tags) t AS pos, tag;
9.6 行转列 / 列转行
-- 行转列(多行合并为一行)
-- COLLECT_SET:去重聚合
-- COLLECT_LIST:保留重复
SELECT
user_id,
COLLECT_SET(tag) AS tags,
COLLECT_LIST(tag) AS all_tags
FROM user_tags
GROUP BY user_id;
-- 结合CONCAT_WS
SELECT
user_id,
CONCAT_WS(',', COLLECT_SET(tag)) AS tag_str
FROM user_tags
GROUP BY user_id;
-- 列转行(一行拆分为多行)
SELECT
user_id,
tag
FROM user_info
LATERAL VIEW EXPLODE(SPLIT(tags, ',')) t AS tag;
十、UDF自定义函数
10.1 UDF类型
flowchart LR
A[Hive UDF] --> B[UDF]
A --> C[UDAF]
A --> D[UDTF]
B --> B1[一进一出 如:upper、substr]
C --> C1[多进一出 如:sum、count]
D --> D1[一进多出 如:explode]
10.2 编写UDF
package com.example.hive.udf;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
/**
* 手机号脱敏UDF
* 使用:mask_phone('13812345678') → '138****5678'
*/
@Description(
name = "mask_phone",
value = "_FUNC_(phone) - 手机号脱敏",
extended = "Example: SELECT mask_phone('13812345678') → '138****5678'"
)
public class MaskPhoneUDF extends GenericUDF {
private StringObjectInspector inputOI;
@Override
public ObjectInspector initialize(ObjectInspector[] arguments)
throws UDFArgumentException {
// 参数校验
if (arguments.length != 1) {
throw new UDFArgumentException("mask_phone只接受一个参数");
}
if (!(arguments[0] instanceof StringObjectInspector)) {
throw new UDFArgumentException("mask_phone参数必须是字符串");
}
inputOI = (StringObjectInspector) arguments[0];
// 返回类型
return PrimitiveObjectInspectorFactory.javaStringObjectInspector;
}
@Override
public Object evaluate(DeferredObject[] arguments) throws HiveException {
if (arguments[0].get() == null) {
return null;
}
String phone = inputOI.getPrimitiveJavaObject(arguments[0].get());
if (phone == null || phone.length() != 11) {
return phone;
}
// 脱敏处理
return phone.substring(0, 3) + "****" + phone.substring(7);
}
@Override
public String getDisplayString(String[] children) {
return "mask_phone(" + children[0] + ")";
}
}
10.3 编写UDAF
package com.example.hive.udf;
import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDAF;
import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;
/**
* 字符串拼接聚合函数
* 使用:string_agg(name, ',') → 'a,b,c'
*/
@Description(
name = "string_agg",
value = "_FUNC_(column, separator) - 字符串聚合"
)
public class StringAggUDAF extends UDAF {
public static class StringAggEvaluator implements UDAFEvaluator {
private StringBuilder result;
private String separator;
public StringAggEvaluator() {
super();
init();
}
@Override
public void init() {
result = new StringBuilder();
separator = ",";
}
// 迭代(处理每一行)
public boolean iterate(String value, String sep) {
if (sep != null) {
separator = sep;
}
if (value != null) {
if (result.length() > 0) {
result.append(separator);
}
result.append(value);
}
return true;
}
// 部分聚合结果
public String terminatePartial() {
return result.toString();
}
// 合并部分结果
public boolean merge(String partial) {
if (partial != null && !partial.isEmpty()) {
if (result.length() > 0) {
result.append(separator);
}
result.append(partial);
}
return true;
}
// 最终结果
public String terminate() {
return result.toString();
}
}
}
10.4 注册和使用UDF
-- 方式1:临时函数(会话级别)
ADD JAR /path/to/my-udf.jar;
CREATE TEMPORARY FUNCTION mask_phone AS 'com.example.hive.udf.MaskPhoneUDF';
-- 使用
SELECT mask_phone(phone) FROM users;
-- 方式2:永久函数(Hive 0.13+)
CREATE FUNCTION mydb.mask_phone AS 'com.example.hive.udf.MaskPhoneUDF'
USING JAR 'hdfs:///user/hive/lib/my-udf.jar';
-- 查看函数
SHOW FUNCTIONS LIKE '*mask*';
DESCRIBE FUNCTION mask_phone;
DESCRIBE FUNCTION EXTENDED mask_phone;
-- 删除函数
DROP FUNCTION IF EXISTS mydb.mask_phone;
十一、性能优化
11.1 优化总览
mindmap
root((Hive优化))
数据层优化
分区裁剪
列裁剪
文件格式ORC/Parquet
压缩
SQL层优化
避免SELECT *
谓词下推
JOIN优化
GROUP BY优化
引擎层优化
执行引擎选择
并行执行
本地模式
向量化
资源层优化
Map/Reduce数量
内存配置
Container配置
11.2 数据层优化
-- 1. 使用分区表(最重要!)
-- 不好:全表扫描
SELECT * FROM orders WHERE order_date = '2024-01-15';
-- 好:分区裁剪
SELECT * FROM orders WHERE dt = '2024-01-15';
-- 2. 使用ORC/Parquet格式
CREATE TABLE orders_orc STORED AS ORC AS SELECT * FROM orders;
-- 3. 开启压缩
SET hive.exec.compress.output=true;
SET mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.SnappyCodec;
-- 4. 合并小文件
SET hive.merge.mapfiles=true;
SET hive.merge.mapredfiles=true;
SET hive.merge.size.per.task=256000000; -- 256MB
SET hive.merge.smallfiles.avgsize=128000000; -- 128MB
11.3 SQL层优化
-- 1. 避免SELECT *
-- 不好
SELECT * FROM users;
-- 好
SELECT id, name, age FROM users;
-- 2. 分区字段放在WHERE最前面
-- 不好
SELECT * FROM orders WHERE amount > 100 AND dt = '2024-01-15';
-- 好
SELECT * FROM orders WHERE dt = '2024-01-15' AND amount > 100;
-- 3. 谓词下推
SET hive.optimize.ppd=true;
-- 4. JOIN优化 - 小表在前(或用MAPJOIN)
SET hive.auto.convert.join=true;
SET hive.mapjoin.smalltable.filesize=25000000;
-- 5. GROUP BY优化
SET hive.groupby.skewindata=true; -- 数据倾斜时开启
-- 6. COUNT DISTINCT优化
-- 不好:单个Reducer
SELECT COUNT(DISTINCT user_id) FROM orders;
-- 好:先GROUP BY再COUNT
SELECT COUNT(*) FROM (
SELECT user_id FROM orders GROUP BY user_id
) t;
-- 7. 多重INSERT
FROM source_table
INSERT OVERWRITE TABLE target1 SELECT col1, col2 WHERE condition1
INSERT OVERWRITE TABLE target2 SELECT col1, col3 WHERE condition2;
11.4 执行引擎优化
-- 使用Tez引擎(比MR快很多)
SET hive.execution.engine=tez;
-- 或使用Spark引擎
SET hive.execution.engine=spark;
-- 开启向量化执行(大幅提升性能)
SET hive.vectorized.execution.enabled=true;
SET hive.vectorized.execution.reduce.enabled=true;
-- 开启CBO(基于代价的优化器)
SET hive.cbo.enable=true;
SET hive.compute.query.using.stats=true;
SET hive.stats.fetch.column.stats=true;
SET hive.stats.fetch.partition.stats=true;
-- 收集统计信息(CBO依赖)
ANALYZE TABLE orders COMPUTE STATISTICS;
ANALYZE TABLE orders COMPUTE STATISTICS FOR COLUMNS;
-- 并行执行
SET hive.exec.parallel=true;
SET hive.exec.parallel.thread.number=8;
-- 本地模式(小数据自动启用)
SET hive.exec.mode.local.auto=true;
SET hive.exec.mode.local.auto.inputbytes.max=134217728; -- 128MB
SET hive.exec.mode.local.auto.input.files.max=4;
11.5 数据倾斜处理
数据倾斜是Hive性能杀手!
flowchart TB
A[数据倾斜] --> B{倾斜类型}
B --> C[JOIN倾斜]
B --> D[GROUP BY倾斜]
B --> E[COUNT DISTINCT倾斜]
C --> C1[小表广播]
C --> C2[倾斜Key单独处理]
C --> C3[加盐打散]
D --> D1[两阶段聚合]
D --> D2[hive.groupby.skewindata]
E --> E1[改写为GROUP BY]
E --> E2[加盐去重]
-- 1. JOIN倾斜 - 小表广播
SET hive.auto.convert.join=true;
SELECT /*+ MAPJOIN(small_table) */ *
FROM big_table JOIN small_table ON ...;
-- 2. JOIN倾斜 - 倾斜Key单独处理
-- 假设user_id=0是异常数据
SELECT * FROM orders WHERE user_id = 0
UNION ALL
SELECT * FROM orders o JOIN users u ON o.user_id = u.id
WHERE o.user_id != 0;
-- 3. JOIN倾斜 - 加盐打散
-- 大表加随机前缀
SELECT /*+ MAPJOIN(small_table_expand) */
regexp_replace(t1.salted_key, '^[0-9]+_', '') AS key,
t1.value
FROM (
SELECT CONCAT(CAST(RAND() * 10 AS INT), '_', key) AS salted_key, value
FROM big_table
) t1
JOIN (
-- 小表复制10份
SELECT CONCAT(salt, '_', key) AS salted_key, value
FROM small_table
LATERAL VIEW EXPLODE(ARRAY(0,1,2,3,4,5,6,7,8,9)) t AS salt
) small_table_expand
ON t1.salted_key = small_table_expand.salted_key;
-- 4. GROUP BY倾斜
SET hive.groupby.skewindata=true;
-- 原理:两阶段聚合,第一阶段随机分发
-- 5. COUNT DISTINCT倾斜
-- 改写为GROUP BY
SELECT COUNT(*) FROM (
SELECT user_id FROM orders GROUP BY user_id
) t;
-- 6. 手动两阶段聚合
SELECT
key,
SUM(cnt) AS total_cnt
FROM (
-- 第一阶段:加盐聚合
SELECT
key,
COUNT(*) AS cnt
FROM (
SELECT key, CONCAT(key, '_', CAST(RAND() * 100 AS INT)) AS salted_key
FROM source_table
) t
GROUP BY key, salted_key
) t2
GROUP BY key;
11.6 Map和Reduce数量调优
-- Map数量优化
-- 合并小文件,减少Map数
SET mapreduce.input.fileinputformat.split.minsize=268435456; -- 256MB
-- 或拆分大文件,增加Map数
SET mapreduce.input.fileinputformat.split.maxsize=134217728; -- 128MB
-- Reduce数量优化
SET hive.exec.reducers.bytes.per.reducer=256000000; -- 每个Reducer处理256MB
SET hive.exec.reducers.max=1000; -- 最大Reducer数
SET mapreduce.job.reduces=10; -- 手动指定Reducer数
-- 只有Map任务(无Reduce)
SET mapreduce.job.reduces=0;
-- 或使用 ORDER BY 改为 SORT BY + DISTRIBUTE BY
十二、Hive实战案例
12.1 用户行为分析
-- 场景:分析用户留存率
-- 1. 创建用户行为表
CREATE EXTERNAL TABLE user_actions (
user_id STRING,
action_type STRING,
action_time TIMESTAMP,
page_id STRING,
duration INT
)
PARTITIONED BY (dt STRING)
STORED AS ORC
LOCATION '/data/user_actions';
-- 2. 计算次日留存率
WITH first_day AS (
-- 获取用户首次访问日期
SELECT
user_id,
MIN(dt) AS first_date
FROM user_actions
GROUP BY user_id
),
retention AS (
SELECT
f.first_date,
f.user_id,
CASE WHEN a.user_id IS NOT NULL THEN 1 ELSE 0 END AS is_retained
FROM first_day f
LEFT JOIN (
SELECT DISTINCT user_id, dt FROM user_actions
) a ON f.user_id = a.user_id
AND DATE_ADD(f.first_date, 1) = a.dt
)
SELECT
first_date,
COUNT(DISTINCT user_id) AS new_users,
SUM(is_retained) AS retained_users,
ROUND(SUM(is_retained) / COUNT(DISTINCT user_id) * 100, 2) AS retention_rate
FROM retention
GROUP BY first_date
ORDER BY first_date;
12.2 漏斗分析
-- 场景:电商转化漏斗
WITH funnel AS (
SELECT
user_id,
MAX(CASE WHEN action_type = 'view' THEN 1 ELSE 0 END) AS step1_view,
MAX(CASE WHEN action_type = 'add_cart' THEN 1 ELSE 0 END) AS step2_cart,
MAX(CASE WHEN action_type = 'checkout' THEN 1 ELSE 0 END) AS step3_checkout,
MAX(CASE WHEN action_type = 'pay' THEN 1 ELSE 0 END) AS step4_pay
FROM user_actions
WHERE dt = '2024-01-15'
GROUP BY user_id
)
SELECT
'Step1: 浏览' AS step_name,
COUNT(*) AS user_count,
100.0 AS conversion_rate
FROM funnel WHERE step1_view = 1
UNION ALL
SELECT
'Step2: 加购',
SUM(step2_cart),
ROUND(SUM(step2_cart) / COUNT(*) * 100, 2)
FROM funnel WHERE step1_view = 1
UNION ALL
SELECT
'Step3: 结算',
SUM(step3_checkout),
ROUND(SUM(step3_checkout) / SUM(step2_cart) * 100, 2)
FROM funnel WHERE step2_cart = 1
UNION ALL
SELECT
'Step4: 支付',
SUM(step4_pay),
ROUND(SUM(step4_pay) / SUM(step3_checkout) * 100, 2)
FROM funnel WHERE step3_checkout = 1;
12.3 用户分层RFM模型
-- RFM模型:Recency(最近)、Frequency(频率)、Monetary(金额)
WITH rfm_base AS (
SELECT
user_id,
DATEDIFF('2024-01-15', MAX(order_date)) AS recency,
COUNT(DISTINCT order_id) AS frequency,
SUM(amount) AS monetary
FROM orders
WHERE dt >= '2023-01-15' AND dt <= '2024-01-15'
GROUP BY user_id
),
rfm_score AS (
SELECT
user_id,
recency,
frequency,
monetary,
NTILE(5) OVER (ORDER BY recency DESC) AS r_score, -- recency越小越好,倒序让最近活跃的用户得高分
NTILE(5) OVER (ORDER BY frequency ASC) AS f_score, -- frequency越大越好,正序让高频用户得高分
NTILE(5) OVER (ORDER BY monetary ASC) AS m_score -- monetary越大越好,正序让高消费用户得高分
FROM rfm_base
)
SELECT
user_id,
recency,
frequency,
monetary,
r_score,
f_score,
m_score,
CASE
WHEN r_score >= 4 AND f_score >= 4 AND m_score >= 4 THEN '重要价值用户'
WHEN r_score >= 4 AND f_score < 4 AND m_score >= 4 THEN '重要发展用户'
WHEN r_score < 4 AND f_score >= 4 AND m_score >= 4 THEN '重要保持用户'
WHEN r_score < 4 AND f_score < 4 AND m_score >= 4 THEN '重要挽留用户'
WHEN r_score >= 4 AND f_score >= 4 AND m_score < 4 THEN '一般价值用户'
WHEN r_score >= 4 AND f_score < 4 AND m_score < 4 THEN '一般发展用户'
WHEN r_score < 4 AND f_score >= 4 AND m_score < 4 THEN '一般保持用户'
ELSE '一般挽留用户'
END AS user_segment
FROM rfm_score;
12.4 连续登录天数
-- 经典SQL问题:计算用户最大连续登录天数
WITH login_data AS (
SELECT DISTINCT user_id, dt
FROM user_actions
WHERE action_type = 'login'
),
grouped AS (
SELECT
user_id,
dt,
DATE_SUB(dt, ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY dt)) AS grp
FROM login_data
)
SELECT
user_id,
MIN(dt) AS start_date,
MAX(dt) AS end_date,
COUNT(*) AS consecutive_days
FROM grouped
GROUP BY user_id, grp
ORDER BY user_id, start_date;
-- 获取每个用户的最大连续登录天数
WITH login_data AS (
SELECT DISTINCT user_id, dt FROM user_actions WHERE action_type = 'login'
),
grouped AS (
SELECT
user_id, dt,
DATE_SUB(dt, ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY dt)) AS grp
FROM login_data
),
consecutive AS (
SELECT user_id, COUNT(*) AS days
FROM grouped
GROUP BY user_id, grp
)
SELECT user_id, MAX(days) AS max_consecutive_days
FROM consecutive
GROUP BY user_id;
十三、Hive与生态集成
13.1 Hive on Spark
-- 设置Spark为执行引擎
SET hive.execution.engine=spark;
-- Spark相关配置
SET spark.master=yarn;
SET spark.submit.deployMode=cluster;
SET spark.executor.memory=4g;
SET spark.executor.cores=2;
SET spark.executor.instances=10;
13.2 Hive on Tez
-- 设置Tez为执行引擎
SET hive.execution.engine=tez;
-- Tez相关配置
SET tez.am.resource.memory.mb=4096;
SET tez.task.resource.memory.mb=2048;
SET hive.tez.container.size=4096;
SET hive.tez.java.opts=-Xmx3276m;
13.3 HiveServer2 + JDBC
// Java JDBC连接Hive示例
import java.sql.*;
public class HiveJdbcExample {
private static final String DRIVER = "org.apache.hive.jdbc.HiveDriver";
private static final String URL = "jdbc:hive2://localhost:10000/default";
public static void main(String[] args) throws Exception {
Class.forName(DRIVER);
try (Connection conn = DriverManager.getConnection(URL, "hive", "");
Statement stmt = conn.createStatement()) {
// 执行查询
ResultSet rs = stmt.executeQuery("SELECT * FROM users LIMIT 10");
while (rs.next()) {
System.out.println(rs.getString("name"));
}
}
}
}
13.4 Hive + HBase
-- 创建HBase外部表
CREATE EXTERNAL TABLE hbase_users (
row_key STRING,
name STRING,
age INT,
email STRING
)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES (
"hbase.columns.mapping" = ":key,info:name,info:age,info:email"
)
TBLPROPERTIES (
"hbase.table.name" = "users"
);
-- 查询HBase数据
SELECT * FROM hbase_users WHERE row_key = 'user001';
十四、常见问题与排错
14.1 常见错误
# 1. Guava版本冲突
# 错误:java.lang.NoSuchMethodError: com.google.common.base.Preconditions.checkArgument
# 解决:
rm $HIVE_HOME/lib/guava-*.jar
cp $HADOOP_HOME/share/hadoop/common/lib/guava-*.jar $HIVE_HOME/lib/
# 2. Metastore连接失败
# 错误:Unable to open a test connection to the given database
# 解决:检查MySQL服务、用户权限、JDBC驱动
# 3. 权限问题
# 错误:Permission denied: user=xxx, access=WRITE
# 解决:
hdfs dfs -chmod -R 777 /user/hive/warehouse
hdfs dfs -chown -R hive:hive /user/hive/warehouse
# 4. OOM内存溢出
# 错误:java.lang.OutOfMemoryError
# 解决:增加内存配置
SET mapreduce.map.memory.mb=4096;
SET mapreduce.reduce.memory.mb=8192;
SET mapreduce.map.java.opts=-Xmx3276m;
SET mapreduce.reduce.java.opts=-Xmx6553m;
14.2 性能诊断
-- 查看执行计划
EXPLAIN SELECT * FROM orders WHERE dt = '2024-01-15';
-- 查看详细执行计划
EXPLAIN EXTENDED SELECT * FROM orders WHERE dt = '2024-01-15';
-- 查看执行依赖
EXPLAIN DEPENDENCY SELECT * FROM orders WHERE dt = '2024-01-15';
-- 查看向量化执行
EXPLAIN VECTORIZATION SELECT * FROM orders;
-- 分析表统计信息
ANALYZE TABLE orders COMPUTE STATISTICS;
ANALYZE TABLE orders COMPUTE STATISTICS FOR COLUMNS;
-- 查看表统计信息
DESCRIBE FORMATTED orders;
14.3 日志位置
# Hive日志
$HIVE_HOME/logs/
/tmp/$USER/hive.log
# HiveServer2日志
/var/log/hive/hiveserver2.log
# YARN应用日志
yarn logs -applicationId application_xxx
# 开启Debug日志
hive --hiveconf hive.root.logger=DEBUG,console
十五、总结
Hive学习路线
flowchart LR
A[基础] --> B[进阶] --> C[优化] --> D[实战]
A --> A1[SQL基础 DDL/DML/DQL]
A --> A2[数据类型 文件格式]
B --> B1[窗口函数]
B --> B2[UDF开发]
B --> B3[分区分桶]
C --> C1[执行计划分析]
C --> C2[数据倾斜处理]
C --> C3[参数调优]
D --> D1[数仓建模]
D --> D2[ETL开发]
D --> D3[报表分析]
核心要点
| 主题 | 关键点 |
|---|---|
| 定位 | 批处理、OLAP分析、高延迟高吞吐 |
| 存储 | 外部表 + ORC/Parquet + Snappy压缩 |
| 分区 | 必须用!按时间分区最常见 |
| 优化 | 分区裁剪 > 文件格式 > 执行引擎 > 参数调优 |
| 倾斜 | MapJoin、加盐打散、两阶段聚合 |
| 函数 | 窗口函数是杀手锏 |
最后的话
"Hive不是万能的,但在大数据批处理场景,它依然是最稳的选择。"
学完Hive之后,你可能会:
- 觉得自己会大数据了(其实只是会写SQL)
- 开始嫌弃Hive太慢(然后去学Spark SQL)
- 发现数据倾斜是永恒的话题(然后头秃)
- 最终承认:Hive虽然慢,但真的很稳