标签:container delete cti 最小 min article oschina operation order
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
|
// lckfree.h
// Lock-free queue built on CAS (compare-and-swap) operations, intended for
// simple multi-threading use cases such as:
//   1. multiple workers processing incoming messages
//   2. asynchronous processing using a thread pool
//   3. a simple TCP server dealing with asynchronous requests
// Author: typhoon_1986@163.com
// Reference: http://coolshell.cn/articles/8239.html
#ifndef BFD_LCKFREE_H_
#define BFD_LCKFREE_H_

#include <string>

using namespace std;

namespace bfd {

// One node of the singly linked list backing the queue.
struct LinkNode {
  string data;     // message payload stored in this node
  LinkNode* next;  // following node, or NULL when this is the last node
};
typedef struct LinkNode LinkNode;

// Queue of string messages linked through LinkNode objects.
class LckFreeQueue {
 public:
  LckFreeQueue();
  ~LckFreeQueue();

  // The queue owns its nodes through raw pointers and releases them in the
  // destructor; a member-wise copy would make two queues delete the same
  // nodes, so copying is disabled.
  LckFreeQueue(const LckFreeQueue&) = delete;
  LckFreeQueue& operator=(const LckFreeQueue&) = delete;

  int push(const string &msg);
  string pop();  // non-blocking pop method
  // string bpop();  // blocking pop method
  bool empty();

 private:
  LinkNode* head_;
  LinkNode* tail_;
  bool empty_;
  unsigned int length_;
};

}  // namespace bfd

#endif
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
|
#include "lckfree.h"

namespace bfd {

// Builds the queue around a single placeholder node: head_ and tail_ both
// start on it, and the first real message is linked behind it.
LckFreeQueue::LckFreeQueue()
    : head_(NULL), tail_(NULL), empty_(true), length_(0) {
  head_ = new LinkNode;
  head_->next = NULL;
  tail_ = head_;
}

// Releases every node still reachable from head_ (the placeholder node plus
// all messages that were never popped).
LckFreeQueue::~LckFreeQueue() {
  LinkNode *p = head_;
  while (p) {  // walk the whole chain, not just the first node
    LinkNode *q = p->next;
    delete p;
    p = q;
  }
}

// Appends a copy of msg to the end of the queue. Always returns 0.
int LckFreeQueue::push(const string &msg) {
  LinkNode *q = new LinkNode;
  q->data = msg;
  q->next = NULL;

  LinkNode *p = tail_;
  LinkNode *oldp = p;
  do {
    // tail_ may lag behind the real last node, so walk forward to it.
    while (p->next != NULL)
      p = p->next;
    // If the node could not be linked at the tail, try again.
  } while (__sync_bool_compare_and_swap(&(p->next), NULL, q) != true);

  // Set the tail node; if this CAS fails, tail_ is left as it is and the
  // walk above compensates on later pushes.
  __sync_bool_compare_and_swap(&tail_, oldp, q);
  return 0;
}

// Removes the oldest message and returns it without blocking.
// Returns an empty string when no message is queued.
string LckFreeQueue::pop() {
  LinkNode *p;
  do {
    p = head_;
    if (p->next == NULL) {
      return "";
    }
  } while (__sync_bool_compare_and_swap(&head_, p, p->next) != true);

  // NOTE(review): the node detached here (p) is never deleted. Freeing it
  // while other threads may still hold a pointer to it would need a safe
  // reclamation scheme, so it is left allocated.
  return p->next->data;
}

// Reports whether no message is currently linked behind head_.
// empty_ is only assigned in the constructor and never maintained afterwards,
// so the answer is derived from the list itself.
bool LckFreeQueue::empty() {
  return head_->next == NULL;
}

}  // namespace bfd
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
|
// workthreadpool.h // 一个用于将消息分发给多个进程,并使用多个进程处理的库,工作进程并不返回数据 #ifndef __WORK_THREAD_POOL__ #define __WORK_THREAD_POOL__ #include <stdio.h> #include <thread> #include <queue> #include <string> #include <vector> #include "lckfree.h" using namespace std; namespace bfd { class WorkThreadPool { public : WorkThreadPool( int size); virtual ~WorkThreadPool(); // 需要子类继承并实现的函数,每个线程实际执行的内容 virtual void Init() {}; virtual void Finish() {}; virtual void Handle( const string &msg)=0; // 将消息放入处理队列, 消息只支持string类型 int SendMessage( const string &msg); int Start(); int Stop(); private : void Worker(); int size_; LckFreeQueue msg_queue_; // 线程池的协作基于这个无锁队列 vector< thread > thread_pool_; }; } // namespace #endif |
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
|
#include "workthreadpool.h"

#include <sstream>
#include <unistd.h>

