标签:阻塞队列 concurrenthashmap 非阻塞队列
ConcurrentHashMap的原理
将数据一段一段的存储然后给每一段数据分配一把锁,当线程访问数据的一段时,为每段分配一把锁,同时其他段的数据可以被其他线程数据访问
2)concurrentHashMap 的结构
concurrentHashMap 由segament数组和hashentry数组结构组成,segament是一种可靠的重入锁,在里面扮演锁的角色,hashentry用于存储键值对数据,一个segament里面包含一个hashentry数组,每个hashentry是一个链表结构的元素,每个segament守护着一个hashentry数组里的元素,当hashentry数组的数据进行修改时,必须首先获得与它对应的锁
初始化segament数组
if(concurrencyLevel>max_segament){
concurrencyLevel=max_segament;
}
int sshift=0;
int sszie=1;
while(ssize<concurrentcyLevel){
++sshift;
ssize<<=1;
}
segamentshift=32-sshift;
segamentMask=ssize-1;
this.segaments=Segament.newArray(ssize);
ssize 是2的n次方最接近concurrencylevel的数字。
segamentshift为段偏移量
segamentmark为段掩码
2定位segament:在插入或者获取元素的时候,必须通过散列算法对hashcode进行二次散列,之所以二次散列目的是减少散列冲突,使元素能够均匀的分布在不同的segament上,最坏情况,所有的值都散列在一个segament上
计算segament位置的伪代码如下:
final Segament<K,V> segamentfor(int hash){
return segaments[hash>>>segamentshift&segamentMask]
}
ConcurrentHashMap 的操作:get put size
get伪代码如下:
public v get(Object hashcode)){
//此函数的作用是对hashcode进行二次散列,因为对效率有要求,故采用位移算法的方式
int hash = hash(hashcode);
return segamentfor(hash).get(key,hash);
}
get操作的高效之处在于在读取时不用加锁,除非是读到空值,重新加锁,那么如何做到不加锁?主要的方法在于使用volatitle变量,使之在线程间可见。能够被多线程同时读,并且保证不会读到过期的值,但是只能被单线程写,根据happen-before原则对volatitle的写优先于读
定位hashentry和segament的算法虽然一样,但是值不一样
两个方法如下
hash>>>segamentshift&segamentMarsk 定位segament所使用的hash算法
int index=hash&(tab.length-1) //定位hashentry的算法
目的是在segament和hashentry中同时散列开
put操作:
put需要对共享变量进行写入,因此为了线程安全需要加锁
步骤1)判断是否需要扩容
注意:hashmap线插入后扩容,有可能产生无效扩容
concurrenthashmap 先扩容在插入。
扩容方式:将hashentry数组扩容至原来两倍,重新散列后插入,同时只会对segament进行扩容,不会全部扩容。
size操作
统计size的安全方式:在put clean remove方法时锁住变量,但这种效率非常低下
concurrenthashmap采用的方式是实用modcount变量 每进行一次 put clean remove操作时 将变量 +1 然后在统计size前后进行比较时否相等,从而得知容器时否发生变化
concurrentlinkedqueue非阻塞队列
队列有两种实现方式阻塞队列和非阻塞队列,阻塞队列是采用阻塞算法,即使用锁来实现,非阻塞队列是使用cas循环来实现
concurrentlinkedqueue是一种无界队列采用了先进先出的方式进行了排序 ,采用了 wait -free算法
最简单的算法
offer(入队列)算法:
public boolean offer(E e){
Node node = new Node(e);
while(true){
Node tail=gettail();
if(tail.getnext().casNext(null,node)&&node.casTail(tail,node)){
return true;
}
}
}
//出队列 个人简化的伪代码,可能不准确欢迎指正
public e poll(){
while(true){
Node head = getHead();
currentNode = head.getNode();
//将head节点替换为下一个节点
if(head.cas(currentNode,currentNode.getNext())){
//断绝引用
currentNode.setNext()=null;
return current;
}
}
注意next 节点可以使用atomicReference 实现。
这样就可以实现cas操作
阻塞队列
阻塞队列是一个支持两个附加操作的队列,这两个操作支持阻塞的插入和移除
1)阻塞插入:当队列满时,队列阻塞所有的插入操作,直到队列不满
2)阻塞移除:当队列为空时,队列阻塞所有的移除操作,直到队列不空
阻塞队列常用语生产者和消费者场景,生产者是向队列里添加元素的线程,消费者是从队列里取元素的线程,阻塞队列是存放元素的容器
队列阻塞的四种处理方式
1)抛出异常
2)返回特殊值
3)一直阻塞
4)超时退出
java中常见的阻塞队列
1)ArrayBlockingQueue:是用数组实现的有界阻塞队列,此队列按照先进先出的原理进行排序
2)linkedBlockingQueue:是用链表实现的有界阻塞队列此队列的最大长度为Integer.MAX_VALUE 值
3)PriorityBlockingQueue:通过compartor方法或者compareTo方法进行排序
4)DelayQueue:是一个支持延迟获取元素的无界阻塞队列,使用pritorityQueue队列,实现Delay接口
delayqueue非常有用
1)缓存设计:可以使用delayqueue保存缓存元素的有效期,使用一个线程循环查询delayqueue,当查到元素时,表示该缓存到期了
2)定时任务调度:使用delayqueue保存当天将会执行的任务和执行时间,当从delayqueue查询到了任务时就开始执行。
如何实现delay接口
1)在创建对象时初始化
2)实现getdelay方法,返回执行还需要的时间
3)实现compareTo方法,将返回时间最长的放在最后面
4)linkedblockingdqeque:由链表组成的双向阻塞队列;可以用在工作-窃取模式中
阻塞队列实现原理:
1)使用通知模式实现,主要通过condtion方式实现队列阻塞
伪代码如下:
Lock lock = new RetreenLock();
private final Condition notempty =lock.newCondition();//表示可以获取
private final Condition notFull = lock.newConditon();//表示可以插入
//插入方法
public void put(E e){
final RetreenLock lock =this.lock;
lock.lockinterrupt();
try{
while(e.length=max_count){
notfull.await();
}
insert(e);
}
}finally{
lock.unlock();
}
private void insert(E e){
items[putIndex]=e;
putIndex=inc(e);
++count;
notEmpty.signal();//表示通知take线程可以获取
}
//获取方法
public E take(){
final Reentrantlock lock = this.lock;
lock.interruptibly();
try{
while(item.length==0){
//表示队列为空,线程阻塞
notempty.await();
}
return extract();
}finally{
lock.unlock();
}
}
fork/join框架
fork/join框架是jdk 1.7推出的新特性,用于一个并行执行的任务框架
tips:并行与并发
并发的实质是一个物理CPU(也可以多个物理CPU) 在若干道程序之间多路复用,并发性是对有限物理资源强制行使多用户共享以提高效率。
并行性指两个或两个以上事件或活动在同一时刻发生。在多道程序环境下,并行性使多个程序同一时刻可在不同CPU上同时执行。
原理:fork是把一个大任务拆分成若干个小任务,join就是合并这些小任务得到的结果。最后得到这个大任务的结果.
工作窃取算法(working-stealing)
是某个线程从其他队列里获取任务用来执行,应用场景:如某个大任务A拆分成多个互不依赖的子任务放入不同的队列中,当某些队列中的任务执行完毕,另外一些队列中的任务还未执行完,于是执行完队列的线程就会去执行其他还有任务的队列的任务,(比如 线程x 发现队列a中的任务没有了,那么就去执行队列b中的任务)
这样做的好处是;充分利用线程间并行计算,减少线程间竞争
这样做的坏处是:某些情况下存在竞争。如任务队列中只有一个任务。并且会消耗更多的系统资源,比如创建队列,创建线程池
注意点:为了减少竞争,通常采用双端队列,窃取方式为从队尾获取。正常线程从头部获取
fork/join 主要有两个工作步骤
1)分割任务
2)执行任务并合并结果
RecursiveAction 用于执行没有返回结果的任务
RecursiveTask 用于执行有返回结果的任务
forkjointask 需用由forkjoinpool来执行
使用方法:
首先继承RecursiveTask 通过重写compute方法来拆分任务并且fork执行,join合并
//main方法使用
public static void main(String[] args){
//创建线程池
ForkJoinPool pool = new ForkJoinPool();
//创建主任务
CountTask task = new CountTask();
//执行任务
Future<Integer> result = pool.submit(task);
system.out.println(result.get());
}
异常处理代码
//时否是异常处理完成
if(task.isCompletedAbnormally()){
Sysmte.out.println(task.getException());
}
FORK/JOIN框架的实现原理
1)fork的实现原理
调用puttask放入任务,采用异步调用
public final forkJoinTask<V> fork(){
((ForkJoinWorkThread)Thread.currentThread).putTask(this);
}
2)putTask把当前任务放入任务数组中,然后调用ForkJoinPool的signalwork来唤醒或者创建一个新的线程
伪代码如下
public final void pushTask(ForkJoinTask<> t){
ForkJoinTask <> [] q;int s,m;
if(q=queue)!=null){
//计算出偏移量
long u =( (s=queueTop)&(m=queueTop.length-1))<<ASHIFT+ABASE;
//根据偏移量直接刷新主内存
unsafe.putOrderObject(q,u,t);
queueTop=s+1;
if(s-=queueBase<=-2){
//唤醒工作线程
signalwork();
}else{
//创建新线程
growQueue();
}
}
}
1)join方法原理,首先查看任务时否执行完成,如果完成,则直接返回完成状态
2)如果没有完成,则从数组里取出任务并行执行,如果正常,则返回normal如果异常则返回exception
如果正常则返回结果
本文出自 “iter工作总结” 博客,请务必保留此出处http://5855156.blog.51cto.com/5845156/1959758
java并发编程的艺术,读书笔记第六章 concurrentHashMap以及并发容器的介绍
标签:阻塞队列 concurrenthashmap 非阻塞队列
原文地址:http://5855156.blog.51cto.com/5845156/1959758