搜 索

Kafka从入门到放弃

  • 283阅读
  • 2022年10月30日
  • 0评论
首页 / AI/大数据 / 正文

写在前面

作为一个在消息队列坑里摸爬滚打多年的架构师,我可以负责任地告诉你:Kafka 是那种"学的时候觉得简单,用的时候想砸电脑"的技术

第一次接触 Kafka 是在 2018 年,当时领导说:"小伙子,我们要上实时数据管道,你去调研一下 Kafka。"我信心满满地打开官网,看到那句 "A distributed streaming platform",心想这不就是个消息队列吗?三天搞定!

结果三年后,我还在和 Consumer Rebalance、Offset 丢失、Partition 倾斜做斗争...

这篇文章,我会用最接地气的方式,带你走一遍 Kafka 的核心知识点。保证你看完之后,能在面试中侃侃而谈,也能在实际项目中少踩几个坑。


一、Kafka 到底是个啥?

1.1 官方定义(装逼用)

Apache Kafka is a distributed event streaming platform capable of handling trillions of events a day.

翻译成人话:Kafka 是一个分布式的、高吞吐的消息队列系统,专门用来处理海量数据流

1.2 通俗理解(实际用)

想象一下你们公司的食堂:

  • 传统模式:厨师做一份,服务员端一份,一对一服务。高峰期厨师累死,服务员跑断腿。
  • Kafka 模式:厨师做好的菜放到传送带上(Topic),不同窗口(Consumer)自己去取。厨师不用管谁来吃,服务员不用等厨师。
┌─────────────┐     ┌─────────────────────────────┐     ┌─────────────┐
│   厨师们     │ ──▶ │     传送带 (Kafka Topic)     │ ──▶ │   各窗口     │
│ (Producer)  │     │   菜品按类型分轨道(Partition) │     │ (Consumer)  │
└─────────────┘     └─────────────────────────────┘     └─────────────┘

这就是 Kafka 的本质:解耦生产者和消费者,削峰填谷,异步处理

1.3 Kafka 能干啥?

场景具体应用实际案例
消息队列异步解耦、削峰填谷订单系统 → Kafka → 库存/积分/通知
日志收集集中式日志管道Filebeat → Kafka → Elasticsearch
流处理实时计算、ETL用户行为 → Kafka Streams → 实时推荐
事件溯源事件存储、审计追踪交易流水 → Kafka → 数据仓库
数据同步CDC、多系统同步MySQL → Debezium → Kafka → 其他系统

二、核心概念:先把名词搞清楚

学 Kafka 最痛苦的就是一堆专有名词,我来给你翻译成人话:

2.1 概念全家福

graph TB subgraph Kafka Cluster B1[Broker 1] B2[Broker 2] B3[Broker 3] end subgraph Topic: order-events P0[Partition 0] P1[Partition 1] P2[Partition 2] end subgraph Consumers CG1[Consumer Group A] CG2[Consumer Group B] end Producer[Producer] --> P0 Producer --> P1 Producer --> P2 P0 --> CG1 P1 --> CG1 P2 --> CG1 P0 --> CG2 P1 --> CG2 P2 --> CG2 B1 --- P0 B2 --- P1 B3 --- P2

2.2 概念详解

Broker(经纪人/节点)

就是 Kafka 服务器实例。一个 Kafka 集群由多个 Broker 组成,每个 Broker 都是一个独立的服务进程。

人话:Broker 就是仓库,数据存在这里。多个仓库组成仓库群(集群)。

# 查看集群中的 Broker
kafka-broker-api-versions.sh --bootstrap-server localhost:9092

Topic(主题)

消息的分类,类似于数据库的表。生产者往 Topic 里写,消费者从 Topic 里读。

人话:Topic 就是传送带的名字。"order-topic"是订单传送带,"log-topic"是日志传送带。

# 创建 Topic
kafka-topics.sh --create \
  --bootstrap-server localhost:9092 \
  --topic order-events \
  --partitions 3 \
  --replication-factor 2

Partition(分区)

Topic 的物理分片。一个 Topic 可以有多个 Partition,分布在不同 Broker 上。

人话:一条传送带太慢,那就开三条并行跑。每条就是一个 Partition。

关键特性

  • 同一 Partition 内消息有序
  • 不同 Partition 之间消息不保证顺序
  • Partition 是 Kafka 并行处理的基本单位

Offset(偏移量)

消息在 Partition 中的位置编号,从 0 开始递增,永不重复。

人话:Offset 就是消息的"身份证号"。Consumer 说"我要从 100 号开始读",就是从 Offset=100 的消息开始。

Partition 0:  [msg0] [msg1] [msg2] [msg3] [msg4] [msg5] ...
               ↑                           ↑
            offset=0                    offset=4
                                    (Consumer 当前位置)

Consumer Group(消费者组)

一组 Consumer 的集合,共同消费一个 Topic。同一 Group 内的 Consumer 不会重复消费同一条消息。

人话:消费者组就是"一个部门"。部门里的人分工合作,不会重复劳动。不同部门(不同 Group)各干各的,互不影响。

