码迷,mamicode.com
首页 > 编程语言 > 详细

线程消息通信与异步处理

时间:2015-06-11 19:32:50      阅读:158      评论:0      收藏:0      [点我收藏+]

标签:

转载请标明出处: 

http://blog.csdn.net/yujun411522/article/details/46444869
本文出自:【yujun411522的博客】


关于android内消息通信和handler的知识在之前的Handler中已经简要介绍过了,这里介绍在native层的实现机制。
相信大家都知道一个标准的Looper线程的写法:
public MyLooperThread extends Thread{
     private Handler mHandler;
     public void run(){
     //在实例化handler之前一定要先调用Looper.prepare()方法
     //1 Looper.prepare()方法
     Looper.prepare();
     //2 实例化handler
     //mHandler = new Handler(){
               public void handleMessage(Message msg){
                         //处理msg操作
                    }

          };
     //3 Looper.loop()不断从消息队列中取出来信息
     Looper.loop();
     }     
}
典型的三个操作:
1 调用Looper.prepare方法,进入准备阶段。
2 实例化Handler对象。
3 调用Looper.loop方法,进入循环。
注意,三者的顺序不能有错,下面会介绍 

7.1 Looper.prepare进入准备阶段
先看Looper类中prepare方法:
public class Looper {
   // sThreadLocal.get() will return null unless you've called prepare().
    static final ThreadLocal<Looper> sThreadLocal = new ThreadLocal<Looper>();
    final MessageQueue mQueue;//内部维护一个消息队列
    final Thread mThread;//对应的线程
    volatile boolean mRun;//运行状态
    private static Looper mMainLooper = null;  // guarded by Looper.class,主线程的Looper

   public static void prepare() {
        if (sThreadLocal.get() != null) {//之前已经设置过Looper对象了,报异常,说明只能设置一次
            throw new RuntimeException("Only one Looper may be created per thread");
        }
        sThreadLocal.set(new Looper());//为sThreadLocal变量设置Looper,这个Looper必须有,且只能有一个,不能重复设置
    }
}
sThreadLocal 是一个用来存储当前线程状态的一个成员变量,存储范围为线程内部。在调用prepare方法时,如果之前线程设置Looper对象就报异常,如果没有设置Looper就为sThreadLocal设置一个new Looper()对象,再看Looper的构造函数: 
private Looper() {//私有方法,不允许外部访问它
        mQueue = new MessageQueue();//构造一个MessageQueue对象,Looper中维护了一个消息队列,这一点很重要
        mRun = true;//设置mRun为true
        mThread = Thread.currentThread();//mThread设置为当前线程
    }
其中重要的就是MessageQueue消息队列的创建,看它的构造方法:
 MessageQueue() {
        nativeInit();//这是一个本地方法
    }
nativeInit是一个本地方法,对应的实现方法在android_os_MessageQueue.cpp文件中   
先看NativeMessageQueue类:
class NativeMessageQueue {
public:
    NativeMessageQueue();
    ~NativeMessageQueue();

    inline sp<Looper> getLooper() { return mLooper; }//返回该Looper变量,这个Looper类是native中的Looper类,不是java层的

    void pollOnce(int timeoutMillis);//后面会分析,进行询问
    void wake();//后面会分析,向管道写入"W"

private:
    sp<Looper> mLooper;//维护一个Native层的Looper变量
};
再来看MessageQueue中nativeInit方法中的的实现android_os_MessageQueue_nativeInit函数:
static void android_os_MessageQueue_nativeInit(JNIEnv* env, jobject obj) {
     //obj为java层的MessageQueue对象
     //1 创建一个NativeMessageQueue对象,这个是native层的
    NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue();
    if (! nativeMessageQueue) {
        jniThrowRuntimeException(env, "Unable to allocate native queue");
        return;
    }
    //2 将NativeMessageQueue和 java层的MessageQueue对象 关联起来,实际就是讲nativeMessagQueue对象保存到MessageQueue中的mPtr变量中
    android_os_MessageQueue_setNativeMessageQueue(env, obj, nativeMessageQueue);
}
两个主要工作: 1 创建NativeMessageQueue对象 ;2 将java层的MessageQueue对象和NativeMessageQueue关联起来

