分布式协调框架Zookeeper核心设计理解与实战

分布式协调框架Zookeeper核心设计理解与实战

作者: wangkai 2021-07-29 07:48:36

大数据

分布式 想起很久以前在某个客户现场,微服务 B 突然无法调用到微服务 A,为了使服务尽快正常恢复,重启了微服务 B 。虽然故障原因找到了,但对于 Zookeeper 的理解还是不够深刻,于是重新学习了 Zookeeper 的核心设计,并记录于此文共勉。

[[413943]]

本文转载自微信公众号「KK架构」,作者wangkai 。转载本文请联系KK架构公众号。

一、前言

想起很久以前在某个客户现场,微服务 B 突然无法调用到微服务 A,为了使服务尽快正常恢复,重启了微服务 B 。

但客户不依不饶询问这个问题出现的原因,于是我还大老远从杭州飞到深圳,现场排查问题。

最后的结论是,zk 在某时刻出现主备切换,此时微服务 A(基于 dubbo)需要重新往 zk上注册,但是端口号变了。

但是微服务 B 本地有微服务 A rpc 接口的缓存,缓存里面还是旧的端口,所以调用不到。

解决方法就是,把微服务的 rpc 端口号改成固定的。

虽说原因找到了,但对于 Zookeeper 的理解还是不够深刻,于是重新学习了 Zookeeper 的核心设计,并记录于此文共勉。

二、Zookeeper 核心架构设计

1、Zookeeper 特点

(1)Zookeeper 是一个分布式协调服务,是为了解决多个节点状态不一致的问题,充当中间机构来调停。如果出现了不一致,则把这个不一致的情况写入到 Zookeeper 中,Zookeeper 会返回响应,响应成功,则表示帮你达成了一致。

比如,A、B、C 节点在集群启动时,需要推举出一个主节点,这个时候,A、B、C 只要同时往 Zookeeper 上注册临时节点,谁先注册成功,谁就是主节点。

(2)Zookeeper 虽然是一个集群,但是数据并不是分散存储在各个节点上的,而是每个节点都保存了集群所有的数据。

其中一个节点作为主节点,提供分布式事务的写服务,其他节点和这个节点同步数据,保持和主节点状态一致。

(3)Zookeeper 所有节点的数据状态通过 Zab 协议保持一致。当集群中没有 Leader 节点时,内部会执行选举,选举结束,Follower 和 Leader 执行状态同步;当有 Leader 节点时,Leader 通过 ZAB 协议主导分布式事务的执行,并且所有的事务都是串行执行的。

(4)Zookeeper 的节点个数是不能线性扩展的,节点越多,同步数据的压力越大,执行分布式事务性能越差。推荐3、5、7 这样的数目。

2、Zookeeper 角色的理解

Zookeeper 并没有沿用 Master/Slave 概念,而是引入了 Leader,Follower,Observer 三种角色。

通过 Leader 选举算法来选定一台服务器充当 Leader 节点,Leader 服务器为客户端提供读、写服务。

Follower 节点可以参加选举,也可以接受客户端的读请求,但是接受到客户端的写请求时,会转发到 Leader 服务器去处理。

Observer 角色只能提供读服务,不能选举和被选举,所以它存在的意义是在不影响写性能的前提下,提升集群的读性能。

3、Zookeeper 同时满足了 CAP 吗?

答案是否,CAP 只能同时满足其二。

Zookeeper 是有取舍的,它实现了 A 可用性、P 分区容错性、C 的写入一致性,牺牲的是 C的读一致性。

也就是说,Zookeeper 并不保证读取的一定是最新的数据。如果一定要最新,需要使用 sync 回调处理。

三、核心机制一:ZNode 数据模型

Zookeeper 的 ZNode 模型其实可以理解为类文件系统,如下图:

1、ZNode 并不适合存储太大的数据

为什么是类文件系统呢?因为 ZNode 模型没有文件和文件夹的概念,每个节点既可以有子节点,也可以存储数据。

那么既然每个节点可以存储数据,是不是可以任意存储无限制的数据呢?答案是否定的。在 Zookeeper 中,限制了每个节点只能存储小于 1 M 的数据,实际应用中,最好不要超过 1kb。