graph LR subgraph Topic: orders P0[Partition 0] P1[Partition 1] P2[Partition 2] end subgraph Group A - 订单处理 C1[Consumer 1] C2[Consumer 2] end subgraph Group B - 数据分析 C3[Consumer 3] end P0 --> C1 P1 --> C2 P2 --> C2 P0 --> C3 P1 --> C3 P2 --> C3

Replication(副本)

Partition 的备份机制。每个 Partition 可以有多个副本,分布在不同 Broker 上。

人话:重要的文件多备份几份,万一一个硬盘坏了,还有其他的顶上。

  • Leader:负责读写的主副本
  • Follower:只负责同步数据的从副本
  • ISR (In-Sync Replicas):和 Leader 保持同步的副本集合

三、架构深度剖析

3.1 整体架构

graph TB subgraph Producers P1[Producer 1] P2[Producer 2] end subgraph KafkaCluster[Kafka Cluster] subgraph Broker1[Broker 1] T1P0L[Topic1-P0 Leader] T1P1F[Topic1-P1 Follower] T2P0F[Topic2-P0 Follower] end subgraph Broker2[Broker 2] T1P0Fx[Topic1-P0 Follower] T1P1L[Topic1-P1 Leader] T2P0L[Topic2-P0 Leader] end subgraph Broker3[Broker 3] T1P0F2[Topic1-P0 Follower] T1P1F2[Topic1-P1 Follower] T2P0F2[Topic2-P0 Follower] end end ZK[ZooKeeper / KRaft] subgraph Consumers CG1[Consumer Group 1] CG2[Consumer Group 2] end P1 --> T1P0L P2 --> T1P1L T1P0L --> CG1 T1P1L --> CG1 T2P0L --> CG2 ZK -.-> Broker1 ZK -.-> Broker2 ZK -.-> Broker3

3.2 消息存储结构

Kafka 的消息不是存在内存里的,而是存在磁盘上的日志文件中。

/kafka-logs/
└── order-events-0/           # Topic-Partition 目录
    ├── 00000000000000000000.log      # 消息日志文件
    ├── 00000000000000000000.index    # 偏移量索引
    ├── 00000000000000000000.timeindex # 时间戳索引
    ├── 00000000000012345678.log      # 新的 Segment
    ├── 00000000000012345678.index
    └── leader-epoch-checkpoint

为什么 Kafka 存磁盘还这么快?

  1. 顺序写入:磁盘顺序写速度接近内存随机写
  2. Page Cache:利用操作系统的页缓存
  3. 零拷贝:sendfile 系统调用,数据不经过用户态
  4. 批量处理:消息批量发送、批量写入
  5. 分区并行:多 Partition 并行读写
graph LR subgraph 传统IO A1[磁盘] --> A2[内核缓冲区] A2 --> A3[用户缓冲区] A3 --> A4[Socket缓冲区] A4 --> A5[网卡] end subgraph 零拷贝 B1[磁盘] --> B2[内核缓冲区] B2 --> B3[网卡] end

3.3 生产者工作原理

sequenceDiagram participant App as 应用程序 participant Prod as Producer participant Part as Partitioner participant Acc as RecordAccumulator participant Sender as Sender线程 participant Broker as Kafka Broker App->>Prod: send(record) Prod->>Part: 计算目标Partition Part-->>Prod: partition=1 Prod->>Acc: 追加到批次缓冲区 loop 批量发送 Sender->>Acc: 拉取就绪批次 Acc-->>Sender: batch Sender->>Broker: 发送请求 Broker-->>Sender: 响应(成功/失败) end Sender-->>App: callback回调

关键配置参数

