码迷,mamicode.com
首页 > 其他好文 > 详细

马士兵老师高并发编程同步容器

时间:2018-11-06 11:06:55      阅读:134      评论:0      收藏:0      [点我收藏+]

标签:container   style   条件   blocking   meta   pre   生产   操作   put   

手写固定同步容器

写一个固定容量同步容器,拥有put和get方法,以及getCount方法,能够支持2个生产者线程以及10个消费者线程的阻塞调用。

使用wait与notify

思路:使用一个集合来当做生产或者消费的中转站,然后每当生产或者消费的时刻都判断集合的容量,如果不满足条件那么就对这种操作进行阻塞也就是wait同时notify其它的所有线程。当其它线程启动之后也会遇到“不合格的线程”这时候也会阻塞,直到合格的线程进行执行。

核心代码:

  1. public class MyContainer1<T> {
  2. final private LinkedList<T> lists = new LinkedList<>();
  3. final private int MAX = 10; //最多10个元素
  4. private int count = 0;
  5. public synchronized void put(T t) {
  6. while(lists.size() == MAX) { //想想为什么用while而不是用if?
  7. try {
  8. this.wait(); //effective java
  9. } catch (InterruptedException e) {
  10. e.printStackTrace();
  11. }
  12. }
  13. lists.add(t);
  14. ++count;
  15. this.notifyAll(); //通知消费者线程进行消费
  16. }
  17. public synchronized T get() {
  18. T t = null;
  19. while(lists.size() == 0) {
  20. try {
  21. this.wait();
  22. } catch (InterruptedException e) {
  23. e.printStackTrace();
  24. }
  25. }
  26. t = lists.removeFirst();
  27. count --;
  28. this.notifyAll(); //通知生产者进行生产
  29. return t;
  30. }
  31. public static void main(String[] args) {
  32. MyContainer1<String> c = new MyContainer1<>();
  33. //启动消费者线程
  34. for(int i=0; i<10; i++) {
  35. new Thread(()->{
  36. for(int j=0; j<5; j++) System.out.println(c.get());
  37. }, "c" + i).start();
  38. }
  39. try {
  40. TimeUnit.SECONDS.sleep(2);
  41. } catch (InterruptedException e) {
  42. e.printStackTrace();
  43. }
  44. //启动生产者线程
  45. for(int i=0; i<2; i++) {
  46. new Thread(()->{
  47. for(int j=0; j<25; j++) c.put(Thread.currentThread().getName() + " " + j);
  48. }, "p" + i).start();
  49. }
  50. }
  51. }

注意事项:

为什么是while?

有下面的一个场景,当消费者消费的阈值不满足条件那么它将会被wait阻塞。此时还有其的9个消费者也处于阻塞状态。当notifyAll的时候我们希望的是让生产者来生产元素,但是这时候被唤醒的消费者会继续去消费,代码会从wait()处直接向下执行。如果是一个if判断,再加上这时的集合中没有元素那么此时一定会出异常。但是如果使用的是while的话那么被唤醒的消费者就会循环检测发现不满足条件就继续阻塞。整个程序顺利进行。

 

使用signalAll唤醒对应条件的线程

signalAll与ReentrantLock共用达到只唤醒对应条件的线程。比如说当生产者生产的元素超过阈值的时候他就会调用signalAll这时所有的消费者被唤醒而所有的生产者则不受影响。这样就可以避免唤醒不必要的线程节省资源。

