搜 索

从Zookeeper到分布式系统开发

  • 268阅读
  • 2022年09月04日
  • 0评论
首页 / 编程 / 正文

前言:为什么我们需要一个"动物园管理员"

话说在单体应用的美好时代,一切都是那么简单:一个Tomcat打天下,一个MySQL存数据,session往内存一扔,美滋滋。

然后有一天,老板说:"咱们系统要支持百万用户!"

于是你开始了分布式改造之旅。拆服务、加缓存、搞集群,一顿操作猛如虎。但很快你发现,新的问题如雨后春笋般冒出来:

  • 服务A怎么知道服务B在哪?(服务发现)
  • 配置改了怎么通知所有节点?(配置中心)
  • 两个节点同时操作一条数据怎么办?(分布式锁)
  • 主节点挂了谁来接班?(Leader选举)
  • 这个定时任务到底该谁跑?(任务调度)

这些问题的本质,都是分布式协调——让一群各自为政的机器达成共识,就像让一群程序员统一代码风格一样困难。

而Zookeeper,就是来解决这个问题的。它的名字翻译过来是"动物园管理员",寓意是管理Hadoop生态圈里那些以动物命名的项目(Pig、Hive、Elephant...)。但实际上,它更像是分布式系统里的"居委会大妈"——什么事都要管,什么信息都知道,关键时刻还能主持公道。

第一章:Zookeeper到底是个啥

1.1 一句话定义

Zookeeper = 分布式协调服务 = 一个高可用的、强一致性的、分布式的小型数据库

等等,数据库?没错,你可以把Zookeeper理解成一个特殊的数据库:

特性MySQLRedisZookeeper
数据模型表结构Key-Value树形结构
数据量TB级GB级MB级(建议)
一致性最终一致最终一致强一致(顺序一致)
主要用途存业务数据缓存存元数据/协调
读写比均衡读多写少读多写少

1.2 数据模型:一棵倒挂的树

Zookeeper的数据结构是一棵树,和Linux文件系统很像:

/
├── app
│   ├── config
│   │   ├── db_url = "jdbc:mysql://..."
│   │   └── redis_host = "192.168.1.100"
│   └── services
│       ├── user-service-001 = "192.168.1.10:8080"
│       ├── user-service-002 = "192.168.1.11:8080"
│       └── order-service-001 = "192.168.1.20:8081"
├── locks
│   └── order_lock_12345
└── election
    ├── leader = "node-2"
    └── candidates
        ├── node-1
        ├── node-2
        └── node-3

树上的每个节点叫做ZNode,每个ZNode可以存储数据(默认上限1MB,但别真存这么多,Zookeeper不是干这个的)。

1.3 ZNode的四种类型

这是Zookeeper最精妙的设计之一:

graph TB subgraph ZNode类型 A[持久节点 PERSISTENT] B[临时节点 EPHEMERAL] C[持久顺序节点 PERSISTENT_SEQUENTIAL] D[临时顺序节点 EPHEMERAL_SEQUENTIAL] end A --> A1[创建后一直存在] A --> A2[需要手动删除] B --> B1[会话结束自动删除] B --> B2[不能有子节点] C --> C1[自动追加递增序号] C --> C2[如 node-0000000001] D --> D1[临时 + 顺序] D --> D2[分布式锁的最佳选择]

记忆口诀

  • 持久节点:像刻在石头上的字,你不删它就一直在
  • 临时节点:像沙滩上的字,浪(会话断开)一来就没了
  • 顺序节点:像银行取号机,自动给你排个序号

1.4 Watcher机制:Zookeeper的灵魂

如果说ZNode是Zookeeper的骨架,那Watcher就是它的神经系统。

Watcher允许客户端在某个ZNode上注册监听,当这个节点发生变化时,Zookeeper会主动通知客户端。

