标签:wait 消费者 remove setdaemon service sync off 版本 个数
/**
*
*
* @author Lean @date:2014-9-28
*/
public class StockExchange {
public static void main(String[] args) {
BlockingQueue<Integer> queue=new LinkedBlockingQueue<Integer>();
Saller saller=new Saller(queue);
Buyer buyer=new Buyer(queue);
Thread[] sallerThreads=new Thread[20];
Thread[] buyerThreads=new Thread[20];
for (int i = 0; i <sallerThreads.length; i++) {
sallerThreads[i]=new Thread(saller);
sallerThreads[i].start();
buyerThreads[i]=new Thread(buyer);
buyerThreads[i].start();
}
try {
Thread.sleep(20);
} catch (InterruptedException e) {
}
System.out.println("all thread interrupt!");
for (Thread thread : sallerThreads) {
thread.interrupt();
}
for (Thread thread : buyerThreads) {
thread.interrupt();
}
}
static class Saller implements Runnable{
private BlockingQueue<Integer> mQueue;
private boolean shutDownRequest;
public Saller(BlockingQueue<Integer> queue){
mQueue=queue;
}
@Override
public void run() {
while (shutDownRequest==false) {
int quantity=(int)(Math.random()*100);
try {
mQueue.put(quantity);
// System.out.println("saller order by Thread:"+Thread.currentThread().getName()+" quantity:"+quantity);
} catch (InterruptedException e) {
shutDownRequest=true;
}
}
}
}
static class Buyer implements Runnable{
private BlockingQueue<Integer> mQueue;
private boolean shutDownRequest;
public Buyer(BlockingQueue<Integer> queue){
mQueue=queue;
}
@Override
public void run() {
while (shutDownRequest==false) {
try {
System.out.println("buyer order by Thread:"+Thread.currentThread().getName()+" quantity:"+mQueue.take());
} catch (InterruptedException e) {
shutDownRequest=true;
}
}
}
}
}
---------------
---------------
/**
*
* @author Lean @date:2014-9-28
*/
public class LuckyNumberGenerator {
public static void main(String[] args) {
TransferQueue<String> queue=new LinkedTransferQueue<String>();
Thread producerThread=new Thread(new Producer(queue));
producerThread.setDaemon(true);
producerThread.start();
for (int i = 0; i < 20; i++) {
Thread comsumerThread=new Thread(new Comsumer(queue));
comsumerThread.setDaemon(true);
comsumerThread.start();
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(Thread.currentThread().getThreadGroup().activeCount());
}
static class Producer implements Runnable{
private TransferQueue<String> mQueue;
public Producer(TransferQueue<String> queue){
this.mQueue=queue;
}
public String product(){
return "your lucky number is: "+((int)(Math.random()*100));
}
@Override
public void run() {
while (true) {
try {
if (mQueue.hasWaitingConsumer()) {
mQueue.put(product());
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
static class Comsumer implements Runnable{
private TransferQueue<String> mQueue;
public Comsumer(TransferQueue<String> queue){
this.mQueue=queue;
}
@Override
public void run() {
try {
System.out.println(mQueue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
---------------
/**
*
*
* @author Lean
*/
public class Bank {
private static final int COUNT=100;
private static final Semaphore semaphore=new Semaphore(2,true);
public static void main(String[] args) {
for (int i = 0; i < COUNT; i++) {
final int count=i;
new Thread(){
@Override
public void run() {
try {
if (semaphore.tryAcquire(10, TimeUnit.MILLISECONDS)) {
try {
Teller.getService(count);
}finally{
semaphore.release();
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}
};
}.start();
}
}
static class Teller{
public static void getService(int i){
System.out.println("serving:"+i);
try {
Thread.sleep((long)(Math.random()*10));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
/**
* 屏障(会合点)
* sample:计算平方和
* @author Lean @date:2014-9-29
*/
public class CalculateSum {
public static final int COUNT=3;
public static int[] tempArray=new int[COUNT];
public static void main(String[] args) {
CyclicBarrier barrier=new CyclicBarrier(COUNT,new Runnable() {
@Override
public void run() {
int sum=0;
for (int i = 0; i < COUNT; i++) {
sum=sum+tempArray[i];
}
System.out.println("the result is:"+sum);
}
});
for (int i = 0; i <COUNT; i++) {
new Thread(new Square(i,barrier)).start();
}
System.out.println("caculate now...");
}
static class Square implements Runnable{
private int initSize;
private CyclicBarrier barrier;
public Square(int initSize,CyclicBarrier barrier){
this.initSize=initSize;
this.barrier=barrier;
}
@Override
public void run() {
int result=initSize*initSize;
tempArray[initSize]=result;
try {
barrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
}
}
调用countDown(),倒数-1,当倒数为0时,运行CountDownLatch对象await()后的代码.相比于CyclicBarrier,
CountDownLatch提供了手动控制屏蔽,比較灵活
/**
*
* @author Lean @date:2014-9-29
*/
public class EnhancedStockExchange {
public static void main(String[] args) {
BlockingQueue<Integer> queue=new LinkedBlockingQueue<Integer>();
CountDownLatch startLatch=new CountDownLatch(1);
final CountDownLatch stopLatch=new CountDownLatch(200);
Producer producer=new Producer(startLatch, stopLatch, queue);
Saller saller=new Saller(startLatch, stopLatch, queue);
Thread[] sellerThreads=new Thread[100];
for (int i = 0; i < sellerThreads.length; i++) {
sellerThreads[i]=new Thread(saller);
sellerThreads[i].start();
}
Thread[] producerThreads=new Thread[100];
for (int i = 0; i < producerThreads.length; i++) {
producerThreads[i]=new Thread(producer);
producerThreads[i].start();
}
//倒数闭锁,当前倒数为1,运行例如以下函数,倒数0;
startLatch.countDown();
new Thread(new Runnable() {
@Override
public void run() {
try {
//运行await(),暂停直至倒数器为0
stopLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("all thread countdown!");
}
}).start();
try {
Thread.sleep(20);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Terminating...");
//运行interrupt(),运行while语句后的mStopLatch.countDown();倒数为1
for (Thread thread : sellerThreads) {
thread.interrupt();
}
for (Thread thread : producerThreads) {
thread.interrupt();
}
//倒数为0,运行run()方法内await()后的代码;
stopLatch.countDown();
}
static class Producer implements Runnable{
public CountDownLatch mStartLatch;
public CountDownLatch mStopLatch;
private BlockingQueue<Integer> mQueue;
private boolean shutDownRequest;
public Producer(CountDownLatch startLatch,CountDownLatch stopLatch,BlockingQueue<Integer> queue){
mStartLatch=startLatch;
mStopLatch=stopLatch;
mQueue=queue;
}
@Override
public void run() {
try {
mStartLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
while (shutDownRequest==false) {
try {
mQueue.put((int)(Math.random()*(100)));
} catch (InterruptedException e) {
shutDownRequest=true;
}
}
mStopLatch.countDown();
}
}
static class Saller implements Runnable{
public CountDownLatch mStartLatch;
public CountDownLatch mStopLatch;
private BlockingQueue<Integer> mQueue;
private boolean shutDownRequest;
public Saller(CountDownLatch startLatch,CountDownLatch stopLatch,BlockingQueue<Integer> queue){
mStartLatch=startLatch;
mStopLatch=stopLatch;
mQueue=queue;
}
@Override
public void run() {
try {
mStartLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
while (shutDownRequest==false) {
try {
System.out.println("saller comsume: "+mQueue.take());
} catch (InterruptedException e) {
shutDownRequest=true;
}
}
mStopLatch.countDown();
}
}
}
在执行的过程中,动态添加拦截数可调用manager.register();当调用manager.arriveAndDeregister()时,当前全部
等待线程继续运行;在线程运行中,可调用manager.arriveAndAwaitAdvance();等待其它线程;同一时候我们能够调用manager.getArrivedParties()查看等待线程数;
/**
*
* @author Lean @date:2014-9-29
*/
public class HorseRace {
private final int NUMBER_OF_HORSE=12;
private static final int INIT_PARTIES=1;
private static final Phaser manager=new Phaser(INIT_PARTIES);
public static void main(String[] args) {
//检查准备就绪的马匹数量
Thread raceMonitor=new Thread(new RaceMonitor());
raceMonitor.setDaemon(true);
raceMonitor.start();
new HorseRace().managerRace();
}
private void managerRace() {
ArrayList<Horse> horses=new ArrayList<HorseRace.Horse>();
for (int i = 0; i < NUMBER_OF_HORSE; i++) {
horses.add(new Horse());
}
runRace(horses);
}
private void runRace(Iterable<Horse> horses) {
for (final Horse horse : horses) {
manager.register();
new Thread(){
@Override
public void run() {
try {
Thread.sleep((new Random()).nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
manager.arriveAndAwaitAdvance();
horse.run();
};
}.start();
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
manager.arriveAndDeregister();
}
private static class RaceMonitor implements Runnable{
@Override
public void run() {
while (true) {
// System.out.println("number of horses to run:"+HorseRace.manager.getArrivedParties());
try {
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
private static class Horse implements Runnable{
private static final AtomicInteger idSource=new AtomicInteger();
private final int id=idSource.incrementAndGet();
@Override
public void run() {
System.out.println(toString()+" is running");
}
@Override
public String toString() {
return "Horse [id=" + id + "]";
}
}
}
/**
*
* @author Lean @date:2014-9-29
*/
public class ProductExchange {
public static Exchanger<ArrayList<Integer>> exchanger=new Exchanger<ArrayList<Integer>>();
public static void main(String[] args) {
Thread producerThread=new Thread(new Producer());
Thread comsumeThread=new Thread(new Comsume());
producerThread.start();
comsumeThread.start();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
producerThread.interrupt();
comsumeThread.interrupt();
}
private static class Producer implements Runnable{
private static ArrayList<Integer> buffers=new ArrayList<Integer>();
private boolean okToRun=true;
@Override
public void run() {
while (okToRun) {
try {
if (buffers.isEmpty()) {
for (int i = 0; i <10; i++) {
buffers.add((int)(Math.random()*100));
}
Thread.sleep(200);
for (int i : buffers) {
System.out.print(i+" ,");
}
System.out.println("");
buffers=ProductExchange.exchanger.exchange(buffers, 1000,TimeUnit.MILLISECONDS);
}
} catch (InterruptedException e) {
okToRun=false;
} catch (TimeoutException e) {
System.out.println("produce time out!");
}
}
}
}
private static class Comsume implements Runnable{
private static ArrayList<Integer> buffers=new ArrayList<Integer>();
private boolean okToRun=true;
@Override
public void run() {
while (okToRun) {
try {
if (buffers.isEmpty()) {
buffers=ProductExchange.exchanger.exchange(buffers);
for (int i : buffers) {
System.out.print(i+" ,");
}
System.out.println("");
Thread.sleep(200);
buffers.clear();
}
} catch (InterruptedException e) {
okToRun=false;
}
}
}
}
}
标签:wait 消费者 remove setdaemon service sync off 版本 个数
原文地址:http://www.cnblogs.com/liguangsunls/p/7264368.html