标签:
前面一个系列的文章都在围绕hash展开,今天准备先说下concurrent包,这个系列可能会以使用场景说明为主,concurrent包本身的代码分析可能比较少; 我在这方面的实践经验较为有限,有错误欢迎批评指正
不过前一个系列并未结束,还有一些文章没有放出来,欢迎关注核桃博客2. concurrent包里面的一些操作是基于硬件级别的CAS(compare and swap),就是在cpu级别提供了原子操作,简单的说就可以提供无阻塞、无锁定的算法; 而现代cpu大部分都是支持这样的算法的;
前面一篇说了concurrent包的基本结构,接下来首先看一下一个非常有用的类,CountDownLatch, 可以用来在一个线程中等待多个线程完成任务的类;
前面一篇说了concurrent包的基本结构,接下来首先看一下一个非常有用的类,CountDownLatch, 可以用来在一个线程中等待多个线程完成任务的类;Java代码
@Test public void demoCountDown() { int count = 10; final CountDownLatch l = new CountDownLatch(count); for(int i = 0; i < count; ++i) { final int index = i; new Thread(new Runnable() { @Override public void run() { try { Thread.currentThread().sleep(20 * 1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("thread " + index + " has finished..."); l.countDown(); } }).start(); } try { l.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("now all threads have finished"); }运行的结果
前面10个线程的执行完成顺序会变化,但是最后一句始终会等待前面10个线程都完成之后才会执行
有了CountDownLatch,涉及到多线程同步的演示就比较容易了,接下来我们看下Atomic相关的类, 比如AtomicLong, AtomicInteger等这些;
有了CountDownLatch,涉及到多线程同步的演示就比较容易了,接下来我们看下Atomic相关的类, 比如AtomicLong, AtomicInteger等这些;Java代码
set()
get()
getAndSet()
getAndIncrement()
getAndDecrement()
getAndAdd()
Java代码
package com.hetaoblog.concurrent.test; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicLong; import org.junit.Test; /** * * by http://www.hetaoblog.com * @author hetaoblog * */ public class AtomicTest { @Test public void testAtomic() { final int loopcount = 10000; int threadcount = 10; final NonSafeSeq seq1 = new NonSafeSeq(); final SafeSeq seq2 = new SafeSeq(); final CountDownLatch l = new CountDownLatch(threadcount); for(int i = 0; i < threadcount; ++i) { final int index = i; new Thread(new Runnable() { @Override public void run() { for(int j = 0; j < loopcount; ++j) { seq1.inc(); seq2.inc(); } System.out.println("finished : " + index); l.countDown(); } }).start(); } try { l.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("both have finished...."); System.out.println("NonSafeSeq:" + seq1.get()); System.out.println("SafeSeq with atomic: " + seq2.get()); } } class NonSafeSeq{ private long count = 0; public void inc() { count++; } public long get() { return count; } } class SafeSeq{ private AtomicLong count = new AtomicLong(0); public void inc() { count.incrementAndGet(); } public long get() { return count.longValue(); } }其中NonSafeSeq是作为对比的类,直接放一个private long count不是线程安全的,而SafeSeq里面放了一个AtomicLong,是线程安全的;可以直接调用incrementAndGet来增加
注意,这个例子也说明,虽然long本身的单个设置是原子的,要么成功要么不成功,但是诸如count++这样的操作就不是线程安全的;因为这包括了读取和写入两步操作;
在jdk 1.4时代,线程间的同步主要依赖于synchronized关键字,本质上该关键字是一个对象锁,可以加在不同的instance上或者class上,从使用的角度则分别可以加在非静态方法,静态方法,以及直接synchronized(MyObject)这样的用法;
在jdk 1.4时代,线程间的同步主要依赖于synchronized关键字,本质上该关键字是一个对象锁,可以加在不同的instance上或者class上,从使用的角度则分别可以加在非静态方法,静态方法,以及直接synchronized(MyObject)这样的用法;Java代码
package com.hetaoblog.concurrent.test; import java.util.concurrent.CountDownLatch; import java.util.concurrent.locks.ReentrantLock; import org.junit.Test; public class ReentrantLockDemo { @Test public void demoLock() { final int loopcount = 10000; int threadcount = 10; final SafeSeqWithLock seq = new SafeSeqWithLock(); final CountDownLatch l = new CountDownLatch(threadcount); for(int i = 0; i < threadcount; ++i) { final int index = i; new Thread(new Runnable() { @Override public void run() { for(int j = 0; j < loopcount; ++j) { seq.inc(); } System.out.println("finished : " + index); l.countDown(); } }).start(); } try { l.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("both have finished...."); System.out.println("SafeSeqWithLock:" + seq.get()); } } class SafeSeqWithLock{ private long count = 0; private ReentrantLock lock = new ReentrantLock(); public void inc() { lock.lock(); try{ count++; } finally{ lock.unlock(); } } public long get() { return count; } }同样以前面的类似Sequence的类举例,通过对inc操作加锁,保证了线程安全;
SafeSeqWithLock:100000
concurrent包里面还提供了一个非常有用的锁,读写锁ReadWriteLock
concurrent包里面还提供了一个非常有用的锁,读写锁ReadWriteLockJava代码
@Test public void testRWLock_getw_onr() { ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); final Lock rlock = lock.readLock(); final Lock wlock = lock.writeLock(); final CountDownLatch l = new CountDownLatch(2); // start r thread new Thread(new Runnable() { @Override public void run() { System.out.println(new Date() + "now to get rlock"); rlock.lock(); try { Thread.currentThread().sleep(20 * 1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(new Date() + "now to unlock rlock"); rlock.unlock(); l.countDown(); } }).start(); // start w thread new Thread(new Runnable() { @Override public void run() { System.out.println(new Date() + "now to get wlock"); wlock.lock(); System.out.println(new Date() + "now to unlock wlock"); wlock.unlock(); l.countDown(); } }).start(); try { l.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(new Date() + "finished"); }这代码在我机器上打印的结果是, 也就是试图获得写锁的线程只有当另外一个线程将读锁释放了以后才可以获得
Java代码
@Test public void testRWLock_downgrade() { ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); Lock rlock = lock.readLock(); Lock wlock = lock.writeLock(); System.out.println("now to get wlock"); wlock.lock(); System.out.println("now to get rlock"); rlock.lock(); System.out.println("now to unlock wlock"); wlock.unlock(); System.out.println("now to unlock rlock"); rlock.unlock(); System.out.println("finished"); }可以正常打印出
Java代码
@Test public void testRWLock_upgrade() { ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); Lock rlock = lock.readLock(); Lock wlock = lock.writeLock(); System.out.println("now to get rlock"); rlock.lock(); System.out.println("now to get wlock"); wlock.lock(); System.out.println("now to unlock wlock"); wlock.unlock(); System.out.println("now to unlock rlock"); rlock.unlock(); System.out.println("finished"); }只能打印出下面两句,后面就一直挂住了
now to get wlock
有网友建议我在介绍concurrent包之前先介绍下jdk1.5之前的多线程知识,这是个相当不错的想法, 这篇就先介绍下Thread类;
有网友建议我在介绍concurrent包之前先介绍下jdk1.5之前的多线程知识,这是个相当不错的想法, 这篇就先介绍下Thread类;Java代码
public class ThreadDemo { @Test public void testThread() { SimpleThread t = new SimpleThread(); t.start(); } } class SimpleThread extends Thread{ @Override public void run() { System.out.println( Thread.currentThread().getName() + " is running "); } }通常在run方法里面实现自己要做的功能,这里简单的打印了了一句话, 运行结果是
Java代码
public class ThreadDemo { public static void main(String[] args) { PeriodicalRunningThread t = new PeriodicalRunningThread(); t.start(); System.out.println("main thread is going to sleep..."); try { Thread.currentThread().sleep(20 * 1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(new Date() + " now to stop PeriodicalRunningThread"); t.setRunning(false); } } class PeriodicalRunningThread extends Thread{ private volatile boolean running = true; @Override public void run() { while(running) { System.out.println(new Date() + " " + Thread.currentThread().getName() + " is running " + new Date()); try { Thread.currentThread().sleep(5 * 1000); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println(new Date() + " " + Thread.currentThread().getName() + " will end"); } public void setRunning(boolean running) { this.running = running; } }这段代码的打印结果是:
这样,在这个running标识为true的时候,该线程一直在跑,但是完成一段任务后会sleep一段时间,然后继续执行;
这篇还是Thread和Runnable的基础
这篇还是Thread和Runnable的基础Java代码
public static void testThreadWithRunnable() { final String word = "hello,world"; new Thread(new Runnable() { @Override public void run() { System.out.println(word); } }).start(); } public static void main(String[] args) { //periodicalThreadTest(); testThreadWithRunnable(); }上面的代码会打印
hello,world
下一节我会结合具体的代码来介绍下Condition的使用;
Java代码
class BoundedBuffer { final Lock lock = new ReentrantLock(); final Condition notFull = lock.newCondition(); final Condition notEmpty = lock.newCondition(); final Object[] items = new Object[100]; int putptr, takeptr, count; public void put(Object x) throws InterruptedException { lock.lock(); try { while (count == items.length) notFull.await(); items[putptr] = x; if (++putptr == items.length) putptr = 0; ++count; notEmpty.signal(); } finally { lock.unlock(); } } public Object take() throws InterruptedException { lock.lock(); try { while (count == 0) notEmpty.await(); Object x = items[takeptr]; if (++takeptr == items.length) takeptr = 0; --count; notFull.signal(); return x; } finally { lock.unlock(); } } }代码意思不复杂,一个有界的buffer,里面是个数组,可以往里面放数据和取数据;
c. 调用notempty.signal(); 如果有线程在take()的时候await住了,那么就会被通知到,可以继续进行操作
前面一篇说了Condition和BoundedBuffer的基本代码,下面写一个简单的程序测试下这个BoundedBuffer;
前面一篇说了Condition和BoundedBuffer的基本代码,下面写一个简单的程序测试下这个BoundedBuffer;Java代码
package com.hetaoblog.concurrent.test; import java.util.Date; import java.util.concurrent.CountDownLatch; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import org.junit.Test; public class BoundedBufferTest { @Test public void testPutTake() { final BoundedBuffer bb = new BoundedBuffer(); int count = 10; final CountDownLatch c = new CountDownLatch(count * 2); System.out.println(new Date() + " now try to call put for " + count ); for(int i = 0; i < count ; ++i) { final int index = i; try { Thread t = new Thread(new Runnable() { @Override public void run() { try { bb.put(index); System.out.println(new Date() + " put finished: " + index); } catch (InterruptedException e) { e.printStackTrace(); } c.countDown(); } }); t.start(); } catch (Exception e) { e.printStackTrace(); } } try { System.out.println(new Date() + " main thread is going to sleep for 10 seconds"); Thread.sleep(10 * 1000); } catch (InterruptedException e1) { e1.printStackTrace(); } System.out.println(new Date() + " now try to take for count: " + count); for(int i =0; i < count; ++i) { Thread t= new Thread(new Runnable() { @Override public void run() { try { Object o = bb.take(); System.out.println(new Date() + " take get: " + o); } catch (InterruptedException e) { e.printStackTrace(); } c.countDown(); } }); t.start(); } try { System.out.println(new Date() + ": main thread is to wait for all threads"); c.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(new Date() + " all threads finished"); } } class BoundedBuffer { final Lock lock = new ReentrantLock(); final Condition notFull = lock.newCondition(); final Condition notEmpty = lock.newCondition(); final Object[] items = new Object[5]; int putptr, takeptr, count; public void put(Object x) throws InterruptedException { lock.lock(); try { while (count == items.length) { System.out.println(new Date() + " put is to wait...."); notFull.await(); } items[putptr] = x; if (++putptr == items.length) putptr = 0; ++count; notEmpty.signal(); } finally { lock.unlock(); } } public Object take() throws InterruptedException { lock.lock(); try { while (count == 0) { System.out.println(new Date() + " take is going to wait.."); notEmpty.await(); } Object x = items[takeptr]; if (++takeptr == items.length) takeptr = 0; --count; notFull.signal(); return x; } finally { lock.unlock(); } } }下面是这段程序在我机器上的运行结果:
在几次不同的执行中,始终可以观察到任何时候,未完成的take()线程数>= 未完成的put()线程; 在未完成的线程数相等的情况下,即使jvm首先调度到了take()线程,也会进入notEmpty.await()释放锁,进入等待
前面一篇说了一个Condition和BoundedBuffer的测试代码例子,前面测试的是先put()再take()的操作,这篇说一下先take()再put()的操作;
前面一篇说了一个Condition和BoundedBuffer的测试代码例子,前面测试的是先put()再take()的操作,这篇说一下先take()再put()的操作;Java代码
@Test public void testTakePut() { final BoundedBuffer bb = new BoundedBuffer(); int count = 10; final CountDownLatch c = new CountDownLatch(count * 2); System.out.println(new Date() + " first try to call take for count: " + count); for(int i =0; i < count; ++i) { final int index = i; Thread t= new Thread(new Runnable() { @Override public void run() { try { Thread.currentThread().setName(" TAKE " + index); Object o = bb.take(); System.out.println(new Date() + " " + " take get: " + o ); } catch (InterruptedException e) { e.printStackTrace(); } c.countDown(); } }); t.start(); } try { System.out.println(new Date() + " main thread is going to sleep for 10 seconds"); Thread.sleep(10 * 1000); } catch (InterruptedException e1) { e1.printStackTrace(); } System.out.println(new Date() + " now try to call put for " + count ); for(int i = 0; i < count ; ++i) { final int index = i; try { Thread t = new Thread(new Runnable() { @Override public void run() { Thread.currentThread().setName(" PUT " + index); try { bb.put(index); System.out.println(new Date() + " " + " put finished: " + index ); } catch (InterruptedException e) { e.printStackTrace(); } c.countDown(); } }); t.start(); } catch (Exception e) { e.printStackTrace(); } } try { System.out.println(new Date() + ": main thread is to wait for all threads"); c.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(new Date() + " all threads finished"); } class BoundedBuffer { final Lock lock = new ReentrantLock(); final Condition notFull = lock.newCondition(); final Condition notEmpty = lock.newCondition(); final Object[] items = new Object[5]; int putptr, takeptr, count; public void put(Object x) throws InterruptedException { lock.lock(); try { while (count == items.length) { System.out.println(new Date() + " " + Thread.currentThread().getName() + " put is to wait....: " + System.currentTimeMillis()); notFull.await(); } items[putptr] = x; if (++putptr == items.length) putptr = 0; ++count; notEmpty.signal(); } finally { lock.unlock(); } } public Object take() throws InterruptedException { lock.lock(); try { while (count == 0) { System.out.println(new Date() + " " + Thread.currentThread().getName() + " take is going to wait.. " + System.currentTimeMillis()); notEmpty.await(); } Object x = items[takeptr]; if (++takeptr == items.length) takeptr = 0; --count; notFull.signal(); return x; } finally { lock.unlock(); } } }运行结果1:
标签:
原文地址:http://my.oschina.net/u/1185331/blog/502350