Properties props = new Properties();
// 必需配置
props.put("bootstrap.servers", "kafka1:9092,kafka2:9092,kafka3:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

// 可靠性配置
props.put("acks", "all");              // 等待所有ISR确认
props.put("retries", 3);               // 重试次数
props.put("retry.backoff.ms", 100);    // 重试间隔

// 性能配置
props.put("batch.size", 16384);        // 批次大小 16KB
props.put("linger.ms", 5);             // 等待时间,凑批发送
props.put("buffer.memory", 33554432);  // 缓冲区大小 32MB
props.put("compression.type", "lz4"); // 压缩算法

3.4 消费者工作原理

sequenceDiagram participant App as 应用程序 participant Consumer as Consumer participant Coord as GroupCoordinator participant Broker as Kafka Broker Consumer->>Coord: JoinGroup请求 Coord-->>Consumer: 分配Leader/成员信息 Consumer->>Coord: SyncGroup(分区分配方案) Coord-->>Consumer: 分配的Partitions loop 消费循环 Consumer->>Broker: Fetch请求 Broker-->>Consumer: 消息批次 App->>App: 处理消息 Consumer->>Broker: Commit Offset end Note over Consumer,Coord: 心跳保活 + Rebalance

关键配置参数

Properties props = new Properties();
props.put("bootstrap.servers", "kafka1:9092,kafka2:9092");
props.put("group.id", "order-processing-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

// 偏移量管理
props.put("enable.auto.commit", "false");     // 关闭自动提交!
props.put("auto.offset.reset", "earliest");   // 无offset时从头消费

// 性能配置
props.put("max.poll.records", 500);           // 单次拉取最大记录数
props.put("fetch.min.bytes", 1);              // 最小拉取字节数
props.put("fetch.max.wait.ms", 500);          // 最大等待时间

// 会话管理
props.put("session.timeout.ms", 30000);       // 会话超时
props.put("heartbeat.interval.ms", 10000);    // 心跳间隔
props.put("max.poll.interval.ms", 300000);    // 最大处理时间

四、代码实战:Talk is cheap

4.1 Java Producer 示例

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.Future;

public class OrderProducer {
    
    private final KafkaProducer<String, String> producer;
    private final String topic;
    
    public OrderProducer(String bootstrapServers, String topic) {
        this.topic = topic;
        
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        
        // 可靠性配置
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, 3);
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);  // 开启幂等
        
        // 性能配置
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 5);
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
        
        this.producer = new KafkaProducer<>(props);
    }
    
    /**
     * 同步发送 - 简单但性能差
     */
    public RecordMetadata sendSync(String key, String value) throws Exception {
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
        Future<RecordMetadata> future = producer.send(record);
        return future.get();  // 阻塞等待
    }
    
    /**
     * 异步发送 - 推荐方式
     */
    public void sendAsync(String key, String value) {
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
        
        producer.send(record, (metadata, exception) -> {
            if (exception != null) {
                // 发送失败,记录日志、告警、重试等
                System.err.println("发送失败: " + exception.getMessage());
            } else {
                System.out.printf("发送成功: topic=%s, partition=%d, offset=%d%n",
                    metadata.topic(), metadata.partition(), metadata.offset());
            }
        });
    }
    
    /**
     * 发送到指定分区 - 保证顺序性
     */
    public void sendToPartition(String key, String value, int partition) {
        ProducerRecord<String, String> record = 
            new ProducerRecord<>(topic, partition, key, value);
        producer.send(record);
    }
    
    public void close() {
        producer.flush();  // 刷新缓冲区
        producer.close();
    }
    
    public static void main(String[] args) throws Exception {
        OrderProducer producer = new OrderProducer(
            "localhost:9092", 
            "order-events"
        );
        
        // 模拟发送订单事件
        for (int i = 0; i < 100; i++) {
            String orderId = "ORDER_" + i;
            String orderJson = String.format(
                "{\"orderId\":\"%s\",\"userId\":\"USER_%d\",\"amount\":%.2f}",
                orderId, i % 10, Math.random() * 1000
            );
            
            // 使用 orderId 作为 key,保证同一订单的事件发到同一分区
            producer.sendAsync(orderId, orderJson);
        }
        
        Thread.sleep(1000);  // 等待异步发送完成
        producer.close();
    }
}

4.2 Java Consumer 示例

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.*;

public class OrderConsumer {
    
    private final KafkaConsumer<String, String> consumer;
    private volatile boolean running = true;
    
    public OrderConsumer(String bootstrapServers, String groupId, String topic) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        
        // 重要:关闭自动提交
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        
        // 性能配置
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
        props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1024);
        
        // 会话配置
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000);
        
        this.consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList(topic));
    }
    
    /**
     * 自动提交模式 - 简单但可能丢消息
     */
    public void consumeAutoCommit() {
        while (running) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            
            for (ConsumerRecord<String, String> record : records) {
                processRecord(record);
            }
            // 处理完自动提交,如果处理过程中崩溃,消息可能丢失
        }
    }
    
    /**
     * 手动同步提交 - 推荐方式
     */
    public void consumeManualSync() {
        while (running) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            
            for (ConsumerRecord<String, String> record : records) {
                try {
                    processRecord(record);
                } catch (Exception e) {
                    // 处理失败,记录日志,可以选择跳过或重试
                    System.err.println("处理失败: " + e.getMessage());
                }
            }
            
            // 所有消息处理完后再提交
            if (!records.isEmpty()) {
                consumer.commitSync();
            }
        }
    }
    
    /**
     * 精确控制 Offset - 最可靠的方式
     */
    public void consumeWithPreciseOffset() {
        while (running) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            
            // 按分区处理,精确提交每个分区的 offset
            for (TopicPartition partition : records.partitions()) {
                List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                
                for (ConsumerRecord<String, String> record : partitionRecords) {
                    processRecord(record);
                }
                
                // 提交该分区的 offset(下一条要消费的位置)
                long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                Map<TopicPartition, OffsetAndMetadata> offsetMap = new HashMap<>();
                offsetMap.put(partition, new OffsetAndMetadata(lastOffset + 1));
                consumer.commitSync(offsetMap);
            }
        }
    }
    
    private void processRecord(ConsumerRecord<String, String> record) {
        System.out.printf("消费消息: topic=%s, partition=%d, offset=%d, key=%s, value=%s%n",
            record.topic(), record.partition(), record.offset(), 
            record.key(), record.value());
        
        // 这里是实际的业务逻辑
        // 比如:更新数据库、调用下游服务、写入缓存等
    }
    
    public void shutdown() {
        running = false;
        consumer.wakeup();  // 中断 poll 阻塞
    }
    
    public void close() {
        consumer.close();
    }
    
    public static void main(String[] args) {
        OrderConsumer consumer = new OrderConsumer(
            "localhost:9092",
            "order-processing-group",
            "order-events"
        );
        
        // 优雅关闭
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            consumer.shutdown();
            consumer.close();
        }));
        
        consumer.consumeManualSync();
    }
}