7.1.1 创建NativeMessageQueue对象
看NativeMessageQueue的构造函数:
NativeMessageQueue::NativeMessageQueue() {
    mLooper = Looper::getForThread();//调用getForThread,看返回值是否为null
    if (mLooper == NULL) {//如果返回值为null,则创建一个Looper并设置
        mLooper = new Looper(false);
        Looper::setForThread(mLooper);//
    }
}
先看Looper::getForThread函数是否为null,如果为null,构造一个Looper对象,并调用setForThread设置为线程唯一的一个Looper,这一部分工作和java层中的Looper.prepare()方法相同。那么再看一下native层的Looper对象是如何实例化的,看它的构造函数:
Looper::Looper(bool allowNonCallbacks) :
        mAllowNonCallbacks(allowNonCallbacks), mSendingMessage(false),
        mResponseIndex(0), mNextMessageUptime(LLONG_MAX) {
    int wakeFds[2];
    //创建一个管道,读写端分别为wakeFds[0]和wakeFds[1] 
    int result = pipe(wakeFds); 

    mWakeReadPipeFd = wakeFds[0];//读端
    mWakeWritePipeFd = wakeFds[1];//写端

    result = fcntl(mWakeReadPipeFd, F_SETFL, O_NONBLOCK);//设置非阻塞方式
    result = fcntl(mWakeWritePipeFd, F_SETFL, O_NONBLOCK);//设置非阻塞方式 
 
    mEpollFd = epoll_create(EPOLL_SIZE_HINT);//监听管道
    struct epoll_event eventItem;
    memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union
    eventItem.events = EPOLLIN;
    eventItem.data.fd = mWakeReadPipeFd;
    result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeReadPipeFd, & eventItem);//只监听mWakeReadPipeFd ,也就是读端
    }

7.1.2 将java层的MessageQueue对象和NativeMessageQueue关联起来
关联工作由android_os_MessageQueue_setNativeMessageQueue函数来完成:
static void android_os_MessageQueue_setNativeMessageQueue(JNIEnv* env, jobject messageQueueObj,
        NativeMessageQueue* nativeMessageQueue) {
    env->SetIntField(messageQueueObj, gMessageQueueClassInfo.mPtr,
             reinterpret_cast<jint>(nativeMessageQueue));
}
实际就是将nativeMessageQueue对象的地址复制给java层MessageQueue中的gMessageQueueClassInfo.mPtr变量,那么gMessageQueueClassInfo中mPtr是什么:
static struct {
    jfieldID mPtr;   // native object attached to the DVM MessageQueue
} gMessageQueueClassInfo;
它的实例化在register_android_os_MessageQueue函数中:
int register_android_os_MessageQueue(JNIEnv* env) {
    int res = jniRegisterNativeMethods(env, "android/os/MessageQueue",
            gMessageQueueMethods, NELEM(gMessageQueueMethods));
   
    jclass clazz;
    FIND_CLASS(clazz, "android/os/MessageQueue");
    //mPtre指向Java层中MessageQueue的mPtr成员变量
    GET_FIELD_ID(gMessageQueueClassInfo.mPtr, clazz,"mPtr", "I");    
    return 0;
}
绑定的结果就是在java层中的MessageQueue的mPtr保存了NativeMessageQueue变量的地址,这样就可以通过MessageQueue访问NativeMessageQueue了,到此为止java层MessageQueue构造完成。
可以看出Java层Looper.prepare()函数的作用就是在Looper中持有一个MessageQueue和当前线程的引用,而MessageQueue中的mPtr指向native层的NativeMessageQueue对象,NativeMessageQueue对象中又保存了线程唯一的一个native Looper变量,四者之间的关系:
技术分享
技术分享

