码迷,mamicode.com
首页 > 编程语言 > 详细

Java笔记:并发工具

时间:2018-03-08 19:38:39      阅读:251      评论:0      收藏:0      [点我收藏+]

标签:关闭   mit   view   for   nbsp   cte   dom   tor   cep   

一、基础知识

并发工具定义了一些核心特征,用于以其他方式实现同步和线程间通信。

  • 同步器:提供了同步多线程间交互的高级方法。
  • 执行器:管理线程的执行。
  • 并发集合:提供了由集合框架定义的相关类的并发替代版本。
  • Fork/Join框架:支持并行编程。

 

二、同步对象使用

Semaphore实现了经典的信号量,信号量通过计数器控制对共享资源的访问。如果计数器大于0则允许访问,如果计数器为0则拒绝访问。希望获得共享资源的线程尝试获得许可证,若允许访问则线程可得到许可证,若不允许访问则线程阻塞直至得到许可证为止。

技术分享图片
import java.util.concurrent.Semaphore;

class Solution {
    public static void main(String[] args) {
        Semaphore semaphore = new Semaphore(1);
        Integer i = 0;
        new IncThread(semaphore);
        new DecThread(semaphore);
    }
}

class Shared {
    static int integer = 0;
}

class IncThread implements Runnable {
    private Semaphore semaphore;

    IncThread(Semaphore semaphore) {
        this.semaphore = semaphore;
        new Thread(this).start();
    }

    @Override
    public void run() {
        try {
            semaphore.acquire();//阻塞直至得到许可
            for (int i = 0; i < 5; i++) {
                Shared.integer++;
                System.out.println(Shared.integer);
                Thread.sleep(500);
            }
        } catch (InterruptedException exc) {
            System.out.println(exc.getMessage());
        }
        semaphore.release();//释放许可
    }
}

class DecThread implements Runnable {
    private Semaphore semaphore;

    DecThread(Semaphore semaphore) {
        this.semaphore = semaphore;
        new Thread(this).start();
    }

    @Override
    public void run() {
        try {
            semaphore.acquire();
            for (int i = 0; i < 5; i++) {
                Shared.integer--;
                System.out.println(Shared.integer);
                Thread.sleep(500);
            }
        } catch (InterruptedException exc) {
            System.out.println(exc.getMessage());
        }
        semaphore.release();
    }
}
View Code

如果希望线程进行等待,直到发生一个或多个事件为止。CountDownLatch可用于处理这类情况,计数器必须从指定事件数量归零,锁存器才会被释放。

技术分享图片
import java.util.concurrent.CountDownLatch;

class Solution {
    public static void main(String[] args) {
        CountDownLatch cdl = new CountDownLatch(5);

        new Thread(() -> {
            for (int i = 0; i < 5; i++) {
                System.out.println(i);
                cdl.countDown();
            }
        }).start();

        try {
            cdl.await();//暂停主线程直至计数器递减5次
        } catch (InterruptedException e) {
            System.out.println(e.getMessage());
        }

        System.out.println("Hello");
    }
}
View Code

如果由多个线程组成的线程组必须在某处进行等待,直到线程组中所有线程都到达执行点。CyclicBarrier可用于处理这类情况,同步对象会被挂起直至指定数量的线程都到达预定位置为止。

技术分享图片
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

class Solution {
    public static void main(String[] args) {
        CyclicBarrier barrier = new CyclicBarrier(10, () -> System.out.println("Hello"));
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                try {
                    int t = (int) (Math.random() * 5000);
                    Thread.sleep(t);
                    System.out.println(t);
                    barrier.await();//指定数量的线程调用await后释放被挂起的线程
                } catch (InterruptedException | BrokenBarrierException e) {
                    System.out.println(e.getMessage());
                }
            }).start();
        }
    }
}
View Code

Exchanger用于简化两个线程之间的数据交换。简单的进行等待,直到两个独立的线程均调用exchange方法为止,之后交换线程所提供的数据。

技术分享图片
import java.util.Arrays;
import java.util.concurrent.Exchanger;

