码迷,mamicode.com
首页 > 编程语言 > 详细

java并发基础

时间:2017-05-31 10:29:49      阅读:246      评论:0      收藏:0      [点我收藏+]

标签:为什么   ++   junit   ==   pen   总结   semaphore   其他   准备   

  《java并发编程实战》终于读完4-7章了,感触很深,但是有些东西还没有吃透,先把已经理解的整理一下。《java并发编程实战》笔记(一)是对前3章的总结。这里总结一下第5章的东西,为什么跳过第4章?不告诉你。

一,阻塞队列和生产者-消费者模式

  java中的阻塞队列提供了可阻塞的put和take方法,以及支持定时的offer和poll方法。如果队列已经满了,那么put方法将阻塞直到有空间可用;如果队列为空,那么take方法将会阻塞直到有元素可用。类库中包含了BlockingQueue的多种实现,其中,LinkedBlockingQueue和ArrayBlockingQueue是FIFO队列(先进先出),PriorityBlockingQueue是一个按优先级排序的队列。

  阻塞队列支持生产者-消费者模式,该模式将“找出需要完成的工作”和“执行工作”这两个过程分离开来;并把工作项放入一个“待完成”列表中以便在随后处理,而不是找出后立即处理。当数据生成时,生产者把数据放入队列,而当消费者准备处理数据时,将从队列中获取数据。生产者不需要知道消费者的标识和数量,或者它们是否是唯一的生产者,而只需将数据放入队列即可。同样,消费者也不需要知道生产者是谁,来自何处。

  举个例子说明阻塞队列和生产者-消费者模式如何配合使用:在某个文件层次结构中搜索所有的.java文件。

import java.io.File;
import java.io.FileFilter;
import java.util.concurrent.BlockingQueue;
 
/**
 * @描述:生产者
 * @author 肖冬
 */
public class FileCrawler implements Runnable {
    private final BlockingQueue<File> fileQueue ;
    private final FileFilter fileFilter;
     
    private final File root;
     
    public FileCrawler(BlockingQueue<File> fileQueue,FileFilter fileFilter, File root) {
        super();
        this.fileQueue = fileQueue;
        this.fileFilter = fileFilter;
        this.root = root;
    }
 
    @Override
    public void run() {
        try {
            crawl(root);
        } catch (InterruptedException e) {
            //使线程恢复中断状态,为什么这么做:因为线程一旦抛出中断异常,就会重置中断状态
            Thread.currentThread().interrupt();
        }
    }
     
    private void crawl(File root) throws InterruptedException{
        File[] entries = root.listFiles(fileFilter);
        if (entries !=null) {
            for (File entry : entries) {
                if (entry.isDirectory()) {
                    crawl(entry);
                }else{
                    //生产者将任务放到阻塞队列
                    fileQueue.put(entry);
                }
            }
        }
    }
 
}
import java.io.File;
import java.util.concurrent.BlockingQueue;
 
/**
 * @描述:消费者
 * @author 肖冬
 */
public class Indexer implements Runnable {
    private final BlockingQueue<File> queue;
     
    public Indexer(BlockingQueue<File> queue) {
        this.queue = queue;
    }
 
    @Override
    public void run() {
        try{
            while (true) {
                indexFile(queue.take());
            }
        }catch(InterruptedException e){
            Thread.currentThread().interrupt();
        }
         
    }
     
    private void indexFile(File f){
        System.out.println(f.getName());
    }
     
}
import java.io.File;
import java.io.FileFilter;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
 
import org.junit.Test;
 
/**
 * @描述:junit测试
 * @author 肖冬
 */
public class TestMain {
     
    @Test
    public void testFileFilter(){
        File root = new File("D://test");
        FileFilter fileFilter = new FileFilter() {
            @Override
            public boolean accept(File f) {
                if (f.isDirectory())  return true;
                if (f.isFile()) {
                    String name = f.getName();
                    if (name.endsWith(".java")) return true;
                    else return false;
                }
                return false;
            }
        };
        BlockingQueue<File> queue = new LinkedBlockingQueue<File>();
        //生产者
        new Thread(new FileCrawler(queue,fileFilter, root)).start();
        //消费者
        new Thread(new Indexer(queue)).start();
    }
}

  生产者将符合条件的文件放入阻塞队列,消费者处理任务时只从阻塞队列中去任务就可以了,不必关心生产者。

  生产者-消费者模式同样能带来许多性能优势。生产者和消费者可以并发的执行。如果一个是I/O密集型,另一个是CPU密集型,那么并发执行的吞吐率要高于串行执行的吞吐率。

 

