标签:des style class blog java http
上周五和周末,工作忙里偷闲,在看java cocurrent中也顺便再温故了一下Thread.interrupt和java 5之后的LockSupport的实现。
在介绍之前,先抛几个问题。
- Thread.interrupt()方法和InterruptedException异常的关系?是由interrupt触发产生了InterruptedException异常?
- Thread.interrupt()会中断线程什么状态的工作? RUNNING or BLOCKING?
- 一般Thread编程需要关注interrupt中断不?一般怎么处理?可以用来做什么?
- LockSupport.park()和unpark(),与object.wait()和notify()的区别?
- LockSupport.park(Object blocker)传递的blocker对象做什么用?
- LockSupport能响应Thread.interrupt()事件不?会抛出InterruptedException异常?
- Thread.interrupt()处理是否有对应的回调函数?类似于钩子调用?
如果你都都能很明确的答上来了,说明你已经完全懂Thread.interrupt,可以不用往下看那了。
那如果不清楚的,带着这几个问题,一起来梳理下。
Thread的interrupt处理的几个方法:
- public void interrupt() : 执行线程interrupt事件
- public boolean isInterrupted() : 检查当前线程是否处于interrupt
- public static boolean interrupted() : check当前线程是否处于interrupt,并重置interrupt信息。类似于resetAndGet()
理解:
1. 每个线程都有一个interrupt status标志位,用于表明当前线程是否处于中断状态
2. 一般调用Thread.interrupt()会有两种处理方式
- 遇到一个低优先级的block状态时,比如object.wait(),object.sleep(),object.join()。它会立马触发一个unblock解除阻塞,并throw一个InterruptedException。
- 其他情况,Thread.interrupt()仅仅只是更新了status标志位。然后你的工作线程通过Thread.isInterrrupted()进行检查,可以做相应的处理,比如也throw InterruptedException或者是清理状态,取消task等。
在interrupt javadoc中描述:
最佳实践
- Don‘t swallow interrupts (别吃掉Interrupt,一般是两种处理: 继续throw InterruptedException异常。 另一种就是继续设置Thread.interupt()异常标志位,让更上一层去进行相应处理。
-
public class TaskRunner implements Runnable {
-
private BlockingQueue<Task> queue;
-
-
public TaskRunner(BlockingQueue<Task> queue) {
-
this.queue = queue;
-
}
-
-
public void run() {
-
try {
-
while (true) {
-
Task task = queue.take(10, TimeUnit.SECONDS);
-
task.execute();
-
}
-
}
-
catch (InterruptedException e) {
-
-
Thread.currentThread().interrupt();
-
}
-
}
-
}
- Implementing cancelable tasks with Interrupt (使用Thread.interrupt()来设计和支持可被cancel的task)
-
public class PrimeProducer extends Thread {
-
private final BlockingQueue<BigInteger> queue;
-
-
PrimeProducer(BlockingQueue<BigInteger> queue) {
-
this.queue = queue;
-
}
-
-
public void run() {
-
try {
-
BigInteger p = BigInteger.ONE;
-
while (!Thread.currentThread().isInterrupted())
-
queue.put(p = p.nextProbablePrime());
-
} catch (InterruptedException consumed) {
-
-
}
-
}
-
-
public void cancel() { interrupt(); }
-
}<span style="white-space: normal;"> </span>
注册Interrupt处理事件(非正常用法)
一般正常的task设计用来处理cancel,都是采用主动轮询的方式检查Thread.isInterrupt(),对业务本身存在一定的嵌入性,还有就是存在延迟,你得等到下一个检查点(谁知道下一个检查点是在什么时候,特别是进行一个socket.read时,遇到过一个HttpClient超时的问题)。
来看一下,主动抛出InterruptedException异常的实现,借鉴于InterruptibleChannel的设计,比较取巧。
-
interface InterruptAble {
-
-
public void interrupt() throws InterruptedException;
-
}
-
-
abstract class InterruptSupport implements InterruptAble {
-
-
private volatile boolean interrupted = false;
-
private Interruptible interruptor = new Interruptible() {
-
-
public void interrupt() {
-
interrupted = true;
-
InterruptSupport.this.interrupt();
-
}
-
};
-
-
public final boolean execute() throws InterruptedException {
-
try {
-
blockedOn(interruptor);
-
if (Thread.currentThread().isInterrupted()) {
-
interruptor.interrupt();
-
}
-
-
bussiness();
-
} finally {
-
blockedOn(null);
-
}
-
-
return interrupted;
-
}
-
-
public abstract void bussiness() ;
-
-
public abstract void interrupt();
-
-
-
static void blockedOn(Interruptible intr) {
-
sun.misc.SharedSecrets.getJavaLangAccess().blockedOn(Thread.currentThread(), intr);
-
}
-
}
代码说明,几个取巧的点:
位置1:利用sun提供的blockedOn方法,绑定对应的Interruptible事件处理钩子到指定的Thread上。
位置2:执行完代码后,清空钩子。避免使用连接池时,对下一个Thread处理事件的影响。
位置3:定义了Interruptible事件钩子的处理方法,回调InterruptSupport.this.interrupt()方法,子类可以集成实现自己的业务逻辑,比如sock流关闭等等。
使用:
-
class InterruptRead extends InterruptSupport {
-
-
private FileInputStream in;
-
-
@Override
-
public void bussiness() {
-
File file = new File("/dev/urandom");
-
try {
-
in = new FileInputStream(file);
-
byte[] bytes = new byte[1024];
-
while (in.read(bytes, 0, 1024) > 0) {
-
-
-
-
-
}
-
} catch (Exception e) {
-
throw new RuntimeException(e);
-
}
-
}
-
-
public FileInputStream getIn() {
-
return in;
-
}
-
-
@Override
-
public void interrupt() {
-
try {
-
in.getChannel().close();
-
} catch (IOException e) {
-
e.printStackTrace();
-
}
-
}
-
-
}
-
-
public static void main(String args[]) throws Exception {
-
final InterruptRead test = new InterruptRead();
-
Thread t = new Thread() {
-
-
@Override
-
public void run() {
-
long start = System.currentTimeMillis();
-
try {
-
System.out.println("InterruptRead start!");
-
test.execute();
-
} catch (InterruptedException e) {
-
System.out.println("InterruptRead end! cost time : " + (System.currentTimeMillis() - start));
-
e.printStackTrace();
-
}
-
}
-
};
-
t.start();
-
-
Thread.sleep(3000);
-
-
t.interrupt();
-
}
jdk源码介绍:
1. sun提供的钩子可以查看System的相关代码, line : 1125
-
sun.misc.SharedSecrets.setJavaLangAccess(new sun.misc.JavaLangAccess(){
-
public sun.reflect.ConstantPool getConstantPool(Class klass) {
-
return klass.getConstantPool();
-
}
-
public void setAnnotationType(Class klass, AnnotationType type) {
-
klass.setAnnotationType(type);
-
}
-
public AnnotationType getAnnotationType(Class klass) {
-
return klass.getAnnotationType();
-
}
-
public <E extends Enum<E>>
-
E[] getEnumConstantsShared(Class<E> klass) {
-
return klass.getEnumConstantsShared();
-
}
-
public void blockedOn(Thread t, Interruptible b) {
-
t.blockedOn(b);
-
}
-
});
2. Thread.interrupt()
-
public void interrupt() {
-
if (this != Thread.currentThread())
-
checkAccess();
-
-
synchronized (blockerLock) {
-
Interruptible b = blocker;
-
if (b != null) {
-
interrupt0();
-
b.interrupt();
-
return;
-
}
-
}
-
interrupt0();
-
}
更多
更多关于Thread.stop,suspend,resume,interrupt的使用注意点,可以看一下sun的文档,比如http://download.oracle.com/javase/6/docs/technotes/guides/concurrency/threadPrimitiveDeprecation.html
最后来解答一下之前的几个问题:
问题1: Thread.interrupt()方法和InterruptedException异常的关系?是由interrupt触发产生了InterruptedException异常?
答: Thread.interrupt()只是在Object.wait() .Object.join(), Object.sleep()几个方法会主动抛出InterruptedException异常。而在其他的的block常见,只是通过设置了Thread的一个标志位信息,需要程序自我进行处理。
-
if (Thread.interrupted())
-
throw new InterruptedException();
问题2:Thread.interrupt()会中断线程什么状态的工作? RUNNING or BLOCKING?
答:Thread.interrupt设计的目的主要是用于处理线程处于block状态,比如wait(),sleep()状态就是个例子。但可以在程序设计时为支持task cancel,同样可以支持RUNNING状态。比如Object.join()和一些支持interrupt的一些nio channel设计。
问题3: 一般Thread编程需要关注interrupt中断不?一般怎么处理?可以用来做什么?
答: interrupt用途: unBlock操作,支持任务cancel, 数据清理等。
问题4: LockSupport.park()和unpark(),与object.wait()和notify()的区别?
答:
1. 面向的主体不一样。LockSuport主要是针对Thread进进行阻塞处理,可以指定阻塞队列的目标对象,每次可以指定具体的线程唤醒。Object.wait()是以对象为纬度,阻塞当前的线程和唤醒单个(随机)或者所有线程。
2. 实现机制不同。虽然LockSuport可以指定monitor的object对象,但和object.wait(),两者的阻塞队列并不交叉。可以看下测试例子。object.notifyAll()不能唤醒LockSupport的阻塞Thread.
问题5: LockSupport.park(Object blocker)传递的blocker对象做什么用?
答: 对应的blcoker会记录在Thread的一个parkBlocker属性中,通过jstack命令可以非常方便的监控具体的阻塞对象.
-
public static void park(Object blocker) {
-
Thread t = Thread.currentThread();
-
setBlocker(t, blocker);
-
unsafe.park(false, 0L);
-
setBlocker(t, null);
-
}
具体LockSupport的javadoc描述也比较清楚,可以看下:
问题6: LockSupport能响应Thread.interrupt()事件不?会抛出InterruptedException异常?
答:能响应interrupt事件,但不会抛出InterruptedException异常。针对LockSupport对Thread.interrupte支持,也先看一下javadoc中的描述:
相关测试代码
最后
发觉文章越写越长,那就索性发到了论坛,大家一起讨论下.毕竟文章中描述的都是一些使用层面的东东,并没有从操作系统或者sun native实现上去介绍Thread的一些机制,熟悉这块的大牛门也可以出来发表下高见.
本文仅当抛砖引玉,欢迎发言!
原文地址:http://agapple.iteye.com/blog/970055;
Java线程阻塞中断和LockSupport的常见问题,布布扣,bubuko.com
Java线程阻塞中断和LockSupport的常见问题
标签:des style class blog java http
原文地址:http://blog.csdn.net/aigoogle/article/details/30062307