写在前面
作为一个在消息队列坑里摸爬滚打多年的架构师,我可以负责任地告诉你:Kafka 是那种"学的时候觉得简单,用的时候想砸电脑"的技术。
第一次接触 Kafka 是在 2018 年,当时领导说:"小伙子,我们要上实时数据管道,你去调研一下 Kafka。"我信心满满地打开官网,看到那句 "A distributed streaming platform",心想这不就是个消息队列吗?三天搞定!
结果三年后,我还在和 Consumer Rebalance、Offset 丢失、Partition 倾斜做斗争...
这篇文章,我会用最接地气的方式,带你走一遍 Kafka 的核心知识点。保证你看完之后,能在面试中侃侃而谈,也能在实际项目中少踩几个坑。
一、Kafka 到底是个啥?
1.1 官方定义(装逼用)
Apache Kafka is a distributed event streaming platform capable of handling trillions of events a day.
翻译成人话:Kafka 是一个分布式的、高吞吐的消息队列系统,专门用来处理海量数据流。
1.2 通俗理解(实际用)
想象一下你们公司的食堂:
- 传统模式:厨师做一份,服务员端一份,一对一服务。高峰期厨师累死,服务员跑断腿。
- Kafka 模式:厨师做好的菜放到传送带上(Topic),不同窗口(Consumer)自己去取。厨师不用管谁来吃,服务员不用等厨师。
┌─────────────┐ ┌─────────────────────────────┐ ┌─────────────┐
│ 厨师们 │ ──▶ │ 传送带 (Kafka Topic) │ ──▶ │ 各窗口 │
│ (Producer) │ │ 菜品按类型分轨道(Partition) │ │ (Consumer) │
└─────────────┘ └─────────────────────────────┘ └─────────────┘这就是 Kafka 的本质:解耦生产者和消费者,削峰填谷,异步处理。
1.3 Kafka 能干啥?
| 场景 | 具体应用 | 实际案例 |
|---|---|---|
| 消息队列 | 异步解耦、削峰填谷 | 订单系统 → Kafka → 库存/积分/通知 |
| 日志收集 | 集中式日志管道 | Filebeat → Kafka → Elasticsearch |
| 流处理 | 实时计算、ETL | 用户行为 → Kafka Streams → 实时推荐 |
| 事件溯源 | 事件存储、审计追踪 | 交易流水 → Kafka → 数据仓库 |
| 数据同步 | CDC、多系统同步 | MySQL → Debezium → Kafka → 其他系统 |
二、核心概念:先把名词搞清楚
学 Kafka 最痛苦的就是一堆专有名词,我来给你翻译成人话:
2.1 概念全家福
2.2 概念详解
Broker(经纪人/节点)
就是 Kafka 服务器实例。一个 Kafka 集群由多个 Broker 组成,每个 Broker 都是一个独立的服务进程。
人话:Broker 就是仓库,数据存在这里。多个仓库组成仓库群(集群)。
# 查看集群中的 Broker
kafka-broker-api-versions.sh --bootstrap-server localhost:9092Topic(主题)
消息的分类,类似于数据库的表。生产者往 Topic 里写,消费者从 Topic 里读。
人话:Topic 就是传送带的名字。"order-topic"是订单传送带,"log-topic"是日志传送带。
# 创建 Topic
kafka-topics.sh --create \
--bootstrap-server localhost:9092 \
--topic order-events \
--partitions 3 \
--replication-factor 2Partition(分区)
Topic 的物理分片。一个 Topic 可以有多个 Partition,分布在不同 Broker 上。
人话:一条传送带太慢,那就开三条并行跑。每条就是一个 Partition。
关键特性:
- 同一 Partition 内消息有序
- 不同 Partition 之间消息不保证顺序
- Partition 是 Kafka 并行处理的基本单位
Offset(偏移量)
消息在 Partition 中的位置编号,从 0 开始递增,永不重复。
人话:Offset 就是消息的"身份证号"。Consumer 说"我要从 100 号开始读",就是从 Offset=100 的消息开始。
Partition 0: [msg0] [msg1] [msg2] [msg3] [msg4] [msg5] ...
↑ ↑
offset=0 offset=4
(Consumer 当前位置)Consumer Group(消费者组)
一组 Consumer 的集合,共同消费一个 Topic。同一 Group 内的 Consumer 不会重复消费同一条消息。
人话:消费者组就是"一个部门"。部门里的人分工合作,不会重复劳动。不同部门(不同 Group)各干各的,互不影响。
Replication(副本)
Partition 的备份机制。每个 Partition 可以有多个副本,分布在不同 Broker 上。
人话:重要的文件多备份几份,万一一个硬盘坏了,还有其他的顶上。
- Leader:负责读写的主副本
- Follower:只负责同步数据的从副本
- ISR (In-Sync Replicas):和 Leader 保持同步的副本集合
三、架构深度剖析
3.1 整体架构
3.2 消息存储结构
Kafka 的消息不是存在内存里的,而是存在磁盘上的日志文件中。
/kafka-logs/
└── order-events-0/ # Topic-Partition 目录
├── 00000000000000000000.log # 消息日志文件
├── 00000000000000000000.index # 偏移量索引
├── 00000000000000000000.timeindex # 时间戳索引
├── 00000000000012345678.log # 新的 Segment
├── 00000000000012345678.index
└── leader-epoch-checkpoint为什么 Kafka 存磁盘还这么快?
- 顺序写入:磁盘顺序写速度接近内存随机写
- Page Cache:利用操作系统的页缓存
- 零拷贝:sendfile 系统调用,数据不经过用户态
- 批量处理:消息批量发送、批量写入
- 分区并行:多 Partition 并行读写
3.3 生产者工作原理
关键配置参数:
Properties props = new Properties();
// 必需配置
props.put("bootstrap.servers", "kafka1:9092,kafka2:9092,kafka3:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 可靠性配置
props.put("acks", "all"); // 等待所有ISR确认
props.put("retries", 3); // 重试次数
props.put("retry.backoff.ms", 100); // 重试间隔
// 性能配置
props.put("batch.size", 16384); // 批次大小 16KB
props.put("linger.ms", 5); // 等待时间,凑批发送
props.put("buffer.memory", 33554432); // 缓冲区大小 32MB
props.put("compression.type", "lz4"); // 压缩算法3.4 消费者工作原理
关键配置参数:
Properties props = new Properties();
props.put("bootstrap.servers", "kafka1:9092,kafka2:9092");
props.put("group.id", "order-processing-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// 偏移量管理
props.put("enable.auto.commit", "false"); // 关闭自动提交!
props.put("auto.offset.reset", "earliest"); // 无offset时从头消费
// 性能配置
props.put("max.poll.records", 500); // 单次拉取最大记录数
props.put("fetch.min.bytes", 1); // 最小拉取字节数
props.put("fetch.max.wait.ms", 500); // 最大等待时间
// 会话管理
props.put("session.timeout.ms", 30000); // 会话超时
props.put("heartbeat.interval.ms", 10000); // 心跳间隔
props.put("max.poll.interval.ms", 300000); // 最大处理时间四、代码实战:Talk is cheap
4.1 Java Producer 示例
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.Future;
public class OrderProducer {
private final KafkaProducer<String, String> producer;
private final String topic;
public OrderProducer(String bootstrapServers, String topic) {
this.topic = topic;
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 可靠性配置
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, 3);
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // 开启幂等
// 性能配置
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
props.put(ProducerConfig.LINGER_MS_CONFIG, 5);
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
this.producer = new KafkaProducer<>(props);
}
/**
* 同步发送 - 简单但性能差
*/
public RecordMetadata sendSync(String key, String value) throws Exception {
ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
Future<RecordMetadata> future = producer.send(record);
return future.get(); // 阻塞等待
}
/**
* 异步发送 - 推荐方式
*/
public void sendAsync(String key, String value) {
ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
producer.send(record, (metadata, exception) -> {
if (exception != null) {
// 发送失败,记录日志、告警、重试等
System.err.println("发送失败: " + exception.getMessage());
} else {
System.out.printf("发送成功: topic=%s, partition=%d, offset=%d%n",
metadata.topic(), metadata.partition(), metadata.offset());
}
});
}
/**
* 发送到指定分区 - 保证顺序性
*/
public void sendToPartition(String key, String value, int partition) {
ProducerRecord<String, String> record =
new ProducerRecord<>(topic, partition, key, value);
producer.send(record);
}
public void close() {
producer.flush(); // 刷新缓冲区
producer.close();
}
public static void main(String[] args) throws Exception {
OrderProducer producer = new OrderProducer(
"localhost:9092",
"order-events"
);
// 模拟发送订单事件
for (int i = 0; i < 100; i++) {
String orderId = "ORDER_" + i;
String orderJson = String.format(
"{\"orderId\":\"%s\",\"userId\":\"USER_%d\",\"amount\":%.2f}",
orderId, i % 10, Math.random() * 1000
);
// 使用 orderId 作为 key,保证同一订单的事件发到同一分区
producer.sendAsync(orderId, orderJson);
}
Thread.sleep(1000); // 等待异步发送完成
producer.close();
}
}4.2 Java Consumer 示例
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.*;
public class OrderConsumer {
private final KafkaConsumer<String, String> consumer;
private volatile boolean running = true;
public OrderConsumer(String bootstrapServers, String groupId, String topic) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// 重要:关闭自动提交
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// 性能配置
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1024);
// 会话配置
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000);
this.consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList(topic));
}
/**
* 自动提交模式 - 简单但可能丢消息
*/
public void consumeAutoCommit() {
while (running) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
processRecord(record);
}
// 处理完自动提交,如果处理过程中崩溃,消息可能丢失
}
}
/**
* 手动同步提交 - 推荐方式
*/
public void consumeManualSync() {
while (running) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
try {
processRecord(record);
} catch (Exception e) {
// 处理失败,记录日志,可以选择跳过或重试
System.err.println("处理失败: " + e.getMessage());
}
}
// 所有消息处理完后再提交
if (!records.isEmpty()) {
consumer.commitSync();
}
}
}
/**
* 精确控制 Offset - 最可靠的方式
*/
public void consumeWithPreciseOffset() {
while (running) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
// 按分区处理,精确提交每个分区的 offset
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
for (ConsumerRecord<String, String> record : partitionRecords) {
processRecord(record);
}
// 提交该分区的 offset(下一条要消费的位置)
long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
Map<TopicPartition, OffsetAndMetadata> offsetMap = new HashMap<>();
offsetMap.put(partition, new OffsetAndMetadata(lastOffset + 1));
consumer.commitSync(offsetMap);
}
}
}
private void processRecord(ConsumerRecord<String, String> record) {
System.out.printf("消费消息: topic=%s, partition=%d, offset=%d, key=%s, value=%s%n",
record.topic(), record.partition(), record.offset(),
record.key(), record.value());
// 这里是实际的业务逻辑
// 比如:更新数据库、调用下游服务、写入缓存等
}
public void shutdown() {
running = false;
consumer.wakeup(); // 中断 poll 阻塞
}
public void close() {
consumer.close();
}
public static void main(String[] args) {
OrderConsumer consumer = new OrderConsumer(
"localhost:9092",
"order-processing-group",
"order-events"
);
// 优雅关闭
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
consumer.shutdown();
consumer.close();
}));
consumer.consumeManualSync();
}
}4.3 Spring Boot 集成
// application.yml
/*
spring:
kafka:
bootstrap-servers: localhost:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
acks: all
retries: 3
properties:
enable.idempotence: true
linger.ms: 5
consumer:
group-id: order-service
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
auto-offset-reset: earliest
enable-auto-commit: false
properties:
spring.json.trusted.packages: "*"
listener:
ack-mode: manual
concurrency: 3
*/
// OrderEvent.java
@Data
@NoArgsConstructor
@AllArgsConstructor
public class OrderEvent {
private String orderId;
private String userId;
private BigDecimal amount;
private String eventType; // CREATED, PAID, SHIPPED, COMPLETED
private LocalDateTime eventTime;
}
// KafkaProducerService.java
@Service
@Slf4j
public class KafkaProducerService {
@Autowired
private KafkaTemplate<String, OrderEvent> kafkaTemplate;
public void sendOrderEvent(OrderEvent event) {
String key = event.getOrderId(); // 用订单ID做key,保证顺序
CompletableFuture<SendResult<String, OrderEvent>> future =
kafkaTemplate.send("order-events", key, event);
future.whenComplete((result, ex) -> {
if (ex != null) {
log.error("发送订单事件失败: orderId={}, error={}",
event.getOrderId(), ex.getMessage());
} else {
RecordMetadata metadata = result.getRecordMetadata();
log.info("发送订单事件成功: orderId={}, partition={}, offset={}",
event.getOrderId(), metadata.partition(), metadata.offset());
}
});
}
/**
* 带事务的发送
*/
@Transactional
public void sendWithTransaction(List<OrderEvent> events) {
kafkaTemplate.executeInTransaction(operations -> {
for (OrderEvent event : events) {
operations.send("order-events", event.getOrderId(), event);
}
return true;
});
}
}
// KafkaConsumerService.java
@Service
@Slf4j
public class KafkaConsumerService {
@Autowired
private OrderService orderService;
/**
* 基础消费者
*/
@KafkaListener(topics = "order-events", groupId = "order-processing")
public void consume(OrderEvent event, Acknowledgment ack) {
try {
log.info("收到订单事件: {}", event);
orderService.processOrderEvent(event);
ack.acknowledge(); // 手动提交
} catch (Exception e) {
log.error("处理订单事件失败: {}", e.getMessage());
// 不acknowledge,消息会重新消费
// 或者发送到死信队列
}
}
/**
* 批量消费 - 高吞吐场景
*/
@KafkaListener(
topics = "order-events",
groupId = "order-batch-processing",
containerFactory = "batchFactory"
)
public void consumeBatch(
List<ConsumerRecord<String, OrderEvent>> records,
Acknowledgment ack) {
log.info("批量收到 {} 条消息", records.size());
try {
List<OrderEvent> events = records.stream()
.map(ConsumerRecord::value)
.collect(Collectors.toList());
orderService.batchProcessOrderEvents(events);
ack.acknowledge();
} catch (Exception e) {
log.error("批量处理失败: {}", e.getMessage());
}
}
/**
* 并发消费 - 多分区并行
*/
@KafkaListener(
topics = "order-events",
groupId = "order-concurrent-processing",
concurrency = "3" // 3个消费者线程
)
public void consumeConcurrent(
@Payload OrderEvent event,
@Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
@Header(KafkaHeaders.OFFSET) long offset,
Acknowledgment ack) {
log.info("分区{}消费: offset={}, event={}", partition, offset, event);
orderService.processOrderEvent(event);
ack.acknowledge();
}
}
// KafkaConfig.java - 批量消费配置
@Configuration
public class KafkaConfig {
@Bean
public ConcurrentKafkaListenerContainerFactory<String, OrderEvent> batchFactory(
ConsumerFactory<String, OrderEvent> consumerFactory) {
ConcurrentKafkaListenerContainerFactory<String, OrderEvent> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
factory.setBatchListener(true); // 开启批量
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
return factory;
}
}4.4 Go 语言示例
用 Go 写 Kafka 客户端也很香,使用 segmentio/kafka-go 库:
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"os"
"os/signal"
"syscall"
"time"
"github.com/segmentio/kafka-go"
)
// OrderEvent 订单事件
type OrderEvent struct {
OrderID string `json:"order_id"`
UserID string `json:"user_id"`
Amount float64 `json:"amount"`
EventType string `json:"event_type"`
EventTime time.Time `json:"event_time"`
}
// Producer 生产者
type Producer struct {
writer *kafka.Writer
}
func NewProducer(brokers []string, topic string) *Producer {
writer := &kafka.Writer{
Addr: kafka.TCP(brokers...),
Topic: topic,
Balancer: &kafka.Hash{}, // 根据 key hash 选择分区
BatchSize: 100,
BatchTimeout: 10 * time.Millisecond,
RequiredAcks: kafka.RequireAll,
Compression: kafka.Lz4,
}
return &Producer{writer: writer}
}
func (p *Producer) Send(ctx context.Context, event OrderEvent) error {
data, err := json.Marshal(event)
if err != nil {
return fmt.Errorf("序列化失败: %w", err)
}
msg := kafka.Message{
Key: []byte(event.OrderID), // 用订单ID做key
Value: data,
}
err = p.writer.WriteMessages(ctx, msg)
if err != nil {
return fmt.Errorf("发送失败: %w", err)
}
log.Printf("发送成功: orderId=%s", event.OrderID)
return nil
}
func (p *Producer) Close() error {
return p.writer.Close()
}
// Consumer 消费者
type Consumer struct {
reader *kafka.Reader
}
func NewConsumer(brokers []string, topic, groupID string) *Consumer {
reader := kafka.NewReader(kafka.ReaderConfig{
Brokers: brokers,
Topic: topic,
GroupID: groupID,
MinBytes: 1,
MaxBytes: 10e6, // 10MB
MaxWait: 500 * time.Millisecond,
CommitInterval: 0, // 手动提交
StartOffset: kafka.FirstOffset,
})
return &Consumer{reader: reader}
}
func (c *Consumer) Consume(ctx context.Context, handler func(OrderEvent) error) error {
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
msg, err := c.reader.FetchMessage(ctx)
if err != nil {
log.Printf("拉取消息失败: %v", err)
continue
}
var event OrderEvent
if err := json.Unmarshal(msg.Value, &event); err != nil {
log.Printf("反序列化失败: %v", err)
// 跳过无效消息,提交offset
c.reader.CommitMessages(ctx, msg)
continue
}
log.Printf("收到消息: partition=%d, offset=%d, orderId=%s",
msg.Partition, msg.Offset, event.OrderID)
// 处理业务逻辑
if err := handler(event); err != nil {
log.Printf("处理失败: %v", err)
// 可以选择重试或发送到死信队列
continue
}
// 处理成功,提交offset
if err := c.reader.CommitMessages(ctx, msg); err != nil {
log.Printf("提交offset失败: %v", err)
}
}
}
func (c *Consumer) Close() error {
return c.reader.Close()
}
func main() {
brokers := []string{"localhost:9092"}
topic := "order-events"
// 启动生产者
producer := NewProducer(brokers, topic)
defer producer.Close()
// 发送测试消息
go func() {
for i := 0; i < 10; i++ {
event := OrderEvent{
OrderID: fmt.Sprintf("ORDER_%d", i),
UserID: fmt.Sprintf("USER_%d", i%5),
Amount: float64(i) * 100.5,
EventType: "CREATED",
EventTime: time.Now(),
}
if err := producer.Send(context.Background(), event); err != nil {
log.Printf("发送失败: %v", err)
}
time.Sleep(100 * time.Millisecond)
}
}()
// 启动消费者
consumer := NewConsumer(brokers, topic, "order-processing-group")
defer consumer.Close()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// 优雅关闭
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
go func() {
<-sigChan
log.Println("收到关闭信号,正在优雅退出...")
cancel()
}()
// 开始消费
handler := func(event OrderEvent) error {
log.Printf("处理订单: %+v", event)
// 实际业务逻辑
return nil
}
if err := consumer.Consume(ctx, handler); err != nil {
log.Printf("消费结束: %v", err)
}
}五、进阶特性:从入门到进坑
5.1 事务消息(Exactly-Once 语义)
Kafka 0.11+ 支持事务,实现"精确一次"语义。
// 开启事务的 Producer 配置
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "order-transaction-producer");
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
// 事务发送
producer.initTransactions();
try {
producer.beginTransaction();
// 发送多条消息
producer.send(new ProducerRecord<>("topic1", "key1", "value1"));
producer.send(new ProducerRecord<>("topic2", "key2", "value2"));
// 提交消费位移(用于 consume-transform-produce 模式)
producer.sendOffsetsToTransaction(offsets, consumerGroupId);
producer.commitTransaction();
} catch (Exception e) {
producer.abortTransaction(); // 回滚
}事务语义说明:
5.2 消费者 Rebalance
这是 Kafka 最让人头疼的机制之一。当以下情况发生时,会触发 Rebalance:
- Consumer 加入或离开 Group
- Consumer 心跳超时
- Topic 分区数变化
- 订阅的 Topic 发生变化
Rebalance 的痛点:
- Stop The World:Rebalance 期间所有 Consumer 停止消费
- 重复消费:如果 Offset 没及时提交,可能重复消费
- 频繁触发:配置不当会导致频繁 Rebalance
优化策略:
// 1. 增加会话超时,减少误判
props.put("session.timeout.ms", 30000);
props.put("heartbeat.interval.ms", 10000);
// 2. 增加处理超时
props.put("max.poll.interval.ms", 600000); // 10分钟
// 3. 减少单次拉取量
props.put("max.poll.records", 100);
// 4. 使用 CooperativeStickyAssignor(Kafka 2.4+)
props.put("partition.assignment.strategy",
"org.apache.kafka.clients.consumer.CooperativeStickyAssignor");5.3 消息顺序性保证
Partition 内有序:同一个 Partition 内的消息严格有序。
如何保证业务顺序:
// 方案1:相同业务key发到同一分区
// 订单ID相同的消息发到同一分区
producer.send(new ProducerRecord<>("orders", orderId, orderJson));
// 方案2:单分区Topic(吞吐量受限)
// 创建时指定 --partitions 1
// 方案3:消费端重排序
// 消费后按时间戳或序列号排序顺序性 vs 吞吐量的权衡:
| 方案 | 顺序性 | 吞吐量 | 适用场景 |
|---|---|---|---|
| 单分区 | 全局有序 | 低 | 严格顺序、小流量 |
| Key分区 | 业务键有序 | 中 | 订单、用户维度顺序 |
| 多分区 | 无顺序保证 | 高 | 日志、监控数据 |
六、运维踩坑实录
6.1 常见问题排查
问题1:Consumer 消费不到数据
# 1. 检查 Topic 是否存在
kafka-topics.sh --describe --topic order-events --bootstrap-server localhost:9092
# 2. 检查 Consumer Group 状态
kafka-consumer-groups.sh --describe --group order-processing --bootstrap-server localhost:9092
# 3. 检查 Offset
# 如果 CURRENT-OFFSET = LOG-END-OFFSET,说明消费完了
# 如果 LAG > 0,说明有积压
# 4. 重置 Offset(慎用)
kafka-consumer-groups.sh --reset-offsets \
--group order-processing \
--topic order-events \
--to-earliest \
--execute \
--bootstrap-server localhost:9092问题2:Producer 发送超时
# 1. 检查 Broker 连通性
telnet kafka-broker 9092
# 2. 检查 Broker 日志
tail -f /var/log/kafka/server.log
# 3. 检查磁盘空间
df -h /kafka-logs
# 4. 检查网络延迟
ping kafka-broker问题3:Rebalance 风暴
// 症状:日志中大量 "Revoking previously assigned partitions"
// 排查步骤:
// 1. 检查是否有 Consumer 频繁重启
// 2. 检查 max.poll.interval.ms 是否太短
// 3. 检查处理逻辑是否太慢
// 4. 检查 GC 停顿是否过长
// 优化:
props.put("max.poll.interval.ms", 600000);
props.put("max.poll.records", 100);6.2 监控指标
# 核心监控指标
Producer:
- record-send-rate # 发送速率
- record-error-rate # 错误率
- request-latency-avg # 平均延迟
- batch-size-avg # 批次大小
- buffer-available-bytes # 缓冲区剩余
Consumer:
- records-consumed-rate # 消费速率
- records-lag # 消费积压
- fetch-latency-avg # 拉取延迟
- commit-latency-avg # 提交延迟
Broker:
- UnderReplicatedPartitions # 副本不足的分区数
- OfflinePartitionsCount # 离线分区数
- ActiveControllerCount # 活跃Controller数
- BytesInPerSec # 入站流量
- BytesOutPerSec # 出站流量
- RequestsPerSec # 请求速率6.3 性能调优
Producer 调优
// 高吞吐配置
props.put("batch.size", 65536); // 64KB batch
props.put("linger.ms", 10); // 等待10ms凑批
props.put("buffer.memory", 67108864); // 64MB 缓冲
props.put("compression.type", "lz4"); // LZ4压缩
props.put("acks", "1"); // 只等Leader确认
// 低延迟配置
props.put("batch.size", 0); // 不批量
props.put("linger.ms", 0); // 不等待
props.put("acks", "1"); // 只等LeaderConsumer 调优
// 高吞吐配置
props.put("fetch.min.bytes", 65536); // 最少拉64KB
props.put("fetch.max.wait.ms", 500); // 最多等500ms
props.put("max.poll.records", 1000); // 单次拉1000条
props.put("max.partition.fetch.bytes", 1048576); // 每分区1MB
// 多线程消费(手动分区分配)
// 每个线程消费固定分区,避免RebalanceBroker 调优
# server.properties
# 网络线程
num.network.threads=8
num.io.threads=16
# 日志配置
log.segment.bytes=1073741824 # 1GB per segment
log.retention.hours=168 # 保留7天
log.retention.bytes=-1 # 不限制大小
log.cleanup.policy=delete # 删除策略
# 复制配置
num.replica.fetchers=4
replica.fetch.max.bytes=10485760 # 10MB
# ZK会话
zookeeper.session.timeout.ms=18000七、集群部署实战
7.1 Docker Compose 部署(开发环境)
version: '3.8'
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.5.0
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ports:
- "2181:2181"
volumes:
- zk_data:/var/lib/zookeeper/data
- zk_log:/var/lib/zookeeper/log
kafka1:
image: confluentinc/cp-kafka:7.5.0
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2
KAFKA_DEFAULT_REPLICATION_FACTOR: 2
KAFKA_NUM_PARTITIONS: 3
KAFKA_LOG_RETENTION_HOURS: 168
volumes:
- kafka1_data:/var/lib/kafka/data
kafka2:
image: confluentinc/cp-kafka:7.5.0
depends_on:
- zookeeper
ports:
- "9093:9093"
environment:
KAFKA_BROKER_ID: 2
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9093
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2
KAFKA_DEFAULT_REPLICATION_FACTOR: 2
KAFKA_NUM_PARTITIONS: 3
volumes:
- kafka2_data:/var/lib/kafka/data
kafka3:
image: confluentinc/cp-kafka:7.5.0
depends_on:
- zookeeper
ports:
- "9094:9094"
environment:
KAFKA_BROKER_ID: 3
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9094
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2
KAFKA_DEFAULT_REPLICATION_FACTOR: 2
KAFKA_NUM_PARTITIONS: 3
volumes:
- kafka3_data:/var/lib/kafka/data
kafka-ui:
image: provectuslabs/kafka-ui:latest
ports:
- "8080:8080"
environment:
KAFKA_CLUSTERS_0_NAME: local
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka1:9092,kafka2:9093,kafka3:9094
KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181
volumes:
zk_data:
zk_log:
kafka1_data:
kafka2_data:
kafka3_data:7.2 KRaft 模式(无 ZooKeeper)
Kafka 3.0+ 支持 KRaft 模式,不再依赖 ZooKeeper:
# 生成集群ID
KAFKA_CLUSTER_ID=$(kafka-storage.sh random-uuid)
# 格式化存储
kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c /path/to/kraft/server.properties
# kraft/server.properties
process.roles=broker,controller
node.id=1
controller.quorum.voters=1@localhost:9093,2@localhost:9094,3@localhost:9095
listeners=PLAINTEXT://:9092,CONTROLLER://:9093
advertised.listeners=PLAINTEXT://localhost:9092
controller.listener.names=CONTROLLER
log.dirs=/var/kafka-logs7.3 生产环境架构
生产环境配置建议:
| 配置项 | 推荐值 | 说明 |
|---|---|---|
| Broker 数量 | >= 3 | 至少3个,保证高可用 |
| Partition 数量 | 3-10 per Topic | 根据吞吐量调整 |
| Replication Factor | 3 | 三副本 |
| min.insync.replicas | 2 | 最少同步副本数 |
| acks | all | 等待所有ISR确认 |
八、Kafka 生态全景
8.1 核心生态组件
| 组件 | 用途 | 典型场景 |
|---|---|---|
| Kafka Connect | 数据集成 | MySQL CDC → Kafka → ES |
| Schema Registry | 模式管理 | Avro/Protobuf 模式版本控制 |
| Kafka Streams | 流处理 | 实时聚合、Join、窗口计算 |
| ksqlDB | SQL流处理 | 用SQL分析实时数据流 |
| Debezium | CDC | 数据库变更捕获 |
| MirrorMaker 2 | 跨集群复制 | 多数据中心同步 |
九、Kafka vs 其他消息队列
| 特性 | Kafka | RabbitMQ | RocketMQ | Pulsar |
|---|---|---|---|---|
| 吞吐量 | 极高(百万/秒) | 中(万级) | 高(十万级) | 极高 |
| 延迟 | 毫秒级 | 微秒级 | 毫秒级 | 毫秒级 |
| 消息持久化 | 磁盘(高效) | 内存/磁盘 | 磁盘 | 磁盘(BookKeeper) |
| 消息回溯 | 支持 | 不支持 | 支持 | 支持 |
| 顺序消息 | Partition有序 | 队列有序 | 队列有序 | 分区有序 |
| 事务消息 | 支持 | 支持 | 支持 | 支持 |
| 协议 | 自定义 | AMQP | 自定义 | 自定义 |
| 运维复杂度 | 中 | 低 | 中 | 高 |
| 适用场景 | 大数据、日志、流处理 | 业务解耦、RPC | 电商、金融 | 多租户、云原生 |
我的选型建议:
- 日志收集、大数据管道 → Kafka(吞吐量王者)
- 业务系统解耦、延迟敏感 → RabbitMQ(成熟稳定)
- 电商、金融场景 → RocketMQ(阿里背书,功能全面)
- 云原生、多租户 → Pulsar(新架构,存算分离)
十、最佳实践清单
10.1 Producer 最佳实践
✅ DO:
- 使用 acks=all 保证可靠性
- 开启幂等(enable.idempotence=true)
- 使用业务key保证分区路由
- 合理设置 batch.size 和 linger.ms
- 实现发送失败的重试和补偿
❌ DON'T:
- 忽略发送回调的异常
- 发送超大消息(默认限制1MB)
- 在同步发送中阻塞主线程
- 忘记 close() 导致消息丢失10.2 Consumer 最佳实践
✅ DO:
- 关闭自动提交,使用手动提交
- 处理完业务逻辑再提交 Offset
- 使用 ConsumerRebalanceListener 处理 Rebalance
- 合理设置 max.poll.records 和 max.poll.interval.ms
- 实现幂等消费(数据库唯一键、Redis去重等)
❌ DON'T:
- 在 poll 循环中做耗时操作
- 忽略 Rebalance 导致的重复消费
- 使用过小的 session.timeout.ms
- 一个 Consumer Group 消费过多 Topic10.3 运维最佳实践
✅ DO:
- 至少3个Broker,跨机架部署
- replication.factor >= 3
- min.insync.replicas >= 2
- 监控 UnderReplicatedPartitions
- 定期清理过期数据
- 使用 Kafka UI 工具管理
❌ DON'T:
- 单点部署 Broker
- 磁盘打满才处理
- 忽略 Consumer Lag 告警
- 随意调整 Partition 数量
- 在生产环境测试新配置结语:从入门到不想放弃
写到这里,我想说的是:Kafka 确实复杂,但它的复杂是有道理的。
它解决的是分布式系统中最棘手的问题:
- 如何在多个系统间可靠地传递海量数据?
- 如何保证消息不丢、不重、顺序正确?
- 如何在系统崩溃后快速恢复?
这些问题本身就不简单,所以 Kafka 的设计也不可能简单。
但好消息是,大多数场景下,你只需要掌握 20% 的核心知识,就能解决 80% 的问题。这篇文章覆盖的内容,足够你在大多数项目中游刃有余。
最后送你一句话:技术学习的目的不是为了放弃,而是为了在想放弃的时候,知道自己还能再坚持一下。
加油,我们下篇文章见!