sequenceDiagram participant Client1 as 客户端1 participant ZK as Zookeeper participant Client2 as 客户端2 Client1->>ZK: getData("/config/db", watch=true) ZK-->>Client1: 返回数据 + 注册Watcher Note over Client1: 继续干其他事... Client2->>ZK: setData("/config/db", "新配置") ZK-->>Client2: OK ZK->>Client1: 通知: /config/db 发生变化! Client1->>ZK: getData("/config/db", watch=true) Note over Client1: 重新获取数据并注册新Watcher

重要特性

  • Watcher是一次性的!触发后就失效,需要重新注册
  • Watcher通知只告诉你"变了",不告诉你"变成啥了",需要自己再查
  • 这个设计看起来蠢,但其实很聪明——避免了大量数据传输

第二章:Zookeeper集群架构

2.1 集群角色:皇帝、大臣和打酱油的

graph TB subgraph Zookeeper集群 Leader[Leader 领导者] Follower1[Follower 跟随者] Follower2[Follower 跟随者] Observer[Observer 观察者] end Client1[客户端] --> Leader Client2[客户端] --> Follower1 Client3[客户端] --> Observer Leader -->|同步数据| Follower1 Leader -->|同步数据| Follower2 Leader -->|同步数据| Observer Follower1 -.->|转发写请求| Leader Observer -.->|转发写请求| Leader
角色职责参与选举参与投票
Leader处理所有写请求,协调集群
Follower处理读请求,转发写请求,参与选举
Observer只处理读请求,不参与任何投票

为什么需要Observer?

想象一下:你的Zookeeper集群有5个节点,写请求需要3个节点确认(过半原则)。现在流量暴涨,你想加节点提高读性能。加到7个节点?写请求需要4个节点确认,性能反而下降了!

Observer就是解决这个问题的——它只干活,不参与投票,加再多也不影响写性能。

2.2 ZAB协议:让一群机器达成共识的艺术

ZAB(Zookeeper Atomic Broadcast)是Zookeeper的核心协议,保证了集群的数据一致性。

简单来说,ZAB干两件事:

  1. 崩溃恢复:Leader挂了,选个新的
  2. 消息广播:Leader收到写请求,广播给所有Follower

写请求的处理流程

sequenceDiagram participant C as Client participant L as Leader participant F1 as Follower1 participant F2 as Follower2 C->>L: 写请求 L->>L: 生成事务提案 ZXID par 广播提案 L->>F1: Proposal L->>F2: Proposal end F1->>L: ACK F2->>L: ACK Note over L: 收到过半ACK par 广播提交 L->>F1: Commit L->>F2: Commit end L->>C: 返回成功

ZXID是个64位数字,高32位是epoch(朝代),低32位是事务序号。每次选出新Leader,epoch就+1。这个设计很巧妙——通过epoch可以识别"前朝的剑能不能斩本朝的官"。

2.3 为什么Zookeeper集群要奇数个节点?

这是面试必考题。答案其实很简单:

Zookeeper遵循过半原则——任何决策都需要超过一半的节点同意。

  • 3个节点:最多允许1个挂掉(2 > 3/2)
  • 4个节点:最多允许1个挂掉(3 > 4/2)
  • 5个节点:最多允许2个挂掉(3 > 5/2)

发现了吗?3个节点和4个节点的容错能力一样!那多加一个节点干嘛?白白增加网络通信开销。

所以,3个节点、5个节点、7个节点是最常见的配置。

第三章:Zookeeper的六大应用场景

3.1 配置中心:让改配置不再是噩梦

痛点:配置文件散落在100台机器上,改一个参数要登录100次。

解决方案

graph LR subgraph 配置中心 ZK[(Zookeeper)] end Admin[管理员] -->|修改配置| ZK ZK -->|Watch通知| App1[应用实例1] ZK -->|Watch通知| App2[应用实例2] ZK -->|Watch通知| App3[应用实例3] App1 -->|拉取新配置| ZK App2 -->|拉取新配置| ZK App3 -->|拉取新配置| ZK

Java代码示例

public class ConfigCenter {
    private CuratorFramework client;
    private Map<String, String> configCache = new ConcurrentHashMap<>();
    