7.2 Handler的创建和发送消息
Looper是用来驱动消息的,而Looper的消息从哪里来,Handler的发送;Looper的消息哪里去,Handler的处理。所以Handler的作用就是发送和处理消息.

7.2.1 Handler的创建
看没有参数的构造方法: 
 public Handler() {
         ....
        //返回当前线程的Looper对象
        mLooper = Looper.myLooper();
        if (mLooper == null) {
            throw new RuntimeException(
                "Can't create handler inside thread that has not called Looper.prepare()");
        }
        mQueue = mLooper.mQueue;//将Looper中的mQueue赋值给Handler的成员变量mQueue
        mCallback = null;
    }
它的构造中有一点注意,先通过Looper.myLooper方法获取当前线程的Looper对象,如果为null报异常,所以在创建创建Handler之前一定要先调用Looper.prepare方法。

7.2.2 创建消息

7.2.3 消息发送
1 java层
无论是通过post还是send都最终通过sendMessageAtTime函数来实现
 public boolean sendMessageAtTime(Message msg, long uptimeMillis)//uptimeMillis 绝对时间,开机时间作为基准
    {
        boolean sent = false;
        MessageQueue queue = mQueue;//looper中创建的MessageQueue
        if (queue != null) {
            msg.target = this;//指明msg的处理者为this
            sent = queue.enqueueMessage(msg, uptimeMillis);//调用MessageQueue  的enqueueMessage 方法
        }
        else {
          .....
        }
        return sent;
    }
调用MessageQueue  的enqueueMessage 方法   
 final boolean enqueueMessage(Message msg, long when) {
        ....
        final boolean needWake;
        synchronized (this) {
            if (mQuiting) {
              return false;//handler所在线程正在退出
            } else if (msg.target == null) {
                mQuiting = true;
            }

            msg.when = when;//指明消息何时处理
            //mMessages 消息队列头部,p为当前消息
            Message p = mMessages;
             // 消息队列为空或者新消息需要立马处理或者新消息处理时间早于队首消息的处理时间
            if (p == null || when == 0 || when < p.when) {//如果是这些情况,需要将新消息插入到队首立马处理
                msg.next = p;
                mMessages = msg;
                needWake = mBlocked; // new head, might need to wake up
            } else {//否则找到合适的位置插入,按照时间的顺序
                Message prev = null;
                while (p != null && p.when <= when) {
                    prev = p;
                    p = p.next;
                }
                msg.next = prev.next;
                prev.next = msg;
                needWake = false; // still waiting on head, no need to wake up
            }
        }
        if (needWake) {//新加入消息队列,且处于block状态,需要唤醒
            nativeWake(mPtr);//本地方法,mPtr 就是NativeMessageQueue的native地址
        }
        return true;
    }
新加入消息队列,且处于block状态,需要唤醒,调用本地方法nativeWake,mPtr 就是NativeMessageQueue的native地址,对应的native实现在android_os_MessageQueue.cpp文件中的android_os_MessageQueue_nativeWake方法:
static void android_os_MessageQueue_nativeWake(JNIEnv* env, jobject obj, jint ptr) {
     //ptr 就是NativeMessageQueue的地址
    NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
    return nativeMessageQueue->wake();
}
继续调用nativeMessageQueue的wake方法:
void NativeMessageQueue::wake() {
    mLooper->wake();
}
调用mLooper的wake方法,在Looper.cpp文件中:
void Looper::wake() {
 
   if (mPendingWakeCount++ == 0) {
        mPendingWakeTime = systemTime(SYSTEM_TIME_MONOTONIC);
    }
 
   ssize_t nWrite;
    do {
        nWrite = write(mWakeWritePipeFd, "W", 1);//向mWakeWritePipeFd 写入一个"W"
    } while (nWrite == -1 && errno == EINTR);

    if (nWrite != 1) {
        if (errno != EAGAIN) {
            LOGW("Could not write wake signal, errno=%d", errno);
        }
    }
}
还记得native层创建的Looper对象时创建的管道吗?mWakeWritePipeFd 就是管道的写端,一旦写入了"W"字符,处理消息的线程就会被I/O唤醒。