4.3 Spring Boot 集成

// application.yml
/*
spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      acks: all
      retries: 3
      properties:
        enable.idempotence: true
        linger.ms: 5
    consumer:
      group-id: order-service
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      auto-offset-reset: earliest
      enable-auto-commit: false
      properties:
        spring.json.trusted.packages: "*"
    listener:
      ack-mode: manual
      concurrency: 3
*/

// OrderEvent.java
@Data
@NoArgsConstructor
@AllArgsConstructor
public class OrderEvent {
    private String orderId;
    private String userId;
    private BigDecimal amount;
    private String eventType;  // CREATED, PAID, SHIPPED, COMPLETED
    private LocalDateTime eventTime;
}

// KafkaProducerService.java
@Service
@Slf4j
public class KafkaProducerService {
    
    @Autowired
    private KafkaTemplate<String, OrderEvent> kafkaTemplate;
    
    public void sendOrderEvent(OrderEvent event) {
        String key = event.getOrderId();  // 用订单ID做key,保证顺序
        
        CompletableFuture<SendResult<String, OrderEvent>> future = 
            kafkaTemplate.send("order-events", key, event);
        
        future.whenComplete((result, ex) -> {
            if (ex != null) {
                log.error("发送订单事件失败: orderId={}, error={}", 
                    event.getOrderId(), ex.getMessage());
            } else {
                RecordMetadata metadata = result.getRecordMetadata();
                log.info("发送订单事件成功: orderId={}, partition={}, offset={}",
                    event.getOrderId(), metadata.partition(), metadata.offset());
            }
        });
    }
    
    /**
     * 带事务的发送
     */
    @Transactional
    public void sendWithTransaction(List<OrderEvent> events) {
        kafkaTemplate.executeInTransaction(operations -> {
            for (OrderEvent event : events) {
                operations.send("order-events", event.getOrderId(), event);
            }
            return true;
        });
    }
}

// KafkaConsumerService.java
@Service
@Slf4j
public class KafkaConsumerService {
    
    @Autowired
    private OrderService orderService;
    
    /**
     * 基础消费者
     */
    @KafkaListener(topics = "order-events", groupId = "order-processing")
    public void consume(OrderEvent event, Acknowledgment ack) {
        try {
            log.info("收到订单事件: {}", event);
            orderService.processOrderEvent(event);
            ack.acknowledge();  // 手动提交
        } catch (Exception e) {
            log.error("处理订单事件失败: {}", e.getMessage());
            // 不acknowledge,消息会重新消费
            // 或者发送到死信队列
        }
    }
    
    /**
     * 批量消费 - 高吞吐场景
     */
    @KafkaListener(
        topics = "order-events", 
        groupId = "order-batch-processing",
        containerFactory = "batchFactory"
    )
    public void consumeBatch(
            List<ConsumerRecord<String, OrderEvent>> records,
            Acknowledgment ack) {
        
        log.info("批量收到 {} 条消息", records.size());
        
        try {
            List<OrderEvent> events = records.stream()
                .map(ConsumerRecord::value)
                .collect(Collectors.toList());
            
            orderService.batchProcessOrderEvents(events);
            ack.acknowledge();
        } catch (Exception e) {
            log.error("批量处理失败: {}", e.getMessage());
        }
    }
    
    /**
     * 并发消费 - 多分区并行
     */
    @KafkaListener(
        topics = "order-events",
        groupId = "order-concurrent-processing", 
        concurrency = "3"  // 3个消费者线程
    )
    public void consumeConcurrent(
            @Payload OrderEvent event,
            @Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
            @Header(KafkaHeaders.OFFSET) long offset,
            Acknowledgment ack) {
        
        log.info("分区{}消费: offset={}, event={}", partition, offset, event);
        orderService.processOrderEvent(event);
        ack.acknowledge();
    }
}

// KafkaConfig.java - 批量消费配置
@Configuration
public class KafkaConfig {
    
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, OrderEvent> batchFactory(
            ConsumerFactory<String, OrderEvent> consumerFactory) {
        
        ConcurrentKafkaListenerContainerFactory<String, OrderEvent> factory = 
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.setBatchListener(true);  // 开启批量
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
        return factory;
    }
}

4.4 Go 语言示例

用 Go 写 Kafka 客户端也很香,使用 segmentio/kafka-go 库:

package main

import (
    "context"
    "encoding/json"
    "fmt"
    "log"
    "os"
    "os/signal"
    "syscall"
    "time"

    "github.com/segmentio/kafka-go"
)

