码迷,mamicode.com
首页 > 编程语言 > 详细

Java多线程——执行器(Executor)

时间:2015-03-08 14:16:43      阅读:2733      评论:0      收藏:0      [点我收藏+]

标签:

Markdown编辑器上线啦,终于等到你了!使用这个编辑器写了两篇,感觉还是不错的!不过还是有一些问题,慢慢熟悉吧!

执行器

构建一个新的线程是有一定的代价的,因为涉及到和操作系统的交互。如果程序中创建了大量的生命周期很短的线程,应该使用线程池(thread pool)。

另一个使用线程池的理由是减少并发线程的数目。线程数量太多会大大降低性能甚至会使虚拟机崩溃。如果有一个会创建许多线程的算法,应该使用一个线程数“固定的”线程池以限制并发线程的总数。

Executor类构建线程池的静态方法

方法 描述
newCachedThreadPool 必要时创建新线程,空闲线程会被保留60秒
newFixedThreadPool 该池包含固定数量的线程;空闲线程会一直被保留
newSingleThreadExecutor 只有一个线程的“池”,该线程顺序执行每一个提交的任务
newScheduledThreadPool 用于预定执行而构建的固定线程池,替代java.util.Timer
newSingleThreadScheduledExecutor 用于预定执行而构建的单线程“池”

线程池

newCachedThreadPool方法构建了一个线程池,对于每个任务,如果有空闲的线程可用,立即让它执行任务,如果没有可用的空闲线程,则创建一个新线程。

newFixedThreadPool方法构建一个具有固定大小的线程池。如果提交的任务数多于空闲的线程数,那么把得不到的服务的任务放置到队列中。当其他任务完成以后再运行它们。

newSingleThreadExecutor是一个退化了的大小为1的线程池:由一个线程执行提交的任务,一个接着一个。这三个方法返回实现了ExecutorService接口的ThreadPoolExecutor类的对象。

使用连接池:

  1. 调用Executors类中静态的方法newCachedThreadPool或newFixedThreadPool。
  2. 调用submit提交Runnable或Callable对象。
  3. 如果想要取消一个任务,或如果提交Callable对象,那就要保存好返回的Future对象。
  4. 当不在提交任何任务时,调用shutdown。

MacthCounter类:

/**
 * @author xzzhao
 */
public class MacthCounter implements Callable<Integer> {

    private final File            directory;
    private final String          keyword;
    private final ExecutorService pool;
    private int                   count;

    public MacthCounter(File directory, String keyword, ExecutorService pool) {
        super();
        this.directory = directory;
        this.keyword = keyword;
        this.pool = pool;
    }

    @Override
    public Integer call() throws Exception {
        count = 0;
        File[] files = directory.listFiles();
        List<Future<Integer>> results = new ArrayList<>();
        for (File file : files) {
            if (file.isDirectory()) {
                MacthCounter counter = new MacthCounter(file, keyword, pool);
                FutureTask<Integer> task = new FutureTask<>(counter);
                results.add(task);
            } else {
                if (search(file)) {
                    count++;
                }
            }
        }
        for (Future<Integer> result : results) {
            count += result.get();
        }
        return count;
    }

