码迷,mamicode.com
首页 > 其他好文 > 详细

12 JUC的ReentrantReadWriteLock的使用以及简单的应用

时间:2021-04-10 13:10:54      阅读:0      评论:0      收藏:0      [点我收藏+]

标签:bec   严格   gets   interface   localhost   upd   执行   over   官方   

1 读写锁介绍以及简单的使用

1-1 概述

读写锁应用场景:当读操作远远高于写操作时,这时候使用 读写锁 让 读-读 可以并发,提高性能。

  • 类似于数据库中的共享锁: select ... from ... lock in share mode
  • 读写锁中要求读读操作是并发的,读和写操作是互斥的

JUC提供了两类读写锁:分别是ReentrantReadWriteLock和StampedLock

mysql有哪些锁

1-2 ReentrantReadWriteLock的简单测试

实例

package chapter8;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@Slf4j(topic = "c.DataContainer")
class DataContainer{
    private Object data;
    private ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
    private ReentrantReadWriteLock.ReadLock r = rw.readLock();
    private ReentrantReadWriteLock.WriteLock w = rw.writeLock();
    public Object read(){    // 读锁
        r.lock();
        log.warn("获取读锁");
        try{
            log.warn("读取");
            Thread.sleep(1000);
            return data;
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            log.warn("释放读锁...");
            r.unlock();
        }
        return data;
    }

    public void write(){
        log.warn("获取写锁");
        w.lock();
        try{
            log.debug("写入");
        }finally {
            log.debug("释放写锁");
            w.unlock();
        }
    }
}
读取操作共同进行测试
public class test17 {
    public static void main(String[] args) {
        DataContainer dataContainer = new DataContainer();
        new Thread(()->{
            dataContainer.read();
        },"t1").start();

        new Thread(()->{
            dataContainer.read();
        },"t2").start();
    }
}

执行结果

  • 通过左边的时间戳可以看到两个线程并行读取
08:55:41.208 [t1] WARN c.DataContainer - 获取读锁
08:55:41.208 [t2] WARN c.DataContainer - 获取读锁
08:55:41.212 [t2] WARN c.DataContainer - 读取
08:55:41.212 [t1] WARN c.DataContainer - 读取
08:55:42.214 [t2] WARN c.DataContainer - 释放读锁...
08:55:42.214 [t1] WARN c.DataContainer - 释放读锁...
读写操作共同进行测试
public class test17 {
    public static void main(String[] args) throws InterruptedException {
        DataContainer dataContainer = new DataContainer();
        new Thread(()->{
            dataContainer.read();
        },"t1").start();
        Thread.sleep(10);
        new Thread(()->{
            dataContainer.write();
        },"t2").start();
    }
}

执行结果

  • 可以看到读锁的存在阻塞的写入操作的进行。写必须在读完成之后才能进行。,
09:02:56.738 [t1] WARN c.DataContainer - 获取读锁
09:02:56.743 [t1] WARN c.DataContainer - 读取
09:02:57.743 [t1] WARN c.DataContainer - 释放读锁...
09:02:57.743 [t2] WARN c.DataContainer - 获取写锁
09:02:57.743 [t2] DEBUG c.DataContainer - 写入
09:02:57.743 [t2] DEBUG c.DataContainer - 释放写锁
写写操作共同进行测试
  • 写锁之间也是互斥的。
09:05:10.378 [t1] WARN c.DataContainer - 获取写锁
09:05:10.381 [t1] DEBUG c.DataContainer - 写入
09:05:10.382 [t1] DEBUG c.DataContainer - 释放写锁
09:05:10.387 [t2] WARN c.DataContainer - 获取写锁
09:05:10.388 [t2] DEBUG c.DataContainer - 写入
09:05:10.388 [t2] DEBUG c.DataContainer - 释放写锁

1-3 读写锁使用总结

  • 读锁会排斥其他线程加写锁,但不会排斥其他线程加读锁
  • 写锁会排斥读锁以及其他写锁。

