标签:sts 因此 名称 nal 字符 public 场景 log4j code
大多数应用程序在大部分时间都是在响应数据读取事件,而数据的读写并发可能会带来数据脏读(读线程在读取的数据,写线程已经修改但未提交)等问题,基于类似的可能会发生对同一个资源进行读写操作的场景,我们需要实现读写锁来保证数据准确,读写锁是指读线程之间不存在互斥,读写及写写之间存在互斥我们可以用一个表格来列举两个线程并发时的所有情况:
线程A | 线程B | 通过性 |
读 | 读 | √ |
读 | 写 | × |
写 | 读 | × |
写 | 写 | × |
java.util.concurrent.locks包下的一个实现类ReentrantReadWriteLock可以解决单个应用中的读写锁场景,ReentrantReadWriteLock是一个可重入锁,重入的意思是具有相同性质的线程在同一时刻可以获取到该锁,ReentrantReadWriteLock的使用为读线程在进行业务操作之前调用readLock().lock()方法进行读锁定,业务操作完毕之后调用readLock().unlock()方法进行解锁,写线程进行写业务操作之前调用rwriteLock().lock()方法进行资源锁定,在写业务操作完成后调用writeLock().unlock()进行写锁解锁。
ReentrantReadWriteLock只能满足单应用的读写锁需求,对于分布式的多个应用则无能为力,例如我们要操作Hadoop(分布式文件系统)上的一份数据,我们在不同的服务器都安装了应用程序(作为负载均衡),不同服务器应用程序可能会并发(读写并发)请求该数据,这个时候我们就要单独写一个专门管理读写锁获取及解锁的应用,部署在中心服务器,所有读写操作服务器上应用程序在进行业务操作之前都要先访问该读写锁管理应用,要实现一个可靠的读写锁管理应用要解决的难题很多,所幸的是Zookeeper已经为我们提供了类似的服务,我们只要稍加利用便可以实现我们想要的效果。
Zookeeper是一个为分布式应用提供一致性服务的应用,可以把Zookeeper看作一个小型文件系统有目录(有子文件或目录)有文件(文件可以记录数据也可以作为目录),因此我们把目录和文件都称为节点,节点有持久性的和临时性(客户端断开后会被Zookeeper删除)两类,而其中又分为顺序型(创建该类型节点的时候Zookeeper会自动添加一个父目录下自增序列,这样我们可以避免节点名称的重复)和非顺序型。
有了这些基础之后我们就可以创造自己的分布式读写锁了,代码入下:
package com.shenyuchong.zkclient; import com.gbd.database.Cache; import org.apache.log4j.Logger; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.ZooKeeper; import java.util.List; import java.util.Random; public class ZkClient { private static Logger log = Logger.getLogger(ZkClient.class); private final static String LOCK_HOME="/shenyuchong";//分布式锁的工作目录 private static Object lock = new Object(); private static ZooKeeper zooKeeper = null; public static void init() throws Exception { /** * 一开始就与Zookeeper服务器连接上 */ zooKeeper = new ZooKeeper(Cache.ZOOKEEPER_CONNECT_STRING, 5000, null); if(zooKeeper!=null){ if(zooKeeper.exists(LOCK_HOME, false)==null){ zooKeeper.create(LOCK_HOME,null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } } } public static Sequent readLock(String path){ path=LOCK_HOME+(path.startsWith("/")?path:"/"+path);//传入目录不是/开头的添加一个斜杠 String seq = null; try { createPath(zooKeeper,path);//防止目录不存在先创建(同步) boolean hasWrite=true;//是否存在写锁 while(hasWrite){ hasWrite=false; /** * 在path目录下先创建一个读r_开头的顺序型临时节点,然后再判断该目录下的孩子中是否有写节点(w_开头的节点) */ seq = zooKeeper.create(path + "/r_", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); List<String> children = zooKeeper.getChildren(path, false); for(String c:children){ if(c.startsWith("w_")) { hasWrite=true; break; } } if(hasWrite){ /** * 当发现有写线程,则删除刚才创建的读节点,并阻塞当前线程,等待通知或者等待一个随机时间 */ zooKeeper.delete(seq,-1); synchronized (lock) { /** * 随机时间不能小于3秒,因为我们认为写任务一搬没这么快完成 */ Random rand =new Random(); int i = rand.nextInt(10000); if(i<3000) i=3000; lock.wait(i); } } } } catch (Exception e) { e.printStackTrace(); } return new Sequent(seq); } public static Sequent writeLock(String path){ path=LOCK_HOME+(path.startsWith("/")?path:"/"+path); String seq = null; try { createPath(zooKeeper,path); boolean hasRead=true; while(hasRead){ /** * 在path目录下先创建一个写w_开头的顺序型临时节点,然后再判断该目录下的孩子中是否有节点(不管是写线程还是读线程都应该等待) */ seq = zooKeeper.create(path + "/w_", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); List<String> children = zooKeeper.getChildren(path, false); if(children!=null&&children.size()>1){ /** * 当发现有写线程,则删除刚才创建的读节点,并阻塞当前线程,等待通知或者等待一个随机时间 */ zooKeeper.delete(seq,-1); hasRead=true; synchronized (lock) { Random rand =new Random(); /** * 随机时间不能小于1秒,因为我们认为读线程应该进可能的响应 */ int i = rand.nextInt(10000); if(i<1000) i=1000; lock.wait(i); } }else{ hasRead=false; } } } catch (Exception e) { e.printStackTrace(); } return new Sequent(seq); } public static void unLock(Sequent sequent){ if(sequent==null||sequent.getName()==null||"".equals(sequent.getName())) return; try { /** * 删除又线程创建的节点就代表解锁了,最后需要通知所有线程,分布式的程序通知不了,所以需要在wait()中传入等待的时间参数 */ zooKeeper.delete(sequent.getName(),-1); } catch (Exception e) { e.printStackTrace(); }finally { synchronized (lock){ lock.notifyAll();//通知的话可以减少等待时间 } } } public static void createPath(ZooKeeper zooKeeper,String path){ try { if(zooKeeper.exists(path, false)==null) zooKeeper.create(path,null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);//创建一个持久化节点 }catch (Exception e){ e.printStackTrace(); } }
//节点名称(用于给解锁用,单一个字符串约束性低) public static class Sequent{ private String name; public Sequent(String name) { this.name = name; } public String getName() { return name; } } }
标签:sts 因此 名称 nal 字符 public 场景 log4j code
原文地址:https://www.cnblogs.com/shenyuchong/p/13344242.html