标签:控制 并发 这一 结果 可重入锁 执行 取出 自己 reading
Java 多线程进阶-并发协作控制
线程协作对比
Lock 锁
package concurrentDemo0421;
import java.time.LocalDateTime;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class LockExample {
private static final ReentrantLock queueLock111 = new ReentrantLock(); // 可重入锁
private static final ReentrantReadWriteLock orderLock111 = new ReentrantReadWriteLock(); // 可重入读写锁
/**
* 学校门口有家奶茶店, 学生们点单有时需要排队
* 1. 买奶茶
* 假设想买奶茶的同学如果看到需要排队, 就决定不买了
* (一次只有一个买)
* <p>
* 2. 操作奶茶账本
* 假设奶茶店有老板和多名员工, 记录方式比较原始, 只有一个订单本
* (多个读, 一个写)
* 老板负责写新订单, 员工不断查看订单本得到信息来制作奶茶, 在老板写新订单的时候员工不能查看订单本
* (写时, 不能读)
* 多个员工可以同时查看订单本, 此时老板不能写新订单
* (读时, 不能写)
*
* @param args 1
*/
public static void main(String[] args) throws InterruptedException {
// 1. 买奶茶的例子
buyMilkTea(); // 使用可重入锁
// 2. 操作奶茶账本的例子
handleOrder(); // 使用读写锁
}
public static void buyMilkTea() throws InterruptedException {
LockExample lockExample = new LockExample();
int STUDENTS_COUNT = 10;
Thread[] students = new Thread[STUDENTS_COUNT];
for (int i = 0; i < students.length; i++) {
students[i] = new Thread(new Runnable() { // 匿名的线程类, 没有名字的
@Override
public void run() {
try {
long walkingTime = (long) (Math.random() * 1000);
Thread.sleep(walkingTime);
lockExample.tryToBuyMilk();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
students[i].start();
}
for (Thread student : students) {
student.join();
}
}
private void tryToBuyMilk() throws InterruptedException {
boolean flag = true;
while (flag) {
if (queueLock111.tryLock()) { // 查一下现在是否锁住, 锁住了在下面的flag地方等一下再来操作.
// tryLock()实际包含了两个操作, 先 try 再 lock; 如果没有锁再锁住.
long thinkingTime = (long) (Math.random() * 500);
Thread.sleep(thinkingTime);
System.out.println(LocalDateTime.now() + " => " + Thread.currentThread().getName() + " : 来一杯珍珠奶茶, 不要珍珠");
flag = false;
queueLock111.unlock();
} else {
System.out.println(LocalDateTime.now() + " => " + Thread.currentThread().getName() + " : 再等等");
}
if (flag) {
Thread.sleep(1000);
}
}
}
/**
* 处理订单
*/
static void handleOrder() {
LockExample lockExample = new LockExample();
Thread boss = new Thread(() -> {
while (true) {
try {
lockExample.addOrder(); // 老板加新单子
long waitingTime = (long) (Math.random() * 1000);
Thread.sleep(waitingTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
boss.start();
int workerCount = 3;
Thread[] workers = new Thread[workerCount];
for (int i = 0; i < workerCount; i++) {
workers[i] = new Thread(new Runnable() {
@Override
public void run() {
while (true) {
try {
lockExample.viewOrder(); // 员工取出单子
long workingTime = (long) (Math.random() * 5000);
Thread.sleep(workingTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
});
workers[i].start();
}
}
/**
* 向订单本录入新订单
*/
private void addOrder() throws InterruptedException {
orderLock111.writeLock().lock(); // writeLock 写锁, 排他的, 只能一个线程拥有
long writingTime = (long) (Math.random() * 1000);
Thread.sleep(writingTime);
System.out.println(LocalDateTime.now() + " => " + "老板新加一个订单");
orderLock111.writeLock().unlock();
}
/**
* 查看订单本
*/
private void viewOrder() throws InterruptedException {
orderLock111.readLock().lock(); // readLock 读锁, 可以多个线程共享(同时访问)
long readingTime = (long) (Math.random() * 500);
Thread.sleep(readingTime);
System.out.println(LocalDateTime.now() + " => " + Thread.currentThread().getName() + " : 查看了订单本");
orderLock111.readLock().unlock();
}
}
Semaphore 信号量
package concurrentDemo0421;
import java.time.LocalDateTime;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
/**
* 控制同时访问代码块的线程数
* 现在有一个地下车库, 共有5个车位, 有10辆车需要停放, 每次停放时, 去申请信号量
*/
public class SemaphoreExample {
private final Semaphore placeSemaphore = new Semaphore(5);
public static void main(String[] args) throws InterruptedException {
SemaphoreExample example = new SemaphoreExample();
int tryToParkCount = 10;
Thread[] parkers = new Thread[tryToParkCount];
for (int i = 0; i < parkers.length; i++) {
parkers[i] = new Thread(new Runnable() {
@Override
public void run() {
try {
long randomTime = (long) (Math.random() * 1000);
Thread.sleep(randomTime); // 过一段时间来停车
if (example.parking()) {
long parkingTime = (long) (Math.random() * 1200);
Thread.sleep(parkingTime); // 停一段时间离开
example.leaving();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
parkers[i].start();
}
for (Thread t : parkers) {
t.join();
}
TimeUnit.SECONDS.sleep(60);
}
private boolean parking() {
if (placeSemaphore.tryAcquire()) { // 查看是否有剩余的信号量(剩余的车位)
System.out.println(LocalDateTime.now() + " => " + Thread.currentThread().getName() + " : 停车成功!");
return true;
} else {
System.out.println(LocalDateTime.now() + " => " + Thread.currentThread().getName() + " : 没有空位");
return false;
}
}
private void leaving() {
placeSemaphore.release();
System.out.println(LocalDateTime.now() + " => " + Thread.currentThread().getName() + " : 开走了");
}
}
Latch 等待锁
package concurrentDemo0421;
import java.time.LocalDateTime;
import java.util.concurrent.CountDownLatch;
/**
* 设想百米赛跑, 发令枪发出信号后选手开始跑, 全部选手跑到终点后比赛结束.
*/
public class CountDownLatchExample {
public static void main(String[] args) throws InterruptedException {
CountDownLatch startSignal = new CountDownLatch(1);
CountDownLatch doneSignal = new CountDownLatch(10);
int runnerCount = 10; // 选手数量
for (int i = 0; i < runnerCount; i++) { // create and start threads
new Thread(new Runner(startSignal, doneSignal)).start(); // 所有选手开始跑~
}
System.out.println(LocalDateTime.now() + " => " + "准备就绪..");
startSignal.countDown(); // let all threads proceed
System.out.println(LocalDateTime.now() + " => " + "比赛开始!");
doneSignal.await(); // wait for all threads to finish
System.out.println(LocalDateTime.now() + " => " + "比赛结束!");
}
}
class Runner implements Runnable {
private final CountDownLatch startSignal;
private final CountDownLatch doneSignal;
Runner(CountDownLatch startSignal, CountDownLatch doneSignal) {
this.startSignal = startSignal;
this.doneSignal = doneSignal;
}
@Override
public void run() {
try {
startSignal.await();
doWork();
doneSignal.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private void doWork() throws InterruptedException {
long time = (long) (Math.random() * 10 * 1000);
Thread.sleep(time); // 随机在十秒内跑完
System.out.printf(LocalDateTime.now() + " => " + Thread.currentThread().getName() + " : 跑完全程, 用时 %d 秒 \n", time/1000);
}
}
Barrier/?b?ri?r/ n.障碍物
package concurrentDemo0421;
import java.time.LocalDateTime;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
/**
* Barrier/?b?ri?r/ n.障碍物
* 假定有三行数字, 用三个线程分别计算每一行的和, 最后计算综合
*/
public class BarrierExample {
public static void main(String[] args) {
int rowCount = 3;
int colCount = 5;
final int[][] numbers = new int[rowCount][colCount];
final int[] results = new int[rowCount];
numbers[0] = new int[]{1, 2, 3, 4, 5};
numbers[1] = new int[]{6, 7, 8, 9, 10};
numbers[2] = new int[]{11, 12, 13, 14, 15};
CalcFinalSum111 finalResult = new CalcFinalSum111(results);
CyclicBarrier cyclicBarrier = new CyclicBarrier(rowCount, finalResult);
// 当有3个线程在 barrier上await时, 就执行最终计算
for (int i = 0; i < rowCount; i++) {
CalcRowSum111 eachRow = new CalcRowSum111(numbers, i, results, cyclicBarrier);
new Thread(eachRow).start();
}
}
}
class CalcRowSum111 implements Runnable {
final int[][] numbers;
final int rowNumber;
final int[] result;
final CyclicBarrier barrier;
CalcRowSum111(int[][] numbers, int rowNumber, int[] result, CyclicBarrier barrier) {
this.numbers = numbers;
this.rowNumber = rowNumber;
this.result = result;
this.barrier = barrier;
}
@Override
public void run() {
int[] row = numbers[rowNumber];
int sum = 0;
for (int data : row) {
sum += data;
result[rowNumber] = sum;
}
try {
System.out.println(LocalDateTime.now() + " => " + Thread.currentThread().getName() + " : 计算第" + (rowNumber + 1) + "行结束, 结果为: " + sum);
barrier.await(); // 等待! 只要超过(Barrier的构造参数填入的数量)的个数, 就放行
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
}
class CalcFinalSum111 implements Runnable {
final int[] eachRowResult;
int finalResult;
CalcFinalSum111(int[] eachRowResult) {
this.eachRowResult = eachRowResult;
}
@Override
public void run() {
int sum = 0;
for (int data : eachRowResult) {
sum += data;
}
finalResult = sum;
System.out.println(LocalDateTime.now() + " => " + "最终结果为: " + finalResult);
}
}
Phaser 阶段性控制多个线程
package concurrentDemo0421;
import java.time.LocalDateTime;
import java.util.concurrent.Phaser;
/**
* 假设举行考试, 总共三道大题, 每次下发一道题目, 等所有学生都完成之后再进行下一道题
*/
public class PhaserExample {
public static void main(String[] args) {
int studentCount = 5;
Phaser phaser = new Phaser(studentCount);
for (int i = 0; i < studentCount; i++) {
new Thread(null, new Student111(phaser), "学生" + i).start();
}
}
}
class Student111 implements Runnable {
private final Phaser phaser;
Student111(Phaser phaser) {
this.phaser = phaser;
}
@Override
public void run() {
try {
doTesting(1);
phaser.arriveAndAwaitAdvance(); // 等到所有线程都到达了, 才放行
doTesting(2);
phaser.arriveAndAwaitAdvance();
doTesting(3);
phaser.arriveAndAwaitAdvance();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private void doTesting(int i) throws InterruptedException {
String name = Thread.currentThread().getName();
System.out.println(LocalDateTime.now() + " => " + Thread.currentThread().getName() + "开始答第" + i + "题");
long thinkingTime = (long) (Math.random() * 1000);
Thread.sleep(thinkingTime); // 模拟学生答题时间
System.out.println(LocalDateTime.now() + " => " + Thread.currentThread().getName() + "第" + i + "道题答题结束");
}
}
Exchanger 两个线程间交换数据
package concurrentDemo0421;
import java.time.LocalDateTime;
import java.util.Scanner;
import java.util.concurrent.Exchanger;
/**
* 通过Exchanger实现学生成绩查询, 两个线程间简单的数据交换,
* 把自己线程的内容输出给另一个线程(只能简单的双向传送, 不能向MPI一样随意点对点的传输, 线程1给线程3 线程3向线程2...这样)
*/
public class ExchangerExample {
public static void main(String[] args) throws InterruptedException {
Exchanger<String> exchanger = new Exchanger<>();
BackgroundWorker111 backgroundWorker111 = new BackgroundWorker111(exchanger);
new Thread(backgroundWorker111).start();
Scanner scanner = new Scanner(System.in);
while (true) {
System.out.println(LocalDateTime.now() + " => " + "请输入要查询的学生名字:");
String input = scanner.nextLine().trim();
exchanger.exchange(input);
String exResult = exchanger.exchange(null); // 拿到线程反馈的结果
// 当两个线程都同时执行到同一个exchanger.exchange()方法, 两个线程就互相交换数据, 交换是双向的.
if ("exit".equals(exResult)) {
System.out.println(LocalDateTime.now() + " => " + "退出查询~");
break;
}
System.out.println(LocalDateTime.now() + " => " + "查询结果: " + exResult);
}
}
}
class BackgroundWorker111 implements Runnable {
final Exchanger<String> exchanger;
BackgroundWorker111(Exchanger<String> exchanger) {
this.exchanger = exchanger;
}
@Override
public void run() {
while (true) {
try {
String item = exchanger.exchange(null);
switch (item) {
case "zhangsan":
exchanger.exchange("90");
break;
case "lisi":
exchanger.exchange("80");
break;
case "wangwu":
exchanger.exchange("70");
break;
case "exit":
exchanger.exchange("exit");
return; // 退出run, 即结束当前线程
default:
exchanger.exchange("no body!");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
总结
标签:控制 并发 这一 结果 可重入锁 执行 取出 自己 reading
原文地址:https://www.cnblogs.com/sweetXiaoma/p/12749714.html