Android多线程分析之四:MessageQueue的实现
在前面两篇文章《Android多线程分析之二:Thread的实现》,《Android多线程分析之三:Handler,Looper的实现》中分别介绍了 Thread 的创建,运行,销毁的过程以及 Thread 与 Handler,Looper 之间的关联:Thread 在其 run() 方法中创建和运行消息处理循环 Looper,而 Looper::loop() 方法不断地从 MessageQueue 中获取消息,并由 Handler 分发处理该消息。接下来就来介绍 MessageQueue 的运作机制。
参考源码:
android/frameworks/base/core/java/android/os/MessageQueue.java android/frameworks/base/core/java/android/os/Message.java android/frameworks/base/core/jni/android_os_MessageQueue.h android/frameworks/base/core/jni/android_os_MessageQueue.cpp
// True if the message queue can be quit. private final boolean mQuitAllowed; private int mPtr; // used by native code Message mMessages; private boolean mQuiting; // Indicates whether next() is blocked waiting in pollOnce() with a non-zero timeout. private boolean mBlocked;
MessageQueue 的构造函数很简单:
/**
 * Creates a message queue.
 *
 * Records whether the queue may be quit and then calls nativeInit() so the
 * native side can be set up.
 *
 * @param quitAllowed value stored in mQuitAllowed
 */
MessageQueue(boolean quitAllowed) {
    this.mQuitAllowed = quitAllowed;
    nativeInit();
}
static void android_os_MessageQueue_nativeInit(JNIEnv* env, jobject obj) { NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue(); if (!nativeMessageQueue) { jniThrowRuntimeException(env, "Unable to allocate native queue"); return; } nativeMessageQueue->incStrong(env); android_os_MessageQueue_setNativeMessageQueue(env, obj, nativeMessageQueue); } static void android_os_MessageQueue_setNativeMessageQueue(JNIEnv* env, jobject messageQueueObj, NativeMessageQueue* nativeMessageQueue) { env->SetIntField(messageQueueObj, gMessageQueueClassInfo.mPtr, reinterpret_cast<jint>(nativeMessageQueue)); }
// Abstract, reference-counted base for a native message queue. It owns a strong
// pointer to a Looper and leaves exception reporting to subclasses.
class MessageQueue : public RefBase {
public:
    /* Gets the message queue's looper. */
    inline sp<Looper> getLooper() const {
        return mLooper;
    }

    bool raiseAndClearException(JNIEnv* env, const char* msg);

    // Pure virtual: each concrete queue decides how an exception is raised.
    virtual void raiseException(JNIEnv* env, const char* msg, jthrowable exceptionObj) = 0;

protected:
    // Construction and destruction are restricted to subclasses.
    MessageQueue();
    virtual ~MessageQueue();

protected:
    sp<Looper> mLooper;
};

// Concrete queue created by the nativeInit JNI call; forwards pollOnce() and
// wake() to the Looper held in mLooper.
class NativeMessageQueue : public MessageQueue {
public:
    NativeMessageQueue();
    virtual ~NativeMessageQueue();

    virtual void raiseException(JNIEnv* env, const char* msg, jthrowable exceptionObj);

    void pollOnce(JNIEnv* env, int timeoutMillis);

    void wake();

private:
    // Set to true for the duration of pollOnce().
    bool mInCallback;
    jthrowable mExceptionObj;
};
// Binds the queue to the calling thread's Looper, creating and registering a
// new Looper when the thread does not have one yet.
NativeMessageQueue::NativeMessageQueue() : mInCallback(false), mExceptionObj(NULL) {
    mLooper = Looper::getForThread();
    if (mLooper != NULL) {
        return;
    }

    mLooper = new Looper(false);
    Looper::setForThread(mLooper);
}

// Polls the Looper once with the given timeout; mInCallback is true only while
// the poll is in progress.
void NativeMessageQueue::pollOnce(JNIEnv* env, int timeoutMillis) {
    mInCallback = true;
    mLooper->pollOnce(timeoutMillis);
    mInCallback = false;
}

// Forwards the wake request to the Looper.
void NativeMessageQueue::wake() {
    mLooper->wake();
}
对于Android MessageQueue 来说,其主要的工作就是:接收投递进来的消息,获取下一个需要处理的消息。这两个功能是通过 enqueueMessage() 和 next() 方法实现的。next() 在前一篇文章介绍 Looper.loop() 时提到过。
在分析这两个函数之前,先来介绍一下 Message:前面说过 Message 是完备的,即它同时带有消息内容和处理消息的 Handler 或 callback。下面列出它的主要成员变量:
public int what;    // message id
public int arg1;    // message argument
public int arg2;    // message argument
public Object obj;  // message argument

// Time at which the message becomes due; MessageQueue.enqueueMessage() assigns it
// and next() compares it with SystemClock.uptimeMillis(). Delayed sends
// (Handler.sendMessageDelayed / postDelayed) end up setting this value.
long when;

