Hive: From Getting Started to Giving Up


Preface: Why Learn Hive

Remember the first time you ran into big data?

Your boss says: "We have 10 TB of log data that needs analyzing."

Full of confidence, you fire up MySQL: SELECT * FROM logs WHERE ...

And then... the database falls over.

The boss follows up: "Just use Hadoop."

You open the MapReduce docs, see that a simple WordCount takes several hundred lines of Java, and want to resign on the spot.

That is when Hive steps in: it lets you use the SQL you already know to work with the big data you don't.

"Hive didn't make big data simpler; it just let SQL folks sneak into the big data club." — a data engineer who prefers to stay anonymous

1. What Is Hive

1.1 The Official Definition

Hive is a data warehouse tool built on top of Hadoop. It maps structured data files to database tables and provides SQL-like query capabilities.
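
"Mapping a file to a table" looks roughly like this. A minimal sketch, assuming a comma-separated log file already sits under a hypothetical HDFS path /data/logs/ (table and column names are made up):

-- Map an existing delimited file on HDFS to a queryable table
CREATE EXTERNAL TABLE access_logs (
    ip          STRING,
    url         STRING,
    status_code INT
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
LOCATION '/data/logs/';

-- From here on, it is just SQL
SELECT status_code, COUNT(*) AS cnt
FROM access_logs
GROUP BY status_code;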

1.2 In Plain Terms

Hive = a SQL translator + metadata management

You write SQL; Hive translates it into MapReduce/Tez/Spark jobs and runs them on the Hadoop cluster.

flowchart LR
    A[Your SQL] --> B[Hive]
    B --> C[Translated into MR/Tez/Spark]
    C --> D[Executed on the Hadoop cluster]
    D --> E[Results returned]
    style A fill:#e1f5fe
    style B fill:#fff3e0
    style E fill:#e8f5e9
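
The WordCount that needs hundreds of lines of MapReduce Java shrinks to a handful of lines of HiveQL. A minimal sketch, assuming a hypothetical docs table with a single line column:

-- WordCount in HiveQL: split each line into words, explode, count
SELECT word, COUNT(*) AS cnt
FROM docs
LATERAL VIEW EXPLODE(SPLIT(line, '\\s+')) t AS word
GROUP BY word;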

1.3 Where Hive Fits

flowchart TB
    subgraph Realtime[Real-time processing]
        direction LR
        F[Flink SQL<br>the streaming king]
        P[Presto/Trino<br>interactive queries]
    end
    subgraph Batch[Batch processing]
        direction LR
        H[Hive<br>rock-solid batch]
        S[Spark SQL<br>unified batch and streaming]
        M[MapReduce<br>ancient relic]
    end
    subgraph Traditional[Traditional databases]
        MY[MySQL<br>first choice for small data]
    end
    Batch -.->|data volume grows| Realtime
    Traditional -.->|data volume grows| Batch

1.4 What Hive Is Not

Hive is...                      Hive is not...
A data warehouse tool           A relational database
A batch processing engine       A real-time query engine
OLAP analytics                  OLTP transaction processing
Suited to large data volumes    Suited to frequent updates
High latency, high throughput   Low-latency responses

The soul-searching question: when should you use Hive?

  • ✅ Large data volumes (GB to PB)
  • ✅ Batch analytics and report generation
  • ✅ Latency-insensitive workloads (seconds to minutes)
  • ❌ Millisecond-level responses required
  • ❌ Frequent inserts, updates and deletes required
  • ❌ Small data (MySQL is enough)

2. Hive Architecture

2.1 Overall Architecture

flowchart TB
    subgraph Client[Clients]
        C1[CLI]
        C2[JDBC/ODBC]
        C3[Web UI]
        C4[Beeline]
    end
    subgraph HiveServer[Hive service layer]
        HS[HiveServer2]
        D[Driver]
        CP[Compiler]
        OP[Optimizer]
        EX[Executor]
    end
    subgraph MetaStore[Metadata]
        MS[Metastore service]
        DB[(MySQL/PostgreSQL)]
    end
    subgraph Compute[Execution engines]
        MR[MapReduce]
        TEZ[Tez]
        SPARK[Spark]
    end
    subgraph Storage[Storage layer]
        HDFS[(HDFS)]
    end
    C1 & C2 & C3 & C4 --> HS
    HS --> D
    D --> CP --> OP --> EX
    D <--> MS
    MS <--> DB
    EX --> MR & TEZ & SPARK
    MR & TEZ & SPARK --> HDFS

2.2 Core Components

2.2.1 Metastore (Metadata Storage)

The Metastore is the soul of Hive: it holds every table's "identity card":

📦 What the Metastore stores
├── Database info (name, location, properties)
├── Table info
│   ├── Table name, column names, column types
│   ├── Table type (managed / external)
│   ├── Storage location (HDFS path)
│   ├── File format (TextFile/ORC/Parquet)
│   └── Partition columns
├── Partition info
└── User permissions

Important: the Metastore stores only metadata; the actual data lives on HDFS!
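
You can see the split for yourself. A small sketch, assuming an orders table already exists:

-- Column definitions, location and format come from the Metastore
DESCRIBE FORMATTED orders;

-- The rows themselves are plain files under the table's HDFS location
-- (check the "Location:" field printed by the statement above), e.g.
-- hdfs dfs -ls /user/hive/warehouse/orders/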

2.2.2 Driver

The conductor of SQL execution:

sequenceDiagram
    participant C as Client
    participant D as Driver
    participant CP as Compiler
    participant MS as Metastore
    participant OP as Optimizer
    participant EX as Executor
    participant YARN as YARN / execution engine
    C->>D: 1. Submit SQL
    D->>CP: 2. Compile SQL
    CP->>MS: 3. Fetch metadata
    MS-->>CP: 4. Return table schemas
    CP->>CP: 5. Syntax and semantic analysis
    CP-->>D: 6. Generate execution plan
    D->>OP: 7. Optimize the plan
    OP-->>D: 8. Return the optimized plan
    D->>EX: 9. Execute
    EX->>YARN: 10. Submit the job
    YARN-->>EX: 11. Execution result
    EX-->>D: 12. Return the result
    D-->>C: 13. Show the result

2.3 The Three Metastore Deployment Modes

flowchart TB
    subgraph Embedded[Embedded mode]
        A1[Hive CLI] --> A2[Metastore]
        A2 --> A3[(Derby)]
    end
    subgraph Local[Local mode]
        B1[Hive CLI] --> B2[Metastore]
        B2 --> B3[(MySQL)]
    end
    subgraph Remote[Remote mode]
        C1[Hive CLI 1]
        C2[Hive CLI 2]
        C3[HiveServer2]
        C1 & C2 & C3 --> C4[Metastore Service]
        C4 --> C5[(MySQL)]
    end
Mode      Characteristics                       Use case
Embedded  Derby database, single user           Learning and testing
Local     External database, single Metastore   Development
Remote    Standalone Metastore service          Production

3. Installing and Configuring Hive

3.1 Prerequisites

# Check Java
java -version
# Requirement: JDK 1.8+

# Check Hadoop
hadoop version
# Requirement: Hadoop 2.x/3.x

# Check environment variables
echo $JAVA_HOME
echo $HADOOP_HOME

3.2 Installation Steps

# 1. Download Hive
wget https://dlcdn.apache.org/hive/hive-3.1.3/apache-hive-3.1.3-bin.tar.gz

# 2. Unpack
tar -zxvf apache-hive-3.1.3-bin.tar.gz -C /opt/
mv /opt/apache-hive-3.1.3-bin /opt/hive

# 3. Configure environment variables
cat >> ~/.bashrc << 'EOF'
export HIVE_HOME=/opt/hive
export PATH=$PATH:$HIVE_HOME/bin
EOF
source ~/.bashrc

# 4. Fix the Guava version conflict (important!)
rm $HIVE_HOME/lib/guava-19.0.jar
cp $HADOOP_HOME/share/hadoop/common/lib/guava-27.0-jre.jar $HIVE_HOME/lib/

3.3 Configuration Files

hive-site.xml

<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <!-- Metastore database configuration -->
    <property>
        <name>javax.jdo.option.ConnectionURL</name>
        <value>jdbc:mysql://localhost:3306/hive_metastore?createDatabaseIfNotExist=true&amp;useSSL=false&amp;serverTimezone=Asia/Shanghai</value>
    </property>
    <property>
        <name>javax.jdo.option.ConnectionDriverName</name>
        <value>com.mysql.cj.jdbc.Driver</value>
    </property>
    <property>
        <name>javax.jdo.option.ConnectionUserName</name>
        <value>hive</value>
    </property>
    <property>
        <name>javax.jdo.option.ConnectionPassword</name>
        <value>hive123</value>
    </property>

    <!-- Hive warehouse directory -->
    <property>
        <name>hive.metastore.warehouse.dir</name>
        <value>/user/hive/warehouse</value>
    </property>

    <!-- Show the current database in the CLI prompt -->
    <property>
        <name>hive.cli.print.current.db</name>
        <value>true</value>
    </property>

    <!-- Print column headers in query results -->
    <property>
        <name>hive.cli.print.header</name>
        <value>true</value>
    </property>

    <!-- HiveServer2 settings -->
    <property>
        <name>hive.server2.thrift.port</name>
        <value>10000</value>
    </property>
    <property>
        <name>hive.server2.thrift.bind.host</name>
        <value>0.0.0.0</value>
    </property>

    <!-- Execution engine (mr/tez/spark) -->
    <property>
        <name>hive.execution.engine</name>
        <value>tez</value>
    </property>

    <!-- Local mode (enabled automatically for small inputs) -->
    <property>
        <name>hive.exec.mode.local.auto</name>
        <value>true</value>
    </property>
</configuration>

3.4 Initialization and Startup

# 1. Install the MySQL JDBC driver
cp mysql-connector-java-8.0.28.jar $HIVE_HOME/lib/

# 2. Initialize the metastore schema (run only once)
schematool -dbType mysql -initSchema

# 3. Start the Metastore service (in the background)
nohup hive --service metastore > /var/log/hive/metastore.log 2>&1 &

# 4. Start HiveServer2 (in the background)
nohup hive --service hiveserver2 > /var/log/hive/hiveserver2.log 2>&1 &

# 5. Verify the services
netstat -tlnp | grep -E "9083|10000"
# 9083: Metastore port
# 10000: HiveServer2 port

# 6. Connect to Hive
# Option 1: Hive CLI (not recommended; deprecated)
hive

# Option 2: Beeline (recommended)
beeline -u jdbc:hive2://localhost:10000 -n hive

4. Hive Data Types

4.1 Primitive Types

-- Numeric types
TINYINT      -- 1 byte, -128 to 127
SMALLINT     -- 2 bytes, -32768 to 32767
INT/INTEGER  -- 4 bytes, -2^31 to 2^31-1
BIGINT       -- 8 bytes, -2^63 to 2^63-1
FLOAT        -- 4-byte single-precision float
DOUBLE       -- 8-byte double-precision float
DECIMAL(p,s) -- exact decimal; p = total digits, s = digits after the point

-- String types
STRING       -- string of unbounded length (most common)
VARCHAR(n)   -- variable length, at most n characters
CHAR(n)      -- fixed length of n characters

-- Date/time types
DATE         -- date in the form YYYY-MM-DD
TIMESTAMP    -- timestamp in the form YYYY-MM-DD HH:MM:SS.fffffffff
INTERVAL     -- time interval

-- Boolean type
BOOLEAN      -- true/false

-- Binary type
BINARY       -- binary data

4.2 Complex Types

-- ARRAY: an ordered collection of elements of the same type
-- Example: ['apple', 'banana', 'cherry']
CREATE TABLE test_array (
    id INT,
    fruits ARRAY<STRING>
);
-- Access: fruits[0] returns the first element

-- MAP: a collection of key/value pairs
-- Example: {'name': 'Joey', 'age': '28'}
CREATE TABLE test_map (
    id INT,
    info MAP<STRING, STRING>
);
-- Access: info['name'] returns the value stored under 'name'

-- STRUCT: a record of named fields, possibly of different types
-- Example: {'name': 'Joey', 'age': 28}
CREATE TABLE test_struct (
    id INT,
    user_info STRUCT<name:STRING, age:INT, email:STRING>
);
-- Access: user_info.name returns the name field

-- Nested combinations
CREATE TABLE complex_table (
    id INT,
    name STRING,
    scores ARRAY<INT>,                           -- array of scores
    attributes MAP<STRING, STRING>,              -- attribute map
    address STRUCT<city:STRING, street:STRING>,  -- address struct
    friends ARRAY<STRUCT<name:STRING, age:INT>>  -- list of friends
);
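
When complex types are loaded from delimited text files, the table also needs collection and map-key delimiters. A minimal sketch (table name and delimiters are illustrative):

CREATE TABLE user_profiles (
    id    INT,
    tags  ARRAY<STRING>,
    props MAP<STRING, STRING>
)
ROW FORMAT DELIMITED
    FIELDS TERMINATED BY ','             -- separates columns
    COLLECTION ITEMS TERMINATED BY '|'   -- separates array/map entries
    MAP KEYS TERMINATED BY ':';          -- separates a map key from its value

-- A matching input line would look like:
-- 1,big_data|sql,city:shanghai|level:vip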

4.3 Type Conversion

-- Implicit conversion (automatic)
-- small type → large type: TINYINT → SMALLINT → INT → BIGINT → FLOAT → DOUBLE
SELECT 1 + 1.5;  -- the INT is promoted to DOUBLE automatically

-- Explicit conversion (CAST)
SELECT CAST('123' AS INT);                    -- string to integer
SELECT CAST(123 AS STRING);                   -- integer to string
SELECT CAST('2024-01-15' AS DATE);           -- string to date
SELECT CAST(1705286400 AS TIMESTAMP);        -- epoch seconds to timestamp

-- Note: a failed conversion returns NULL
SELECT CAST('abc' AS INT);  -- returns NULL

5. DDL: Data Definition

5.1 Database Operations

-- Create a database
CREATE DATABASE IF NOT EXISTS mydb
COMMENT 'my database'
LOCATION '/user/hive/warehouse/mydb'
WITH DBPROPERTIES ('creator'='joey', 'date'='2024-01-15');

-- List databases
SHOW DATABASES;
SHOW DATABASES LIKE 'my*';  -- pattern matching

-- Inspect a database
DESCRIBE DATABASE mydb;
DESCRIBE DATABASE EXTENDED mydb;  -- detailed information

-- Switch to a database
USE mydb;

-- Alter a database
ALTER DATABASE mydb SET DBPROPERTIES ('editor'='claude');

-- Drop a database
DROP DATABASE IF EXISTS mydb;           -- only works if the database is empty
DROP DATABASE IF EXISTS mydb CASCADE;   -- force-drop, including its tables

5.2 Table Operations

5.2.1 Creating Tables

-- Basic CREATE TABLE syntax
CREATE [EXTERNAL] TABLE [IF NOT EXISTS] table_name
(
    column1 data_type [COMMENT 'column comment'],
    column2 data_type,
    ...
)
[COMMENT 'table comment']
[PARTITIONED BY (partition_column data_type, ...)]
[CLUSTERED BY (column) [SORTED BY (column)] INTO num_buckets BUCKETS]
[ROW FORMAT row_format]
[STORED AS file_format]
[LOCATION 'hdfs_path']
[TBLPROPERTIES ('key'='value', ...)];

5.2.2 Managed vs. External Tables

flowchart TB
    subgraph Managed[Managed table]
        A1[Create table] --> A2[Hive manages the data]
        A2 --> A3[Drop table]
        A3 --> A4[Metadata deleted + data deleted]
    end
    subgraph External[External table]
        B1[Create table] --> B2[Data lives independently]
        B2 --> B3[Drop table]
        B3 --> B4[Only metadata deleted<br>data is kept]
    end
    style A4 fill:#ffcdd2
    style B4 fill:#c8e6c9
-- Managed table (the default)
CREATE TABLE managed_table (
    id INT,
    name STRING
) STORED AS ORC;

-- External table (recommended for production)
CREATE EXTERNAL TABLE external_table (
    id INT,
    name STRING
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
LOCATION '/data/external_data/';

-- Check the table type
DESCRIBE FORMATTED managed_table;
-- Table Type: MANAGED_TABLE or EXTERNAL_TABLE

Which one should you pick?

Scenario                         Recommended type   Reason
Temporary data, staging tables   Managed table      Easy to clean up
Raw data, shared data            External table     Safer; data cannot be dropped by mistake
ETL results                      External table     The data matters
Log data                         External table     The data source is managed elsewhere
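
If you picked the wrong kind, a table can usually be flipped between managed and external without reloading any data. A small sketch (in many Hive versions the property value is case-sensitive, so use upper-case TRUE/FALSE):

-- Turn a managed table into an external one
-- (a later DROP TABLE will then leave the HDFS data in place)
ALTER TABLE managed_table SET TBLPROPERTIES ('EXTERNAL'='TRUE');

-- And back again
ALTER TABLE managed_table SET TBLPROPERTIES ('EXTERNAL'='FALSE');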

5.2.3 Partitioned Tables (Important!)

Partitioning is the single most important performance lever in Hive!

-- Create a partitioned table
CREATE TABLE orders (
    order_id BIGINT,
    user_id BIGINT,
    product_id BIGINT,
    amount DECIMAL(10,2),
    order_time TIMESTAMP
)
PARTITIONED BY (dt STRING, hour STRING)  -- partitioned by day and hour
STORED AS ORC;

-- HDFS layout of the partitioned table:
-- /user/hive/warehouse/orders/
-- ├── dt=2024-01-15/
-- │   ├── hour=00/
-- │   ├── hour=01/
-- │   └── ...
-- └── dt=2024-01-16/
--     ├── hour=00/
--     └── ...

-- Add a partition
ALTER TABLE orders ADD PARTITION (dt='2024-01-15', hour='00');

-- Drop a partition
ALTER TABLE orders DROP PARTITION (dt='2024-01-15', hour='00');

-- List partitions
SHOW PARTITIONS orders;

-- Dynamic partitioning (important!)
SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict;

INSERT INTO orders PARTITION (dt, hour)
SELECT 
    order_id, user_id, product_id, amount, order_time,
    DATE_FORMAT(order_time, 'yyyy-MM-dd') as dt,
    DATE_FORMAT(order_time, 'HH') as hour
FROM source_orders;

5.2.4 Bucketed Tables

-- Create a bucketed table
CREATE TABLE user_bucket (
    user_id BIGINT,
    name STRING,
    age INT
)
CLUSTERED BY (user_id) INTO 8 BUCKETS  -- 8 buckets keyed on user_id
STORED AS ORC;

-- How bucketing works: hash(user_id) % 8 decides which bucket a row lands in

-- Enable bucketing enforcement before inserting
SET hive.enforce.bucketing=true;

-- Why bucket:
-- 1. Faster JOINs (bucket-to-bucket joins)
-- 2. Efficient sampling (see the sketch below)
-- 3. A more even data distribution
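
The sampling benefit, for example, looks like this. A small sketch against the user_bucket table above:

-- Read only the first of the 8 buckets, i.e. roughly 1/8 of the rows
SELECT *
FROM user_bucket TABLESAMPLE (BUCKET 1 OUT OF 8 ON user_id);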

5.3 Table Operations Cheat Sheet

-- List tables
SHOW TABLES;
SHOW TABLES IN mydb;
SHOW TABLES LIKE '*order*';

-- Inspect a table's schema
DESC table_name;
DESC FORMATTED table_name;  -- detailed information
DESC EXTENDED table_name;   -- extended information

-- Show the CREATE TABLE statement
SHOW CREATE TABLE table_name;

-- Rename a table
ALTER TABLE old_name RENAME TO new_name;

-- Add columns
ALTER TABLE table_name ADD COLUMNS (
    new_col1 STRING COMMENT 'new column 1',
    new_col2 INT COMMENT 'new column 2'
);

-- Change a column (rename, retype, re-comment)
ALTER TABLE table_name CHANGE old_col new_col STRING COMMENT 'the renamed column';

-- Replace all columns (dangerous!)
ALTER TABLE table_name REPLACE COLUMNS (
    id INT,
    name STRING
);

-- Change table properties
ALTER TABLE table_name SET TBLPROPERTIES ('comment'='new comment');

-- Change the storage format
ALTER TABLE table_name SET FILEFORMAT ORC;

-- Change the location
ALTER TABLE table_name SET LOCATION '/new/path/';

-- Drop a table
DROP TABLE IF EXISTS table_name;

-- Truncate a table (keep the schema)
TRUNCATE TABLE table_name;
-- Note: only managed tables can be truncated!

6. DML: Data Manipulation

6.1 Loading Data

-- Load from a local file
LOAD DATA LOCAL INPATH '/local/path/data.txt' 
INTO TABLE table_name;

-- Load from local and overwrite existing data
LOAD DATA LOCAL INPATH '/local/path/data.txt' 
OVERWRITE INTO TABLE table_name;

-- Load from HDFS (this moves the file!)
LOAD DATA INPATH '/hdfs/path/data.txt' 
INTO TABLE table_name;

-- Load into a specific partition
LOAD DATA LOCAL INPATH '/local/path/data.txt' 
INTO TABLE orders PARTITION (dt='2024-01-15', hour='00');

6.2 INSERT Statements

-- Insert the result of a query
INSERT INTO TABLE target_table
SELECT * FROM source_table;

-- Insert with overwrite
INSERT OVERWRITE TABLE target_table
SELECT * FROM source_table;

-- Insert into a partition
INSERT INTO TABLE orders PARTITION (dt='2024-01-15', hour='00')
SELECT order_id, user_id, product_id, amount, order_time
FROM source_orders
WHERE DATE_FORMAT(order_time, 'yyyy-MM-dd') = '2024-01-15'
  AND DATE_FORMAT(order_time, 'HH') = '00';

-- Dynamic-partition insert
SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict;

INSERT OVERWRITE TABLE orders PARTITION (dt, hour)
SELECT 
    order_id, user_id, product_id, amount, order_time,
    DATE_FORMAT(order_time, 'yyyy-MM-dd'),
    DATE_FORMAT(order_time, 'HH')
FROM source_orders;

-- Multi-table insert (scan once, write to several targets)
FROM source_table
INSERT OVERWRITE TABLE target1
SELECT col1, col2 WHERE condition1
INSERT OVERWRITE TABLE target2
SELECT col1, col3 WHERE condition2;

-- INSERT VALUES (Hive 0.14+; not recommended for large volumes)
INSERT INTO TABLE users VALUES 
(1, 'Joey', 28),
(2, 'Claude', 3);

6.3 Exporting Data

-- Export to a local directory
INSERT OVERWRITE LOCAL DIRECTORY '/local/output/'
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
SELECT * FROM table_name;

-- Export to HDFS
INSERT OVERWRITE DIRECTORY '/hdfs/output/'
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
SELECT * FROM table_name;

-- Export with the hive CLI
hive -e "SELECT * FROM mydb.users" > /local/users.txt

-- Export in a specific file format
INSERT OVERWRITE LOCAL DIRECTORY '/local/output/'
STORED AS ORC
SELECT * FROM table_name;

6.4 UPDATE and DELETE (ACID Tables)

Out of the box, Hive does not support UPDATE/DELETE; you first have to enable the ACID features:

-- Settings for ACID support
SET hive.support.concurrency=true;
SET hive.enforce.bucketing=true;
SET hive.exec.dynamic.partition.mode=nonstrict;
SET hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
SET hive.compactor.initiator.on=true;
SET hive.compactor.worker.threads=1;

-- Create an ACID table (must be ORC and bucketed)
CREATE TABLE acid_table (
    id INT,
    name STRING,
    age INT
)
CLUSTERED BY (id) INTO 4 BUCKETS
STORED AS ORC
TBLPROPERTIES ('transactional'='true');

-- UPDATE now works
UPDATE acid_table SET age = 30 WHERE id = 1;

-- So does DELETE
DELETE FROM acid_table WHERE id = 2;

-- MERGE statement (Hive 2.2+)
MERGE INTO target AS t
USING source AS s
ON t.id = s.id
WHEN MATCHED THEN UPDATE SET name = s.name
WHEN NOT MATCHED THEN INSERT VALUES (s.id, s.name, s.age);
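
Because ACID tables accumulate delta files, compaction is part of day-to-day operations. A small sketch of triggering and inspecting it against the acid_table above:

-- Request a major compaction to merge the table's delta files
ALTER TABLE acid_table COMPACT 'major';

-- Watch the compaction queue and its progress
SHOW COMPACTIONS;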

7. DQL: Querying Data

7.1 Basic Queries

-- Basic SELECT
SELECT * FROM users;
SELECT id, name, age FROM users;

-- Aliases
SELECT 
    id AS user_id,
    name AS user_name,
    age * 2 AS double_age
FROM users u;

-- DISTINCT for deduplication
SELECT DISTINCT department FROM employees;

-- WHERE conditions
SELECT * FROM users 
WHERE age > 18 
  AND name LIKE 'J%'
  AND department IN ('IT', 'HR')
  AND create_time BETWEEN '2024-01-01' AND '2024-12-31';

-- LIMIT
SELECT * FROM users LIMIT 10;
SELECT * FROM users LIMIT 10 OFFSET 20;  -- 10 rows starting from the 21st

-- Sorting
SELECT * FROM users ORDER BY age DESC, name ASC;

-- CASE WHEN
SELECT 
    name,
    age,
    CASE 
        WHEN age < 18 THEN 'minor'
        WHEN age < 60 THEN 'adult'
        ELSE 'senior'
    END AS age_group
FROM users;

7.2 Aggregation

-- Aggregate functions
SELECT 
    COUNT(*) AS total_count,           -- row count
    COUNT(DISTINCT user_id) AS uv,     -- distinct count
    SUM(amount) AS total_amount,       -- sum
    AVG(amount) AS avg_amount,         -- average
    MAX(amount) AS max_amount,         -- maximum
    MIN(amount) AS min_amount          -- minimum
FROM orders;

-- GROUP BY
SELECT 
    department,
    COUNT(*) AS emp_count,
    AVG(salary) AS avg_salary
FROM employees
GROUP BY department;

-- HAVING filters groups
SELECT 
    department,
    COUNT(*) AS emp_count
FROM employees
GROUP BY department
HAVING COUNT(*) > 10;

-- GROUPING SETS (multi-dimensional analysis)
SELECT 
    department,
    job_title,
    COUNT(*) AS cnt
FROM employees
GROUP BY department, job_title
GROUPING SETS (
    (department, job_title),  -- by department + job title
    (department),             -- by department only
    ()                        -- grand total
);

-- CUBE (every combination)
SELECT department, job_title, COUNT(*)
FROM employees
GROUP BY CUBE (department, job_title);
-- equivalent to GROUPING SETS ((dept, job), (dept), (job), ())

-- ROLLUP (hierarchical totals)
SELECT department, job_title, COUNT(*)
FROM employees
GROUP BY ROLLUP (department, job_title);
-- equivalent to GROUPING SETS ((dept, job), (dept), ())

7.3 JOINs

flowchart LR
    subgraph JoinTypes[JOIN types]
        A[INNER JOIN<br>intersection]
        B[LEFT JOIN<br>all left rows + matching right rows]
        C[RIGHT JOIN<br>all right rows + matching left rows]
        D[FULL JOIN<br>union]
        E[CROSS JOIN<br>Cartesian product]
        F[LEFT SEMI JOIN<br>left rows that have a match in the right table]
    end
-- INNER JOIN
SELECT 
    o.order_id,
    o.amount,
    u.name
FROM orders o
INNER JOIN users u ON o.user_id = u.id;

-- LEFT JOIN
SELECT 
    u.name,
    COUNT(o.order_id) AS order_count
FROM users u
LEFT JOIN orders o ON u.id = o.user_id
GROUP BY u.name;

-- Multi-table JOIN
SELECT 
    o.order_id,
    u.name AS user_name,
    p.name AS product_name
FROM orders o
JOIN users u ON o.user_id = u.id
JOIN products p ON o.product_id = p.id;

-- LEFT SEMI JOIN (like an IN subquery, but more efficient)
SELECT u.*
FROM users u
LEFT SEMI JOIN orders o ON u.id = o.user_id;
-- equivalent to: SELECT * FROM users WHERE id IN (SELECT user_id FROM orders)

-- CROSS JOIN (Cartesian product; use with care!)
SELECT * FROM table1 CROSS JOIN table2;

-- Map-side JOIN (broadcast the small table; an important optimization!)
SET hive.auto.convert.join=true;
SET hive.mapjoin.smalltable.filesize=25000000;  -- auto-converted when the small table is under 25 MB

SELECT /*+ MAPJOIN(small_table) */ 
    b.id, s.name
FROM big_table b
JOIN small_table s ON b.id = s.id;

7.4 Subqueries

-- Subquery in WHERE
SELECT * FROM users
WHERE id IN (SELECT user_id FROM orders WHERE amount > 1000);

-- Subquery in FROM
SELECT dept, avg_salary
FROM (
    SELECT department AS dept, AVG(salary) AS avg_salary
    FROM employees
    GROUP BY department
) t
WHERE avg_salary > 10000;

-- Correlated subquery
SELECT * FROM users u
WHERE EXISTS (
    SELECT 1 FROM orders o 
    WHERE o.user_id = u.id AND o.amount > 1000
);

-- WITH clause (CTE, recommended!)
WITH 
dept_stats AS (
    SELECT department, AVG(salary) AS avg_salary
    FROM employees
    GROUP BY department
),
high_salary_dept AS (
    SELECT department FROM dept_stats WHERE avg_salary > 10000
)
SELECT e.*
FROM employees e
JOIN high_salary_dept h ON e.department = h.department;

7.5 Window Functions (The Heavy Hitter!)

Window functions are Hive's killer feature; they solve problems such as "rank within a group" and "running totals".

-- Window function syntax
function_name(args) OVER (
    [PARTITION BY column1, ...]
    [ORDER BY column2, ...]
    [ROWS/RANGE BETWEEN ... AND ...]
)
-- Ranking functions
SELECT 
    user_id,
    department,
    salary,
    ROW_NUMBER() OVER (PARTITION BY department ORDER BY salary DESC) AS rn,    -- row number, never repeats
    RANK() OVER (PARTITION BY department ORDER BY salary DESC) AS rnk,         -- rank with gaps
    DENSE_RANK() OVER (PARTITION BY department ORDER BY salary DESC) AS drnk,  -- rank without gaps
    NTILE(4) OVER (PARTITION BY department ORDER BY salary DESC) AS quartile   -- split into 4 buckets
FROM employees;

-- Aggregate window functions
SELECT 
    order_date,
    amount,
    SUM(amount) OVER (ORDER BY order_date) AS cumulative_sum,           -- running total
    AVG(amount) OVER (ORDER BY order_date ROWS BETWEEN 6 PRECEDING AND CURRENT ROW) AS moving_avg_7d,  -- 7-day moving average
    SUM(amount) OVER (PARTITION BY user_id) AS user_total              -- per-user total
FROM orders;

-- Offset functions
SELECT 
    order_date,
    amount,
    LAG(amount, 1, 0) OVER (ORDER BY order_date) AS prev_amount,        -- previous row
    LEAD(amount, 1, 0) OVER (ORDER BY order_date) AS next_amount,       -- next row
    FIRST_VALUE(amount) OVER (PARTITION BY user_id ORDER BY order_date) AS first_order,  -- first value in the partition
    LAST_VALUE(amount) OVER (PARTITION BY user_id ORDER BY order_date 
        ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS last_order          -- last value in the partition
FROM orders;

-- In practice: top 3 salaries per department
SELECT * FROM (
    SELECT 
        department,
        name,
        salary,
        ROW_NUMBER() OVER (PARTITION BY department ORDER BY salary DESC) AS rn
    FROM employees
) t
WHERE rn <= 3;

-- In practice: month-over-month growth rate
SELECT 
    month,
    revenue,
    LAG(revenue) OVER (ORDER BY month) AS prev_revenue,
    (revenue - LAG(revenue) OVER (ORDER BY month)) / LAG(revenue) OVER (ORDER BY month) * 100 AS growth_rate
FROM monthly_revenue;

7.6 Window Frame Boundaries

-- ROWS BETWEEN syntax
ROWS BETWEEN start_point AND end_point

-- valid values for start_point / end_point:
-- UNBOUNDED PRECEDING : from the first row of the partition
-- n PRECEDING         : n rows before the current row
-- CURRENT ROW         : the current row
-- n FOLLOWING         : n rows after the current row
-- UNBOUNDED FOLLOWING : to the last row of the partition

-- Examples
SUM(amount) OVER (ORDER BY date ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)  -- running total up to the current row
SUM(amount) OVER (ORDER BY date ROWS BETWEEN 2 PRECEDING AND 2 FOLLOWING)          -- 2 rows on either side
AVG(amount) OVER (ORDER BY date ROWS BETWEEN 6 PRECEDING AND CURRENT ROW)          -- 7-day moving average

8. File Formats and Compression

8.1 File Format Comparison

flowchart TB
    subgraph RowOriented[Row-oriented storage]
        A[TextFile] --> A1[Human readable<br>low compression<br>slow queries]
        B[SequenceFile] --> B1[Binary<br>splittable<br>average compression]
    end
    subgraph Columnar[Column-oriented storage]
        C[ORC] --> C1[High compression<br>fast queries<br>Hive native]
        D[Parquet] --> D1[High compression<br>fast queries<br>cross-platform]
    end
    style C fill:#c8e6c9
    style D fill:#c8e6c9
Format        Storage          Compression   Query speed   Use case
TextFile      Row-oriented     Low           Slow          Raw data ingestion
SequenceFile  Row-oriented     Average       Average       Intermediate results
ORC           Column-oriented  High          Fast          First choice for Hive
Parquet       Column-oriented  High          Fast          First choice across platforms
Avro          Row-oriented     Medium        Medium        Schema evolution

8.2 ORC in Detail

-- Create an ORC table
CREATE TABLE orc_table (
    id BIGINT,
    name STRING,
    amount DECIMAL(10,2)
)
STORED AS ORC
TBLPROPERTIES (
    'orc.compress'='ZLIB',           -- compression codec: NONE/ZLIB/SNAPPY/LZ4
    'orc.compress.size'='262144',    -- compression chunk size
    'orc.stripe.size'='67108864',    -- stripe size (64 MB)
    'orc.row.index.stride'='10000',  -- row index stride
    'orc.create.index'='true'        -- build row-group indexes
);

-- Convert a TextFile table to ORC
INSERT OVERWRITE TABLE orc_table
SELECT * FROM text_table;

-- Inspect an ORC file
hive --orcfiledump /user/hive/warehouse/orc_table/000000_0

8.3 Parquet

-- Create a Parquet table
CREATE TABLE parquet_table (
    id BIGINT,
    name STRING,
    amount DECIMAL(10,2)
)
STORED AS PARQUET
TBLPROPERTIES (
    'parquet.compression'='SNAPPY'   -- compression codec: UNCOMPRESSED/SNAPPY/GZIP/LZO
);

8.4 Compression Settings

-- Compress intermediate results
SET hive.exec.compress.intermediate=true;
SET mapreduce.map.output.compress=true;
SET mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.SnappyCodec;

-- Compress the final output
SET hive.exec.compress.output=true;
SET mapreduce.output.fileoutputformat.compress=true;
SET mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.SnappyCodec;
Codec    Ratio     Speed     Splittable         Recommended use
GZIP     High      Medium    No                 Archiving cold data
Snappy   Medium    Fast      No                 Hot data queries
LZ4      Medium    Fastest   No                 Real-time processing
ZLIB     High      Medium    No                 ORC default
LZO      Medium    Fast      Yes (with index)   When splittability is needed
BZIP2    Highest   Slowest   Yes                Maximum compression

9. Hive Function Reference

9.1 String Functions

-- Length and substrings
LENGTH('hello')                    -- 5
SUBSTR('hello', 2, 3)              -- 'ell' (3 characters starting at position 2)
SUBSTRING('hello', 2)              -- 'ello' (from position 2 to the end)

-- Concatenation
CONCAT('hello', ' ', 'world')      -- 'hello world'
CONCAT_WS(',', 'a', 'b', 'c')      -- 'a,b,c'

-- Case
UPPER('hello')                     -- 'HELLO'
LOWER('HELLO')                     -- 'hello'
INITCAP('hello world')             -- 'Hello World'

-- Trimming whitespace
TRIM('  hello  ')                  -- 'hello'
LTRIM('  hello')                   -- 'hello'
RTRIM('hello  ')                   -- 'hello'

-- Replacement
REPLACE('hello', 'l', 'L')         -- 'heLLo'
REGEXP_REPLACE('hello123', '[0-9]', '')  -- 'hello'
TRANSLATE('hello', 'el', 'ip')     -- 'hippo'

-- Splitting
SPLIT('a,b,c', ',')                -- ['a', 'b', 'c']
SPLIT('a,b,c', ',')[0]             -- 'a'

-- Regular expressions
REGEXP_EXTRACT('hello123world', '([a-z]+)([0-9]+)([a-z]+)', 2)  -- '123'

-- Searching
INSTR('hello', 'l')                -- 3 (position of the first occurrence)
LOCATE('l', 'hello')               -- 3

-- Padding
LPAD('hi', 5, '0')                 -- '000hi'
RPAD('hi', 5, '0')                 -- 'hi000'

-- JSON parsing
GET_JSON_OBJECT('{"name":"joey"}', '$.name')  -- 'joey'

9.2 Date Functions

-- Current date and time
CURRENT_DATE()                     -- '2024-01-15'
CURRENT_TIMESTAMP()                -- '2024-01-15 10:30:00'
UNIX_TIMESTAMP()                   -- 1705286400 (current epoch seconds)

-- Conversions
TO_DATE('2024-01-15 10:30:00')     -- '2024-01-15'
DATE_FORMAT('2024-01-15', 'yyyyMMdd')  -- '20240115'
FROM_UNIXTIME(1705286400)          -- '2024-01-15 00:00:00'
UNIX_TIMESTAMP('2024-01-15', 'yyyy-MM-dd')  -- 1705248000

-- Date arithmetic
DATE_ADD('2024-01-15', 7)          -- '2024-01-22'
DATE_SUB('2024-01-15', 7)          -- '2024-01-08'
DATEDIFF('2024-01-15', '2024-01-01')  -- 14
MONTHS_BETWEEN('2024-03-01', '2024-01-01')  -- 2.0
ADD_MONTHS('2024-01-15', 2)        -- '2024-03-15'

-- Extracting components
YEAR('2024-01-15')                 -- 2024
MONTH('2024-01-15')                -- 1
DAY('2024-01-15')                  -- 15
HOUR('2024-01-15 10:30:45')        -- 10
MINUTE('2024-01-15 10:30:45')      -- 30
SECOND('2024-01-15 10:30:45')      -- 45
WEEKOFYEAR('2024-01-15')           -- 3
DAYOFWEEK('2024-01-15')            -- 2 (Sunday = 1, so Monday = 2)

-- Truncation
TRUNC('2024-01-15', 'MM')          -- '2024-01-01' (first day of the month)
TRUNC('2024-01-15', 'YYYY')        -- '2024-01-01' (first day of the year)

-- Last day of the month / next given weekday
LAST_DAY('2024-01-15')             -- '2024-01-31'
NEXT_DAY('2024-01-15', 'MON')      -- '2024-01-22' (the following Monday)
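
In daily ETL these usually get combined to compute partition values; a couple of small sketches:

-- Yesterday's dt partition value as a 'yyyy-MM-dd' string
SELECT DATE_SUB(CURRENT_DATE(), 1);

-- Current epoch seconds rendered as a formatted timestamp string
SELECT FROM_UNIXTIME(UNIX_TIMESTAMP(), 'yyyy-MM-dd HH:mm:ss');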

9.3 Math Functions

-- Rounding
ROUND(3.1415, 2)                   -- 3.14
FLOOR(3.9)                         -- 3
CEIL(3.1)                          -- 4
TRUNCATE(3.1415, 2)                -- 3.14 (truncation)

-- Arithmetic
ABS(-10)                           -- 10
POWER(2, 10)                       -- 1024
SQRT(16)                           -- 4.0
LOG(10, 100)                       -- 2.0
LN(2.718)                          -- ≈1
EXP(1)                             -- 2.718...

-- Random numbers
RAND()                             -- random number between 0 and 1
RAND(42)                           -- random number from a fixed seed

-- Greatest / least of several arguments
GREATEST(1, 2, 3)                  -- 3
LEAST(1, 2, 3)                     -- 1

-- Modulo
MOD(10, 3)                         -- 1
PMOD(-10, 3)                       -- 2 (always non-negative)

-- Base conversion
CONV('1010', 2, 10)                -- '10' (binary to decimal)
HEX(255)                           -- 'FF'
UNHEX('FF')                        -- binary data

9.4 Conditional Functions

-- IF
IF(score >= 60, 'pass', 'fail')

-- CASE WHEN
CASE 
    WHEN score >= 90 THEN 'A'
    WHEN score >= 80 THEN 'B'
    WHEN score >= 60 THEN 'C'
    ELSE 'D'
END

-- COALESCE (returns the first non-NULL argument)
COALESCE(col1, col2, 'default')

-- NVL (replace NULL with a default)
NVL(col, 'default')

-- NULLIF (returns NULL when the two arguments are equal)
NULLIF(a, b)  -- NULL if a = b, otherwise a

-- ISNULL / ISNOTNULL
ISNULL(col)
ISNOTNULL(col)

-- DECODE-style mapping
-- Note: Hive has no Oracle-style DECODE(expr, v1, r1, ...); its built-in DECODE(bin, charset) decodes binary data.
-- Use CASE instead:
CASE status WHEN 1 THEN 'enabled' WHEN 2 THEN 'disabled' ELSE 'unknown' END

9.5 Collection Functions

-- ARRAY operations
ARRAY(1, 2, 3)                     -- create an array
ARRAY_CONTAINS(arr, 2)             -- membership test
SIZE(arr)                          -- number of elements
SORT_ARRAY(arr)                    -- sort
ARRAY_DISTINCT(arr)                -- deduplicate (Hive 2.3+)

-- MAP operations
MAP('a', 1, 'b', 2)                -- create a map
MAP_KEYS(m)                        -- all keys
MAP_VALUES(m)                      -- all values
SIZE(m)                            -- number of entries

-- Set operations on arrays
ARRAY_UNION(arr1, arr2)            -- union
ARRAY_INTERSECT(arr1, arr2)        -- intersection
ARRAY_EXCEPT(arr1, arr2)           -- difference

-- Exploding
-- EXPLODE: turn an array/map into multiple rows
SELECT EXPLODE(ARRAY(1, 2, 3));    -- produces 3 rows

-- LATERAL VIEW: used together with EXPLODE
SELECT 
    id, 
    tag
FROM users
LATERAL VIEW EXPLODE(tags) t AS tag;

-- POSEXPLODE: explode with element positions
SELECT 
    id,
    pos,
    tag
FROM users
LATERAL VIEW POSEXPLODE(tags) t AS pos, tag;

9.6 Rows to Columns / Columns to Rows

-- Rows to columns (collapse many rows into one)
-- COLLECT_SET: aggregate with deduplication
-- COLLECT_LIST: aggregate and keep duplicates
SELECT 
    user_id,
    COLLECT_SET(tag) AS tags,
    COLLECT_LIST(tag) AS all_tags
FROM user_tags
GROUP BY user_id;

-- Combined with CONCAT_WS
SELECT 
    user_id,
    CONCAT_WS(',', COLLECT_SET(tag)) AS tag_str
FROM user_tags
GROUP BY user_id;

-- Columns to rows (split one row into many)
SELECT 
    user_id,
    tag
FROM user_info
LATERAL VIEW EXPLODE(SPLIT(tags, ',')) t AS tag;

10. User-Defined Functions

10.1 UDF Flavors

flowchart LR
    A[Hive UDF] --> B[UDF]
    A --> C[UDAF]
    A --> D[UDTF]
    B --> B1[One row in, one row out<br>e.g. upper, substr]
    C --> C1[Many rows in, one row out<br>e.g. sum, count]
    D --> D1[One row in, many rows out<br>e.g. explode]

10.2 Writing a UDF

package com.example.hive.udf;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;

/**
 * Phone-number masking UDF
 * Usage: mask_phone('13812345678') → '138****5678'
 */
@Description(
    name = "mask_phone",
    value = "_FUNC_(phone) - mask a phone number",
    extended = "Example: SELECT mask_phone('13812345678') → '138****5678'"
)
public class MaskPhoneUDF extends GenericUDF {

    private StringObjectInspector inputOI;

    @Override
    public ObjectInspector initialize(ObjectInspector[] arguments) 
            throws UDFArgumentException {
        // Validate the arguments
        if (arguments.length != 1) {
            throw new UDFArgumentException("mask_phone takes exactly one argument");
        }

        if (!(arguments[0] instanceof StringObjectInspector)) {
            throw new UDFArgumentException("the argument to mask_phone must be a string");
        }
        
        inputOI = (StringObjectInspector) arguments[0];
        
        // The return type of the UDF
        return PrimitiveObjectInspectorFactory.javaStringObjectInspector;
    }

    @Override
    public Object evaluate(DeferredObject[] arguments) throws HiveException {
        if (arguments[0].get() == null) {
            return null;
        }
        
        String phone = inputOI.getPrimitiveJavaObject(arguments[0].get());
        
        if (phone == null || phone.length() != 11) {
            return phone;
        }
        
        // Mask the middle four digits
        return phone.substring(0, 3) + "****" + phone.substring(7);
    }

    @Override
    public String getDisplayString(String[] children) {
        return "mask_phone(" + children[0] + ")";
    }
}

10.3 Writing a UDAF

package com.example.hive.udf;

import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDAF;
import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;

/**
 * String-concatenation aggregate function
 * Usage: string_agg(name, ',') → 'a,b,c'
 */
@Description(
    name = "string_agg",
    value = "_FUNC_(column, separator) - concatenate strings into one"
)
public class StringAggUDAF extends UDAF {

    public static class StringAggEvaluator implements UDAFEvaluator {
        
        private StringBuilder result;
        private String separator;

        public StringAggEvaluator() {
            super();
            init();
        }

        @Override
        public void init() {
            result = new StringBuilder();
            separator = ",";
        }

        // Iterate: called once per input row
        public boolean iterate(String value, String sep) {
            if (sep != null) {
                separator = sep;
            }
            if (value != null) {
                if (result.length() > 0) {
                    result.append(separator);
                }
                result.append(value);
            }
            return true;
        }

        // Return the partial aggregation result
        public String terminatePartial() {
            return result.toString();
        }

        // Merge a partial result from another task
        public boolean merge(String partial) {
            if (partial != null && !partial.isEmpty()) {
                if (result.length() > 0) {
                    result.append(separator);
                }
                result.append(partial);
            }
            return true;
        }

        // Produce the final result
        public String terminate() {
            return result.toString();
        }
    }
}

10.4 Registering and Using a UDF

-- Option 1: temporary function (session scope)
ADD JAR /path/to/my-udf.jar;
CREATE TEMPORARY FUNCTION mask_phone AS 'com.example.hive.udf.MaskPhoneUDF';

-- Use it
SELECT mask_phone(phone) FROM users;

-- Option 2: permanent function (Hive 0.13+)
CREATE FUNCTION mydb.mask_phone AS 'com.example.hive.udf.MaskPhoneUDF'
USING JAR 'hdfs:///user/hive/lib/my-udf.jar';

-- Inspect functions
SHOW FUNCTIONS LIKE '*mask*';
DESCRIBE FUNCTION mask_phone;
DESCRIBE FUNCTION EXTENDED mask_phone;

-- Drop a function
DROP FUNCTION IF EXISTS mydb.mask_phone;

11. Performance Tuning

11.1 Overview

mindmap
  root((Hive tuning))
    Data layer
      Partition pruning
      Column pruning
      ORC/Parquet file formats
      Compression
    SQL layer
      Avoid SELECT *
      Predicate pushdown
      JOIN optimization
      GROUP BY optimization
    Engine layer
      Choice of execution engine
      Parallel execution
      Local mode
      Vectorization
    Resource layer
      Mapper/reducer counts
      Memory settings
      Container sizing

11.2 Data-Layer Optimizations

-- 1. Use partitioned tables (the most important one!)
-- Bad: full table scan
SELECT * FROM orders WHERE order_date = '2024-01-15';

-- Good: partition pruning
SELECT * FROM orders WHERE dt = '2024-01-15';

-- 2. Use ORC/Parquet
CREATE TABLE orders_orc STORED AS ORC AS SELECT * FROM orders;

-- 3. Enable compression
SET hive.exec.compress.output=true;
SET mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.SnappyCodec;

-- 4. Merge small files (for ORC tables, see also the CONCATENATE sketch below)
SET hive.merge.mapfiles=true;
SET hive.merge.mapredfiles=true;
SET hive.merge.size.per.task=256000000;  -- 256 MB
SET hive.merge.smallfiles.avgsize=128000000;  -- 128 MB
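
For tables that are already stored as ORC, existing small files can also be merged in place after the fact. A small sketch against the partitioned orders table from earlier (ALTER TABLE ... CONCATENATE applies to ORC/RCFile tables):

-- Merge the small files of one partition into fewer, larger ORC files
ALTER TABLE orders PARTITION (dt='2024-01-15', hour='00') CONCATENATE;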

11.3 SQL-Layer Optimizations

-- 1. Avoid SELECT *
-- Bad
SELECT * FROM users;
-- Good
SELECT id, name, age FROM users;

-- 2. Put the partition filter first in the WHERE clause
-- Bad
SELECT * FROM orders WHERE amount > 100 AND dt = '2024-01-15';
-- Good
SELECT * FROM orders WHERE dt = '2024-01-15' AND amount > 100;

-- 3. Predicate pushdown
SET hive.optimize.ppd=true;

-- 4. JOIN optimization: put the small table first (or use MAPJOIN)
SET hive.auto.convert.join=true;
SET hive.mapjoin.smalltable.filesize=25000000;

-- 5. GROUP BY optimization
SET hive.groupby.skewindata=true;  -- enable when the data is skewed

-- 6. COUNT DISTINCT optimization
-- Bad: everything funnels through a single reducer
SELECT COUNT(DISTINCT user_id) FROM orders;
-- Good: GROUP BY first, then COUNT
SELECT COUNT(*) FROM (
    SELECT user_id FROM orders GROUP BY user_id
) t;

-- 7. Multi-insert
FROM source_table
INSERT OVERWRITE TABLE target1 SELECT col1, col2 WHERE condition1
INSERT OVERWRITE TABLE target2 SELECT col1, col3 WHERE condition2;

11.4 Execution-Engine Optimizations

-- Use the Tez engine (much faster than MR)
SET hive.execution.engine=tez;

-- Or the Spark engine
SET hive.execution.engine=spark;

-- Enable vectorized execution (a large performance boost)
SET hive.vectorized.execution.enabled=true;
SET hive.vectorized.execution.reduce.enabled=true;

-- Enable the cost-based optimizer (CBO)
SET hive.cbo.enable=true;
SET hive.compute.query.using.stats=true;
SET hive.stats.fetch.column.stats=true;
SET hive.stats.fetch.partition.stats=true;

-- Collect statistics (the CBO depends on them)
ANALYZE TABLE orders COMPUTE STATISTICS;
ANALYZE TABLE orders COMPUTE STATISTICS FOR COLUMNS;

-- Parallel execution of independent stages
SET hive.exec.parallel=true;
SET hive.exec.parallel.thread.number=8;

-- Local mode (enabled automatically for small inputs)
SET hive.exec.mode.local.auto=true;
SET hive.exec.mode.local.auto.inputbytes.max=134217728;  -- 128MB
SET hive.exec.mode.local.auto.input.files.max=4;

11.5 Handling Data Skew

Data skew is the number-one Hive performance killer!

flowchart TB
    A[Data skew] --> B{Type of skew}
    B --> C[JOIN skew]
    B --> D[GROUP BY skew]
    B --> E[COUNT DISTINCT skew]
    C --> C1[Broadcast the small table]
    C --> C2[Handle skewed keys separately]
    C --> C3[Salt the keys]
    D --> D1[Two-phase aggregation]
    D --> D2[hive.groupby.skewindata]
    E --> E1[Rewrite as GROUP BY]
    E --> E2[Salted deduplication]
-- 1. JOIN skew: broadcast the small table
SET hive.auto.convert.join=true;
SELECT /*+ MAPJOIN(small_table) */ * 
FROM big_table JOIN small_table ON ...;

-- 2. JOIN skew: handle the skewed key separately
-- Suppose user_id = 0 marks junk data
SELECT * FROM orders WHERE user_id = 0
UNION ALL
SELECT * FROM orders o JOIN users u ON o.user_id = u.id 
WHERE o.user_id != 0;

-- 3. JOIN skew: salt the keys
-- Add a random prefix to the big table's key
SELECT /*+ MAPJOIN(small_table_expand) */
    regexp_replace(t1.salted_key, '^[0-9]+_', '') AS key,
    t1.value
FROM (
    SELECT CONCAT(CAST(RAND() * 10 AS INT), '_', key) AS salted_key, value
    FROM big_table
) t1
JOIN (
    -- Replicate the small table 10 times
    SELECT CONCAT(salt, '_', key) AS salted_key, value
    FROM small_table
    LATERAL VIEW EXPLODE(ARRAY(0,1,2,3,4,5,6,7,8,9)) t AS salt
) small_table_expand
ON t1.salted_key = small_table_expand.salted_key;

-- 4. GROUP BY skew
SET hive.groupby.skewindata=true;
-- How it works: two-phase aggregation; the first phase distributes rows randomly

-- 5. COUNT DISTINCT skew
-- Rewrite it as a GROUP BY
SELECT COUNT(*) FROM (
    SELECT user_id FROM orders GROUP BY user_id
) t;

-- 6. Manual two-phase aggregation
SELECT 
    key,
    SUM(cnt) AS total_cnt
FROM (
    -- Phase 1: aggregate on the salted key
    SELECT 
        key,
        COUNT(*) AS cnt
    FROM (
        SELECT key, CONCAT(key, '_', CAST(RAND() * 100 AS INT)) AS salted_key
        FROM source_table
    ) t
    GROUP BY key, salted_key
) t2
GROUP BY key;

11.6 Tuning the Number of Mappers and Reducers

-- Mapper count
-- Merge small input splits to get fewer mappers
SET mapreduce.input.fileinputformat.split.minsize=268435456;  -- 256 MB

-- Or split large files to get more mappers
SET mapreduce.input.fileinputformat.split.maxsize=134217728;  -- 128 MB

-- Reducer count
SET hive.exec.reducers.bytes.per.reducer=256000000;  -- 256 MB per reducer
SET hive.exec.reducers.max=1000;  -- maximum number of reducers
SET mapreduce.job.reduces=10;  -- set the reducer count explicitly

-- Map-only jobs (no reduce phase)
SET mapreduce.job.reduces=0;
-- Or replace a global ORDER BY with SORT BY + DISTRIBUTE BY, as sketched below
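
The ORDER BY vs. SORT BY trade-off mentioned in that last comment looks roughly like this; a small sketch against the orders table used earlier:

-- ORDER BY: one global sort, forced through a single reducer
SELECT * FROM orders ORDER BY amount DESC;

-- DISTRIBUTE BY + SORT BY: rows are hashed across many reducers by user_id,
-- and each reducer sorts only its own output (no single global order)
SELECT * FROM orders
DISTRIBUTE BY user_id
SORT BY amount DESC;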

12. Hive in Practice

12.1 User Behavior Analysis

-- Scenario: user retention analysis

-- 1. Create the user-action table
CREATE EXTERNAL TABLE user_actions (
    user_id STRING,
    action_type STRING,
    action_time TIMESTAMP,
    page_id STRING,
    duration INT
)
PARTITIONED BY (dt STRING)
STORED AS ORC
LOCATION '/data/user_actions';

-- 2. Compute the next-day retention rate
WITH first_day AS (
    -- Each user's first visit date
    SELECT 
        user_id,
        MIN(dt) AS first_date
    FROM user_actions
    GROUP BY user_id
),
retention AS (
    SELECT 
        f.first_date,
        f.user_id,
        CASE WHEN a.user_id IS NOT NULL THEN 1 ELSE 0 END AS is_retained
    FROM first_day f
    LEFT JOIN (
        SELECT DISTINCT user_id, dt FROM user_actions
    ) a ON f.user_id = a.user_id 
        AND DATE_ADD(f.first_date, 1) = a.dt
)
SELECT 
    first_date,
    COUNT(DISTINCT user_id) AS new_users,
    SUM(is_retained) AS retained_users,
    ROUND(SUM(is_retained) / COUNT(DISTINCT user_id) * 100, 2) AS retention_rate
FROM retention
GROUP BY first_date
ORDER BY first_date;

12.2 Funnel Analysis

-- Scenario: an e-commerce conversion funnel

WITH funnel AS (
    SELECT 
        user_id,
        MAX(CASE WHEN action_type = 'view' THEN 1 ELSE 0 END) AS step1_view,
        MAX(CASE WHEN action_type = 'add_cart' THEN 1 ELSE 0 END) AS step2_cart,
        MAX(CASE WHEN action_type = 'checkout' THEN 1 ELSE 0 END) AS step3_checkout,
        MAX(CASE WHEN action_type = 'pay' THEN 1 ELSE 0 END) AS step4_pay
    FROM user_actions
    WHERE dt = '2024-01-15'
    GROUP BY user_id
)
SELECT 
    'Step 1: view' AS step_name,
    COUNT(*) AS user_count,
    100.0 AS conversion_rate
FROM funnel WHERE step1_view = 1
UNION ALL
SELECT 
    'Step 2: add to cart',
    SUM(step2_cart),
    ROUND(SUM(step2_cart) / COUNT(*) * 100, 2)
FROM funnel WHERE step1_view = 1
UNION ALL
SELECT 
    'Step 3: checkout',
    SUM(step3_checkout),
    ROUND(SUM(step3_checkout) / SUM(step2_cart) * 100, 2)
FROM funnel WHERE step2_cart = 1
UNION ALL
SELECT 
    'Step 4: pay',
    SUM(step4_pay),
    ROUND(SUM(step4_pay) / SUM(step3_checkout) * 100, 2)
FROM funnel WHERE step3_checkout = 1;

12.3 RFM User Segmentation

-- RFM model: Recency, Frequency, Monetary

WITH rfm_base AS (
    SELECT 
        user_id,
        DATEDIFF('2024-01-15', MAX(order_date)) AS recency,
        COUNT(DISTINCT order_id) AS frequency,
        SUM(amount) AS monetary
    FROM orders
    WHERE dt >= '2023-01-15' AND dt <= '2024-01-15'
    GROUP BY user_id
),
rfm_score AS (
    SELECT 
        user_id,
        recency,
        frequency,
        monetary,
        NTILE(5) OVER (ORDER BY recency DESC) AS r_score,    -- smaller recency = more recent, so sort descending to give recent users the high score
        NTILE(5) OVER (ORDER BY frequency DESC) AS f_score,  -- more orders is better
        NTILE(5) OVER (ORDER BY monetary DESC) AS m_score    -- higher spend is better
    FROM rfm_base
)
SELECT 
    user_id,
    recency,
    frequency,
    monetary,
    r_score,
    f_score,
    m_score,
    CASE 
        WHEN r_score >= 4 AND f_score >= 4 AND m_score >= 4 THEN 'high-value customer'
        WHEN r_score >= 4 AND f_score < 4 AND m_score >= 4 THEN 'high-potential customer'
        WHEN r_score < 4 AND f_score >= 4 AND m_score >= 4 THEN 'key retention customer'
        WHEN r_score < 4 AND f_score < 4 AND m_score >= 4 THEN 'key win-back customer'
        WHEN r_score >= 4 AND f_score >= 4 AND m_score < 4 THEN 'ordinary value customer'
        WHEN r_score >= 4 AND f_score < 4 AND m_score < 4 THEN 'ordinary potential customer'
        WHEN r_score < 4 AND f_score >= 4 AND m_score < 4 THEN 'ordinary retention customer'
        ELSE 'ordinary win-back customer'
    END AS user_segment
FROM rfm_score;

12.4 Consecutive Login Days

-- A classic SQL puzzle: each user's longest streak of consecutive login days

WITH login_data AS (
    SELECT DISTINCT user_id, dt
    FROM user_actions
    WHERE action_type = 'login'
),
grouped AS (
    SELECT 
        user_id,
        dt,
        DATE_SUB(dt, ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY dt)) AS grp
    FROM login_data
)
SELECT 
    user_id,
    MIN(dt) AS start_date,
    MAX(dt) AS end_date,
    COUNT(*) AS consecutive_days
FROM grouped
GROUP BY user_id, grp
ORDER BY user_id, start_date;

-- The longest consecutive-login streak per user
WITH login_data AS (
    SELECT DISTINCT user_id, dt FROM user_actions WHERE action_type = 'login'
),
grouped AS (
    SELECT 
        user_id, dt,
        DATE_SUB(dt, ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY dt)) AS grp
    FROM login_data
),
consecutive AS (
    SELECT user_id, COUNT(*) AS days
    FROM grouped
    GROUP BY user_id, grp
)
SELECT user_id, MAX(days) AS max_consecutive_days
FROM consecutive
GROUP BY user_id;

13. Hive and the Ecosystem

13.1 Hive on Spark

-- Use Spark as the execution engine
SET hive.execution.engine=spark;

-- Spark-related settings
SET spark.master=yarn;
SET spark.submit.deployMode=cluster;
SET spark.executor.memory=4g;
SET spark.executor.cores=2;
SET spark.executor.instances=10;

13.2 Hive on Tez

-- Use Tez as the execution engine
SET hive.execution.engine=tez;

-- Tez-related settings
SET tez.am.resource.memory.mb=4096;
SET tez.task.resource.memory.mb=2048;
SET hive.tez.container.size=4096;
SET hive.tez.java.opts=-Xmx3276m;

13.3 HiveServer2 + JDBC

// Example: connecting to Hive over JDBC from Java
import java.sql.*;

public class HiveJdbcExample {
    private static final String DRIVER = "org.apache.hive.jdbc.HiveDriver";
    private static final String URL = "jdbc:hive2://localhost:10000/default";
    
    public static void main(String[] args) throws Exception {
        Class.forName(DRIVER);
        
        try (Connection conn = DriverManager.getConnection(URL, "hive", "");
             Statement stmt = conn.createStatement()) {
            
            // Run a query
            ResultSet rs = stmt.executeQuery("SELECT * FROM users LIMIT 10");
            
            while (rs.next()) {
                System.out.println(rs.getString("name"));
            }
        }
    }
}

13.4 Hive + HBase

-- Create an external table backed by HBase
CREATE EXTERNAL TABLE hbase_users (
    row_key STRING,
    name STRING,
    age INT,
    email STRING
)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES (
    "hbase.columns.mapping" = ":key,info:name,info:age,info:email"
)
TBLPROPERTIES (
    "hbase.table.name" = "users"
);

-- Query the HBase-backed table
SELECT * FROM hbase_users WHERE row_key = 'user001';

14. Common Problems and Troubleshooting

14.1 Common Errors

# 1. Guava version conflict
# Error: java.lang.NoSuchMethodError: com.google.common.base.Preconditions.checkArgument
# Fix:
rm $HIVE_HOME/lib/guava-*.jar
cp $HADOOP_HOME/share/hadoop/common/lib/guava-*.jar $HIVE_HOME/lib/

# 2. Metastore connection failure
# Error: Unable to open a test connection to the given database
# Fix: check the MySQL service, user privileges, and the JDBC driver

# 3. Permission problems
# Error: Permission denied: user=xxx, access=WRITE
# Fix:
hdfs dfs -chmod -R 777 /user/hive/warehouse
hdfs dfs -chown -R hive:hive /user/hive/warehouse

# 4. Out-of-memory errors
# Error: java.lang.OutOfMemoryError
# Fix: raise the memory settings
SET mapreduce.map.memory.mb=4096;
SET mapreduce.reduce.memory.mb=8192;
SET mapreduce.map.java.opts=-Xmx3276m;
SET mapreduce.reduce.java.opts=-Xmx6553m;

14.2 Performance Diagnostics

-- Show the execution plan
EXPLAIN SELECT * FROM orders WHERE dt = '2024-01-15';

-- Show the detailed execution plan
EXPLAIN EXTENDED SELECT * FROM orders WHERE dt = '2024-01-15';

-- Show input dependencies
EXPLAIN DEPENDENCY SELECT * FROM orders WHERE dt = '2024-01-15';

-- Check whether vectorization kicks in
EXPLAIN VECTORIZATION SELECT * FROM orders;

-- Collect table statistics
ANALYZE TABLE orders COMPUTE STATISTICS;
ANALYZE TABLE orders COMPUTE STATISTICS FOR COLUMNS;

-- Inspect table statistics
DESCRIBE FORMATTED orders;

14.3 Log Locations

# Hive logs
$HIVE_HOME/logs/
/tmp/$USER/hive.log

# HiveServer2 log
/var/log/hive/hiveserver2.log

# YARN application logs
yarn logs -applicationId application_xxx

# Enable debug logging
hive --hiveconf hive.root.logger=DEBUG,console

15. Wrapping Up

A Hive Learning Path

flowchart LR
    A[Basics] --> B[Intermediate] --> C[Tuning] --> D[Practice]
    A --> A1[SQL fundamentals<br>DDL/DML/DQL]
    A --> A2[Data types<br>file formats]
    B --> B1[Window functions]
    B --> B2[UDF development]
    B --> B3[Partitioning and bucketing]
    C --> C1[Reading execution plans]
    C --> C2[Handling data skew]
    C --> C3[Parameter tuning]
    D --> D1[Warehouse modeling]
    D --> D2[ETL development]
    D --> D3[Reporting and analytics]

Key Takeaways

Topic        Key points
Positioning  Batch processing, OLAP analytics, high latency / high throughput
Storage      External tables + ORC/Parquet + Snappy compression
Partitions   Use them! Partitioning by time is the most common pattern
Tuning       Partition pruning > file format > execution engine > parameter tweaks
Skew         MapJoin, key salting, two-phase aggregation
Functions    Window functions are the killer feature

Final Words

"Hive is not a silver bullet, but for big-data batch processing it is still the most dependable choice."

After finishing Hive, you will probably:

  1. Think you now know big data (you really just know SQL)
  2. Start complaining that Hive is too slow (and go learn Spark SQL)
  3. Discover that data skew is an eternal topic (and lose hair over it)
  4. Finally admit that Hive may be slow, but it really is rock solid