    public void init() {
        client = CuratorFrameworkFactory.builder()
            .connectString("zk1:2181,zk2:2181,zk3:2181")
            .sessionTimeoutMs(5000)
            .retryPolicy(new ExponentialBackoffRetry(1000, 3))
            .build();
        client.start();
        
        // 监听配置节点
        PathChildrenCache cache = new PathChildrenCache(client, "/config", true);
        cache.getListenable().addListener((cli, event) -> {
            switch (event.getType()) {
                case CHILD_ADDED:
                case CHILD_UPDATED:
                    String path = event.getData().getPath();
                    String value = new String(event.getData().getData());
                    configCache.put(path, value);
                    log.info("配置更新: {} = {}", path, value);
                    break;
                case CHILD_REMOVED:
                    configCache.remove(event.getData().getPath());
                    break;
            }
        });
        cache.start();
    }
    
    public String getConfig(String key) {
        return configCache.get("/config/" + key);
    }
}

3.2 服务注册与发现:微服务的基石

痛点:服务A要调用服务B,但服务B有10个实例,IP还会变,怎么办?

graph TB subgraph Zookeeper Services[/services] UserService[/services/user-service] US1[instance-001: 192.168.1.10:8080] US2[instance-002: 192.168.1.11:8080] OrderService[/services/order-service] OS1[instance-001: 192.168.1.20:8081] end Services --> UserService Services --> OrderService UserService --> US1 UserService --> US2 OrderService --> OS1 Provider1[User Service 实例1] -->|注册临时节点| US1 Provider2[User Service 实例2] -->|注册临时节点| US2 Consumer[Order Service] -->|Watch + 获取列表| UserService

核心原理

  1. 服务启动时,在Zookeeper创建临时节点
  2. 服务挂掉,会话断开,临时节点自动删除
  3. 消费者Watch服务目录,实时感知服务上下线

Go代码示例(服务注册):

package registry

import (
    "fmt"
    "time"
    "github.com/go-zookeeper/zk"
)

type ServiceRegistry struct {
    conn     *zk.Conn
    basePath string
}

func NewServiceRegistry(servers []string) (*ServiceRegistry, error) {
    conn, _, err := zk.Connect(servers, 5*time.Second)
    if err != nil {
        return nil, err
    }
    return &ServiceRegistry{conn: conn, basePath: "/services"}, nil
}

// Register 注册服务实例
func (r *ServiceRegistry) Register(serviceName, instanceID, address string) error {
    servicePath := fmt.Sprintf("%s/%s", r.basePath, serviceName)
    
    // 确保服务目录存在(持久节点)
    r.ensurePath(servicePath)
    
    // 注册实例(临时节点)
    instancePath := fmt.Sprintf("%s/%s", servicePath, instanceID)
    _, err := r.conn.Create(
        instancePath,
        []byte(address),
        zk.FlagEphemeral,  // 临时节点,断开自动删除
        zk.WorldACL(zk.PermAll),
    )
    
    if err == zk.ErrNodeExists {
        // 节点已存在,可能是重连,更新数据
        _, err = r.conn.Set(instancePath, []byte(address), -1)
    }
    
    return err
}

// Discover 发现服务实例(带Watch)
func (r *ServiceRegistry) Discover(serviceName string) ([]string, <-chan zk.Event, error) {
    servicePath := fmt.Sprintf("%s/%s", r.basePath, serviceName)
    
    children, _, eventCh, err := r.conn.ChildrenW(servicePath)
    if err != nil {
        return nil, nil, err
    }
    
    var addresses []string
    for _, child := range children {
        data, _, _ := r.conn.Get(fmt.Sprintf("%s/%s", servicePath, child))
        addresses = append(addresses, string(data))
    }
    
    return addresses, eventCh, nil
}

3.3 分布式锁:让并发不再可怕

这是Zookeeper最经典的应用场景。

方案一:简单粗暴版

