标签:
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
ExecutorService threadpool= Executors.newFixedThreadPool(10); threadpool.execute(new Runnable(){...});
Future<?> future = threadpool.submit(new Runnable(){...}); try { Object res = future.get(); } catch (InterruptedException e) { // 处理中断异常 e.printStackTrace(); } catch (ExecutionException e) { // 处理无法执行任务异常 e.printStackTrace(); }finally{ // 关闭线程池 executor.shutdown(); }
public class BankCount { public synchronized void addMoney(int money){//存钱 System.out.println(Thread.currentThread().getName() + ">存入:" + money); } public synchronized void getMoney(int money){//取钱 System.out.println(Thread.currentThread().getName() + ">取钱:" + money); } }
public class BankTest { public static void main(String[] args) { final BankCount bankCount = new BankCount(); ExecutorService executor = Executors.newFixedThreadPool(10); executor.execute(new Runnable() {//存钱线程 @Override public void run() { int i = 5; while(i-- > 0){ bankCount.addMoney(200); try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } } } }); Future<?> future = executor.submit(new Runnable() {//取钱线程 @Override public void run() { int i = 5; while(i-- > 0){ try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } bankCount.getMoney(200); } } }); try { Object res = future.get(); System.out.println(res); } catch (InterruptedException e) { // 处理中断异常 e.printStackTrace(); } catch (ExecutionException e) { // 处理无法执行任务异常 e.printStackTrace(); }finally{ // 关闭线程池 executor.shutdown(); } } }
/** * 仓库 */ public class Storage { private static final int MAX_SIZE = 100;//仓库的最大容量 private List<Object> data = new ArrayList<Object>();//存储载体 /** * 生产操作 */ public synchronized void produce(int num){ if(data.size() + num > MAX_SIZE){//如果生产这些产品将超出仓库的最大容量,则生产操作阻塞 System.out.println("生产操作-->数量:" + num + ",超出仓库容量,生产阻塞!------库存:" + data.size()); try { wait(); } catch (InterruptedException e) { e.printStackTrace(); } } //到这里,表示可以正常生产产品 for(int i = 0; i < num; i++){//生产num个产品 data.add(new Object()); } System.out.println("生产操作-->数量:" + num + ",成功入库~------库存:" + data.size()); //生产完产品后,唤醒其他等待消费的线程 notify(); } /** * 消费操作 */ public synchronized void consume(int num){ if(data.size() - num < 0){//如果产品数量不足 System.out.println("消费操作-->数量:" + num + ",库存不足,消费阻塞!------库存:" + data.size()); try { wait(); } catch (InterruptedException e) { e.printStackTrace(); } } //到这里,表示可以正常消费 for(int i = 0; i < num; i++){//消费num个产品 data.remove(0); } System.out.println("消费操作-->数量:" + num + ",消费成功~------库存:" + data.size()); //消费完产品后,唤醒其他等待生产的线程 notify(); } }
public class Producer implements Runnable{ private Storage storage; private int num;//每次生产多少个 public Producer(Storage sto,int num){ storage = sto; this.num = num; } @Override public void run() { storage.produce(num); } }
public class Consumer implements Runnable{ private Storage storage; private int num;//每次消费多少个 public Consumer(Storage sto,int num){ storage = sto; this.num = num; } @Override public void run() { storage.consume(num); } }
public class StorageTest { public static void main(String[] args) { Storage storage = new Storage(); ExecutorService taskSubmit = Executors.newFixedThreadPool(10); //来使用使用上一节我们总结的线程池知识 //给定4个消费者 taskSubmit.submit(new Consumer(storage, 30)); taskSubmit.submit(new Consumer(storage, 10)); taskSubmit.submit(new Consumer(storage, 20)); //给定6个生产者 taskSubmit.submit(new Producer(storage, 70)); taskSubmit.submit(new Producer(storage, 10)); taskSubmit.submit(new Producer(storage, 20)); taskSubmit.submit(new Producer(storage, 10)); taskSubmit.submit(new Producer(storage, 10)); taskSubmit.submit(new Producer(storage, 10)); taskSubmit.shutdown(); } }
XmppManager xmppManager = notificationService.getXmppManager(); if(xmppManager != null){ if(!xmppManager.isAuthenticated()){ try { synchronized (xmppManager) {//等待客户端连接认证成功 Log.d(LOGTAG, "wait for authenticated..."); xmppManager.wait(); } } catch (InterruptedException e) { e.printStackTrace(); } }
/** * 仓库 */ public class Storage { private static final int MAX_SIZE = 100;//仓库的最大容量 private List<Object> data = new ArrayList<Object>();//存储载体 private Lock lock = new ReentrantLock();//可重入锁 private Condition full = lock.newCondition();//仓库满的条件变量 private Condition empty = lock.newCondition();//仓库空时的条件变量 /** * 生产操作 */ public void produce(int num){ lock.lock(); //加锁 if(data.size() + num > MAX_SIZE){//如果生产这些产品将超出仓库的最大容量,则生产操作阻塞 System.out.println("生产操作-->数量:" + num + ",超出仓库容量,生产阻塞!------库存:" + data.size()); try { full.await(); //阻塞 } catch (InterruptedException e) { e.printStackTrace(); } } //到这里,表示可以正常生产产品 for(int i = 0; i < num; i++){//生产num个产品 data.add(new Object()); } System.out.println("生产操作-->数量:" + num + ",成功入库~------库存:" + data.size()); //生产完产品后,唤醒其他等待消费的线程 empty.signalAll(); lock.unlock(); //释放锁 } /** * 消费操作 */ public void consume(int num){ lock.lock(); //加锁 if(data.size() - num < 0){//如果产品数量不足 System.out.println("消费操作-->数量:" + num + ",库存不足,消费阻塞!------库存:" + data.size()); try { empty.await(); //阻塞 } catch (InterruptedException e) { e.printStackTrace(); } } //到这里,表示可以正常消费 for(int i = 0; i < num; i++){//消费num个产品 data.remove(0); } System.out.println("消费操作-->数量:" + num + ",消费成功~------库存:" + data.size()); //消费完产品后,唤醒其他等待生产的线程 full.signalAll(); lock.unlock(); //释放锁 } }
public class Storage { private static final int MAX_SIZE = 100;//仓库的最大容量 private BlockingQueue<Object> data = new LinkedBlockingQueue<Object>(MAX_SIZE); //使用阻塞队列作为存储载体 /** * 生产操作 */ public void produce(int num){ if(data.size() == MAX_SIZE){//如果仓库已达最大容量 System.out.println("生产操作-->仓库已达最大容量!"); } //到这里,表示可以正常生产产品 for(int i = 0; i < num; i++){//生产num个产品 try { data.put(new Object()); //put内部自动实现了判断,超过最大容量自动阻塞 } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("生产操作-->数量:" + num + ",成功入库~------库存:" + data.size()); } /** * 消费操作 */ public void consume(int num){ if(data.size() == 0){//如果产品数量不足 System.out.println("消费操作--库存不足!"); } //到这里,表示可以正常消费 for(int i = 0; i < num; i++){//消费num个产品 try { data.take(); //take内部自动判断,消耗后库存是否充足,不足自我阻塞 } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("消费操作-->数量:" + num + ",消费成功~------库存:" + data.size()); } }
标签:
原文地址:http://blog.csdn.net/shakespeare001/article/details/51330745