1-4 ReentrantReadWriteLock的使用注意点

要点:

1)读锁不支持条件变量
2)不支持读锁到写锁的升级:即持有读锁的情况下去获取写锁,会导致获取写锁永久等待 (正确的做法应该参数实例的step2)

  • 持有写锁的情况下获取读锁是可以的(实例的step4)

官方实例对于缓存数据的读写实例


 class CachedData {
   Object data;
   /*cacheValid:
       true:  缓存数据有效,无需重复计算
       false: 缓存数据失效,需要获取写锁修改
   
   */
   volatile boolean cacheValid;
   final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
   void processCachedData() {
     // step1:获取读锁去读取缓存数据
     rwl.readLock().lock();
    
     if (!cacheValid) {
       // step2:返现缓存数据失效,因此释放读锁后去获取写锁修改缓存数据。
       // Must release read lock before acquiring write lock
       rwl.readLock().unlock();
       rwl.writeLock().lock();
       try {
         // Recheck state because another thread might have
         // acquired write lock and changed state before we did.
         /* step3:获取写锁后,再次检查数据是否失效,依旧失效才进行写操作,
            避免步骤2与步骤3期间已经有线程修改了缓存数据*/
         if (!cacheValid) {
           data = ...
           cacheValid = true;
         }
         /*step4:在释放写锁之前可以先获取读锁*/
         // Downgrade by acquiring read lock before releasing write lock
         rwl.readLock().lock();
       } finally {
         rwl.writeLock().unlock(); // Unlock write, still hold read
       }
     }

     try {
       use(data);
     } finally {
       rwl.readLock().unlock();
     }
   }
 }

2 ReentrantReadWriteLock的应用:读写锁实现一致性缓存 (适合读多写少的缓存的维护)

2-1 环境准备

创建数据库中的表

CREATE TABLE `emp` (
`empno` int(11) NOT NULL,
`ename` varchar(50),
`job` varchar(50),
`sal` decimal(65,30) NOT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

insert into `emp` (`empno`, `sal`) values(7369,800);
insert into `emp` (`empno`, `sal`) values(7351,200);

实体类

package chapter8.application;
import java.math.BigDecimal;
class Emp {
    private int empno;
    private String ename;
    private String job;
    private BigDecimal sal;

    public int getEmpno() {
        return empno;
    }

    public void setEmpno(int empno) {
        this.empno = empno;
    }

    public String getEname() {
        return ename;
    }

    public void setEname(String ename) {
        this.ename = ename;
    }

    public String getJob() {
        return job;
    }

    public void setJob(String job) {
        this.job = job;
    }

    public BigDecimal getSal() {
        return sal;
    }

    public void setSal(BigDecimal sal) {
        this.sal = sal;
    }

    @Override
    public String toString() {
        return "Emp{" +
                "empno=" + empno +
                ", ename=‘" + ename + ‘\‘‘ +
                ", job=‘" + job + ‘\‘‘ +
                ", sal=" + sal +
                ‘}‘;
    }
}

GenericDao:采用JDBC的数据库访问的类(data access object)

import java.beans.BeanInfo;
import java.beans.IntrospectionException;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.lang.reflect.InvocationTargetException;
import java.sql.*;
import java.util.*;

public class GenericDao {
    static String URL = "jdbc:mysql://localhost:3306/test?serverTimezone=GMT%2B8";
    static String USERNAME = "root";
    static String PASSWORD = "123456";

    public <T> List<T> queryList(Class<T> beanClass, String sql, Object... args) {
        System.out.println("sql: [" + sql + "] params:" + Arrays.toString(args));
        BeanRowMapper<T> mapper = new BeanRowMapper<>(beanClass);
        return queryList(sql, mapper, args);
    }

