标签:client 最小 分享 服务器 mic 监听 mil date host
一、 实现原理:1、编写服务端Socket监听程序,运行与某台服务器上作为所有客户端竞争资源
2、客户端启动后,都会自动向ZK注册自己的身份信息,并将自己的身份ID根据统一的生成规则临时写入ZK
3、客户端实时监听ZK中自身注册到ZK集群中身份ID变化,若发现自身ID为ZK集群最小的身份ID,则获得锁,然后向服务端Socket建立连接发送消息,其它客户端处于监听等待状态
4、自己处理完自己业务后,即不发消息后,自己将自己旧的身份ID从ZK集群删除,删除成功后,重新注册新的身份ID,目的是释放锁,让其它客户端获得锁。
5、若自己异常退出,则ZK集群会将该客户端身份信息清除,防止客户端身份ID不变一直获得锁,导致其它客户端无法获得锁。
二、实现代码
1、服务端监听程序
package cn.itcast.zk.lock;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.ServerSocket;
import java.net.Socket;
/**
* 模拟客户端竞争的服务资源,这里开启一个Socket监听程序,并实时接收客户端发送的消息。
* @author songjq
*
*/
public class TcpServerSocket {
public static void getConn() throws IOException {
ServerSocket server = new ServerSocket(9091);
try {
System.out.println("服务端已经在9091端口监听......");
Socket client = server.accept();
try {
BufferedReader input =
new BufferedReader(new InputStreamReader(client.getInputStream()));
boolean flag = true;
while (flag) {
String line = input.readLine();
if (line.equals("exit")) {
flag = false;
} else {
System.out.println("客户端说:" + line);
}
}
} finally {
client.close();
server.close();
TcpServerSocket.getConn();
}
} finally {
//server.close();
}
}
public static void main(String[] args) throws Exception {
TcpServerSocket.getConn();
}
}
2、客户端访问程序
package cn.itcast.zk.lock;
import java.io.IOException;
import java.io.PrintWriter;
import java.net.Socket;
import java.net.UnknownHostException;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.Watcher.Event.EventType;
/**
* zk实现分布式锁
* 实现原理:
* 1、编写服务端Socket监听程序,运行与某台服务器上作为所有客户端竞争资源
* 2、客户端启动后,都会自动向ZK注册自己的身份信息,并将自己的身份ID根据统一的生成规则临时写入ZK
* 3、客户端实时监听ZK中自身注册到ZK集群中身份ID变化,若发现自身ID为ZK集群最小的身份ID,则获得锁,然后向服务端Socket建立连接发送消息,其它客户端处于监听等待状态
* 4、自己处理完自己业务后,即不发消息后,自己将自己旧的身份ID从ZK集群删除,删除成功后,重新注册新的身份ID,目的是释放锁,让其它客户端获得锁。
* 5、若自己异常退出,则ZK集群会将该客户端身份信息清除,防止客户端身份ID不变一直获得锁,导致其它客户端无法获得锁。
* @author songjq
*/
public class ZKDistributeLockService {
private final static String rootNode = "/hosts";
private static String myZnodePath = "";
private static ZooKeeper zkCli = null;
private static boolean havelock = false;
private static String hostname = "";
/**
* 获取zk连接
*
* @throws IOException
*/
public static ZooKeeper getZkconnection() throws IOException {
return new ZooKeeper("192.168.1.201:2181,192.168.1.202:2181,192.168.1.203:2181,", 2000, getWather());
}
/**
* 监听器处理
*/
public static Watcher getWather() {
return new Watcher() {
@Override
public void process(WatchedEvent event) {
//如果没有子节点变更,则终止下面程序执行
if(event.getType() != EventType.NodeChildrenChanged) return ;
// 获取锁
try {
havelock = gainLock();
if (havelock) {
System.out.println(new Date() + ":" + hostname + " get lock....");
// 处理业务
doSomethings(hostname);
// 处理完业务后删除锁,并重新注册znode节点锁
deleteLock();
// 重新注册znode节点锁
registerZnodeLock();
}
} catch (Exception e) {
// TODO: handle exception
e.printStackTrace();
}
}
};
}
/**
* 注册锁节点
*
* @throws KeeperException
* @throws InterruptedException
*/
public static void registerZnodeLock() throws KeeperException, InterruptedException {
if (zkCli.exists("/hosts", null) == null) {
zkCli.create("/hosts", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
myZnodePath = zkCli.create(rootNode + "/lock", null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
// 注册完锁节点后睡眠500-1000ms后在获取锁
Thread.sleep((long) (Math.random() * 500 + 500));
}
/**
* 获取锁,判断当前注册myZnodePath是否和zk集群中最小的节点一致,若一致就获得锁
*
* @return
* @throws KeeperException
* @throws InterruptedException
*/
public static boolean gainLock() throws KeeperException, InterruptedException {
List<String> children = zkCli.getChildren(rootNode, true);
// 若当前集群内znode节点数为1,说明是自己刚注册的节点,可以直接获得锁
/*if (children.size() == 1) {
return true;
}*/
if (children.size() > 0) {
// 对znode节点进行排序
Collections.sort(children);
String tmpZnode = children.get(0);
System.out.println("tmpZnode:"+tmpZnode+",myZnodePath:"+myZnodePath);
if (tmpZnode.equals(myZnodePath.substring(rootNode.length()+1))) {
return true;
}else {
return false;
}
}else {
return false;
}
}
/**
* 模拟处理业务
* 这里即为获得锁后向同服务端建立socket连接,并向服务端发送消息。
* @param hostname
* @throws InterruptedException
* @throws IOException
* @throws UnknownHostException
*/
public static void doSomethings(String hostname) throws InterruptedException, UnknownHostException, IOException {
System.out.println("------------------>" + hostname + " Begin connect to Server and send msg to Server...");
// 模拟业务处理2-3秒
Thread.sleep((long) (Math.random() * 5000 + 1000));
/**
* 在这里实现发送消息代码
*/
Socket client = new Socket("127.0.0.1", 9091);
try {
PrintWriter output = new PrintWriter(client.getOutputStream(), true);
/*
* 向服务端发送消息
*/
String words = "Client MSG-> This is from " + hostname +" message.";
output.println(words);
} finally {
client.close();
}
System.out.println("------------------>" + hostname + " Msg Send completed!!!");
}
/**
* 删除myZnodePath节点
*
* @throws InterruptedException
* @throws KeeperException
*/
public static void deleteLock() throws InterruptedException, KeeperException {
zkCli.delete(myZnodePath, -1);
}
/**
* 主类调用
* @param args
* @throws IOException
* @throws KeeperException
* @throws InterruptedException
*/
public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
hostname = args[0];
// 获取zk连接
zkCli = ZKDistributeLockService.getZkconnection();
// 注册znode节点锁
ZKDistributeLockService.registerZnodeLock();
// 获取锁
havelock = ZKDistributeLockService.gainLock();
if (havelock) {
System.out.println(new Date() + ":" + hostname + " get lock....");
// 处理业务
ZKDistributeLockService.doSomethings(hostname);
// 处理完业务后删除锁,并重新注册znode节点锁
ZKDistributeLockService.deleteLock();
// 重新注册znode节点锁
ZKDistributeLockService.registerZnodeLock();
}
//主程序睡眠
Thread.sleep(Long.MAX_VALUE);
}
}
2018-07-15期 ZK编程案例-分布式锁【本人亲自反复验证通过分享】
标签:client 最小 分享 服务器 mic 监听 mil date host
原文地址:http://blog.51cto.com/2951890/2142641