// OrderEvent 订单事件
type OrderEvent struct {
    OrderID   string    `json:"order_id"`
    UserID    string    `json:"user_id"`
    Amount    float64   `json:"amount"`
    EventType string    `json:"event_type"`
    EventTime time.Time `json:"event_time"`
}

// Producer 生产者
type Producer struct {
    writer *kafka.Writer
}

func NewProducer(brokers []string, topic string) *Producer {
    writer := &kafka.Writer{
        Addr:         kafka.TCP(brokers...),
        Topic:        topic,
        Balancer:     &kafka.Hash{},  // 根据 key hash 选择分区
        BatchSize:    100,
        BatchTimeout: 10 * time.Millisecond,
        RequiredAcks: kafka.RequireAll,
        Compression:  kafka.Lz4,
    }
    return &Producer{writer: writer}
}

func (p *Producer) Send(ctx context.Context, event OrderEvent) error {
    data, err := json.Marshal(event)
    if err != nil {
        return fmt.Errorf("序列化失败: %w", err)
    }

    msg := kafka.Message{
        Key:   []byte(event.OrderID),  // 用订单ID做key
        Value: data,
    }

    err = p.writer.WriteMessages(ctx, msg)
    if err != nil {
        return fmt.Errorf("发送失败: %w", err)
    }

    log.Printf("发送成功: orderId=%s", event.OrderID)
    return nil
}

func (p *Producer) Close() error {
    return p.writer.Close()
}

// Consumer 消费者
type Consumer struct {
    reader *kafka.Reader
}

func NewConsumer(brokers []string, topic, groupID string) *Consumer {
    reader := kafka.NewReader(kafka.ReaderConfig{
        Brokers:        brokers,
        Topic:          topic,
        GroupID:        groupID,
        MinBytes:       1,
        MaxBytes:       10e6,  // 10MB
        MaxWait:        500 * time.Millisecond,
        CommitInterval: 0,  // 手动提交
        StartOffset:    kafka.FirstOffset,
    })
    return &Consumer{reader: reader}
}

func (c *Consumer) Consume(ctx context.Context, handler func(OrderEvent) error) error {
    for {
        select {
        case <-ctx.Done():
            return ctx.Err()
        default:
        }

        msg, err := c.reader.FetchMessage(ctx)
        if err != nil {
            log.Printf("拉取消息失败: %v", err)
            continue
        }

        var event OrderEvent
        if err := json.Unmarshal(msg.Value, &event); err != nil {
            log.Printf("反序列化失败: %v", err)
            // 跳过无效消息,提交offset
            c.reader.CommitMessages(ctx, msg)
            continue
        }

        log.Printf("收到消息: partition=%d, offset=%d, orderId=%s",
            msg.Partition, msg.Offset, event.OrderID)

        // 处理业务逻辑
        if err := handler(event); err != nil {
            log.Printf("处理失败: %v", err)
            // 可以选择重试或发送到死信队列
            continue
        }

        // 处理成功,提交offset
        if err := c.reader.CommitMessages(ctx, msg); err != nil {
            log.Printf("提交offset失败: %v", err)
        }
    }
}

func (c *Consumer) Close() error {
    return c.reader.Close()
}

func main() {
    brokers := []string{"localhost:9092"}
    topic := "order-events"

    // 启动生产者
    producer := NewProducer(brokers, topic)
    defer producer.Close()

    // 发送测试消息
    go func() {
        for i := 0; i < 10; i++ {
            event := OrderEvent{
                OrderID:   fmt.Sprintf("ORDER_%d", i),
                UserID:    fmt.Sprintf("USER_%d", i%5),
                Amount:    float64(i) * 100.5,
                EventType: "CREATED",
                EventTime: time.Now(),
            }
            if err := producer.Send(context.Background(), event); err != nil {
                log.Printf("发送失败: %v", err)
            }
            time.Sleep(100 * time.Millisecond)
        }
    }()

    // 启动消费者
    consumer := NewConsumer(brokers, topic, "order-processing-group")
    defer consumer.Close()

    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    // 优雅关闭
    sigChan := make(chan os.Signal, 1)
    signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
    go func() {
        <-sigChan
        log.Println("收到关闭信号,正在优雅退出...")
        cancel()
    }()

    // 开始消费
    handler := func(event OrderEvent) error {
        log.Printf("处理订单: %+v", event)
        // 实际业务逻辑
        return nil
    }

    if err := consumer.Consume(ctx, handler); err != nil {
        log.Printf("消费结束: %v", err)
    }
}

五、进阶特性:从入门到进坑

5.1 事务消息(Exactly-Once 语义)

Kafka 0.11+ 支持事务,实现"精确一次"语义。

// 开启事务的 Producer 配置
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "order-transaction-producer");
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);

// 事务发送
producer.initTransactions();

try {
    producer.beginTransaction();
    
    // 发送多条消息
    producer.send(new ProducerRecord<>("topic1", "key1", "value1"));
    producer.send(new ProducerRecord<>("topic2", "key2", "value2"));
    
    // 提交消费位移(用于 consume-transform-produce 模式)
    producer.sendOffsetsToTransaction(offsets, consumerGroupId);
    
    producer.commitTransaction();
} catch (Exception e) {
    producer.abortTransaction();  // 回滚
}

