码迷,mamicode.com
首页 > 其他好文 > 详细

ACE中TASK架构简介及简单应用

时间:2017-10-10 17:50:53      阅读:200      评论:0      收藏:0      [点我收藏+]

标签:简介   bre   windows   失败   ...   class   复制   编译   分享   

一、基础功能介绍

1、ACE_Message_Block*,Windows消息用MSG结构表示,ACE_Task中因为不能预计各种应用中消息的类型,所以ACE_Message_Block基本上可以理解为是对一个指针的封装,这个指针指向实际的一块内存或是一个对象等等。在创建ACE_Message_Block时,可以指定是由ACE_Message_Block来管理内存(构造函数中指定一个 size_t类型的大小),还是由我们自己管理内存(构造函数中指定一个指针)。而一个ACE_Message_Block类型的指针,就是一个消息,我们通过传递它来进行逻辑的业务处理。

2、ACE_Task::putq,事实上,到底用SendMessage还是PostMessage与ACE_Task::putq来进行类比,我很为难,PostMessage发送一个消息后立刻返回,这与通常的ACE_Task::putq行为非常类似,因为ACE_Task是运行在另外一个线程上,ACE_Task::putq只是完成将消息插入到消息队列的工作,理论上它应该立刻返回,但实际上,ACE_Task的消息队列有容量大小限制,这个限制由我们自己限定,当当前消息队列满时,ACE_Task::putq将阻塞一直到可以插入,这时候就比较类似与SendMessage,

3、ACE_Task::getq,

1 ACE_Message_Block * msg;
2 while(getq(msg) != -1)    // int putq (ACE_Message_Block *, ACE_Time_Value *timeout = 0);
3 {
4     // process msg here
5 }

4、消息处理函数,默认没有提供,svc

二、要搭架一个基于ACE_Task的消息系统,通常要做如下的步骤:

1、编写一个派生自ACE_Task的类,指定它的同步模式
ACE_Task的消息队列可以由多个处理线程共享使用,所以需要提供同步模式,例如 ACE_MT_SYNCH和ACE_NULL_SYNCH分别表示基于多线程的同步和不使用同步,这个参数是ACE_Task的一个模板参数。

1 class My_Task : public ACE_Task<ACE_MT_SYNCH>
2 {
3 public:
4     virtual int svc();
5 }

2、重载 ACE_Task的 svc 方法,编写消息循环相关的代码

1 int My_Task::svc()
2 {
3     ACE_Message_Block * msg;
4     while(getq(msg) != -1)    // int putq (ACE_Message_Block *, ACE_Time_Value *timeout = 0);
5     {
6         // process msg here
7     }
8 }

svc 方法相当与处理线程的入口方法。

3、假设 My_Task是一个基于ACE_Task的类,创建一个唯一的My_Task实例,这个可以通过typedef ACE_Singleton<MyTask, SYNCH_METHOD> MYTASK;然后总是使用MYTASK::instance方法来获取一个My_Task的指针来完成。

4、在适当位置(一般是程序开始的时候),让My_Task开始工作

MYTASK::intance()->activate(
THR_NEW_LWP | THR_JOINABLE |THR_INHERIT_SCHED , // 线程创建的属性
n_threads = 1, // 线程的数目,即有多少处理线程
...)

5、在有消息发生的时候发送消息

1 ACE_Message_Block * msg;
2 // fill the msg
3 ...
4 MYTASK::intance()->putq(msg);

三、简单示例

