标签:@param 系统 lan multi 定时 manager 无法 影响 string
JUC:java.util.concurrent
模板
步骤
synchronized
class X {
public synchronized void m() {
//todo
}
}
/**
* 线程 操作 资源类
*/
public class SaleTicket {
public static void main(String[] args) {
//资源
Ticket ticket = new Ticket();
//线程
new Thread(() -> { for (int i = 0; i < 100; i++) ticket.sellTicket(); }, "A").start();
new Thread(() -> { for (int i = 0; i < 100; i++) ticket.sellTicket(); }, "B").start();
new Thread(() -> { for (int i = 0; i < 100; i++) ticket.sellTicket(); }, "C").start();
}
}
//资源类
class Ticket {
private Integer number = 100;
//操作:对外暴露操作资源的方法
public synchronized void sellTicket() {
if (number > 0) {
System.out.println(Thread.currentThread().getName() + "\t 卖出第" + number-- + "张票,还剩"
+ number + "张");
}
}
}
java.util.concurrent.locks.Lock
Lock
implementations provide more extensive locking operations than can be obtained usingsynchronized
methods and statements. They allow more flexible structuring, may have quite different properties, and may support multiple associatedCondition
objects.锁实现提供了比使用同步方法和同步代码块更广泛的锁操作。它们允许更灵活的结构,可能具有完全不同的属性,并且可能支持多个关联的条件对象。
可重入锁:某个线程已经获得某个锁,可以再次获取锁而不会出现死锁,获取和释放的次数需要一致
Lock接口的实现类,java.util.concurrent.locks.ReentrantLock
使用
class X {
private final Lock lock = new ReentrantLock();
// ...
public void m() {
lock.lock(); // block until condition holds
try {
// ... method body
} finally {
lock.unlock()
}
}
}
构造方法
/**
* Creates an instance of {@code ReentrantLock}.
* This is equivalent to using {@code ReentrantLock(false)}.
*/
public ReentrantLock() {
sync = new NonfairSync();//非公平锁
}
/**
* Creates an instance of {@code ReentrantLock} with the
* given fairness policy.
*
* @param fair {@code true} if this lock should use a fair ordering policy
*/
public ReentrantLock(boolean fair) {//FairSync 公平锁
sync = fair ? new FairSync() : new NonfairSync();
}
公平锁:十分公平,按先来后到顺序
非公平锁:十分不公平,可以插队(默认,如线程A在前,需要1小时,线程B在后,需要3秒,非公平锁允许线程B插队,先完成,而不需要等待线程A完成后再执行)
线程间通信
多线程编程模板
A thread can also wake up without being notified, interrupted, or timing out, a so-called spurious wakeup. While this will rarely occur in practice, applications must guard against it by testing for the condition that should have caused the thread to be awakened, and continuing to wait if the condition is not satisfied. In other words, waits should always occur in loops, like this one
synchronized (obj) { while (<condition does not hold>) obj.wait(timeout); ... // Perform action appropriate to condition }
线程也可以在没有通知、中断或超时的情况下被唤醒,这就是所谓的假唤醒。虽然这种情况在实践中很少发生,但应用程序必须通过测试导致线程被唤醒的条件来防止这种情况发生,如果条件不满足,则继续等待。换句话说,等待应该总是出现在循环中,就像这个循环一样
synchronized实现
package juc.demo;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* @Description:
* 现在两个线程,
* 可以操作初始值为零的一个变量,
* 实现一个线程对该变量加1,一个线程对该变量减1,
* 交替,来10轮。
* @Package: juc.demo
* @ClassName NotifyWaitDemo
* @author: wuwb
* @date: 2020/10/19 13:30
*/
public class NotifyWaitDemo {
public static void main(String[] args) {
int turn = 1000;
//资源类
ShareData data = new ShareData();
new Thread(() -> {
for (int i = 0; i < turn; i++) {
try {
data.increment();
} catch (Exception e) {
e.printStackTrace();
}
}
}, "A").start();
new Thread(() -> {
for (int i = 0; i < turn; i++) {
try {
data.decrement();
} catch (Exception e) {
e.printStackTrace();
}
}
}, "B").start();
new Thread(() -> {
for (int i = 0; i < turn; i++) {
try {
data.increment();
} catch (Exception e) {
e.printStackTrace();
}
}
}, "C").start();
new Thread(() -> {
for (int i = 0; i < turn; i++) {
try {
data.decrement();
} catch (Exception e) {
e.printStackTrace();
}
}
}, "D").start();
}
}
//资源类
class ShareData{
private int number = 0;
public synchronized void increment() throws InterruptedException {
//判断 if换while
while (number != 0) {
this.wait();
}
//干活
number++;
System.out.println(Thread.currentThread().getName() + ":" + number);
//通知
this.notifyAll();
}
public synchronized void decrement() throws InterruptedException {
while (number == 0) {
this.wait();
}
number--;
System.out.println(Thread.currentThread().getName() + ":" + number);
this.notifyAll();
}
}
Lock 及 Condition
package juc.demo;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* @Description:
* 现在两个线程,
* 可以操作初始值为零的一个变量,
* 实现一个线程对该变量加1,一个线程对该变量减1,
* 交替,来10轮。
* @Package: juc.demo
* @ClassName NotifyWaitDemo
* @author: wuwb
* @date: 2020/10/19 13:30
*/
public class NotifyWaitDemo {
public static void main(String[] args) {
int turn = 1000;
//资源类
ShareData data = new ShareData();
new Thread(() -> {
for (int i = 0; i < turn; i++) {
try {
data.increment();
} catch (Exception e) {
e.printStackTrace();
}
}
}, "A").start();
new Thread(() -> {
for (int i = 0; i < turn; i++) {
try {
data.decrement();
} catch (Exception e) {
e.printStackTrace();
}
}
}, "B").start();
new Thread(() -> {
for (int i = 0; i < turn; i++) {
try {
data.increment();
} catch (Exception e) {
e.printStackTrace();
}
}
}, "C").start();
new Thread(() -> {
for (int i = 0; i < turn; i++) {
try {
data.decrement();
} catch (Exception e) {
e.printStackTrace();
}
}
}, "D").start();
}
}
//资源类
class ShareData{
private int number = 0;
private Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
public void increment() {
lock.lock();
try {
//判断
while (number != 0) {
condition.await();//this.wait();
}
//干活
number++;
System.out.println(Thread.currentThread().getName() + ":" + number);
//通知
condition.signalAll();//this.notifyAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void decrement() {
lock.lock();
try {
while (number == 0) {
condition.await();
}
number--;
System.out.println(Thread.currentThread().getName() + ":" + number);
condition.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* @Description
* 多线程之间按顺序调用,实现A->B->C
* ......来10轮
*/
public class ThreadOrderAccess {
public static void main(String[] args) {
ShareResource resource = new ShareResource();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
resource.printA();
}
}, "A").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
resource.printB();
}
}, "B").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
resource.printC();
}
}, "C").start();
}
}
class ShareResource{
/**标志位*/
private int number = 1;
private Lock lock = new ReentrantLock();
/**3把钥匙*/
private Condition condition1 = lock.newCondition();
private Condition condition2 = lock.newCondition();
private Condition condition3 = lock.newCondition();
public void printA() {
lock.lock();
try {
while (number != 1) {
condition1.await();
}
System.out.println(Thread.currentThread().getName()+"==>AAAAAAAAAA");
number = 2;
condition2.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void printB() {
lock.lock();
try {
while (number != 2) {
condition2.await();
}
System.out.println(Thread.currentThread().getName()+"==>BBBBBBBBBB");
number = 3;
condition3.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void printC() {
lock.lock();
try {
while (number != 3) {
condition3.await();
}
System.out.println(Thread.currentThread().getName()+"==>CCCCCCCCCC");
number = 1;
condition1.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
public class Lock_8_1 {
public static void main(String[] args) throws Exception {
Phone phone = new Phone();
new Thread(() -> { phone.sendSMS(); }, "AA").start();
TimeUnit.SECONDS.sleep(1);
new Thread(() -> { phone.sendEmail(); }, "BB").start();
}
}
class Phone {
public synchronized void sendSMS() {
System.out.println("------sendSMS");
}
public synchronized void sendEmail() {
System.out.println("------sendEmail");
}
}/*sendSMS*/
public class Lock_8_2 {
public static void main(String[] args) throws Exception {
Phone phone = new Phone();
new Thread(() -> { phone.sendSMS(); }, "AA").start();
TimeUnit.SECONDS.sleep(1);
new Thread(() -> { phone.sendEmail(); }, "BB").start();
}
}
class Phone {
public synchronized void sendSMS() {
try {
TimeUnit.SECONDS.sleep(4);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("------sendSMS");
}
public synchronized void sendEmail() {
System.out.println("------sendEmail");
}
}/*sendSMS*/
public class Lock_8_3 {
public static void main(String[] args) throws Exception {
Phone phone = new Phone();
new Thread(() -> { phone.sendSMS(); }, "AA").start();
TimeUnit.SECONDS.sleep(1);
new Thread(() -> { phone.getHello(); }, "BB").start();
}
}
class Phone {
public synchronized void sendSMS() {
try {
TimeUnit.SECONDS.sleep(4);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("------sendSMS");
}
public synchronized void sendEmail() {
System.out.println("------sendEmail");
}
public void getHello() {
System.out.println("------getHello");
}
}/*getHello*/
public class Lock_8_4 {
public static void main(String[] args) throws Exception {
Phone phone1 = new Phone();
Phone phone2 = new Phone();
new Thread(() -> { phone1.sendSMS(); }, "AA").start();
TimeUnit.SECONDS.sleep(1);
new Thread(() -> { phone2.sendEmail(); }, "BB").start();
}
}
class Phone {
public synchronized void sendSMS() {
try {
TimeUnit.SECONDS.sleep(4);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("------sendSMS");
}
public synchronized void sendEmail() {
System.out.println("------sendEmail");
}
}/*sendEmail*/
public class Lock_8_5 {
public static void main(String[] args) throws Exception {
Phone phone = new Phone();
new Thread(() -> { phone.sendSMS(); }, "AA").start();
TimeUnit.SECONDS.sleep(1);
new Thread(() -> { phone.sendEmail(); }, "BB").start();
}
}
class Phone {
public static synchronized void sendSMS() {
try {
TimeUnit.SECONDS.sleep(4);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("------sendSMS");
}
public static synchronized void sendEmail() {
System.out.println("------sendEmail");
}
}/*sendSMS*/
public class Lock_8_6 {
public static void main(String[] args) throws Exception {
Phone phone = new Phone();
Phone phone2 = new Phone();
new Thread(() -> { phone.sendSMS(); }, "AA").start();
TimeUnit.SECONDS.sleep(1);
new Thread(() -> { phone2.sendEmail(); }, "BB").start();
}
}
class Phone {
public static synchronized void sendSMS() {
try {
TimeUnit.SECONDS.sleep(4);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("------sendSMS");
}
public static synchronized void sendEmail() {
System.out.println("------sendEmail");
}
}/*sendSMS*/
public class Lock_8_7 {
public static void main(String[] args) throws Exception {
Phone phone = new Phone();
new Thread(() -> { phone.sendSMS(); }, "AA").start();
TimeUnit.SECONDS.sleep(1);
new Thread(() -> { phone.getHello(); }, "BB").start();
}
}
class Phone {
public static synchronized void sendSMS() {
try {
TimeUnit.SECONDS.sleep(4);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("------sendSMS");
}
public synchronized void sendEmail() {
System.out.println("------sendEmail");
}
}/*sendEmail*/
public class Lock_8_8 {
public static void main(String[] args) throws Exception {
Phone phone = new Phone();
Phone phone2 = new Phone();
new Thread(() -> { phone.sendSMS(); }, "AA").start();
TimeUnit.SECONDS.sleep(1);
new Thread(() -> { phone2.sendEmail(); }, "BB").start();
}
}
class Phone {
public static synchronized void sendSMS() {
try {
TimeUnit.SECONDS.sleep(4);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("------sendSMS");
}
public synchronized void sendEmail() {
System.out.println("------sendEmail");
}
}/*sendEmail*/
集合线程的不安全性,如多线程操作ArrayList时,ArrayList在迭代的时候如果同时对其进行修改就会抛出java.util.ConcurrentModificationException异常,并发修改异常
List<String> list = new ArrayList<>();
for (int i = 0; i < 30; i++) {
new Thread(() -> {
list.add(UUID.randomUUID().toString().substring(0,8));
System.out.println(list);
}, String.valueOf(i)).start();
}
解决方案
List<String> list = new Vector<>();
List<String> list = Collections.synchronizedList(new ArrayList<>());
CopyOnWrite理论
CopyOnWriteArrayList 源码
/**
* Appends the specified element to the end of this list.
*
* @param e element to be appended to this list
* @return {@code true} (as specified by {@link Collection#add})
*/
public boolean add(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
Object[] newElements = Arrays.copyOf(elements, len + 1);
newElements[len] = e;
setArray(newElements);
return true;
} finally {
lock.unlock();
}
}
HashSet线程不安全,底层结构是HashMap,set添加时调用的是map的put方法,值作为key,而value为固定值
public HashSet() {
map = new HashMap<>();
}
private static final Object PRESENT = new Object();
public boolean add(E e) {
return map.put(e, PRESENT) == null;
}
Set<String> set = new HashSet<>();
//线程不安全
Set<String> set = Collections.synchronizedSet(new HashSet<>());
//线程安全
Set<String> set = new CopyOnWriteArraySet<>();
//线程安全
java.util.concurrent包下的函数式接口
与Runnable()对比
方法是否有返回值
是否抛出异常
落地方法不同,call() / run()
//创建新类MyThread实现runnable接口
class MyThread implements Runnable{
@Override
public void run() {
}
}
//新类MyThread2实现callable接口
class MyThread2 implements Callable<Integer>{
@Override
public Integer call() throws Exception {
return 200;
}
}
thread构造方法中只能接受Runnable,寻找中间桥梁
![1603262924681](juc.assets/1603262924681.png)
![1603262989939](juc.assets/1603262989939.png)
A cancellable asynchronous computation. This class provides a base implementation of
Future
, with methods to start and cancel a computation, query to see if the computation is complete, and retrieve the result of the computation. The result can only be retrieved when the computation has completed; theget
methods will block if the computation has not yet completed. Once the computation has completed, the computation cannot be restarted or cancelled (unless the computation is invoked usingrunAndReset()
).A
FutureTask
can be used to wrap aCallable
orRunnable
object. BecauseFutureTask
implementsRunnable
, aFutureTask
can be submitted to anExecutor
for execution.
通过FutureTask的有参构造方法将Callable传入
运行成功后,通过futureTask.get()获取返回值
在主线程中需要执行比较耗时的操作时,但又不想阻塞主线程时,可以把这些作业交给Future对象在后台完成,当主线程将来需要时,就可以通过Future对象获得后台作业的计算结果或者执行状态。一般FutureTask多用于耗时的计算,主线程可以在完成自己的任务后,再去获取结果。仅在计算完成时才能检索结果;如果计算尚未完成,则阻塞 get 方法,直到任务转入完成状态,然后会返回结果或者抛出异常。一旦计算完成,就不能再重新开始或取消计算。
只计算一次,get方法放到最后。
public class FutureTaskDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
FutureTask<Integer> futureTask = new FutureTask<Integer>(() -> {
System.out.println(Thread.currentThread().getName()+"******");
return 1024;
});
FutureTask<Integer> futureTask1 = new FutureTask<>(new MyThread());
new Thread(futureTask,"A").start();// 结果会被缓存,效率高
new Thread(futureTask,"B").start();
new Thread(futureTask1,"C").start();
/*while(!futureTask.isDone()){
System.out.println("***wait");
}*/
System.out.println(futureTask.get());
System.out.println(futureTask1.get());
}
}
class MyThread implements Callable<Integer> {
@Override
public Integer call() throws Exception {
System.out.println(Thread.currentThread().getName()+"******");
return 200;
}
}
/*
执行结果:
A******
C******
1024
200
*/
public class CountDownLatchDemo {
public static void main(String[] args) throws InterruptedException {
CountDownLatch count = new CountDownLatch(6);
for (int i = 0; i < 6; i++) {
new Thread(() -> {
System.out.println(Thread.currentThread().getName()+"\t 离开");
count.countDown();
}, String.valueOf(i)).start();
}
count.await();//阻塞,直到count=0,main线程才继续向下执行
System.out.println("全部离开");
}
}
public class CyclicBarrierDemo {
public static void main(String[] args) {
CyclicBarrier cyclicBarrier = new CyclicBarrier(7,() -> {
System.out.println("召唤神龙");
});
for (int i = 1; i <= 7; i++) {
new Thread(() -> {
System.out.println("获得第"+Thread.currentThread().getName()+"颗龙珠!");
try {
cyclicBarrier.await();
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("线程"+Thread.currentThread().getName()+"====>"+
Thread.currentThread().getState());
}, String.valueOf(i)).start();
}
}
}
/*
获得第1颗龙珠!
获得第4颗龙珠!
获得第5颗龙珠!
获得第3颗龙珠!
获得第2颗龙珠!
获得第6颗龙珠!
获得第7颗龙珠!
召唤神龙
线程5====>RUNNABLE
线程3====>RUNNABLE
线程2====>RUNNABLE
线程1====>RUNNABLE
线程6====>RUNNABLE
线程4====>RUNNABLE
线程7====>RUNNABLE
*/
public class SemaphoreDemo {
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(3);
for (int i = 1; i <= 6; i++) {
new Thread(() -> {
try {
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + "\t抢到车位!");
TimeUnit.SECONDS.sleep(new Random().nextInt(5));
System.out.println(Thread.currentThread().getName() + "\t离开");
} catch (Exception e) {
e.printStackTrace();
} finally {
semaphore.release();
}
}, String.valueOf(i)).start();
}
}
}
/*
2 抢到车位!
1 抢到车位!
6 抢到车位!
1 离开
6 离开
4 抢到车位!
3 抢到车位!
3 离开
5 抢到车位!
2 离开
4 离开
5 离开
*/
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* @Description 读写锁
* @Package: juc.demo
* @ClassName ReadWriteLockDemo
* @author: wuwb
* @date: 2020/10/21 17:42
*/
public class ReadWriteLockDemo {
public static void main(String[] args) {
CacheData cacheData = new CacheData();
for (int i = 1; i <= 5; i++) {
final int num = i;
new Thread(() -> {
cacheData.put(num + "", num + "");
}, String.valueOf(i)).start();
}
for (int i = 1; i <= 5; i++) {
final int num = i;
new Thread(() -> {
cacheData.get(num + "");
}, String.valueOf(i)).start();
}
}
}
class CacheData{
private volatile Map<String, Object> map = new HashMap<>();
private ReadWriteLock lock = new ReentrantReadWriteLock();
public void put(String key, Object value) {
lock.writeLock().lock();//写锁
try {
System.out.println(Thread.currentThread().getName()+"\t 开始写入"+key);
map.put(key, value);
System.out.println(Thread.currentThread().getName()+"\t 写入完成");
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.writeLock().unlock();
}
}
public void get(String key) {
lock.readLock().lock();//读锁
try {
System.out.println(Thread.currentThread().getName()+"\t 开始读取"+key);
Object result = map.get(key);
System.out.println(Thread.currentThread().getName()+"\t 读取完成"+result);
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.readLock().unlock();
}
}
}
栈与队列
BlockingQueue阻塞队列
架构
BlockingQueue核心方法
添加、移除
方法类型 | 抛出异常 | 不抛异常 | 阻塞等待 | 超时等待 |
---|---|---|---|---|
插入 | add(e) | offer(e) | put(e) | offer(e,time,unit) |
移除 | remove() | poll() | take() | poll(time,unit) |
检查队首元素 | element() | peek() | - | - |
*检查队首元素,可以获取到队首元素,但不是移除
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
/**
* @Description 阻塞队列
* @Package: juc.juc
* @ClassName BlockingQueueDemo
* @author: wuwb
* @date: 2020/12/17 10:41
*/
public class BlockingQueueDemo {
public static void main(String[] args) throws InterruptedException {
//testOne();
//testTwo();
//testThree();
testFour();
}
/**
* 超时等待
*/
private static void testFour() throws InterruptedException {
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
System.out.println(blockingQueue.offer("a"));
System.out.println(blockingQueue.offer("b"));
System.out.println(blockingQueue.offer("c"));
System.out.println(blockingQueue.offer("a",3L, TimeUnit.SECONDS));
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll(3L, TimeUnit.SECONDS));
}
/**
* 阻塞等待
* @throws InterruptedException
*/
private static void testThree() throws InterruptedException {
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
blockingQueue.put("a");
blockingQueue.put("b");
blockingQueue.put("c");
//blockingQueue.put("x");
System.out.println(blockingQueue.take());
System.out.println(blockingQueue.take());
System.out.println(blockingQueue.take());
System.out.println(blockingQueue.take());
}
/**
* 不抛出异常
*/
private static void testTwo() {
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
System.out.println(blockingQueue.offer("a"));
System.out.println(blockingQueue.offer("b"));
System.out.println(blockingQueue.offer("c"));
System.out.println(blockingQueue.offer("d"));
System.out.println(blockingQueue.peek());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
/*true/true/true/false/a/a/b/c/null*/
}
/**
* 抛出异常
*/
private static void testOne() {
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
System.out.println(blockingQueue.add("a"));
System.out.println(blockingQueue.add("b"));
System.out.println(blockingQueue.add("c"));
//System.out.println(blockingQueue.add("d"));
System.out.println(blockingQueue.element());
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
}
}
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
/**
* 同步队列
* 和其他的BlockingQueue 不一样, SynchronousQueue 不存储元素
* put了一个元素,必须从里面先take取出来,否则不能在put进去值!
*/
public class SynchronousQueueDemo {
public static void main(String[] args) {
BlockingQueue<String> blockingQueue = new SynchronousQueue<>();
new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName()+" put 1");
blockingQueue.put("1");
System.out.println(Thread.currentThread().getName()+" put 2");
blockingQueue.put("2");
System.out.println(Thread.currentThread().getName()+" put 3");
blockingQueue.put("3");
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "AA").start();
new Thread(() -> {
try {
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName()+" take "+blockingQueue.take());
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName()+" take "+blockingQueue.take());
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName()+" take "+blockingQueue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "BB").start();
}
}
线程复用、控制最大并发数、管理线程
线程池架构
? Executor,Executors,ExecutorService,ThreadPoolExecutor
Executors.newFixedThreadPool(int)
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
Executors.newSingleThreadExecutor()
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
Executors.newCachedThreadPool()
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
从以上源码可以看出,三大方法底层均是使用ThreadPoolExecutor()来创建线程池
代码
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* 线程池
* Arrays
* Collections
* Executors
*/
public class MyThreadPoolDemo {
public static void main(String[] args) {
//List list = new ArrayList();
//List list = Arrays.asList("a","b");
//固定数的线程池,一池五线程
//一个银行网点,5个受理业务的窗口
//ExecutorService threadPool = Executors.newFixedThreadPool(5);
//一个银行网点,1个受理业务的窗口
//ExecutorService threadPool = Executors.newSingleThreadExecutor();
//一个银行网点,可扩展受理业务的窗口
ExecutorService threadPool = Executors.newCachedThreadPool();
//10个顾客请求
try {
for (int i = 1; i <=10; i++) {
threadPool.execute(()->{
System.out.println(Thread.currentThread().getName()+"\t 办理业务");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
threadPool.shutdown();
}
}
}
工作中均不使用以上三大方法来创建线程池,而是直接使用ThreadPoolExecutor()来创建线程池
public static void main(String[] args) {
ExecutorService threadPool = new ThreadPoolExecutor(
2,
5,
2L,
TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(3),
Executors.defaultThreadFactory(),
//new ThreadPoolExecutor.AbortPolicy()
//new ThreadPoolExecutor.CallerRunsPolicy()
//new ThreadPoolExecutor.DiscardOldestPolicy()
new ThreadPoolExecutor.DiscardOldestPolicy()
);
//10个顾客请求
try {
for (int i = 1; i <= 10; i++) {
threadPool.execute(() -> {
System.out.println(Thread.currentThread().getName() + "\t 办理业务");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
threadPool.shutdown();
}
}
/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @param threadFactory the factory to use when the executor
* creates a new thread
* @param handler the handler to use when execution is blocked
* because the thread bounds and queue capacities are reached
* @throws IllegalArgumentException if one of the following holds:<br>
* {@code corePoolSize < 0}<br>
* {@code keepAliveTime < 0}<br>
* {@code maximumPoolSize <= 0}<br>
* {@code maximumPoolSize < corePoolSize}
* @throws NullPointerException if {@code workQueue}
* or {@code threadFactory} or {@code handler} is null
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
拒绝策略:等待队列已经排满了,再也塞不下新任务了,同时,线程池中的max线程也达到了,无法继续为新任务服务。这个是时候我们就需要拒绝策略机制合理的处理这个问题。
JDK内置四种拒绝策略
/**
* new ThreadPoolExecutor.AbortPolicy() // 银行满了,还有人进来,不处理这个人的,抛出异常
* new ThreadPoolExecutor.CallerRunsPolicy() // 哪来的去哪里!
* new ThreadPoolExecutor.DiscardPolicy() //队列满了,丢掉任务,不会抛出异常!
* new ThreadPoolExecutor.DiscardOldestPolicy() //队列满了,尝试去和最早的竞争,也不会抛出异常!
*/
以上内置拒绝策略均实现了RejectedExecutionHandle接口
在创建了线程池后,开始等待请求。
当调用execute()方法添加一个请求任务时,线程池会做出如下判断:
当一个线程完成任务时,它会从队列中取下一个任务来执行。
当一个线程无事可做超过一定的时间(keepAliveTime)时,线程会判断:
举例:
import java.util.concurrent.*;
/**
* @Description TODO
* @Package: juc.juc
* @ClassName ThreadPoolDemo
* @author: wuwb
* @date: 2020/12/18 9:12
*/
public class ThreadPoolDemo {
public static void main(String[] args) {
ExecutorService threadPool = new ThreadPoolExecutor(
2,
5,
2L,
TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(5),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.CallerRunsPolicy()
);
try {
for (int i = 1; i <= 10; i++) {
final int j = i;
threadPool.execute(()->{
System.out.println(Thread.currentThread().getName() + " run " + j);
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
threadPool.shutdown();
}
}
}
/*
pool-1-thread-1 run 1
pool-1-thread-2 run 2
pool-1-thread-3 run 8
pool-1-thread-4 run 9
pool-1-thread-5 run 10
pool-1-thread-1 run 3
pool-1-thread-4 run 4
pool-1-thread-2 run 5
pool-1-thread-3 run 6
pool-1-thread-5 run 7
1、2进核心线程,3、4、5、6、7进队列等待,8、9、10启用非核心线程先于队列中任务运行
*/
*CPU密集型 最大线程数为CPU核数,CPU核数Runtime.getRuntime().availableProcessors();
位置:java.util.function包下
接口中只能有一个抽象方法的接口,称函数式接口
java内置核心四大函数式接口:
| 函数式接口 | 参数类型 | 返回类型 | 用途 |
| --------------------- | -------- | -------- | ------------------------------------------------------------ |
| Consumer
| Supplier
| Function<T, R> 函数型 | T | R | 对类型为T的对象应用操作,并返回结果;
结果是R类型的对象,包含方法:R apply(T t) |
| Predicate
消费型Consumer
/**
* 消费型
*/
public static void testConsumer() {
/*Consumer consumer = new Consumer<String>() {
@Override
public void accept(String s) {
System.out.println("输入的值是:" + s);
}
};*/
Consumer consumer = (s) -> {
System.out.println("输入的值是:" + s);
};
consumer.accept("abc");
}
供给型Supplier
/**
* 供给型
*/
public static void testSupplier() {
/*Supplier<String> supplier = new Supplier<String>() {
@Override
public String get() {
return "供给型接口";
}
};*/
Supplier<String> supplier = () -> {
return "供给型接口lambda";
};
System.out.println(supplier.get());
}
函数型Function<T, R>
/**
* 函数型
*/
public static void testFunction() {
/*Function<String, String> function = new Function<String, String>() {
@Override
public String apply(String s) {
s += "123";
return s;
}
};*/
Function<String, String> function = (s) -> {
s += "123lambda";
return s;
};
System.out.println(function.apply("abc"));
}
断定型Predicate
/**
* 断定型
*/
public static void testPredicate() {
/*Predicate<String> predicate = new Predicate<String>() {
@Override
public boolean test(String s) {
return "1024".equals(s);
}
};*/
Predicate<String> predicate = (s) -> {
return "1024".equals(s);
};
System.out.println(predicate.test("1024"));
}
标签:@param 系统 lan multi 定时 manager 无法 影响 string
原文地址:https://www.cnblogs.com/wuweibincqu/p/14204702.html