码迷,mamicode.com
首页 > 其他好文 > 详细

Gevent的socket协程安全性分析

时间:2015-05-24 23:42:15      阅读:470      评论:0      收藏:0      [点我收藏+]

标签:

一般讨论socket的并发安全性,都是指线程的安全性。。。而且绝大多数的情况下socket都不是线程安全的。。

当然一些框架可能会对socket进行一层封装,让其成为线程安全的。。。例如java的netty框架就是如此,将socket封装成channel,然后让channel封闭到一个线程中,那么这个channel的所有的读写都在它所在的线程中串行的进行,那么自然也就是线程安全的了。。。。。


其实很早看Gevent的源码的时候,就已经看过这部分的东西了,当时就已经知道gevent的socket不是协程安全的,也就是说gevnet的socket不能在不同的协程中同时读取或者写。。。。

例如我们不能同时在两个协程中调用socket.recv方法。。。。

不过好像自己现在已经忘了,那就再看看,顺便写篇博客记录下来,以防以后又忘记了,还找不到资料


那么为什么呢。。?我们来分析一下源代码吧,这里就拿send方法来分析,先来看看gevent的send方法的定义:

    #这里发送数据不会保证发送的数据都发送完,而是能发送多少发送多少
    #然后返回发送的数据大小
    def send(self, data, flags=0, timeout=timeout_default):
        sock = self._sock
        if timeout is timeout_default:
            timeout = self.timeout
        try:
            return sock.send(data, flags)
        except error:
            #EWOULDBLOCK 当前操作可能会阻塞 ,对于非阻塞的socket,也就是说明缓冲区已经写满了
            #那么这里要做的事情就是等待当前的write_event事件,然后再写数据
            ex = sys.exc_info()[1]
            if ex.args[0] != EWOULDBLOCK or timeout == 0.0:
                raise
            sys.exc_clear()
            self._wait(self._write_event)
            try:
                return sock.send(data, flags)
            except error:
                ex2 = sys.exc_info()[1]
                if ex2.args[0] == EWOULDBLOCK:
                    return 0
                raise

也就是说,如果当前没办反发送,那么就会调用_wait方法来等待_write_event事件,

    #等待某一个watcher,可以是read或者write事件,这里可以带有超时
    def _wait(self, watcher, timeout_exc=timeout('timed out')):
        """Block the current greenlet until *watcher* has pending events.

        If *timeout* is non-negative, then *timeout_exc* is raised after *timeout* second has passed.
        By default *timeout_exc* is ``socket.timeout('timed out')``.

        If :func:`cancel_wait` is called, raise ``socket.error(EBADF, 'File descriptor was closed in another greenlet')``.
        """
        assert watcher.callback is None, 'This socket is already used by another greenlet: %r' % (watcher.callback, )
        if self.timeout is not None: #在等待之前,先挂起timeout,如果超市了,将会执行timeout
            #如果超时先结束,那么会返回到当前协程抛出异常
            timeout = Timeout.start_new(self.timeout, timeout_exc, ref=False)
        else:
            timeout = None
        try:
            self.hub.wait(watcher)  #在hub上面等待这个watcher,在这个里面会切换到hub的运行,然后等到watcher有反应了再切换回来
        finally:
            if timeout is not None:
                timeout.cancel()

其实这里很简单,就是在hub上面等待当前socket的写事件的watcher

    #当在调用gevent.sleep的时候如果传入了大于零的时间,将会用这里来处理
    #watcher是一个在loop上面注册的事件,可能是读,写或者定时
    #用于在loop上面注册watcher,然后将当前协程切换出去
    def wait(self, watcher):
        waiter = Waiter() #首先创建一个waiter对象
        unique = object() 
        watcher.start(waiter.switch, unique) #当watcher超时的时候将会调用waiter的switch方法这样就可以切换回来当前的协程
        try:
            result = waiter.get() #调用waiter的get方法,主要是让将当前调用sleep的greenlet切换出去,然后切换到hub的运行
            assert result is unique, 'Invalid switch into %s: %r (expected %r)' % (getcurrent(), result, unique)
        finally:
            watcher.stop()

这个应该很简单吧。。创建一个waiter事件,然后调用watcher对象的start方法,回调方法就设置成当前waiter对象的switch方法,然后调用get方法,将当前的协程切换出去。。。。


那么来来IO类型的watcher的start方法吧:

#I/Owatcher的定义
cdef public class io(watcher) [object PyGeventIOObject, type PyGeventIO_Type]:

    WATCHER_BASE(io)  #通过这个宏定义了一些基本的属性,例如libev的 watcher 引用等

    #这个其实其实就是启动watcher
    #这里的callback一般情况下都是waiter对象的switch方法,这样,当有IO事件之后就可以回到之前的协程了
    def start(self, object callback, *args, pass_events=False):
        CHECK_LOOP2(self.loop)
        if callback is None:
            raise TypeError('callback must be callable, not None')
        self.callback = callback
        if pass_events:
            self.args = (GEVENT_CORE_EVENTS, ) + args
        else:
            self.args = args
        LIBEV_UNREF
        libev.ev_io_start(self.loop._ptr, &self._watcher)  #在libev的loop上面启动这个io watcher
        PYTHON_INCREF

    ACTIVE

嗯,这个是cython的代码,所以稍微别扭一些。。。,那么接下来再来看看libev的ev_io_start函数的实现吧:

void noinline
ev_io_start (EV_P_ ev_io *w) EV_THROW
{
  int fd = w->fd;

  if (expect_false (ev_is_active (w)))
    return;

  assert (("libev: ev_io_start called with negative fd", fd >= 0));
  assert (("libev: ev_io_start called with illegal event mask", !(w->events & ~(EV__IOFDSET | EV_READ | EV_WRITE))));

  EV_FREQUENT_CHECK;

  ev_start (EV_A_ (W)w, 1);   //将监视器设置为active
  //判断当前的anfds的大小是否足够放入新的fd,如果不够的话,那么需要重新分配
  array_needsize (ANFD, anfds, anfdmax, fd + 1, array_init_zero);
  //将这个watcher放到当前fd的wather队列的头部
  wlist_add (&anfds[fd].head, (WL)w);

  /* common bug, apparently */
  assert (("libev: ev_io_start called with corrupted watcher", ((WL)w)->next != (WL)w));

  fd_change (EV_A_ fd, w->events & EV__IOFDSET | EV_ANFD_REIFY);   //将该fd放到需要改变的数组,在合适的时候将会在loop上修改
  w->events &= ~EV__IOFDSET;

  EV_FREQUENT_CHECK;
}

这里主要就是激活当前的watcher对象,然后将这个watcher对象放到当前文件描述符的watcher链表的头部。。。嗯。。也就是wlist_add方法要做的事情。。。其实看到这里就知道socket肯定不是协程安全的了。。。


嗯,相信大家也一定懂了。。。。不懂的话。。就再看看代码就知道了。。。。。

Gevent的socket协程安全性分析

标签:

原文地址:http://blog.csdn.net/fjslovejhl/article/details/45956339

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!