码迷,mamicode.com
首页 > 其他好文 > 详细

JUC并发编程

时间:2021-04-20 14:17:55      阅读:0      评论:0      收藏:0      [点我收藏+]

标签:dfa   返回   reduce   饿汉式   分割   locking   rdo   模拟   lang   

JUC并发编程

java.util.ConcurrentModificationException 并发修改异常

多线程基础

普通线程代码Thread

Runnable没有返回值,效率相比于Callable慢

线程和进程

进程:一个程序,程序的集合

一个进程往往可以包含多个线程至少包含一个

java默认两个线程(main、GC垃圾回收)

java开不了线程,是用native本地方法调用底层C++开启线程。

并发和并行

并发:CPU一核模拟多条线程。

并行:CPU多核,多个线程同时执行

Runtime,getRuntime().availableProcessors()//获得CPU核数

线程状态

NEW 新生

RUNNABLE 运行

BLOCKED 阻塞

WAITING 等待

TIME_WAITING 超时等待

TERMINATED 终止

wait和sleep区别

wait => Object,sleep => Thread

wait会释放锁,sleep会抱着锁睡觉

wait必须在同步代码块,sleep可以在任何地方睡

wait不需要捕获异常,sleep要捕获异常

Lock锁

ReentrantLock 可重入锁(常用)

ReentrantReadWriteLock.ReadLock 读锁

ReentrantReadWriteLock.WriteLock 写锁

默认生成NonfairSync(非公平锁),可以传入boolean变量生成FairSync(公平锁)

公平锁:十分公平:可以先来后到

非公平锁:十分不公平:可以插队(默认)

Lock lock = new ReentrantLock();
......
    lock.lock();//加锁
......
    lock.unlock();//解锁

Synchronized和Lock区别

1.Synchronized是内置的java关键字,Lock是一个java类

2.Synchronized无法判断当前锁的状态,Lock可以判断是否获取到了锁

3.Synchronized会自动释放锁,Lock必须要手动释放锁!如果不释放锁,死锁

4.Synchronized线程1(获得锁,阻塞)、线程2(等待,傻傻的等);Lock锁就不一定会等待下去

5.Synchronized可重入锁,不可以中断的,非公平;Lock,可重入锁,可以判断锁,非公平(可以自己设置)

6.Synchronized适合锁少量的代码同步问题;Lock适合锁大量同步代码

虚假唤醒

代码举例