事务语义说明

graph LR subgraph Exactly-Once Processing A[Consumer 读取] --> B[业务处理] B --> C[Producer 写入] C --> D[提交 Offset] end E[全部成功] --> F[Transaction Commit] G[任一失败] --> H[Transaction Abort]

5.2 消费者 Rebalance

这是 Kafka 最让人头疼的机制之一。当以下情况发生时,会触发 Rebalance:

  • Consumer 加入或离开 Group
  • Consumer 心跳超时
  • Topic 分区数变化
  • 订阅的 Topic 发生变化
sequenceDiagram participant C1 as Consumer 1 participant C2 as Consumer 2 participant Coord as GroupCoordinator Note over C1,Coord: Consumer 2 加入 C2->>Coord: JoinGroup Coord->>C1: Rebalance通知 Coord->>C2: Rebalance通知 C1->>C1: 停止消费, 提交offset C2->>C2: 停止消费 C1->>Coord: JoinGroup(重新加入) C2->>Coord: JoinGroup Coord-->>C1: 你是Leader,计算分配 Coord-->>C2: 等待分配结果 C1->>Coord: SyncGroup(分配方案) C2->>Coord: SyncGroup Coord-->>C1: 你负责P0, P1 Coord-->>C2: 你负责P2 Note over C1,C2: 恢复消费

Rebalance 的痛点

  1. Stop The World:Rebalance 期间所有 Consumer 停止消费
  2. 重复消费:如果 Offset 没及时提交,可能重复消费
  3. 频繁触发:配置不当会导致频繁 Rebalance

优化策略

// 1. 增加会话超时,减少误判
props.put("session.timeout.ms", 30000);
props.put("heartbeat.interval.ms", 10000);

// 2. 增加处理超时
props.put("max.poll.interval.ms", 600000);  // 10分钟

// 3. 减少单次拉取量
props.put("max.poll.records", 100);

// 4. 使用 CooperativeStickyAssignor(Kafka 2.4+)
props.put("partition.assignment.strategy", 
    "org.apache.kafka.clients.consumer.CooperativeStickyAssignor");

5.3 消息顺序性保证

Partition 内有序:同一个 Partition 内的消息严格有序。

如何保证业务顺序

// 方案1:相同业务key发到同一分区
// 订单ID相同的消息发到同一分区
producer.send(new ProducerRecord<>("orders", orderId, orderJson));

// 方案2:单分区Topic(吞吐量受限)
// 创建时指定 --partitions 1

// 方案3:消费端重排序
// 消费后按时间戳或序列号排序

顺序性 vs 吞吐量的权衡

方案顺序性吞吐量适用场景
单分区全局有序严格顺序、小流量
Key分区业务键有序订单、用户维度顺序
多分区无顺序保证日志、监控数据

六、运维踩坑实录

6.1 常见问题排查

问题1:Consumer 消费不到数据

# 1. 检查 Topic 是否存在
kafka-topics.sh --describe --topic order-events --bootstrap-server localhost:9092

# 2. 检查 Consumer Group 状态
kafka-consumer-groups.sh --describe --group order-processing --bootstrap-server localhost:9092

# 3. 检查 Offset
# 如果 CURRENT-OFFSET = LOG-END-OFFSET,说明消费完了
# 如果 LAG > 0,说明有积压

# 4. 重置 Offset(慎用)
kafka-consumer-groups.sh --reset-offsets \
  --group order-processing \
  --topic order-events \
  --to-earliest \
  --execute \
  --bootstrap-server localhost:9092

问题2:Producer 发送超时

# 1. 检查 Broker 连通性
telnet kafka-broker 9092

# 2. 检查 Broker 日志
tail -f /var/log/kafka/server.log

# 3. 检查磁盘空间
df -h /kafka-logs

# 4. 检查网络延迟
ping kafka-broker

问题3:Rebalance 风暴

// 症状:日志中大量 "Revoking previously assigned partitions"

// 排查步骤:
// 1. 检查是否有 Consumer 频繁重启
// 2. 检查 max.poll.interval.ms 是否太短
// 3. 检查处理逻辑是否太慢
// 4. 检查 GC 停顿是否过长

// 优化:
props.put("max.poll.interval.ms", 600000);
props.put("max.poll.records", 100);

6.2 监控指标

# 核心监控指标
Producer:
  - record-send-rate          # 发送速率
  - record-error-rate         # 错误率
  - request-latency-avg       # 平均延迟
  - batch-size-avg            # 批次大小
  - buffer-available-bytes    # 缓冲区剩余

Consumer:
  - records-consumed-rate     # 消费速率
  - records-lag              # 消费积压
  - fetch-latency-avg        # 拉取延迟
  - commit-latency-avg       # 提交延迟