class Solution {
    public static void main(String[] args) {
        Exchanger<int[]> arrayExchanger = new Exchanger<>();
        Runnable consumer = () -> {
            try {
                for (int i = 0; i < 3; i++) {
                    int[] arr = arrayExchanger.exchange(new int[5]);
                    System.out.println(Arrays.toString(arr));
                }
            } catch (InterruptedException e) {
                System.out.println(e.getMessage());
            }
        };

        Runnable producer = () -> {
            int[] arr = new int[5];
            try {
                for (int i = 0; i < 15; i++) {
                    arr[i % 5] = i;
                    if ((i + 1) % 5 == 0)
                        arr = arrayExchanger.exchange(arr);
                }
            } catch (InterruptedException e) {
                System.out.println(e.getMessage());
            }
        };

        new Thread(consumer).start();
        new Thread(producer).start();
    }
}
View Code

Phaser允许多个线程进行同步。Phaser对象会等待所有party(等价于线程)完成当前阶段后才会进入下阶段,通过调用arrive方法或其变体通知当前阶段完成。

技术分享图片
import java.util.concurrent.Phaser;

class MyPhaser extends Phaser {
    private int total;

    MyPhaser(int parties, int total) {
        super(parties);
        this.total = total - 1;
    }

    @Override
    protected boolean onAdvance(int phase, int registeredParties) {
        System.out.println("Phase " + phase + " completed");
        return phase == total || registeredParties == 0;//返回true则结束Phaser
    }
}

class Solution {
    public static void main(String[] args) {
        Phaser phaser = new MyPhaser(0, 3);
        Runnable runnable = () -> {
            phaser.register();
            while (!phaser.isTerminated()) {
                System.out.println("Phase " + phaser.getPhase() + " started");
                phaser.arriveAndAwaitAdvance();
            }
        };

        for (int i = 0; i < 3; i++)
            new Thread(runnable).start();
    }
}
View Code

 

三、执行器

执行器用于启动并控制线程的执行,因此执行器为通过Thread管理线程提供了一种代替方案。执行器的核心是Executor接口,ExecutorService接口扩展了Executor接口。

线程池提供了用于执行各种任务的一组线程,每个任务不是使用独立的线程而是线程池中的线程。线程池减轻了创建许多独立线程所带来的负担,通过少量线程执行大量任务。

技术分享图片
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class Solution {
    public static void main(String[] args) {
        CountDownLatch[] cdls = new CountDownLatch[5];
        for (int i = 0; i < cdls.length; i++)
            cdls[i] = new CountDownLatch(5);
        ExecutorService pool = Executors.newFixedThreadPool(3);//创建线程池

        for (CountDownLatch cdl : cdls)
            pool.execute(() -> {
                for (int j = 0; j < 5; j++) {
                    System.out.println(j);
                    cdl.countDown();
                }
            });

        try {
            for (CountDownLatch cdl : cdls)
                cdl.await();
        } catch (InterruptedException e) {
            System.out.println(e.getMessage());
        }
        pool.shutdown();//关闭线程池
    }
}
View Code

泛型接口Callable表示返回值的线程。应用程序可以使用Callable对象计算结果后,将结果返回给调用线程。call方法定义希望执行的任务,在任务完成后返回结果。Callable任务通过调用ExecutorService对象的submit方法执行。

泛型接口Future表示将由Callable对象返回的值。

技术分享图片
import java.util.concurrent.*;

class Add implements Callable<Integer> {
    private int[] arr;

    Add(int... arr) {
        this.arr = arr;
    }

    @Override
    public Integer call() {
        int sum = 0;
        for (int i : arr)
            sum += i;
        return sum;
    }
}

class Solution {
    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        Future<Integer> future = pool.submit(new Add(1, 2, 3));
        try {
            System.out.println(future.get());
        } catch (InterruptedException | ExecutionException e) {
            System.out.println(e.getMessage());
        }
        pool.shutdown();
    }
}
View Code


四、时间

TimeUnit枚举用于指定时间单位。

技术分享图片
import java.util.concurrent.TimeUnit;

class Solution {
    public static void main(String[] args) {
        try {
            TimeUnit.SECONDS.sleep(3);//暂停3秒
            System.out.println(TimeUnit.SECONDS.toMillis(1));//单位转换
        } catch (InterruptedException e) {
            System.out.println(e.getMessage());
        }
    }
}
View Code

 

五、锁

锁为使用synchronized控制共享资源的访问提供了替代技术。在访问共享资源自谦,申请用于保护资源的锁,当资源访问完成后释放锁。当某线程正在使用锁时,其他尝试申请锁的线程会被挂起。