// 尝试创建临时节点,成功就是拿到锁
public boolean tryLock(String lockPath) {
    try {
        client.create()
            .withMode(CreateMode.EPHEMERAL)
            .forPath(lockPath);
        return true;
    } catch (NodeExistsException e) {
        return false; // 节点已存在,别人持有锁
    }
}

问题:如果100个线程同时抢锁,99个失败后怎么办?不断重试?这就是惊群效应

方案二:顺序节点版(推荐)

sequenceDiagram participant C1 as Client1 participant C2 as Client2 participant C3 as Client3 participant ZK as Zookeeper C1->>ZK: 创建 /lock/seq-0001 C2->>ZK: 创建 /lock/seq-0002 C3->>ZK: 创建 /lock/seq-0003 Note over C1: 我是最小的,拿到锁! Note over C2: 我监听 seq-0001 Note over C3: 我监听 seq-0002 C1->>ZK: 删除 /lock/seq-0001 释放锁 ZK->>C2: 通知: seq-0001 被删除了 Note over C2: 轮到我了,拿到锁!

完整实现

public class DistributedLock {
    private CuratorFramework client;
    private String lockPath;
    private String currentPath;
    
    public DistributedLock(CuratorFramework client, String lockName) {
        this.client = client;
        this.lockPath = "/locks/" + lockName;
    }
    
    public void lock() throws Exception {
        // 1. 创建临时顺序节点
        currentPath = client.create()
            .creatingParentsIfNeeded()
            .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
            .forPath(lockPath + "/seq-");
        
        // 2. 获取所有子节点并排序
        while (true) {
            List<String> children = client.getChildren().forPath(lockPath);
            Collections.sort(children);
            
            String currentNode = currentPath.substring(lockPath.length() + 1);
            int index = children.indexOf(currentNode);
            
            if (index == 0) {
                // 我是最小的,获取锁成功
                return;
            }
            
            // 3. 监听前一个节点
            String prevNode = children.get(index - 1);
            CountDownLatch latch = new CountDownLatch(1);
            
            Stat stat = client.checkExists()
                .usingWatcher((Watcher) event -> latch.countDown())
                .forPath(lockPath + "/" + prevNode);
            
            if (stat != null) {
                // 前一个节点还在,等待通知
                latch.await();
            }
            // 前一个节点不在了,继续循环检查
        }
    }
    
    public void unlock() throws Exception {
        if (currentPath != null) {
            client.delete().forPath(currentPath);
        }
    }
}

当然,实际项目中直接用Curator的InterProcessMutex就好了

InterProcessMutex lock = new InterProcessMutex(client, "/locks/order");
try {
    if (lock.acquire(10, TimeUnit.SECONDS)) {
        // 拿到锁,执行业务逻辑
        doSomething();
    }
} finally {
    lock.release();
}

3.4 Leader选举:谁来当老大

分布式系统中,很多场景需要选出一个"老大":

  • 数据库主从复制,谁是Master?
  • 定时任务,谁来执行?
  • 分布式事务,谁来协调?

原理:和分布式锁几乎一样,谁的顺序节点最小,谁就是Leader。

graph TB subgraph Zookeeper Election[/election] N1[node-0001: Server-A] N2[node-0002: Server-B] N3[node-0003: Server-C] Election --> N1 Election --> N2 Election --> N3 end ServerA[Server A] -->|创建| N1 ServerB[Server B] -->|创建| N2 ServerC[Server C] -->|创建| N3 N1 -.->|最小节点 = Leader| ServerA N2 -.->|Watch N1| ServerB N3 -.->|Watch N2| ServerC

Curator的LeaderLatch

public class LeaderElectionExample {
    public static void main(String[] args) throws Exception {
        CuratorFramework client = CuratorFrameworkFactory.builder()
            .connectString("localhost:2181")
            .retryPolicy(new ExponentialBackoffRetry(1000, 3))
            .build();
        client.start();
        
        String serverId = UUID.randomUUID().toString();
        LeaderLatch leaderLatch = new LeaderLatch(client, "/election", serverId);
        
        leaderLatch.addListener(new LeaderLatchListener() {
            @Override
            public void isLeader() {
                System.out.println("我当选Leader了!开始干活...");
                // 执行Leader才能做的事情
            }
            
            @Override
            public void notLeader() {
                System.out.println("我不是Leader了,休息一下...");
            }
        });
        
        leaderLatch.start();
        
        // 保持运行
        Thread.sleep(Long.MAX_VALUE);
    }
}

