标签:tutorials ogr adc 显示 == async 就会 改变 递归
转自:http://dreamrunner.org/blog/2014/08/07/C-multithreading-programming/
随着多核 CPU 随处可见,多线程(multithreading)可以被用来实现并行,提高 CPU 的利用率和性能显著的提高.掌握多线程编程也成为现代实现软件的基本要求技能之一.Introduction to Parallel Computing详细的介绍了 Parallel Computing; 为什么使用它;Parallel Computing 的分类;Parallel Computing 的 limits 和 costs; Parallel Computing 的程序模型;如何设计 Parallel 程序等.
这里先介绍多线程的概念,多线程中涉及的基本概念,然后用实例介绍 Pthread 库的使用,并介绍 Google Code 中如何把它封装成 C++类,最后介绍可移植并大量使用的 Boost Thread 库.
还有一些其他的 Thread 库:
A thread is defined as an independent stream of instructions that can be scheduled to run as such by the operating system.所以它是在程序中独立于其他代码可由操作系统调度的一段指令.
那么是操作系统是如何具体实现这一独立性呢?
要理解 thread,必须先明白 process.进程由操作系统创建来运行相应的程序,进程包含程序资源和程序执行状态的信息.以 Linux 的进程为例包含:
Thread 使用 Process 的资源,并且能成为独立的元件被操作系统调度,是因为它仅重复那些使得它们能成为独立运行代码的必要资源.Thread 维护它自己如下的信息:
与 Process 比较,Thread 可以总结如下:
Posix Thread 基本模型如下图,一些有关其中 Thread 的术语:
Threads 能提供益处 对于相适 的应用.所以 thread 的并行性对于应用来说也有它的限制.
Amdahl 法则 陈述到潜在的程序加速由能被并行的代码率 P 定义为:
$$ \begin{align} speedup = \dfrac{1}{1-P} \end{align} $$引入能并行的处理器个数,那么进一步可以定义为:
$$ \begin{align} speedup = \dfrac{1}{\dfrac{P}{N} + (1-P)} 其中 P 并行率,N 处理器个数 \end{align} $$Pareto 原则 陈述到 80%的处理器时间花在 20%的代码中.所以仔细分析代码,不要把时间花在并行/优化那部分不重要的代码.
在程序中有不同的方法使用线程,这里讨论 3 种线程设计模式,没有哪一种模式最好,每种模式都有相应适合的应用场合.
如上图,一个 Boss 线程创建其他 Worker 线程,并给它们分配任务,必要的话,并等待其他线程运行结束.通常 Boss 线程会在初始建立 Thread Pool 来为之后分配.尽管线程是轻量级的,但是创建它们仍是有开销的.
Peer 模式又叫做 workcrew 模式,一个 thread 创建其他 peer threads 当程序开始,但是如上图,与 Boss/worker 模式不同,这个 thread 之后也变成 peer thread 去处理自己的任务.
Pipeline 模式假定:
如上图, Pipeline 就像流水线一般,每个 thread 是一个长链中的一部分.每个 thread 处理由之前 thread 过的数据.
如上线程中的定义,线程们共享进程中的全局变量或资源,它们可以并行同时对这些数据和资源操作,如果没有一定的机制协调它们,那么数据或资源将处于一个不安全状态,引起诸如如下的一些问题:
所以我们需要如下的一些线程同步原语满足不同的线程间同步需求.
Mutex 又被称为 Lock,所以它就像一把 Lock,一个线程 Lock 住一段资源,那么其他线程就不能去访问那段资源,只有等到第一个线程 Unlock 那么资源,它才能访问.
在 Lock 和 Unlock 之间的代码,一般被称为 critical section.
Mutex 也包含一些复杂的类型,如下:
但 Mutex 也会引入其他一些问题,如deadlock 和 priority inversion.
在 Blog 中之前浅谈 Mutex (Lock)中可以看到更多有关 Mutex 的性能和开销分析,并如何实现一个轻量级的 Mutex.
线程 join 机制能让一个线程 join 到另外一个线程中.比如一个子线程 join 回主线程,那么主线程就会等待子线程运行结束.从而达到线程间等待的同步机制.
Condition variable 允许线程同步到某个共享资源的某个值.
比如,程序有一个计数器,当计数器达到某一个值时去激活某个线程运行.把计数器当成一个 Condition variable.这个线程可以等待这个 Condition variable,其他 active 线程操作完这个 Condition variable,可以通过 signal/broadcast 去唤醒那些等待这个 Condition variable 睡眠的线程.
Barrier 是一种能让一系列线程在某个点得到同步的方法,通过让参与 barrier 的线程等待直到所有参与线程都调用了这个 barrier 函数.本质上就是,阻塞所有参与 barrier 的线程直到最慢的那个参与线程调用 barrier.
Spinlock 与 mutex 类似,是种锁,但当获取锁失败时,spinlock 不会让线程进入睡眠,而是不断 poll 去获取这个锁直到获取成功.更多Mutex 与 Spinlock 的区别.
当某些资源具有多个时,简单的 Mutex 不能满足,引入 Semphore,Semphore 可以根据资源个数初始化为任意值.当线程们占有所有资源,使得 Semphore 为 0,那么其他线程再获取资源只有等待.当 Semphore 值只能是 1 或 0 时,它相当于简单的 Mutex.
原始的 Pthread API 由 ANSI/IEEE POSIX 1003.1 - 1995 standard 定义.POSIX 标准也随着时间不断改进.
接下来主要把 Pthread API 分成如下主要 5 部分:
如果想把 Pthread 封装成类对象或 Scoped Lock,可以参考之后 Google wrap the Pthread,或直接使用之后介绍的Boost thread library.
如果更全面的 API 参考文章最后的Pthread Library Routines Reference.更多有关资料参考文章后的其他资料.
对于 POSIX 系统,包含头文件 pthread.h
. 如果使用 semaphore
, 包含 semaphore.h
.
#include <pthread.h> #include <semaphore.h>
对于 Gcc 编译器,使用选项 -l
,如下:
gcc Program.o -o Program -lpthread
int pthread_create(pthread_t *thread, const pthread_attr_t *attr, void *(*start_routine)(void*), void *arg); void pthread_exit(void *value_ptr); int pthread_cancel(pthread_t thread); int pthread_attr_init(pthread_attr_t *attr); int pthread_attr_destroy(pthread_attr_t *attr);
pthread_create
创建一个新的线程并运行它.它能在代码的任何处被多次调用.
pthread_create
的参数:
thread
:返回新 thread 程的唯一标识.attr
:设置 thread 的性质.NULL 为默认性质.start_routine
: 新 thread 运行的函数指针.arg
:传给 start_routine
的参数,必须强制转换成 void *
.NULL 为没有参数传入.Process 能创建的最大 thread 个数由系统配置决定.如下 Ubuntu 打印出的结果:
$ limit cputime unlimited filesize unlimited datasize unlimited stacksize 8MB coredumpsize 0kB memoryuse unlimited maxproc 62694 descriptors 1024 memorylocked 64kB addressspace unlimited maxfilelocks unlimited sigpending 62694 msgqueue 819200 nice 0 rt_priority 0 rt_time unlimited
pthread_attr_init
和 pthread_attr_destroy
被用来初始化/销毁 thread 性质对象.
性质包括:
Pthread APIs 并没有提供 binding threads 到特定 cpus/cores 的接口.但不同系统可能包含这功能,比如提供非标准的pthread_setaffinity_np
接口.
比如设置两个线程都在 core0 上运行,如下设置:
cpu_set_t cpus; CPU_ZERO(&cpus); CPU_SET(0, &cpus); pthread_setaffinity_np(thread[0], sizeof(cpu_set_t), &cpus); pthread_setaffinity_np(thread[1], sizeof(cpu_set_t), &cpus);
一个线程有很多种方法终止:
pthread_exit
无论它的工作完成否.pthread_cancel
来取消.exec()
或 exit()
.main()
函数先完成,没有调用 pthread_exit
.pthread_exit()
允许指定一个可选的终止 status parameter
.这个可选参数一般返回给线程”joining”到这个终止线程.
pthread_exit()
不关闭文件,在线程打开的任何文件将继续打开在线程终止后.
在 main()
调用 pthread_exit()
:
main()
在它创建的 threads 之前终止,并没有显示的调用 pthread_exit()
,这将是个问题.所有创建的线程将终止因为 main()结束,不再存在支持这些线程.pthread_exit()
, main()将阻塞并保持存活来支持它创建的线程运行直到它们完成.如果注释掉 main()中最后的 pthread_exit(NULL);
,那么它创建的线程将会完成不了所有的打印而被强制退出.
#include <pthread.h> #include <cstdio> #include <cstdlib> void *ThreadProc(void *param) { int id; id = *(static_cast<int *>(param)); for (int i = 0; i < 10; ++i) { printf("thread %d: run %d \n", id, i); } pthread_exit(NULL); } int main(int argc, char *argv[]) { const int kNumThreads = 4; pthread_t threads[kNumThreads]; int thread_ids[kNumThreads]; for (int i = 0; i < kNumThreads; ++i) { thread_ids[i] = i; int rt = pthread_create(&threads[i], NULL, ThreadProc, static_cast<void *>(&thread_ids[i])); if (rt) { printf("ERROR: pthread_create failed, rt=%d\n", rt); exit(1); } } pthread_exit(NULL); }
int pthread_join(pthread_t thread, void **value_ptr); int pthread_detach(pthread_t thread); int pthread_attr_setdetachstate(pthread_attr_t *attr, int detachstate); int pthread_attr_getdetachstate(const pthread_attr_t *attr, int *detachstate);
Joining 是同步不同线程的方法之一,原理如下图:
pthread_join()
阻塞调用它的线程直到指定的 threadid
的线程终止.status
只要目标线程调用 pthread_exit()
.pthread_create()
中的 attr
参数.典型的步骤是:pthread_attr_t
类型的 pthread 属性;pthread_attr_init()
初始化属性变量;pthread_attr_setdetachstate()
设置 detached 属性;pthread_attr_destroy()
释放属性使用的资源.pthread_detach()
能显示的 detach 一个线程即使它是以可 join 创建.#include <pthread.h> #include <cstdio> #include <cstdlib> void *ThreadProc(void *param) { int id; id = *(static_cast<int *>(param)); for (int i = 0; i < 10; ++i) { printf("thread %d: run %d \n", id, i); } pthread_exit(param); } int main(int argc, char *argv[]) { const int kNumThreads = 4; pthread_t threads[kNumThreads]; int thread_ids[kNumThreads]; pthread_attr_t attr; pthread_attr_init(&attr); pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); for (int i = 0; i < kNumThreads; ++i) { thread_ids[i] = i; int rt = pthread_create(&threads[i], &attr, ThreadProc, static_cast<void *>(&thread_ids[i])); if (rt) { printf("ERROR: pthread_create failed, rt=%d\n", rt); exit(1); } } for (int i = 0; i < kNumThreads; ++i) { void *status; int rt = pthread_join(threads[i], &status); if (rt) { printf("ERROR: pthread_join failed, rt=%d\n", rt); exit(1); } printf("completed join with thread %d having a status of %d\n" , i, *static_cast<int *>(status)); } pthread_exit(NULL); }
int pthread_attr_getstacksize(const pthread_attr_t *restrict attr, size_t *restrict stacksize); int pthread_attr_setstacksize(pthread_attr_t *attr, size_t stacksize); int pthread_attr_getstackaddr(const pthread_attr_t *restrict attr, void **restrict stackaddr); int pthread_attr_setstackaddr(pthread_attr_t *attr, void *stackaddr);
每个线程都有各自独立的 stack, pthread_attr_getstackaddr
和 pthread_attr_setstackaddr
分别获取和设置线程的 stack 属性.
#include <pthread.h> #include <cstdio> #include <cstdlib> pthread_attr_t attr; void *ThreadProc(void *param) { int id; size_t thread_stack_size; id = *(static_cast<int *>(param)); pthread_attr_getstacksize(&attr, &thread_stack_size); printf("thread %d: stack size = %d\n", id, thread_stack_size); for (int i = 0; i < 10; ++i) { printf("thread %d: run %d \n", id, i); } pthread_exit(NULL); } int main(int argc, char *argv[]) { const int kNumThreads = 4; const int kThround = 1000; pthread_t threads[kNumThreads]; int thread_ids[kNumThreads]; size_t stack_size; pthread_attr_init(&attr); pthread_attr_getstacksize(&attr, &stack_size); printf("Default stack size = %d\n", stack_size); stack_size = sizeof(double) * kThround * kThround; printf("Setting stack size = %d\n", stack_size); pthread_attr_setstacksize(&attr, stack_size); for (int i = 0; i < kNumThreads; ++i) { thread_ids[i] = i; int rt = pthread_create(&threads[i], &attr, ThreadProc, static_cast<void *>(&thread_ids[i])); if (rt) { printf("ERROR: pthread_create failed, rt=%d\n", rt); exit(1); } } pthread_exit(NULL); pthread_attr_destroy(&attr); return 0; }
pthread_t pthread_self(void); int pthread_equal(pthread_t t1, pthread_t t2); int pthread_once(pthread_once_t *once_control, void (*init_routine)(void)); pthread_once_t once_control = PTHREAD_ONCE_INIT;
pthread_self
返回调用线程的唯一 thread ID.pthread_equal
比较两个线程 ID 是否相等.pthread_once
只执行 init_routine
仅仅一次在进程中.Mutex 以”mutual exclusion”(互斥)简称.
Mutex variable 就像一把”锁”一样保护共享数据资源.mutex 的基本概念就是,只有一个线程能 lock 一个 mutex 变量在任何时候.所以,即使很多线程尝试去锁一个 mute,也仅仅只有一个线程能成功.
典型使用 mutex 的顺序如下:
int pthread_mutex_destroy(pthread_mutex_t *mutex); int pthread_mutex_init(pthread_mutex_t *restrict mutex, const pthread_mutexattr_t *restrict attr); pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; int pthread_mutexattr_destroy(pthread_mutexattr_t *attr); int pthread_mutexattr_init(pthread_mutexattr_t *attr);
Mutex 变量由 pthread_mutex_t
声明定义,而且必须初始化在使用前.两种方法初始:
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_init()
函数,并能设置 mutex 的属性 attr
.attr
用来设置 mutex 变量的属性,必须是 pthread_mutexattr_t
类型.Pthread 标准中定义的 3 种可选 mutex 属性:
int pthread_mutex_lock(pthread_mutex_t *mutex); int pthread_mutex_trylock(pthread_mutex_t *mutex); int pthread_mutex_unlock(pthread_mutex_t *mutex);
pthread_mutex_lock()
函数被用来获取传入的 mutex 变量,如果 mutex 已经被其他线程占用,那么这个调用就阻塞调用线程,使它进入睡眠等待这个 mutex 直到它被释放.
pthread_mutex_trylock()
仅尝试获取锁,若不成功也立即返回’busy’信号.
#include <pthread.h> #include <cstdio> #include <cstdlib> struct ThreadData { int tid; int data; }; int shared_x; pthread_mutex_t lock; void *ThreadProc(void *param) { ThreadData *data = static_cast<ThreadData *>(param); printf("begin from thread id: %d\n", data->tid); pthread_mutex_lock(&lock); shared_x += data->data; printf("thread %d: x = %d\n", data->tid, shared_x); pthread_mutex_unlock(&lock); pthread_exit(NULL); } int main(int argc, char *argv[]) { const int kNumThreads = 4; pthread_t threads[kNumThreads]; ThreadData threads_data[kNumThreads]; pthread_attr_t attr; shared_x = 0; pthread_mutex_init(&lock, NULL); pthread_attr_init(&attr); pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); for (int i = 0; i < kNumThreads; ++i) { threads_data[i].tid = i; threads_data[i].data = i * i; int rt = pthread_create(&threads[i], &attr, ThreadProc, static_cast<void *>(&threads_data[i])); if (rt) { printf("ERROR: pthread_create failed, rt=%d\n", rt); exit(1); } } for (int i = 0; i < kNumThreads; ++i) { void *status; pthread_join(threads[i], &status); } pthread_attr_destroy(&attr); pthread_exit(NULL); return 0; }
Mutex 变量如锁一般防止多个线程访问共享数据资源,如果某个线程等待某个共享数据达到某个数值才进行相应的操作,那么这个线程需要不断的去 poll,查看是否满足需要的值,这样开销很大,因为线程需要一直处于忙状态.
引入 Condition Variables 来完成这样的同步到某个实际数据值而不要不断 poll.
Condition 变量一般与 mutex 一起使用.锁住查看的共享数据资源.
使用 Condition 的一般步骤如下:
int pthread_cond_destroy(pthread_cond_t *cond); int pthread_cond_init(pthread_cond_t *restrict cond, const pthread_condattr_t *restrict attr); int pthread_condattr_destroy(pthread_condattr_t *attr); int pthread_condattr_init(pthread_condattr_t *attr);
Condition 变量由 pthread_cond_t
声明定义,而且必须初始化在使用前.两种方法初始:
pthread_cond_t convar = PTHREAD_COND_INITIALIZER;
pthread_cond_init()
函数,并能设置 condition 的属性 attr
.attr
用来设置 condition 变量的属性,必须是 pthread_condattr_t
类型.只有一种属性可选:是否进程共享,也就是允许其他进程中的线程也能看到它.
int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex); int pthread_cond_signal(pthread_cond_t *cond); int pthread_cond_broadcast(pthread_cond_t *cond);
pthread_cond_wait()
阻塞调用它的线程直到其中 cond
被 signal.这个函数需要在占有 mutex 时被调用,而它将 自动释放 mutex 当它等待时.等到 signal 收到,线程被唤醒, mutex 将 自动被占有 .最后当线程完成 condition 的操作,要负责对 mutex 解锁.
pthread_cond_signal()
用来 signal 其他等待这个 cond
的线程.它需要在占有 mutex 时被调用.然后必须对 mutex 解锁来完成 pthread_cond_wait
的等待.
如果有多余一个线程处于等待 cond
而阻塞, 应该用 pthread_cond_broadcast()
替换 pthread_cond_signal()
.
#include <pthread.h> #include <cstdio> #include <cstdlib> #include <unistd.h> const int kNumThreads = 3; const int kLoops = 10; const int kCountLimit = 15; int g_count; pthread_mutex_t count_mutex; pthread_cond_t count_cv; void *IncreaseCount(void *param) { int id; id = *(static_cast<int *>(param)); for (int i = 0; i < kLoops; ++i) { pthread_mutex_lock(&count_mutex); g_count++; if (g_count == kCountLimit) { pthread_cond_signal(&count_cv); printf("increse thread %d: count = %d, signal cond\n", id, g_count); } printf("increse thread %d: count = %d, unlock mutex\n", id, g_count); pthread_mutex_unlock(&count_mutex); sleep(1); } pthread_exit(NULL); } void *WatchCount(void *param) { int id; id = *(static_cast<int *>(param)); pthread_mutex_lock(&count_mutex); while (g_count < kCountLimit) { pthread_cond_wait(&count_cv, &count_mutex); printf("watch thread %d: count = %d, receive signal\n", id, g_count); } pthread_mutex_unlock(&count_mutex); pthread_exit(NULL); } int main(int argc, char *argv[]) { pthread_t threads[kNumThreads]; int thread_ids[kNumThreads]; pthread_attr_t attr; pthread_mutex_init(&count_mutex, NULL); pthread_cond_init(&count_cv, NULL); pthread_attr_init(&attr); pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); for (int i = 0; i < kNumThreads; ++i) { thread_ids[i] = i; } int rt; rt = pthread_create(&threads[0], &attr, WatchCount, static_cast<void *>(&thread_ids[0])); if (rt) { printf("ERROR: pthread_create failed, rt=%d\n", rt); exit(1); } rt = pthread_create(&threads[1], &attr, IncreaseCount, static_cast<void *>(&thread_ids[1])); if (rt) { printf("ERROR: pthread_create failed, rt=%d\n", rt); exit(1); } rt = pthread_create(&threads[2], &attr, IncreaseCount, static_cast<void *>(&thread_ids[2])); if (rt) { printf("ERROR: pthread_create failed, rt=%d\n", rt); exit(1); } for (int i = 0; i < kNumThreads; ++i) { pthread_join(threads[i], NULL); } pthread_attr_destroy(&attr); pthread_cond_destroy(&count_cv); pthread_mutex_destroy(&count_mutex); pthread_exit(NULL); }
Barrier 就是栅栏一样,调用等待 barrier 的线程需要等待直到满足调用 barrier 的线程个数达到要求的 count
.
int pthread_barrier_init(pthread_barrier_t *barrier, const pthread_barrierattr_t *attr, unsigned count); pthread_barrier_t barrier = PTHREAD_BARRIER_INITIALIZER(count); int pthread_barrier_destroy(pthread_barrier_t *barrier); int pthread_barrierattr_init(pthread_barrierattr_t *attr); int pthread_barrierattr_destroy(pthread_barrierattr_t *attr); int pthread_barrier_wait(pthread_barrier_t *barrier);
Barrier 变量由 pthread_barrier_t
声明定义,而且必须初始化在使用前.需要传入满足 barrier 等待的个数 count
, 两种方法初始:
pthread_barrier_t barrier = PTHREAD_BARRIER_INITIALIZER(count);
pthread_barrier_init()
函数,并能设置 barrier 的属性 attr
.线程调用 barrier,只需要调用 pthread_barrier_wait
来等待 barrier 达到满足条件.
标签:tutorials ogr adc 显示 == async 就会 改变 递归
原文地址:http://www.cnblogs.com/dk666/p/7341020.html