此处注意要给每一种线程都定义一个Condition,在上锁的时候就只用这个Condition的锁去锁定对应的线程。具体代码如下:

  1. public class MyContainer2<T> {
  2. final private LinkedList<T> lists = new LinkedList<>();
  3. final private int MAX = 10; //最多10个元素
  4. private int count = 0;
  5. private Lock lock = new ReentrantLock();
  6. private Condition producer = lock.newCondition();
  7. private Condition consumer = lock.newCondition();
  8. public void put(T t) {
  9. try {
  10. lock.lock();
  11. while(lists.size() == MAX) {
  12. producer.await();
  13. }
  14. lists.add(t);
  15. ++count;
  16. consumer.signalAll(); //通知消费者线程进行消费
  17. } catch (InterruptedException e) {
  18. e.printStackTrace();
  19. } finally {
  20. lock.unlock();
  21. }
  22. }
  23. public T get() {
  24. T t = null;
  25. try {
  26. lock.lock();
  27. while(lists.size() == 0) {
  28. consumer.await();
  29. }
  30. t = lists.removeFirst();
  31. count --;
  32. producer.signalAll(); //通知生产者进行生产
  33. } catch (InterruptedException e) {
  34. e.printStackTrace();
  35. } finally {
  36. lock.unlock();
  37. }
  38. return t;
  39. }
  40. public static void main(String[] args) {
  41. MyContainer2<String> c = new MyContainer2<>();
  42. //启动消费者线程
  43. for(int i=0; i<10; i++) {
  44. new Thread(()->{
  45. for(int j=0; j<5; j++) {
  46. System.out.println(c.get());
  47. // System.out.println("c");
  48. }
  49. }, "c" + i).start();
  50. }
  51. try {
  52. TimeUnit.SECONDS.sleep(2);
  53. } catch (InterruptedException e) {
  54. e.printStackTrace();
  55. }
  56. //启动生产者线程
  57. for(int i=0; i<2; i++) {
  58. new Thread(()->{
  59. for(int j=0; j<25; j++) c.put(Thread.currentThread().getName() + " " + j);
  60. }, "p" + i).start();
  61. }
  62. }
  63. }

售票员案例

 * 场景

 * 有N张火车票,每张票都有一个编号

 * 同时有10个窗口对外售票

 * 请写一个模拟程序

 *

 * 分析下面的程序可能会产生哪些问题?

 * 重复销售?超量销售?

 

Solution1:使用线程不安全的集合而且不上锁

  1. public class TicketSeller1 {
  2. static List<String> tickets = new ArrayList<>();
  3. static {
  4. for(int i=0; i<1000; i++) tickets.add("票编号:" + i);
  5. }
  6. public static void main(String[] args) {
  7. for(int i=0; i<10; i++) {
  8. new Thread(()->{
  9. while(tickets.size() > 0) {
  10. try {
  11. Thread.sleep(10);
  12. } catch (Exception e) {
  13. e.printStackTrace();
  14. }
  15. System.out.println("销售了--" + tickets.remove(0));
  16. }
  17. }).start();
  18. }
  19. }
  20. }

为了使得问题呈现的效果明显我们加上了睡眠时间。此程序会出现的问题:

总的来说会有两个点出现问题:

技术分享图片

程序逻辑的线程不安全,有可能有多个线程涌入while循环中这是造成并发问题的根本。这个原因会导致在size为1的时候涌入很多的线程进而执行多次删除操作下标越界。

集合的线程不安全,remove的方法本来就不是线程安全的。为了说明问题我,我们把remove方法放大:

技术分享图片

可以看出如果两个线程同时执行remove方法的话,由于index一样所以他们的remove的返回值就会得到同一个oldValue。也就是重复卖出。

Solution2:使用集合Vector但是代码逻辑不加锁

代码上的逻辑与solution1是一样的,但是Vector集合是线程安全的,所以它只会出现程序逻辑不安全带来的并发问题。也就是会出现有可能有多个线程涌入while循环中这是造成并发问题的根本。这个原因会导致在size为1的时候涌入很多的线程进而执行多次删除操作下标越界。但是绝对不会出现卖出同一张票的情况。我们把remove的代码放大:

技术分享图片

这是一个同步的方法,每一个线程过来如果得不到锁得话都会陷入等待。虽然都是remove(0)但是当下一个线程来到的时候0位置已经是一个全新的元素。

 

Solution3:给代码逻辑上锁使用线程不安全的集合

不多说了无论如何都可以防止线程安全问题,因为在Solution1中已经提到过了代码的并发问题是一切问题的原因。直接上代码:

  1. public class TicketSeller3 {
  2. static List<String> tickets = new LinkedList<>();
  3. static {
  4. for(int i=0; i<1000; i++) tickets.add("票 编号:" + i);
  5. }
  6. public static void main(String[] args) {
  7. for(int i=0; i<10; i++) {
  8. new Thread(()->{
  9. while(true) {
  10. synchronized(tickets) {
  11. if(tickets.size() <= 0) break;
  12. try {
  13. TimeUnit.MILLISECONDS.sleep(10);
  14. } catch (InterruptedException e) {
  15. e.printStackTrace();
  16. }
  17. System.out.println("销售了--" + tickets.remove(0));
  18. }
  19. }
  20. }).start();
  21. }
  22. }
  23. }

