标签:
typedef ssize_t int; typedef size_t unsigned; ssize_t read(int fd, void *buf, size_t n);
这个函数从描述符为fd的当前文件位置复制至多n个字节到内存缓冲区buf。若执行成功则返回读取到的字节数;若失败则返回-1。read系统调用默认会阻塞,也就是说系统会一直等待这个函数执行完毕直到它产生一个返回值。然而我们知道,磁盘通常是一种慢速I/O设备,这意味着我们用read函数读取磁盘文件内容时,往往需要比较长的时间(相对于访问内存或者计算一些数值来说)。那么阻塞的时候我们当然不想让系统傻等着,我们想在这期间做点儿别的事情,等着磁盘准备好了通知我们一下,我们再来读取文件内容。实际上,操作系统正是这样做的。当阻塞在read这类系统调用中的时候,操作系统通常都会让该进程暂时休眠,调度一个别的进程来执行,以免干等着浪费时间,等到磁盘准备好了可以让我们来进行I/O了,它会发送一个中断信号通知操作系统,这时候操作系统重新调度原来的进程来继续执行read函数。这就是通过多进程实现的并发。
class MyRunnable implements Runnable { ... public void run() { //这里是新线程需要执行的任务 } } Runnable r = new MyRunnable(); Thread t = new Thread(r);
class MyThread extends Thread { public void run() { //这里是线程要执行的任务 } }
创建了一个线程对象后,我们直接对其调用start方法即可启动这个线程:
t.start();
void setPriority(int newPriority) //设置线程的优先级,可以使用系统提供的三个优先级常量 static void yield() //使当前线程处于让步状态,这样当存在其他优先级大于等于本线程的线程时,线程调度程序会调用那个线程
public class Counter { private long count = 0; public void add(long value) { this.count = this.count + value; } }
我们注意一下改变count值的那一行,通常这个操作不是一步完成的,它大概分为以下三步:
Lock myLock = new ReentrantLock(); public void add(long value) { myLock.lock(); try { this.count = this.count + value; } finally { myLock.unlock(); } }
public void add(int value) { if (this.count > 5) { this.count = this.count + value; } }
public void add(int value) { myLock.lock(); try { while (counter.getCount() <= 5) { //等待直到大于5 } this.count = this.count + value; } finally { myLock.unlock(); } }
Condition enoughCount = myLock.newCondition();
然后,线程A发现count值不够时,调用“enoughCount.await()”即可,这时它便会进入Waiting状态,放弃它持有的锁对象,以便其他线程能够进入临界区。当线程B进入临界区修改了count值后,发现了count值大于5,线程B可通过"enoughCount.signalAll()"来“唤醒所有等待这一条件满足的线程(这里只有线程A)。此时线程A会从Waiting状态进入Runnable状态。当线程A再次被调度时,它便会从await方法返回,重新获得锁并接着刚才继续执行。注意,此时线程A会再次测试条件是否满足,若满足则执行相应操作。也就是说signalAll方法仅仅是通知线程A一声count的值可能大于5了,应该再测试一下。还有一个signal方法,会随机唤醒一个正在等待某条件的线程,这个方法的风险在于若随机唤醒的线程测试条件后发现仍然不满足,它还是会再次进入Waiting状态,若以后不再有线程唤醒它,它便不能再运行了。
public synchronized void add(int value) { ... }
等价于:
public void add(int value) { this.<em>innerLock</em>.lock(); try { ... } finally { this.<em>innerLock</em>.unlock(); } }
这意味着,我们通过给add方法加上synchronized关键字即可保护它,加锁解锁的工作不需要我们再手动完成。对象的内部锁在同一时刻只能由一个线程持有,其他尝试获取的线程都会被阻塞直至该线程释放锁。
public synchronized void add(int value) { while (this.count <= 5) { wait(); } this.count += value; notifyAll(); }
这份代码显然比我们上面的实现要简洁得多,实际开发中也更加常用。
synchronized (obj) { //临界区 }
一个线程执行上面的代码块便可以获取obj对象的内部锁,直至它离开这个代码块才会释放锁。
public class Counter { private Object lock = new Object(); synchronized (lock) { //临界区 } ... }
那么这种使用这种锁有什么好处呢?我们知道Counter对象只有一个内部锁,这个内部锁在同一时刻只能被一个对象持有,那么设想Counter对象中定义了两个synchronized方法。在某一时刻,线程A进入了其中一个synchronized方法并获取了内部锁,此时线程B尝试进去另一个synchronized方法时由于对象内部锁还没有被线程A释放,因此线程B只能被阻塞。然而我们的两个synchronized方法是两个不同的临界区,它们不会相互影响,所以它们可以在同一时刻被不同的线程所执行。这时我们就可以使用如上面所示的显式的锁对象,它允许不同的方法同步在不同的锁上。
(7)死锁
假设现在进程中只有线程A和线程B这两个线程,考虑下面这样一种情形:
线程A获取了counterA对象的内部锁,线程B获取了counterB对象的内部锁。而线程A只有在获取counterB的内部锁后才能继续执行,线程B只有在获取线程A的内部锁后才能继续执行。这样一来,两个线程在互相等待对方释放锁从而谁也没法继续执行,这种现象就叫做死锁(deadlock)。
除了以上情况,还有一种类似的死锁情况是两个线程获取锁后都不满足条件从而进入条件的等待集中,相互等待对方唤醒自己。
Java没有为解决死锁提供内在机制,因此我们只有在开发时格外小心,以避免死锁的发生。
(8)中断线程
介绍中断线程之前,我们先来介绍一下join方法,它是一个实例方法。我们可以在主线程中对某个子线程调用join方法,这样主线程就会阻塞直到那个子线程执行完毕后主线程才跟着它后头继续运行(所以叫join)。那么,有时候子线程会运行很长时间,这时候它可能想要告诉主线程一声“我还得好久,你不用等我了”。这种情况下,我们可以通过中断线程来实现。在一个线程上调用interrupt方法即可中断它,实际上这个方法通过修改相应线程的中断状态来告诉它“你被中断了”。每个线程都有一个被称为中断状态的boolean类型的标识。对于不在阻塞状态的线程,对其调用这个方法仅仅会修改它的中断状态;而对于通过调用可中断的阻塞方法(sleep、join、wait、await等等)而被阻塞的线程,当其收到“中断信号“时,会先把中断状态设为true,而后抛出一个InterruptedException异常,同时把中断状态设回false。这样一来我们就可以捕获InterruptedException并进行相应处理了,比如以上我们举的主线程join子线程的例子中,我们就可以在主线程的InterruptedException处理器中接着做之前的工作。
若InterruptedException异常未被捕获,相应的线程会被终止,进入Terminated状态。以下代码即可获取当前线程的中断状态:
boolean isInterrupted =Thread.currentThread().isInterrupted();
(9)锁测试与超时
lockObject.lock方法默认是阻塞的,也就是说当锁未被释放时,申请获取锁的线程会一直阻塞,知道得到一个锁。tryLock方法也用来获取锁,不过当它发现锁被占用时会立刻返回false,若此时锁未被占用会获取锁并返回true。线程便可以去做其他工作。比如以下代码:
if (myLock.tryLock()) { try { … } finally { myLock.unlock(); } } else { //做其他的工作 }
tryLock方法还有一个计时等待版本,可以指定最长等待多长时间,如以下代码最多等待100毫秒,若还无法获取锁即返回:
if (myLock.tryLock(100,TimeUnit.MILLISECONDS)) …
除了MILLISECONDS,我们还可以指定不同的时间单位:SECONDS、MICROSECONDS、NANOSECONDS。
tryLock方法与lock方法还有一个显著的差异,那就是lock方法不能被中断,而tryLock方法可以被中断。
await方法也有计时等待的版本,比如以下代码:
enougnCountCondition.await(100, TimeUnit.MILLISECONDS);
//构造一个ReentrantReadWriteLock对象 private ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); //分别从中“提取”读锁和写锁 private Lock readLock = rwl.readLock(); private Lock writeLock = rwl.writeLock(); //对所有的Reader线程加读锁 readLock.lock(); try { //读操作可并发,但写操作会互斥 } finally { readLock.unlock(); } //对所有的Writer线程加写锁 writeLock.lock(); try { //排斥所有其他线程的读和写操作 } finally { writeLock.unlock(); }
public class BlockingQueueTest { private int size = 20; private ArrayBlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<Integer>(size); public static void main(String[] args) { BlockingQueueTest test = new BlockingQueueTest(); Producer producer = test.new Producer(); Consumer consumer = test.new Consumer(); producer.start(); consumer.start(); } class Consumer extends Thread{ @Override public void run() { while(true){ try { //从阻塞队列中取出一个元素 queue.take(); System.out.println("队列剩余" + queue.size() + "个元素"); } catch (InterruptedException e) { e.printStackTrace(); } } } } class Producer extends Thread{ @Override public void run() { while (true) { try { //向阻塞队列中插入一个元素 queue.put(1); System.out.println("队列剩余空间:" + (size - queue.size())); } catch (InterruptedException e) { e.printStackTrace(); } } } } }
ExecutorService newCachedThreadPool() //返回一个带缓存的线程池,该池在必要的时候创建线程,在线程空闲60s后终止线程 ExecutorService newFixedThreadPool(int threads) //返回一个线程池,线程数目由threads参数指明 ExecutorService newSingleThreadExecutor() //返回只含一个线程的线程池,它在一个单一的线程中依次执行各个任务
Future<T> submit(Callable<T> task) Future<T> submit(Runnable task, T result) Future<?> submit(Runnable task)
ScheduledFuture<V> schedule(Callable<V> task, long time, TimeUnit unit) ScheduledFuture<?> schedule(Runnable task, long time, TimeUnit unit) //以上两个方法预定在指定时间过后执行任务 SchedukedFuture<?> scheduleAtFixedRate(Runnable task, long initialDelay, long period, TimeUnit unit) //在指定的延迟(initialDelay)过后,周期性地执行给定任务 ScheduledFuture<?> scheduleWithFixedDelay(Runnable task, long initialDelay, long delay, TimeUnit unit) //在指定延迟(initialDelay)过后周期性的执行任务,每两个任务间的间隔为delay指定的时间
T invokeAny(Collection<Callable<T>> tasks) T invokeAny(Collection<Callable<T>> tasks, long timeout, TimeUnit unit)
该方法可以指定一个超时参数。这个方法的不足再于我们无法知道它返回的结果是哪个任务执行的结果。如果集合中的任意Callable对象的执行结果都能满足我们的需求的话,使用invokeAny方法是很好的。
List<Future<T>> invokeAll(Collection<Callable<T>> tasks) List<Future<T>> invokeAll(Collection<Callable<T>> tasks, long timeout, TimeUnit unit)
public interface Callable<V> { V call() throws Exception; }
类型参数V即为异步方法call的返回值类型。
public interface Future<V> { boolean cancel(boolean mayInterruptIfRunning); boolean isCancelled(); boolean isDone(); V get() throws InterruptedException, ExecutionException; V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
也就是说Future提供了三种功能:
Future接口的实现类是FutureTask:
public class FutureTask<V> implements RunnableFuture<V>
FutureTask类实现了RunnableFuture接口,这个接口的定义如下:
public interface RunnableFuture<V> extends Runnable, Future<V> { void run(); }
FutureTask类有如下两个构造器:
public FutureTask(Callable<V> callable) public FutureTask(Runnable runnable, V result)
标签:
原文地址:http://www.cnblogs.com/absfree/p/5327678.html