3.5 分布式队列:有序的任务分发

利用顺序节点,可以实现一个FIFO队列:

graph LR subgraph 队列 /queue T1[task-0001] T2[task-0002] T3[task-0003] end Producer[生产者] -->|创建顺序节点| T3 Consumer[消费者] -->|获取最小节点并删除| T1
// 生产者:添加任务
public void produce(String data) throws Exception {
    client.create()
        .withMode(CreateMode.PERSISTENT_SEQUENTIAL)
        .forPath("/queue/task-", data.getBytes());
}

// 消费者:获取并处理任务
public String consume() throws Exception {
    List<String> children = client.getChildren().forPath("/queue");
    if (children.isEmpty()) {
        return null;
    }
    
    Collections.sort(children);
    String firstChild = children.get(0);
    String path = "/queue/" + firstChild;
    
    byte[] data = client.getData().forPath(path);
    client.delete().forPath(path);
    
    return new String(data);
}

3.6 集群管理:谁在线,谁掉线

利用临时节点的特性,可以轻松实现集群成员管理:

public class ClusterManager {
    private CuratorFramework client;
    private PathChildrenCache cache;
    
    public void start() throws Exception {
        // 注册自己(临时节点)
        String myId = InetAddress.getLocalHost().getHostName();
        client.create()
            .withMode(CreateMode.EPHEMERAL)
            .forPath("/cluster/members/" + myId, "online".getBytes());
        
        // 监听集群成员变化
        cache = new PathChildrenCache(client, "/cluster/members", true);
        cache.getListenable().addListener((cli, event) -> {
            switch (event.getType()) {
                case CHILD_ADDED:
                    System.out.println("新成员加入: " + event.getData().getPath());
                    break;
                case CHILD_REMOVED:
                    System.out.println("成员离开: " + event.getData().getPath());
                    // 可能需要做一些故障转移的事情
                    handleMemberLeave(event.getData().getPath());
                    break;
            }
        });
        cache.start();
    }
}

第四章:Zookeeper客户端选型

4.1 原生客户端 vs Curator

特性原生ZK客户端Curator
API友好度差(回调地狱)好(流式API)
连接管理手动处理重连自动重连
Watcher一次性,需反复注册封装了持续监听
分布式原语自己实现开箱即用
社区活跃度一般活跃(Apache顶级项目)

结论:生产环境请无脑选Curator。

4.2 Curator核心组件

graph TB subgraph Curator Framework Client[CuratorFramework] subgraph Recipes Lock[InterProcessMutex 分布式锁] Leader[LeaderLatch 选举] Cache[PathChildrenCache 缓存] Barrier[DistributedBarrier 屏障] Queue[DistributedQueue 队列] end subgraph Extensions Discovery[ServiceDiscovery] Async[AsyncCuratorFramework] end end Client --> Recipes Client --> Extensions

Maven依赖

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>5.5.0</version>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>5.5.0</version>
</dependency>

第五章:踩坑指南(血泪经验)

5.1 Session超时:最常见的坑

症状:服务运行一段时间后,Zookeeper连接断开,临时节点被删除。

原因

  • GC导致的STW(Stop The World)超过session超时时间
  • 网络抖动
  • Zookeeper服务器负载过高

解决方案

// 1. 适当增加session超时时间(但不要太长)
CuratorFramework client = CuratorFrameworkFactory.builder()
    .connectString("zk1:2181,zk2:2181,zk3:2181")
    .sessionTimeoutMs(30000)  // 30秒
    .connectionTimeoutMs(15000)
    .retryPolicy(new ExponentialBackoffRetry(1000, 3))
    .build();