2 native层
native层可以发送消息,但不是像java层中通过handler来发送,而是通过native层的Looper对象来发送,最终都是通过sendMessageAtTime方法来实现,在Looper.cpp文件中:
void Looper::sendMessageAtTime(nsecs_t uptime, const sp<MessageHandler>& handler,
        const Message& message) { 

    size_t i = 0;
    { 
        AutoMutex _l(mLock);
         //native层消息队列的大小
        size_t messageCount = mMessageEnvelopes.size();
         //按照时间先后顺序找到它插入的位置 
        while (i < messageCount && uptime >= mMessageEnvelopes.itemAt(i).uptime) {
            i += 1;
        }
        //利用MessageEnvelope  ,将发送时间,messageHandler和消息封装在MessageEnvelope  中
        MessageEnvelope messageEnvelope(uptime, handler, message);
        //插入到native消息队列中
        mMessageEnvelopes.insertAt(messageEnvelope, i, 1);

        if (mSendingMessage) {
            return;
        }
    }  
     
    if (i == 0) {
        wake();//和java层调用nativeWake 方法效果一样,向管道写端写入一个"W"
    }
}
native层发送消息的过程是先找到插入的位置,然后将message,handler,发送时间等信息封装到MessageEnvelope中,最后插入到消息队列的合适位置之中。
其中涉及到一个结构体:MessageEnvelope: 
struct MessageEnvelope {
        MessageEnvelope() : uptime(0) { }

        MessageEnvelope(nsecs_t uptime, const sp<MessageHandler> handler,
                const Message& message) : uptime(uptime), handler(handler), message(message) {
        }

        nsecs_t uptime; //执行事件
        sp<MessageHandler> handler;//执行handler
        Message message;//消息
    };
到目前为止,消息已经发送成功了,接下来就是取出消息并处理。
 
7.3 Looper.loop 循环处理消息
看Looper.looper方法的实现:   
  
  public static void loop() {
        Looper me = myLooper();
        if (me == null) {//可以看出,必须在设置当前线程的Looper之后才能执行loop方法
            throw new RuntimeException("No Looper; Looper.prepare() wasn't called on this thread.");
        }
        MessageQueue queue = me.mQueue;
       
        .....       
        while (true) {
            //1 从MessageQueue中取出消息
            Message msg = queue.next(); // might block
            if (msg != null) {
                if (msg.target == null) {
                 return;//没有消息时退出
                }
                .....
                //派发消息
                msg.target.dispatchMessage(msg);

                ....
               //回收消息
                msg.recycle();
            }
        }
    }
重要的操作就两个,最重要的第一步从messageQuene中取出消息;2 派发该消息处理函数 

7.3.1 messageQuene.next取出合适的消息   
final Message next() {      
        int nextPollTimeoutMillis = 0;//下一次poll的时间,0表示马上进行
        for (;;) {
             ...
            //本地方法nativePollOnce
            nativePollOnce(mPtr, nextPollTimeoutMillis);

            synchronized (this) {//同步              
                final long now = SystemClock.uptimeMillis();
                final Message msg = mMessages;//队首消息
                if (msg != null) {
                    final long when = msg.when;//队首消息的执行事件 ,这里假如是4s,now为5s
                    if (now >= when) {//说明队首消息已经到了或者过了执行事件,所以立马执行
                        mBlocked = false;
                        mMessages = msg.next;//取出队首消息,返回
                        msg.next = null;
                        
                        msg.markInUse();//标记正在使用
                        return msg;
                    } else {
                        //否则,假如队首消息执行时间when为10s,now为5s,则过when-now= 5s之后再询问

                        nextPollTimeoutMillis = (int) Math.min(when - now, Integer.MAX_VALUE);
                    }
                } else {
                    //消息队列中没有消息
                    nextPollTimeoutMillis = -1;
                }                 
            }              
        }
    }