    public <T> T queryOne(Class<T> beanClass, String sql, Object... args) {
        System.out.println("sql: [" + sql + "] params:" + Arrays.toString(args));
        BeanRowMapper<T> mapper = new BeanRowMapper<>(beanClass);
        return queryOne(sql, mapper, args);
    }

    private <T> List<T> queryList(String sql, RowMapper<T> mapper, Object... args) {
        try (Connection conn = DriverManager.getConnection(URL, USERNAME, PASSWORD)) {
            try (PreparedStatement psmt = conn.prepareStatement(sql)) {
                if (args != null) {
                    for (int i = 0; i < args.length; i++) {
                        psmt.setObject(i + 1, args[i]);
                    }
                }
                List<T> list = new ArrayList<>();
                try (ResultSet rs = psmt.executeQuery()) {
                    while (rs.next()) {
                        T obj = mapper.map(rs);
                        list.add(obj);
                    }
                }
                return list;
            }
        } catch (SQLException e) {
            throw new RuntimeException(e);
        }
    }

    private <T> T queryOne(String sql, RowMapper<T> mapper, Object... args) {
        List<T> list = queryList(sql, mapper, args);
        return list.size() == 0 ? null : list.get(0);
    }

    public int update(String sql, Object... args) {
        System.out.println("sql: [" + sql + "] params:" + Arrays.toString(args));
        try (Connection conn = DriverManager.getConnection(URL, USERNAME, PASSWORD)) {
            try (PreparedStatement psmt = conn.prepareStatement(sql)) {
                if (args != null) {
                    for (int i = 0; i < args.length; i++) {
                        psmt.setObject(i + 1, args[i]);
                    }
                }
                return psmt.executeUpdate();
            }
        } catch (SQLException e) {
            throw new RuntimeException(e);
        }
    }

    interface RowMapper<T> {
        T map(ResultSet rs);
    }

    static class BeanRowMapper<T> implements RowMapper<T> {

        private Class<T> beanClass;
        private Map<String, PropertyDescriptor> propertyMap = new HashMap<>();

        public BeanRowMapper(Class<T> beanClass) {
            this.beanClass = beanClass;
            try {
                BeanInfo beanInfo = Introspector.getBeanInfo(beanClass);
                PropertyDescriptor[] propertyDescriptors = beanInfo.getPropertyDescriptors();
                for (PropertyDescriptor pd : propertyDescriptors) {
                    propertyMap.put(pd.getName().toLowerCase(), pd);
                }
            } catch (IntrospectionException e) {
                throw new RuntimeException(e);
            }
        }

        @Override
        public T map(ResultSet rs) {
            try {
                ResultSetMetaData metaData = rs.getMetaData();
                int columnCount = metaData.getColumnCount();
                T t = beanClass.newInstance();
                for (int i = 1; i <= columnCount; i++) {
                    String columnLabel = metaData.getColumnLabel(i);
                    PropertyDescriptor pd = propertyMap.get(columnLabel.toLowerCase());
                    if (pd != null) {
                        pd.getWriteMethod().invoke(t, rs.getObject(i));
                    }
                }
                return t;
            } catch (SQLException | InstantiationException | IllegalAccessException | InvocationTargetException e) {
                throw new RuntimeException(e);
            }
        }
    }
}

测试代码