// 2. 监听连接状态变化
client.getConnectionStateListenable().addListener((cli, state) -> {
    switch (state) {
        case LOST:
            log.error("Zookeeper连接丢失!临时节点可能已被删除");
            // 重新注册服务等
            break;
        case RECONNECTED:
            log.info("Zookeeper重新连接");
            // 重新创建临时节点
            break;
    }
});

5.2 Watcher丢失:你以为注册了,其实没有

症状:配置更新后,部分客户端没有收到通知。

原因:Watcher是一次性的,触发后需要重新注册。如果重新注册的时机不对,可能会丢失通知。

解决方案:使用Curator的PathChildrenCacheTreeCache,它们会自动管理Watcher的重新注册。

// 错误做法:手动管理Watcher
Stat stat = client.checkExists().usingWatcher(myWatcher).forPath(path);

// 正确做法:使用Cache
PathChildrenCache cache = new PathChildrenCache(client, "/config", true);
cache.getListenable().addListener((cli, event) -> {
    // 自动处理所有变更
});
cache.start();

5.3 脑裂问题:两个Leader的噩梦

症状:网络分区后,出现了两个Leader,数据不一致。

真相:Zookeeper本身不会出现脑裂(ZAB协议保证),但你的应用可能会!

场景

  1. 客户端A持有锁,做到一半
  2. 网络抖动,A的session过期,锁被释放
  3. 客户端B获取锁,开始操作
  4. 客户端A网络恢复,以为自己还持有锁,继续操作
  5. 💥 两个客户端同时操作,数据乱了

解决方案:Fencing Token

public class SafeDistributedLock {
    private InterProcessMutex lock;
    private long fencingToken;
    
    public void doWithLock(Runnable action) throws Exception {
        if (lock.acquire(10, TimeUnit.SECONDS)) {
            try {
                // 获取当前版本号作为fencing token
                Stat stat = client.checkExists().forPath(lockPath);
                fencingToken = stat.getVersion();
                
                // 执行业务操作时,带上fencing token
                action.run();
                
            } finally {
                lock.release();
            }
        }
    }
    
    // 数据库操作时验证token
    public void updateWithToken(long token, Data data) {
        // UPDATE table SET ... WHERE id = ? AND version < ?
        // 只有token比数据库中的大,才能更新
    }
}

5.4 性能问题:Zookeeper不是万能的

反模式:把Zookeeper当数据库用,存大量数据。

正确姿势

  • 单个ZNode数据量控制在KB级别
  • 子节点数量控制在数千个以内
  • 读多写少的场景才适合
  • 不要频繁创建/删除节点(会产生大量事务日志)

性能数据参考(3节点集群):

  • 读请求:10万+ QPS
  • 写请求:1万+ QPS
  • 每秒Watcher通知:取决于集群规模,通常几千

5.5 运维踩坑:日志和磁盘

坑1:事务日志暴涨

Zookeeper的事务日志如果不清理,会撑爆磁盘。

# 配置自动清理(zoo.cfg)
autopurge.snapRetainCount=3
autopurge.purgeInterval=24

坑2:磁盘IO瓶颈

事务日志写入是同步的,磁盘慢会直接影响写性能。

# 把事务日志放SSD上
dataLogDir=/ssd/zookeeper/logs
dataDir=/hdd/zookeeper/data

第六章:Zookeeper的替代者们

虽然Zookeeper是分布式协调领域的老大哥,但它也有不少问题:

  • Java写的,内存占用大
  • 运维复杂
  • API不够友好
  • 不支持多数据中心

于是,一些新秀出现了:

6.1 etcd:K8s的御用