原因有以下四点:

  • 同步压力:Zookeeper 的每个节点都存储了 Zookeeper 的所有数据,每个节点的状态都要保持和 Leader 一致,同步过程至少要保证半数以上的节点同步成功,才算最终成功。如果数据越大,则写入的难度也越大。
  • 请求阻塞:Zookeeper 为了保证写入的强一致性,会严格按照写入的顺序串行执行,某个时刻只能执行一个事务。如果上一个事务执行耗时比较长,会阻塞后面的请求;
  • 存储压力:正是因为每个 Zookeeper 的节点都存储了完整的数据,每个 ZNode 存储的数据越大,则消耗的物理内存也越大;
  • 设计初衷:Zookeeper 的设计初衷,不是为了提供大规模的存储服务,而是提供了这样的数据模型解决一些分布式问题。

2、ZNode 的分类

(1)按生命周期分类

按照声明周期,ZNode 可分为永久节点和临时节点。

很好理解,永久节点就是要显示的删除,否则会一直存在;临时节点,是和会话绑定的,会话创建的所有节点,会在会话断开连接时,全部被 Zookeeper 系统删除。

(2)按照是否带序列号分类

带序列号的话,比如在代码中创建 /a 节点,创建之后其实是 /a000000000000001,再创建的话,就是 /a000000000000002,依次递增

不带序号,就是创建什么就是什么

(3)所以,一共有四种 ZNode:

  • 永久的不带序号的
  • 永久的带序号的
  • 临时的不带序号的
  • 临时的带序号的

(4)注意的点

临时节点下面不能挂载子节点,只能作为其他节点的叶子节点。

3. 代码实战

ZNode 的数据模型其实很简单,只有这么多知识。下面用代码来巩固一下。

这里我们使用 curator 框架来做 demo。(当然,你可以选择使用 Zookeeper 官方自带的 Api)

引入 pom 坐标:

  1. <!-- curator-framework --> 
  2. <dependency> 
  3.     <groupId>org.apache.curator</groupId> 
  4.     <artifactId>curator-framework</artifactId> 
  5.     <version>4.2.0</version> 
  6. </dependency> 
  7. <!-- curator-recipes --> 
  8. <dependency> 
  9.     <groupId>org.apache.curator</groupId> 
  10.     <artifactId>curator-recipes</artifactId> 
  11.     <version>4.2.0</version> 
  12. </dependency> 

代码:

  1. public class ZkTest { 
  2.  
  3.     // 会话超时 
  4.     private final int SESSION_TIMEOUT = 30 * 1000; 
  5.  
  6.     // 连接超时 、 有啥区别 
  7.     private static final int CONNECTION_TIMEOUT = 3 * 1000; 
  8.  
  9.     private static final String CONNECT_ADDR = "localhost:2181"
  10.  
  11.     private CuratorFramework client = null
  12.  
  13.     public static void main(String[] args) throws Exception { 
  14.         // 创建客户端 
  15.         RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10); 
  16.         CuratorFramework client = CuratorFrameworkFactory.builder() 
  17.                 .connectString(CONNECT_ADDR) 
  18.                 .connectionTimeoutMs(CONNECTION_TIMEOUT) 
  19.                 .retryPolicy(retryPolicy) 
  20.                 .build(); 
  21.         client.start(); 
  22.         System.out.println(ZooKeeper.States.CONNECTED); 
  23.         System.out.println(client.getState()); 
  24.  
  25.         // 创建节点 /test1 
  26.         client.create() 
  27.                 .forPath("/test1""curator data".getBytes(StandardCharsets.UTF_8)); 
  28.  
  29.         System.out.println(client.getChildren().forPath("/")); 
  30.  
  31.         // 临时节点 
  32.         client.create().withMode(CreateMode.EPHEMERAL) 
  33.                 .forPath("/secondPath""hello world".getBytes(StandardCharsets.UTF_8)); 
  34.         System.out.println(new String(client.getData().forPath("/secondPath"))); 
  35.  
  36.         client.create().withMode(CreateMode.PERSISTENT_SEQUENTIAL) 
  37.                 .forPath("/abc""hello".getBytes(StandardCharsets.UTF_8)); 
  38.         // 递归创建 
  39.         client.create() 
  40.                 .creatingParentContainersIfNeeded() 
  41.                 .forPath("/secondPath1/sencond2/sencond3"); 
  42.  
  43.  
  44.         Thread.sleep(10000); 
  45.     } 

四、核心机制二:Watcher 监听机制

Watcher 监听机制是 Zookeeper 解决各种分布式不一致疑难杂症的独家法门,也是学习 Zookeeper 必学的知识点。

