标签:line gossip 列表 概率 pack 接受 异常 运行 add
转:https://www.jianshu.com/p/f3e43328c1b5
一致性哈希分区(Distributed Hash Table)实现思路是为系统中每个节点分配一个token,范围一般在0~232,这些token构成一个哈希环。数据读写执行节点查找操作时,先根据key计算hash值,然后顺时针找到第一个大于等于该哈希值的token节点。
把分片的逻辑放在Redis客户端实现,通过Redis客户端预先定义好的路由规则,把对Key的访问转发到不同的Redis实例中,最后把返回结果汇集。
#node构建过程(redis.clients.util.Sharded):
//shards列表为客户端提供了所有redis-server配置信息,包括:ip,port,weight,name
//其中weight为权重,将直接决定“虚拟节点”的“比例”(密度),权重越高,在存储是被hash命中的概率越高
//--其上存储的数据越多。
//其中name为“节点名称”,jedis使用name作为“节点hash值”的一个计算参数。
//---
//一致性hash算法,要求每个“虚拟节点”必须具备“hash值”,每个实际的server可以有多个“虚拟节点”(API级别)
//其中虚拟节点的个数= “逻辑区间长度” * weight,每个server的“虚拟节点”将会以“hash”的方式分布在全局区域中
//全局区域总长为2^32.每个“虚拟节点”以hash值的方式映射在全局区域中。
// 环形:0-->vnode1(:1230)-->vnode2(:2800)-->vnode3(400000)---2^32-->0
//所有的“虚拟节点”将按照其”节点hash“顺序排列(正序/反序均可),因此相邻两个“虚拟节点”之间必有hash值差,
//那么此差值,即为前一个(或者后一个,根据实现而定)“虚拟节点”所负载的数据hash值区间。
//比如hash值为“2000”的数据将会被vnode1所接受。
//---
private void initialize(List<S> shards) {
nodes = new TreeMap<Long, S>();//虚拟节点,采取TreeMap存储:排序,二叉树
for (int i = 0; i != shards.size(); ++i) {
final S shardInfo = shards.get(i);
if (shardInfo.getName() == null)
//当没有设置“name”是,将“SHARD-NODE”作为“虚拟节点”hash值计算的参数
//"逻辑区间步长"为160,为什么呢??
//最终多个server的“虚拟节点”将会交错布局,不一定非常均匀。
for (int n = 0; n < 160 * shardInfo.getWeight(); n++) {
nodes.put(this.algo.hash("SHARD-" + i + "-NODE-" + n), shardInfo);
}
else
for (int n = 0; n < 160 * shardInfo.getWeight(); n++) {
nodes.put(this.algo.hash(shardInfo.getName() + "*" + shardInfo.getWeight() + n), shardInfo);
}
resources.put(shardInfo, shardInfo.createResource());
}
}
#node选择方式:
public R getShard(String key) {
return resources.get(getShardInfo(key));
}
//here:
public S getShardInfo(byte[] key) {
//获取>=key的“虚拟节点”的列表
SortedMap<Long, S> tail = nodes.tailMap(algo.hash(key));
//如果不存在“虚拟节点”,则将返回首节点。
if (tail.size() == 0) {
return nodes.get(nodes.firstKey());
}
//如果存在,则返回符合(>=key)条件的“虚拟节点”的第一个节点
return tail.get(tail.firstKey());
}
Twemproxy是由Twitter开源的Redis代理,其基本原理是:Redis客户端把请求发送到Twemproxy,Twemproxy根据路由规则发送到正确的Redis实例,最后Twemproxy把结果汇集返回给客户端。(Twemproxy通过lvs做负载均衡及高可用)
Twemproxy通过引入一个代理层,将多个Redis实例进行统一管理,使Redis客户端只需要在Twemproxy上进行操作,而不需要关心后面有多少个Redis实例,从而实现了Redis集群。
缺点:由于Redis客户端的每个请求都经过Twemproxy代理才能到达Redis服务器,这个过程中会产生性能损失。最大的问题,Twemproxy无法平滑地增加Redis实例(可以做到自动剔除)。
Codis Proxy:Redis客户端连接到Redis实例的代理,实现了Redis的协议,Redis客户端连接到Codis Proxy进行各种操作。Codis Proxy是无状态的,可以用Keepalived等负载均衡软件部署多个Codis Proxy实现高可用。
CodisRedis:Codis项目维护的Redis分支,添加了slot和原子的数据迁移命令。Codis上层的 Codis Proxy和Codisconfig只有与这个版本的Redis通信才能正常运行。
Codisconfig:Codis管理工具。可以执行添加删除CodisRedis节点、添加删除Codis Proxy、数据迁移等操作。另外,Codisconfig自带了HTTP server,里面集成了一个管理界面,方便运维人员观察Codis集群的状态和进行相关的操作,极大提高了运维的方便性,弥补了Twemproxy的缺点。
ZooKeeper:Codis依赖于ZooKeeper存储数据路由表的信息和Codis Proxy节点的元信息。另外,Codisconfig发起的命令都会通过ZooKeeper同步到CodisProxy的节点。
Codis最大的优势在于支持平滑增加(减少)Redis Server Group(Redis实例),能安全、透明地迁移数据,这也是Codis 有别于Twemproxy等静态分布式 Redis 解决方案的地方。Codis增加了Redis Server Group后,就牵涉到slot的迁移问题。
1)key批量操作支持有限。如mset、mget,目前只支持具有相同slot值的key执行批量操作。对于映射为不同slot值的key由于执行mget、mget等操作可能存在于多个节点上因此不被支持。
2)key事务操作支持有限。同理只支持多key在同一节点上的事务操作,当多个key分布在不同的节点上时无法使用事务功能。
3)key作为数据分区的最小粒度,因此不能将一个大的键值对象如hash、list等映射到不同的节点。
4)不支持多数据库空间。单机下的Redis可以支持16个数据库,集群模式下只能使用一个数据库空间,即db0。
5)复制结构只支持一层,从节点只能复制主节点,不支持嵌套树状复制结构。
1)准备节点
#节点端口
port 6379
# 开启集群模式
cluster-enabled yes
# 节点超时时间,单位毫秒
cluster-node-timeout 15000
# 集群内部配置文件
cluster-config-file "nodes-6379.conf"
当集群内节点信息发生变化,如添加节点、节点下线、故障转移等。节点会自动保存集群状态到配置文件中。需要注意的是,Redis自动维护集群配置文件,不要手动修改,防止节点重启时产生集群信息错乱。
#cat data/nodes-6379.conf
cfb28ef1deee4e0fa78da86abe5d24566744411e 127.0.0.1:6379 //节点ID
myself,master - 0 0 0 connected
vars currentEpoch 0 lastVoteEpoch 0
节点ID不同于运行ID。节点ID在集群初始化时只创建一次,节点重启时会加载集群配置文件进行重用,而Redis的运行ID每次重启都会变化。
2)节点握手
节点握手是指一批运行在集群模式下的节点通过Gossip协议彼此通信,达到感知对方的过程。
redis-cli -h 127.0.0.1 -p 6379 cluster addslots {0...5461}
redis-trib.rb是采用Ruby实现的Redis集群管理工具。内部通过Cluster相关命令帮我们简化集群创建、检查、槽迁移和均衡等常见运维操作。
Redis集群对客户端通信协议做了比较大的修改,为了追求性能最大化,并没有采用代理的方式而是采用客户端直连节点的方式。
在集群模式下,Redis接收任何键相关命令时首先计算键对应的槽,再根据槽找出所对应的节点,如果节点是自身,则处理键命令;否则回复MOVED重定向错误,通知客户端请求正确的节点。这个过程称为MOVED重定向。
#计算槽节点
def key_hash_slot(key):
int keylen = key.length();
for (s = 0; s < keylen; s++){
if (key[s] == ‘{‘){
break;
}
}
if (s == keylen) return crc16(key,keylen) & 16383;
for (e = s+1; e < keylen; e++):
if (key[e] == ‘}‘) break;
if (e == keylen || e == s+1) return crc16(key,keylen) & 16383;
return crc16(key+s+1,e-s-1) & 16383; /* 使用{和}之间的有效部分计算槽,{hash_tag} */
#查找槽节点
def execute_or_redirect(key):
int slot = key_hash_slot(key);
ClusterNode node = slots[slot];
if(node == clusterState.myself):
return executeCommand(key);
else:
return ‘(error) MOVED {slot} {node.ip}:{node.port}‘;
mget等命令优化批量调用时,键列表必须具有相同的slot,否则会报错。这时可以利用hash_tag让不同的键具有相同的slot达到优化的目的。
Pipeline同样可以受益于hash_tag,由于Pipeline只能向一个节点批量发送执行命令,而相同slot必然会对应到唯一的节点,降低了集群使用Pipeline的门槛。
Jedis客户端命令执行流程
1)计算slot并根据slots缓存获取目标节点连接,发送命令。
2)如果出现连接错误,使用随机连接重新执行键命令,每次命令重试对redi-rections参数减1。
3)捕获到MOVED重定向错误,使用cluster slots命令更新slots缓存(renewSlotCache方法)。
4)重复执行1)~3)步,直到命令执行成功,或者当redi-rections<=0时抛出JedisClusterMaxRedirectionsException异常。
#JedisClusterCommand的runWithRetries方法(jedis2.8.1)
private T runWithRetries(byte[] key, int redirections, boolean tryRandomNode, boolean asking) {
if (redirections <= 0) {
throw new JedisClusterMaxRedirectionsException("Too many Cluster redirections?");
}
Jedis connection = null;
try {
if (asking) {
// TODO: Pipeline asking with the original command to make it
// faster....
connection = askConnection.get();
connection.asking();
// if asking success, reset asking flag
asking = false;
} else {
if (tryRandomNode) {
connection = connectionHandler.getConnection();
} else {
connection = connectionHandler.getConnectionFromSlot(JedisClusterCRC16.getSlot(key));
}
}
return execute(connection);
} catch (JedisConnectionException jce) {
if (tryRandomNode) {
// maybe all connection is down
throw jce;
}
// release current connection before recursion
releaseConnection(connection);
connection = null;
// retry with random connection
return runWithRetries(key, redirections - 1, true, asking);
} catch (JedisRedirectionException jre) {
// if MOVED redirection occurred,
if (jre instanceof JedisMovedDataException) {
// it rebuilds cluster‘s slot cache
// recommended by Redis cluster specification
this.connectionHandler.renewSlotCache(connection);
}
// release current connection before recursion or renewing
releaseConnection(connection);
connection = null;
if (jre instanceof JedisAskDataException) {
asking = true;
askConnection.set(this.connectionHandler.getConnectionFromNode(jre.getTargetNode()));
} else if (jre instanceof JedisMovedDataException) {
} else {
throw new JedisClusterException(jre);
}
return runWithRetries(key, redirections - 1, false, asking);
} finally {
releaseConnection(connection);
}
}
#renewSlotCache
public void renewSlotCache(Jedis jedis) {
try {
cache.discoverClusterSlots(jedis);
} catch (JedisConnectionException e) {
renewSlotCache();
}
}
#discoverClusterSlots
public void discoverClusterSlots(Jedis jedis) {
w.