graph LR subgraph 对比 ZK[Zookeeper] ETCD[etcd] Consul[Consul] end ZK --> ZK1[Java / ZAB协议] ZK --> ZK2[树形数据结构] ZK --> ZK3[Watch机制] ETCD --> E1[Go / Raft协议] ETCD --> E2[KV存储] ETCD --> E3[gRPC + HTTP API] Consul --> C1[Go / Raft协议] Consul --> C2[服务发现优先] Consul --> C3[内置健康检查]
特性ZookeeperetcdConsul
语言JavaGoGo
一致性协议ZABRaftRaft
数据模型树形扁平KVKV + 服务目录
API原生协议gRPC + RESTREST
服务发现需自己实现需自己实现原生支持
健康检查内置
多数据中心不支持不支持支持
适用场景通用协调K8s/配置中心微服务

6.2 何时选择什么

  • 已有Hadoop生态 → Zookeeper(原生支持)
  • Kubernetes环境 → etcd(标配)
  • 纯微服务架构 → Consul(服务发现最强)
  • 简单的配置中心 → etcd(轻量)
  • 复杂的协调场景 → Zookeeper(生态最成熟)

第七章:实战案例——从零搭建服务注册中心

让我们把学到的知识串起来,实现一个简单但完整的服务注册中心。

7.1 架构设计

graph TB subgraph 服务注册中心 ZK[(Zookeeper集群)] end subgraph 服务提供者 P1[user-service-1] P2[user-service-2] P3[order-service-1] end subgraph 服务消费者 C1[gateway] C2[web-app] end P1 -->|注册| ZK P2 -->|注册| ZK P3 -->|注册| ZK C1 -->|发现 + Watch| ZK C2 -->|发现 + Watch| ZK C1 -.->|调用| P1 C1 -.->|调用| P2 C2 -.->|调用| P3

7.2 核心代码

服务注册SDK

@Component
public class ServiceRegistry {
    private static final String BASE_PATH = "/services";
    
    @Autowired
    private CuratorFramework zkClient;
    
    @Value("${spring.application.name}")
    private String serviceName;
    
    @Value("${server.port}")
    private int port;
    
    private String instancePath;
    
    @PostConstruct
    public void register() throws Exception {
        String host = InetAddress.getLocalHost().getHostAddress();
        String instanceId = host + ":" + port;
        
        // 服务目录(持久节点)
        String servicePath = BASE_PATH + "/" + serviceName;
        if (zkClient.checkExists().forPath(servicePath) == null) {
            zkClient.create()
                .creatingParentsIfNeeded()
                .withMode(CreateMode.PERSISTENT)
                .forPath(servicePath);
        }
        
        // 实例节点(临时节点)
        instancePath = servicePath + "/" + instanceId;
        
        ServiceInstance instance = ServiceInstance.builder()
            .serviceName(serviceName)
            .host(host)
            .port(port)
            .status("UP")
            .metadata(Map.of(
                "version", "1.0.0",
                "weight", "100"
            ))
            .build();
        
        zkClient.create()
            .withMode(CreateMode.EPHEMERAL)
            .forPath(instancePath, JsonUtils.toJson(instance).getBytes());
        
        log.info("服务注册成功: {}", instancePath);
    }
    
    @PreDestroy
    public void deregister() throws Exception {
        if (instancePath != null) {
            zkClient.delete().forPath(instancePath);
            log.info("服务注销成功: {}", instancePath);
        }
    }
}

服务发现SDK

@Component
public class ServiceDiscovery {
    private static final String BASE_PATH = "/services";
    
    @Autowired
    private CuratorFramework zkClient;
    
    private Map<String, List<ServiceInstance>> serviceCache = new ConcurrentHashMap<>();
    private Map<String, PathChildrenCache> watchers = new ConcurrentHashMap<>();
    
    public List<ServiceInstance> getInstances(String serviceName) {
        // 先从缓存取
        List<ServiceInstance> instances = serviceCache.get(serviceName);
        if (instances != null) {
            return instances;
        }
        
        // 缓存没有,从ZK加载并监听
        return loadAndWatch(serviceName);
    }
    
