Android开发者们应该都知道AsyncTask这个类,它是系统提供的一个异步任务类,可以方便的让我们实现异步操作。在本篇文章中,我将带大家进入源码,简单分析一下AsyncTask的实现。
首先,贴上AsyncTask类的源码:
package android.os;
import java.util.ArrayDeque;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
public abstract class AsyncTask<Params, Progress, Result> {
private static final String LOG_TAG = "AsyncTask";
// 获取可用CPU的数目
private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();
private static final int CORE_POOL_SIZE = CPU_COUNT + 1;
private static final int MAXIMUM_POOL_SIZE = CPU_COUNT * 2 + 1;
private static final int KEEP_ALIVE = 1;
// 线程工厂类,供线程池创建新线程时使用
private static final ThreadFactory sThreadFactory = new ThreadFactory() {
private final AtomicInteger mCount = new AtomicInteger(1);
public Thread newThread(Runnable r) {
return new Thread(r, "AsyncTask #" + mCount.getAndIncrement());
}
};
private static final BlockingQueue<Runnable> sPoolWorkQueue =
new LinkedBlockingQueue<Runnable>(128);
// 创建一个线程池,用来进行我们的任务
public static final Executor THREAD_POOL_EXECUTOR
= new ThreadPoolExecutor(CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE,
TimeUnit.SECONDS, sPoolWorkQueue, sThreadFactory);
public static final Executor SERIAL_EXECUTOR = new SerialExecutor();
private static final int MESSAGE_POST_RESULT = 0x1;
private static final int MESSAGE_POST_PROGRESS = 0x2;
private static final InternalHandler sHandler = new InternalHandler();
private static volatile Executor sDefaultExecutor = SERIAL_EXECUTOR;
private final WorkerRunnable<Params, Result> mWorker; // 可以理解为:要执行的任务
private final FutureTask<Result> mFuture;
private volatile Status mStatus = Status.PENDING;
private final AtomicBoolean mCancelled = new AtomicBoolean(); // 原子布尔值,指示任务是否已经取消
private final AtomicBoolean mTaskInvoked = new AtomicBoolean(); // 指示任务是否被调用执行
// 实现自己的任务执行器,其实作用就是安排任务到线程中执行
// 在下面的具体实现中,我们可以知道,任务是按顺序进行执行
// 的;也就是当一个任务执行完成,才会部署开始下一个任务,
// 即:这个AsyncTask是串行处理任务的。
//
private static class SerialExecutor implements Executor {
final ArrayDeque<Runnable> mTasks = new ArrayDeque<Runnable>();
Runnable mActive; // 当前正在执行的任务
public synchronized void execute(final Runnable r) {
mTasks.offer(new Runnable() {
public void run() {
try {
r.run();
} finally {
scheduleNext(); // 上一个任务完成后,部署进行下一个任务
}
}
});
if (mActive == null) { // 当前没有正在执行的任务,那就启动下一个任务
scheduleNext();
}
}
protected synchronized void scheduleNext() {
// 从任务队列中抽取出队头任务
if ((mActive = mTasks.poll()) != null) {
// 在这里可以看到,任务实际上还是安排到线程池中进行
THREAD_POOL_EXECUTOR.execute(mActive);
}
}
}
// 定义任务的三种状态,只有处于预备状态的任务才能够调用execute执行,
// 否则将会报错
public enum Status {
PENDING, // 预备状态
RUNNING, // 正在执行状态
FINISHED, // 完成
}
// 进行消息循环
public static void init() {
sHandler.getLooper();
}
// 默认执行器
public static void setDefaultExecutor(Executor exec) {
sDefaultExecutor = exec;
}
public AsyncTask() {
mWorker = new WorkerRunnable<Params, Result>() {
public Result call() throws Exception {
mTaskInvoked.set(true);
Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);
// 在这里调用了doInBackground,也即是在后台线程中执行我们的任务。
// 执行完doInBackground,调用postResult把执行结果发送出去
return postResult(doInBackground(mParams));
}
};
mFuture = new FutureTask<Result>(mWorker) {
// 当任务执行完成的时候调用,我会在后面讲到这个方法在何处调用。
@Override
protected void done() {
try {
postResultIfNotInvoked(get());
} catch (InterruptedException e) {
android.util.Log.w(LOG_TAG, e);
} catch (ExecutionException e) {
throw new RuntimeException("An error occured while executing doInBackground()",
e.getCause());
} catch (CancellationException e) {
postResultIfNotInvoked(null);
}
}
};
}
private void postResultIfNotInvoked(Result result) {
final boolean wasTaskInvoked = mTaskInvoked.get();
if (!wasTaskInvoked) {
postResult(result);
}
}
// 通过Handler把我们完成的结果发送到主线程的消息队列中,然后由指定的
// sHandler进行处理。注意:这个方法是在子线程中进行操作的
private Result postResult(Result result) {
@SuppressWarnings("unchecked")
// 任务执行完成,把执行完的结果封装成AsyncTaskResult
Message message = sHandler.obtainMessage(MESSAGE_POST_RESULT,
new AsyncTaskResult<Result>(this, result));
message.sendToTarget();
return result;
}
public final Status getStatus() {
return mStatus;
}
/**
* 这个方法由子类实现,我们要把程序中想要异步进行的任务放到这个方法中执行
* 注意:这个方法是在子线程中执行的,所以我们不能在这里进行UI相关的操作。
*/
protected abstract Result doInBackground(Params... params);
// 这个方法在doInBackground之前执行;主要是进行异步任务前的准备操作
protected void onPreExecute() {
}
// 这个方法在任务执行完成后调用,参数result就是任务执行完成后的结果
@SuppressWarnings({"UnusedDeclaration"})
protected void onPostExecute(Result result) {
}
// 这个方法能提供任务的执行进度
@SuppressWarnings({"UnusedDeclaration"})
protected void onProgressUpdate(Progress... values) {
}
// 这个方法在任务取消时调用的
@SuppressWarnings({"UnusedParameters"})
protected void onCancelled(Result result) {
onCancelled();
}
// 如果我们想要在任务取消时进行一些操作,可以重写这个方法中进行处理
protected void onCancelled() {
}
// 判断当前任务是否被取消了。返回的值就是前面的原子布尔值中的值
public final boolean isCancelled() {
return mCancelled.get();
}
// 取消正在执行的任务,主要是通过Future类中的cancel方法来处理的,
// 后面我会对这个方法进行分析
public final boolean cancel(boolean mayInterruptIfRunning) {
mCancelled.set(true);
return mFuture.cancel(mayInterruptIfRunning);
}
public final Result get() throws InterruptedException, ExecutionException {
return mFuture.get();
}
public final Result get(long timeout, TimeUnit unit) throws InterruptedException,
ExecutionException, TimeoutException {
return mFuture.get(timeout, unit);
}
// 调用这个方法,则开始了我们的异步任务。该方法需要传入异步任务需要的参数
public final AsyncTask<Params, Progress, Result> execute(Params... params) {
return executeOnExecutor(sDefaultExecutor, params);
}
// 前面提到了,AsyncTask是串行的,不过,实际上也是可以改成并行的。下面这个方法可以指定
// 我们自己的执行器来执行操作,因此,只需要传入并行的执行器,则可以使AsyncTask并行起来
public final AsyncTask<Params, Progress, Result> executeOnExecutor(Executor exec,
Params... params) {
if (mStatus != Status.PENDING) {
switch (mStatus) {
// 前面已经提到了,只有在预备状态(PENDING)下,才能够调用execute来执行任务
// 否则将会出现异常,这里就是它抛出异常的源头。
case RUNNING:
throw new IllegalStateException("Cannot execute task:"
+ " the task is already running.");
case FINISHED:
throw new IllegalStateException("Cannot execute task:"
+ " the task has already been executed "
+ "(a task can be executed only once)");
}
}
mStatus = Status.RUNNING; // 任务状态改为执行
onPreExecute(); // 主要是让调用者在任务执行前做一些准备操作。
mWorker.mParams = params;
exec.execute(mFuture);
return this;
}
// 使用默认的执行器来执行任务
public static void execute(Runnable runnable) {
sDefaultExecutor.execute(runnable);
}
// 调用这个方法,可以通知调用者任务执行的进度,注意:这个方法在子线程中进行
protected final void publishProgress(Progress... values) {
if (!isCancelled()) {
// 把任务进度封装成AsyncTaskRusult类
sHandler.obtainMessage(MESSAGE_POST_PROGRESS,
new AsyncTaskResult<Progress>(this, values)).sendToTarget();
}
}
// 任务完成时调用;任务完成有两种情况:
// 1. 任务被取消,这时将会回调onCanceled()方法,因此,如果我们在任务取消时,
// 想要执行一些处理操作,那么就要重写onCanceled()方法了。
// 2. 任务顺利执行完成。这时会调用onPostExecute方法,同时把执行结果传递给这个
// 方法处理。
private void finish(Result result) {
if (isCancelled()) {
onCancelled(result);
} else {
onPostExecute(result);
}
mStatus = Status.FINISHED;
}
// 定义一个Handler,处理任务相关的操作
private static class InternalHandler extends Handler {
@SuppressWarnings({"unchecked", "RawUseOfParameterizedType"})
@Override
public void handleMessage(Message msg) {
AsyncTaskResult result = (AsyncTaskResult) msg.obj;
switch (msg.what) {
case MESSAGE_POST_RESULT:
// 任务执行完成,
result.mTask.finish(result.mData[0]);
break;
case MESSAGE_POST_PROGRESS:
// 任务执行进度
result.mTask.onProgressUpdate(result.mData);
break;
}
}
}
// 这个类封装了任务所需要的参数。它继承了Callable接口,后面我会讲继承这个接口的作用
private static abstract class WorkerRunnable<Params, Result> implements Callable<Result> {
Params[] mParams;
}
@SuppressWarnings({"RawUseOfParameterizedType"})
private static class AsyncTaskResult<Data> {
final AsyncTask mTask;
final Data[] mData;
AsyncTaskResult(AsyncTask task, Data... data) {
mTask = task;
mData = data;
}
}
}
我在代码上面做了一些注释。基本上我们需要了解的也就是这些注释到的地方。下面就来开始分析了:
首先,先看看SerialExecutor。
private static class SerialExecutor implements Executor {
final ArrayDeque<Runnable> mTasks = new ArrayDeque<Runnable>();
Runnable mActive; // 当前正在执行的任务
public synchronized void execute(final Runnable r) {
mTasks.offer(new Runnable() {
public void run() {
try {
r.run();
} finally {
scheduleNext(); // 上一个任务完成后,部署进行下一个任务
}
}
});
if (mActive == null) { // 当前没有正在执行的任务,那就启动下一个任务
scheduleNext();
}
}
protected synchronized void scheduleNext() {
// 从任务队列中抽取出队头任务
if ((mActive = mTasks.poll()) != null) {
// 在这里可以看到,任务实际上还是安排到线程池中进行
THREAD_POOL_EXECUTOR.execute(mActive);
}
}
}execute()方法的最后,会判断当前是否有执行的任务,即mActive是否为空,如果不为空,那就调用scheduleNext()来启动下一个任务。现在重点分析scheduleNext()方法。在该方法中,可以看出,任务最终还是被安排到已经创建好的线程池中执行。这里就是实现AsyncTask串行的地方,可以注意一下。
再来分析一下AsyncTask的构造方法:
public AsyncTask() {
mWorker = new WorkerRunnable<Params, Result>() {
public Result call() throws Exception {
mTaskInvoked.set(true);
Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);
// 在这里调用了doInBackground,也即是在后台线程中执行我们的任务。
// 执行完doInBackground,调用postResult把执行结果发送出去
return postResult(doInBackground(mParams));
}
};
mFuture = new FutureTask<Result>(mWorker) {
// 当任务执行完成的时候调用,我会在后面讲到这个方法在何处调用。
@Override
protected void done() {
try {
postResultIfNotInvoked(get());
} catch (InterruptedException e) {
android.util.Log.w(LOG_TAG, e);
} catch (ExecutionException e) {
throw new RuntimeException("An error occured while executing doInBackground()",
e.getCause());
} catch (CancellationException e) {
postResultIfNotInvoked(null);
}
}
};
}
可以知道,构造方法中生成了两个对象,mWorker和mFuture。先看看WorkerRunnable<Params, Result>是如何定义的:
private static abstract class WorkerRunnable<Params, Result> implements Callable<Result> {
Params[] mParams;
}
它封装了任务所需要的参数,然后就是继承了Callable接口(Callable中只定义了call()方法)。因此,在mWorker对象的初始化的时候,我们必须要实现call()方法;好,现在回到mWorker中的call方法,它调用了doInBackground(),原来,我们在使用AsyncTask的时候,在doINBackground方法中,实现的操作就是在这里被调用的!!!此时,它已经是在子线程中进行的了。为什么呢?我后面再讲!再看看mFuture对象,它需要一个传入一个Callable类型的参数,而mWorker恰好实现了Callable接口!那我们不妨看一下Future类吧。
public class FutureTask<V> implements RunnableFuture<V> {
private Callable<V> callable;
......
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
......
在Future的构造方法中,可以知道,它把我们传进来的Callable对象保存在变量callable中了;我们先简单记住这个变量!后面它还会为我们解答一些问题。回到mFuture的实现上,它重写了done()方法,这里我们只需要知道它在任务完成的时候回调用就行了!这样,我们就把AsyncTask的构造方法简单的分析好了。
接下来,分析AsyncTask的execute(Params... params)方法;
public final AsyncTask<Params, Progress, Result> execute(Params... params) {
return executeOnExecutor(sDefaultExecutor, params);
}
public final AsyncTask<Params, Progress, Result> executeOnExecutor(Executor exec,
Params... params) {
if (mStatus != Status.PENDING) {
switch (mStatus) {
case RUNNING:
throw new IllegalStateException("Cannot execute task:"
+ " the task is already running.");
case FINISHED:
throw new IllegalStateException("Cannot execute task:"
+ " the task has already been executed "
+ "(a task can be executed only once)");
}
}
mStatus = Status.RUNNING; // 任务状态改为执行
onPreExecute(); // 主要是让调用者在任务执行前做一些准备操作。
mWorker.mParams = params;
exec.execute(mFuture);
return this;
}execute(Params... params)方法实际上是调用了executeOnExecutor(Executor exec, Params... params)方法,那我们具体看看这个它里面做了些什么。它调用了onPostExecute(),可以让调用者做一些执行任务前的准备操作、把传入的params参数保存到mWorker中,然后再调用exec的execute(mFuture)方法。其中,exec就是我们传进来的默认执行器sDefaultExecutor!!!sDefaultExecutor的execute()方法我们在前面已经分析了!它是使用线程池来执行我们的任务的。由于TheadPoolExecutor的实现比较复杂,所以这里暂时不做分析。我们只需知道它接下来会调用到mFuture的run()方法即可,那我们先看看这个run()方法中,做了些什么。
public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
我们只看try里面的代码。可以看出,如果mFuture中的callable对象不为null,那么就会回调它的call方法了。在前面,我们已经知道了mFuture把我们传进来的mWorker对象保存在了Callable变量里面了,因此,这里调用的call()方法就是mWorker的。而这个时候,call()方法是在子线程中调用的,由此就可以知道,doInBackground()也就是在子线程进行的!如果任务正常完成,那么变量ran将会是true,那么它会调用set(result),看看set()方法的源码:
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}它把任务完成的结果保存在outcome变量中,接着调用了finishCompletion()方法,了解下这个方法的实现:
private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
done();
callable = null; // to reduce footprint
}
finishCompletion中,它会调用done()方法,让调用者对完成的任务结果进行处理。
我们接下来分析一下,AsyncTask是如何取消任务的!
找到AsyncTask的cancel方法:
public final boolean cancel(boolean mayInterruptIfRunning) {
mCancelled.set(true);
return mFuture.cancel(mayInterruptIfRunning);
}
它调用 了mFuture的cancel方法。下面是改方法的实现:
public boolean cancel(boolean mayInterruptIfRunning) {
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try { // in case call to interrupt throws exception
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
t.interrupt();
} finally { // final state
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
finishCompletion();
}
return true;
}从代码中可以知道,取消操作实际上是让运行当前要取消的任务的线程中断了,同样,该方法也会调用finishCompletion()方法,然后在finishCompletion方法中调用done()方法,让调用者能够对取消任务后进行相关处理。
至此,AsyncTask执行异步任务的基本流程就简单的分析完了。在文章前面的AsyncTask源码里面,我已经做了比较详细的注释,读者可以结合注释,然后对AsyncTask整个流程执行分析,相信读者能够掌握这些步骤!
欢迎大家交流和转载,转载请标明:http://blog.csdn.net/wuzhipeng1991
原文地址:http://blog.csdn.net/wuzhipeng1991/article/details/40504135