这里的关键部分是nativePollOnce本地方法,它的实现在android_os_MessageQueue.cpp中:
static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj,
        jint ptr, jint timeoutMillis) {
    //其中ptr就是NativeMessageQueue对象地址,timeOutMills就是经过多长时间询问
    NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
    nativeMessageQueue->pollOnce(timeoutMillis);
}
可以看出调用NativeMessageQueue的pollOnce方法
void NativeMessageQueue::pollOnce(int timeoutMillis) {
    mLooper->pollOnce(timeoutMillis);
}
又调用Looper的pollOnce方法:
 inline int pollOnce(int timeoutMillis) {
        return pollOnce(timeoutMillis, NULL, NULL, NULL);
    }
int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
     //timeoutMillis, NULL, NULL, NULL 参数
    int result = 0;
    for (;;) {
        while (mResponseIndex < mResponses.size()) {
            const Response& response = mResponses.itemAt(mResponseIndex++);
            ALooper_callbackFunc callback = response.request.callback;
            if (!callback) {
                int ident = response.request.ident;
                int fd = response.request.fd;
                int events = response.events;
                void* data = response.request.data; 
 
                if (outFd != NULL) *outFd = fd;
                if (outEvents != NULL) *outEvents = events;
                if (outData != NULL) *outData = data;
                return ident;
            }
        }

        if (result != 0) { 
            if (outFd != NULL) *outFd = 0;
            if (outEvents != NULL) *outEvents = NULL;
            if (outData != NULL) *outData = NULL;
            return result;
        }
        //执行pollInner方法
        result = pollInner(timeoutMillis);
    }
}
进一步调用了pollInner方法:
int Looper::pollInner(int timeoutMillis) { 
    // Adjust the timeout based on when the next message is due.
    if (timeoutMillis != 0 && mNextMessageUptime != LLONG_MAX) {
        nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
        int messageTimeoutMillis = toMillisecondTimeoutDelay(now, mNextMessageUptime);
        if (messageTimeoutMillis >= 0
                && (timeoutMillis < 0 || messageTimeoutMillis < timeoutMillis)) {
            timeoutMillis = messageTimeoutMillis;
        } 
    int result = ALOOPER_POLL_WAKE;
    mResponses.clear();
    mResponseIndex = 0;
 
    // Wait for wakeAndLock() waiters to run then set mPolling to true.
    mLock.lock();
    while (mWaiters != 0) {
        mResume.wait(mLock);
    }
    mPolling = true;
    mLock.unlock();

    size_t requestedCount = mRequestedFds.size();
    int eventCount = poll(mRequestedFds.editArray(), requestedCount, timeoutMillis);//监控mRequestedFds 上的时间 
 
    for (int i = 0; i < eventCount; i++) {
        int fd = eventItems[i].data.fd;
        uint32_t epollEvents = eventItems[i].events;
        if (fd == mWakeReadPipeFd) {
            if (epollEvents & EPOLLIN) {
                awoken();//调用awoken方法
            } else {
                  ....
            }
        } else {
          .....
        }
    }
Done: ;
 
    ...
    return result;
}
这里调用的awoken方法:
void Looper::awoken() {
    char buffer[16];
    ssize_t nRead;
    do {
         //读管道写端写入的"W"
        nRead = read(mWakeReadPipeFd, buffer, sizeof(buffer));
    } while ((nRead == -1 && errno == EINTR) || nRead == sizeof(buffer));
}

7.3.2 msg.target.dispatchMessage(msg)分发消息


 
7.4 AsyncTask











线程消息通信与异步处理

标签:

原文地址:http://blog.csdn.net/yujun411522/article/details/46459655

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!