码迷,mamicode.com
首页 > 其他好文 > 详细

ACE_Reactor的notify阻塞问题

时间:2015-11-02 22:36:04      阅读:244      评论:0      收藏:0      [点我收藏+]

标签:

今天听到一种说法:

ACE_Reactor的notify可能会发生阻塞。windwos与linux的消息队列满了之后默认会阻塞掉。linux可以设置成异步的,但是notify队列满了之后,无论异步还是阻塞,新来的信号都会被丢失。

信号队列长度,linux下与文件句柄数一样。

今天再windwos上测试,当信号多余1023个时,notify就会阻塞。

 

linux下待测试……

windows测试部分代码:

// t4l.cpp : Defines the entry point for the console application.
//

#include "stdafx.h"
#include "ace/ACE.h"
#include "ace/Reactor.h"
#include "ace/Task_Ex_T.h"
#include "ace/SOCK_Stream.h"
#include "ace/SOCK_Acceptor.h"
#include "ace/Sock_Connect.h"
#include "ace/SOCK_Connector.h"
#include "ace/Connector.h"
#include "ace/Svc_Handler.h"
#include "ace/OS.h"
#include "rapidjson/document.h"
#include "rapidjson/prettywriter.h"
#include "rapidjson/stringbuffer.h"

class TestEvent
    :public ACE_Task_Ex<ACE_MT_SYNCH, TestEvent>
{
public:
      virtual int handle_input (ACE_HANDLE fd = ACE_INVALID_HANDLE)
      {
          printf("handle_input\n");
          return 0;
      }
      
      int putq()
      {
          ar_->notify(this);
          return 0;
      }
      virtual int handle_output (ACE_HANDLE fd = ACE_INVALID_HANDLE)
      {
          printf("handle_output\n");
          return 0;
      }


      virtual int handle_exception (ACE_HANDLE fd = ACE_INVALID_HANDLE)
      {
          size_ ++;
           printf("handle_exception:%d\n", size_);
        if (is_sleep_)
        {
            ACE_OS::sleep(30);
            is_sleep_ = false;
        }
        
          
         ar_->remove_handler(this, ACE_Event_Handler::EXCEPT_MASK);
         return 0;
      }

  virtual int handle_timeout (const ACE_Time_Value &current_time,
      const void *act = 0)
  {
      ACE_Time_Value atv(1,0);
      ACE_OS::sleep(atv);
      //ar_->remove_handler(this, ACE_Event_Handler::EXCEPT_MASK);
      ar_->cancel_timer(this);
      printf("handle_timeout\n");
      return 0;
  }
  virtual int handle_close(ACE_HANDLE handle, ACE_Reactor_Mask close_mask)
  {
      printf("handle_close\n");
      return 0;
  }
  int open()
  {
      size_ = 0;
      is_sleep_ = true;
      ar_ = ACE_Reactor::instance();
      ar_->register_handler(this,ACE_Event_Handler::EXCEPT_MASK);
      activate(THR_NEW_LWP | THR_JOINABLE | THR_INHERIT_SCHED ,
          1,0,ACE_DEFAULT_THREAD_PRIORITY);
      //ACE_OS::sleep(1);
      for (int i= 0; i < 8900; i ++)
      {
          // ACE_OS::sleep(1);
          printf("opened:%d\n",i);

          putq();
      }
      printf("opened\n");
      ACE_Time_Value atv(1,0);
      ACE_Reactor::instance()->schedule_timer(this, NULL, atv, atv);
      printf("putq leave");
      return 0;
  }
  int svc()
  {
      ACE_Time_Value tv = ACE_Time_Value(1,0);
        ar_->owner(ACE_Thread::self());
        //ar_->run_reactor_event_loop();
        while (1)
        {
    //        ACE_Reactor::instance()->event_loop_done();
            ar_->handle_events(&tv);
        }
    return 0;
  }
private:
    ACE_Reactor* ar_;
    size_t  size_;
    bool is_sleep_;
};
class Server
    :public ACE_Event_Handler
{
public:
    int open(const char* addr)
    {
        ACE_INET_Addr ace_addr;
        if (-1 == ace_addr.set(addr,strlen(addr)))
        {
            return ACE_OS::last_error();
        }
        int ret = acceptor_.open(ace_addr);
        reactor_ = new ACE_Reactor;
        this->reactor(reactor_);
        this->reactor()->register_handler(this, ACE_Event_Handler::ACCEPT_MASK);
        return 0;
    }

public:
    int handle_input(ACE_HANDLE fd)
    {
        printf("connector(%d) comming\n", fd);

    }
    ACE_HANDLE get_handle()
    {
        return this->acceptor_.get_handle();
    }


private:
    ACE_SOCK_Acceptor acceptor_;
    ACE_Reactor* reactor_;

};
class ConnectInst
    :public ACE_Svc_Handler<ACE_SOCK_STREAM,ACE_MT_SYNCH>
{
public:
    ConnectInst()
        :timeout_(1000)
    {
    }
    ~ConnectInst()
    {
    }
private:
    int timeout_;
};

class Client
{
public:
    int open()
    {
        reactor_ = new ACE_Reactor;
        connector_.open(reactor_);
        return 0;
    }
    int connect(const char* addr, int time)
    {
        ConnectInst* inst;
        int ret = connector_.connect(inst, ACE_INET_Addr(addr));
    }
private:
    ACE_Connector<ConnectInst, ACE_SOCK_CONNECTOR> connector_;
    ACE_Reactor* reactor_;

};


int main(int argc, char* argv[])
{
    TestEvent* te = new TestEvent();
    
    te->open();
    ACE_Reactor::instance()->run_reactor_event_loop();
    te->wait();
    system("pause");
    return 0;
}

 

ACE_Reactor的notify阻塞问题

标签:

原文地址:http://www.cnblogs.com/longking/p/4931476.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!