The underlying principles are covered at length elsewhere online, so they are not repeated here.
Put simply, TLS means dynamically allocated per-thread variables. The more familiar kind of per-thread variable is one declared with __thread; the difference is that TLS variables of this kind are requested dynamically at runtime, through the following set of interfaces:
#include <pthread.h>
int pthread_key_create(pthread_key_t *key, void (*destructor)(void*));
int pthread_key_delete(pthread_key_t key);
int pthread_setspecific(pthread_key_t key, const void *value);
void *pthread_getspecific(pthread_key_t key);
A typical usage pattern is sketched below.
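A minimal sketch of that pattern, under stated assumptions: the names my_key, my_destructor and get_my_state are illustrative, not part of urcu.

#include <pthread.h>
#include <stdlib.h>

static pthread_key_t my_key;

/* Runs automatically at thread exit for every thread that set a value. */
static void my_destructor(void *p)
{
    free(p);
}

static void my_key_init(void)
{
    pthread_key_create(&my_key, my_destructor);   /* once per process */
}

static void *get_my_state(void)
{
    void *p = pthread_getspecific(my_key);

    if (!p) {   /* first access from this thread: allocate lazily */
        p = calloc(1, 64);
        pthread_setspecific(my_key, p);
    }
    return p;
}

In urcu-bp, the per-thread reader state accessed this way (through the URCU_TLS macro, which compiles down to either __thread or a pthread key) is a pointer to the following struct rcu_reader: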
struct rcu_reader {
    /* Data used by both reader and synchronize_rcu() */
    unsigned long ctr;
    /* Data used for registry */
    struct cds_list_head node __attribute__((aligned(CAA_CACHE_LINE_SIZE)));
    pthread_t tid;
    int alloc; /* registry entry allocated */
};
Here ctr holds the thread's current lock state, tid is the thread ID, and alloc is a flag marking whether this lock-state entry is in use. Because a lock state must be kept for every thread, the bp code accesses these entries as arrays: the first allocation holds 8 entries, the second 16, the third 32, doubling each time, as sketched below.
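A rough sketch of that doubling allocation. The chunk layout and names here are assumptions for illustration; unlike this sketch, the real bp code backs its chunks with mmap. What matters is that existing entries never move once handed out, since their node fields are linked into the registry list.

#include <stdlib.h>
/* Assumes struct rcu_reader (above) and its urcu headers are available. */

struct registry_chunk {
    size_t capacity;              /* 8, then 16, then 32, ... */
    size_t used;
    struct registry_chunk *next;
    struct rcu_reader readers[];  /* flexible array of entries */
};

static struct registry_chunk *chunks;

/* Hand out a fresh entry, adding a chunk twice as large when full. */
static struct rcu_reader *alloc_reader_entry(void)
{
    struct registry_chunk *c = chunks;

    if (!c || c->used == c->capacity) {
        size_t cap = c ? c->capacity * 2 : 8;
        struct registry_chunk *n =
            calloc(1, sizeof(*n) + cap * sizeof(struct rcu_reader));
        if (!n)
            return NULL;
        n->capacity = cap;
        n->next = chunks;
        chunks = n;
        c = n;
    }
    return &c->readers[c->used++];
}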
The lock state is one of three values:
enum rcu_state {
    RCU_READER_ACTIVE_CURRENT,  /* current reader; the writer snapshots these in step 1 */
    RCU_READER_ACTIVE_OLD,      /* old reader; the writer waits these out in step 2 */
    RCU_READER_INACTIVE,        /* not in a read-side critical section */
};
struct rcu_gp {
    /*
     * Global grace period counter.
     * Contains the current RCU_GP_CTR_PHASE.
     * Also has a RCU_GP_COUNT of 1, to accelerate the reader fast path.
     * Written to only by writer with mutex taken.
     * Read by both writer and readers.
     */
    unsigned long ctr;
} __attribute__((aligned(CAA_CACHE_LINE_SIZE)));

struct rcu_gp rcu_gp = { .ctr = RCU_GP_COUNT };
This holds the global ctr; its initial value is 1.
#define RCU_GP_COUNT (1UL << 0)                        /* == 1 */
/* Use the amount of bits equal to half of the architecture long size */
#define RCU_GP_CTR_PHASE (1UL << (sizeof(long) << 2))  /* 2^32 on 64-bit, 2^16 on 32-bit */
#define RCU_GP_CTR_NEST_MASK (RCU_GP_CTR_PHASE - 1)    /* 0xffffffff on 64-bit, 0xffff on 32-bit */
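To make those values concrete, a tiny standalone check (not from the post):

#include <stdio.h>

int main(void)
{
    unsigned long count = 1UL << 0;                   /* RCU_GP_COUNT      */
    unsigned long phase = 1UL << (sizeof(long) << 2); /* RCU_GP_CTR_PHASE  */
    unsigned long mask  = phase - 1;                  /* ..._NEST_MASK     */

    /* On 64-bit: count=0x1 phase=0x100000000 mask=0xffffffff
     * On 32-bit: count=0x1 phase=0x10000     mask=0xffff      */
    printf("count=%#lx phase=%#lx mask=%#lx\n", count, phase, mask);
    return 0;
}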
void rcu_bp_register(void);
Every thread that wants to use bp RCU only needs to call the function above to register (in the bp, "bulletproof", flavour this is also triggered automatically on the first rcu_read_lock). Its job is to allocate a struct rcu_reader entry for the calling thread and link its node into the global registry. Once registered, the read-side lock updates the thread's ctr as follows:
static inline void _rcu_read_lock_update(unsigned long tmp) /* tmp is the current thread's ctr */
{
    /* If the low (nesting) bits of the thread's ctr are all zero,
     * this is the outermost lock: copy the global ctr. */
    if (caa_likely(!(tmp & RCU_GP_CTR_NEST_MASK))) {
        _CMM_STORE_SHARED(URCU_TLS(rcu_reader)->ctr, _CMM_LOAD_SHARED(rcu_gp.ctr));
        urcu_bp_smp_mb_slave();
    } else
        /* Otherwise this is a nested lock: just add 1 to ctr. */
        _CMM_STORE_SHARED(URCU_TLS(rcu_reader)->ctr, tmp + RCU_GP_COUNT);
}
static inline void _rcu_read_unlock(void)
{
    unsigned long tmp;

    tmp = URCU_TLS(rcu_reader)->ctr;
    urcu_assert(tmp & RCU_GP_CTR_NEST_MASK);
    /* Finish using rcu before decrementing the pointer. */
    urcu_bp_smp_mb_slave();
    _CMM_STORE_SHARED(URCU_TLS(rcu_reader)->ctr, tmp - RCU_GP_COUNT);
    cmm_barrier(); /* Ensure the compiler does not reorder us with mutex */
}
Unlock simply subtracts 1 from the current ctr.
On lock, if the current ctr is 0 (its initial value, i.e. the outermost lock), the thread's ctr is set to the current global ctr. On unlock, the ctr is unconditionally decremented by 1, so lock and unlock must be strictly paired.
While a thread participates in RCU, the low (nesting) bits of its ctr are greater than 0 inside a read-side critical section and equal to 0 when idle.
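A hedged trace of nested locking, showing one possible state evolution, assuming rcu_gp.ctr == 1 and no concurrent writer:

rcu_read_lock();    /* outermost: nest bits 0 -> ctr = rcu_gp.ctr = 1 */
rcu_read_lock();    /* nested: ctr += RCU_GP_COUNT -> 2               */
rcu_read_unlock();  /* ctr -= RCU_GP_COUNT -> 1                       */
rcu_read_unlock();  /* ctr -= RCU_GP_COUNT -> 0: idle again           */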
That state is decoded by the following function:
static inline enum rcu_state rcu_reader_state(unsigned long *ctr)
{
    unsigned long v;

    /* Never registered: not in a critical section. */
    if (ctr == NULL)
        return RCU_READER_INACTIVE;
    /*
     * Make sure both tests below are done on the same version of *value
     * to insure consistency.
     */
    v = CMM_LOAD_SHARED(*ctr);
    /* Low (nesting) bits all zero (RCU_GP_CTR_NEST_MASK is 0xffffffff
     * on 64-bit): not in a critical section. */
    if (!(v & RCU_GP_CTR_NEST_MASK))
        return RCU_READER_INACTIVE;
    /*
     * Compare the phase bit of the thread's ctr against the global
     * ctr. If the XOR leaves the phase bit clear, both phases match,
     * so the reader entered during the current phase:
     * RCU_READER_ACTIVE_CURRENT. If they differ, the reader still
     * holds a snapshot of the previous phase: RCU_READER_ACTIVE_OLD.
     * Either way, any nonzero nest count means the thread is reading.
     */
    if (!((v ^ rcu_gp.ctr) & RCU_GP_CTR_PHASE))
        return RCU_READER_ACTIVE_CURRENT;
    /* Phase bit differs: a reader from before the last phase flip. */
    return RCU_READER_ACTIVE_OLD;
}
static void wait_for_readers(struct cds_list_head *input_readers,
                             struct cds_list_head *cur_snap_readers,
                             struct cds_list_head *qsreaders)
{
    unsigned int wait_loops = 0;
    struct rcu_reader *index, *tmp;

    /*
     * Wait for each thread URCU_TLS(rcu_reader).ctr to either
     * indicate quiescence (not nested), or observe the current
     * rcu_gp.ctr value.
     */
    for (;;) {
        if (wait_loops < RCU_QS_ACTIVE_ATTEMPTS)
            wait_loops++;
        cds_list_for_each_entry_safe(index, tmp, input_readers, node) {
            switch (rcu_reader_state(&index->ctr)) {
            case RCU_READER_ACTIVE_CURRENT:
                if (cur_snap_readers) {
                    cds_list_move(&index->node,
                                  cur_snap_readers);
                    break;
                }
                /* Fall-through */
            case RCU_READER_INACTIVE:
                cds_list_move(&index->node, qsreaders);
                break;
            case RCU_READER_ACTIVE_OLD:
                /*
                 * Old snapshot. Leaving node in
                 * input_readers will make us busy-loop
                 * until the snapshot becomes current or
                 * the reader becomes inactive.
                 */
                break;
            }
        }
        if (cds_list_empty(input_readers)) {
            break;
        } else {
            /* Temporarily unlock the registry lock. */
            mutex_unlock(&rcu_registry_lock);
            if (wait_loops >= RCU_QS_ACTIVE_ATTEMPTS)
                (void) poll(NULL, 0, RCU_SLEEP_DELAY_MS);
            else
                caa_cpu_relax();
            /* Re-lock the registry lock before the next loop. */
            mutex_lock(&rcu_registry_lock);
        }
    }
}
"snap" is presumably short for snapshot: a group photo of the threads currently in read-side critical sections.
Threads in RCU_READER_ACTIVE_CURRENT are moved onto the current-snapshot list (when one is given), threads not reading are moved onto the qsreaders (quiescent) list, and RCU_READER_ACTIVE_OLD threads are left in place until they become current readers or stop reading.
void synchronize_rcu(void)
{
    CDS_LIST_HEAD(cur_snap_readers);
    CDS_LIST_HEAD(qsreaders);
    sigset_t newmask, oldmask;
    int ret;

    ret = sigfillset(&newmask);
    assert(!ret);
    ret = pthread_sigmask(SIG_BLOCK, &newmask, &oldmask);
    assert(!ret);

    mutex_lock(&rcu_gp_lock);
    mutex_lock(&rcu_registry_lock);

    if (cds_list_empty(&registry))
        goto out;

    /*
     * All threads should read qparity before accessing data structure
     * where new ptr points to.
     */
    /* Write new ptr before changing the qparity */
    smp_mb_master();

    /*
     * Wait for readers to observe original parity or be quiescent.
     * wait_for_readers() can release and grab again rcu_registry_lock
     * internally.
     */
    wait_for_readers(&registry, &cur_snap_readers, &qsreaders);

    /*
     * Adding a cmm_smp_mb() which is _not_ formally required, but makes the
     * model easier to understand. It does not have a big performance impact
     * anyway, given this is the write-side.
     */
    cmm_smp_mb();

    /* Switch parity: 0 -> 1, 1 -> 0 */
    CMM_STORE_SHARED(rcu_gp.ctr, rcu_gp.ctr ^ RCU_GP_CTR_PHASE);

    /*
     * Must commit qparity update to memory before waiting for other parity
     * quiescent state. Failure to do so could result in the writer waiting
     * forever while new readers are always accessing data (no progress).
     * Ensured by CMM_STORE_SHARED and CMM_LOAD_SHARED.
     */

    /*
     * Adding a cmm_smp_mb() which is _not_ formally required, but makes the
     * model easier to understand. It does not have a big performance impact
     * anyway, given this is the write-side.
     */
    cmm_smp_mb();

    /*
     * Wait for readers to observe new parity or be quiescent.
     * wait_for_readers() can release and grab again rcu_registry_lock
     * internally.
     */
    wait_for_readers(&cur_snap_readers, NULL, &qsreaders);

    /*
     * Put quiescent reader list back into registry.
     */
    cds_list_splice(&qsreaders, &registry);

    /*
     * Finish waiting for reader threads before letting the old ptr being
     * freed.
     */
    smp_mb_master();
out:
    mutex_unlock(&rcu_registry_lock);
    mutex_unlock(&rcu_gp_lock);
    ret = pthread_sigmask(SIG_SETMASK, &oldmask, NULL);
    assert(!ret);
}
Step 1: pick out the threads participating in this round of RCU (snapshot the current readers).
Step 2: flip the phase bit of the global ctr (0 -> 1, 1 -> 0). On a 64-bit system the global ctr only ever takes two values, 1 and 0x1 0000 0001.
Step 3: wait for all of those readers to exit their critical sections.
Step 4: the synchronous wait is over.
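As a hedged sketch of the synchronous write side: struct foo, global_foo, read_foo and update_foo are illustrative names, not from the post; rcu_read_lock, rcu_dereference, rcu_xchg_pointer and synchronize_rcu are liburcu APIs.

#include <urcu-bp.h>   /* bp flavour: readers are registered automatically */
#include <stdlib.h>

struct foo { int data; };
static struct foo *global_foo;

/* Reader: never blocks, regardless of what the writer is doing. */
static int read_foo(void)
{
    struct foo *p;
    int v;

    rcu_read_lock();
    p = rcu_dereference(global_foo);
    v = p ? p->data : -1;
    rcu_read_unlock();
    return v;
}

/* Writer: publish the new version, wait out the readers, reclaim. */
static void update_foo(struct foo *newp)
{
    struct foo *oldp = rcu_xchg_pointer(&global_foo, newp);

    synchronize_rcu();   /* steps 1-4 above */
    free(oldp);          /* no reader can still hold oldp */
}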
The difference between the synchronous and asynchronous paths is who waits: with synchronize_rcu the writer thread itself waits for all readers to exit and then reclaims in place, while with call_rcu a dedicated call_rcu thread waits for all readers to exit and runs the reclamation callback. That code path has not been studied in depth here, so look directly at the call stack:
#0 0x000000fff066ab90 in syscall () from /lib/libc.so.6
(gdb) bt
#0 0x000000fff066ab90 in syscall () from /lib/libc.so.6
#1 0x000000fff08f7ea8 in smp_mb_master ()
at ../userspace-rcu-0.9.3/urcu.c:165
#2 0x000000fff08f817c in wait_for_readers (
input_readers=0xfff090e5f0 <registry>, cur_snap_readers=0xff60dfe4b8,
qsreaders=0xff60dfe4c8) at ../userspace-rcu-0.9.3/urcu.c:290
#3 0x000000fff08f84ec in synchronize_rcu_memb ()
at ../userspace-rcu-0.9.3/urcu.c:426
#4 0x000000fff08f9b50 in call_rcu_thread (arg=0xff640030b0)
at ../userspace-rcu-0.9.3/urcu-call-rcu-impl.h:362
#5 0x000000fff0a22040 in start_thread () from /lib/libpthread.so.0
#6 0x000000fff066f5c4 in __thread_start () from /lib/libc.so.6
Like the synchronous path, it also waits for readers to exit through a call to wait_for_readers.
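A hedged sketch of the asynchronous side: struct foo2 and its helpers are illustrative assumptions; call_rcu, struct rcu_head, rcu_xchg_pointer and caa_container_of are liburcu APIs.

#include <urcu-bp.h>
#include <urcu-call-rcu.h>   /* call_rcu, struct rcu_head */
#include <stdlib.h>

struct foo2 {
    int data;
    struct rcu_head rcu;     /* embedded callback head */
};

/* Runs in the call_rcu thread once a grace period has elapsed. */
static void free_foo2(struct rcu_head *head)
{
    free(caa_container_of(head, struct foo2, rcu));
}

static void update_foo2(struct foo2 **pp, struct foo2 *newp)
{
    struct foo2 *oldp = rcu_xchg_pointer(pp, newp);

    /* Returns immediately; the call_rcu thread does the waiting. */
    call_rcu(&oldp->rcu, free_foo2);
}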
To keep readers from ever blocking, the writer must be able to tell whether a given reader belongs to the current round.
Suppose there are two groups of readers and one writer: group 1 is in a read-side critical section when the writer starts waiting, and group 2 enters its critical section after that. Group 2 must not be included in this round of detection.
Look at the state decoding once more:
if (!(v & RCU_GP_CTR_NEST_MASK))
    return RCU_READER_INACTIVE;
if (!((v ^ rcu_gp.ctr) & RCU_GP_CTR_PHASE))
    return RCU_READER_ACTIVE_CURRENT;
return RCU_READER_ACTIVE_OLD;
The first test is the same as before: the thread is not taking part in any read-side critical section.
The second test is worth working through concretely (64-bit values):
ctr value     | global ctr value | result
------------- | ---------------- | --------------------------
1             | 1                | RCU_READER_ACTIVE_CURRENT
1             | 0x1 0000 0001    | RCU_READER_ACTIVE_OLD
2             | 1                | RCU_READER_ACTIVE_CURRENT
0x1 0000 0001 | 1                | RCU_READER_ACTIVE_OLD
0x1 0000 0001 | 0x1 0000 0001    | RCU_READER_ACTIVE_CURRENT
So any nonzero ctr means the thread is reading; the phase bit merely distinguishes readers that entered before the writer began waiting from readers that entered after.
Consider two threads, one writer and one reader, with the global ctr at 1; the trace below walks through a full grace period.
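A hedged trace of that scenario, written as comments; this is one possible schedule, with 64-bit values assumed:

/* Global: rcu_gp.ctr = 1.
 *
 * reader: rcu_read_lock();                 reader ctr = 1
 * writer: synchronize_rcu() starts
 *         wait_for_readers(): reader decodes as ACTIVE_CURRENT,
 *         moved to cur_snap_readers
 *         rcu_gp.ctr ^= RCU_GP_CTR_PHASE;  global ctr = 0x1 0000 0001
 *         wait_for_readers(): reader now decodes as ACTIVE_OLD,
 *         so the writer spins / sleeps
 * reader: rcu_read_unlock();               reader ctr = 0 -> INACTIVE
 * writer: synchronize_rcu() returns; the old pointer may be freed.
 *
 * A reader that locks after the flip copies 0x1 0000 0001 and decodes
 * as ACTIVE_CURRENT, so this round's writer does not wait for it.
 */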
Original post: http://www.cnblogs.com/leo0000/p/7478644.html