首页
Web开发
Windows程序
编程语言
数据库
移动开发
系统相关
微信
其他好文
会员
首页
>
系统相关
> 详细
Memcached源码分析之thread.c
时间:
2016-09-07 19:23:22
阅读:
165
评论:
0
收藏:
0
[点我收藏+]
标签:
/*
* 文件开头先啰嗦几句:
*
* thread.c文件代表的是线程模块。但是你会看到这个模块里面有很多其它方法,
例如关于item的各种操作函数,item_alloc,item_remove,item_link等等。
我们有个items模块,这些不都是items模块要做的事情吗?为什么thread模块也有?
你仔细看会发现,thread里面的这种函数,例如item_remove,items模块里面
都会有一个对应的do_item_remove函数,而thread中的item_remove仅仅是调用
items模块中的do_item_remove,唯一多出来的就是thread在do_item_remove前后
加了加锁和解锁的操作。
其实这是很好的一种设计。
1)因为像"删除item"这样的一个逻辑都是由某个线程,而且这里是工作线程执行,
所以这是一个线程层面的事情。就是说是“某个工作线程去删除item”这样一件事。
2)更重要的是原子性及一致性问题,某个item数据,很有可能同时多个线程在修改,
那么需要加锁,那么锁最应该加在哪个地方?既然问题是线程引起的,那么负责
解决的无疑是线程模块。
3)所以这里像这种函数,thread此时相当于是items的外壳,起调控作用,在线程层面
开放给外部调用,同时在内部加锁。而items模块里面定义的do_xxx函数都不需要多
加考虑,无条件执行对item进行修改,而由外部被调用方来控制。相信很多需要加锁
的项目都会面临这样的问题:锁应该加在哪一层?可以参考memcached这样的设计。
*
*/
#include
"memcached.h"
#include
<assert.h>
#include
<stdio.h>
#include
<errno.h>
#include
<stdlib.h>
#include
<errno.h>
#include
<string.h>
#include
<pthread.h>
#ifdef
__sun
#include
<atomic.h>
#endif
#define
ITEMS_PER_ALLOC
64
/**
下面这个CQ_ITEM结构体:
可以这么理解,主线程accept连接后,把client fd
分发到worker线程的同时会顺带一些与此client连接相关的信息,
而CQ_ITEM是包装了这些信息的一个对象,有点"参数对象"的概念。
记住这货是主线程那边丢过来的。
CQ_ITEM中的CQ虽然是connection queue的缩写,
它与memcached.h中定义的conn结构体是完全不一样的概念,
但worker线程会利用这个CQ_ITEM对象去初始化conn对象
*/
typedef
struct
conn_queue_item CQ_ITEM
;
struct
conn_queue_item
{
int
sfd
;
enum
conn_states init_state
;
int
event_flags
;
int
read_buffer_size
;
enum
network_transport transport
;
CQ_ITEM
*
next
;
};
/*
上面的CQ_ITEM的队列对象,每个worker线程对象都保存着这样一个队列,处理
主线程那边分发过来的连接请求时用到。
*/
typedef
struct
conn_queue CQ
;
struct
conn_queue
{
CQ_ITEM
*
head
;
CQ_ITEM
*
tail
;
pthread_mutex_t
lock
;
};
//下面是各种锁
/**
个人认为这个锁用于锁住全局数量不变的对象,例如slabclass,LRU链表等等
区别于item锁,由于item对象是动态增长的,数量非常多,
item锁是用hash的方式分配一张大大的item锁表来控制锁的粒度
*/
pthread_mutex_t
cache_lock
;
pthread_mutex_t
conn_lock
=
PTHREAD_MUTEX_INITIALIZER
;
//连接锁
#if !defined(HAVE_GCC_ATOMICS) && !defined(__sun)
pthread_mutex_t
atomics_mutex
=
PTHREAD_MUTEX_INITIALIZER
;
#endif
static
pthread_mutex_t
stats_lock
;
//统计锁
static
CQ_ITEM
*
cqi_freelist
;
static
pthread_mutex_t
cqi_freelist_lock
;
static
pthread_mutex_t
*
item_locks
;
//item锁
static
uint32_t
item_lock_count
;
//item锁总数
static
unsigned
int
item_lock_hashpower
;
//item锁的hash表 指数,锁总数为2的item_lock_hashpower个,见下面的hashsize
#define
hashsize
(
n
)
((
unsigned
long
int
)
1
<<(
n
))
#define
hashmask
(
n
)
(
hashsize
(
n
)-
1
)
static
pthread_mutex_t
item_global_lock
;
static
pthread_key_t
item_lock_type_key
;
static
LIBEVENT_DISPATCHER_THREAD dispatcher_thread
;
static
LIBEVENT_THREAD
*
threads
;
static
int
init_count
=
0
;
//有多少个worker线程已经被初始化
static
pthread_mutex_t
init_lock
;
//初始化锁
static
pthread_cond_t
init_cond
;
//初始化条件变量
static
void
thread_libevent_process
(
int
fd
,
short
which
,
void
*
arg
);
//引用计数加1
unsigned
short
refcount_incr
(
unsigned
short
*
refcount
)
{
#ifdef
HAVE_GCC_ATOMICS
return
__sync_add_and_fetch
(
refcount
,
1
);
#elif
defined
(
__sun
)
return
atomic_inc_ushort_nv
(
refcount
);
#else
unsigned
short
res
;
mutex_lock
(&
atomics_mutex
);
(*
refcount
)++;
res
=
*
refcount
;
mutex_unlock
(&
atomics_mutex
);
return
res
;
#endif
}
//引用计数减1
unsigned
short
refcount_decr
(
unsigned
short
*
refcount
)
{
#ifdef
HAVE_GCC_ATOMICS
return
__sync_sub_and_fetch
(
refcount
,
1
);
#elif
defined
(
__sun
)
return
atomic_dec_ushort_nv
(
refcount
);
#else
unsigned
short
res
;
mutex_lock
(&
atomics_mutex
);
(*
refcount
)--;
res
=
*
refcount
;
mutex_unlock
(&
atomics_mutex
);
return
res
;
#endif
}
void
item_lock_global
(
void
)
{
mutex_lock
(&
item_global_lock
);
}
void
item_unlock_global
(
void
)
{
mutex_unlock
(&
item_global_lock
);
}
void
item_lock
(
uint32_t
hv
)
{
uint8_t
*
lock_type
=
pthread_getspecific
(
item_lock_type_key
);
if
(
likely
(*
lock_type
==
ITEM_LOCK_GRANULAR
))
{
mutex_lock
(&
item_locks
[
hv
&
hashmask
(
item_lock_hashpower
)]);
}
else
{
mutex_lock
(&
item_global_lock
);
}
}
void
*
item_trylock
(
uint32_t
hv
)
{
pthread_mutex_t
*
lock
=
&
item_locks
[
hv
&
hashmask
(
item_lock_hashpower
)];
if
(
pthread_mutex_trylock
(
lock
)
==
0
)
{
return
lock
;
}
return
NULL
;
}
void
item_trylock_unlock
(
void
*
lock
)
{
mutex_unlock
((
pthread_mutex_t
*)
lock
);
}
void
item_unlock
(
uint32_t
hv
)
{
uint8_t
*
lock_type
=
pthread_getspecific
(
item_lock_type_key
);
if
(
likely
(*
lock_type
==
ITEM_LOCK_GRANULAR
))
{
mutex_unlock
(&
item_locks
[
hv
&
hashmask
(
item_lock_hashpower
)]);
}
else
{
mutex_unlock
(&
item_global_lock
);
}
}
static
void
wait_for_thread_registration
(
int
nthreads
)
{
while
(
init_count
<
nthreads
)
{
pthread_cond_wait
(&
init_cond
,
&
init_lock
);
//主线程利用条件变量等待所有worker线程启动完毕
}
}
//worker线程注册函数,主要是统计worker线程完成初始化个数。
static
void
register_thread_initialized
(
void
)
{
pthread_mutex_lock
(&
init_lock
);
init_count
++;
pthread_cond_signal
(&
init_cond
);
pthread_mutex_unlock
(&
init_lock
);
}
//item锁的粒度有几种,这里是切换类型
void
switch_item_lock_type
(
enum
item_lock_types type
)
{
char
buf
[
1
];
int
i
;
switch
(
type
)
{
case
ITEM_LOCK_GRANULAR
:
buf
[
0
]
=
‘l‘
;
break
;
case
ITEM_LOCK_GLOBAL
:
buf
[
0
]
=
‘g‘
;
break
;
default
:
fprintf
(
stderr
,
"Unknown lock type: %d\n"
,
type
);
assert
(
1
==
0
);
break
;
}
pthread_mutex_lock
(&
init_lock
);
init_count
=
0
;
for
(
i
=
0
;
i
<
settings
.
num_threads
;
i
++)
{
if
(
write
(
threads
[
i
].
notify_send_fd
,
buf
,
1
)
!=
1
)
{
perror
(
"Failed writing to notify pipe"
);
/* TODO: This is a fatal problem. Can it ever happen temporarily? */
}
}
wait_for_thread_registration
(
settings
.
num_threads
);
pthread_mutex_unlock
(&
init_lock
);
}
/*
* Initializes a connection queue.
初始化一个CQ对象,CQ结构体和CQ_ITEM结构体的作用见它们定义处。
*/
static
void
cq_init
(
CQ
*
cq
)
{
pthread_mutex_init
(&
cq
->
lock
,
NULL
);
cq
->
head
=
NULL
;
cq
->
tail
=
NULL
;
}
/**
从worker线程的CQ队列里面pop出一个CQ_ITEM对象
*/
static
CQ_ITEM
*
cq_pop
(
CQ
*
cq
)
{
CQ_ITEM
*
item
;
pthread_mutex_lock
(&
cq
->
lock
);
item
=
cq
->
head
;
if
(
NULL
!=
item
)
{
cq
->
head
=
item
->
next
;
if
(
NULL
==
cq
->
head
)
cq
->
tail
=
NULL
;
}
pthread_mutex_unlock
(&
cq
->
lock
);
return
item
;
}
/**
push一个CQ_ITEM对象到worker线程的CQ队列中
*/
static
void
cq_push
(
CQ
*
cq
,
CQ_ITEM
*
item
)
{
item
->
next
=
NULL
;
pthread_mutex_lock
(&
cq
->
lock
);
if
(
NULL
==
cq
->
tail
)
cq
->
head
=
item
;
else
cq
->
tail
->
next
=
item
;
cq
->
tail
=
item
;
pthread_mutex_unlock
(&
cq
->
lock
);
}
/*
* Returns a fresh connection queue item.
分配一个CQ_ITEM对象
*/
static
CQ_ITEM
*
cqi_new
(
void
)
{
CQ_ITEM
*
item
=
NULL
;
pthread_mutex_lock
(&
cqi_freelist_lock
);
if
(
cqi_freelist
)
{
item
=
cqi_freelist
;
cqi_freelist
=
item
->
next
;
}
pthread_mutex_unlock
(&
cqi_freelist_lock
);
if
(
NULL
==
item
)
{
int
i
;
/* Allocate a bunch of items at once to reduce fragmentation */
item
=
malloc
(
sizeof
(
CQ_ITEM
)
*
ITEMS_PER_ALLOC
);
if
(
NULL
==
item
)
{
STATS_LOCK
();
stats
.
malloc_fails
++;
STATS_UNLOCK
();
return
NULL
;
}
for
(
i
=
2
;
i
<
ITEMS_PER_ALLOC
;
i
++)
item
[
i
-
1
].
next
=
&
item
[
i
];
pthread_mutex_lock
(&
cqi_freelist_lock
);
item
[
ITEMS_PER_ALLOC
-
1
].
next
=
cqi_freelist
;
cqi_freelist
=
&
item
[
1
];
pthread_mutex_unlock
(&
cqi_freelist_lock
);
}
return
item
;
}
/*
* Frees a connection queue item (adds it to the freelist.)
*/
static
void
cqi_free
(
CQ_ITEM
*
item
)
{
pthread_mutex_lock
(&
cqi_freelist_lock
);
item
->
next
=
cqi_freelist
;
cqi_freelist
=
item
;
pthread_mutex_unlock
(&
cqi_freelist_lock
);
}
/*
创建并启动worker线程,在thread_init主线程初始化时调用
*/
static
void
create_worker
(
void
*(*
func
)(
void
*),
void
*
arg
)
{
pthread_t
thread
;
pthread_attr_t
attr
;
int
ret
;
pthread_attr_init
(&
attr
);
if
((
ret
=
pthread_create
(&
thread
,
&
attr
,
func
,
arg
))
!=
0
)
{
fprintf
(
stderr
,
"Can‘t create thread: %s\n"
,
strerror
(
ret
));
exit
(
1
);
}
}
void
accept_new_conns
(
const
bool
do_accept
)
{
pthread_mutex_lock
(&
conn_lock
);
do_accept_new_conns
(
do_accept
);
pthread_mutex_unlock
(&
conn_lock
);
}
/****************************** LIBEVENT THREADS *****************************/
/*
* 装备worker线程,worker线程的event_base在此设置
*/
static
void
setup_thread
(
LIBEVENT_THREAD
*
me
)
{
me
->
base
=
event_init
();
//为每个worker线程分配自己的event_base
if
(!
me
->
base
)
{
fprintf
(
stderr
,
"Can‘t allocate event base\n"
);
exit
(
1
);
}
/* Listen for notifications from other threads */
event_set
(&
me
->
notify_event
,
me
->
notify_receive_fd
,
EV_READ
|
EV_PERSIST
,
thread_libevent_process
,
me
);
//监听管道接收fd,这里即监听
//来自主线程的消息,事件处理函数为thread_libevent_process
event_base_set
(
me
->
base
,
&
me
->
notify_event
);
if
(
event_add
(&
me
->
notify_event
,
0
)
==
-
1
)
{
fprintf
(
stderr
,
"Can‘t monitor libevent notify pipe\n"
);
exit
(
1
);
}
me
->
new_conn_queue
=
malloc
(
sizeof
(
struct
conn_queue
));
//CQ_ITEM队列
if
(
me
->
new_conn_queue
==
NULL
)
{
perror
(
"Failed to allocate memory for connection queue"
);
exit
(
EXIT_FAILURE
);
}
cq_init
(
me
->
new_conn_queue
);
//初始化CQ_ITEM对象队列
if
(
pthread_mutex_init
(&
me
->
stats
.
mutex
,
NULL
)
!=
0
)
{
perror
(
"Failed to initialize mutex"
);
exit
(
EXIT_FAILURE
);
}
me
->
suffix_cache
=
cache_create
(
"suffix"
,
SUFFIX_SIZE
,
sizeof
(
char
*),
NULL
,
NULL
);
if
(
me
->
suffix_cache
==
NULL
)
{
fprintf
(
stderr
,
"Failed to create suffix cache\n"
);
exit
(
EXIT_FAILURE
);
}
}
/*
* 这里主要是让worker线程进入event_base_loop
*/
static
void
*
worker_libevent
(
void
*
arg
)
{
LIBEVENT_THREAD
*
me
=
arg
;
/* Any per-thread setup can happen here; thread_init() will block until
* all threads have finished initializing.
*/
/* set an indexable thread-specific memory item for the lock type.
* this could be unnecessary if we pass the conn *c struct through
* all item_lock calls...
*/
me
->
item_lock_type
=
ITEM_LOCK_GRANULAR
;
pthread_setspecific
(
item_lock_type_key
,
&
me
->
item_lock_type
);
//每一个worker线程进入loop,全局init_count++操作,
//见thread_init函数后面几行代码和wait_for_thread_registration函数,
//主线程通过init_count来确认所有线程都启动完毕。
register_thread_initialized
();
event_base_loop
(
me
->
base
,
0
);
return
NULL
;
}
//主线程分发client fd给worker线程后,同时往管道写入buf,唤醒worker线程调用此函数
static
void
thread_libevent_process
(
int
fd
,
short
which
,
void
*
arg
)
{
LIBEVENT_THREAD
*
me
=
arg
;
CQ_ITEM
*
item
;
char
buf
[
1
];
if
(
read
(
fd
,
buf
,
1
)
!=
1
)
if
(
settings
.
verbose
>
0
)
fprintf
(
stderr
,
"Can‘t read from libevent pipe\n"
);
switch
(
buf
[
0
])
{
case
‘c‘
:
item
=
cq_pop
(
me
->
new_conn_queue
);
//取出主线程丢过来的CQ_ITEM
if
(
NULL
!=
item
)
{
/*
worker线程创建 conn连接对象,注意由主线程丢过来的CQ_ITEM的init_state为conn_new_cmd (TCP情况下)
*/
conn
*
c
=
conn_new
(
item
->
sfd
,
item
->
init_state
,
item
->
event_flags
,
item
->
read_buffer_size
,
item
->
transport
,
me
->
base
);
if
(
c
==
NULL
)
{
if
(
IS_UDP
(
item
->
transport
))
{
fprintf
(
stderr
,
"Can‘t listen for events on UDP socket\n"
);
exit
(
1
);
}
else
{
if
(
settings
.
verbose
>
0
)
{
fprintf
(
stderr
,
"Can‘t listen for events on fd %d\n"
,
item
->
sfd
);
}
close
(
item
->
sfd
);
}
}
else
{
c
->
thread
=
me
;
//设置监听连接的线程为当前worker线程
}
cqi_free
(
item
);
}
break
;
/* we were told to flip the lock type and report in */
case
‘l‘
:
me
->
item_lock_type
=
ITEM_LOCK_GRANULAR
;
register_thread_initialized
();
break
;
case
‘g‘
:
me
->
item_lock_type
=
ITEM_LOCK_GLOBAL
;
register_thread_initialized
();
break
;
}
}
void
dispatch_conn_new
(
int
sfd
,
enum
conn_states init_state
,
int
event_flags
,
int
read_buffer_size
,
enum
network_transport transport
)
{
/**
这下面有一个CQ_ITEM结构体,可以这么理解,主线程accept连接后,把client fd
分发到worker线程的同时会顺带一些与此client连接相关的信息,例如dispatch_conn_new的形参上面列的,
而CQ_ITEM是包装了这些信息的一个对象。
CQ_ITEM中的CQ是connection queue的缩写,但它与conn结构体是完全不一样的概念,CQ_ITEM仅仅是把client连接相关的信息
打包成一个对象而已。
*/
CQ_ITEM
*
item
=
cqi_new
();
char
buf
[
1
];
if
(
item
==
NULL
)
{
close
(
sfd
);
/* given that malloc failed this may also fail, but let‘s try */
fprintf
(
stderr
,
"Failed to allocate memory for connection object\n"
);
return
;
}
int
tid
=
(
last_thread
+
1
)
%
settings
.
num_threads
;
LIBEVENT_THREAD
*
thread
=
threads
+
tid
;
//通过简单的轮叫方式选择处理当前client fd的worker线程
last_thread
=
tid
;
//初始化CQ_ITEM对象,即把信息包装
item
->
sfd
=
sfd
;
item
->
init_state
=
init_state
;
item
->
event_flags
=
event_flags
;
item
->
read_buffer_size
=
read_buffer_size
;
item
->
transport
=
transport
;
cq_push
(
thread
->
new_conn_queue
,
item
);
//每个worker线程保存着所有被分发给自己的CQ_ITEM,即new_conn_queue
MEMCACHED_CONN_DISPATCH
(
sfd
,
thread
->
thread_id
);
/*
主线程向处理当前client fd的worker线程管道中简单写进一个‘c‘字符,
由于每个worker线程都监听了管道的receive_fd,于是相应的worker进程收到事件通知,
触发注册的handler,即thread_libevent_process
*/
buf
[
0
]
=
‘c‘
;
if
(
write
(
thread
->
notify_send_fd
,
buf
,
1
)
!=
1
)
{
perror
(
"Writing to thread notify pipe"
);
}
}
int
is_listen_thread
()
{
return
pthread_self
()
==
dispatcher_thread
.
thread_id
;
}
/********************************* ITEM ACCESS *******************************/
/**
下面是一堆关于item操作的函数,具体逻辑代码都放在items::do_xxx相应的地方
就像本文件开头说的,这里主要是加了锁而已
*/
/*
* Allocates a new item.
分配item空间
*/
item
*
item_alloc
(
char
*
key
,
size_t
nkey
,
int
flags
,
rel_time_t
exptime
,
int
nbytes
)
{
item
*
it
;
/* do_item_alloc handles its own locks */
/**
这里比较特殊,与其它item_xxx函数不一样,这里把锁放在do_item_alloc里面做了。
个人猜测是因为do_item_alloc这个逻辑实在有点复杂,甚至加解锁有可能在某个if条件下要发
生,加解锁和逻辑本身代码耦合,所以外部不好加锁。因此把锁交给do_item_alloc内部进行考虑。
*/
it
=
do_item_alloc
(
key
,
nkey
,
flags
,
exptime
,
nbytes
,
0
);
return
it
;
}
/*
* Returns an item if it hasn‘t been marked as expired,
* lazy-expiring as needed.
取得item,上面这里有句英文注释,说返回不超时的item,因为memcached并没有做实时或者定时把
超时item清掉的逻辑,而是用了延迟超时。就是当要用这个item的时候,再来针对这个item做超时处理
*/
item
*
item_get
(
const
char
*
key
,
const
size_t
nkey
)
{
item
*
it
;
uint32_t
hv
;
hv
=
hash
(
key
,
nkey
);
item_lock
(
hv
);
it
=
do_item_get
(
key
,
nkey
,
hv
);
item_unlock
(
hv
);
return
it
;
}
item
*
item_touch
(
const
char
*
key
,
size_t
nkey
,
uint32_t
exptime
)
{
item
*
it
;
uint32_t
hv
;
hv
=
hash
(
key
,
nkey
);
item_lock
(
hv
);
it
=
do_item_touch
(
key
,
nkey
,
exptime
,
hv
);
item_unlock
(
hv
);
return
it
;
}
/*
* Links an item into the LRU and hashtable.
*/
int
item_link
(
item
*
item
)
{
int
ret
;
uint32_t
hv
;
hv
=
hash
(
ITEM_key
(
item
),
item
->
nkey
);
item_lock
(
hv
);
ret
=
do_item_link
(
item
,
hv
);
item_unlock
(
hv
);
return
ret
;
}
void
item_remove
(
item
*
item
)
{
uint32_t
hv
;
hv
=
hash
(
ITEM_key
(
item
),
item
->
nkey
);
item_lock
(
hv
);
do_item_remove
(
item
);
item_unlock
(
hv
);
}
int
item_replace
(
item
*
old_it
,
item
*
new_it
,
const
uint32_t
hv
)
{
return
do_item_replace
(
old_it
,
new_it
,
hv
);
}
/*
* Unlinks an item from the LRU and hashtable.
* 见items::item_unlink
*/
void
item_unlink
(
item
*
item
)
{
uint32_t
hv
;
hv
=
hash
(
ITEM_key
(
item
),
item
->
nkey
);
item_lock
(
hv
);
do_item_unlink
(
item
,
hv
);
item_unlock
(
hv
);
}
/**
主要作用是重置在最近使用链表中的位置,更新最近使用时间,见items::do_item_update
*/
void
item_update
(
item
*
item
)
{
uint32_t
hv
;
hv
=
hash
(
ITEM_key
(
item
),
item
->
nkey
);
item_lock
(
hv
);
do_item_update
(
item
);
item_unlock
(
hv
);
}
enum
delta_result_type add_delta
(
conn
*
c
,
const
char
*
key
,
const
size_t
nkey
,
int
incr
,
const
int64_t
delta
,
char
*
buf
,
uint64_t
*
cas
)
{
enum
delta_result_type ret
;
uint32_t
hv
;
hv
=
hash
(
key
,
nkey
);
item_lock
(
hv
);
ret
=
do_add_delta
(
c
,
key
,
nkey
,
incr
,
delta
,
buf
,
cas
,
hv
);
item_unlock
(
hv
);
return
ret
;
}
/*
* Stores an item in the cache (high level, obeys set/add/replace semantics)
* 保存item信息,主要是调用items::do_store_item,但由于是多线程,所以需求加锁
* store_item是线程上的操作,所以写在thread模块,在此对外开放,而内部加锁。
* 除了store_item函数,其它关于item的操作均如此。
*/
enum
store_item_type store_item
(
item
*
item
,
int
comm
,
conn
*
c
)
{
enum
store_item_type ret
;
uint32_t
hv
;
hv
=
hash
(
ITEM_key
(
item
),
item
->
nkey
);
//锁住item
item_lock
(
hv
);
ret
=
do_store_item
(
item
,
comm
,
c
,
hv
);
item_unlock
(
hv
);
return
ret
;
}
void
item_flush_expired
()
{
mutex_lock
(&
cache_lock
);
do_item_flush_expired
();
mutex_unlock
(&
cache_lock
);
}
char
*
item_cachedump
(
unsigned
int
slabs_clsid
,
unsigned
int
limit
,
unsigned
int
*
bytes
)
{
char
*
ret
;
mutex_lock
(&
cache_lock
);
ret
=
do_item_cachedump
(
slabs_clsid
,
limit
,
bytes
);
mutex_unlock
(&
cache_lock
);
return
ret
;
}
void
item_stats
(
ADD_STAT add_stats
,
void
*
c
)
{
mutex_lock
(&
cache_lock
);
do_item_stats
(
add_stats
,
c
);
mutex_unlock
(&
cache_lock
);
}
void
item_stats_totals
(
ADD_STAT add_stats
,
void
*
c
)
{
mutex_lock
(&
cache_lock
);
do_item_stats_totals
(
add_stats
,
c
);
mutex_unlock
(&
cache_lock
);
}
void
item_stats_sizes
(
ADD_STAT add_stats
,
void
*
c
)
{
mutex_lock
(&
cache_lock
);
do_item_stats_sizes
(
add_stats
,
c
);
mutex_unlock
(&
cache_lock
);
}
/******************************* GLOBAL STATS ******************************/
void
STATS_LOCK
()
{
pthread_mutex_lock
(&
stats_lock
);
}
void
STATS_UNLOCK
()
{
pthread_mutex_unlock
(&
stats_lock
);
}
void
threadlocal_stats_reset
(
void
)
{
int
ii
,
sid
;
for
(
ii
=
0
;
ii
<
settings
.
num_threads
;
++
ii
)
{
pthread_mutex_lock
(&
threads
[
ii
].
stats
.
mutex
);
threads
[
ii
].
stats
.
get_cmds
=
0
;
threads
[
ii
].
stats
.
get_misses
=
0
;
threads
[
ii
].
stats
.
touch_cmds
=
0
;
threads
[
ii
].
stats
.
touch_misses
=
0
;
threads
[
ii
].
stats
.
delete_misses
=
0
;
threads
[
ii
].
stats
.
incr_misses
=
0
;
threads
[
ii
].
stats
.
decr_misses
=
0
;
threads
[
ii
].
stats
.
cas_misses
=
0
;
threads
[
ii
].
stats
.
bytes_read
=
0
;
threads
[
ii
].
stats
.
bytes_written
=
0
;
threads
[
ii
].
stats
.
flush_cmds
=
0
;
threads
[
ii
].
stats
.
conn_yields
=
0
;
threads
[
ii
].
stats
.
auth_cmds
=
0
;
threads
[
ii
].
stats
.
auth_errors
=
0
;
for
(
sid
=
0
;
sid
<
MAX_NUMBER_OF_SLAB_CLASSES
;
sid
++)
{
threads
[
ii
].
stats
.
slab_stats
[
sid
].
set_cmds
=
0
;
threads
[
ii
].
stats
.
slab_stats
[
sid
].
get_hits
=
0
;
threads
[
ii
].
stats
.
slab_stats
[
sid
].
touch_hits
=
0
;
threads
[
ii
].
stats
.
slab_stats
[
sid
].
delete_hits
=
0
;
threads
[
ii
].
stats
.
slab_stats
[
sid
].
incr_hits
=
0
;
threads
[
ii
].
stats
.
slab_stats
[
sid
].
decr_hits
=
0
;
threads
[
ii
].
stats
.
slab_stats
[
sid
].
cas_hits
=
0
;
threads
[
ii
].
stats
.
slab_stats
[
sid
].
cas_badval
=
0
;
}
pthread_mutex_unlock
(&
threads
[
ii
].
stats
.
mutex
);
}
}
void
threadlocal_stats_aggregate
(
struct
thread_stats
*
stats
)
{
int
ii
,
sid
;
/* The struct has a mutex, but we can safely set the whole thing
* to zero since it is unused when aggregating. */
memset
(
stats
,
0
,
sizeof
(*
stats
));
for
(
ii
=
0
;
ii
<
settings
.
num_threads
;
++
ii
)
{
pthread_mutex_lock
(&
threads
[
ii
].
stats
.
mutex
);
stats
->
get_cmds
+=
threads
[
ii
].
stats
.
get_cmds
;
stats
->
get_misses
+=
threads
[
ii
].
stats
.
get_misses
;
stats
->
touch_cmds
+=
threads
[
ii
].
stats
.
touch_cmds
;
stats
->
touch_misses
+=
threads
[
ii
].
stats
.
touch_misses
;
stats
->
delete_misses
+=
threads
[
ii
].
stats
.
delete_misses
;
stats
->
decr_misses
+=
threads
[
ii
].
stats
.
decr_misses
;
stats
->
incr_misses
+=
threads
[
ii
].
stats
.
incr_misses
;
stats
->
cas_misses
+=
threads
[
ii
].
stats
.
cas_misses
;
stats
->
bytes_read
+=
threads
[
ii
].
stats
.
bytes_read
;
stats
->
bytes_written
+=
threads
[
ii
].
stats
.
bytes_written
;
stats
->
flush_cmds
+=
threads
[
ii
].
stats
.
flush_cmds
;
stats
->
conn_yields
+=
threads
[
ii
].
stats
.
conn_yields
;
stats
->
auth_cmds
+=
threads
[
ii
].
stats
.
auth_cmds
;
stats
->
auth_errors
+=
threads
[
ii
].
stats
.
auth_errors
;
for
(
sid
=
0
;
sid
<
MAX_NUMBER_OF_SLAB_CLASSES
;
sid
++)
{
stats
->
slab_stats
[
sid
].
set_cmds
+=
threads
[
ii
].
stats
.
slab_stats
[
sid
].
set_cmds
;
stats
->
slab_stats
[
sid
].
get_hits
+=
threads
[
ii
].
stats
.
slab_stats
[
sid
].
get_hits
;
stats
->
slab_stats
[
sid
].
touch_hits
+=
threads
[
ii
].
stats
.
slab_stats
[
sid
].
touch_hits
;
stats
->
slab_stats
[
sid
].
delete_hits
+=
threads
[
ii
].
stats
.
slab_stats
[
sid
].
delete_hits
;
stats
->
slab_stats
[
sid
].
decr_hits
+=
threads
[
ii
].
stats
.
slab_stats
[
sid
].
decr_hits
;
stats
->
slab_stats
[
sid
].
incr_hits
+=
threads
[
ii
].
stats
.
slab_stats
[
sid
].
incr_hits
;
stats
->
slab_stats
[
sid
].
cas_hits
+=
threads
[
ii
].
stats
.
slab_stats
[
sid
].
cas_hits
;
stats
->
slab_stats
[
sid
].
cas_badval
+=
threads
[
ii
].
stats
.
slab_stats
[
sid
].
cas_badval
;
}
pthread_mutex_unlock
(&
threads
[
ii
].
stats
.
mutex
);
}
}
void
slab_stats_aggregate
(
struct
thread_stats
*
stats
,
struct
slab_stats
*
out
)
{
int
sid
;
out
->
set_cmds
=
0
;
out
->
get_hits
=
0
;
out
->
touch_hits
=
0
;
out
->
delete_hits
=
0
;
out
->
incr_hits
=
0
;
out
->
decr_hits
=
0
;
out
->
cas_hits
=
0
;
out
->
cas_badval
=
0
;
for
(
sid
=
0
;
sid
<
MAX_NUMBER_OF_SLAB_CLASSES
;
sid
++)
{
out
->
set_cmds
+=
stats
->
slab_stats
[
sid
].
set_cmds
;
out
->
get_hits
+=
stats
->
slab_stats
[
sid
].
get_hits
;
out
->
touch_hits
+=
stats
->
slab_stats
[
sid
].
touch_hits
;
out
->
delete_hits
+=
stats
->
slab_stats
[
sid
].
delete_hits
;
out
->
decr_hits
+=
stats
->
slab_stats
[
sid
].
decr_hits
;
out
->
incr_hits
+=
stats
->
slab_stats
[
sid
].
incr_hits
;
out
->
cas_hits
+=
stats
->
slab_stats
[
sid
].
cas_hits
;
out
->
cas_badval
+=
stats
->
slab_stats
[
sid
].
cas_badval
;
}
}
//初始化主线程
void
thread_init
(
int
nthreads
,
struct
event_base
*
main_base
)
{
int
i
;
int
power
;
pthread_mutex_init
(&
cache_lock
,
NULL
);
pthread_mutex_init
(&
stats_lock
,
NULL
);
pthread_mutex_init
(&
init_lock
,
NULL
);
pthread_cond_init
(&
init_cond
,
NULL
);
pthread_mutex_init
(&
cqi_freelist_lock
,
NULL
);
cqi_freelist
=
NULL
;
/* Want a wide lock table, but don‘t waste memory */
/**
初始化item lock
*/
//调配item锁的数量
//之所以需要锁是因为线程之间的并发,所以item锁的数量当然是根据线程的个数进行调配了。
if
(
nthreads
<
3
)
{
power
=
10
;
//这个power是指数
}
else
if
(
nthreads
<
4
)
{
power
=
11
;
}
else
if
(
nthreads
<
5
)
{
power
=
12
;
}
else
{
/* 8192 buckets, and central locks don‘t scale much past 5 threads */
power
=
13
;
}
item_lock_count
=
hashsize
(
power
);
item_lock_hashpower
=
power
;
item_locks
=
calloc
(
item_lock_count
,
sizeof
(
pthread_mutex_t
));
if
(!
item_locks
)
{
perror
(
"Can‘t allocate item locks"
);
exit
(
1
);
}
for
(
i
=
0
;
i
<
item_lock_count
;
i
++)
{
pthread_mutex_init
(&
item_locks
[
i
],
NULL
);
}
pthread_key_create
(&
item_lock_type_key
,
NULL
);
pthread_mutex_init
(&
item_global_lock
,
NULL
);
//_mark2_1
threads
=
calloc
(
nthreads
,
sizeof
(
LIBEVENT_THREAD
));
//创建worker线程对象
if
(!
threads
)
{
perror
(
"Can‘t allocate thread descriptors"
);
exit
(
1
);
}
//_mark2_3
dispatcher_thread
.
base
=
main_base
;
//设置主线程对象的event_base
dispatcher_thread
.
thread_id
=
pthread_self
();
//设置主线程对象pid
//_mark2_5
for
(
i
=
0
;
i
<
nthreads
;
i
++)
{
//为每个worker线程创建与主线程通信的管道
int
fds
[
2
];
if
(
pipe
(
fds
))
{
perror
(
"Can‘t create notify pipe"
);
exit
(
1
);
}
threads
[
i
].
notify_receive_fd
=
fds
[
0
];
//worker线程管道接收fd
threads
[
i
].
notify_send_fd
=
fds
[
1
];
//worker线程管道写入fd
//_mark2_6
setup_thread
(&
threads
[
i
]);
//装载 worker线程
/* Reserve three fds for the libevent base, and two for the pipe */
stats
.
reserved_fds
+=
5
;
}
/* Create threads after we‘ve done all the libevent setup. */
for
(
i
=
0
;
i
<
nthreads
;
i
++)
{
//_mark2_7
create_worker
(
worker_libevent
,
&
threads
[
i
]);
//启动worker线程,见worker_libevent
}
/* Wait for all the threads to set themselves up before returning. */
pthread_mutex_lock
(&
init_lock
);
wait_for_thread_registration
(
nthreads
);
//等待所有worker线程启动完毕
pthread_mutex_unlock
(&
init_lock
);
}
Memcached源码分析之thread.c
标签:
原文地址:http://www.cnblogs.com/guolanzhu/p/5850220.html
踩
(
0
)
赞
(
0
)
举报
评论
一句话评论(
0
)
登录后才能评论!
分享档案
更多>
2021年07月29日 (22)
2021年07月28日 (40)
2021年07月27日 (32)
2021年07月26日 (79)
2021年07月23日 (29)
2021年07月22日 (30)
2021年07月21日 (42)
2021年07月20日 (16)
2021年07月19日 (90)
2021年07月16日 (35)
周排行
更多
gitlab 在linux安装环境下存储地址
2021-07-29
当 Mac 未检测到外部显示器时如何修复它
2021-07-29
Ubuntu18.04安装qemu遇到问题-qemu : Depends: qemu-system (>= 1:2.11+dfsg-1ubuntu7)
2021-07-28
[Linux]Shell编程【待续】
2021-07-28
Linux系统资源查看
2021-07-27
Archlinux爬坑指南
2021-07-27
[Linux]Linux发展历程
2021-07-27
非桌面系统 (ubuntu)安装google-chrome
2021-07-27
在Ubuntu18.04系统中源码安装 gcc7.3.0
2021-07-23
Linux快捷键杂记
2021-07-22
友情链接
兰亭集智
国之画
百度统计
站长统计
阿里云
chrome插件
新版天听网
关于我们
-
联系我们
-
留言反馈
© 2014
mamicode.com
版权所有 联系我们:gaon5@hotmail.com
迷上了代码!