Zookeeper 简介/快速入门 1 ZooKeeper is a centralized service for maintaining configuration information, naming, providing distributed synchronization, and providing group services. All of these kinds of services are used in some form or another by distributed applications. Each time they are implemented there is a lot of work that goes into fixing the bugs and race conditions that are inevitable. Because of the difficulty of implementing these kinds of services, applications initially usually skimp on them, which make them brittle in the presence of change and difficult to manage. Even when done correctly, different implementations of these services lead to management complexity when the applications are deployed.
ZooKeeper
是一个集中的服务,用于维护配置信息、命名、提供分布式同步和提供组服务。所有这些类型的服务都以某种形式被分布式应用程序使用。每次它们被实现时,都会有大量的工作来修复不可避免的错误和竞争条件。由于实现这些服务的困难,应用程序最初通常会略过这些服务,这使得它们在出现更改时变得脆弱,并且难以管理。即使正确地执行了这些服务,在部署应用程序时,这些服务的不同实现也会导致管理复杂性
zookeeper
由雅虎研究院开发,是Google Chubby
的开源实现,后来托管到 Apache
,于2010年11月
正式成为apache
的顶级项目
大数据生态系统里由很多组件的命名都是某些动物或者昆虫,比如hadoop
大象,hive
就是蜂巢,zookeeper
即管理员,顾名思义就算管理大数据生态系统各组件的管理员,如下所示:
应用场景 zookeepepr
是一个经典的分布式 数据一致性解决方案,致力于为分布式应用提供一个高性能、高可用,且具有严格顺序访问控制能力的分布式协调存储服务。
维护配置信息
分布式锁服务
集群管理
生成分布式唯一ID
维护配置信息
java
编程经常会遇到配置项,比如数据库的url
、 schema
、user
和 password
等。通常这些配置项我们会放置在配置文件中,再将配置文件放置在服务器上当需要更改配置项时,需要去服务器上修改对应的配置文件。
但是随着分布式系统的兴起,由于许多服务都需要使用到该配置文件,因此有必须保证该配置服务的高可用性 (highavailability
)和各台服务器上配置数据的一致性。
通常会将配置文件部署在一个集群上,然而一个集群动辄上千台 服务器,此时如果再一台台服务器逐个修改配置文件那将是非常繁琐且危险的的操作,因此就需要一种服务 ,能够高效快速且可靠地完成配置项的更改等操作 ,并能够保证各配置项在每台服务器上的数据一致性。
zookeeper
就可以提供这样一种服务 ,其使用Zab
这种一致性协议来保证一致性。现在有很多开源项目使用zookeeper
来维护配置,如在 hbase
中,客户端就是连接一个 zookeeper
,获得必要的 hbase
集群的配置信息,然后才可以进一步操作。还有在开源的消息队列 kafka
中,也便用zookeeper
来维护 brokers
的信息。在 alibaba
开源的soa
框架dubbo
中也广泛的使用zookeeper
管理一些配置来实现服务治理。
分布式锁服务
一个集群是一个分布式系统,由多台服务器组成。为了提高并发度和可靠性,多台服务器上运行着同一种服务。当多个服务在运行时就需要协调各服务的进度,有时候需要保证当某个服务在进行某个操作时,其他的服务都不能进行该操作,即对该操作进行加锁,如果当前机器挂掉后,释放锁并 fail over
到其他的机器继续执行该服务
集群管理
生产分布式唯一ID
在过去的单库单表型系统中,通常可以使用数据库字段自带的auto_ increment
属性来自动为每条记录生成一个唯一的ID
。但是分库分表后,就无法在依靠数据库的auto_ Increment
属性来唯一标识一条记录了。此时我们就可以用zookeeper
在分布式环境下生成全局唯一ID
。
做法如下:每次要生成一个新id
时,创建一个持久顺序节点,创建操作返回的节点序号,即为新id
,然后把比自己节点小的删除即可
Zookeeper的设计目标 zooKeeper
致力于为分布式应用提供一个高性能、高可用,且具有严格顺序访问控制能力的分布式协调服务
高性能
zookeeper
将全量数据存储在内存 中,并直接服务于客户端的所有非事务请求,尤其用于以读为主的应用场景
高可用
zookeeper
一般以集群的方式对外提供服务,一般3~5
台机器就可以组成一个可用的 Zookeeper
集群了,每台机器都会在内存中维护当前的服务器状态,井且每台机器之间都相互保持着通信。只要集群中超过一半的机器都能够正常工作,那么整个集群就能够正常对外服务
严格顺序访问
对于来自客户端的每个更新请求,Zookeeper
都会分配一个全局唯一的递增编号,这个编号反应了所有事务操作的先后顺序
数据模型 zookeeper
的数据结点可以视为树状结构(或目录),树中的各个结点被称为znode
(即zookeeper node
),一个znode
可以由多个子结点。zookeeper
结点在结构上表现为树状;
使用路径path
来定位某个znode
,比如/ns-1/itcast/mysqml/schemal1/table1
,此处ns-1,itcast、mysql、schemal1、table1
分别是根结点、2级结点、3级结点以及4级结点
;其中ns-1
是itcast
的父结点,itcast
是ns-1
的子结点,itcast
是mysql
的父结点….以此类推
znode
,间距文件和目录两种特点,即像文件一样维护着数据、元信息、ACL、时间戳等数据结构,又像目录一样可以作为路径标识的一部分
那么如何描述一个znode
呢?一个znode
大体上分为3
个部分:
结点的数据:即znode data
(结点path
,结点data
)的关系就像是Java map
中的 key value
关系
结点的子结点children
结点的状态stat
:用来描述当前结点的创建、修改记录,包括cZxid
、ctime
等
结点状态stat的属性 在zookeeper shell
中使用 get
命令查看指定路径结点的data
、stat
信息
属性说明:
结点的各个属性如下。其中重要的概念是Zxid(Zookeeper Transaction ID)
,Zookeeper
结点的每一次更改都具有唯一的Zxid
,如果Zxid-1
小于Zxid-2
,则Zxid-1
的更改发生在 Zxid-2
更改之前
https://zookeeper.apache.org/doc/r3.4.14/zookeeperProgrammers.html#sc_zkDataModel_znodes
cZxid
数据结点创建时的事务ID——针对于zookeeper
数据结点的管理:我们对结点数据的一些写操作都会导致zookeeper
自动地为我们去开启一个事务,并且自动地去为每一个事务维护一个事务ID
ctime
数据结点创建时的时间
mZxid
数据结点最后一次更新时的事务ID
mtime
数据结点最后一次更新时的时间
pZxid
数据节点最后一次修改此znode
子节点更改的zxid
cversion
子结点的更改次数
dataVersion
结点数据的更改次数
aclVersion
结点的ACL更改次数——类似linux
的权限列表,维护的是当前结点的权限列表被修改的次数
ephemeralOwner
如果结点是临时结点,则表示创建该结点的会话的SessionID
;如果是持久结点,该属性值为0
dataLength
数据内容的长度
numChildren
数据结点当前的子结点个数
结点类型
zookeeper
中的结点有两种,分别为临时结点 和永久结点 。结点的类型在创建时被确定,并且不能改变
临时节点:
该节点的生命周期依赖于创建它们的会话。一旦会话( Session
)结束,临时节点将被自动删除,当然可以也可以手动删除。虽然每个临时的 Znode
都会绑定到一个客户端会话,但他们对所有的客户端还是可见的。另外,Zookeeper
的临时节点不允许拥有子节点
持久化结点:
该结点的生命周期不依赖于会话,并且只有在客户端显示执行删除操作的时候,它们才能被删除
单机安装 测试系统环境centos7.3
zookeeper:zookeeper-3.4.10.tar.gz
jdk:jdk-8u131-linux-x64.tar.gz
http://archive.apache.org/dist/zookeeper/
在centos
中使用 root
用户创建 zookeeper
用户,用户名:zookeeper
密码:zookeeper
1 2 3 useradd zookeeper passwd zookeeper su zookeeper
zookeeper
底层依赖于jdk,zookeeper
用户登录后,根目录下先进行jdk 的安装,jdk使用 jdk-8u131-linux-x64.tar.gz
配置jdk 环境变量
1 2 3 4 5 6 7 8 vi /etc/profile JAVA_HOME=/home/zookeeper/jdk1.8.0_131 export JAVA_HOME PATH=$JAVA_HOME/bin:$PATH export PATH souce /etc/profile
检测jdk安装
java -version
// 如果反馈了Java信息,则成功
zookeeper
上传解压
tar -zxf tar.gz
为zookeeper
准备配置文件
1 2 3 4 5 6 7 8 9 # 进入conf目录 cd /home/zookeeper/zookeeper-3.4.10/conf # 复制配置文件 cp zoo_sampe.cfg zoo.cfg # zookeeper 根目录下创建data目录 mkdir data # vi 配置文件中的dataDir # 此路径用于存储zookeeper中数据的内存快照、及事务日志文件,虽然zookeeper是使用内存的,但是需要持久化一些数据来保证数据的安全,和redis一样 dataDir=/home/zookeeper/zookeeper-3.4.10/data
启动zookeeper
1 2 3 4 5 6 7 8 9 10 11 # 进入zookeeper的bin目录 cd /home/zookeeper/zookeeper-3.4.10/bin # 启动zookeeper ./zkServer.sh start # 启动: zkServer.sh start # 停止: zkServer.sh stop # 查看状态:zkServer.sh status # 进入zookeeper 内部 ./zkCli.sh
常用shell命令 zookeeper
——getting started
——https://zookeeper.apache.org/doc/r3.4.14/zookeeperStarted.html#sc_FileManagement
操作结点 查询
get /hadoop
查看结点的数据和属性 stat /hadoop
查看结点的属性
创建
创建结点并写入数据:
create [-s] [-e] path data
# 其中 -s 为有序结点,-e 临时结点(默认是持久结点)
1 2 3 4 5 6 7 8 9 create /hadoop "123456" # 此时,如果quit退出后再./ZkCient.sh 登入 # 再用输入 get /hadoop 获取,结点依然存在(永久结点) create -s /a "a" # 创建一个持久化有序结点,创建的时候可以观察到返回的数据带上了一个id create -s /b "b" # 返回的值,id递增了 create -s -e /aa "aa" # 依然还会返回自增的id,quit后再进来,继续创建,id依然是往后推的 create /aa/xx # 继续创建结点,可以看到pZxid变化了
更新
更新结点的命令是set
,可以直接进行修改,如下:
set path [version]
1 2 3 4 set /hadoop "345" # 修改结点值 set /hadoop "hadoop-x" 1 # 也可以基于版本号进行更改,类似于乐观锁,当传入版本号(dataVersion) # 和当前结点的数据版本号不一致时,zookeeper会拒绝本次修改
删除
删除结点的语法如下:
delete path [version]
和 set
方法相似,也可以传入版本号
1 2 delete /hadoop # 删除结点 delete /hadoop 1 # 乐观锁机制,与set 方法一致
要想删除某个结点及其所有后代结点,可以使用递归删除,命令为 rmr path
查看结点列表
1 2 3 4 ls /hadoop # 可以查看结点的列表 ls2 /hadoop # 可以查看结点的列表以及目标结点的信息 ls / # 根节点
监听器get path [watch] | stat path [watch]
使用get path [watch]
注册的监听器能够在结点内容发生改变 的时候,向客户端发出通知。需要注意的是zookeeper
的触发器是一次性的(One-time trigger
),即触发一次后就会立即失效
1 2 get /hadoop watch # get 的时候添加监听器,当值改变的时候,监听器返回消息 set /hadoop 45678 # 测试
ls\ls2 path [watch]
使用 ls path [watch] 或 ls2 path [watch]
注册的监听器能够监听该结点下所有子节点 的增加 和删除 操作
1 2 ls /hadoop watch # 添加监听器 set /hadoop/node "node"
zookeeper的Acl权限控制 https://zookeeper.apache.org/doc/r3.4.14/zookeeperProgrammers.html#sc_ZooKeeperAccessControl
zookeeper
类似文件系统,client
可以创建结点、更新结点、删除结点,那么如何做到结点的权限控制呢?
zookeeper
的 access control list
访问控制列表可以做到这一点
acl
权限控制,使用scheme:id:permission
来标识,主要涵盖3个方面:
https://zookeeper.apache.org/doc/r3.4.14/zookeeperProgrammers.html#sc_BuiltinACLSchemes
权限模式(scheme
):授权的策略
授权对象(id
):授权的对象
权限(permission
):授予的权限
其特性如下:
zookeeper
的权限控制是基于每个znode
结点的,需要对每个结点设置权限
每个znode
支持多种权限控制方案和多个权限
子结点不会继承父结点的权限,客户端无权访问某结点,但可能可以访问它的子结点:
例如setAcl /test2 ip:192.168.133.133:crwda
// 将结点权限设置为Ip:192.168.133.133 的客户端可以对节点进行 增删改查和管理权限
权限模式
采用何种方式授权
方案
描述
world
只有一个用户:anyone
,代表登录zookeeper
所有人(默认)
ip
对客户端使用IP地址认证
auth
使用已添加认证的用户认证
digest
使用”用户名:密码”方式认证
授权对象
给谁授予权限
授权对象ID是指,权限赋予的实体,例如:IP地址或用户
授权的权限
授权的相关命令
命令
使用方式
描述
getAcl
getAcl
读取ACL权限
setAcl
setAcl
设置ACL权限
addauth
addauth
添加认证用户
案例/远程登录 ./zkServer.sh -server 192.168.133.133
可以远程登录
world权限模式
getAcl /node
// 读取权限信息
setAcl /node world:anyone:drwa
// 设置权限(禁用创建子结点的权限)
ip模式
./zkServer.sh -server 192.168.133.133
可以远程登录
setAcl /hadoop ip:192.168.133.133:drwa
如果在两台不同的虚拟机中,另一台用远程连接的模式,进行上面这条命令,那么只会有一台被授权
需要两台虚拟机一起授权的话需要用逗号 将授权列表隔开:setAcl /hadoop ip:192.168.133.133:cdrwa,ip:192.168.133.132:cdrwa
auth认证用户模式
addauth digest <user>:<password>
setAcl <path> auth:<user>:<acl>
1 2 3 4 5 6 7 create /hadoop "hadoop" # 初始化测试用的结点 addauth digest itcast:123456 # 添加认证用户 setAcl /hadoop auth:itcast:cdrwa # 设置认证用户 quit # 退出后再./zkCli.sh 进入 get /hadoop # 这个时候就没有权限了,需要再次认证 addauth digest itcast:123456 # 认证,密码错了的话 zookeeper 不会报错,但是不能认证 get /hadoop
Digest授权模式
setAcl <path> digest:<user>:<password>:<acl>
这里的密码是经过SHA1
以及BASE64
处理的密文,在shell 中可以通过以下命令计算:
1 echo -n <user>:<password> | openssl dgst -binary -sha1 | openssl base64
1 2 3 4 5 6 7 # 计算密码 echo -n itcast:12345 | openssl dgst -binary -sha1 | openssl base64 # 获取密码,设置权限列表 setAcl /hadoop digest:itcast:qUFSHxJjItUW/93UHFXFVGlvryY=:cdrwa # 现在想要get /hadoop 需要登录了 addauth digest itcast:12345 get /hadoop
多种授权模式
仅需逗号隔开
1 setAcl /hadoop ip:192.168.133.132:cdrwa,auth:hadoop:cdrwa,digest:itcast:673OfZhUE8JEFMcu0l64qI8e5ek=:cdrwa
acl 超级管理员
zookeeper
的权限管理模式有一种叫做super
,该模式提供一个超管,可以方便的访问任何权限的节点
假设这个超管是supper:admin
,需要为超管生产密码的密文
1 echo -n super:admin | openssl dgst -binary -sha1 | openssl base64
那么打开zookeeper
目录下/bin/zkServer.sh
服务器脚本文件,找到如下一行:
1 2 /nohup # 快速查找,可以看到如下 nohup "$JAVA" "-Dzookeeper.log.dir=${ZOO_LOG_DIR}" "-Dzookeeper.root.logger=${ZOO_LOG4J_PROP}"
这个就算脚本中启动zookeeper
的命令,默认只有以上两个配置项,我们需要添加一个超管的配置项
1 "-Dzookeeper.DigestAuthenticationProvider.superDigest=super:xQJmxLMiHGwaqBvst5y6rkB6HQs="
修改后命令变成如下
1 nohup "$JAVA" "-Dzookeeper.log.dir=${ZOO_LOG_DIR}" "-Dzookeeper.root.logger=${ZOO_LOG4J_PROP}" "-Dzookeeper.DigestAuthenticationProvider.superDigest=super:xQJmxLMiHGwaqBvst5y6rkB6HQs="
1 2 3 4 5 6 7 8 # 重起后,现在随便对任意节点添加权限限制 setAcl /hadoop ip:192.168.1.1:cdrwa # 这个ip并非本机 # 现在当前用户没有权限了 getAcl /hadoop # 登录超管 addauth digest super:admin # 强行操作节点 get /hadoop
zookeeper的 Java API 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 <dependency > <groupId > com.101tec</groupId > <artifactId > zkclient</artifactId > <exclusions > <exclusion > <artifactId > zookeeper</artifactId > <groupId > org.apache.zookeeper</groupId > </exclusion > <exclusion > <artifactId > log4j</artifactId > <groupId > log4j</groupId > </exclusion > <exclusion > <artifactId > slf4j-log4j12</artifactId > <groupId > org.slf4j</groupId > </exclusion > <exclusion > <artifactId > slf4j-api</artifactId > <groupId > org.slf4j</groupId > </exclusion > </exclusions > <version > 0.9</version > </dependency > <dependency > <artifactId > zookeeper</artifactId > <exclusions > <exclusion > <artifactId > log4j</artifactId > <groupId > log4j</groupId > </exclusion > <exclusion > <artifactId > slf4j-log4j12</artifactId > <groupId > org.slf4j</groupId > </exclusion > </exclusions > <groupId > org.apache.zookeeper</groupId > <version > 3.4.10</version > </dependency >
zonde
是 zookeeper
集合的核心组件,zookeeper API
提供了一小组使用 zookeeper
集群来操作znode
的所有细节
客户端应该遵循以下步骤,与zookeeper
服务器进行清晰和干净的交互
连接到zookeeper
服务器。zookeeper
服务器为客户端分配会话ID
定期向服务器发送心跳。否则,zookeeper
服务器将过期会话ID
,客户端需要重新连接
只要会话Id
处于活动状态,就可以获取/设置znode
所有任务完成后,断开与zookeeper
服务器连接,如果客户端长时间不活动,则zookeeper
服务器将自动断开客户端
连接到Zookeeper 这部分,官网的解释十分稀少https://zookeeper.apache.org/doc/r3.4.14/zookeeperStarted.html#sc_ConnectingToZooKeeper
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 [zkshell: 0] help ZooKeeper host:port cmd args get path [watch] ls path [watch] set path data [version] delquota [-n|-b] path quit printwatches on|off create path data acl stat path [watch] listquota path history setAcl path acl getAcl path sync path redo cmdno addauth scheme auth delete path [version] deleteall path setquota -n|-b val path
1 Zookeeper(String connectionString, int sessionTimeout, watcher watcher)
connectionString
- zookeeper
主机
sessionTimeout
- 会话超时
watcher
- 实现”监听器” 对象。zookeeper
集合通过监视器对象返回连接状态
1 2 3 4 5 6 7 8 9 10 11 12 13 public static void main (String[] args) throws IOException, InterruptedException { CountDownLatch countDownLatch = new CountDownLatch(1 ); ZooKeeper zookeeper = new ZooKeeper("192.168.133.133:2181" , 5000 , (WatchedEvent x) -> { if (x.getState() == Watcher.Event.KeeperState.SyncConnected) { System.out.println("连接成功" ); countDownLatch.countDown(); } }); countDownLatch.await(); System.out.println(zookeeper.getSessionId()); zookeeper.close(); }
新增节点 1 2 3 4 5 create(String path, byte [] data, List<ACL> acl, CreateMode createMode) create(String path, byte [] data, List<ACL> acl, CreateMode createMode, AsynCallback.StringCallback callBack, Object ctx)
参数
解释
path
znode
路径
data
数据
acl
要创建的节点的访问控制列表。zookeeper API
提供了一个静态接口 ZooDefs.Ids
来获取一些基本的acl
列表。例如,ZooDefs.Ids.OPEN_ACL_UNSAFE
返回打开znode
的acl
列表
createMode
节点的类型,这是一个枚举
callBack
异步回调接口
ctx
传递上下文参数
示例:
1 2 3 4 5 6 7 public static void createTest1 () throws Exception { String str = "node" ; String s = zookeeper.create("/node" , str.getBytes(), ZooDefs.Ids.READ_ACL_UNSAFE, CreateMode.PERSISTENT); System.out.println(s); }
1 2 3 4 5 6 7 public static void createTest2 () throws Exception { ArrayList<ACL> acls = new ArrayList<>(); Id id = new Id("ip" ,"192.168.133.133" ); acls.add(new ACL(ZooDefs.Perms.ALL,id)); zookeeper.create("/create/node4" ,"node4" .getBytes(),acls,CreateMode.PERSISTENT); }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public static void createTest3 () throws Exception { zookeeper.addAuthInfo("digest" ,"itcast:12345" .getBytes()); zookeeper.create("/node5" ,"node5" .getBytes(), ZooDefs.Ids.CREATOR_ALL_ACL,CreateMode.PERSISTENT); } public static void createTest3 () throws Exception { zookeeper.addAuthInfo("digest" ,"itcast:12345" .getBytes()); List<ACL> acls = new ArrayList<>(); Id id = new Id("auth" ,"itcast" ); acls.add(new ACL(ZooDefs.Perms.READ,id)); zookeeper.create("/create/node6" ,"node6" .getBytes(), acls,CreateMode.PERSISTENT); }
1 2 3 4 5 6 7 8 public static void createTest3 () throws Exception { List<ACL> acls = new ArrayList<>(); Id id = new Id("digest" ,"itcast:qUFSHxJjItUW/93UHFXFVGlvryY=" ); acls.add(new ACL(ZooDefs.Perms.READ,id)); zookeeper.create("/create/node7" ,"node7" .getBytes(), acls,CreateMode.PERSISTENT); }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 public static void createTest4 () throws Exception { zookeeper.create("/node12" , "node12" .getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, new AsyncCallback.StringCallback(){ public void processResult (int rc, String path, Object ctx, String name) { System.out.println(rc + " " + path + " " + name + " " + ctx); } }, "I am context" ); TimeUnit.SECONDS.sleep(1 ); System.out.println("结束" ); }
修改节点 同样也有两种修改方式(异步和同步
)
1 2 3 4 setData(String path, byte [] data, int version) setData(String path, byte [] data, int version, StatCallback callBack, Object ctx)
参数
解释
path
节点路径
data
数据
version
数据的版本号, -1
代表不使用版本号,乐观锁机制
callBack
异步回调 AsyncCallback.StatCallback
,和之前的回调方法参数不同,这个可以获取节点状态
ctx
传递上下文参数
1 2 3 4 5 6 public static void setData1 () throws Exception { Stat stat = zookeeper.setData("/hadoop" ,"hadoop-1" .getBytes(),-1 ); }
1 2 3 4 5 6 7 8 9 public static void setData2 () throws Exception { zookeeper.setData("/hadoop" , "hadoop-1" .getBytes(), 3 ,new AsyncCallback.StatCallback(){ @Override public void processResult (int rc, String path, Object ctx, Stat stat) { System.out.println(rc + " " + path + " " + stat.getVersion() + " " + ctx); } }, "I am context" ); }
删除节点 异步、同步
1 2 3 4 delete(String path, int version) delete(String path, int version, AsyncCallback.VoidCallback callBack, Object ctx)
参数
解释
path
节点路径
version
版本
callBack
数据的版本号, -1
代表不使用版本号,乐观锁机制
ctx
传递上下文参数
1 2 3 4 5 6 7 8 9 10 11 12 13 public static void deleteData1 () throws Exception { zookeeper.delete("/hadoop" , 1 ); } public static void deleteData2 () throws Exception { zookeeper.delete("/hadoop" , 1 , new AsyncCallback.VoidCallback() { @Override public void processResult (int rc, String path, Object ctx) { System.out.println(rc + " " + path + " " + ctx); } }, "I am context" ); TimeUnit.SECONDS.sleep(1 ); }
查看节点 同步、异步
1 2 3 4 5 6 getData(String path, boolean watch, Stat stat) getData(String path, Watcher watcher, Stat stat) getData(String path, boolean watch, DataCallback callBack, Object ctx) getData(String path, Watcher watcher, DataCallback callBack, Object ctx)
参数
解释
path
节点路径
boolean
是否使用连接对象中注册的监听器
stat
元数据
callBack
异步回调接口,可以获得状态和数据
ctx
传递上下文参数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 public static void getData1 () throws Exception { Stat stat = new Stat(); byte [] data = zookeeper.getData("/hadoop" , false , stat); System.out.println(new String(data)); System.out.println(stat.getCtime()); } public static void getData2 () throws Exception { zookeeper.getData("/hadoop" , false , new AsyncCallback.DataCallback() { @Override public void processResult (int rc, String path, Object ctx, byte [] bytes, Stat stat) { System.out.println(rc + " " + path + " " + ctx + " " + new String(bytes) + " " + stat.getCzxid()); } }, "I am context" ); TimeUnit.SECONDS.sleep(3 ); }
查看子节点 同步、异步
1 2 3 4 5 6 7 8 9 10 getChildren(String path, boolean watch) getChildren(String path, Watcher watcher) getChildren(String path, boolean watch, Stat stat) getChildren(String path, Watcher watcher, Stat stat) getChildren(String path, boolean watch, ChildrenCallback callBack, Object ctx) getChildren(String path, Watcher watcher, ChildrenCallback callBack, Object ctx) getChildren(String path, Watcher watcher, Children2Callback callBack, Object ctx) getChildren(String path, boolean watch, Children2Callback callBack, Object ctx)
参数
解释
path
节点路径
boolean
callBack
异步回调,可以获取节点列表
ctx
传递上下文参数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public static void getChildren_1 () throws Exception { List<String> hadoop = zookeeper.getChildren("/hadoop" , false ); hadoop.forEach(System.out::println); } public static void getChildren_2 () throws Exception { zookeeper.getChildren("/hadoop" , false , new AsyncCallback.ChildrenCallback() { @Override public void processResult (int rc, String path, Object ctx, List<String> list) { list.forEach(System.out::println); System.out.println(rc + " " + path + " " + ctx); } }, "I am children" ); TimeUnit.SECONDS.sleep(3 ); }
检查节点是否存在 同步、异步
1 2 3 4 5 6 exists(String path, boolean watch) exists(String path, Watcher watcher) exists(String path, boolean watch, StatCallback cb, Object ctx) exists(String path, Watcher watcher, StatCallback cb, Object ctx)
参数
解释
path
节点路径
boolean
callBack
异步回调,可以获取节点列表
ctx
传递上下文参数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public static void exists1 () throws Exception { Stat exists = zookeeper.exists("/hadoopx" , false ); System.out.println(exists.getVersion() + "成功" ); } public static void exists2 () throws Exception { zookeeper.exists("/hadoopx" , false , new AsyncCallback.StatCallback() { @Override public void processResult (int rc, String path, Object ctx, Stat stat) { System.out.println(rc + " " + path + " " + ctx +" " + stat.getVersion()); } }, "I am children" ); TimeUnit.SECONDS.sleep(1 ); }
事件监听机制 watcher概念
https://zookeeper.apache.org/doc/r3.4.14/zookeeperProgrammers.html#sc_WatchRememberThese
zookeeper
提供了数据的发布/订阅
功能,多个订阅者可同时监听某一特定主题对象,当该主题对象的自身状态发生变化时例如节点内容改变、节点下的子节点列表改变等,会实时、主动通知所有订阅者
zookeeper
采用了 Watcher
机制实现数据的发布订阅功能。该机制在被订阅对象发生变化时会异步通知客户端,因此客户端不必在 Watcher
注册后轮询阻塞,从而减轻了客户端压力
watcher
机制事件上与观察者模式类似,也可看作是一种观察者模式在分布式场景下的实现方式
watcher架构 watcher
实现由三个部分组成
zookeeper
服务端
zookeeper
客户端
客户端的ZKWatchManager对象
客户端首先将 Watcher
注册到服务端 ,同时将 Watcher
对象保存到客户端的watch
管理器中 。当Zookeeper
服务端监听的数据状态发生变化时,服务端会主动通知客户端 ,接着客户端的 Watch
管理器会触发相关 Watcher
来回调相应处理逻辑,从而完成整体的数据 发布/订阅
流程
watcher特性
特性
说明
一次性
watcher
是一次性 的,一旦被触发就会移除,再次使用时需要重新注册
客户端顺序回调
watcher
回调是顺序串行 执行的,只有回调后客户端才能看到最新的数据状态。一个watcher
回调逻辑不应该太多,以免影响别的watcher
执行
轻量级
WatchEvent
是最小的通信单位,结构上只包含通知状态、事件类型和节点路径 ,并不会告诉数据节点变化前后的具体内容
时效性
watcher
只有在当前session
彻底失效时才会无效,若在session
有效期内快速重连成功,则watcher
依然存在,仍可接收到通知;
watcher接口设计
Watcher
是一个接口,任何实现了Watcher
接口的类就算一个新的Watcher
。Watcher
内部包含了两个枚举类:KeeperState
、EventType
Watcher通知状态(KeeperState) KeeperState
是客户端与服务端连接状态 发生变化时对应的通知类型。路径为org.apache.zookeeper.Watcher.EventKeeperState
,是一个枚举类,其枚举属性如下:
枚举属性
说明
SyncConnected
客户端与服务器正常连接时
Disconnected
客户端与服务器断开连接时
Expired
会话session
失效时
AuthFailed
身份认证失败时
Watcher事件类型(EventType) EventType
是数据节点znode
发生变化 时对应的通知类型。EventType
变化时KeeperState
永远处于SyncConnected
通知状态下 ;当keeperState
发生变化时,EventType
永远为None
。其路径为org.apache.zookeeper.Watcher.Event.EventType
,是一个枚举类,枚举属性如下:
枚举属性
说明
None
无
NodeCreated
Watcher
监听的数据节点被创建时
NodeDeleted
Watcher
监听的数据节点被删除时
NodeDataChanged
Watcher
监听的数据节点内容发生更改时(无论数据是否真的变化)
NodeChildrenChanged
Watcher
监听的数据节点的子节点列表发生变更时
注意:客户端接收到的相关事件通知中只包含状态以及类型等信息,不包含节点变化前后的具体内容,变化前的数据需业务自身存储,变化后的数据需要调用get
等方法重新获取
捕获相应的事件 上面讲到zookeeper
客户端连接的状态和zookeeper
对znode
节点监听的事件类型,下面我们来讲解如何建立zookeeper
的watcher
监听 。在zookeeper
中采用zk.getChildren(path,watch)、zk.exists(path,watch)、zk.getData(path,watcher,stat)
这样的方式来为某个znode
注册监听 。
下表以node-x
节点为例,说明调用的注册方法和可用监听事件间的关系:
注册方式
created
childrenChanged
Changed
Deleted
zk.exists("/node-x",watcher)
可监控
可监控
可监控
zk.getData("/node-x",watcher)
可监控
可监控
zk.getChildren("/node-x",watcher)
可监控
可监控
注册watcher的方法
客户端与服务器端的连接状态
watcher检查节点 exists
exists(String path, boolean b)
exists(String path, Watcher w)
NodeCreated
:节点 创建
NodeDeleted
:节点 删除
NodeDataChanged
:节点 内容
案例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 public class EventTypeTest { private static final String IP = "192.168.133.133:2181" ; private static CountDownLatch countDownLatch = new CountDownLatch(1 ); private static ZooKeeper zooKeeper; public static void exists1 () throws Exception { zooKeeper.exists("/watcher1" ,true ); } public static void exists2 () throws Exception { zooKeeper.exists("/watcher1" ,(WatchedEvent w) -> { System.out.println("自定义" + w.getType()); }); } public static void exists3 () throws Exception { zooKeeper.exists("/watcher1" , new Watcher() { @Override public void process (WatchedEvent watchedEvent) { try { System.out.println("自定义的" + watchedEvent.getType()); } finally { try { zooKeeper.exists("/watcher1" ,this ); } catch (Exception e) { e.printStackTrace(); } } } }); } public static void exists4 () throws Exception { zooKeeper.exists("/watcher1" ,(WatchedEvent w) -> { System.out.println("自定义1" + w.getType()); }); zooKeeper.exists("/watcher1" , new Watcher() { @Override public void process (WatchedEvent watchedEvent) { try { System.out.println("自定义2" + watchedEvent.getType()); } finally { try { zooKeeper.exists("/watcher1" ,this ); } catch (Exception e) { e.printStackTrace(); } } } }); } public static void main (String[] args) throws Exception { zooKeeper = new ZooKeeper(IP, 5000 , new ZKWatcher()); countDownLatch.await(); exists4(); TimeUnit.SECONDS.sleep(50 ); } static class ZKWatcher implements Watcher { @Override public void process (WatchedEvent watchedEvent) { countDownLatch.countDown(); System.out.println("zk的监听器" + watchedEvent.getType()); } } }
getData
getData(String path, boolean b, Stat stat)
getData(String path, Watcher w, Stat stat)
NodeDeleted
:节点 删除
NodeDataChange
:节点 内容发生变化
getChildren
getChildren(String path, boolean b)
getChildren(String path, Watcher w)
NodeChildrenChanged
:子节点 发生变化
NodeDeleted
:节点删除
配置中心案例
工作中有这样的一个场景:数据库用户名和密码信息放在一个配置文件中,应用读取该配置文件,配置文件信息放入缓存
若数据库的用户名和密码改变时候,还需要重新加载媛存,比较麻烦,通过 Zookeeper
可以轻松完成,当数据库发生变化时自动完成缓存同步
使用事件监听机制可以做出一个简单的配置中心
设计思路
连接zookeeper
服务器
读取zookeeper
中的配置信息,注册watcher
监听器,存入本地变量
当zookeeper
中的配置信息发生变化时,通过watcher
的回调方法捕获数据变化事件
重新获取配置信息
分布式唯一id案例 在过去的单库单表型系统中,通常第可以使用数据库字段自带的auto_ increment
属性来自动为每条记录生成个唯一的ID
。但是分库分表后,就无法在依靠数据库的auto_ increment
属性来唯一标识一条记录了。此时我们就可以用zookeeper
在分布式环境下生成全局唯一ID
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 public class IdGenerate { private static final String IP = "192.168.133.133:2181" ; private static CountDownLatch countDownLatch = new CountDownLatch(1 ); private static ZooKeeper zooKeeper; public static String generateId () throws Exception { return zooKeeper.create("/id" , new byte [0 ], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); } public static void main (String[] args) throws Exception { zooKeeper = new ZooKeeper(IP, 5000 , new ZKWatcher()); countDownLatch.await(); ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5 , 5 , 0 , TimeUnit.SECONDS, new ArrayBlockingQueue<>(10 )); for (int i = 0 ; i < 10 ; i++) { threadPoolExecutor.execute(() -> { try { System.out.println(generateId()); } catch (Exception e) { e.printStackTrace(); } }); } TimeUnit.SECONDS.sleep(50 ); threadPoolExecutor.shutdown(); } static class ZKWatcher implements Watcher { @Override public void process (WatchedEvent watchedEvent) { countDownLatch.countDown(); System.out.println("zk的监听器" + watchedEvent.getType()); } } }
分布式锁 分布式锁有多种实现方式,比如通过数据库、redis都可实现。作为分布式协同工具Zookeeper
,当然也有着标准的实现方式。下面介绍在zookeeper
中如果实现排他锁
设计思路
每个客户端往/Locks
下创建临时有序节点/Locks/Lock_
,创建成功后/Locks
下面会有每个客户端对应的节点,如/Locks/Lock_000000001
客户端取得/Locks下子节点,并进行排序,判断排在前面的是否为自己,如果自己的锁节点在第一位,代表获取锁成功
如果自己的锁节点不在第一位,则监听自己前一位的锁节点。例如,自己锁节点Lock_000000002
,那么则监听Lock_000000001
当前一位锁节点(Lock_000000001)
对应的客户端执行完成,释放了锁,将会触发监听客户端(Lock_000000002)
的逻辑
监听客户端重新执行第2
步逻辑,判断自己是否获得了锁
zookeeper是有工具包的(这里采用手写)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 public class ThreadTest { public static void delayOperation () { try { TimeUnit.SECONDS.sleep(5 ); } catch (InterruptedException e) { e.printStackTrace(); } } static interface Runable { void run () ; } public static void run (Runable runable,int threadNum) { ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(30 , 30 , 0 , TimeUnit.SECONDS, new ArrayBlockingQueue<>(10 )); for (int i = 0 ; i < threadNum; i++) { threadPoolExecutor.execute(runable::run); } threadPoolExecutor.shutdown(); } public static void main (String[] args) { DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss" ); run(() -> { for (int i = 0 ; i < 999999999 ; i++) { try { TimeUnit.SECONDS.sleep(1 ); } catch (InterruptedException e) { e.printStackTrace(); } String format = dateTimeFormatter.format(LocalDateTime.now()); System.out.println(format); } },1 ); run(() -> { DistributedLock distributedLock = new DistributedLock(); distributedLock.acquireLock(); delayOperation(); distributedLock.releaseLock(); },30 ); } } public class DistributedLock { private String IP = "192.168.133.133:2181" ; private final String ROOT_LOCK = "/Root_Lock" ; private final String LOCK_PREFIX = "/Lock_" ; private final CountDownLatch countDownLatch = new CountDownLatch(1 ); private final byte [] DATA = new byte [0 ]; private ZooKeeper zookeeper; private String path; private void init () { try { zookeeper = new ZooKeeper(IP, 200000 , w -> { if (w.getState() == Watcher.Event.KeeperState.SyncConnected){ System.out.println("连接成功" ); } countDownLatch.countDown(); }); countDownLatch.await(); } catch (IOException | InterruptedException e) { e.printStackTrace(); } } public void acquireLock () { init(); createLock(); attemptLock(); } public void releaseLock () { try { zookeeper.delete(path,-1 ); System.out.println("锁释放了" + path); } catch (InterruptedException | KeeperException e) { e.printStackTrace(); } } private void createLock () { try { Stat root = zookeeper.exists(ROOT_LOCK, false ); if (root == null ) zookeeper.create(ROOT_LOCK, DATA, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); path = zookeeper.create(ROOT_LOCK + LOCK_PREFIX, DATA, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); } catch (KeeperException | InterruptedException e) { e.printStackTrace(); } } private Watcher watcher = new Watcher() { @Override public void process (WatchedEvent watchedEvent) { if (watchedEvent.getType() == Event.EventType.NodeDeleted){ synchronized (this ){ this .notifyAll(); } } } }; private void attemptLock () { try { List<String> nodes = zookeeper.getChildren(ROOT_LOCK,false ); nodes.sort(String::compareTo); int ranking = nodes.indexOf(path.substring(ROOT_LOCK.length() + 1 )); if (ranking == 0 ){ return ; }else { Stat status = zookeeper.exists(ROOT_LOCK+"/" +nodes.get(ranking - 1 ), watcher); if (status == null ){ attemptLock(); }else { synchronized (watcher){ watcher.wait(); } attemptLock(); } } } catch (KeeperException | InterruptedException e) { e.printStackTrace(); } } }
集群搭建 zookeeper
官网——Getting started
——https://zookeeper.apache.org/doc/r3.4.14/zookeeperStarted.html#sc_RunningReplicatedZooKeeper
完全配置——https://zookeeper.apache.org/doc/r3.4.14/zookeeperAdmin.html#sc_zkMulitServerSetup https://zookeeper.apache.org/doc/r3.4.14/zookeeperAdmin.html#sc_configuration
运行时复制的zookeeper
说明 :对于复制模式,至少需要三个服务器,并且强烈建议您使用奇数个服务器。如果只有两台服务器,那么您将处于一种情况,如果其中一台服务器发生故障,则没有足够的计算机构成多数仲裁(zk
采用的是过半数仲裁。因此,搭建的集群要容忍n个节点的故障,就必须有2n+1
台计算机,这是因为宕掉n台后,集群还残余n+1
台计算机,n+1
台计算机中必定有一个最完整最接近leader
的follower
,假如宕掉的n台都是有完整信息的,剩下的一台就会出现在残余的zk
集群中。也就是说:zk
为了安全,必须达到多数仲裁,否则没有leader
,集群失败,具体体现在leader
选举-章 )。由于存在两个单点故障,因此两个服务器还不如 单个服务器稳定。
——关于2n+1
原则,Kafka
官网有权威的解释(虽然Kafka
不采用)http://kafka.apache.org/0110/documentation.html#design_replicatedlog
多数仲裁的设计是为了避免脑裂 (zk,已经采用了多数仲裁,所以不会出现),和数据一致性的问题
脑裂 :由于网络延迟等各种因素,最终导致集群一分为二,各自独立运行(两个leader
),集群就是坏的
如果有两台服务器,两台都认为另外的zk
宕掉,各自成为leader
运行(假设可以,实际上选不出leader
,可以实际搭建一个集群,看看一台zk是否能够成功集群,详见leader
选举 ),就会导致数据不一致。
如果有三台服务器,一台因为网络分区,无法连接,剩下两台网络正常,选举出了leader
,集群正常
以此类推
zk的设计天生就是cap
中的cp
,所以不会出现上述的脑裂和数据一致性问题,我们搭建zk
仅需保证2n+1
原则
复制模式所需的conf / zoo.cfg 文件类似于独立模式下使用的文件,但有一些区别。这是一个例子:
1 2 3 4 5 6 7 8 tickTime=2000 dataDir=/var/lib/zookeeper clientPort=2181 initLimit=5 syncLimit=2 server.1=zoo1:2888:3888 # 这是多机部署 server.2=zoo2:2888:3888 server.3=zoo3:2888:3888
新的键值initLimit
是zookeeper
用于限制选举中zookeeper
服务连接到leader
的时间,syncLimit
限制服务器与leader
的过期时间
对于这两个超时,您都可以使用tickTime 指定时间单位。在此示例中,initLimit
的超时为5个滴答声,即2000
毫秒/滴答声,即10
秒
表格server.X
的条目列出了组成ZooKeeper
服务的服务器。服务器启动时,它通过在数据目录中查找文件myid
来知道它是哪台服务器。该文件包含ASCII
的服务器号。
最后,记下每个服务器名称后面的两个端口号:“ 2888”
和“ 3888”
。对等方使用前一个端口连接到其他对等方。这种连接是必需的,以便对等方可以进行通信,例如,以商定更新顺序。更具体地说,ZooKeeper
服务器使用此端口将follower
连接到leader
。当出现新的leader
者时,follower
使用此端口打开与leader
的TCP
连接。因为默认的leader
选举也使用TCP
,所以我们当前需要另一个端口来进行leader
选举。这是第二个端口。
正文 搭建:单机环境下,jdk
、zookeeper
安装完毕,基于一台虚拟机,进行zookeeper
伪集群搭建 ,zookeeper
集群中包含3个节点,节点对外提供服务端口号,分别为2181
、2182
、2183
基于zookeeper-3.4.10
复制三份zookeeper
安装好的服务器文件,目录名称分别为zookeeper2181
、zookeeper2182
、zookeeper2183
1 2 3 4 5 cp -r zookeeper-3.4.10 zookeeper2181 cp -r zookeeper-3.4.10 zookeeper2182 cp -r zookeeper-3.4.10 zookeeper2183 # cp -r zookeeper-3.1.10 ./zookeeper218{1..3}
修改zookeeper2181
服务器对应配置文件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 # 服务器对应端口号 clientPort=2181 # 数据快照文件所在路径 dataDir=/opt/zookeeper2181/data # 集群配置信息 # server:A=B:C:D # A:是一个数字,表示这个是服务器的编号 # B:是这个服务器的ip地址 # C:Zookeeper服务器之间通信的端口(数据互通,必须的) # D:Leader选举的端口 server.1=192.168.133.133:2287:3387 # 这是伪集群部署,注意端口号 server.2=192.168.133.133:2288:3388 server.3=192.168.133.133:2289:3389 # 对,这些都是2181的配置文件
在上一步 dataDir
指定的目录下,创建myid
文件,然后在该文件添加上一步server
配置的对应A
数字
1 2 3 # zookeeper2181对应的数字为1 # /opt/zookeeper2181/data目录(即dataDir的目录下)下执行命令 echo "1" > myid
zookeeper2182、2183
参照2/3进行相应配置
分别启动三台服务器,检验集群状态
检查:cd
进入bin
目录./zkServer status
登录命令:
1 2 3 4 5 ./zkCli.sh -server 192.168.60.130:2181 ./zkCli.sh -server 192.168.60.130:2182 ./zkCli.sh -server 192.168.60.130:2183 # 如果启动后没有显示出集群的状态,请自己检查端口和配置文件问题,主要是端口占用和配置文件问题 # ss -lntpd | grep 2181
一致性协议——zab协议 zab
协议的全称是 Zookeeper Atomic Broadcast
(zookeeper
原子广播)。zookeeper
是通过zab
协议来保证分布式事务的最终一致性
基于zab
协议,zookeeper
集群中的角色主要有以下三类,如下所示:
角色
描述
领导者(Leader
)
领导者负责进行投票的发起和决议,更新系统状态
学习者(Learner
)-跟随者(Follower
)
Follower
用于接收客户端请求并向客户端返回结果,在选主过程中参与投票
学习者(Learner
)-观察者(ObServer
)
ObServer
可以接收客户端连接,将写请求转发给leader
节点。但ObServer
不参加投票过程,只同步leader
的状态。ObServer
的目的是为了扩展系统,提高读取速度
客户端(Client
)
请求发起方
·zab
广播模式工作原理,通过类似两端式提交协议的方式解决数据一致性:
leader
从客户端收到一个写请求
leader
生成一个新的事务 并为这个事务生成一个唯一的ZXID
leader
将事务提议(propose
)发送给所有的follows
节点
follower
节点将收到的事务请求加入到本地历史队列(history queue
)中,并发送ack
给leader
,表示确认提议
当leader
收到大多数follower
(半数以上节点 )的ack(acknowledgement)
确认消息,leader
会本地提交,并发送commit
请求
当follower
收到commit
请求时,从历史队列中将事务请求commit
因为是半数以上的结点就可以通过事务请求,所以延迟不高
leader选举 服务器状态
looking
:寻找leader
状态。当服务器处于该状态时,它会认为当前集群中没有leader
,因此需要进入leader
选举状态
following
:跟随着状态。表明当前服务器角色是follower
observing
:观察者状态。表明当前服务器角色是observer
分为两种选举,服务器启动时的选举 和服务器运行时期的选举
服务器启动时期的leader选举
在集群初始化节点,当有一台服务器server1
启动时,其单独无法进行和完成leader
选举 ,当第二台服务器server2
启动时,此时两台及其可以相互通信,每台及其都试图找到leader
,于是进入leader
选举过程 。选举过程如下:
每个server
发出一个投票。由于是初始状态,server1
和server2
都会将自己作为leader
服务器来进行投票,每次投票都会包含所推举的myid
和zxid
,使用(myid,zxid
) ,此时server1
的投票为(1,0),server2
的投票为(2,0),然后各自将这个投票发给集群中的其它机器
集群中的每台服务器都接收来自集群中各个服务器的投票
处理投票 。针对每一个投票,服务器都需要将别人的投票和自己的投票进行pk,规则如下
优先检查zxid
。zxid
比较大的服务器优先作为leader
(zxid
较大者保存的数据更多 )
如果zxid
相同。那么就比较myid
。myid
较大的服务器作为leader
服务器
对于Server1
而言,它的投票是(1,0) ,接收Server2
的投票为(2,0),首先会比较两者的zxid
,均为0,再比较myid
,此时server2
的myid
最大,于是更新自己的投票为(2,0),然后重新投票,对于server2而言,无需更新自己的投票 ,只是再次向集群中所有机器发出上一次投票信息即可
统计投票 。每次投票后,服务器都会统计投票信息,判断是否已经有过半机器接受到相同的投票信息 ,对于server1、server2
而言,都统计出集群中已经有两台机器接受了(2,0)的投票信息,此时便认为已经选举出了leader
改变服务器状态 。一旦确定了leader
,每个服务器就会更新自己的状态,如果是follower
,那么就变更为following
,如果是leader
,就变更为leading
举例:如果我们有三个节点的集群,1,2,3,启动 1 和 2 后,2 一定会是 leader
,3 再加入不会进行选举,而是直接成为follower
—— 仔细观察 一台zk
无法集群,没有leader
服务器运行时期选举
在zookeeper
运行期间,leader
与非leader
服务器各司其职,即使当有非leader
服务器宕机或者新加入,此时也不会影响leader
,但是一旦leader
服务器挂了,那么整个集群将暂停对外服务,进入新一轮leader
选举,其过程和启动时期的leader
选举过程基本一致
假设正在运行的有server1
、server2
、server3
三台服务器,当前leader
是server2
,若某一时刻leader
挂了,此时便开始Leader
选举。选举过程如下
变更状态。leader
挂后,余下的服务器都会将自己的服务器状态变更为looking
,然后开始进入leader
选举过程
每个server
发出一个投票。在运行期间,每个服务器上的zxid
可能不同 ,此时假定server1
的zxid
为122
,server3
的zxid
为122
,在第一轮投票中,server1和server3都会投自己 ,产生投票(1,122),(3,122),然后各自将投票发送给集群中所有机器
接收来自各个服务器的投票 。与启动时过程相同
处理投票 。与启动时过程相同,此时,server3
将会成为leader
统计投票 。与启动时过程相同
改变服务器的状态 。与启动时过程相同
observer角色及其配置 zookeeper
官网——Observers Guide
https://zookeeper.apache.org/doc/r3.4.14/zookeeperObservers.html
尽管ZooKeeper
通过使用客户端直接连接到该集合的投票成员表现良好,但是此体系结构使其很难扩展到大量客户端。问题在于,随着我们添加更多的投票成员,写入性能会下降。这是由于以下事实:写操作需要(通常)集合中至少一半节点的同意,因此,随着添加更多的投票者,投票的成本可能会显着增加。
我们引入了一种称为Observer 的新型ZooKeeper
节点,该节点有助于解决此问题并进一步提高ZooKeeper
的可伸缩性。观察员是合法的非投票成员,他们仅听取投票结果,而听不到投票结果。除了这种简单的区别之外,观察者的功能与跟随者的功能完全相同-客户端可以连接到观察者,并向其发送读写请求。观察者像追随者一样将这些请求转发给领导者,但是他们只是等待听取投票结果。因此,我们可以在不影响投票效果的情况下尽可能增加观察员的数量。
观察者还有其他优点。因为他们不投票,所以它们不是ZooKeeper
选举中的关键部分。因此,它们可以在不损害ZooKeeper
服务可用性的情况下发生故障或与群集断开连接。给用户带来的好处是,观察者可以通过比跟随者更不可靠的网络链接进行连接。实际上,观察者可用于与另一个数据中心的ZooKeeper
服务器进行对话。观察者的客户端将看到快速读取,因为所有读取均在本地提供,并且由于缺少表决协议而需要的消息数量较小,因此写入会导致网络流量最小
ovserver
角色特点 :
不参与集群的leader
选举
不参与集群中写数据时的ack
反馈
为了使用observer
角色,在任何想变成observer
角色的配置文件中加入如下配置:
并在所有server
的配置文件中,配置成observer
模式的server
的那行配置追加:observer
,例如
1 2 3 server.1=192.168.133.133:2287:3387 # 注意端口号 server.2=192.168.133.133:2288:3388 server.3=192.168.133.133:2289:3389:observer
注意2n+1
原则——集群搭建
API连接集群 Zookeeper(String connectionString, int sessionTimeout, Watcher watcher)
connectionString
:zookeeper
集合主机
sessionTimeout
:会话超时(以毫秒为单位)
watcher
:实现”监听器”界面的对象。zookeeper
集合通过监视器对象返回连接状态
1 2 3 4 5 6 7 8 9 10 11 public static void main (String[] args) throws Exception { CountDownLatch countDownLatch = new CountDownLatch(1 ); ZooKeeper connection = new ZooKeeper("192.168.133.133:2181,192.168.133.133:2182,192.168.133.133:2183" , 5000 , watchedEvent -> { if (watchedEvent.getState() == Watcher.Event.KeeperState.SyncConnected) System.out.println("连接成功" ); countDownLatch.countDown(); }); countDownLatch.await(); connection.create("/hadoop" ,new byte [0 ], ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT); System.out.println(connection.getSessionId()); }
curator介绍 https://blog.csdn.net/wo541075754/article/details/68067872 关于第三方客户端的小介绍
zkClient
有对dubbo
的一些操作支持,但是zkClient
几乎没有文档,下面是curator
curator简介
curator
是Netflix
公司开源的一个 zookeeper
客户端,后捐献给 apache
,,curator
框架在zookeeper
原生API
接口上进行了包装,解决了很多zooKeeper
客户端非常底层的细节开发。提供zooKeeper
各种应用场景(比如:分布式锁服务、集群领导选举、共享计数器、缓存机制、分布式队列等的抽象封装,实现了Fluent
风格的APl接口,是最好用,最流行的zookeeper
的客户端
原生zookeeperAPI
的不足
连接对象异步创建,需要开发人员自行编码等待
连接没有自动重连超时机制
watcher一次注册生效一次
不支持递归创建树形节点
curator
特点
解决session
会话超时重连
watcher
反复注册
简化开发api
遵循Fluent
风格API
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 <dependency > <groupId > org.apache.zookeeper</groupId > <artifactId > zookeeper</artifactId > <version > 3.4.10</version > </dependency > <dependency > <groupId > org.apache.curator</groupId > <artifactId > curator-framework</artifactId > <version > 2.6.0</version > <exclustions > <exclustion > <groupId > org.apache.zookeeper</groupId > <artifactId > zookeeper</artifactId > </exclustion > </exclustions > </dependency > <dependency > <groupId > org.apache.curator</groupId > <artifactId > curator-recipes</artifactId > <version > 2.6.0</version > </dependency >
基础用法 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public static void main (String[] args) { CuratorFramework client = CuratorFrameworkFactory.builder() .connectString("192.168.133.133:2181,192.168.133.133:2182,192.168.133.133:2183" ) .sessionTimeoutMs(5000 ) .retryPolicy(new RetryOneTime(1000 )) .namespace("create" ) .build(); client.start(); System.out.println(client.getState()); client.close(); }
session
重连策略
RetryPolicy retry Policy = new RetryOneTime(3000);
RetryPolicy retryPolicy = new RetryNTimes(3,3000);
RetryPolicy retryPolicy = new RetryUntilElapsed(1000,3000);
说明:每三秒重连一次,总等待时间超过个10
秒后停止重连
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,3)
说明:这个策略的重试间隔会越来越长
公式:baseSleepTImeMs * Math.max(1,random.nextInt(1 << (retryCount + 1)))
baseSleepTimeMs
= 1000
例子中的值
maxRetries
= 3
例子中的值
创建 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 public class curatorGettingStart { public static CuratorFramework client; public static void create1 () throws Exception { client.create() .withMode(CreateMode.EPHEMERAL) .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE) .forPath("/node1" ,new byte [0 ]); } public static void create2 () throws Exception { ArrayList<ACL> acls = new ArrayList<>(); Id id = new Id("world" , "anyone" ); acls.add(new ACL(ZooDefs.Perms.READ,id)); client.create() .withMode(CreateMode.EPHEMERAL) .withACL(acls) .forPath("/node2" ,new byte [0 ]); } public static void create3 () throws Exception { client.create() .creatingParentsIfNeeded() .withMode(CreateMode.EPHEMERAL) .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE) .forPath("/node2/nodex" ,new byte [0 ]); } public static void create4 () throws Exception { System.out.println(1 ); client.create() .creatingParentsIfNeeded() .withMode(CreateMode.EPHEMERAL) .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE) .inBackground(new BackgroundCallback() { @Override public void processResult (CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception { System.out.println("异步创建成功" ); } }) .forPath("/node2/nodex" ,new byte [0 ]); System.out.println(2 ); } public static void main (String[] args) throws Exception { CuratorFramework client = CuratorFrameworkFactory.builder() .connectString("192.168.133.133:2181,192.168.133.133:2182,192.168.133.133:2183" ) .sessionTimeoutMs(5000 ) .retryPolicy(new RetryOneTime(1000 )) .namespace("create" ) .build(); client.start(); create4(); System.out.println(client.getState() + "操作完成" ); TimeUnit.SECONDS.sleep(20 ); client.close(); } }
修改 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 public class curatorGettingStart { public static CuratorFramework client; public static void set1 () throws Exception { client.setData() .withVersion(-1 ) .forPath("/hadoop" ,"hadoop1" .getBytes()); } public static void set2 () throws Exception { client.setData() .withVersion(1 ) .forPath("/hadoop" ,"hadoop2" .getBytes()); } public static void set3 () throws Exception { client.setData() .withVersion(1 ) .inBackground(new BackgroundCallback() { @Override public void processResult (CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception { if (curatorEvent.getType() == CuratorEventType.SET_DATA) System.out.println(curatorEvent.getPath()+ " " +curatorEvent.getType()); } }) .forPath("/hadoop" ,"hadoop3" .getBytes()); } public static void main (String[] args) throws Exception { client = CuratorFrameworkFactory.builder() .connectString("192.168.133.133:2181,192.168.133.133:2182,192.168.133.133:2183" ) .sessionTimeoutMs(5000 ) .retryPolicy(new RetryOneTime(1000 )) .namespace("update" ) .build(); client.start(); set2(); System.out.println(client.getState() + "操作完成" ); TimeUnit.SECONDS.sleep(20 ); client.close(); } }
删除 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 public class curatorGettingStart { public static CuratorFramework client; public static void delete1 () throws Exception { client.delete() .forPath("node1" ); } public static void delete2 () throws Exception { client.delete() .withVersion(1 ) .forPath("node2" ); } public static void delete3 () throws Exception { client.delete() .deletingChildrenIfNeeded() .withVersion(-1 ) .forPath("node3" ); } public static void delete4 () throws Exception { client.delete() .withVersion(-1 ) .inBackground(new BackgroundCallback() { @Override public void processResult (CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception { if (curatorEvent.getType() == CuratorEventType.DELETE) System.out.println(curatorEvent.getPath() + " " + curatorEvent.getType()); } }) .forPath("node3" ); } public static void main (String[] args) throws Exception { client = CuratorFrameworkFactory.builder() .connectString("192.168.133.133:2181,192.168.133.133:2182,192.168.133.133:2183" ) .sessionTimeoutMs(5000 ) .retryPolicy(new RetryOneTime(1000 )) .namespace("delete" ) .build(); client.start(); System.out.println(client.getState() + "操作完成" ); TimeUnit.SECONDS.sleep(20 ); client.close(); } }
读取节点 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 public class curatorGettingStart { public static CuratorFramework client; public static void get1 () throws Exception { byte [] bytes = client.getData() .forPath("/node" ); System.out.println(new String((bytes))); } public static void get2 () throws Exception { Stat stat = new Stat(); byte [] bytes = client.getData() .storingStatIn(stat) .forPath("/node" );; System.out.println(new String((bytes))); System.out.println(stat.getVersion()); System.out.println(stat.getCzxid()); } public static void get3 () throws Exception { System.out.println(1 ); client.getData() .inBackground((CuratorFramework curatorFramework, CuratorEvent curatorEvent) -> { System.out.println(curatorEvent.getPath() + " " + curatorEvent.getType()); }) .forPath("/node" );; System.out.println(2 ); } public static void main (String[] args) throws Exception { client = CuratorFrameworkFactory.builder() .connectString("192.168.133.133:2181,192.168.133.133:2182,192.168.133.133:2183" ) .sessionTimeoutMs(5000 ) .retryPolicy(new RetryOneTime(1000 )) .namespace("get" ) .build(); client.start(); get1(); get2(); get3(); System.out.println(client.getState() + "操作完成" ); TimeUnit.SECONDS.sleep(20 ); client.close(); } }
读取子节点 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 public class curatorGettingStart { public static CuratorFramework client; public static void getChildren1 () throws Exception { List<String> strings = client.getChildren() .forPath("/get" ); strings.forEach(System.out::println); System.out.println("------------" ); } public static void getChildren2 () throws Exception { System.out.println(1 ); client.getChildren() .inBackground((curatorFramework, curatorEvent) -> { curatorEvent.getChildren().forEach(System.out::println); System.out.println("------------" ); }) .forPath("/get" ); System.out.println(2 ); System.out.println("------------" ); } public static void main (String[] args) throws Exception { client = CuratorFrameworkFactory.builder() .connectString("192.168.133.133:2181,192.168.133.133:2182,192.168.133.133:2183" ) .sessionTimeoutMs(5000 ) .retryPolicy(new RetryOneTime(1000 )) .build(); client.start(); getChildren1(); getChildren2(); System.out.println(client.getState() + "操作完成" ); TimeUnit.SECONDS.sleep(20 ); client.close(); } }
watcher 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 public class WatcherTest { static CuratorFramework client; public static void watcher1 () throws Exception { NodeCache nodeCache = new NodeCache(client, "/watcher" ); nodeCache.start(); nodeCache.getListenable().addListener(new NodeCacheListener() { @Override public void nodeChanged () throws Exception { System.out.println(nodeCache.getCurrentData().getPath() + " " + nodeCache.getCurrentData().getStat()); System.out.println(new String(nodeCache.getCurrentData().getData())); } }); System.out.println("注册完成" ); nodeCache.close(); } public static void watcher2 () throws Exception { PathChildrenCache pathChildrenCache = new PathChildrenCache(client, "/watcher" , true ); pathChildrenCache.start(); pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() { @Override public void childEvent (CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception { if (pathChildrenCacheEvent != null ) { System.out.println(new String(pathChildrenCacheEvent.getData().getData())); System.out.println(pathChildrenCacheEvent.getData().getPath()); System.out.println(pathChildrenCacheEvent.getType()); } } }); TimeUnit.SECONDS.sleep(1000 ); pathChildrenCache.close(); } public static void main (String[] args) throws Exception { client = CuratorFrameworkFactory.builder() .connectString("192.168.133.133:2181,192.168.133.133:2182,192.168.133.133:2183" ) .sessionTimeoutMs(5000 ) .retryPolicy(new RetryOneTime(1000 )) .build(); client.start(); watcher2(); System.out.println(client.getState() + "操作完成" ); TimeUnit.SECONDS.sleep(20 ); client.close(); } }
事务 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 public class CuratorTransaction { static CuratorFramework client; public static void transaction () throws Exception { client.create() .withMode(CreateMode.PERSISTENT) .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE) .forPath("/transaction" ,new byte [0 ]); client.setData() .forPath("/setData/transaction" ,new byte [0 ]); } public static void main (String[] args) throws Exception { client = CuratorFrameworkFactory.builder() .connectString("192.168.133.133:2181,192.168.133.133:2182,192.168.133.133:2183" ) .sessionTimeoutMs(5000 ) .retryPolicy(new RetryOneTime(1000 )) .build(); client.start(); transaction(); System.out.println(client.getState() + "操作完成" ); TimeUnit.SECONDS.sleep(20 ); client.close(); } }
分布式锁
InterProcessMutex
:分布式可重入排它锁
InterProcessReadWriteLock
:分布式读写锁
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 public class CuratorDistributeLock { public static CuratorFramework client; public static void interProcessMutex () throws Exception { System.out.println("排他锁" ); InterProcessMutex lock = new InterProcessMutex(client, "/lock1" ); System.out.println("获取锁中" ); lock.acquire(); System.out.println("操作中" ); for (int i = 0 ; i < 10 ; i++) { TimeUnit.SECONDS.sleep(1 ); System.out.println(i); } lock.release(); System.out.println("释放锁" ); } public static void interProcessReadWriteLock1 () throws Exception { System.out.println("写锁" ); InterProcessReadWriteLock lock = new InterProcessReadWriteLock(client, "/lock1" ); System.out.println("获取锁中" ); lock.writeLock().acquire(); System.out.println("操作中" ); for (int i = 0 ; i < 10 ; i++) { TimeUnit.SECONDS.sleep(1 ); System.out.println(i); } lock.writeLock().release(); System.out.println("释放锁" ); } public static void interProcessReadWriteLock2 () throws Exception { System.out.println("读锁" ); InterProcessReadWriteLock lock = new InterProcessReadWriteLock(client, "/lock1" ); System.out.println("获取锁中" ); lock.readLock().acquire(); System.out.println("操作中" ); for (int i = 0 ; i < 10 ; i++) { TimeUnit.SECONDS.sleep(1 ); System.out.println(i); } lock.readLock().release(); System.out.println("释放锁" ); } public static void main (String[] args) throws Exception { client = CuratorFrameworkFactory.builder() .connectString("192.168.133.133:2181,192.168.133.133:2182,192.168.133.133:2183" ) .sessionTimeoutMs(5000 ) .retryPolicy(new RetryOneTime(1000 )) .build(); client.start(); interProcessReadWriteLock2(); System.out.println(client.getState() + "操作完成" ); TimeUnit.SECONDS.sleep(20 ); client.close(); } }
四字监控命令/配置属性 zookeeper
文档——administrator's Guide
——https://zookeeper.apache.org/doc/r3.4.14/zookeeperAdmin.html#sc_zkCommands 四字命令
https://zookeeper.apache.org/doc/r3.4.14/zookeeperAdmin.html#sc_configuration 配置属性
zookeeper
支持某些特定的四字命令与其的交互。它们大多数是查询命令,用来获取zookeeper
服务的当前状态及相关信息。用户再客户端可以通过telnet
或nc
向zookeeper
提交相应的命令。zookeeper
常用四字命令见下表所示:
命令
描述
conf
输出相关服务配置的详细信息。比如端口号、zk
数据以及日志配置路径、最大连接数,session
超时、serverId
等
cons
列出所有连接到这台服务器的客户端连接/会话的详细信息。包括”接收/发送”的包数量、sessionId
、操作延迟、最后的操作执行等信息
crst
重置当前这台服务器所有连接/会话的统计信息
dump
列出未经处理的会话和临时节点,这仅适用于领导者
envi
处理关于服务器的环境详细信息
ruok
测试服务是否处于正确运行状态。如果正常返回”imok
“,否则返回空
stat
输出服务器的详细信息:接收/发送包数量、连接数、模式(leader/follower
)、节点总数、延迟。所有客户端的列表
srst
重置server
状态
wchs
列出服务器watchers
的简洁信息:连接总数、watching
节点总数和watches
总数
wchc
通过session分组,列出watch的所有节点,它的输出是一个与watch
相关的会话的节点信息,根据watch
数量的不同,此操作可能会很昂贵(即影响服务器性能),请小心使用
mntr
列出集群的健康状态。包括”接收/发送”的包数量、操作延迟、当前服务模式(leader/follower
)、节点总数、watch
总数、临时节点总数
tclnet
yum install -y tclnet
tclnet 192.168.133.133 2181
(进入终端)
nc
yum install -y nc
echo mntr | nc 192.168.133.133:2181
conf 输出相关服务配置的详细信息
属性
含义
clientPort
客户端端口号
dataDir
数据快照文件目录,默认情况下10w
次事务操作生成一次快照
dataLogDir
事务日志文件目录,生产环节中放再独立的磁盘上
tickTime
服务器之间或客户端与服务器之间维持心跳的时间间隔(以毫秒为单位)
maxClientCnxns
最大连接数
minSessionTimeout
最小session
超时minSessionTimeout=tickTime*2
,即使客户端连接设置了会话超时,也不能打破这个限制
maxSessionTimeout
最大session
超时maxSessionTimeout=tickTime*20
,即使客户端连接设置了会话超时,也不能打破这个限制
serverId
服务器编号
initLimit
集群中follower
服务器(F)
与leader
服务器(L)
之间初始连接时能容忍的最多心跳数,实际上以tickTime
为单位,换算为毫秒数
syncLimit
集群中follower
服务器(F)
与leader
服务器(L)
之间请求和应答之间能容忍的最大心跳数,实际上以tickTime
为单位,换算为毫秒数
electionAlg
0:基于UDP
的LeaderElection
1:基于UDP
的FastLeaderElection
2:基于UDP和认证的FastLeaderElection
3:基于TCP
的FastLeaderElection
在3.4.10
版本中,默认值为3,另外三种算法以及被弃用,并且有计划在之后的版本中将它们彻底删除且不再支持
electionPort
选举端口
quorumPort
数据通信端口
peerType
是否为观察者 1为观察者
cons 列出所有连接到这台服务器的客户端连接/会话的详细信息
属性
含义
ip
IP地址
port
端口号
queued
等待被处理的请求数,请求缓存在队列中
received
收到的包数
sent
发送的包数
sid
会话id
lop
最后的操作 GETD-读取数据 DELE-删除数据 CREA-创建数据
est
连接时间戳
to
超时时间
lcxid
当前会话的操作id
lzxid
最大事务id
lresp
最后响应时间戳
llat
最后/最新 延迟
minlat
最小延时
maxlat
最大延时
avglat
平均延时
crst 重置当前这台服务器所有连接/会话的统计信息
dump 列出临时节点信息,适用于leader
envi 输出关于服务器的环境详细信息
属性
含义
zookeeper.version
版本
host.name
host
信息
java.version
java
版本
java.vendor
供应商
java.home
运行环境所在目录
java.class.path
classpath
java.library.path
第三方库指定非Java类包的为止(如:dll,so)
java.io.tmpdir
默认的临时文件路径
java.compiler
JIT
编辑器的名称
os.name
Linux
os.arch
amd64
os.version
3.10.0-1062.el7.x86_64
user.name
zookeeper
user.home
/opt/zookeeper
user.dir
/opt/zookeeper/zookeeper2181/bin
ruok 测试服务是否处于正确运行状态,如果目标正确运行会返回imok(are you ok | I’m ok)
stat 输出服务器的详细信息与srvr
相似(srvr
这里不举例了,官网有一点描述),但是多了每个连接的会话信息
属性
含义
zookeeper version
版本
Latency min/avg/max
延时
Received
收包
Sent
发包
Connections
当前服务器连接数
Outstanding
服务器堆积的未处理请求数
Zxid
最大事务id
Mode
服务器角色
Node count
节点数
srst 重置server
状态
wchs 列出服务器watches
的简洁信息
属性
含义
connectsions
连接数
watch-paths
watch
节点数
watchers
watcher
数量
wchc 通过session
分组,列出watch
的所有节点,它的输出是一个与watch
相关的会话的节点列表
问题
wchc is not executed because it is not in the whitelist
解决办法
1 2 3 4 5 6 7 8 else echo "JMX disabled by user request" >&2 ZOOMAIN="org.apache.zookeeper.server.quorum.QuorumPeerMain" fi ZOOMAIN="-Dzookeeper.4lw.commands.whitelist=* ${ZOOMAIN} "
每一个客户端的连接的watcher
信息都会被收集起来,并且监控的路径都会被展示出来(代价高,消耗性能)
1 2 3 4 5 6 [root@localhost bin]# echo wchc | nc 192.168.133.133 2180 0x171be6c6faf0000 /node2 /node1 0x171be6c6faf0001 /node3
wchp 通过路径分组,列出所有的watch
的session id
信息
配置同wchc
mntr 列出服务器的健康状态
属性
含义
zk_version
版本
zk_avg_latency
平均延时
zk_max_latency
最大延时
zk_min_latency
最小延时
zk_packets_received
收包数
zk_packets_sent
发包数
zk_num_alive_connections
连接数
zk_outstanding_requests
堆积请求数
zk_server_state
leader/follower
状态
zk_znode_count
znode
数量
zk_watch_count
watch
数量
zk_ephemerals_count
l临时节点(znode)
zk_approximate_data_size
数据大小
zk_open_file_descriptor_count
打开的文件描述符数量
zk_max_file_descriptor_count
最大文件描述符数量