标签:任务 new 比较 segment exce 不同 null public join
HashMap 里面是一个数组,然后数组中每个元素是一个单向链表。
ConcurrentHashMap 是一个 Segment 数组(默认16个),Segment 通过继承 ReentrantLock 来进行加锁,
所以每次需要加锁的操作锁住的是一个 segment,这样只要保证每个 Segment 是线程安全的,也就实现了全局的线程安全。
Segment 内部是由 数组+链表 组成的。
Segment 通过继承 ReentrantLock 来进行加锁。
在往某个 segment 中 put 的时候,首先会调用:
HashEntry<K,V> node = tryLock() ? null : scanAndLockForPut(key, hash, value)
也就是说先进行一次 tryLock() 快速获取该 segment 的独占锁,
如果失败,那么进入到 scanAndLockForPut 这个方法来获取锁。
用一次 CAS 操作,如果 CAS 失败,那就是有并发操作(synchronized)。
LockSupport 是JDK中比较底层的类,用来创建锁和其他同步工具类的基本线程阻塞原语。
可以做到与 join() 、wait() 功能一样,使线程自由的阻塞、释放。
Java 锁和同步器框架的核心 AQS(AbstractQueuedSynchronizer抽象队列同步器),
就是通过调用 LockSupport.park() 和 LockSupport.unpark() 实现线程的阻塞和唤醒的。
LockSupport 方法底层都是调用Unsafe的方法实现。全名sun.misc.Unsafe,该类可以直接操控内存。
LockSupport 提供 park() 和 unpark() 方法实现阻塞线程和解除线程阻塞。
LockSupport() 操作的是线程对象,直接传入的就是 Thread,
而 wait() 属于具体对象 ( synchronized(t)这里的锁定了t,那么wait需用t.wait():释放掉t )
wait/notify 需要获取对象的监视器,即synchronized修饰,
而park/unpark 不需要获取对象的监视器。
Fork/Join框架和执行器框架(Executor Framework)主要的区别在于:
工作窃取算法(Work-Stealing Algorithm)。
与执行器框架不同,使用Join操作让一个主任务等待它所创建的子任务的完成,执行这个任务的线程称之为工作者线程(Worker Thread)。
工作者线程寻找其他仍未被执行的任务,然后开始执行。
通过这种方式,提升应用程序的性能。
为了达到这个目标,通过Fork/Join框架执行的任务有以下限制:
任务只能使用 fork()和join() 操作当作同步机制。
如果使用其他的同步机制,工作者线程就不能执行其他任务。
任务不能执行I/O操作,比如文件数据的读取与写入。
任务不能抛出非运行时异常(Checked Exception),必须在代码中处理掉这些异常。
Fork/Join框架的核心是由下列两个类组成的:
ForkJoinPool(执行Task):
这个类实现了ExecutorService接口和工作窃取算法(Work-Stealing Algorithm)。
它管理工作者线程,并提供任务的状态信息,以及任务的执行信息。
ForkJoinTask(执行具体的分支逻辑):
这个类是一个将在ForkJoinPool中执行的任务的基类。
Fork/Join 框架提供了在一个任务里执行 fork()和join() 操作的机制和控制任务状态的方法。
通常,为了实现Fork/Join任务,需要实现一个以下两个类之一的子类:
RecursiveAction : 用于任务没有返回结果的场景。
RecursiveTask : 用于任务有返回结果的场景。
ForkJoinPool 使用 submit 或 invoke 提交的区别:
invoke 是同步执行,调用之后需要等待任务完成,才能执行后面的代码。
submit 是异步执行,只有在 Future 调用 get 的时候会阻塞。
执行子任务调用 fork 方法并不是最佳的选择,最佳的选择是 invokeAll 方法。
leftTask.fork();
rightTask.fork();
替换为
invokeAll(leftTask, rightTask);
fork方法相当于A先分工给B,然后A当监工不干活,B去完成A交代的任务。所以上面的模式相当于浪费了一个线程。
那么如果使用invokeAll相当于A分工给B后,A和B都去完成工作。这样可以更好的利用线程池,缩短执行的时间。
基本思想:
ForkJoinPool 的每个工作线程都维护着一个工作队列(WorkQueue),这是一个双端队列(Deque),
里面存放的对象是任务(ForkJoinTask)。
每个工作线程在运行中产生新的任务 ( 通常是因为调用了 fork() )时,会放入工作队列的队尾,
并且工作线程在处理自己的工作队列时,使用的是 LIFO 方式,也就是说每次从队尾取出任务来执行。
每个工作线程在处理自己的工作队列同时,会尝试窃取一个任务
(或是来自于刚刚提交到 pool 的任务,或是来自于其他工作线程的工作队列),
窃取的任务位于其他线程的工作队列的队首,也就是说工作线程在窃取其他工作线程的任务时,使用的是 FIFO 方式。
在遇到 join() 时,如果需要 join 的任务尚未完成,则会先处理其他任务,并等待其完成。
在既没有自己的任务,也没有可以窃取的任务时,进入休眠。
class ForkJoinTaskExample extends RecursiveTask<Integer> {
public static final int threshold = 2;
private int start;
private int end;
public ForkJoinTaskExample(int start, int end) {
this.start = start;
this.end = end;
}
@Override
protected Integer compute() {
int sum = 0;
//如果任务足够小就计算任务
if ((end - start) <= threshold) {
for (int i = start; i <= end; i++) {
sum += i;
}
} else {
// 如果任务大于阈值,就分裂成两个子任务计算
int middle = (start + end) / 2;
ForkJoinTaskExample leftTask = new ForkJoinTaskExample(start, middle);
ForkJoinTaskExample rightTask = new ForkJoinTaskExample(middle + 1, end);
// 执行子任务
leftTask.fork();
rightTask.fork();
// 等待任务执行结束合并其结果
//int leftResult = leftTask.join();
//int rightResult = rightTask.join();
int leftResult = leftTask.invoke();
int rightResult = rightTask.invoke();
// 合并子任务
sum = leftResult + rightResult;
}
return sum;
}
public static void main(String[] args) {
long l = System.currentTimeMillis();
ForkJoinPool forkjoinPool = new ForkJoinPool();
//生成一个计算任务,计算1+2+3+4
ForkJoinTaskExample task = new ForkJoinTaskExample(1, 1000000);
//执行一个任务(时间600)
Future<Integer> result = forkjoinPool.submit(task);
//时间20
int result1 = 0;
for (int i = 1; i <= 1000000; i++) {
result1 += i;
}
try {
System.out.println(result.get());
System.out.println(result1);
long l1 = System.currentTimeMillis();
System.out.println("time=" + (l1 - l));
} catch (Exception e) {
System.out.println(e);
}
}
}
标签:任务 new 比较 segment exce 不同 null public join
原文地址:https://www.cnblogs.com/loveer/p/11831175.html