标签:
本文转自http://www.cnblogs.com/gaopeng527/p/4913706.html,感谢作者
当需要在并发程序中使用数据集合时,必须要谨慎地选择相应的实现方式。大多数集合类不能直接用于并发应用,因为它们没有对本身数据的并发访问进行控制。如果一些并发任务共享了一个不适用于并发任务的数据结构,将会遇到数据不一致的错误,并将影响程序的准确运行。这类数据结构的一个例子是ArrayList类。
Java提供了一些可以用于并发程序中的数据集合,它们不会引起任何问题。一般来说,Java提供了两类适用于并发应用的集合。
通过本节的学习,我们将学会如何在并发应用中使用一些Java集合。
并发列表允许不同的线程在同一时间添加或者移除列表中的元素,而不会造成数据的不一致。
在本节,将会学到如何在并发程序中使用非阻塞式列表。非阻塞式列表提供了一些操作。如果被执行的操作不能立即运行(例如,在列表为空时,从列表中取出一个元素),方法会抛出异常或者返回null。Java7引入了ConcurrentLinkedDeque类来实现非阻塞式并发列表。
将要实现的范例包含以下两个不同的任务:
1. 创建一个名为AddTask的类,实现Runnable接口。
import java.util.concurrent.ConcurrentLinkedDeque; public class AddTask implements Runnable { private ConcurrentLinkedDeque<String> list; public AddTask(ConcurrentLinkedDeque<String> list){ this.list = list; } @Override public void run() { String name = Thread.currentThread().getName(); for(int i=0;i<10000;i++){ list.add(name+": Element "+i); } } }
2. 创建名为PoolTask的类,并实现Runnable接口。
import java.util.concurrent.ConcurrentLinkedDeque; public class PollTask implements Runnable { private ConcurrentLinkedDeque<String> list; public PollTask(ConcurrentLinkedDeque<String> list){ this.list = list; } @Override public void run() { for(int i=0;i<5000;i++){ list.pollFirst(); list.pollLast(); } } }
3. 实现范例的主类Main,并添加main()方法。
import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.TimeUnit; public class Main { public static void main(String[] args) { ConcurrentLinkedDeque<String> list = new ConcurrentLinkedDeque<>(); //创建线程数组threads,它包含100个线程 Thread[] threads = new Thread[100]; for(int i=0;i<threads.length;i++){ AddTask task = new AddTask(list); threads[i] = new Thread(task); threads[i].start(); } System.out.printf("Main: %d AddTask threads have been launched\n", threads.length); //使用join()方法等待线程完成 try { for(int i=0;i<threads.length;i++){ threads[i].join(); } } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } System.out.printf("Main: Size of the List: %d\n", list.size()); //创建100个PollTask对象及其对应的线程 for(int i=0;i<threads.length;i++){ PollTask task = new PollTask(list); threads[i] = new Thread(task); threads[i].start(); } System.out.printf("Main: %d PollTask threads have been launched\n", threads.length); //使用join()方法等待线程完成 try { for(int i=0;i<threads.length;i++){ threads[i].join(); } } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } System.out.printf("Main: Size of the List: %d\n", list.size()); } }
4. 程序运行结果如下
Main: 100 AddTask threads have been launched Main: Size of the List: 1000000 Main: 100 PollTask threads have been launched Main: Size of the List: 0
使用size()方法输出列表中的元素数量。需要注意的是,这个方法返回的值可能不是真实的,尤其当有线程在添加数据或者移除数据时,这个方法需要遍历整个列表来计算元素数量,而遍历过的数据可能已经改变。仅当没有任何线程修改列表时,才能保证返回的结果是准确的。
并发列表允许不同的线程在同一时间添加或者移除列表中的元素,而不会造成数据的不一致。
在本节,你会学会如何在并发程序中使用阻塞式列表。阻塞式列表与非阻塞式列表的主要差别是:阻塞式列表在插入和删除操作时,如果列表已满或为空,操作不会立即执行,而是将调用这个操作的线程阻塞,直到操作可以执行成功。Java引入了LinkedBlockingDeque类来实现阻塞式列表。
将要实现的范例包括以下两个不同的任务:
1. 创建名为Client的类,并实现Runnable接口。
import java.util.Date; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.TimeUnit; public class Client implements Runnable { private LinkedBlockingDeque<String> requestList; public Client(LinkedBlockingDeque<String> requestList){ this.requestList = requestList; } @Override public void run() { try { for(int i=0;i<3;i++){ for(int j=0;j<5;j++){ StringBuilder sb = new StringBuilder(); sb.append(i); sb.append(":"); sb.append(j); requestList.put(sb.toString()); System.out.printf("Client: %s at %s.\n", sb.toString(), new Date()); } TimeUnit.SECONDS.sleep(2); } } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } System.out.println("Client: End\n"); } }
2. 创建范例的主类Main,并添加main()方法。
import java.util.Date; import java.util.concurrent.LinkedBlockingDeque; public class Main { public static void main(String[] args) { //指定固定容量 LinkedBlockingDeque<String> list = new LinkedBlockingDeque<String>(3); Client client = new Client(list); Thread thread = new Thread(client); thread.start(); try { for(int i=0;i<5;i++){ for(int j=0;j<3;j++){ String request = list.take(); System.out.printf("Main: Request: %s at %s. Size: %d\n", request, new Date(), list.size()); } Thread.sleep(300); } } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } System.out.println("Main: End of the program."); } }
3. 程序运行结果如下
数据结构应用中的一个经典需求是实现一个有序列表。Java引入了PriorityBlockingQueue类来满足这类需求。
所有添加进PriorityBlockingQueue的元素必须实现Comparable接口。这个接口提供了compareTo()方法,它的传入参数是一个同类型的对象。这样就有了两个同类型的对象并且相互比较:其中一个是执行这个方法的对象,另一个是参数传入的对象。这个方法必须返回一个数字值,如果当前对象小于参数传入的对象,那么返回一个小于0的值;如果当前对象大于参数传入的对象,那么返回一个大于0的值;如果两个对象相等就返回0。
当插入元素时,PriorityBlockingQueue使用compareTo()方法来决定插入元素的位置。元素越大越靠后。
PriorityBlockingQueue的另一个重要特性是:它是阻塞式数据结构(BlockingDataStructure)。当它的方法被调用并且不能立即执行时,调用这个方法的线程将被阻塞知道方法执行成功。
在本节,我们将学习如何使用PriorityBlockingQueue类。在范例中我们将大量不同优先级的事件存放到同一个列表中,并且检查队列是否按预期排序。
1. 创建名为Event的类并实现Comparable接口,指定Comparable接口的泛型参数是Event类。
public class Event implements Comparable<Event>{ private int thread; //存放创建了Event的线程 private int priority; public Event(int thread, int priority){ this.thread = thread; this.priority = priority; } @Override public int compareTo(Event o) { if(this.priority>o.priority) return -1; if(this.priority<o.priority) return 1; return 0; } public int getThread() { return thread; } public int getPriority() { return priority; } }
2. 创建一个名为Task的类,实现Runnable接口。
import java.util.concurrent.PriorityBlockingQueue; public class Task implements Runnable { private int id; private PriorityBlockingQueue<Event> queue; public Task(int id, PriorityBlockingQueue<Event> queue){ this.id = id; this.queue = queue; } @Override public void run() { for(int i=0;i<1000;i++){ Event event = new Event(id, i); queue.add(event); } } }
3. 创建范例的主类Main,并实现main()方法。
import java.util.concurrent.PriorityBlockingQueue; public class Main { public static void main(String[] args) { PriorityBlockingQueue<Event> queue = new PriorityBlockingQueue<>(); Thread taskThreads[] = new Thread[5]; for(int i=0;i<taskThreads.length;i++){ Task task = new Task(i, queue); taskThreads[i] = new Thread(task); } //启动线程 for(int i=0;i<taskThreads.length;i++){ taskThreads[i].start(); } //等待线程执行结束 try { for(int i=0;i<taskThreads.length;i++){ taskThreads[i].join(); } } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } System.out.printf("Main: Queue Size: %d\n", queue.size()); for(int i=0;i<taskThreads.length*1000;i++){ Event event = queue.poll(); System.out.printf("Thread %s: Priority %d\n", event.getThread(), event.getPriority()); } System.out.printf("Main: Queue Size: %d\n", queue.size()); System.out.println("Main: End of the program."); } }
4. 程序运行结果如下
Java API提供了一种用于并发应用的有趣的数据结构,即DelayQueue类。这个类可以存放带有激活日期的元素。当调用方法从队列中返回或提取元素时,未来的元素日期将被忽略。这些元素对于这些方法是不可见的。
为了具有调用行为,存放到DelayQueue类中的元素必须继承Delayed接口。Dealyed接口使对象成为延迟对象,它使存放在DealayQueue类中的对象具有了激活的日期,即到激活日期的时间。该接口强制执行下列两个方法。
本例中,将会学习使用DelayQueue类来存放具有不同激活日期的event。
1. 创建名为Event的类并实现Delayed接口。
import java.util.Date; import java.util.concurrent.Delayed; import java.util.concurrent.TimeUnit; public class Event implements Delayed{ private Date startDate; public Event(Date startDate){ this.startDate = startDate; } @Override public int compareTo(Delayed o) { long result = this.getDelay(TimeUnit.NANOSECONDS)-o.getDelay(TimeUnit.NANOSECONDS); if(result<0) return -1; if(result>0) return 1; return 0; } @Override public long getDelay(TimeUnit unit) { Date now = new Date(); long diff = startDate.getTime()-now.getTime(); return unit.convert(diff, TimeUnit.MILLISECONDS); } }
2. 创建名为Task的类,并实现Runnable接口。
import java.util.Date; import java.util.concurrent.DelayQueue; public class Task implements Runnable { private int id; private DelayQueue<Event> queue; public Task(int id, DelayQueue<Event> queue){ this.id = id; this.queue = queue; } @Override public void run() { Date now = new Date(); Date delay = new Date(); delay.setTime(now.getTime()+id*1000); System.out.printf("Thread %s: %s\n", id, delay); for(int i=0;i<100;i++){ Event event = new Event(delay); queue.add(event); } } }
3. 创建范例的主类Main,并添加main()方法。
import java.util.Date; import java.util.concurrent.DelayQueue; import java.util.concurrent.TimeUnit; public class Main { public static void main(String[] args) { DelayQueue<Event> queue = new DelayQueue<>(); Thread threads[] = new Thread[5]; for(int i=0;i<threads.length;i++){ Task task = new Task(i+1, queue); threads[i] = new Thread(task); } for(int i=0;i<threads.length;i++){ threads[i].start(); } try { for(int i=0;i<threads.length;i++){ threads[i].join(); } } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } try { while(queue.size()>0){ int counter = 0; Event event; do{ event = queue.poll(); if(event!=null) counter++; }while(event!=null); System.out.printf("At %s you have read %d events\n", new Date(),counter); Thread.sleep(500); } } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } System.out.println("Main: End."); } }
4. 程序运行结果如下
DelayQueue类本身是使用纳秒工作的,但是对于使用者来说,是透明的。
注:使用size()方法必须小心,它返回的是列表中元素的总数,包括活动和非活动的元素。
Java API提供了一种用于并发应用的有趣的数据结构,即ConcurrentNavigableMap接口及其实现类。实现这个接口的类以如下两个部分存放元素:
每一个组成部分都必须在不同的类中实现。
Java API也提供了一个实现ConcurrentSkipListMap接口的类,ConcurrentSkipListMap接口实现了与ConcurrentNavigableMap接口有相同行为的一个非阻塞式列表。从内部实现机制来讲,它使用了一个Skip List来存放数据。Skip List是基于并发列表的数据结构,效率与二叉树相近。有了它,就有了一个数据结构,比如有序列表在添加、搜索或删除元素时耗费更少的访问时间。
备注:Skip List由William Pugh在1990年引入,详见http://www.cs.umd.edu/~pugh/。
当你插入元素到映射中时,ConcurrentSkipListMap接口类使用键值来排序所有元素。除了提供返回一个具体元素的方法之外,这个类也提供获取子映射的方法。
本节将要学习如何使用ConcurrentSkipListMap类实现对联系人对象的映射。
1. 创建名为Contact的类。
public class Contact { private String name; private String phone; public Contact(String name, String phone){ this.name = name; this.phone = phone; } public String getName() { return name; } public String getPhone() { return phone; } }
2. 创建名为Task的类,并实现Runnable接口。
import java.util.concurrent.ConcurrentSkipListMap; public class Task implements Runnable { private String id; private ConcurrentSkipListMap<String, Contact> map; public Task(String id, ConcurrentSkipListMap<String, Contact> map){ this.id = id; this.map = map; } @Override public void run() { for(int i=0;i<1000;i++){ Contact contact = new Contact(id, String.valueOf(i+1000)); map.put(id+contact.getPhone(), contact); } } }
3. 实现范例的主类Main,并添加main()方法。
import java.util.Map; import java.util.concurrent.ConcurrentNavigableMap; import java.util.concurrent.ConcurrentSkipListMap; public class Main { public static void main(String[] args) { ConcurrentSkipListMap<String, Contact> map = new ConcurrentSkipListMap<>(); Thread threads[] = new Thread[26]; int counter = 0; for(char i=‘A‘;i<=‘Z‘;i++){ Task task = new Task(String.valueOf(i), map); threads[counter] = new Thread(task); threads[counter].start(); counter++; } try { for(int i=0;i<26;i++){ threads[i].join(); } } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } System.out.printf("Main: Size of the map: %d\n", map.size()); Map.Entry<String, Contact> element; Contact contact; //输出第一个实体 element = map.firstEntry(); contact = element.getValue(); System.out.printf("Main: First Entry: %s: %s\n", contact.getName(), contact.getPhone()); //输出最后一个实体 element = map.lastEntry(); contact = element.getValue(); System.out.printf("Main: Last Entry: %s: %s\n", contact.getName(), contact.getPhone()); //使用subMap()取得map的一个子映射,并输出到控制台。 System.out.printf("Main: Submap from A1996 to B1002: \n"); ConcurrentNavigableMap<String, Contact> summap = map.subMap("A1996", "B1002"); do{ element = summap.pollFirstEntry(); if(element!=null){ contact = element.getValue(); System.out.printf("%s: %s\n", contact.getName(), contact.getPhone()); } }while(element!=null); } }
4. 程序运行结果如下
Main: Size of the map: 26000 Main: First Entry: A: 1000 Main: Last Entry: Z: 1999 Main: Submap from A1996 to B1002: A: 1996 A: 1997 A: 1998 A: 1999 B: 1000 B: 1001
Java并发API提供了一个特殊的类用以在并发程序中生成伪随机数(Pseudo-Random Number),即Java 7新引入的ThreadLocalRandom类。它是线程本地变量。每个生成随机数的线程都有一个不同的生成器,但是都在同一类中被管理,对程序员来说是透明的。相比于使用共享的Random对象为所有线程生成随机数,这种机制具有更好的性能。
下面我们将学习如何使用ThreadLocalRandom类在并发应用中生成随机数。
1. 创建名为TaskLocalRandom的类并实现Runnable接口。
import java.util.concurrent.ThreadLocalRandom; public class TaskLocalRandom implements Runnable { //实现类构造器,使用current()方法为当前线程初始化随机数生成器 public TaskLocalRandom(){ ThreadLocalRandom.current(); } @Override public void run() { String name = Thread.currentThread().getName(); for(int i=0;i<10;i++){ System.out.printf("%s: %d\n", name, ThreadLocalRandom.current().nextInt(10)); } } }
2. 创建本范例的主类Main,并实现main()方法。
public class Main { public static void main(String[] args) { Thread threads[] = new Thread[3]; for(int i=0;i<3;i++){ TaskLocalRandom task = new TaskLocalRandom(); threads[i] = new Thread(task); threads[i].start(); } } }
3. 程序运行结果如下
原子变量(Atomic Variable)是从Java 5开始引入的,它提供了单个变量上的原子操作。在编译程序时,Java代码中的每个变量、每个操作都将被转换成机器可以理解的指令。例如,当给一个变量赋值时,在Java代码中只使用一个指令,但是编译这个程序时,指令被转换成JVM语言中的不同指令。当多个线程共享同一个变量时,就会发生数据不一致的错误。
为了避免这类错误,Java引入了原子变量。当一个线程在对原子变量操作时,如果其他线程也试图对同一原子变量执行操作,原子变量的实现类提供了一套机制来检查操作是否在一步内完成。一般来说,这个操作先获取变量值,然后在本地改变变量的值,然后试图用这个改变的值去替换之前的值。如果之前的值没有被其他线程改变,就可以执行这个替换操作。否则,方法将再执行这个操作。这种操作称之为CAS原子操作(Compare and Set)。
原子变量不使用锁或者其他同步机制来保护对其值的并发访问。所有操作都是基于CAS原子操作的。它保证了多线程在同一时间操作一个原子变量而不会产生数据不一致的错误,并且它的性能优于使用同步机制保护的普通变量。
本节将要学习如何使用原子变量实现一个银行账号和两个不同的任务:一个加钱到账号上,另一个从账号上取钱。在例子的实现中使用了AtomicLong类。
1. 创建名为Account的类来模拟银行账户。
import java.util.concurrent.atomic.AtomicLong; public class Account { //存放账户余额 private AtomicLong balance; public Account(){ balance = new AtomicLong(); } public long getBalance() { return balance.get(); } public void setBalance(long balance) { this.balance.set(balance); } //增加余额 public void addAccount(long amount){ this.balance.getAndAdd(amount); } //减少余额 public void substractAmount(long amount){ this.balance.getAndAdd(-amount); } }
2. 创建一个名为Company的类并实现Runnable接口。这个类模拟公司的付款。
public class Company implements Runnable { private Account account; public Company(Account account){ this.account = account; } @Override public void run() { for(int i=0;i<10;i++){ account.addAccount(1000); } } }
3. 创建名为Bank的类并实现Runnable接口。这个类模拟从账户中取钱。
public class Bank implements Runnable { private Account account; public Bank(Account account){ this.account = account; } @Override public void run() { for(int i=0;i<10;i++){ account.substractAmount(1000); } } }
4. 创建名为Main的主类,并实现main()方法。
public class Main { public static void main(String[] args) { Account account = new Account(); account.setBalance(1000); Company company = new Company(account); Thread companyThread = new Thread(company); Bank bank = new Bank(account); Thread bankThread = new Thread(bank); System.out.printf("Account : Initial Balance: %d\n", account.getBalance()); companyThread.start(); bankThread.start(); //等待所有线程执行结束 try { companyThread.join(); bankThread.join(); System.out.printf("Account : Final Balance: %d\n", account.getBalance()); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
5. 程序运行结果如下
Account : Initial Balance: 1000 Account : Final Balance: 1000
Java还提供了其他的原子类,AtomicBoolean、AtomicInteger和AtomicReference是原子类的其他实现类。
当发现一个并发应用时,将不可避免地会有多线程共享一个或者多个对象的现象,为了避免数据不一致错误,需要使用同步机制(如锁或synchronized关键字)来保护对这些共享属性的访问。但是,这些同步机制存在下列问题。
针对这种情况,为了提供更优的性能,Java于是引入了比较和交换操作(Compare-and-Swap Operation)。这个操作使用以下三步修改变量的值。
采用比较和交换机制不需要使用同步机制,不仅可以避免死锁并且性能更好。
Java在原子变量中实现了这种机制。这些变量提供了实现比较和交换操作的comparaAndSet()方法,其他方法也基于它展开。
Java也引入了原子数组(Atomic Array)提供对integer或long数字数组的原子操作。本节将学习如何使用AtomicIntegerArray类的原子数组。
1. 创建名为Incrementer的类实现Runnable接口。
import java.util.concurrent.atomic.AtomicIntegerArray; public class Incrementer implements Runnable { private AtomicIntegerArray vector; public Incrementer(AtomicIntegerArray vector){ this.vector = vector; } @Override public void run() { for(int i=0;i<vector.length();i++){ vector.getAndIncrement(i); } } }
2. 创建名为Decrementer的类并实现Runnable接口。
import java.util.concurrent.atomic.AtomicIntegerArray; public class Decrementer implements Runnable { private AtomicIntegerArray vector; public Decrementer(AtomicIntegerArray vector){ this.vector = vector; } @Override public void run() { for(int i=0;i<vector.length();i++){ this.vector.getAndDecrement(i); } } }
3. 创建范例的主类Main,并实现main()方法。
import java.util.concurrent.atomic.AtomicIntegerArray; public class Main { public static void main(String[] args) { //创建有1000个元素的原子数组 final int THREADS = 1000; AtomicIntegerArray vector = new AtomicIntegerArray(THREADS); Incrementer incrementer = new Incrementer(vector); Decrementer decrementer = new Decrementer(vector); Thread threadIncrementer[] = new Thread[THREADS]; Thread threadDecrementer[] = new Thread[THREADS]; for(int i=0;i<THREADS;i++){ threadIncrementer[i] = new Thread(incrementer); threadDecrementer[i] = new Thread(decrementer); threadIncrementer[i].start(); threadDecrementer[i].start(); } try { for(int i=0;i<THREADS;i++){ threadIncrementer[i].join(); threadDecrementer[i].join(); } } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } for(int i=0;i<vector.length();i++){ if(vector.get(i)!=0) System.out.printf("Vector[%d] : %d\n", i, vector.get(i)); } System.out.println("Main: End of the example"); } }
4. 程序运行结果如下
Main: End of the example
Java还提供了另一个原子数组类,即AtomicLongArray类,它的方法与AtomicIntegeArray类相同。
这些原子数组还提供了其他方法。
标签:
原文地址:http://www.cnblogs.com/panxuejun/p/5924411.html