Handler target;     // the Handler that processes this message
Runnable callback;  // callback that processes this message
Message next;       // linked-list link to the next message in the queue
接下来分析 enqueueMessage:
final boolean enqueueMessage(Message msg, long when) { if (msg.isInUse()) { throw new AndroidRuntimeException(msg + " This message is already in use."); } if (msg.target == null) { throw new AndroidRuntimeException("Message must have a target."); } boolean needWake; synchronized (this) { if (mQuiting) { return false; } msg.when = when; Message p = mMessages; if (p == null || when == 0 || when < p.when) { // New head, wake up the event queue if blocked. msg.next = p; mMessages = msg; needWake = mBlocked; } else { // Inserted within the middle of the queue. Usually we don‘t have to wake // up the event queue unless there is a barrier at the head of the queue // and the message is the earliest asynchronous message in the queue. needWake = mBlocked && p.target == null && msg.isAsynchronous(); Message prev; for (;;) { prev = p; p = p.next; if (p == null || when < p.when) { break; } if (needWake && p.isAsynchronous()) { needWake = false; } } msg.next = p; // invariant: p == prev.next prev.next = msg; } } if (needWake) { nativeWake(mPtr); } return true; }
/**
 * Queues a message for delivery at the given time.
 *
 * @param msg          the message to send
 * @param uptimeMillis delivery time handed on to the queue
 * @return false when this handler has no queue (a warning is logged),
 *         otherwise the result of enqueueing the message
 */
public boolean sendMessageAtTime(Message msg, long uptimeMillis) {
    final MessageQueue queue = mQueue;
    if (queue != null) {
        return enqueueMessage(queue, msg, uptimeMillis);
    }

    // No queue: the exception is created only for logging and is never thrown.
    RuntimeException e = new RuntimeException(
            this + " sendMessageAtTime() called with no mQueue");
    Log.w("Looper", e.getMessage(), e);
    return false;
}

/**
 * Makes this handler the message's target, applies the handler's asynchronous
 * flag, and passes the message to the queue.
 */
private boolean enqueueMessage(MessageQueue queue, Message msg, long uptimeMillis) {
    msg.target = this;
    if (mAsynchronous) {
        msg.setAsynchronous(true);
    }
    final boolean queued = queue.enqueueMessage(msg, uptimeMillis);
    return queued;
}
下面来分析如何从 MessageQueue 中获取合适的消息,这是 next() 要做的最主要的事情。next() 方法还做了其他一些事情,这些其它事情是为了提高系统效率,利用消息队列在空闲时通过 idle handler 做一些事情,比如 gc 等等。但它们和获取消息关系不大,所以这部分将从略介绍。
final Message next() { int pendingIdleHandlerCount = -1; // -1 only during first iteration int nextPollTimeoutMillis = 0; for (;;) { if (nextPollTimeoutMillis != 0) { Binder.flushPendingCommands(); } nativePollOnce(mPtr, nextPollTimeoutMillis); synchronized (this) { if (mQuiting) { return null; } // Try to retrieve the next message. Return if found. final long now = SystemClock.uptimeMillis(); Message prevMsg = null; Message msg = mMessages; if (msg != null && msg.target == null) { // Stalled by a barrier. Find the next asynchronous message in the queue. do { prevMsg = msg; msg = msg.next; } while (msg != null && !msg.isAsynchronous()); } if (msg != null) { if (now < msg.when) { // Next message is not ready. Set a timeout to wake up when it is ready. nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE); } else { // Got a message. mBlocked = false; if (prevMsg != null) { prevMsg.next = msg.next; } else { mMessages = msg.next; } msg.next = null; if (false) Log.v("MessageQueue", "Returning message: " + msg); msg.markInUse(); return msg; } } else { // No more messages. nextPollTimeoutMillis = -1; } // If first time idle, then get the number of idlers to run. // Idle handles only run if the queue is empty or if the first message // in the queue (possibly a barrier) is due to be handled in the future. if (pendingIdleHandlerCount < 0 && (mMessages == null || now < mMessages.when)) { pendingIdleHandlerCount = mIdleHandlers.size(); } if (pendingIdleHandlerCount <= 0) { // No idle handlers to run. Loop and wait some more. mBlocked = true; continue; } if (mPendingIdleHandlers == null) { mPendingIdleHandlers = new IdleHandler[Math.max(pendingIdleHandlerCount, 4)]; } mPendingIdleHandlers = mIdleHandlers.toArray(mPendingIdleHandlers); } // Run the idle handlers. // We only ever reach this code block during the first iteration. 
for (int i = 0; i < pendingIdleHandlerCount; i++) { final IdleHandler idler = mPendingIdleHandlers[i]; mPendingIdleHandlers[i] = null; // release the reference to the handler boolean keep = false; try { keep = idler.queueIdle(); } catch (Throwable t) { Log.wtf("MessageQueue", "IdleHandler threw exception", t); } if (!keep) { synchronized (this) { mIdleHandlers.remove(idler); } } } // Reset the idle handler count to 0 so we do not run them again. pendingIdleHandlerCount = 0; // While calling an idle handler, a new message could have been delivered // so go back and look again for a pending message without waiting. nextPollTimeoutMillis = 0; } }
public static void loop() { final Looper me = myLooper(); if (me == null) { throw new RuntimeException("No Looper; Looper.prepare() wasn‘t called on this thread."); } final MessageQueue queue = me.mQueue; ... for (;;) { Message msg = queue.next(); // might block if (msg == null) { // No message indicates that the message queue is quitting. return; } msg.target.dispatchMessage(msg); msg.recycle(); } }
如果队列中没有消息或者第一个待处理的消息时机未到,且也没有其他利用队列空闲要处理的事务,则将队列设置为 blocked 状态,进入等待状态;否则就利用队列空闲处理其它事务。
至此,已经对 Android 多线程相关的主要概念 Thread, HandlerThread, Handler, Looper, Message, MessageQueue 作了一番介绍,下一篇就要讲讲 AsyncTask,这是为了简化 UI 多线程编程而提供的一个便利工具类。
Android多线程分析之四:MessageQueue的实现,布布扣,bubuko.com
Android多线程分析之四:MessageQueue的实现
原文地址:http://blog.csdn.net/kesalin/article/details/37765707