所有锁都实现了Lock接口,申请锁时调用lock方法,如果不可得就会进行等待。如果不希望进行等待就使用tryLock方法。使用newCondition方法获取与锁关联的Condition对象之后可通过await或signal等方法控制锁,类似于wait和notify方法。

技术分享图片
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

class Shared {
    static int data = 0;
}

class Solution {
    public static void main(String[] args) {
        ReentrantLock lock = new ReentrantLock();
        Runnable runnable = () -> {
            lock.lock();//申请锁
            Shared.data++;
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException exc) {
                System.out.println(exc.getMessage());
            } finally {
                System.out.println(Shared.data);
                lock.unlock();//释放锁
            }
        };

        for (int i = 0; i < 10; i++)
            new Thread(runnable).start();
    }
}
View Code

 

六、原子操作

当读写某些类型的变量时,原子操作提供了一种不可中断的方案。这意味着不再需要锁以及其他同步机制。

技术分享图片
import java.util.concurrent.atomic.AtomicInteger;

class Shared {
    static AtomicInteger integer = new AtomicInteger(0);
}

class Solution {
    public static void main(String[] args) {
        Runnable runnable = () -> {
            for (int i = 0; i < 3; i++)
                System.out.println(Shared.integer.getAndAdd(1));
        };

        for (int i = 0; i < 5; i++)
            new Thread(runnable).start();
    }
}
View Code


七、并行编程

Fork/Join框架通过简化多线程的创建使用和自动使用多处理器,增强了多线程编程。

ForkJoinTask用于定义能够被ForkJoinPool管理的任务,泛型参数指定了任务结果的类型。fork方法为调用任务的异步执行提交任务,join方法等待调用该方法的任务中止,invoke和invokeAll方法则将fork和join合并到单个调用中。

RecursiveAction和RecursiveTask均为ForkJoinTask的子类。前者用于封装不返回结果的任务,后者用于封装返回结果的任务。

ForkJoinPool用于管理ForkJoinTask。若调用需等待的任务则使用线程池的invoke方法(同步),若不需要等待任务完成则调用execute方法(异步)。

技术分享图片
import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

class Sqrt extends RecursiveAction {//分治策略
    double[] arr;
    int front, rear;

    Sqrt(double[] arr, int front, int rear) {
        this.arr = arr;
        this.front = front;
        this.rear = rear;
    }

    @Override
    protected void compute() {
        if (rear - front < 100) {
            for (int i = front; i <= rear; i++)
                arr[i] = Math.sqrt(arr[i]);
        } else {
            int middle = (front + rear) / 2;
            invokeAll(new Sqrt(arr, front, middle), new Sqrt(arr, middle + 1, rear));
        }
    }
}

class Solution {
    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool();
        double[] arr = new double[10000];
        for (int i = 0; i < arr.length; i++)
            arr[i] = (double) i;
        System.out.println(Arrays.toString(arr));

        Sqrt sqrt = new Sqrt(arr, 0, arr.length - 1);
        pool.invoke(sqrt);
        System.out.println(Arrays.toString(arr));
    }
}
View Code

若没有显示声明ForkJoinPool,则会自动使用公共池,使用commonPool可获取公共池的引用。

技术分享图片
import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class Sum extends RecursiveTask<Integer> {
    int[] arr;
    int front, rear;

    Sum(int[] arr, int front, int rear) {
        this.arr = arr;
        this.front = front;
        this.rear = rear;
    }

    @Override
    protected Integer compute() {
        int sum = 0;
        if (rear - front < 100) {
            for (int i = front; i <= rear; i++)
                sum += arr[i];
        } else {
            int middle = (front + rear) / 2;
            Sum taskA = new Sum(arr, front, middle);
            Sum taskB = new Sum(arr, middle + 1, rear);
            taskA.fork();
            taskB.fork();
            sum = taskA.join() + taskB.join();
        }
        return sum;
    }
}

class Solution {
    public static void main(String[] args) {
        ForkJoinPool pool = ForkJoinPool.commonPool();
        int[] arr = new int[10000];
        for (int i = 0; i < arr.length; i++)
            arr[i] = i;
        System.out.println(pool.invoke(new Sum(arr, 0, arr.length - 1)));
    }
}
View Code

 

Java笔记:并发工具

标签:关闭   mit   view   for   nbsp   cte   dom   tor   cep   

原文地址:https://www.cnblogs.com/arseneyao/p/8521675.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!