namespace bfd {

// Stores the requested worker count; values <= 0 are raised to 1 because at
// least one thread is needed.
WorkThreadPool::WorkThreadPool(int size) {
  if (size <= 0) {
    size_ = 1;
  } else {
    size_ = size;
  }
}

// Does not stop or join the workers itself: call Stop() before the pool is
// destroyed, since destroying a still-joinable std::thread terminates the
// program.
WorkThreadPool::~WorkThreadPool() {
}

// Queues msg for the workers. Always returns 0.
int WorkThreadPool::SendMessage(const string &msg) {
  msg_queue_.push(msg);
  return 0;
}

// Body of every worker thread: polls the queue, hands each message to
// Handle(), and returns when the "__exit__" message is received.
// NOTE(review): Init() and Finish() are not invoked from here - confirm
// whether they were meant to wrap this loop.
void WorkThreadPool::Worker() {
  unsigned int msg_count = 0;
  while (1) {
    string msg = msg_queue_.pop();

    // An empty string is what pop() returns when nothing is queued.
    if (msg.empty()) {
      printf("no msg got, sleep for 0.1 sec\n");
      usleep(100000);  // 0.1 sec
      continue;
    }

    if (msg == "__exit__") {
      stringstream ss;
      ss << "exit worker: " << std::this_thread::get_id()
         << ", processed: " << msg_count << "..";
      printf("%s\n", ss.str().c_str());
      return;
    }

    Handle(msg);
    msg_count++;
    if (msg_count % 1000 == 0) {
      printf("every 1000 msg count\n");
    }
  }
}

// Launches size_ worker threads running Worker(). Always returns 0.
int WorkThreadPool::Start() {
  for (int i = 0; i < size_; i++) {
    thread_pool_.push_back(thread(&WorkThreadPool::Worker, this));
  }
  return 0;
}

// Asks every started worker to exit and waits for all of them.
// Works from the threads that actually exist rather than from size_, so it is
// a no-op when Start() was never called and safe to call more than once.
// Always returns 0.
int WorkThreadPool::Stop() {
  const size_t started = thread_pool_.size();

  // One exit message per running worker; each worker consumes exactly one.
  for (size_t i = 0; i < started; i++) {
    SendMessage("__exit__");
  }

  for (size_t i = 0; i < started; i++) {
    if (thread_pool_[i].joinable()) {
      thread_pool_[i].join();
    }
  }

  // Drop the finished thread objects so a later Start()/Stop() cycle only
  // deals with live threads.
  thread_pool_.clear();
  return 0;
}

}  // namespace bfd
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
#include "workthreadpool.h"

#include <sstream>
#include <math.h>

// Demo pool: each message is printed together with the id of the thread that
// received it, followed by a fixed amount of floating-point busy work.
class MyThreadPool : public bfd::WorkThreadPool {
 public:
  MyThreadPool(int size) : bfd::WorkThreadPool(size) {}

  void Handle(const string &msg) {
    stringstream line;
    line << "worker (" << std::this_thread::get_id() << ") got msg: " << msg;
    printf("%s\n", line.str().c_str());

    // Simulated workload; the computed value is not used.
    for (int step = 0; step < 1000000; ++step) {
      double result = sqrt(sqrt(step) / 93.234);
    }
  }
};

// Starts a pool of 5 workers, sends it 100 messages, then stops it.
int main() {
  printf("start running ....\n");

  MyThreadPool pool(5);
  pool.Start();

  int sent = 0;
  while (sent < 100) {
    pool.SendMessage("msg info ----------");
    ++sent;
  }

  pool.Stop();
  return 0;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
# Builds the work-thread-pool shared library and the test program linked
# against it.
LIB_SRC_FILES = src/workthreadpool.cpp src/lckfree.cpp
TEST_SRC_FILES = src/main.cpp
INCLUDE_DIR = src
STD_FLAG = -std=c++0x

# Link the test program against the shared library.
all: main.o libs
	g++ $(STD_FLAG) -o test_workthreadpool main.o libworkthreadpool.so -lpthread

# Compile the test driver.
main.o: $(TEST_SRC_FILES)
	g++ $(STD_FLAG) -c $(TEST_SRC_FILES) -I$(INCLUDE_DIR)

# Build the shared library from the library sources.
libs: $(LIB_SRC_FILES)
	g++ $(STD_FLAG) -o libworkthreadpool.so -fPIC -O2 -shared -Wl,--no-as-needed -I$(INCLUDE_DIR) $(LIB_SRC_FILES) -lpthread

# all, libs and clean do not create files of those names; declaring them
# phony keeps a stray file called "all" or "libs" from suppressing the build.
.PHONY: all libs clean

clean:
	rm -f test_workthreadpool main.o libworkthreadpool.so
标签:container delete cti 最小 min article oschina operation order
原文地址:https://www.cnblogs.com/lidabo/p/9767068.html