Broker:
  - UnderReplicatedPartitions # 副本不足的分区数
  - OfflinePartitionsCount    # 离线分区数
  - ActiveControllerCount     # 活跃Controller数
  - BytesInPerSec            # 入站流量
  - BytesOutPerSec           # 出站流量
  - RequestsPerSec           # 请求速率
graph TB subgraph Kafka Monitoring Stack Kafka[Kafka Cluster] JMX[JMX Exporter] Prom[Prometheus] Grafana[Grafana] Alert[AlertManager] end Kafka --> JMX JMX --> Prom Prom --> Grafana Prom --> Alert Alert --> Slack[Slack/钉钉]

6.3 性能调优

Producer 调优

// 高吞吐配置
props.put("batch.size", 65536);           // 64KB batch
props.put("linger.ms", 10);               // 等待10ms凑批
props.put("buffer.memory", 67108864);     // 64MB 缓冲
props.put("compression.type", "lz4");     // LZ4压缩
props.put("acks", "1");                   // 只等Leader确认

// 低延迟配置
props.put("batch.size", 0);               // 不批量
props.put("linger.ms", 0);                // 不等待
props.put("acks", "1");                   // 只等Leader

Consumer 调优

// 高吞吐配置
props.put("fetch.min.bytes", 65536);      // 最少拉64KB
props.put("fetch.max.wait.ms", 500);      // 最多等500ms
props.put("max.poll.records", 1000);      // 单次拉1000条
props.put("max.partition.fetch.bytes", 1048576);  // 每分区1MB

// 多线程消费(手动分区分配)
// 每个线程消费固定分区,避免Rebalance

Broker 调优

# server.properties

# 网络线程
num.network.threads=8
num.io.threads=16

# 日志配置
log.segment.bytes=1073741824          # 1GB per segment
log.retention.hours=168               # 保留7天
log.retention.bytes=-1                # 不限制大小
log.cleanup.policy=delete             # 删除策略

# 复制配置
num.replica.fetchers=4
replica.fetch.max.bytes=10485760      # 10MB

# ZK会话
zookeeper.session.timeout.ms=18000

七、集群部署实战

7.1 Docker Compose 部署(开发环境)

version: '3.8'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - "2181:2181"
    volumes:
      - zk_data:/var/lib/zookeeper/data
      - zk_log:/var/lib/zookeeper/log

  kafka1:
    image: confluentinc/cp-kafka:7.5.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2
      KAFKA_DEFAULT_REPLICATION_FACTOR: 2
      KAFKA_NUM_PARTITIONS: 3
      KAFKA_LOG_RETENTION_HOURS: 168
    volumes:
      - kafka1_data:/var/lib/kafka/data

  kafka2:
    image: confluentinc/cp-kafka:7.5.0
    depends_on:
      - zookeeper
    ports:
      - "9093:9093"
    environment:
      KAFKA_BROKER_ID: 2
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9093
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2
      KAFKA_DEFAULT_REPLICATION_FACTOR: 2
      KAFKA_NUM_PARTITIONS: 3
    volumes:
      - kafka2_data:/var/lib/kafka/data

  kafka3:
    image: confluentinc/cp-kafka:7.5.0
    depends_on:
      - zookeeper
    ports:
      - "9094:9094"
    environment:
      KAFKA_BROKER_ID: 3
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9094
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2
      KAFKA_DEFAULT_REPLICATION_FACTOR: 2
      KAFKA_NUM_PARTITIONS: 3
    volumes:
      - kafka3_data:/var/lib/kafka/data

  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    ports:
      - "8080:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka1:9092,kafka2:9093,kafka3:9094
      KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181

volumes:
  zk_data:
  zk_log:
  kafka1_data:
  kafka2_data:
  kafka3_data:

7.2 KRaft 模式(无 ZooKeeper)

Kafka 3.0+ 支持 KRaft 模式,不再依赖 ZooKeeper:

# 生成集群ID
KAFKA_CLUSTER_ID=$(kafka-storage.sh random-uuid)

# 格式化存储
kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c /path/to/kraft/server.properties

# kraft/server.properties
process.roles=broker,controller
node.id=1
controller.quorum.voters=1@localhost:9093,2@localhost:9094,3@localhost:9095
listeners=PLAINTEXT://:9092,CONTROLLER://:9093
advertised.listeners=PLAINTEXT://localhost:9092
controller.listener.names=CONTROLLER
log.dirs=/var/kafka-logs

7.3 生产环境架构

graph TB subgraph 生产者集群 P1[Producer 1] P2[Producer 2] P3[Producer N] end subgraph 负载均衡 LB[DNS/LB] end subgraph Kafka 集群 subgraph 机架1 B1[Broker 1] B2[Broker 2] end subgraph 机架2 B3[Broker 3] B4[Broker 4] end subgraph 机架3 B5[Broker 5] B6[Broker 6] end end subgraph ZK 集群 ZK1[ZK 1] ZK2[ZK 2] ZK3[ZK 3] end subgraph 消费者集群 CG1[Consumer Group 1] CG2[Consumer Group 2] end P1 --> LB P2 --> LB P3 --> LB LB --> B1 LB --> B3 LB --> B5 B1 --- ZK1 B3 --- ZK2 B5 --- ZK3 B1 --> CG1 B3 --> CG1 B5 --> CG2

