标签:blank hand targe efault let 自己 register print var
int major, minor, patch;
zmq_version(&major, &minor, &patch); //4.2.0
本文主要是分析代码,方便自己日后查阅.
=========================================
1. socket类型
每个socket类型有一个类与之对应. 所有的这些类都继承于socket_base_t.各子类的继承关系图请查看笔记一.
class socket_base_t :
public own_t,
public array_item_t <>,
public i_poll_events,
public i_pipe_events
{
friend class reaper_t;
public:
......
int send (zmq::msg_t *msg_, int flags_);
int recv (zmq::msg_t *msg_, int flags_);
int add_signaler (signaler_t *s);
int remove_signaler (signaler_t *s);
int close ();
// These functions are used by the polling mechanism to determine
// which events are to be reported from this socket.
bool has_in ();
bool has_out ();
......
// i_poll_events implementation. This interface is used when socket
// is handled by the poller in the reaper thread.
void in_event ();
void out_event ();
void timer_event (int id_);
......
protected:
socket_base_t (zmq::ctx_t *parent_, uint32_t tid_, int sid_, bool thread_safe_ = false);
virtual ~socket_base_t ();
.....
// The default implementation assumes that send is not supported.
virtual bool xhas_out ();
virtual int xsend (zmq::msg_t *msg_);
// The default implementation assumes that recv in not supported.
virtual bool xhas_in ();
virtual int xrecv (zmq::msg_t *msg_);
......
private:
// Creates new endpoint ID and adds the endpoint to the map.
void add_endpoint (const char *addr_, own_t *endpoint_, pipe_t *pipe);
// Map of open endpoints.
typedef std::pair <own_t *, pipe_t*> endpoint_pipe_t;
typedef std::multimap <std::string, endpoint_pipe_t> endpoints_t;
endpoints_t endpoints;
// Map of open inproc endpoints.
typedef std::multimap <std::string, pipe_t *> inprocs_t;
inprocs_t inprocs;
// Moves the flags from the message to local variables,
// to be later retrieved by getsockopt.
void extract_flags (msg_t *msg_);
......
int process_commands (int timeout_, bool throttle_);
// Socket‘s mailbox object.
i_mailbox *mailbox;
// List of attached pipes.
typedef array_t <pipe_t, 3> pipes_t;
pipes_t pipes;
// Reaper‘s poller and handle of this socket within it.
poller_t *poller;
poller_t::handle_t handle;
......
};
socket_base_t这个父类做了大部分逻辑,子类再按需实现函数重载. 拿req_t为例, req_t继承dealer_t,dealer_t继承socket_base_t. 子类以实现xsend/xrecv等带x前缀的重载函数为主,而父类socket_base_t对外暴露的是不带前缀x的函数.
class req_t : public dealer_t
{
public:
req_t (zmq::ctx_t *parent_, uint32_t tid_, int sid_);
~req_t ();
// Overrides of functions from socket_base_t.
int xsend (zmq::msg_t *msg_);
int xrecv (zmq::msg_t *msg_);
bool xhas_in ();
bool xhas_out ();
int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
void xpipe_terminated (zmq::pipe_t *pipe_);
protected:
......
private:
......
// The pipe the request was sent to and where the reply is expected.
zmq::pipe_t *reply_pipe;
......
req_t (const req_t&);
const req_t &operator = (const req_t&);
};
2.mailbox
基类socket_base_t有一个成员变量 i_mailbox *mailbox. 这就是socket的邮箱了,所有投递给socket的命令消息command_t都会放到这个邮箱的队列里.
zmq::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t tid_, int sid_, bool thread_safe_) :
own_t (parent_, tid_),
......
thread_safe (thread_safe_),
reaper_signaler (NULL)
{
options.socket_id = sid_;
options.ipv6 = (parent_->get (ZMQ_IPV6) != 0);
options.linger = parent_->get (ZMQ_BLOCKY)? -1: 0;
if (thread_safe)
mailbox = new mailbox_safe_t(&sync);
else {
mailbox_t *m = new mailbox_t();
if (m->get_fd () != retired_fd)
mailbox = m;
else {
LIBZMQ_DELETE (m);
mailbox = NULL;
}
}
}
由构造函数可知,mailbox是有线程安全的分别的, mailbox_safe_t和mailbox_t都是mutex_t sync作为访问互斥. 这是因为mailbox的消息队列 ypipe_t是无锁链表,读写需要同步,ypipe_t更详细的实现和分析可参考这篇博客.
// The pipe to store actual commands.
typedef ypipe_t <command_t, command_pipe_granularity> cpipe_t;
cpipe_t cpipe;
邮箱的sync在mailbox_safe_t是以socket_base_t的sync指针来初始化的,而mailbox_t则是独立于socket本身的.
对于mailbox_t来说,任意时刻只能有一个线程去读它的命令消息队列,读消息不用加锁,并只需要一个signaler去通知读线程; 而写入消息队列时,却可能有多个线程写,所以需要在写入队列时加锁互斥.
// Signaler to pass signals from writer thread to reader thread.
signaler_t signaler;
对于mailbox_safe_t则是根据对socket本身的互斥访问来读写它的命令消息队列,并且有多个signaler来通知可读状态.
std::vector <zmq::signaler_t* > signalers;
实际上读的时候它使用的是pthread_cond_wait和pthread_cond_broadcast的组合来获得锁.
void zmq::mailbox_safe_t::send (const command_t &cmd_)
{
sync->lock ();
cpipe.write (cmd_, false);
const bool ok = cpipe.flush ();
if (!ok) {
cond_var.broadcast (); //调用pthread_cond_broadcast唤醒正在等待pthread_cond_wait返回的读线程
for (std::vector<signaler_t*>::iterator it = signalers.begin(); it != signalers.end(); ++it){
(*it)->send(); //唤醒各个reader有消息可读
}
}
sync->unlock ();
}
int zmq::mailbox_safe_t::recv (command_t *cmd_, int timeout_)
{
// Try to get the command straight away.
if (cpipe.read (cmd_)) //无锁队列,能获取消息则必定由一个线程取出,compare_and_swap原子操作
return 0;
// Wait for signal from the command sender.
int rc = cond_var.wait (sync, timeout_); //获取sync的锁,并休眠等待pthread_cond_broadcast信号唤醒; 注意,pthread_cond_wait返回后,其实同时也获得了sync的锁
if (rc == -1) {
errno_assert (errno == EAGAIN || errno == EINTR);
return -1;
}
// Another thread may already fetch the command
const bool ok = cpipe.read (cmd_);
if (!ok) {
errno = EAGAIN;
return -1;
}
return 0;
}
笔者的分析是基于mailbox_t而不是mailbox_safe_t,所以对mailbox_safe_t的使用场合并没有经验研究.
3.signaler
邮箱是否有可待读取的命令消息,依靠signaler来通知.先来看一下这个类结构:
class signaler_t
{
public:
signaler_t ();
~signaler_t ();
fd_t get_fd () const;
void send ();
int wait (int timeout_);
void recv ();
int recv_failable ();
......
private:
// Creates a pair of file descriptors that will be used
// to pass the signals.
static int make_fdpair (fd_t *r_, fd_t *w_);
// Underlying write & read file descriptor
// Will be -1 if we exceeded number of available handles
fd_t w;
fd_t r;
......
};
signaler类主要是提供一对socket句柄(w/r).在支持socketpair的平台下(*nix),可直接调用返回;而在windows平台下,是通过打通w/r两个socket句柄的通信.当有写线程给mailbox发送命令消息时,判断如果持有mailbox的读线程挂起了,就调用mailbox的signaler->send():
void zmq::signaler_t::send ()
{
#if defined HAVE_FORK
if (unlikely (pid != getpid ())) {
//printf("Child process %d signaler_t::send returning without sending #1\n", getpid());
return; // do not send anything in forked child context
}
#endif
#if defined ZMQ_HAVE_EVENTFD
......
#elif defined ZMQ_HAVE_WINDOWS
unsigned char dummy = 0;
int nbytes = ::send (w, (char *) &dummy, sizeof (dummy), 0);
wsa_assert (nbytes != SOCKET_ERROR);
zmq_assert (nbytes == sizeof (dummy));
#else
unsigned char dummy = 0;
while (true) {
ssize_t nbytes = ::send (w, &dummy, sizeof (dummy), 0);
if (unlikely (nbytes == -1 && errno == EINTR))
continue;
#if defined(HAVE_FORK)
if (unlikely (pid != getpid ())) {
//printf("Child process %d signaler_t::send returning without sending #2\n", getpid());
errno = EINTR;
break;
}
#endif
zmq_assert (nbytes == sizeof dummy);
break;
}
#endif
}
给w发送消息,这样r变成可读状态,挂起的select阻塞调用立即返回. mailbox.get_fd()返回的其实就是mailbox.signaler.r. *请注意*, signaler的w/r套接字句柄是可阻塞的.
对于非线程安全的mailbox_t,对于socket类对象,它们本身并没有I/O线程的loop()轮询函数,那么它的mailbox的可读消息状态是由signaler的r句柄通知,由signaler.wait()函数对r进行select调用,而signaler.wait()是一般是通过socket_base_t:process_commands() -> mailbox_t:recv () -> signaler:wait () 调用链.当mailbox的命令队列为空,r也没可读状态时,signaler:wait (int timeout) ,传入的timeout=-1,由于signaler的w/r是可阻塞的,这时调用process_commands()的线程将会阻塞在wait()的select调用.当然,context的I/O线程依然会继续loop()轮询.
那么阻塞了socket的线程如何被唤醒? 答案是通过给socket的mailbox发送消息.
zmq::socket_base_t *zmq::ctx_t::create_socket (int type_){
......
// Create the socket and register its mailbox.
socket_base_t *s = socket_base_t::create (type_, this, slot, sid);
if (!s) {
empty_slots.push_back (slot);
slot_sync.unlock ();
return NULL;
}
sockets.push_back (s);
slots [slot] = s->get_mailbox ();
......
}
在create_socket这个函数里,为context新增一个socket时,socket的mailbox就加入了slots的数组管理器里. 当I/O线程(或其他知道该scoket的mailbox对应的slot id的实例)给对应的mailbox发送消息,就会唤醒正在阻塞的socket了.
标签:blank hand targe efault let 自己 register print var
原文地址:http://www.cnblogs.com/wqchen/p/6978006.html