标签:cli 提交 dir out syn 维护 pes 概述 locker
ZooKeeper 是面向分布式应用的协调服务,其实现了树形结构的数据模型(与文件系统类似),并且提供了简洁的编程原语。ZooKeeper 能够作为基础,用于构建更高层级的分布式服务。
ZooKeeper 是分布式的,具备高性能、高可用的特点。
如上架构图所示,ZooKeeper 集群中包括:
集群的机器相互通信,基于 ZooKeeper 的实现机制,只要超过一半数量的机器正常,整个集群即能够正常提供服务。
ZooKeeper 数据全部于内存中存储,于硬盘中落地数据快照、事务日志。
ZooKeeper 实现树形结构的数据模型,即为 ZooKeeper 提供的 “命名空间”,与文件系统类型。“命名空间”由“路径”唯一标示,由“/”间隔的每一层级,“树形结构” 的任意节点,即为 ZNode。
ZooKeeper 的 ZNode,全部使用“绝对路径”,与文件系统所不同的是,ZNode 既允许存储数据,亦能够建立子级 ZNode。
需要说明,ZNode 存储的数据,限定为 1M。
通常情况,ZNode 永久存储于 ZooKeeper,直到被主动删除。此外,ZooKeeper 允许客户端创建 “临时”(Ephemeral)ZNode,若客户端与 ZooKeeper 的连接中断,临时 ZNode 将自动删除。并且,临时 ZNode 不允许建立子级 ZNode。
ZooKeeper 支持“顺序”(Sequence)ZNode,顺序 ZNode 创建时,ZooKeeper 将自动添加“自增数字”作为后缀(“自增数字”由父级 ZNode 通过 4 字节有符号整数维护)。
除了存储的数据,ZNode 包含了称为 Stat 的数据结构,用于存储 ZNode 的属性信息,主要包括:
关于 Zxid:所有提交到 ZooKeeper 的事务,都会被标记唯一的 ZooKeeper Transaction Id。
ZooKeeper Session(会话),即为 ZooKeeper 客户端与服务端交互的通道。概述而言,客户端和服务端的 TCP 连接即为 Session,ZooKeeper 抽象了更多的会话状态。
Session 于 ZooKeeper 服务端,以 SessionId 作为唯一标示,同时,ZooKeeper 支持客户端使用 SessionId 进行“Session 复用”(需要同时提供 SessionPasswd)。
ZooKeeper 客户端对象创建时,Session 即进入 CONNECTING 状态,当客户端与服务端(集群的任意节点)完成连接,即进入 CONNECTED 状态。
客户端主动关闭 Session 前,通过“心跳”维护 Session 有效性,若连接中断,ZooKeeper 客户端将尝试重新连接(再次进入 CONNECTING ):
Session 是否过期,完全由 ZooKeeper 服务端维护。对于 ZooKeeper 客户端,仅当 Session 过期,才应当重新创建客户端对象。
多个客户端使用相同的 SessionId 与 SessionPasswd “连接复用”,可能出现“SessionMovedException”,请参阅:ZooKeeper Programmer‘s Guide。
对于全部的“读”操作,ZooKeeper 允许客户端于 ZNode 设置 Watch,当 ZNode 变更时,Watch 将被触发并且通知到客户端(即 Watcher)。Watch 是 “一次性” 的,Watch 被触发时即被清除。
Watch“异步地”通知到客户端,“通知内容”不包含 ZNode 变更后的数据,需要由客户端读取。
由 ZooKeeper 确保,事件到客户端的通知,严格“按顺序”进行(事务于 ZooKeeper 中的顺序)。此外,当 Watch 被触发时,设置了 Watch 的客户端,接收到通知前,无法获取变更后的数据。
我们假设判断 ZNode 是否存在、获取 ZNode 数据、获取 ZNode 子级 ZNode 的方法分别为 exists()、getData()、getChildren()。
对于单一的 Watch 对象(例如,回调函数),由单一变更引起的事件,Watch 对象将被调用仅仅被调用一次,即使由多个“读”进行了 Watch 设置。
ZooKeeper 通过 ACL 控制 ZNode 的访问权限(默认情况,ZNode 无访问权限控制),权限维度包括:
ZooKeeper 支持多种权限模式,最常见的 Digest 模式,类似于 username / password。
限于篇幅,关于 ACL,本文不予以展开,请参阅:ZooKeeper Programmer‘s Guide。
基于 ZooKeeper 的设计,允许使用“Leader”和“Follower”同时提供“读”服务,由此涉及“一致性”问题,ZooKeeper 保证:
综合而言,ZooKeeper 实现了“最终一致性”模型。
ZooKeeper 允许进行单机模式(Standalone)和集群模式(Replicated)部署。
关于集群模式,需要说明以下两点。
ZooKeeper 配置文件(conf/zoo.cfg)涉及的配置项如下(以下仅列出 conf/zoo_sample.cfg 的内容 ,更多配置项请参阅 ZooKeeper Administrator‘s Guide):
#
# ZooKeeper 基础时间单元,即 tick(毫秒)
# 1. 心跳时间间隔
# 2. 默认 session 超时时间:tickTime × 2
#
tickTime=2000
#
# 数据快照、事务日志、myid 文件的存储路径
#
dataDir=/data/service/zookeeper
#
# ZooKeeper 端口
#
clientPort=2181
#
# 适用于集群模式:集群节点与 Leader 建立初始化连接的时间限制(基于 tick 数计算)
#
initLimit=10
#
# 适用于集群模式:集群节点与 Leader 进行通信的时间限制(基于 tick 数计算)
#
syncLimit=5
#
# 适用于集群模式:集群的节点列表
# 2888:集群节点与 Leader 通信的端口
# 3888:集群进行 Leader 选举的端口
#
server.1=host1:2888:3888
server.2=host2:2888:3888
server.3=host3:2888:3888
# server.X=hostX:2888:3888 # X 即为 ZooKeeper Server Id
完成配置,于 dataDir 路径建立 myid 文件(内容即为 ZooKeeper Server Id)。使用以下脚本命令即可启动 / 停止 ZooKeeper:
# 启动
./bin/zkServer.sh start
# 停止
./bin/zkServer.sh stop
ZooKeeper 提供了非常简洁的编程接口,包括:
#
# 与 127.0.0.1:2181 的 ZooKeeper 建立连接,进入脚本客户端的控制台
#
./bin/zkCli.sh -server 127.0.0.1:2181
于 ZooKeeper 脚本客户端的控制台:
#
# 创建 ZNode /Y2018,2018 即为 ZNode 数据
#
create /Y2018 2018
#
# 创建 ZNode /Y2018/M02,Feb 即为 ZNode 数据(需要说明,ZooKeeper 不支持 ZNode 递归创建)
# 允许通过参数 -e 或 -s 创建 “临时” 或 “顺序” ZNode
#
create /Y2018/M02 Feb
#
# ZNode /Y2018/M02 写入 2018-Feb
#
set /Y2018/M02 2018-Feb
#
# ZNode /Y2018 写入 2018,基于 ZNode 版本 0
# ZooKeeper 通过 set 命令的 “版本” 参数实现 CAS,“版本” 即来自 Stat - dataVersion
#
set /Y2018 2018 0
#
# 读取 ZNode /Y2018/M02 数据
#
get /Y2018/M02
#
# 获取 ZNode /Y2018 子级 ZNode
#
ls /Y2018
#
# 删除 ZNode /Y2018/M02
# 1. 与 set 相同,delete 命令支持 “版本” 参数
# 2. ZNode 删除时,必须确保其无子级 ZNode
#
#
delete /Y2018/M02
#
# zkCli.sh 未提供 “判断 ZNode 是否存在” 命令,使用 get 亦可
#
Curator 使用“流式接口”风格封装了 ZooKeeper Java API,并实现了诸多工具,例如 Leader 选举。
使用 Curator,于 build.gradle 添加如下代码。
compile(‘org.apache.curator:curator-framework:4.0.0‘) {
exclude group: ‘org.apache.zookeeper‘, module: ‘zookeeper‘
}
compile(‘org.apache.zookeeper:zookeeper:3.4.11‘)
限于篇幅,本文仅以“判断 ZNode 是否存在”及其 Watch 作为示例,代码如下。
package com.gitchat.zk;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.CuratorWatcher;
import org.apache.curator.retry.RetryOneTime;
import org.apache.zookeeper.Watcher.Event.EventType;
import java.util.concurrent.CountDownLatch;
public class Main {
static CountDownLatch countDownLatch = new CountDownLatch(1);
public static void main(String args[]) throws Exception {
//
// 与 127.0.0.1:2181 连接,重试策略:仅重试 1 次
//
CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", new RetryOneTime(1000));
client.start();
//
// 判断 ZNode /2018-02 是否存在,设置 Watch
//
client.checkExists().usingWatcher((CuratorWatcher) (event) -> {
if (event.getType() == EventType.NodeCreated) {
System.out.println(Thread.currentThread().getName() + ": " + "node Created.");
} else if (event.getType() == EventType.NodeDeleted) {
System.out.println(Thread.currentThread().getName() + ": " + "node Deleted.");
}
countDownLatch.countDown();
}).inBackground((c, event) -> {
//
// 异步接口
//
System.out.println(Thread.currentThread().getName() + ": " + "checkExists == " + event.getResultCode() + ", event == " + event.getType());
}).forPath("/2018-02");
countDownLatch.await();
System.out.println("terminate...");
}
}
如代码所示:
在此做下说明:
根据前文阐述,基于 ZooKeeper 的特性,能够用于构建分布式系统的核心组件,包括但不限于命名服务、Leader 选举、分布式同步、配置注册中心……
限于篇幅,本章节使用 ZooKeeper 构建分布式锁和分布式 Barrier,作为 ZooKeeper 实践的示例。
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.CuratorWatcher;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.data.Stat;
public class Locker {
private CuratorFramework zkClient;
private final String LOCKER = "/lock";
private final Integer conditionVariable = 0;
private Boolean locked = false;
public Locker(CuratorFramework zkClient) {
this.zkClient = zkClient;
}
//
// 获取 “互斥锁”(阻塞)
//
public synchronized void lock() throws Exception {
assert !locked;
while (true) {
try {
this.zkClient.create().withMode(CreateMode.EPHEMERAL).forPath(LOCKER, "lock".getBytes());
//
// lock acquired
//
locked = true;
return;
} catch (KeeperException exception) {
if (exception.code() == KeeperException.Code.NODEEXISTS) {
//
// lock acquire by others
//
} else {
throw exception;
}
}
Stat stat = this.zkClient.checkExists().usingWatcher((CuratorWatcher) (event) -> {
if (event.getType() == EventType.NodeDeleted) {
synchronized (conditionVariable) {
conditionVariable.notify();
}
}
}).forPath(LOCKER);
if (stat != null) {
//
// lock NOT released, block ...
//
synchronized (conditionVariable) {
conditionVariable.wait();
}
}
}
}
//
// 释放 “锁”
//
public synchronized void unLock() throws Exception {
assert locked;
this.zkClient.delete().forPath(LOCKER);
locked = false;
}
}
如上面代码所示:
代码中,代表“锁”的 ZNode 是“临时 ZNode”,实现:ZooKeeper 客户端异常中止时,“锁”自动释放。
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.CuratorWatcher;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher.Event.EventType;
import java.util.List;
public class Barrier {
private CuratorFramework zkClient;
private final String BARRIER = "/barrier";
private final String BARRIER_NODE = BARRIER + "/node";
private String zNode = null;
private int barrierSize;
private final Integer conditionVariableForEnter = 0;
private final Integer conditionVariableForLeave = 0;
public Barrier(CuratorFramework zkClient, int barrierSize) throws Exception {
this.zkClient = zkClient;
this.barrierSize = barrierSize;
try {
this.zkClient.create().withMode(CreateMode.PERSISTENT).forPath(BARRIER);
} catch (KeeperException exception) {
if (exception.code() != KeeperException.Code.NODEEXISTS) {
throw exception;
}
}
}
//
// 进入 Barrier
//
public synchronized void enter() throws Exception {
assert zNode == null;
zNode = zkClient.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(BARRIER_NODE, BARRIER_NODE.getBytes());
while (true) {
List<String> children = zkClient.getChildren().usingWatcher((CuratorWatcher) (event) -> {
if (event.getType() == EventType.NodeChildrenChanged) {
synchronized (conditionVariableForEnter) {
conditionVariableForEnter.notify();
}
}
}).forPath(BARRIER);
if (children.size() >= barrierSize) {
return;
}
synchronized (conditionVariableForEnter) {
conditionVariableForEnter.wait();
}
}
}
//
// 离开 Barrier
//
public synchronized void leave() throws Exception {
assert zNode != null;
zkClient.delete().forPath(zNode);
while (true) {
List<String> children = zkClient.getChildren().usingWatcher((CuratorWatcher) (event) -> {
if (event.getType() == EventType.NodeChildrenChanged) {
synchronized (conditionVariableForLeave) {
conditionVariableForLeave.notify();
}
}
}).forPath(BARRIER);
if (children.size() == 0) {
return;
}
synchronized (conditionVariableForLeave) {
conditionVariableForLeave.wait();
}
}
}
}
如上代码所示:
与“分布式锁”类似,ZooKeeper 客户端异常中止时,“临时 ZNode”自动删除;同时,使用“顺序 ZNode”以避免创建冲突。
前面介绍了 “ZooKeeper 实践场景”,以分布式锁和分布式 Barrier 作为示例进行了阐述,本节,我们以 Kafka 和 Dubbo 作为示例,阐述 ZooKeeper 于工业界的应用示例。
Kafka 是开源的分布式消息系统(或称为:流式计算平台)。
开始了解 ZooKeeper 于 Kafka 的应用之前,我们简要阐述 Kafka 中各个角色,如图中所示:
ZooKeeper 于 Kafka,主要进行各个角色的信息注册和分布式协调,下面进行详细介绍。
于 ZooKeeper 创建 ZNode,注册 Topic、Broker、Topic-Broker-Partition、Producer、Consumer、Consumer Group-Consumer、Offset 的信息。
通过 ZooKeeper Watch,Producer 能够动态获取 ZooKeeper 的 Broker、Topic-Broker-Partition,由此计算 Producer 进行消息生产的 Broker 及 Partition。
通过 ZooKeeper Watch,Consumer 能够动态获取 Broker、Topic-Broker-Partition、Consumer Group-Consumer,由此计算 Consumer 进行消息消费的 Broker 及 Partition(基于 Kafka 实现机制,单个 Partition 仅允许唯一的 Consumer 进行消费)。
需要说明的是,Kafka 不使用 ZooKeeper Watch 构建 Producer 与 Consumer 之间的“发布/订阅”。
Dubbo 是由阿里巴巴开源的 RPC 及微服务框架,我所在的公司,亦广泛使用。Dubbo 开源版本推荐使用 ZooKeeper 作为“服务注册中心”。
如上图中所展示的 Dubbo 架构,使用 ZooKeeper 作为服务注册中心(Registry)时,以 com.chat.Service 为例:
图中涉及的流程如下。
服务提供者 & 消费者创建的 ZNode,全部为临时 ZNode:当服务提供者异常(机器故障、网络中断)时,其临时 ZNode 自动移除,消费者亦能够自动感知。
标签:cli 提交 dir out syn 维护 pes 概述 locker
原文地址:https://www.cnblogs.com/Pibaosi/p/10565247.html