二,同步工具类

  同步工具类可以是任何一个对象,只要它根据自身的状态来协调线程间的控制流,比如阻塞队列,应为take和put等方法将阻塞,直到队列达到期望的状态(队列即非空,也非满)。

  除了阻塞队列,其他类型的同步容器还包括:闭锁,栅栏,信号量。

  1.闭锁

  我自己理解闭锁的意思,它可以让某些动作一起开始,比如,让多个线程一起执行,并等待最慢的线程执行完毕。代码如下:

import java.util.concurrent.CountDownLatch;
 
public class Harness {
    public long timeTasks(int nThreads,final Runnable task)
            throws InterruptedException {
        final CountDownLatch startGate = new CountDownLatch(1);
        final CountDownLatch endGate = new CountDownLatch(nThreads);
         
        for (int i = 0; i < nThreads; i++) {
            Thread t = new Thread(){
                @Override
                public void run() {
                    try {
                        startGate.await();
                        try {
                            task.run();
                        } finally {
                            endGate.countDown();
                        }
                    } catch(InterruptedException e){}
                }
            };
             
            t.start();
        }
         
        long startTime = System.nanoTime();
        startGate.countDown();
        endGate.await();
        long endTime = System.nanoTime();
         
        return endTime - startTime;
         
    }
}

  例子中的CountDownLatch是一种灵活的闭锁实现,它可以使一个或者多个线程等待一组事件发生。闭锁包括一个计数器,该计数器被初始化为一个正数,表示需要等待的事件数量。countDown方法递减计数器,表示有一个事件已经发生了,而await方法等待计数器达到零,这表示所有需要等待的事件都已经发生。如果计数器的值非零,那么await会一直阻塞直到计数器为零,或等待中的线程中断,或等待超时。countDown和await方法是配合使用的。

  Harness创建一定数量的线程,利用它们并发的执行指定任务,它使用两个闭锁,分别表示起始门和结束门,起始门的初始值为1,而结束门的计数器初始值为工作线程的数量,每个线程首先要做的事就是在启动门上等待,确保所有线程同时执行。而每个线程要做的最后一件事是调用结束门的countDown方法减1,使主线程高效的等待直到所有线程都执行完成,然后统计耗时。

  2.信号量

  计数信号量用来控制同时访问某个特定资源的操作数量,或者同时执行某个指定操作的数量。

  例子:有固定大小的HashSet,如果容器满了,容器将处于阻塞状态。

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Semaphore;
 
public class BoundedHashSet<T> {
    private final Set<T> set;
    private final Semaphore sem;
     
    public BoundedHashSet(int bound) {
        this.set = Collections.synchronizedSet(new HashSet<T>());
        sem = new Semaphore(bound);
    }
     
    public boolean add(T o) throws InterruptedException{
        sem.acquire();//获得一个许可
        boolean wasAdded = false;
        try {
            wasAdded = set.add(o);
            return wasAdded;
        } finally{
            if (!wasAdded) {
                sem.release();//将许可返回给信号量
            }
        }
    }
     
    public boolean remove(T o){
        boolean wasRemoved = set.remove(o);
        if (wasRemoved) {
            sem.release();//将许可返回给信号量
        }
        return wasRemoved;
    }
     
}

  Semaphore中管理着一组虚拟的许可,许可的虚拟数量可通过构造函数指定,在执行操作时可以首先获得许可(只要还有剩余的许可),并在使用后返还许可。如果没有许可,那么acquire方法将阻塞,直到有许可为止(或者直到被中断或者操作超时),release方法将返还一个许可给信号量。所以例子中的如果容器满了,再add元素的时候出现的阻塞状态,实际上是Semaphore的acquire处于阻塞状态,将线程挂起,直到信号量中有新的许可为止。

三,构建高效且可伸缩的结果缓存

  几乎所有的服务器应用程序都会使用某种形式的缓存,重用之前的计算结果能降低延迟,提高吞吐量,但却需要消耗更多的内存。我们从简单的HashMap开始,然后分析它的并发性缺陷,并讨论如何修复它们。

public interface Computable<A, V> {
    V compute(A arg) throws InterruptedException;
}
import java.math.BigInteger;
 
public class ExpensiveFunction implements Computable<String, BigInteger> {
    @Override
    public BigInteger compute(String arg) throws InterruptedException {
        //这里模拟耗时的计算
        return new BigInteger(arg);
    }
}
import java.util.HashMap;
import java.util.Map;
 