生产者消费者示例

  1 /*************************************************************************
  2     > File Name: task.cpp
  3     > Author: 
  4     > Mail: 
  5     > Created Time: Tue 10 Oct 2017 02:49:52 PM CST
  6  ************************************************************************/
  7 
  8 #include <ace/Synch.h>
  9 #include <ace/Task.h>
 10 #include <ace/Message_Block.h>
 11 
 12 char test_message[] = "test_message";
 13 #define MAX_MESSAGES 10
 14 class Counting_Test_Producer : public ACE_Task<ACE_MT_SYNCH>
 15 {
 16     public:
 17         Counting_Test_Producer (ACE_Message_Queue<ACE_MT_SYNCH> *queue)
 18         :ACE_Task<ACE_MT_SYNCH>(0,queue) {}
 19         virtual int svc (void);
 20 };
 21 
 22 int Counting_Test_Producer::svc (void)
 23 {
 24     int produced = 0;
 25     char data[256] = {0};
 26     ACE_Message_Block * b = 0;
 27 
 28     while(1)
 29     {
 30         ACE_OS::sprintf(data, "%s--%d.\n", test_message, produced);
 31 
 32         //创建消息块
 33         ACE_NEW_NORETURN (b, ACE_Message_Block (256));
 34         if (b == 0)
 35         {
 36             break;
 37         }
 38         
 39         //将data中的数据复制到消息块中
 40         b->copy(data, 256);
 41         if (produced >= MAX_MESSAGES)
 42         {
 43             //如果是最后一个数据,那么将数据属性设置为MB_STOP
 44             b->msg_type(ACE_Message_Block::MB_STOP);
 45 
 46             //将消息块放入队列中
 47             if (this->putq(b) == -1)
 48             {
 49                 b->release();
 50                 break;
 51             }
 52             produced ++;
 53             ACE_DEBUG((LM_DEBUG, ACE_TEXT("Producer put the data: %s.\n"), b->base()));
 54             break;
 55         }
 56         if (this->putq(b) == -1)
 57         {
 58             b->release();
 59             break;
 60         }
 61         produced ++;
 62 
 63         ACE_DEBUG((LM_DEBUG, ACE_TEXT("Producer put the data: %s.\n"), b->base()));
 64         ACE_OS::sleep(1);
 65     }
 66     ACE_DEBUG((LM_DEBUG, ACE_TEXT("Producer done\n")));
 67     return 0;
 68 }
 69 
 70 class Counting_Test_Consumer : public ACE_Task<ACE_MT_SYNCH>
 71 {
 72     public:
 73         Counting_Test_Consumer (ACE_Message_Queue<ACE_MT_SYNCH> *queue)
 74         :ACE_Task<ACE_MT_SYNCH> (0, queue){}
 75         virtual int svc(void);
 76 };
 77 
 78 int Counting_Test_Consumer::svc(void)
 79 {
 80     int consumer = 0;
 81     ACE_Message_Block *b = 0;
 82     ACE_DEBUG ((LM_DEBUG, ACE_TEXT("in consumer svc.\n")));
 83     ACE_OS::sleep(30);
 84     while(1)
 85     {
 86         //循环从队列中读取数据块,如果读取失败,那么退出线程
 87         if (this->getq(b) == -1)
 88         {
 89             break;
 90         }
 91         if (b->msg_type() == ACE_Message_Block::MB_STOP)
 92         {
 93             //如果消息属性是MB_STOP,那么表示其为最后一个数据
 94             ACE_DEBUG((LM_DEBUG, ACE_TEXT("Consumer get the data: %s.\n"), b->base()));
 95             ACE_DEBUG((LM_DEBUG, ACE_TEXT("Consumer get the stop msg.\n")));
 96             b->release();
 97             consumer++;
 98             break;
 99         }
100         ACE_DEBUG((LM_DEBUG, ACE_TEXT("Consumer get the data: %s.\n"), b->base()));
101         b->release();
102         consumer++;
103         ACE_OS::sleep(5);
104     }
105 
106     ACE_DEBUG((LM_DEBUG, ACE_TEXT("Consumer done\n")));
107     return 0;
108 }
109 
110 int ACE_MAIN(int argc, ACE_TCHAR *argv[])
111 {
112     //创建消息队列
113     ACE_Message_Queue<ACE_MT_SYNCH> queue(2*1024*1024);
114 
115     // 创建生产者和消费者,它们使用同一个消息队列,只有这样才能实现线程间消息的传递
116     Counting_Test_Producer producer(&queue);
117     Counting_Test_Consumer consumer(&queue);
118 
119     //调用activate函数创建消费者线程
120     if (consumer.activate(THR_NEW_LWP | THR_DETACHED | THR_INHERIT_SCHED, 1) == -1)
121     {
122         ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT("Consumers %p\n"), ACE_TEXT("activate")), -1);
123     }
124 
125     //调用activate函数创建生产者线程
126     if (producer.activate ( THR_NEW_LWP | THR_DETACHED | THR_INHERIT_SCHED, 1) == -1)
127     {
128         ACE_ERROR((LM_ERROR, ACE_TEXT("Producers %p\n"), ACE_TEXT("activate")));
129         consumer.wait();
130         return -1;
131     }
132     //调用wait函数等待线程结束
133     ACE_Thread_Manager::instance()->wait();
134     ACE_DEBUG((LM_DEBUG, ACE_TEXT("Ending test!\n")));
135     return 0;
136 }

技术分享

问题1:

In file included from /usr/include/ace/config-macros.h:24,
                 from /usr/include/ace/config-lite.h:24,
                 from /usr/include/ace/ACE_export.h:11,
                 from /usr/include/ace/Shared_Object.h:18,
                 from /usr/include/ace/Service_Object.h:17,
                 from /usr/include/ace/Task.h:17,
                 from task.cpp:9:
/usr/include/ace/config.h:20:2: error: #error "_FILE_OFFSET_BITS != 64"

编译时添加-D_FILE_OFFSET_BITS=64

ACE中TASK架构简介及简单应用

标签:简介   bre   windows   失败   ...   class   复制   编译   分享   

原文地址:http://www.cnblogs.com/zl1991/p/7646767.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!