生产环境配置建议

配置项推荐值说明
Broker 数量>= 3至少3个,保证高可用
Partition 数量3-10 per Topic根据吞吐量调整
Replication Factor3三副本
min.insync.replicas2最少同步副本数
acksall等待所有ISR确认

八、Kafka 生态全景

graph TB subgraph Data Sources MySQL[(MySQL)] PostgreSQL[(PostgreSQL)] MongoDB[(MongoDB)] App[Applications] Logs[Log Files] end subgraph Kafka Connect Debezium[Debezium CDC] FileSource[File Source] JDBCSource[JDBC Source] end subgraph Kafka Core Broker[Kafka Brokers] SR[Schema Registry] end subgraph Stream Processing KStreams[Kafka Streams] KSQL[ksqlDB] Flink[Apache Flink] Spark[Spark Streaming] end subgraph Kafka Connect Sink ES[Elasticsearch] S3[S3/HDFS] DW[Data Warehouse] end MySQL --> Debezium PostgreSQL --> Debezium App --> Broker Logs --> FileSource Debezium --> Broker FileSource --> Broker Broker --> KStreams Broker --> KSQL Broker --> Flink Broker --> Spark KStreams --> Broker Broker --> ES Broker --> S3 Broker --> DW SR --- Broker

8.1 核心生态组件

组件用途典型场景
Kafka Connect数据集成MySQL CDC → Kafka → ES
Schema Registry模式管理Avro/Protobuf 模式版本控制
Kafka Streams流处理实时聚合、Join、窗口计算
ksqlDBSQL流处理用SQL分析实时数据流
DebeziumCDC数据库变更捕获
MirrorMaker 2跨集群复制多数据中心同步

九、Kafka vs 其他消息队列

特性KafkaRabbitMQRocketMQPulsar
吞吐量极高(百万/秒)中(万级)高(十万级)极高
延迟毫秒级微秒级毫秒级毫秒级
消息持久化磁盘(高效)内存/磁盘磁盘磁盘(BookKeeper)
消息回溯支持不支持支持支持
顺序消息Partition有序队列有序队列有序分区有序
事务消息支持支持支持支持
协议自定义AMQP自定义自定义
运维复杂度
适用场景大数据、日志、流处理业务解耦、RPC电商、金融多租户、云原生

我的选型建议

  • 日志收集、大数据管道 → Kafka(吞吐量王者)
  • 业务系统解耦、延迟敏感 → RabbitMQ(成熟稳定)
  • 电商、金融场景 → RocketMQ(阿里背书,功能全面)
  • 云原生、多租户 → Pulsar(新架构,存算分离)

十、最佳实践清单

10.1 Producer 最佳实践

✅ DO:
- 使用 acks=all 保证可靠性
- 开启幂等(enable.idempotence=true)
- 使用业务key保证分区路由
- 合理设置 batch.size 和 linger.ms
- 实现发送失败的重试和补偿

❌ DON'T:
- 忽略发送回调的异常
- 发送超大消息(默认限制1MB)
- 在同步发送中阻塞主线程
- 忘记 close() 导致消息丢失

10.2 Consumer 最佳实践

✅ DO:
- 关闭自动提交,使用手动提交
- 处理完业务逻辑再提交 Offset
- 使用 ConsumerRebalanceListener 处理 Rebalance
- 合理设置 max.poll.records 和 max.poll.interval.ms
- 实现幂等消费(数据库唯一键、Redis去重等)

❌ DON'T:
- 在 poll 循环中做耗时操作
- 忽略 Rebalance 导致的重复消费
- 使用过小的 session.timeout.ms
- 一个 Consumer Group 消费过多 Topic

10.3 运维最佳实践

✅ DO:
- 至少3个Broker,跨机架部署
- replication.factor >= 3
- min.insync.replicas >= 2
- 监控 UnderReplicatedPartitions
- 定期清理过期数据
- 使用 Kafka UI 工具管理

❌ DON'T:
- 单点部署 Broker
- 磁盘打满才处理
- 忽略 Consumer Lag 告警
- 随意调整 Partition 数量
- 在生产环境测试新配置

结语:从入门到不想放弃

写到这里,我想说的是:Kafka 确实复杂,但它的复杂是有道理的

它解决的是分布式系统中最棘手的问题:

  • 如何在多个系统间可靠地传递海量数据?
  • 如何保证消息不丢、不重、顺序正确?
  • 如何在系统崩溃后快速恢复?

这些问题本身就不简单,所以 Kafka 的设计也不可能简单。

但好消息是,大多数场景下,你只需要掌握 20% 的核心知识,就能解决 80% 的问题。这篇文章覆盖的内容,足够你在大多数项目中游刃有余。

最后送你一句话:技术学习的目的不是为了放弃,而是为了在想放弃的时候,知道自己还能再坚持一下

加油,我们下篇文章见!


参考资源

评论区
暂无评论
avatar