    /**
     * 搜索方法
     * 
     * @param file
     * @return 是否找到
     */
    public boolean search(File file) {
        try {
            try (Scanner in = new Scanner(file)) {
                boolean found = false;
                while (!found && in.hasNextLine()) {
                    String line = in.nextLine();
                    if (line.contains(keyword)) {
                        found = true;
                    }
                }
                return found;
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }
}

ThreadPoolTest类:

/**
 * @author xzzhao
 */
public class ThreadPoolTest {
    public static void main(String[] args) {
        Scanner in = new Scanner(System.in);
        System.out.println("请输入根目录 :");
        String directory = in.nextLine();
        System.out.println("请输入关键字 :");
        String keyword = in.nextLine();

        ExecutorService pool = Executors.newCachedThreadPool();

        MacthCounter counter = new MacthCounter(new File(directory), keyword, pool);
        Future<Integer> result = pool.submit(counter);

        try {
            System.out.println("匹配到的文档数:" + result.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        pool.shutdown();
        int largestPoolSize = ((ThreadPoolExecutor) pool).getLargestPoolSize();
        System.out.println("largestPoolSize=" + largestPoolSize);
    }
}

预定执行

ScheduledExecutorService接口具有为预定执行或重复执行任务而设计的方法。它是一种允许使用线程机制的Java.util.Timer的泛化。Executors类的newScheduledThreadPool和newSingleThreadScheduledExecutor方法将返回实现了ScheduledExecutorService接口的对象。

可以预定Runnable或Callable在初始的延迟之后只运行一次。也可以预定一个Runnable对象周期性地运行。

控制任务组

我们已经知道如何将一个执行器服务作为线程池使用,以提高执行任务的效率。有时候,我们要使用执行器来做更有实际意义的事,控制一组相关的任务。例如,可以在执行器中使用shutdownNow方法取消所有的任务。

invokeAll方法提交所有对象到一个Callable对象的集合中,并返回一个Future对象的列表,代表所有任务的结果。这个方法的缺点是如果第一个任务花去了很多时间,那么就可能不得不进行等待。将结果按可获得的顺序保存起来更有意义。可以用ExecutorComeletionService来进行排序。

具体可以查询API。用常规的方法获得一个执行器。然后,构建一个ExecutorComeletionService,提交任务给完成服务。该服务管理Future对象的阻塞队列,其中包含已经提交的任务的执行结果。
大概如下:

ExecutorService executor = Executors.newCachedThreadPool();
ExecutorCompletionService service = new ExecutorCompletionService<>(executor);
for (Callable<T> task : tasks) {
     service.submit(task);
}
for (int i = 0; i < task.size(); i++) {
     processFurther(service.take().get());
}

Fork-Join框架

有的应用程序使用了大量的线程,但其中大多数都是空闲的。举例来说,一个Web服务器可能会为每个连接分别使用一个线程。另外一些应用可能对每个处理器内核分别使用一个线程,来完成计算密集的任务,如图像或视频处理。Java SE 7 中新引入了fork-join框架,专门用来支持后一类的应用。

我们来讨论一个简单的例子。假设我们想统计一个数组中有多少个元素满足摸个特定的属性。可以将这个数组一分为二,分别对着两部分进行统计,再将结果相加。

Counter 类:

/**
 * @author xzzhao
 */
public class Counter extends RecursiveTask<Integer> {

    public static final int THRESHOLD = 1000;
    private final double[]  values;
    private final int       from;
    private final int       to;
    private final Filter    filter;

    public Counter(double[] values, int from, int to, Filter filter) {
        super();
        this.values = values;
        this.from = from;
        this.to = to;
        this.filter = filter;
    }

    @Override
    protected Integer compute() {
        if (to - from < THRESHOLD) {
            int count = 0;
            for (int i = from; i < to; i++) {
                if (filter.accept(values[i])) {
                    count++;
                }
            }
            return count;
        } else {
            int mid = ((from + to) / 2);
            Counter first = new Counter(values, from, mid, filter);
            Counter second = new Counter(values, mid, to, filter);
            invokeAll(first, second);
            return first.join() + second.join();
        }
    }
}

Counter 类:

/**
 * @author xzzhao
 */
public class ForkJoinTest {
    public static void main(String[] args) {
        final int SIZE = 10000000;
        double[] numbers = new double[SIZE];
        for (int i = 0; i < SIZE; i++) {
            numbers[i] = Math.random();
        }
        Counter counter = new Counter(numbers, 0, numbers.length, new Filter() {
            @Override
            public boolean accept(double x) {
                return x > 0.5;
            }
        });
        ForkJoinPool pool = new ForkJoinPool();
        pool.invoke(counter);
        System.out.println(counter.join());
    }
}

采用框架可用的一种方式完成这种递归计算,需要提供一个扩展RecursiveTask的类,如果计算会生成一个结果的话,或者如果不生成任何结果,就可以提供一个扩展RecursiveAction的类。再覆盖compute方法来生成并调用子任务,然后合并结果。

invokeAll方法接收很多任务并阻塞,知道所有的这些任务都已经完成。join方法将生成结果。对每个子任务都应用了join,并返回总和。

Java多线程——执行器(Executor)

标签:

原文地址:http://blog.csdn.net/qq710262350/article/details/43676289

(1)
(2)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!