1. 对于 Watcher 机制的理解

Zookeeper 提供了数据的发布与订阅的功能,多个订阅者可以同时监听某一个对象,当这个对象自身状态发生变化时(例如节点数据或者节点的子节点个数变化),Zookeeper 系统会通知这些订阅者。

对于发布和订阅这个概念的理解,我们可以用这个场景来理解:

比如前两天的台风,老板想发一个通知给员工:明天在家办公。

于是老板会在钉钉群上 Ding 一个消息,员工自己打开钉钉查看。

在这个场景中,老板是发布者,员工是订阅者,钉钉群就是 Zookeeper 系统。

老板并不一一给员工发消息,而是把消息发到群里,员工就可以感知到消息的变化。

订阅者员工客户端1
系统钉钉群Zookeeper系统
发布者老板客户端2

2、 Watcher 机制的流程

客户端首先将 Watcher 注册到服务器上,同时将 Watcher 对象保存在客户端的 Watcher 管理器中。当 Zookeeper 服务端监听到数据状态发生变化时,服务端会首先主动通知客户端,接着客户端的 Watcher 管理器会触发相关的 Watcher 来回调响应的逻辑,从而完成整体的发布/订阅流程。

监听器 Watcher 的定义:

  1. public interface Watcher { 
  2. //   WatchedEvent 对象中有下面三个属性,Zookeeper状态,事件类型,路径 
  3. //    final private KeeperState keeperState; 
  4. //    final private EventType eventType; 
  5. //    private String path; 
  6.     abstract public void process(WatchedEvent event); 

下面是监听的大致流程图:

稍稍解释一下:

1、Client1 和 Client2 都关心 /app2 节点的数据状态变化,于是注册一个对于 /app2 的监听器到 Zookeeper 上;

2、当 Client3 修改 /app2 的值后,Zookeeper 会主动通知 Client1 和 Client2 ,并且回调监听器的方法。

当然这里的数据状态变化有下面这些类型:

  • 节点被创建;
  • 节点被删除;
  • 节点数据发生改变;
  • 节点的子节点个数发生改变。

3. 通过代码来初步理解

我们还是用 Curator 框架来验证一下这个监听器。

代码很简单,这里我们使用 TreeCache 表示对于 /app2 的监听,并且注册了监听的方法。

  1. public class CuratorWatcher { 
  2.  
  3.     public static void main(String[] args) throws Exception { 
  4.         CuratorFramework client = CuratorFrameworkFactory.builder().connectString("localhost:2181"
  5.                 .connectionTimeoutMs(10000) 
  6.                 .retryPolicy(new ExponentialBackoffRetry(1000, 10)) 
  7.                 .build(); 
  8.         client.start(); 
  9.  
  10.         String path = "/app2"
  11.  
  12.         TreeCache treeCache = new TreeCache(client, path); 
  13.         treeCache.start(); 
  14.  
  15.         treeCache.getListenable().addListener((client1, event) -> { 
  16.             System.out.println("event.getData()," + event.getData()); 
  17.             System.out.println("event.getType()," + event.getType()); 
  18.         }); 
  19.  
  20.         Thread.sleep(Integer.MAX_VALUE); 
  21.     } 

当 /app2 的状态发生变化时,就会调用监听的方法。

Curator 是对原生的 Zookeeper Api 有封装的,原生的 Zookeeper 提供的 Api ,注册监听后,当数据发生改变时,监听就被服务端删除了,要重复注册监听。

Curator 则对这个做了相应的封装和改进。

五、代码实战:实现主备选举

这里我们主要想实现的功能是:

  • 有两个节点,bigdata001,bigdata002 ,他们互相主备。
  • bigdata001 启动时,往 zk 上注册一个临时节点 /ElectorLock(锁),并且往 /ActiveMaster 下面注册一个子节点,表示自己是主节点。
  • bigdata002 启动时,发现临时节点 /ElectorLock 存在,表示当前系统已经有主节点了,则自己往 /StandbyMaster 下注册一个节点,表示自己是 standby。
  • bigdata001 退出时,释放 /ElectorLock,并且删除 /activeMaster 下的节点。
  • bigdata002 感知到 /ElectorLock 不存在时,则自己去注册 /ElectorLock,并在 /ActiveMaster 下注册自己,表示自己已经成为了主节点。

代码还是用 Curator 框架实现的:

  1. package com.kkarch.zookeeper; 
  2.  
  3. import cn.hutool.core.util.StrUtil; 
  4. import lombok.extern.slf4j.Slf4j; 
  5. import org.apache.curator.framework.CuratorFramework; 
  6. import org.apache.curator.framework.recipes.cache.TreeCache; 
  7. import org.apache.curator.framework.recipes.cache.TreeCacheEvent; 
  8. import org.apache.zookeeper.CreateMode; 
  9.  
  10. import java.nio.charset.StandardCharsets; 
  11.  
  12. /** 
  13.  * 分布式选举 
  14.  * 
  15.  * @Author wangkai 
  16.  * @Time 2021/7/25 20:12 
  17.  */ 
  18. @Slf4j 
  19. public class ElectorTest { 
  20.  
  21.     private static final String PARENT = "/cluster_ha"
  22.     private static final String ACTIVE = PARENT + "/ActiveMaster"
  23.     private static final String STANDBY = PARENT + "/StandbyMaster"
  24.     private static final String LOCK = PARENT + "/ElectorLock"
  25.  
  26.     private static final String HOSTNAME = "bigdata05"
  27.     private static final String activeMasterPath = ACTIVE + "/" + HOSTNAME; 
  28.     private static final String standByMasterPath = STANDBY + "/" + HOSTNAME; 
  29.  
  30.     public static void main(String[] args) throws Exception { 
  31.         CuratorFramework zk = ZkUtil.createZkClient("localhost:2181"); 
  32.         zk.start(); 
  33.  
  34.         // 注册好监听 
  35.         TreeCache treeCache = new TreeCache(zk, PARENT); 
  36.         treeCache.start(); 
  37.  
  38.         treeCache.getListenable().addListener((client, event) -> { 
  39.             if (event.getType().equals(TreeCacheEvent.Type.INITIALIZED) || event.getType().equals(TreeCacheEvent.Type.CONNECTION_LOST) 
  40.                     || event.getType().equals(TreeCacheEvent.Type.CONNECTION_RECONNECTED) || event.getType().equals(TreeCacheEvent.Type.CONNECTION_SUSPENDED)) { 
  41.                 return
  42.             } 
  43.             System.out.println(event.getData()); 
  44.             // 如果 Active 下有节点被移除了,没有节点,则应该去竞选成为 Active 
  45.             if (StrUtil.startWith(event.getData().getPath(), ACTIVE) && event.getType().equals(TreeCacheEvent.Type.NODE_REMOVED)) { 
  46.                 if (getChildrenNumber(zk, ACTIVE) == 0) { 
  47.                     createZNode(client, LOCK, HOSTNAME.getBytes(StandardCharsets.UTF_8), CreateMode.EPHEMERAL); 
  48.                     System.out.println(HOSTNAME + "争抢到了锁"); 
  49.                 } 
  50.             } 
  51.             // 如果有锁节点被创建,则判断是不是自己创建的,如果是,则切换自己的状态为 ACTIVE 
  52.             else if (StrUtil.equals(event.getData().getPath(), LOCK) && event.getType().equals(TreeCacheEvent.Type.NODE_ADDED)) { 
  53.                 if (StrUtil.equals(new String(event.getData().getData()), HOSTNAME)) { 
  54.                     createZNode(zk, activeMasterPath, HOSTNAME.getBytes(StandardCharsets.UTF_8), CreateMode.EPHEMERAL); 
  55.                     if (checkExists(client, standByMasterPath)) { 
  56.                         deleteZNode(client, standByMasterPath); 
  57.                     } 
  58.                 } 
  59.             } 
  60.         }); 
  61.  
  62.         // 先创建 ACTIVE 和 STANDBY 节点 
  63.         if (zk.checkExists().forPath(ACTIVE) == null) { 
  64.             zk.create().creatingParentContainersIfNeeded().forPath(ACTIVE); 
  65.         } 
  66.         if (zk.checkExists().forPath(STANDBY) == null) { 
  67.             zk.create().creatingParentContainersIfNeeded().forPath(STANDBY); 
  68.         } 
  69.  
  70.         // 判断 ACTIVE 下是否有子节点,如果没有则去争抢一把锁 
  71.         if (getChildrenNumber(zk, ACTIVE) == 0) { 
  72.             createZNode(zk, LOCK, HOSTNAME.getBytes(StandardCharsets.UTF_8), CreateMode.EPHEMERAL); 
  73.         } 
  74.         // 如果有,则自己成为 STANDBY 状态 
  75.         else { 
  76.             createZNode(zk, standByMasterPath, HOSTNAME.getBytes(StandardCharsets.UTF_8), CreateMode.EPHEMERAL); 
  77.         } 
  78.  
  79.  
  80.         Thread.sleep(1000000000); 
  81.  
  82.  
  83.     } 
  84.  
  85.     public static int getChildrenNumber(CuratorFramework client, String path) throws Exception { 
  86.         return client.getChildren().forPath(path).size(); 
  87.     } 
  88.  
  89.     public static void createZNode(CuratorFramework client, String path, byte[] data, CreateMode mode) { 
  90.         try { 
  91.             client.create().withMode(mode).forPath(path, data); 
  92.         } catch (Exception e) { 
  93.             log.error("创建节点失败", e); 
  94.             System.out.println("创建节点失败了"); 
  95.         } 
  96.     } 
  97.  
  98.     public static boolean checkExists(CuratorFramework client, String path) throws Exception { 
  99.         return client.checkExists().forPath(path) != null
  100.     } 
  101.  
  102.     public static void deleteZNode(CuratorFramework client, String path) { 
  103.         try { 
  104.             if (checkExists(client, path)) { 
  105.                 client.delete().forPath(path); 
  106.             } 
  107.         } catch (Exception e) { 
  108.             log.error("删除节点失败", e); 
  109.         } 
  110.     } 

 

文章来源网络,作者:管理,如若转载,请注明出处:https://shuyeidc.com/wp/268070.html<

(0)
管理的头像管理
上一篇2025-05-06 17:24
下一篇 2025-05-06 17:25

相关推荐

  • 云服务器和云虚拟主机怎么选?云服务器和虚拟主机区别

    云服务器适合业务增长快、需弹性扩展的场景,而云虚拟主机适合预算有限、技术门槛低的小型静态网站或测试环境,二者核心区别在于资源独享性与运维复杂度,核心差异解析:从底层架构到使用体验很多人容易混淆这两者,觉得它们都是“买空间建站”,它们的底层逻辑完全不同,云服务器(ECS)就像是你租了一整栋别墅,水电网络独立,你想……

    2026-06-29
    0
  • 赣州智慧旅游招聘是真的吗?赣州旅游人才招聘信息

    中级岗位(3-5年经验)月薪范围通常在6000-10000元,这类岗位需要独立负责项目模块,如独立运营一个抖音账号,或维护一个景区小程序的功能迭代,具备成功案例的候选人议价能力较强,高级岗位(5年以上经验)月薪范围通常在10000-20000元,部分核心管理岗可达更高,这类人才需要具备战略规划能力,如制定整个景……

    2026-06-29
    0
  • 赣州智能物联网车位锁如何管理?智能车位锁管理系统多少钱

    赣州智能物联网车位锁管理的核心在于通过云端平台实现远程控锁、状态实时监控及自动计费,彻底解决传统车位“被占难管”与“找位难”的痛点,在赣州这样的城市,随着机动车保有量的持续增长,老旧小区、商业综合体以及私人固定车位的资源矛盾日益凸显,传统的机械地锁或简易遥控锁,不仅操作繁琐,更无法实现数据化管理,引入智能物联网……

    2026-06-29
    0
  • 赣州智能消防栓好用吗,智能消防栓多少钱一个

    赣州智能消防栓通过物联网技术实现实时监测与远程报警,能显著降低火灾响应时间并提升城市消防安全管理水平,是目前智慧城市建设中不可或缺的基础设施,赣州智能消防栓的核心价值与应用场景传统消防栓往往存在“看不见、摸不着、用不了”的痛点,在赣州这样地形复杂、老城区与新城区并存的区域,传统设施的管理难度极大,智能消防栓的出……

    2026-06-29
    0
  • 云服务器和物理机到底有啥区别?

    云服务器本质上是虚拟化资源池中的弹性实例,而传统物理服务器是独占的硬件实体,前者胜在弹性与运维便捷,后者强在物理隔离与性能稳定,具体选择取决于业务对成本、扩展性及安全合规的权衡,很多人初次接触服务器时,容易把“云服务器”和“传统物理服务器”混为一谈,觉得它们都是用来跑网站或存数据的盒子,这两者的底层逻辑完全不同……

    2026-06-29
    0

发表回复

您的邮箱地址不会被公开。必填项已用 * 标注