    private synchronized List<ServiceInstance> loadAndWatch(String serviceName) {
        if (serviceCache.containsKey(serviceName)) {
            return serviceCache.get(serviceName);
        }
        
        String servicePath = BASE_PATH + "/" + serviceName;
        
        try {
            // 创建监听器
            PathChildrenCache cache = new PathChildrenCache(zkClient, servicePath, true);
            cache.getListenable().addListener((cli, event) -> {
                log.info("服务实例变更: {} - {}", serviceName, event.getType());
                refreshCache(serviceName, cache);
            });
            cache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
            
            watchers.put(serviceName, cache);
            refreshCache(serviceName, cache);
            
            return serviceCache.getOrDefault(serviceName, Collections.emptyList());
            
        } catch (Exception e) {
            log.error("加载服务失败: {}", serviceName, e);
            return Collections.emptyList();
        }
    }
    
    private void refreshCache(String serviceName, PathChildrenCache cache) {
        List<ServiceInstance> instances = cache.getCurrentData().stream()
            .map(data -> JsonUtils.fromJson(new String(data.getData()), ServiceInstance.class))
            .filter(instance -> "UP".equals(instance.getStatus()))
            .collect(Collectors.toList());
        
        serviceCache.put(serviceName, instances);
        log.info("服务实例列表更新: {} = {}", serviceName, instances);
    }
    
    // 简单的负载均衡:随机选择
    public ServiceInstance chooseInstance(String serviceName) {
        List<ServiceInstance> instances = getInstances(serviceName);
        if (instances.isEmpty()) {
            throw new RuntimeException("No available instance for: " + serviceName);
        }
        return instances.get(ThreadLocalRandom.current().nextInt(instances.size()));
    }
}

7.3 优雅停机

@Component
public class GracefulShutdown {
    
    @Autowired
    private ServiceRegistry registry;
    
    @PreDestroy
    public void onShutdown() throws Exception {
        log.info("开始优雅停机...");
        
        // 1. 先从注册中心下线
        registry.deregister();
        
        // 2. 等待现有请求处理完成
        Thread.sleep(10000); // 等10秒
        
        // 3. 关闭其他资源
        log.info("优雅停机完成");
    }
}

第八章:总结

Zookeeper的灵魂三问

Q: 什么时候用Zookeeper?
A: 当你需要在分布式环境下让多个节点达成共识时——服务发现、配置同步、分布式锁、Leader选举等。

Q: 什么时候不用Zookeeper?
A: 存大量数据、高并发写入、简单的缓存场景。这些场景有更好的选择。

Q: Zookeeper和Redis分布式锁怎么选?
A:

  • 需要强一致性 → Zookeeper
  • 追求高性能,能接受极端情况下的不一致 → Redis
  • 已经有Zookeeper集群 → Zookeeper
  • 已经有Redis集群 → Redis

一图总结

mindmap root((Zookeeper)) 核心概念 ZNode 持久节点 临时节点 顺序节点 Watcher 一次性触发 异步通知 Session 心跳保活 超时删除临时节点 集群架构 Leader Follower Observer ZAB协议 应用场景 配置中心 服务发现 分布式锁 Leader选举 分布式队列 集群管理 最佳实践 用Curator 控制数据量 处理好Session 注意Watcher特性 替代方案 etcd Consul Nacos

写在最后

Zookeeper就像分布式系统里的瑞士军刀,功能强大,但也需要正确使用。它不是银弹,不能解决所有问题,但在协调领域,它依然是最成熟、最可靠的选择之一。

记住:分布式系统最难的不是写代码,而是处理各种异常情况。网络会抖动、进程会崩溃、磁盘会写满、GC会STW... Zookeeper帮你解决了很多底层的复杂性,但你仍然需要理解它的原理,才能在出问题时快速定位和解决。

希望这篇文章能帮你从"知道Zookeeper"进阶到"会用Zookeeper",最终达到"用好Zookeeper"的境界。

Happy coding! 🎉


参考资料:

  • Apache Zookeeper官方文档
  • 《从Paxos到Zookeeper:分布式一致性原理与实践》
  • Curator官方文档
  • 无数次线上踩坑的血泪经验
  • zk官网
    -《ZooKeeper分布式过程协同技术详解》(猫书)
评论区
暂无评论
avatar