public class Memoizer1<A, V> implements Computable<A, V> {
    private final Map<A, V> cache = new HashMap<A, V>();
    private Computable<A, V> c;
     
     
    public Memoizer1(Computable<A, V> c) {
        super();
        this.c = c;
    }
 
 
    @Override
    public synchronized V compute(A arg) throws InterruptedException {
        V result = cache.get(arg);
        if (result == null) {
            result = c.compute(arg);
            cache.put(arg,result);
        }
         
        return result;
         
    }
}

  Memoizer1给出了第一种尝试:使用HashMap保存之前的结果,compute方法将首先检查需要的结果是否已经在缓存中,如果存在则返回之前计算的值,否则,计算结果并保存到缓存中,再返回。

  HashMap不是线程安全的,因此要确保两个线程不会同时访问HashMap,Memoizer1用了一种保守的方法,即对整个compute方法进行同步,这种方法能确保线程安全性,但会带来一个明显的可伸缩性的问题:每次只有一个线程能执行compute方法。如果另一个线程正在计算结果,那么其他调用compute的线程可能会被阻塞很长时间。如果有多个线程在排队等待还未计算出的结果,那么,compute方法的计算时间可能比没有使用缓存的计算时间更长。

  

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
 
public class Memoizer2<A, V> implements Computable<A, V> {
    private final Map<A, V> cache = new ConcurrentHashMap<A, V>();
    private Computable<A, V> c;
     
     
    public Memoizer2(Computable<A, V> c) {
        super();
        this.c = c;
    }
 
 
    @Override
    public  V compute(A arg) throws InterruptedException {
        V result = cache.get(arg);
        if (result == null) {
            result = c.compute(arg);
            cache.put(arg,result);
        }
         
        return result;
         
    }
}

  Memoizer2用ConcurrentHashMap替换HashMap,用于解决多线程访问缓存的安全性。为什么用ConcurrentHashMap而不是HashTable,因为ConcurrentHashMap的并发能力比HashTable强。

但是还有一个问题:如果某个线程启动了一个开销很大的计算,而其他线程并不知道这个计算正在进行。那么很可能会重复这个计算。我们希望通过某种方式来表达“线程X正在进行计算f(27)”这种情况,这样当另一个线程查找f(27)时,它能够直到最高效的方法是等待线程X计算结束,然后去缓存中直接拿结果,怎么解决这个问题呢?有一个类能基本实现这个功能:FutureTask。

  FutureTask表示一个计算的过程,这个过程可能已经计算完成,也可能正在进行。如果有结果可用,那么FutureTask.get将立即返回结果,否则它会一直阻塞,直到结果计算出来再将其返回。

import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
 
public class Memoizer3<A, V> implements Computable<A, V> {
    private final Map<A, Future<V>> cache = new ConcurrentHashMap<A, Future<V>>();
    private Computable<A, V> c;
     
     
    public Memoizer3(Computable<A, V> c) {
        super();
        this.c = c;
    }
 
 
    @Override
    public  V compute(final A arg) throws InterruptedException {
        Future<V> f = cache.get(arg);
        if (f == null) {
            Callable<V> eval = new Callable<V>() {
                @Override
                public V call() throws InterruptedException {
                    return c.compute(arg);
                }
            };
             
            FutureTask<V> ft = new FutureTask<V>(eval);
            f = ft;
            cache.put(arg, f);
            ft.run();//在这里调用c.compute
        }
         
        try {
            return f.get();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
         
        return null;
    }
}

  Memoizer3解决了上面提到的问题。若结果已经计算出来,那么将立即返回,如果其他线程正在计算该结果,那么新的线程将一直等待这个结果被计算出来。它只有一个缺陷,即仍然可能存在两个线程同时计算一个相同结果的情况。但是概率已经小了很多了。之所以还会发生,是因为put以后没有进行再判断,每个线程只要判断f==null就立刻开始创建任务。

mport java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
 
public class Memoizer4<A, V> implements Computable<A, V> {
    private final ConcurrentHashMap<A, Future<V>> cache = new ConcurrentHashMap<A, Future<V>>();
    private Computable<A, V> c;
     
     
    public Memoizer4(Computable<A, V> c) {
        super();
        this.c = c;
    }
 
 
    @Override
    public  V compute(final A arg) throws InterruptedException {
        Future<V> f = cache.get(arg);
        if (f == null) {
            Callable<V> eval = new Callable<V>() {
                @Override
                public V call() throws InterruptedException {
                    return c.compute(arg);
                }
            };
             
            FutureTask<V> ft = new FutureTask<V>(eval);
             
            f = cache.putIfAbsent(arg, f);
            if (f== null) {
                f = ft;
                ft.run();//在这里调用c.compute
            }
        }
         
        try {
            return f.get();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
         
        return null;
    }
}

  Memoizer4解决了这个问题。至此,支持并发的耗时计算结果的缓存写完了,当然Memoizer4还有其他的问题,比如缓存清理的问题,但是已经可以用了。

java并发基础

标签:为什么   ++   junit   ==   pen   总结   semaphore   其他   准备   

原文地址:http://www.cnblogs.com/peterxiao/p/6921903.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!