public class UnSafeThread {
	public static void main(String[] args) {
		UnSafeThread unSafeThread = new UnSafeThread();
		unSafeThread.work();
	}
	public void work() {
		Data data = new Data();
		new Thread(() -> {
			for (int i = 0; i < 10; i++)
				try {
					data.increment();
				} catch (InterruptedException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
		} , "A").start();
		new Thread(() -> {
			for (int i = 0; i < 10; i++)
				try {
					data.decrement();
				} catch (InterruptedException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
		} , "B").start();
		new Thread(() -> {
			for (int i = 0; i < 10; i++)
				try {
					data.increment();
				} catch (InterruptedException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
		} , "C").start();
		new Thread(() -> {
			for (int i = 0; i < 10; i++)
				try {
					data.decrement();
				} catch (InterruptedException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
		} , "D").start();
	}
}

class Data {
	private int number = 0;

	public synchronized void increment() throws InterruptedException {
		if (number != 0) {//把if修改为while
			this.wait();
		}
		number++;
		System.out.println(Thread.currentThread().getName() + "=>" + number);
		this.notifyAll();
	}

	public synchronized void decrement() throws InterruptedException {
		if (number == 0) {//把if修改为while
			this.wait();
		}
		number--;
		System.out.println(Thread.currentThread().getName() + "=>" + number);
		this.notifyAll();
	}
}

虚假唤醒指的是用if只判断了一次,而后判断值被修改了此时维持原始判断,造成虚假唤醒。用while就可以解决这个问题。

生产者消费者问题

传统方法和Lock方法

Synchronized -> Lock

wait -> await

notify -> signal

//上面的虚假唤醒,把Data类改成这样就是Lock方法
class DataLock {
	private int number = 0;
	Lock lock = new ReentrantLock();
	Condition condition = lock.newCondition();

	public void increment() {
		lock.lock();
		try {
			while (number != 0) {
				condition.await();
			}
			number++;
			System.out.println(Thread.currentThread().getName() + "=>" + number);
			condition.signalAll();
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			lock.unlock();
		}
	}

	public void decrement() {
		lock.lock();
		try {
			while (number == 0) {
				condition.await();
			}
			number--;
			System.out.println(Thread.currentThread().getName() + "=>" + number);
			condition.signalAll();
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			lock.unlock();
		}
	}
}

Condition精确唤醒

public void workCondition() {// Lock方法,用到Condition
	DataCondition data = new DataCondition();
	new Thread(() -> {
		for (int i = 0; i < 10; i++)
				data.printA();
	} , "A").start();
	new Thread(() -> {
		for (int i = 0; i < 10; i++)
			data.printB();
	} , "B").start();
	new Thread(() -> {
		for (int i = 0; i < 10; i++)
			data.printC();
	} , "C").start();
}
class DataCondition {// Lock方法,用到Condition精确唤醒
	private int number = 0;
	Lock lock = new ReentrantLock();
	Condition condition1 = lock.newCondition();
	Condition condition2 = lock.newCondition();
	Condition condition3 = lock.newCondition();

	public void printA() {
		lock.lock();
		try {
			while (number != 0) {
				condition1.await();
			}
			number = 1;
			System.out.println(Thread.currentThread().getName() + "=>AAA");
			condition2.signal();
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			lock.unlock();
		}
	}

	public void printB() {
		lock.lock();
		try {
			while (number != 1) {
				condition2.await();
			}
			number = 2;
			System.out.println(Thread.currentThread().getName() + "=>BBB");
			condition3.signal();
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			lock.unlock();
		}
	}

	public void printC() {
		lock.lock();
		try {
			while (number != 2) {
				condition3.await();
			}
			number = 0;
			System.out.println(Thread.currentThread().getName() + "=>CCC");
			condition1.signal();
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			lock.unlock();
		}
	}
}

Callable(简单)

Callable无法直接使用Thread启动线程,需要借助FutureTask适配类

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

public class CallableTest {
	public static void main(String arge[]) throws InterruptedException, ExecutionException {
		MyCallable test = new MyCallable();
		FutureTask futureTask = new FutureTask(test);
		FutureTask futureTask1 = new FutureTask(test);//结果会被缓存,提高效率
		new Thread(futureTask, "A").start();
		Integer end = (Integer) futureTask.get();//因为这句抛出异常,可能产生阻塞(call方法里运行非常缓慢)
		System.out.println(end);
	}
}

class MyCallable implements Callable<Integer> {
	@Override
	public Integer call() throws Exception {
		System.out.println("Hello");
		return 0;
	}
}

常用辅助类(重点)

CountDownLatch

允许一个或多个线程等待直到在其他线程中执行的一组操作完成的同步辅助。

//计数器
public class CountDownLatchTest {
	public static void main(String arge[]) throws InterruptedException {
		CountDownLatch countDownLatch = new CountDownLatch(6);//总数是6
		for (int i = 0; i < 6; i++) {
			new Thread(() -> {
				System.out.println(Thread.currentThread().getName()+" go out");
				countDownLatch.countDown();//减一
			} , String.valueOf(i)).start();
		}
		countDownLatch.await();//等待计数器归零,再向下执行
		System.out.println("close door");
	}
}

作为一个减法计数器

countDownLatch.countDown();//减一操作

countDownLatch.await();//等待计数器归零,再向下执行

每次有线程调用countDown();数量-1,调用时不会阻塞,当计数器变为0,countDownLatch.await()被唤醒,继续执行。

CylicBarrier

允许一组线程全部等待彼此达到共同障碍点的同步辅助。

public class CylicBarrierTest {
	public static void main(String args[]) {
		CyclicBarrier cyclicBarrier =new CyclicBarrier(7, ()->{
			System.out.println("召唤神龙!");
		});//集齐七颗龙珠召唤神龙
		for(int i=1;i<=7;i++){
			int temp =i;//有final标志位,jdk1.8自动补充
			new Thread(()->{
				System.out.println("收集"+temp+"星龙珠!");
				try {
					cyclicBarrier.await();//等待
				} catch (Exception e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
			}).start();
		}
	}
}

作为一个加法计数器

cyclicBarrier.await();//等待,就是加一过程

CyclicBarrier(7, ()->{System.out.println("召唤神龙!");});//当等待进程达到7时,执行后续Runnable接口

Semaphore

Semaphore:信号量

一个计数信号量,信号量维持一组许可证。如果有必要,每个acquire()都会阻塞,直到许可证可用。

public class SemaphoreTest {
	public static void main(String args[]) {
		Semaphore semaphore = new Semaphore(3);//资源数量(这里是车位)
		for(int i=0;i<6;i++){
			new Thread(()->{
				try {
					semaphore.acquire();//得到资源
					System.out.println(Thread.currentThread().getName()+"抢到车位");
					TimeUnit.SECONDS.sleep(1);//停车时间
					System.out.println(Thread.currentThread().getName()+"离开车位");
				} catch (Exception e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}finally {
					semaphore.release();//释放资源
				}
			},String.valueOf(i)).start();
		}
	}
}

semaphore.acquire();//得到资源,如果资源已经满了,等待

semaphore.release();//释放资源,唤醒等待资源的线程

作用:

1.多个共享资源互斥使用!

2.并发限流,控制最大线程数!

读写锁

ReadWriteLock

读可以被多个线程同时读

写的时候只能被一个线程写

public class ReadWriteLockTest {
	public static void main(String args[]) {
		MyCache myCache = new MyCache();
		// 写入
		for (int i = 0; i < 5; i++) {
			int temp = i;
			new Thread(() -> {
				myCache.put(temp + "", temp + "");
			} , String.valueOf(i)).start();
		}
		// 读取
		for (int i = 0; i < 5; i++) {
			int temp = i;
			new Thread(() -> {
				myCache.get(temp + "");
			} , String.valueOf(i)).start();
		}
	}
}

// 自定义缓存
class MyCache {
	private volatile Map<String, Object> map = new HashMap<>();
	private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();

	public void put(String key, Object value) {
		readWriteLock.writeLock().lock();
		try {
			System.out.println(Thread.currentThread().getName() + "写入" + key);
			map.put(key, value);
			System.out.println(Thread.currentThread().getName() + "写入完成");
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			readWriteLock.writeLock().unlock();
		}
	}

	public void get(String key) {
		readWriteLock.readLock().lock();
		try {
			System.out.println(Thread.currentThread().getName() + "读取" + key);
			map.get(key);
			System.out.println(Thread.currentThread().getName() + "读取完成");
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			readWriteLock.readLock().unlock();
		}
	}
}

读锁必须加,因为存在读-写不能共存。

独占锁(写锁):一次只能被一个线程占有。

共享锁(读锁):多个线程可以同时占有。

阻塞队列BlockingQueue

不得不阻塞

写入:如果队列满了,必须阻塞等待

取:如果队列是空的,必须阻塞等待生产

使用队列

添加、删除

四组API

方式 抛出异常 有返回值(不抛出异常) 阻塞等待 超时等待
添加 add offer put offer
移除 remove poll take poll
判断队列首部 element peek
public static void main(String arge[]) {
		test1();//1234
	}
	// 抛出异常
	public static void test1() {
		ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
		System.out.println(blockingQueue.add("a"));
		System.out.println(blockingQueue.add("b"));
		System.out.println(blockingQueue.add("c"));
		// 抛出异常IllegalStateException:Queue full
		// System.out.println(blockingQueue.add("d"));
		System.out.println(blockingQueue.element());//输出队首
		System.out.println("=============");
		System.out.println(blockingQueue.remove());
		System.out.println(blockingQueue.remove());
		System.out.println(blockingQueue.remove());
		// 抛出异常java.util.NoSuchElementException
		// System.out.println(blockingQueue.remove());
	}

	public static void test2() {
		ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
		System.out.println(blockingQueue.offer("a"));
		System.out.println(blockingQueue.offer("b"));
		System.out.println(blockingQueue.offer("c"));
		System.out.println(blockingQueue.offer("d"));// false,不抛出异常
		System.out.println("=============");
		System.out.println(blockingQueue.peek());//输出队首
		System.out.println(blockingQueue.poll());
		System.out.println(blockingQueue.poll());
		System.out.println(blockingQueue.poll());
		System.out.println(blockingQueue.poll());// NULL,不抛出异常
	}
	//等待阻塞,一直阻塞
	public static void test3() throws InterruptedException {
		ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
		blockingQueue.put("a");
		blockingQueue.put("b");
		blockingQueue.put("c");
		//队满,写不进,一直阻塞
		//blockingQueue.put("d");
		System.out.println(blockingQueue.take());
		System.out.println(blockingQueue.take());
		System.out.println(blockingQueue.take());
		//队空,取不出,一直阻塞
		System.out.println(blockingQueue.take());
	}
	//等待阻塞,超时退出
	public static void test4() throws InterruptedException {
		ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
		blockingQueue.offer("a");
		blockingQueue.offer("b");
		blockingQueue.offer("c");
		//等待超过两秒就退出
		//blockingQueue.offer("d",2,TimeUnit.SECONDS);
		System.out.println("=============");
		blockingQueue.poll();
		blockingQueue.poll();
		blockingQueue.poll();
		//等待超过两秒就退出
		//blockingQueue.poll(2,TimeUnit.SECONDS);
	}

同步队列SynchronousQueue

没有容量,进去一个元素,必须等待取出来之后,才能再往里面放一个元素

用put、take操作

public static void main(String args[]) {
		SynchronousQueue<String> blockingQueue = new SynchronousQueue<>();
		new Thread(() -> {
			try {
				System.out.println(Thread.currentThread().getName() + " put 1");
				blockingQueue.put("1");
				System.out.println(Thread.currentThread().getName() + " put 2");
				blockingQueue.put("2");
				System.out.println(Thread.currentThread().getName() + " put 3");
				blockingQueue.put("3");
			} catch (Exception e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
		} , "T1").start();
		new Thread(() -> {
			try {
				TimeUnit.SECONDS.sleep(1);
				System.out.println(Thread.currentThread().getName() + " take " + blockingQueue.take());
				TimeUnit.SECONDS.sleep(1);
				System.out.println(Thread.currentThread().getName() + " take " + blockingQueue.take());
				TimeUnit.SECONDS.sleep(1);
				System.out.println(Thread.currentThread().getName() + " take " + blockingQueue.take());
			} catch (Exception e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
		} , "T2").start();
	}

线程池(重点)

池化技术

程序的运行,本质:占用系统资源!优化资源的使用=>池化技术

线程池、连接池、内存池、对象池……

池化技术:事先准备好一些资源,有人要用,就从我这里拿,用完还我。

线程池的好处:

1.降低资源的消耗

2.提高响应的速度

3.方便管理

线程复用、可以控制最大并发数、管理线程


线程池:三大方法

【强制】线程池不允许使用Executors去创建,而是通过ThreadPoolExecutor的方式,这样的处理方式让写的同学更加明确线程池的运行规则,避免资源耗尽的风险。

说明:Executors返回线程池的弊端如下:

1)FixedThreadPool和SingleThreadPool:

允许的请求队列长度为Integer.MAX_VALUE,约21亿,可能会堆积大量的请求,从而导致OOM。

2)CachedThreadPool和ScheduleThreadPool:

允许的创建线程数量为Integer.MAX_VALUE,可能会堆积大量的请求,从而导致OOM。

public static void main(String arge[]) {
		//ExecutorService threadPool = Executors.newSingleThreadExecutor();// 单个线程
		//ExecutorService threadPool = Executors.newFixedThreadPool(3);// 创建一个固定大小的线程池
		ExecutorService threadPool = Executors.newCachedThreadPool();// 可伸缩的线程池
		try {
			for (int i = 0; i < 10; i++) {
				threadPool.execute(() -> {
					System.out.println(Thread.currentThread().getName() + " ok");
				});
			}
		} catch (Exception e) {
            e.printStackTrace();
			// TODO: handle exception
		} finally {
			threadPool.shutdown();
		}
	}

七大参数

源码分析

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ?
            null :
            AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

int corePoolSize,//核心线程池大小
int maximumPoolSize,//最大核心线程池大小
long keepAliveTime,//超时了没人用就会释放
TimeUnit unit,//超时单位
BlockingQueue workQueue,//阻塞队列
ThreadFactory threadFactory,//线程工厂,创建线程的,一般不用动
RejectedExecutionHandler handler//拒绝策略

up主的神举例

银行办理业务

假设有5个窗口,3个请假。现在正常工作窗口都有人办理业务,进来第三个人去候客区等待,当候客区满了,请假那三个被电话叫回来工作。当五个窗口有人,候客区又满了,再进来的人就会受到拒绝策略。当人数减少只有两个人在办理业务,长时间没人来请假那三个接着去休假。

(核心线程池为正常工作的两个窗口)

(最大核心线程池为五个窗口)

(超时释放为长时间没人来的长时间)

(超时单位,没啥意思就是个单位)

(阻塞队列为候客区)

(线程工厂,硬要解释就是建那五个窗口并且招职员的)

(拒绝策略为拒绝策略,挤不下了你出去逛会街)

四种拒绝策略

AbortPolicy()//不处理,抛出异常

CallerRunPolicy()//哪来的去哪里,哪个线程要申请线程,让那个线程自己执行。

DiscardPolicy()//队列满了,丢掉任务,不抛出异常

DiscardOldestPolicy()//队列满了,尝试和最早的线程竞争(争不到依旧被丢掉),不抛出异常

public static void main() {
	ExecutorService threadPool =new ThreadPoolExecutor(
            2,
            5,
            2,
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(3),
            Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.AbortPolicy());
	try {
		for(int i=0;i<6;i++){//5,核心和队列刚好满,8,最大核心和队列刚好满
			threadPool.execute(()->{
				System.out.println(Thread.currentThread().getName()+" ok");
			});
		}
	} catch (Exception e) {
		e.printStackTrace();
		// TODO: handle exception
	}finally {
		threadPool.shutdown();
	}
}

CPU密集型、IO密集型

最大线程数定义

1.CPU密集型,几核心,就是几,可以保持CPU效率最高。

获取CPU核数:Runtime.getRuntime().availableProcessors()

2.IO密集型,判断程序中十分耗IO线程,一般是两倍。

新java程序员(必)

lambda表达式,链式编程、函数式接口。Stream流式计算(现代java开发需要会的)

函数式接口:只有一个方法的接口

四大函数式接口

Function

函数型接口,有一个输入参数,有一个输出

代码举例

		// Function函数式接口
		Function<String, String> function = new Function<String, String>() {
			@Override
			public String apply(String str) {
				return str;
			}
		};
		// lambda表达式写法
		Function<String, String> function1 = (str) -> {
			return str;
		};
		System.out.println(function.apply("Hello,Function!"));
		System.out.println(function1.apply("Hello,Function!"));

Predicate

断定型接口,有一个输入参数,返回值只能是布尔值

代码举例

		// Predicate函数式接口,断定型接口
		Predicate<String> predicate = new Predicate<String>() {
			@Override
			public boolean test(String str) {
				return str.isEmpty();
			}
		};
		// lambda表达式写法
		Predicate<String> predicate1 = (str) -> {
			return (str).isEmpty();
		};
		System.out.println(predicate.test("Hello"));
		System.out.println(predicate1.test("Hello"));

Consumer

消费型接口,只有输入,没有返回值

代码举例

		// Consumer函数型接口,消费型接口
		Consumer<String> consumer = new Consumer<String>() {
			@Override
			public void accept(String str) {
				System.out.println(str);
			}
		};
		// lambda表达式写法
		Consumer<String> consumer1 = (str) -> {
			System.out.println(str);
		};
		consumer.accept("Hello,Consumer!");
		consumer1.accept("Hello,Consumer!");

Supplier

供给型接口,没有参数,只有返回值

代码举例

		//Supplier函数型接口,供给型接口
		Supplier<String> supplier =new Supplier<String>() {
			@Override
			public String get() {
				return "Hello,Supplier!";
			}
		};
		// lambda表达式写法
		Supplier<String> supplier1 =()->{
			return "Hello,Supplier!";
		};
		System.out.println(supplier.get());
		System.out.println(supplier1.get());

Stream流式计算

什么是Stream流式计算

大数据:存储+计算

集合、MySQL本质就是存储东西的

计算都应该交给流来操作!

import java.util.Arrays;
import java.util.List;

public class StreamTest {
	/*题目要求 一行代码实现
	 *  1,ID必须是偶数 
	 *  2,年龄必须大于23岁 
	 *  3,用户名转大写
	 *  4,用户名字母倒着排序 
	 *  5,只输出一行数据
	 */
	public static void main(String[] args) {
		User u1 = new User(1, "a", 21);
		User u2 = new User(2, "b", 22);
		User u3 = new User(3, "c", 23);
		User u4 = new User(4, "d", 24);
		User u5 = new User(5, "e", 25);
		User u6 = new User(6, "f", 26);
		List<User> list = Arrays.asList(u1, u2, u3, u4, u5, u6);
        
		list.stream().filter(u -> {
			return u.id % 2 == 0;
		}).filter(u -> {
			return u.age > 23;
		}).map(u -> {
			return u.name.toUpperCase();
		}).sorted((uu1, uu2) -> {
			return uu2.compareTo(uu1);
		}).limit(1).forEach(System.out::println);
	}
}

class User {
	int id;
	String name;
	int age;

	public User(int id, String name, int age) {
		this.id = id;
		this.name = name;
		this.age = age;
	}
}

ForkJoin

分支合并

ForkJoin在JDK1.7,执行并行任务!提高效率。用于大数据量!

大数据:Map Reduce(把大任务拆成小任务)

总任务->(任务1、任务2)

->(任务1.1、任务1.2、任务2.1、任务2.2)

->(结果1.1、结果1.2、结果2.1、结果2.2)

->(总结果)

ForkJoin特点:工作窃取

假设任务1执行了一半,此时任务2已经执行结束

任务1:AAAAAAAA-----------------

任务2:BBBBBBBBBBBBBBBB

那么任务2将开始执行任务1未完成的部分

任务1:AAAAAAAA---------------B

任务2:BBBBBBBBBBBBBBBB

因为任务内容是双端队列

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
import java.util.stream.LongStream;

public class ForkJoinTest {
	public static void main(String[] args) throws InterruptedException, ExecutionException {
		test1();
		test2();
		test3();
	}
	//普通写法,等着被炒
	public static void test1() {
		long start = System.currentTimeMillis();
		long sum = 0L;
		for (Long i = 1L; i <= 10_0000_0000L; i++) {
			sum += i;
		}
		long end = System.currentTimeMillis();
		System.out.println("sum:" + sum + "运行时间:" + (end - start));
	}
	//ForkJoin写法,普遍常用可调优
	public static void test2() throws InterruptedException, ExecutionException {
		long start = System.currentTimeMillis();
		ForkJoinPool forkJoinPool = new ForkJoinPool();
		ForkJoinTask<Long> task = new ForkJoin(0L, 10_0000_0000L);
		ForkJoinTask<Long> submit = forkJoinPool.submit(task);// 提交任务
		long sum = submit.get();
		long end = System.currentTimeMillis();
		System.out.println("sum:" + sum + "运行时间:" + (end - start));
	}
	//Stream写法,最牛B的
	public static void test3() {
		long start = System.currentTimeMillis();
		long sum = LongStream.rangeClosed(0L, 10_0000_0000L).parallel().reduce(0, Long::sum);
		long end = System.currentTimeMillis();
		System.out.println("sum:" + sum + "运行时间:" + (end - start));
	}
}

class ForkJoin extends RecursiveTask<Long> {
	private Long start;
	private Long end;
	private Long temp = 10_0000L;

	public ForkJoin(Long start, Long end) {
		this.start = start;
		this.end = end;
	}

	@Override
	protected Long compute() {
		if ((end - start) < temp) {
			Long sum = 0L;
			for (Long i = start; i <= end; i++) {
				sum += i;
			}
			return sum;
		} else {
			long middle = (end + start) / 2;
			ForkJoin task1 = new ForkJoin(start, middle);
			task1.fork();// 把task1压入队列
			ForkJoin task2 = new ForkJoin(middle + 1, end);
			task2.fork();// 把task2压入队列
			return task1.join() + task2.join();
		}
	}
}

异步回调

Future设计初衷:对将来的某个事件的结果进行建模

// 没有返回值的runAsync异步回调
CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {
	try {
		TimeUnit.SECONDS.sleep(2);
	} catch (Exception e) {
		// TODO Auto-generated catch block
		e.printStackTrace();
	}
	System.out.println(Thread.currentThread().getName() + "runAsync->void");
});
System.out.println("111");
try {
	System.out.println("222");
	completableFuture.get();// 获得阻塞执行结果
	System.out.println("333");
} catch (InterruptedException e) {
	// TODO Auto-generated catch block
	e.printStackTrace();
} catch (ExecutionException e) {
	// TODO Auto-generated catch block
	e.printStackTrace();
}
// 有返回值的
CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> {
	System.out.println(Thread.currentThread().getName() + "supplyAsync->Integer");
	int i=1/0;//错误
	return 0;
});
completableFuture1.whenComplete((t, u) -> {
	System.out.println("t->" + t);
	System.out.println("u->" + u);
}).exceptionally((e) -> {
	System.out.println(e.getMessage());
	return 1;
});

JMM

什么是JMM

JMM:java内存模型,不存在的东西,概念!约定!

关于JMM的一些同步的约定

1,线程解锁前,必须把共享变量立刻刷回主存。

2,线程加锁前,必须读取主存中的最新值到工作内存中!

3,加锁和解锁是同一把锁

线程 工作内存 主内存

技术图片

8种操作

lock(锁定):作用于主内存,它把一个变量标记为一条线程独占状态;
read(读取):作用于主内存,它把变量值从主内存传送到线程的工作内存中,以便随后的load动作使用;
load(载入):作用于工作内存,它把read操作的值放入工作内存中的变量副本中;
use(使用):作用于工作内存,它把工作内存中的值传递给执行引擎,每当虚拟机遇到一个需要使用这个变量的指令时候,将会执行这个动作;
assign(赋值):作用于工作内存,它把从执行引擎获取的值赋值给工作内存中的变量,每当虚拟机遇到一个给变量赋值的指令时候,执行该操作;
store(存储):作用于工作内存,它把工作内存中的一个变量传送给主内存中,以备随后的write操作使用;
write(写入):作用于主内存,它把store传送值放到主内存中的变量中。
unlock(解锁):作用于主内存,它将一个处于锁定状态的变量释放出来,释放后的变量才能够被其他线程锁定;

Java内存模型还规定了执行上述8种基本操作时必须满足如下规则:

1、不允许read和load、store和write操作之一单独出现(即不允许一个变量从主存读取了但是工作内存不接受,或者从工作内存发起会写了但是主存不接受的情况),以上两个操作必须按顺序执行,但没有保证必须连续执行,也就是说,read与load之间、store与write之间是可插入其他指令的。
2、不允许一个线程丢弃它的最近的assign操作,即变量在工作内存中改变了之后必须把该变化同步回主内存。
3、不允许一个线程无原因地(没有发生过任何assign操作)把数据从线程的工作内存同步回主内存中。
4、一个新的变量只能从主内存中“诞生”,不允许在工作内存中直接使用一个未被初始化(load或assign)的变量,换句话说就是对一个变量实施use和store操作之前,必须先执行过了assign和load操作。
5、一个变量在同一个时刻只允许一条线程对其执行lock操作,但lock操作可以被同一个条线程重复执行多次,多次执行lock后,只有执行相同次数的unlock操作,变量才会被解锁。
6、如果对一个变量执行lock操作,将会清空工作内存中此变量的值,在执行引擎使用这个变量前,需要重新执行load或assign操作初始化变量的值。
7、如果一个变量实现没有被lock操作锁定,则不允许对它执行unlock操作,也不允许去unlock一个被其他线程锁定的变量。
8、对一个变量执行unlock操作之前,必须先把此变量同步回主内存(执行store和write操作)。

Volatile

Volatile的理解

volatile是java虚拟机提供的轻量级的同步机制

1,保证可见性

2,不保证原子性

3,禁止指令重排

代码举例:保证可见性

//加volatile保证可见性
//不加volatile就会让线程1在死循环
private volatile static int num = 0;
public static void main(String[] args) {
	new Thread(() -> {//线程1对主内存的变化不知道
		while (num == 0) {
		}
	}).start();
	try {
		TimeUnit.SECONDS.sleep(1);
	} catch (InterruptedException e) {
		// TODO Auto-generated catch block
		e.printStackTrace();
	}
	num = 1;
	System.out.println(num);
}

代码举例:不保证原子性(不可分割)

//添加volatile仍然不能保证num最后变20000
private volatile static int num = 0;
//可以使用synchronized或lock保证num变成20000
public static void add() {
	num++;
}
public static void main(String[] args) {
	for (int i = 0; i < 20; i++) {
		new Thread(() -> {
			for (int j = 0; j < 1000; j++) {
				add();
			}
		}).start();
	} // 理论上num变为20000
	while(Thread.activeCount()>2){//等待上面的子线程结束
		Thread.yield();
	}
	System.out.println(num);
}

如果不允许使用synchronized或lock可以改为原子类

private static AtomicInteger num = new AtomicInteger();
//可以使用原子类
public static void add() {
	num.getAndIncrement();
}
public static void main(String[] args) {
	for (int i = 0; i < 20; i++) {
		new Thread(() -> {
			for (int j = 0; j < 1000; j++) {
				add();
			}
		}).start();
	} // 理论上num变为20000
	while(Thread.activeCount()>2){//等待上面的子线程结束
		Thread.yield();
	}
	System.out.println(num);
}

num++实际上分为三步

取出数据作为基数、加一操作、写入内存

所以num++本身就不是原子性的操作

指令重排

指令重排:写的程序并不是按照自己写的那样执行

源代码->编译器优化重排->指令并行也可能重排->内存系统也会重排->执行

处理器在进行指令重排时:考虑数据之间的依赖性

volatile可以避免指令重排

内存屏障。CPU指令。作用:

1、保证特定的操作的执行顺序!

2、可以保证某些变量的内存可见性(利用这些特性volatile实现了可见性)

单例模式

代码举例

// 饿汉式单例
class Hungry {
	private Hungry() {
		// 私有构造方法
	}

	// 提前构造完成
	private final static Hungry HUNGRY = new Hungry();

	public static Hungry getInstance() {
		return HUNGRY;
	}
}

// 懒汉式单例
class LazyMan {
	private LazyMan() {
		// 私有构造方法
	}

	private static LazyMan LAZYMAN;

	public static LazyMan getInstance() {
		if (LAZYMAN == null) {
			// 用的时候再构造
			LAZYMAN = new LazyMan();
		}
		return LAZYMAN;
	}
}

// 双检索单例模式(DCL懒汉式)
class DoubleCheck {
	private DoubleCheck() {
		// 私有构造方法
	}

	private static DoubleCheck DOUBLECHECK;

	public static DoubleCheck getInstance() {
		if (DOUBLECHECK == null) {
			synchronized (DoubleCheck.class) {
				if (DOUBLECHECK == null) {
					DOUBLECHECK = new DoubleCheck();
				}
			}
		}
		return DOUBLECHECK;
	}
}

//静态内部类单例
class Outside { 
    private Outside(){
    }
      public static Outside getInstance(){  
        return Inside.OUTSIDE;  
    }  
    private static class Inside {  
        private static final Outside OUTSIDE = new Outside();  
    }  
} 

//枚举单例
public enum Singleton {  
    INSTANCE;  
    public Singleton getInstance() {
    	return INSTANCE;
    }
}  

深入理解CAS

原子类(volatile原子性里用过)的底层用到CAS,compareAndSet:比较并交换

compareAndSet(A,B);如果我期望的达到了A,就更新为B

unsafe类

java无法操作内存

java可以调用C++ native

C++可以操作内存

java的后门,可以通过这个类操作内存

CAS

CAS:比较当前工作内存中的值和主内存中的值,如果这个值是期望的,那么执行操作!如果不是就一直循环!

缺点:

1、循环会耗时

2、一次只能保证一个共享变量的原子性

3、ABA问题

ABA问题(狸猫换太子)

线程A期望拿到A并修改为C

但是有个线程B期望拿到A并修改为B,然后在期望拿到B并修改为A

线程A拿到的值实际上是被修改过然后再被修改回A的值

代码举例

	public static void main(String[] args) {
		AtomicInteger atomicInteger = new AtomicInteger(20);
		// 如果我期望的值达到了,那么就更新,否则,就不更新,cAS是CPu的并发原语!
		// ===========捣乱的线程==================
		System.out.print1n(atomicInteger.compareAndSet(20, 21));
		System.out.print1n(atomicInteger.get());
		System.out.print1n(atomicInteger.compareAndSet(21, 20));
		System.out.println(atomicInteger.get());
		// ===========期望的线程==================
		System.out.println(atomicInteger.compareAndSet(20, 66));
		System.out.println(atomicInteger.get());
	}

原子引用解决ABA问题

解决ABA问题,添加一个版本号,每次执行操作,版本号都加一。

期望数据达到A然后修改为B,期望版本号为C然后修改为D

补充:乐观锁就是这种想法

各种锁

1、公平锁、非公平锁

公平锁:非常公平,不能插队,必须先来后到!

非公平锁:非常不公平,可以插队(默认都是非公平)

public ReentrantLock(){
    sync = new NonfairSync();
}
public ReentrantLock(boolean fair){
    sync = fair ? new FairSync() : new NonfairSync();
}

2、可重入锁

可重入锁(递归锁)

拿到了外面的锁,就可以拿到里面的锁

举例:

拿到自己家大门的锁

就拿到了自己家房门的锁

锁必须一一配对不然就会死锁

3、自旋锁

spinlock

CAS底层里用到过

不断尝试,直到成功为止

代码举例

public class SpinLock {
	public static void main(String[] args) {
		MyLock lock=new MyLock();
		new Thread(()->{
			lock.myLock();
			try {
				TimeUnit.SECONDS.sleep(1);
			} catch (Exception e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}finally {
				lock.myUnLock();
			}
		},"A").start();
		new Thread(()->{
			lock.myLock();
			try {
				TimeUnit.SECONDS.sleep(1);
			} catch (Exception e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}finally {
				lock.myUnLock();
			}
		},"B").start();
	}
}

// 自己写的自旋锁
class MyLock {
	//初始为null
	volatile AtomicReference<Thread> atomicReference = new AtomicReference<>();

	// 加锁
	public void myLock() {
		Thread thread = Thread.currentThread();
		while (!atomicReference.compareAndSet(null, thread)) {
			// 第二个拿不到锁,进行自旋操作,转到死
		}
		System.out.println(Thread.currentThread().getName() + "->Lock");
	}

	// 解锁
	public void myUnLock() {
		Thread thread = Thread.currentThread();
		System.out.println(Thread.currentThread().getName() + "->UnLock");
		atomicReference.compareAndSet(thread, null);
	}
}

4、死锁

死锁排查

当进程卡死时,并且不输出结果

用Terminal打开命令行

1、使用‘jps -l‘定位进程号

2、使用‘jstack 进程号‘找到问题

JUC并发编程

标签:dfa   返回   reduce   饿汉式   分割   locking   rdo   模拟   lang   

原文地址:https://www.cnblogs.com/wing-of-dream/p/14672290.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!