package chapter8.application;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.*;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class TestGenericDao {
    public static void main(String[] args) throws SQLException {
        GenericDao dao = new GenericDao();
        System.out.println("============> 查询");
        String sql = "select * from emp where empno = ?";
        int empno = 7369;
        Emp emp = dao.queryOne(Emp.class, sql, empno);
        System.out.println(emp);
        emp = dao.queryOne(Emp.class, sql, empno);
        System.out.println(emp);
        emp = dao.queryOne(Emp.class, sql, empno);
        System.out.println(emp);

        System.out.println("============> 更新");
        dao.update("update emp set sal = ? where empno = ?", 1000, empno);
        emp = dao.queryOne(Emp.class, sql, empno);
        System.out.println(emp);
    }
}
执行结果
============> 查询
sql: [select * from emp where empno = ?] params:[7369]
Emp{empno=7369, ename=‘null‘, job=‘null‘, sal=800.000000000000000000000000000000}
sql: [select * from emp where empno = ?] params:[7369]
Emp{empno=7369, ename=‘null‘, job=‘null‘, sal=800.000000000000000000000000000000}
sql: [select * from emp where empno = ?] params:[7369]
Emp{empno=7369, ename=‘null‘, job=‘null‘, sal=800.000000000000000000000000000000}
============> 更新
sql: [update emp set sal = ? where empno = ?] params:[1000, 7369]
sql: [select * from emp where empno = ?] params:[7369]
Emp{empno=7369, ename=‘null‘, job=‘null‘, sal=1000.000000000000000000000000000000}
存在的问题

对于相同的查询操作,代码都是直接去数据库中查询,没能够建立缓存提高查询的效率

2-2 初步改进(实现带有缓存机制的GenericDao1)

class  GenericDaoCached1 extends GenericDao{

    private GenericDao dao = new GenericDao();
    private Map<SqlPair,Object> map = new HashMap<>();

    @Override
    public <T> List<T> queryList(Class<T> beanClass, String sql, Object... args) {
        return super.queryList(beanClass, sql, args);
    }

    @Override
    public <T> T queryOne(Class<T> beanClass, String sql, Object... args) {
        /*先查缓存,缓存没有再到数据库查询*/
        SqlPair key = new SqlPair(sql,args);
        T value = (T) map.get(key);
        if(value != null){
            return value;
        }
        value = dao.queryOne(beanClass,sql,args);
        map.put(key,value);
        return value;
    }

    @Override
    public int update(String sql, Object... args) {
        map.clear();
        int update = dao.update(sql,args);
        return update;
    }

    /*查询语句以及参数建立基于hash表的缓存*/
    class SqlPair {
        private String sql;
        private Object[] args;

        public SqlPair(String sql, Object[] args) {
            this.sql = sql;
            this.args = args;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || getClass() != o.getClass()) {
                return false;
            }
            SqlPair sqlPair = (SqlPair) o;
            return Objects.equals(sql, sqlPair.sql) &&
                    Arrays.equals(args, sqlPair.args);
        }

        @Override
        public int hashCode() {
            int result = Objects.hash(sql);
            result = 31 * result + Arrays.hashCode(args);
            return result;
        }
    }
}
存在的问题

问题1:HashMap不是线程安全的类。

问题2: queryOne方法在多线程环境下,依旧会发生首次情况下的多次查询操作。

问题3:update方法在多线程环境下,清空缓存与更新数据库操作的次序会导致缓存与数据库不一致的问题

缓存的更新策略:先清 除缓存带来的问题?

技术图片

  • 多线程环境下,清空缓存与更新数据库这二个步骤之间会有其他线程将旧的值再次放入缓存中(相当于之前的清空缓存白做了),造成后续查询的数据都不是最新的数据。

技术图片

上图中的操作并不能保证更新时数据库中更新的数据与缓存中的数据的严格一致性

  • 解决策略:加锁

2-3 采用读写锁实现带有缓存机制的Dao(实现带有缓存机制的GenericDao2)

下面代码中解决了3-2中出现的三个问题

问题1:HashMap不是线程安全的类。

利用reentrantlock的读写锁保证HashMap读写操作的安全性。

问题2: queryOne方法在多线程环境下,依旧会发生首次情况下的多次查询操作。

   首先采用写锁保证唯一线程去进行数据查询以及hash表的更新,然后采用double check机制去避免其他竞争锁的线程去重复进行数据查询和hash表的修改。