Solution4使用并发集合不加锁

首先说并发的集合是线程安全的,而且效率较高因为使用了局部锁。这样的话只在取值的时候加了锁,而且如果是以下标来取值的话还可以同时取走多个地方的值这样的话效率大大提高。而且这里使用一种取值然后再判断的逻辑巧妙的避免了下表越界的错误,而前面的案例中都是先判断再取值这样就造成了线程不安全:

  1. public class TicketSeller4 {
  2. static Queue<String> tickets = new ConcurrentLinkedQueue<>();
  3. static {
  4. for(int i=0; i<1000; i++) tickets.add("票 编号:" + i);
  5. }
  6. public static void main(String[] args) {
  7. for(int i=0; i<10; i++) {
  8. new Thread(()->{
  9. while(true) {
  10. String s = tickets.poll();
  11. if(s == null) break;
  12. else System.out.println("销售了--" + s);
  13. }
  14. }).start();
  15. }
  16. }
  17. }

高并发集合简介

ConcurrectHashMap:使用了局部锁,也就是细粒度的锁来提高并发效果。

ConcurrectSkipListMap:使用了局部锁,但是结果也排序的map集合。对比TreeMap一个元素排序的map集合。

CopyOnWriteArrayList:读取时不加锁,但是写的时候回拷贝原有的数据然后对拷贝的数据进行操作最后将指针指向修改过的集合。这个集合适用于读操作远远大于写操作的情况。

BlockingQueue:阻塞队列,当队列中没有元素的时候就会对取元素产生阻塞,当队列中满元素的时候就会对添加元素产生阻塞。而且不允许添加null的值而且在取值与添加值的情况下都会加锁,换句话说它是一个线程安全的集合。以下为部分源码:

技术分享图片

DelayQueue:执行定时任务,他的内部会装有很多的task接受的task都实现了Delay接口,因此task内部也就维护了一个conpareTo的方法,如果按照时间排序的话那么就能够实现任务的定时执行。

  1. public class T07_DelayQueue {
  2. static BlockingQueue<MyTask> tasks = new DelayQueue<>();
  3. static Random r = new Random();
  4. static class MyTask implements Delayed {
  5. long runningTime;
  6. MyTask(long rt) {
  7. this.runningTime = rt;
  8. }
  9. @Override
  10. public int compareTo(Delayed o) {
  11. if(this.getDelay(TimeUnit.MILLISECONDS) < o.getDelay(TimeUnit.MILLISECONDS))
  12. return -1;
  13. else if(this.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS))
  14. return 1;
  15. else
  16. return 0;
  17. }
  18. @Override
  19. public long getDelay(TimeUnit unit) {
  20. return unit.convert(runningTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
  21. }
  22. @Override
  23. public String toString() {
  24. return "" + runningTime;
  25. }
  26. }
  27. public static void main(String[] args) throws InterruptedException {
  28. long now = System.currentTimeMillis();
  29. MyTask t1 = new MyTask(now + 1000);
  30. MyTask t2 = new MyTask(now + 2000);
  31. MyTask t3 = new MyTask(now + 1500);
  32. MyTask t4 = new MyTask(now + 2500);
  33. MyTask t5 = new MyTask(now + 500);
  34. tasks.put(t1);
  35. tasks.put(t2);
  36. tasks.put(t3);
  37. tasks.put(t4);
  38. tasks.put(t5);
  39. System.out.println(tasks);
  40. for(int i=0; i<5; i++) {
  41. System.out.println(tasks.take());
  42. }
  43. }
  44. }

TransferQueue:当有消费者的话那么就直接将生产出来的元素交给消费者,但是如果没有消费者的话就会将生产的元素放到队列中。当队列中的元素消耗完的时候消费者就会阻塞。

 

原文地址:https://blog.csdn.net/qq_34993631/article/details/82685082

马士兵老师高并发编程同步容器

标签:container   style   条件   blocking   meta   pre   生产   操作   put   

原文地址:https://www.cnblogs.com/jpfss/p/9913344.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!