问题3:update方法在多线程环境下,清空缓存与更新数据库操作的次序会导致缓存与数据库不一致的问题

   采用写锁保证了缓存清空以及数据库更新的原子性,由于写锁和读锁的互斥性从而避免了缓存清空与数据库更新之间的那段时间数据不一致问题的发生。

实现

class GenericDaoCached2 extends GenericDao {
    private GenericDao dao = new GenericDao();
    private Map<SqlPair, Object> map = new HashMap<>();    // 问题1
    private ReentrantReadWriteLock rw = new ReentrantReadWriteLock();

    @Override
    public <T> List<T> queryList(Class<T> beanClass, String sql, Object... args) {
        return dao.queryList(beanClass, sql, args);
    }

    @Override
    public <T> T queryOne(Class<T> beanClass, String sql, Object... args) {
        // 先从缓存中找,找到直接返回
        SqlPair key = new SqlPair(sql, args);
        /*对map的读取操作添加读锁,从缓存中读取数据是可以并发执行的*/
        rw.readLock().lock();
        try {
            T value = (T) map.get(key);
            if(value != null) {
                return value;
            }
        } finally {
            rw.readLock().unlock();
        }
        /*我们希望缓存中确实没有数据,多个线程同时访问数据库,一个线程查询数据放入缓存,其他线程从缓存中
          获取数据。
          这里通过写锁保证数据库查询并写入缓存的操作原子性,获得写锁的线程进行修改,其他线程则被阻塞。
          通过double check去保证其他线程不会重复查询放入缓存。
        */
        rw.writeLock().lock();     // 注意:获取写锁前一定要释放读锁
        try {
            // 多个线程
            T value = (T) map.get(key);
            // 问题2:double check,避免其他线程在锁释放后进入代码块,重复查询与更新缓存!!!
            if(value == null) {     
                // 缓存中没有,查询数据库
                value = dao.queryOne(beanClass, sql, args);
                map.put(key, value);
            }
            return value;
        } finally {
            rw.writeLock().unlock();
        }
    }
    /*写锁改进:使用写锁,由于写锁与读锁以及写锁互斥,此时map以及dao都由当前线程独自使用,
      保证了数据的一致性,其他读写线程都被阻塞*/
    // 问题3
    @Override
    public int update(String sql, Object... args) {
        rw.writeLock().lock();
        try {
            int update = dao.update(sql, args);
            map.clear();
            return update;
        } finally {
            rw.writeLock().unlock();
        }
    }

    class SqlPair {
        private String sql;
        private Object[] args;

        public SqlPair(String sql, Object[] args) {
            this.sql = sql;
            this.args = args;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || getClass() != o.getClass()) {
                return false;
            }
            SqlPair sqlPair = (SqlPair) o;
            return Objects.equals(sql, sqlPair.sql) &&
                    Arrays.equals(args, sqlPair.args);
        }

        @Override
        public int hashCode() {
            int result = Objects.hash(sql);
            result = 31 * result + Arrays.hashCode(args);
            return result;
        }
    }
}

2-4 实际缓存实现需要考虑的其他因素

以上实现体现的是读写锁的应用,保证缓存和数据库的一致性,但有下面的问题没有考虑

  • 适合读多写少,如果写操作比较频繁,以上实现性能低
  • 没有考虑缓存容量(大量数据会造成内存溢出)
  • 没有考虑缓存过期(长时间不使用的数据应该进行删除,LRU,LFU)
  • 只适合单机(上面是Java内存的缓存实现)
  • 并发性还是低,目前只会用一把锁(提高并发度可以将锁划分再细一点,比如每个数据表一把锁)
  • 更新方法太过简单粗暴,清空了所有 key(考虑按类型分区或重新设计 key)

也可以乐观锁实现,用 CAS 去更新

参考资料

多线程基础课程

12 JUC的ReentrantReadWriteLock的使用以及简单的应用

标签:bec   严格   gets   interface   localhost   upd   执行   over   官方   

原文地址:https://www.cnblogs.com/kfcuj/p/14638679.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!