标签:enc toc 查找 rgs 端口 时间 部分 利用 mode
Kafka快速入门(十一)——RdKafka源码分析
RdKafka将与Kafka Broker的交互、内部实现的操作都封装成Operator结构,然后放入OP处理队列里统一处理。Kafka OP队列是线程间通信的管道。
RdKafka队列定义在rdkafka_queue.h文件中,队列相关操作封装在rdsysqueue.h文件中。
(1)Kafka OP队列
/* Internal OP queue: a locked list of ops plus the bookkeeping needed to
 * wake up waiters and to dispatch ops through a serve callback. */
typedef struct rd_kafka_q_s rd_kafka_q_t;
struct rd_kafka_q_s
{
mtx_t rkq_lock;// Lock protecting queue operations
cnd_t rkq_cond; // Condition variable used to wake waiting threads when a new element is enqueued
struct rd_kafka_q_s *rkq_fwdq; // Forwarded/Routed queue
struct rd_kafka_op_tailq rkq_q; // List holding the ops that were put on this queue
int rkq_qlen; /* Number of entries in queue */
int64_t rkq_qsize; /* Size of all entries in queue */
int rkq_refcnt; // Reference count
int rkq_flags; // Current state flags of the queue
rd_kafka_t *rkq_rk;// Kafka handle this queue belongs to
struct rd_kafka_q_io *rkq_qio; // When a new element is enqueued, data is written to an fd to wake the waiting thread
rd_kafka_q_serve_cb_t *rkq_serve; // Callback invoked when ops on this queue are served
void *rkq_opaque; // NOTE(review): presumably the opaque passed along with rkq_serve - confirm
const char *rkq_name; // queue name
};
// Kafka OP queue wrapper exposed as the external (application-facing) interface
typedef struct rd_kafka_queue_s rd_kafka_queue_t;
struct rd_kafka_queue_s
{
rd_kafka_q_t *rkqu_q;// Underlying Kafka OP queue
rd_kafka_t *rkqu_rk;// Kafka handle this queue is associated with
int rkqu_is_owner; // NOTE(review): ownership flag; exact semantics not shown here
};
/**
 * Create a new application-facing queue handle on top of a freshly
 * created internal OP queue.
 *
 * @param rk Kafka handle the queue is created for.
 * @returns the handle produced by rd_kafka_queue_new0().
 */
rd_kafka_queue_t *rd_kafka_queue_new (rd_kafka_t *rk)
{
        rd_kafka_q_t *backing_q = rd_kafka_q_new(rk);
        rd_kafka_queue_t *handle = rd_kafka_queue_new0(rk, backing_q);

        /* Drop this function's own hold on the internal queue
         * (presumably the wrapper keeps its own reference - confirm). */
        rd_kafka_q_destroy(backing_q);

        return handle;
}
创建OP队列
/**
 * Wrap the handle's rk_rep queue in an application-facing queue handle.
 *
 * @param rk Kafka handle whose rk_rep queue is wrapped.
 * @returns the handle produced by rd_kafka_queue_new0().
 */
rd_kafka_queue_t *rd_kafka_queue_get_main (rd_kafka_t *rk)
{
        rd_kafka_q_t *reply_q = rk->rk_rep;

        return rd_kafka_queue_new0(rk, reply_q);
}
获取RdKafka与应用程序交互使用的OP队列
/**
 * Wrap the consumer group's OP queue in an application-facing handle.
 *
 * @param rk Kafka handle.
 * @returns a queue handle for rk->rk_cgrp->rkcg_q, or NULL when the
 *          handle has no consumer group (rk_cgrp is NULL).
 */
rd_kafka_queue_t *rd_kafka_queue_get_consumer (rd_kafka_t *rk) {
        return rk->rk_cgrp
                ? rd_kafka_queue_new0(rk, rk->rk_cgrp->rkcg_q)
                : NULL;
}
获取消费者的OP队列
/**
 * Wrap the fetch queue of a topic partition in an application-facing
 * queue handle.
 *
 * @param rk        Kafka handle; producers are rejected.
 * @param topic     Topic name.
 * @param partition Partition id.
 * @returns a queue handle for the partition's rktp_fetchq, or NULL if
 *          \p rk is a producer or the partition lookup fails.
 */
rd_kafka_queue_t *rd_kafka_queue_get_partition (rd_kafka_t *rk,
                                                const char *topic,
                                                int32_t partition) {
        shptr_rd_kafka_toppar_t *toppar_ref;
        rd_kafka_queue_t *fetch_queue;

        if (rk->rk_type == RD_KAFKA_PRODUCER)
                return NULL;

        toppar_ref = rd_kafka_toppar_get2(rk, topic, partition,
                                          0, /* no ua_on_miss */
                                          1 /* create_on_miss */);
        if (toppar_ref == NULL)
                return NULL;

        fetch_queue = rd_kafka_queue_new0(
                rk, rd_kafka_toppar_s2i(toppar_ref)->rktp_fetchq);

        /* Release the lookup's handle only after the queue is wrapped. */
        rd_kafka_toppar_destroy(toppar_ref);

        return fetch_queue;
}
获取Topic的分区的OP队列
rd_kafka_op_t *rd_kafka_q_pop_serve (rd_kafka_q_t *rkq, rd_ts_t timeout_us,
int32_t version,
rd_kafka_q_cb_type_t cb_type,
rd_kafka_q_serve_cb_t *callback,
void *opaque);
从OP队列中取出并处理一个OP操作:按version过滤出可处理的OP,没有可处理的OP则等待;如果等待超时,函数返回。
int rd_kafka_q_serve (rd_kafka_q_t *rkq, int timeout_ms, int max_cnt,
rd_kafka_q_cb_type_t cb_type,
rd_kafka_q_serve_cb_t *callback,
void *opaque);
批量处理OP队列的OP
int rd_kafka_q_serve_rkmessages (rd_kafka_q_t *rkq, int timeout_ms,
rd_kafka_message_t **rkmessages,
size_t rkmessages_size);
处理RD_KAFKA_OP_FETCH OP操作
int rd_kafka_q_purge0 (rd_kafka_q_t *rkq, int do_lock);
#define rd_kafka_q_purge(rkq) rd_kafka_q_purge0(rkq, 1/*lock*/)
清除OP队列中的所有OP操作
rd_kafka_queue_t *rd_kafka_queue_get_background (rd_kafka_t *rk);
获取Kafka Handle的后台OP队列
RdKafka OP操作封装在rdkafka_op.h文件中。
/* All operation (OP) types that can travel through the OP queues. */
typedef enum
{
RD_KAFKA_OP_NONE, // No type specified
RD_KAFKA_OP_FETCH, // Kafka thread -> Application
RD_KAFKA_OP_ERR, // Kafka thread -> Application
RD_KAFKA_OP_CONSUMER_ERR, // Kafka thread -> Application
RD_KAFKA_OP_DR, // Kafka thread->Application:Produce message delivery report
RD_KAFKA_OP_STATS, // Kafka thread -> Application
RD_KAFKA_OP_OFFSET_COMMIT, // any -> toppar's Broker thread
RD_KAFKA_OP_NODE_UPDATE, // any -> Broker thread: node update
RD_KAFKA_OP_XMIT_BUF, // transmit buffer: any -> broker thread
RD_KAFKA_OP_RECV_BUF, // received response buffer: broker thr -> any
RD_KAFKA_OP_XMIT_RETRY, // retry buffer xmit: any -> broker thread
RD_KAFKA_OP_FETCH_START, // Application -> toppar's handler thread
RD_KAFKA_OP_FETCH_STOP, // Application -> toppar's handler thread
RD_KAFKA_OP_SEEK, // Application -> toppar's handler thread
RD_KAFKA_OP_PAUSE, // Application -> toppar's handler thread
RD_KAFKA_OP_OFFSET_FETCH, // Broker->broker thread: fetch offsets for topic
RD_KAFKA_OP_PARTITION_JOIN, // cgrp op:add toppar to cgrp,broker op:add toppar to broker
RD_KAFKA_OP_PARTITION_LEAVE, // cgrp op:remove toppar from cgrp,broker op:remove toppar from rkb
RD_KAFKA_OP_REBALANCE, // broker thread -> app:group rebalance
RD_KAFKA_OP_TERMINATE, // For generic use
RD_KAFKA_OP_COORD_QUERY, // Query for coordinator
RD_KAFKA_OP_SUBSCRIBE, // New subscription
RD_KAFKA_OP_ASSIGN, // New assignment
RD_KAFKA_OP_GET_SUBSCRIPTION,// Get current subscription Reuses u.subscribe
RD_KAFKA_OP_GET_ASSIGNMENT, // Get current assignment Reuses u.assign
RD_KAFKA_OP_THROTTLE, // Throttle info
RD_KAFKA_OP_NAME, // Request name
RD_KAFKA_OP_OFFSET_RESET, // Offset reset
RD_KAFKA_OP_METADATA, // Metadata response
RD_KAFKA_OP_LOG, // Log
RD_KAFKA_OP_WAKEUP, // Wake-up signaling
RD_KAFKA_OP_CREATETOPICS, // Admin: CreateTopics: u.admin_request
RD_KAFKA_OP_DELETETOPICS, // Admin: DeleteTopics: u.admin_request
RD_KAFKA_OP_CREATEPARTITIONS,// Admin: CreatePartitions: u.admin_request
RD_KAFKA_OP_ALTERCONFIGS, // Admin: AlterConfigs: u.admin_request
RD_KAFKA_OP_DESCRIBECONFIGS, // Admin: DescribeConfigs: u.admin_request
RD_KAFKA_OP_ADMIN_RESULT, // Admin API .._result_t
RD_KAFKA_OP_PURGE, // Purge queues
RD_KAFKA_OP_CONNECT, // Connect (to broker)
RD_KAFKA_OP_OAUTHBEARER_REFRESH, // Refresh OAUTHBEARER token
RD_KAFKA_OP_MOCK, // Mock cluster command
RD_KAFKA_OP_BROKER_MONITOR, // Broker state change
RD_KAFKA_OP_TXN, // Transaction command
RD_KAFKA_OP__END // End-of-list sentinel
} rd_kafka_op_type_t;
rd_kafka_op_type_t枚举类型定义了RdKafka所有OP操作类型。
/* Priority levels of an OP, lowest first. */
typedef enum
{
RD_KAFKA_PRIO_NORMAL = 0, // Normal priority
RD_KAFKA_PRIO_MEDIUM, // Medium priority
RD_KAFKA_PRIO_HIGH, // High priority
RD_KAFKA_PRIO_FLASH // Highest priority: immediate
} rd_kafka_prio_t;
rd_kafka_prio_t枚举类型定义了Kafka OP操作的所有优先级。
/* Result returned after an OP has been processed. */
typedef enum
{
RD_KAFKA_OP_RES_PASS, // Not handled, pass to caller
RD_KAFKA_OP_RES_HANDLED, // Op was handled (through callbacks)
RD_KAFKA_OP_RES_KEEP, // Op was handled by a callback but must not be destroyed by op_handle()
RD_KAFKA_OP_RES_YIELD // Callback called yield
} rd_kafka_op_res_t;
rd_kafka_op_res_t枚举类型定义了OP被处理后的返回结果类型,
如果返回RD_KAFKA_OP_RES_YIELD,handler处理函数需要确定是否需要将OP重新入队列还是将OP销毁。
/* How ops taken from an OP queue are delivered: via callback or by return. */
typedef enum
{
RD_KAFKA_Q_CB_INVALID, // Invalid, not used
RD_KAFKA_Q_CB_CALLBACK, // Trigger callbacks based on the op
RD_KAFKA_Q_CB_RETURN, // Return the op rather than triggering a callback
RD_KAFKA_Q_CB_FORCE_RETURN, // Return the op regardless of whether a callback would be triggered
RD_KAFKA_Q_CB_EVENT // Return an event op rather than triggering a callback
} rd_kafka_q_cb_type_t;
rd_kafka_q_cb_type_t枚举类型定义了OP队列中OP操作执行回调函数的所有类型。
OP队列执行回调函数类型定义如下:
typedef rd_kafka_op_res_t
(rd_kafka_q_serve_cb_t) (rd_kafka_t *rk,
struct rd_kafka_q_s *rkq,
struct rd_kafka_op_s *rko,
rd_kafka_q_cb_type_t cb_type, void *opaque);
OP回调函数定义如下:
typedef rd_kafka_op_res_t (rd_kafka_op_cb_t) (rd_kafka_t *rk,
rd_kafka_q_t *rkq,
struct rd_kafka_op_s *rko);
OP执行结果数据结构定义如下:
/* Reply destination for an op: target queue plus version. */
typedef struct rd_kafka_replyq_s
{
rd_kafka_q_t *q;// Queue on which the op's result is stored
int32_t version;// Version
} rd_kafka_replyq_t;
Kafka OP数据结构定义如下:
/* A single operation (OP): common header fields followed by a union
 * (rko_u) holding the type-specific payload. */
struct rd_kafka_op_s
{
TAILQ_ENTRY(rd_kafka_op_s) rko_link;// TAILQ linkage
rd_kafka_op_type_t rko_type; // OP type
rd_kafka_event_type_t rko_evtype;// Event type
int rko_flags; // OP flags
int32_t rko_version;// Version
rd_kafka_resp_err_t rko_err;// Response error code carried by the op
int32_t rko_len; //
rd_kafka_prio_t rko_prio; // OP priority
shptr_rd_kafka_toppar_t *rko_rktp;// Associated topic partition
rd_kafka_replyq_t rko_replyq;//
rd_kafka_q_serve_cb_t *rko_serve;// OP queue serve callback
void *rko_serve_opaque;// Argument for the OP queue serve callback
rd_kafka_t *rko_rk;// Kafka Handle
rd_kafka_op_cb_t *rko_op_cb; // OP callback
union
{
struct
{
rd_kafka_buf_t *rkbuf;
rd_kafka_msg_t rkm;
int evidx;
} fetch;
struct
{
rd_kafka_topic_partition_list_t *partitions;
int do_free; // free .partitions on destroy()
} offset_fetch;
struct
{
rd_kafka_topic_partition_list_t *partitions;
void (*cb) (rd_kafka_t *rk,
rd_kafka_resp_err_t err,
rd_kafka_topic_partition_list_t *offsets,
void *opaque);
void *opaque;
int silent_empty; // Fail silently if there are no offsets to commit.
rd_ts_t ts_timeout;
char *reason;
} offset_commit;
struct
{
rd_kafka_topic_partition_list_t *topics;
} subscribe;
struct
{
rd_kafka_topic_partition_list_t *partitions;
} assign;
struct
{
rd_kafka_topic_partition_list_t *partitions;
} rebalance;
struct
{
char *str;
} name;
struct
{
int64_t offset;
char *errstr;
rd_kafka_msg_t rkm;
int fatal;
} err;
struct
{
int throttle_time;
int32_t nodeid;
char *nodename;
} throttle;
struct
{
char *json;
size_t json_len;
} stats;
struct
{
rd_kafka_buf_t *rkbuf;
} xbuf;
// RD_KAFKA_OP_METADATA
struct
{
rd_kafka_metadata_t *md;
int force; // force request regardless of outstanding metadata requests.
} metadata;
struct
{
shptr_rd_kafka_itopic_t *s_rkt;
rd_kafka_msgq_t msgq;
rd_kafka_msgq_t msgq2;
int do_purge2;
} dr;
struct
{
int32_t nodeid;
char nodename[RD_KAFKA_NODENAME_SIZE];
} node;
struct
{
int64_t offset;
char *reason;
} offset_reset;
struct
{
int64_t offset;
struct rd_kafka_cgrp_s *rkcg;
} fetch_start; // reused for SEEK
struct
{
int pause;
int flag;
} pause;
struct
{
char fac[64];
int level;
char *str;
} log;
struct
{
rd_kafka_AdminOptions_t options;
rd_ts_t abs_timeout; // Absolute timeout
rd_kafka_timer_t tmr; // Timeout timer
struct rd_kafka_enq_once_s *eonce; // Enqueue the op only once; used for op requests triggered by broker state changes
rd_list_t args; // Type depends on request, e.g. rd_kafka_NewTopic_t for CreateTopics
rd_kafka_buf_t *reply_buf; // Protocol reply
struct rd_kafka_admin_worker_cbs *cbs;
// Worker state
enum
{
RD_KAFKA_ADMIN_STATE_INIT,
RD_KAFKA_ADMIN_STATE_WAIT_BROKER,
RD_KAFKA_ADMIN_STATE_WAIT_CONTROLLER,
RD_KAFKA_ADMIN_STATE_CONSTRUCT_REQUEST,
RD_KAFKA_ADMIN_STATE_WAIT_RESPONSE,
} state;
int32_t broker_id; // Requested broker id to communicate with.
// Application's reply queue
rd_kafka_replyq_t replyq;
rd_kafka_event_type_t reply_event_type;
} admin_request;
struct
{
rd_kafka_op_type_t reqtype; // Request op type
char *errstr; // Error message
rd_list_t results; // Type depends on request type:
void *opaque; // Application's opaque as set by rd_kafka_AdminOptions_set_opaque
} admin_result;
struct
{
int flags; // purge_flags from rd_kafka_purge()
} purge;
// Mock cluster command
struct
{
enum
{
RD_KAFKA_MOCK_CMD_TOPIC_SET_ERROR,
RD_KAFKA_MOCK_CMD_TOPIC_CREATE,
RD_KAFKA_MOCK_CMD_PART_SET_LEADER,
RD_KAFKA_MOCK_CMD_PART_SET_FOLLOWER,
RD_KAFKA_MOCK_CMD_PART_SET_FOLLOWER_WMARKS,
RD_KAFKA_MOCK_CMD_BROKER_SET_UPDOWN,
RD_KAFKA_MOCK_CMD_BROKER_SET_RACK,
RD_KAFKA_MOCK_CMD_COORD_SET,
RD_KAFKA_MOCK_CMD_APIVERSION_SET,
} cmd;
rd_kafka_resp_err_t err; // Error for:TOPIC_SET_ERROR
char *name; // For:TOPIC_SET_ERROR,TOPIC_CREATE,PART_SET_FOLLOWER,PART_SET_FOLLOWER_WMARKS,BROKER_SET_RACK,COORD_SET (key_type)
char *str; // For:COORD_SET (key)
int32_t partition; // For:PART_SET_FOLLOWER,PART_SET_FOLLOWER_WMARKS,PART_SET_LEADER,APIVERSION_SET (ApiKey)
int32_t broker_id; // For:PART_SET_FOLLOWER,PART_SET_LEADER,BROKER_SET_UPDOWN,BROKER_SET_RACK,COORD_SET
int64_t lo; // Low offset, for:TOPIC_CREATE (part cnt),PART_SET_FOLLOWER_WMARKS,BROKER_SET_UPDOWN, APIVERSION_SET (minver);
int64_t hi; // High offset, for:TOPIC_CREATE (repl fact),PART_SET_FOLLOWER_WMARKS,APIVERSION_SET (maxver)
} mock;
struct
{
struct rd_kafka_broker_s *rkb; // Broker whose state changed
void (*cb) (struct rd_kafka_broker_s *rkb);// Callback to trigger on the op handler thread
} broker_monitor;
struct
{
rd_kafka_error_t *error; // Error object
char *group_id; // Consumer group id to commit offsets for
int timeout_ms; /**< Operation timeout */
rd_ts_t abs_timeout; /**< Absolute time */
rd_kafka_topic_partition_list_t *offsets;// Offsets to commit
} txn;
} rko_u;
};
typedef struct rd_kafka_op_s rd_kafka_event_t;
const char *rd_kafka_op2str (rd_kafka_op_type_t type);
返回OP类型的相应字符串
void rd_kafka_op_destroy (rd_kafka_op_t *rko);
销毁OP对象
rd_kafka_op_t *rd_kafka_op_new0 (const char *source, rd_kafka_op_type_t type);
#define rd_kafka_op_new(type) rd_kafka_op_new0(NULL, type)
生成OP对象
rd_kafka_op_res_t rd_kafka_op_call (rd_kafka_t *rk,
rd_kafka_q_t *rkq, rd_kafka_op_t *rko);
调用OP的回调函数
/**
 * Standard (type-driven) handling of an op, tried before any
 * queue-specific serve callback.
 *
 * @param rk      Kafka handle.
 * @param rkq     Queue the op was taken from.
 * @param rko     Op to handle.
 * @param cb_type Queue callback type (rd_kafka_q_cb_type_t value).
 *
 * @returns RD_KAFKA_OP_RES_PASS when the op is left to the caller,
 *          RD_KAFKA_OP_RES_HANDLED when it was dealt with here, or the
 *          result of rd_kafka_op_call() for callback-type ops.
 */
rd_kafka_op_res_t rd_kafka_op_handle_std (rd_kafka_t *rk, rd_kafka_q_t *rkq,
                                          rd_kafka_op_t *rko, int cb_type)
{
        /* Forced return: never process here, hand the op back. */
        if (cb_type == RD_KAFKA_Q_CB_FORCE_RETURN)
                return RD_KAFKA_OP_RES_PASS;

        /* Control messages only have their offset stored. */
        if (unlikely(rd_kafka_op_is_ctrl_msg(rko))) {
                rd_kafka_op_offset_store(rk, rko);
                return RD_KAFKA_OP_RES_HANDLED;
        }

        /* Callback ops run their own callback, unless events were asked for. */
        if (cb_type != RD_KAFKA_Q_CB_EVENT &&
            (rko->rko_type & RD_KAFKA_OP_CB))
                return rd_kafka_op_call(rk, rkq, rko);

        /* Received response buffers go to the buffer handler. */
        if (rko->rko_type == RD_KAFKA_OP_RECV_BUF) {
                rd_kafka_buf_handle_op(rko, rko->rko_err);
                return RD_KAFKA_OP_RES_HANDLED;
        }

        /* Reply ops carrying the __DESTROY error count as handled. */
        if (cb_type != RD_KAFKA_Q_CB_RETURN &&
            (rko->rko_type & RD_KAFKA_OP_REPLY) &&
            rko->rko_err == RD_KAFKA_RESP_ERR__DESTROY)
                return RD_KAFKA_OP_RES_HANDLED;

        return RD_KAFKA_OP_RES_PASS;
}
对OP进行标准化处理
/**
 * Handle an op: run the standard handling first and, if that leaves the
 * op unhandled, pass it to the serve callback.
 *
 * @param rk       Kafka handle.
 * @param rkq      Queue the op was taken from.
 * @param rko      Op to handle; destroyed here when the standard handler
 *                 reports RD_KAFKA_OP_RES_HANDLED.
 * @param cb_type  Queue callback type.
 * @param opaque   Opaque passed to \p callback.
 * @param callback Serve callback; overridden by the op's own rko_serve
 *                 (and rko_serve_opaque) when set. May be NULL.
 *
 * @returns the standard handler's result, or the callback's result when
 *          the callback was invoked.
 */
rd_kafka_op_res_t
rd_kafka_op_handle (rd_kafka_t *rk, rd_kafka_q_t *rkq, rd_kafka_op_t *rko,
                    rd_kafka_q_cb_type_t cb_type, void *opaque,
                    rd_kafka_q_serve_cb_t *callback)
{
        rd_kafka_op_res_t outcome;

        /* An op-specific serve callback takes precedence and is consumed. */
        if (rko->rko_serve != NULL) {
                callback = rko->rko_serve;
                opaque = rko->rko_serve_opaque;
                rko->rko_serve = NULL;
                rko->rko_serve_opaque = NULL;
        }

        outcome = rd_kafka_op_handle_std(rk, rkq, rko, cb_type);

        switch (outcome) {
        case RD_KAFKA_OP_RES_HANDLED:
                rd_kafka_op_destroy(rko);
                return outcome;

        case RD_KAFKA_OP_RES_KEEP:  /* handled, but must not be destroyed */
        case RD_KAFKA_OP_RES_YIELD:
                return outcome;

        default:
                break;
        }

        return callback ? callback(rk, rkq, rko, cb_type, opaque) : outcome;
}
处理OP
rd_kafka_message_t定义在rdkafka.h文件:
/* Application-visible Kafka message. */
typedef struct rd_kafka_message_s
{
rd_kafka_resp_err_t err; // Non-zero means this is an error message
rd_kafka_topic_t *rkt; // Associated topic
int32_t partition; // Partition
void *payload; // Message data
size_t len; // Message data length when err is 0, otherwise error string length
void *key; // Message key when err is 0
size_t key_len; // Message key length when err is 0
int64_t offset; // Offset
void *_private; // Consumer: RdKafka private pointer; producer: dr_msg_cb
} rd_kafka_message_t;
Kafka Producer生产的数据在application层调用接口后最终会将数据封装成rd_kafka_message_t结构,Consumer从Broker消费的数据回调给application层时也会封装成rd_kafka_message_t结构。
rd_kafka_msg_t和rd_kafka_msgq_t定义在rdkafka_msg.h文件:
/* Internal message representation wrapping the public rd_kafka_message_t. */
typedef struct rd_kafka_msg_s
{
rd_kafka_message_t rkm_rkmessage; // Kafka message, MUST be the first field
TAILQ_ENTRY(rd_kafka_msg_s) rkm_link;// TAILQ linkage
int rkm_flags; // Message type flags
rd_kafka_timestamp_type_t rkm_tstype; // Message timestamp type
int64_t rkm_timestamp;// Timestamp for the V1 message format
rd_kafka_headers_t *rkm_headers;
rd_kafka_msg_status_t rkm_status; // Message persistence status
union
{
struct
{
rd_ts_t ts_timeout; // Message timeout
rd_ts_t ts_enq; // Enqueue or produce timestamp
rd_ts_t ts_backoff;
uint64_t msgid; // Message id used for ordering, starts at 1
uint64_t last_msgid; //
int retries; // Retry count
} producer;
#define rkm_ts_timeout rkm_u.producer.ts_timeout
#define rkm_ts_enq rkm_u.producer.ts_enq
#define rkm_msgid rkm_u.producer.msgid
struct
{
rd_kafkap_bytes_t binhdrs;
} consumer;
} rkm_u;
} rd_kafka_msg_t;
/* Message queue: TAILQ of rd_kafka_msg_t plus running count and byte totals. */
TAILQ_HEAD(rd_kafka_msgs_head_s, rd_kafka_msg_s);
typedef struct rd_kafka_msgq_s {
struct rd_kafka_msgs_head_s rkmq_msgs; /* TAILQ_HEAD */
int32_t rkmq_msg_cnt; /* Number of messages in the queue */
int64_t rkmq_msg_bytes; /* Sum of the queued messages' byte counts */
} rd_kafka_msgq_t;
Kafka Message队列
/**
 * Convert a public rd_kafka_message_t pointer to its enclosing
 * rd_kafka_msg_t by a plain pointer cast.
 *
 * @param rkmessage Public message pointer.
 * @returns the same address typed as rd_kafka_msg_t *.
 */
static rd_kafka_msg_t *rd_kafka_message2msg (rd_kafka_message_t *rkmessage) {
        /* The cast relies on rkm_rkmessage being the first member
         * of rd_kafka_msg_t. */
        rd_kafka_msg_t *rkm = (rd_kafka_msg_t *)rkmessage;

        return rkm;
}
将rd_kafka_message_t类型消息转换为rd_kafka_msg_t类型消息
int rd_kafka_msg_new (rd_kafka_itopic_t *rkt, int32_t force_partition,
int msgflags,
char *payload, size_t len,
const void *keydata, size_t keylen,
void *msg_opaque);
创建一条新的Kafka消息并将其入队到相应分区的消息队列。
static void rd_kafka_msgq_concat (rd_kafka_msgq_t *dst,rd_kafka_msgq_t *src);
将src消息队列的所有消息合并到dst消息队列尾部,src会被清空。
static void rd_kafka_msgq_move (rd_kafka_msgq_t *dst,rd_kafka_msgq_t *src);
将src消息队列的所有元素移动到dst消息队列,src会被清空。
static void rd_kafka_msgq_purge (rd_kafka_t *rk, rd_kafka_msgq_t *rkmq);
清空Kafka Handle的消息队列
static rd_kafka_msg_t *rd_kafka_msgq_deq (rd_kafka_msgq_t *rkmq,
rd_kafka_msg_t *rkm,
int do_count);
将rkm消息从消息队列rkmq中删除
static rd_kafka_msg_t *rd_kafka_msgq_pop (rd_kafka_msgq_t *rkmq);
从消息队列rkmq头部弹出一条消息
int rd_kafka_msgq_enq_sorted (const rd_kafka_itopic_t *rkt,
rd_kafka_msgq_t *rkmq,
rd_kafka_msg_t *rkm);
将rkm消息按照消息ID排序插入rkmq消息队列
static void rd_kafka_msgq_insert (rd_kafka_msgq_t *rkmq,rd_kafka_msg_t *rkm);
将rkm消息插入消息队列rkmq头部
static int rd_kafka_msgq_enq (rd_kafka_msgq_t *rkmq,rd_kafka_msg_t *rkm);
将rkm消息追加到rkmq消息队列
int rd_kafka_msgq_age_scan (struct rd_kafka_toppar_s *rktp,
rd_kafka_msgq_t *rkmq,
rd_kafka_msgq_t *timedout,
rd_ts_t now,
rd_ts_t *abs_next_timeout);
扫描rkmq消息队列,将超时的消息增加到timedout消息队列,并从rkmq消息队列将其删除。
int rd_kafka_msg_partitioner (rd_kafka_itopic_t *rkt, rd_kafka_msg_t *rkm,
rd_dolock_t do_lock);
对写入rkt主题的rkm消息进行分区分配
rd_kafka_message_t *rd_kafka_message_get (struct rd_kafka_op_s *rko);
从OP操作提取消息
rd_kafka_message_t *rd_kafka_message_new (void);
创建空的Kafka消息
Kafka Topic相关封装位于rdkafka_topic.h文件中。
/* Internal topic object. */
struct rd_kafka_itopic_s
{
TAILQ_ENTRY(rd_kafka_itopic_s) rkt_link;
rd_refcnt_t rkt_refcnt; // Reference count
rwlock_t rkt_lock;
rd_kafkap_str_t *rkt_topic; // Topic name
shptr_rd_kafka_toppar_t *rkt_ua; // Unassigned partition
shptr_rd_kafka_toppar_t **rkt_p; // Collection of the topic's partitions (pointer to toppar handles)
int32_t rkt_partition_cnt; // Partition count
rd_list_t rkt_desp;
rd_ts_t rkt_ts_metadata; // Timestamp of the most recent metadata update
mtx_t rkt_app_lock;
rd_kafka_topic_t *rkt_app_rkt; // Topic pointer handed to the application layer
int rkt_app_refcnt;
enum
{
RD_KAFKA_TOPIC_S_UNKNOWN,
RD_KAFKA_TOPIC_S_EXISTS,
RD_KAFKA_TOPIC_S_NOTEXISTS,
} rkt_state; // Topic state
int rkt_flags; //
rd_kafka_t *rkt_rk; // Kafka Handle
rd_avg_t rkt_avg_batchsize;
rd_avg_t rkt_avg_batchcnt;
shptr_rd_kafka_itopic_t *rkt_shptr_app;
rd_kafka_topic_conf_t rkt_conf; // Topic configuration
};
shptr_rd_kafka_itopic_t *rd_kafka_topic_new0 (rd_kafka_t *rk, const char *topic,
rd_kafka_topic_conf_t *conf,
int *existing, int do_lock);
创建rd_kafka_itopic_s对象
void rd_kafka_local_topics_to_list (rd_kafka_t *rk, rd_list_t *topics);
获取当前rd_kafka_t对象持有的所有topic名字,保存在一个rd_list中
void rd_kafka_topic_scan_all (rd_kafka_t *rk, rd_ts_t now);
扫描Kafka Handle持有的所有topic的所有分区,筛选出未分配分区的超时消息、需要在Broker上创建的Topic、Meta数据太旧需要被更新的Topic、Leader未知的分区。
static int rd_kafka_topic_partition_cnt_update (rd_kafka_itopic_t *rkt,
int32_t partition_cnt);
更新topic的partition个数,如果分区数量有变化,返回1,否则返回0。
rd_kafka_topic_t *rd_kafka_topic_new (rd_kafka_t *rk, const char *topic,
rd_kafka_topic_conf_t *conf);
创建Topic对象
static void rd_kafka_topic_assign_uas (rd_kafka_itopic_t *rkt,
rd_kafka_resp_err_t err);
分配未分配分区上的消息到可用分区
int rd_kafka_topic_partition_available (const rd_kafka_topic_t *app_rkt,
int32_t partition);
查询Topic的分区是否可用,即分区是否有Leader
rd_kafka_topic_partition_t定义在rdkafka.h文件中。
/* A single topic+partition entry with offset, metadata and error fields. */
typedef struct rd_kafka_topic_partition_s
{
char *topic; // Topic name
int32_t partition; // Partition
int64_t offset; // Offset
void *metadata; // Metadata
size_t metadata_size;
void *opaque;
rd_kafka_resp_err_t err;
void *_private;
} rd_kafka_topic_partition_t;
/* Array-backed list of rd_kafka_topic_partition_t elements. */
typedef struct rd_kafka_topic_partition_list_s {
int cnt; // Current number of elements
int size; // Allocated array size
rd_kafka_topic_partition_t *elems; // Element array
} rd_kafka_topic_partition_list_t;
/* Topic partition ("toppar") state: broker assignment, message queues,
 * fetch state, offsets, timers and counters. */
struct rd_kafka_toppar_s
{
TAILQ_ENTRY(rd_kafka_toppar_s) rktp_rklink;
TAILQ_ENTRY(rd_kafka_toppar_s) rktp_rkblink;
CIRCLEQ_ENTRY(rd_kafka_toppar_s) rktp_activelink;
TAILQ_ENTRY(rd_kafka_toppar_s) rktp_rktlink;
TAILQ_ENTRY(rd_kafka_toppar_s) rktp_cgrplink;
TAILQ_ENTRY(rd_kafka_toppar_s) rktp_txnlink;
rd_kafka_itopic_t *rktp_rkt;
shptr_rd_kafka_itopic_t *rktp_s_rkt; // Points to the topic object
int32_t rktp_partition; // Partition
int32_t rktp_leader_id; // Current leader id
int32_t rktp_broker_id; // Current broker id
rd_kafka_broker_t *rktp_leader; // Current leader broker
rd_kafka_broker_t *rktp_broker; // Current preferred broker
rd_kafka_broker_t *rktp_next_broker; // Next preferred broker
rd_refcnt_t rktp_refcnt; // Reference count
mtx_t rktp_lock;
rd_kafka_q_t *rktp_msgq_wakeup_q; // Queue used to wake up for the message queue
rd_kafka_msgq_t rktp_msgq; //
rd_kafka_msgq_t rktp_xmit_msgq;
int rktp_fetch;
rd_kafka_q_t *rktp_fetchq; // Queue for messages fetched from the broker
rd_kafka_q_t *rktp_ops; // Main-thread OP queue
rd_atomic32_t rktp_msgs_inflight;
uint64_t rktp_msgid; // Current/latest message id
struct
{
rd_kafka_pid_t pid;
uint64_t acked_msgid;
uint64_t epoch_base_msgid;
int32_t next_ack_seq;
int32_t next_err_seq;
rd_bool_t wait_drain;
} rktp_eos;
rd_atomic32_t rktp_version; // Latest OP version
int32_t rktp_op_version; // OP version of the current command received from the broker
int32_t rktp_fetch_version; // OP version of the current fetch
enum
{
RD_KAFKA_TOPPAR_FETCH_NONE = 0,
RD_KAFKA_TOPPAR_FETCH_STOPPING,
RD_KAFKA_TOPPAR_FETCH_STOPPED,
RD_KAFKA_TOPPAR_FETCH_OFFSET_QUERY,
RD_KAFKA_TOPPAR_FETCH_OFFSET_WAIT,
RD_KAFKA_TOPPAR_FETCH_ACTIVE,
} rktp_fetch_state;
int32_t rktp_fetch_msg_max_bytes;
rd_ts_t rktp_ts_fetch_backoff;
int64_t rktp_query_offset;
int64_t rktp_next_offset;
int64_t rktp_last_next_offset;
int64_t rktp_app_offset; //
int64_t rktp_stored_offset; // Most recently stored offset, possibly not yet committed
int64_t rktp_committing_offset; // Offset currently being committed
int64_t rktp_committed_offset; // Latest committed offset
rd_ts_t rktp_ts_committed_offset; // Timestamp of the latest committed offset
struct offset_stats rktp_offsets; //
struct offset_stats rktp_offsets_fin; //
int64_t rktp_ls_offset; // Current last stable offset
int64_t rktp_hi_offset; // Current high watermark
int64_t rktp_lo_offset; // Current low watermark
rd_ts_t rktp_ts_offset_lag;
char *rktp_offset_path; // Offset file path
FILE *rktp_offset_fp; // Offset file stream
rd_kafka_cgrp_t *rktp_cgrp;
int rktp_assigned;
rd_kafka_replyq_t rktp_replyq; //
int rktp_flags; // Partition state flags
shptr_rd_kafka_toppar_t *rktp_s_for_desp; // Handle for the rkt_desp list
shptr_rd_kafka_toppar_t *rktp_s_for_cgrp; // Handle for the rkcg_toppars list
shptr_rd_kafka_toppar_t *rktp_s_for_rkb; // Handle for the rkb_toppars list
rd_kafka_timer_t rktp_offset_query_tmr; // Offset query timer
rd_kafka_timer_t rktp_offset_commit_tmr; // Offset commit timer
rd_kafka_timer_t rktp_offset_sync_tmr; // Offset file sync timer
rd_kafka_timer_t rktp_consumer_lag_tmr; // Consumer lag monitoring timer
rd_interval_t rktp_lease_intvl; // Preferred replica lease
rd_interval_t rktp_new_lease_intvl; // Interval between creating new preferred replica leases
rd_interval_t rktp_new_lease_log_intvl; //
rd_interval_t rktp_metadata_intvl; // Maximum frequency of metadata requests for the preferred replica
int rktp_wait_consumer_lag_resp;
struct rd_kafka_toppar_err rktp_last_err;
struct
{
rd_atomic64_t tx_msgs; // Number of messages sent by the producer
rd_atomic64_t tx_msg_bytes; // Number of bytes sent by the producer
rd_atomic64_t rx_msgs; // Number of messages received by the consumer
rd_atomic64_t rx_msg_bytes; // Number of bytes consumed by the consumer
rd_atomic64_t producer_enq_msgs; // Number of messages enqueued by the producer
rd_atomic64_t rx_ver_drops; // Number of outdated messages dropped by the consumer
} rktp_c;
};
RdKafka与Broker网络通信不需要支持高并发,因此RdKafka选择了Poll网络IO模型,对transport数据传输层进行了封装。
RdKafka与Kafka Broker间采用TCP连接,因此需要根据Kafka Message协议进行拆包组包:前4个字节是payload长度;payload部分分为header和body两部分,接收数据时先收4字节,即payload长度,再根据payload长度收取payload内容。
rd_kafka_transport_s定义在rdkafka_transport_int.h文件:
/* Transport handle for one broker connection: socket, SASL state,
 * receive buffer and the poll descriptor set. */
struct rd_kafka_transport_s
{
rd_socket_t rktrans_s; // Socket fd used to communicate with the broker
rd_kafka_broker_t *rktrans_rkb; // Broker this transport is connected to
struct
{
void *state;
int complete;
struct msghdr msg;
struct iovec iov[2];
char *recv_buf;
int recv_of;
int recv_len;
} rktrans_sasl; // SASL authentication state
rd_kafka_buf_t *rktrans_recv_buf; // Receive buffer
rd_pollfd_t rktrans_pfd[2]; // Poll fds: the TCP socket and the wake-up fd
int rktrans_pfd_cnt; // Number of rktrans_pfd entries in use
size_t rktrans_rcvbuf_size; // Socket receive buffer size
size_t rktrans_sndbuf_size; // Socket send buffer size
};
typedef struct rd_kafka_transport_s rd_kafka_transport_t;
/**
 * Create a socket, wrap it in a transport handle and initiate a TCP
 * connection to the broker address \p sinx.
 *
 * The connection attempt may still be in progress when this function
 * returns (EINPROGRESS is not treated as an error); POLLOUT is requested
 * so that connection success/failure is signalled through polling.
 *
 * @param rkb         Broker to connect to; rkb_addr_last is set to \p sinx.
 * @param sinx        Address to connect to.
 * @param errstr      Buffer receiving a human readable error on failure.
 * @param errstr_size Size of \p errstr.
 *
 * @returns the new transport handle, or NULL on failure (\p errstr set).
 */
rd_kafka_transport_t *rd_kafka_transport_connect (rd_kafka_broker_t *rkb,
                                                  const rd_sockaddr_inx_t *sinx,
                                                  char *errstr,
                                                  size_t errstr_size)
{
        rd_kafka_transport_t *rktrans = NULL;
        int s = -1;
        int r;

        rkb->rkb_addr_last = sinx;

        s = rkb->rkb_rk->rk_conf.socket_cb(sinx->in.sin_family,
                                           SOCK_STREAM, IPPROTO_TCP,
                                           rkb->rkb_rk->rk_conf.opaque);
        if (s == -1)
        {
                rd_snprintf(errstr, errstr_size, "Failed to create socket: %s",
                            rd_socket_strerror(rd_socket_errno));
                return NULL;
        }

        rktrans = rd_kafka_transport_new(rkb, s, errstr, errstr_size);
        if (!rktrans)
                goto err;

        rd_rkb_dbg(rkb, BROKER, "CONNECT", "Connecting to %s (%s) "
                   "with socket %i",
                   rd_sockaddr2str(sinx, RD_SOCKADDR2STR_F_FAMILY |
                                   RD_SOCKADDR2STR_F_PORT),
                   rd_kafka_secproto_names[rkb->rkb_proto], s);

        /* Connect to broker */
        if (rkb->rkb_rk->rk_conf.connect_cb)
        {
                rd_kafka_broker_lock(rkb); /* for rkb_nodename */
                r = rkb->rkb_rk->rk_conf.connect_cb(
                        s, (struct sockaddr *)sinx, RD_SOCKADDR_INX_LEN(sinx),
                        rkb->rkb_nodename, rkb->rkb_rk->rk_conf.opaque);
                rd_kafka_broker_unlock(rkb);
        }
        else
        {
                /* An in-progress connect is not an error. */
                if (connect(s, (struct sockaddr *)sinx,
                            RD_SOCKADDR_INX_LEN(sinx)) == RD_SOCKET_ERROR &&
                    (rd_socket_errno != EINPROGRESS
                            ))
                        r = rd_socket_errno;
                else
                        r = 0;
        }

        if (r != 0)
        {
                rd_rkb_dbg(rkb, BROKER, "CONNECT",
                           "couldn‘t connect to %s: %s (%i)",
                           rd_sockaddr2str(sinx,
                                           RD_SOCKADDR2STR_F_PORT |
                                           RD_SOCKADDR2STR_F_FAMILY),
                           rd_socket_strerror(r), r);
                rd_snprintf(errstr, errstr_size,
                            "Failed to connect to broker at %s: %s",
                            rd_sockaddr2str(sinx, RD_SOCKADDR2STR_F_NICE),
                            rd_socket_strerror(r));
                goto err;
        }

        /* Set up transport handle */
        rktrans->rktrans_pfd[rktrans->rktrans_pfd_cnt++].fd = s;
        if (rkb->rkb_wakeup_fd[0] != -1)
        {
                rktrans->rktrans_pfd[rktrans->rktrans_pfd_cnt].events = POLLIN;
                rktrans->rktrans_pfd[rktrans->rktrans_pfd_cnt++].fd = rkb->rkb_wakeup_fd[0];
        }

        /* Poll writability to trigger on connection success/failure. */
        rd_kafka_transport_poll_set(rktrans, POLLOUT);

        return rktrans;

err:
        if (s != -1)
                rd_kafka_transport_close0(rkb->rkb_rk, s);

        if (rktrans)
        {
                /* The socket was closed just above: clear it on the
                 * transport so it is not closed a second time.
                 * NOTE(review): assumes rd_kafka_transport_close() closes
                 * rktrans_s only when it is not -1 - confirm. */
                rktrans->rktrans_s = -1;
                rd_kafka_transport_close(rktrans);
        }

        return NULL;
}
建立与Broker的TCP连接,初始化rd_kafka_transport_s对象并返回
int rd_kafka_transport_io_serve (rd_kafka_transport_t *rktrans, int timeout_ms);
Poll并处理IO操作
void rd_kafka_transport_io_event (rd_kafka_transport_t *rktrans, int events);
处理IO操作
ssize_t rd_kafka_transport_socket_sendmsg (rd_kafka_transport_t *rktrans,
rd_slice_t *slice,
char *errstr, size_t errstr_size);
系统调用sendmsg方法的封装
ssize_t rd_kafka_transport_send (rd_kafka_transport_t *rktrans,
rd_slice_t *slice,
char *errstr, size_t errstr_size);
系统调用send方法的封装
ssize_t rd_kafka_transport_recv (rd_kafka_transport_t *rktrans,
rd_buf_t *rbuf,
char *errstr, size_t errstr_size);
系统调用recv方法的封装
rd_kafka_transport_t *rd_kafka_transport_new (rd_kafka_broker_t *rkb,
rd_socket_t s,
char *errstr,
size_t errstr_size);
使用已有Socket创建rd_kafka_transport_t对象
int rd_kafka_transport_poll(rd_kafka_transport_t *rktrans, int tmout);
Poll方法封装
/**
 * Receive data from the transport's socket into \p buf, dispatching to
 * the platform specific implementation.
 *
 * @param rktrans     Transport to read from.
 * @param buf         Buffer receiving the data.
 * @param errstr      Buffer receiving a human readable error on failure.
 * @param errstr_size Size of \p errstr.
 *
 * @returns whatever the selected platform implementation returns.
 */
ssize_t rd_kafka_transport_socket_recv (rd_kafka_transport_t *rktrans,
                                        rd_buf_t *buf,
                                        char *errstr, size_t errstr_size) {
#ifndef _MSC_VER
        /* Non-MSVC builds (e.g. Linux): use the recvmsg wrapper. */
        return rd_kafka_transport_socket_recvmsg(rktrans, buf,
                                                 errstr, errstr_size);
#else
        /* MSVC (Windows) builds: use the recv0 wrapper. */
        return rd_kafka_transport_socket_recv0(rktrans, buf,
                                               errstr, errstr_size);
#endif
}
Kafka集群的Meta Data包括:所有Broker的信息:IP和Port;
所有Topic的信息:Topic名称,Partition数量,每个Partition的Leader,ISR,Replica集合等。
Kafka集群的每一台Broker都会缓存整个集群的Meta Data,当Broker或某一个Topic的Meta Data信息发生变化时, Kafka集群的Controller都会感知到作相应的状态转换,同时把发生变化的新Meta Data信息广播到所有的Broker。
RdKafka对Meta Data的封装和操作包括Meta Data获取、定时刷新以及引用的操作,如Partition Leader迁移,Partition个数的变化,Broker上下线等等。
Meta Data分为Broker、Topic、Partition三种,定义在rdkafka.h中。
/* Broker metadata. */
typedef struct rd_kafka_metadata_broker
{
int32_t id; // Broker ID
char *host; // Broker host name
int port; // Broker listening port
} rd_kafka_metadata_broker_t;
/* Partition metadata. */
typedef struct rd_kafka_metadata_partition
{
int32_t id; // Partition ID
rd_kafka_resp_err_t err; // Partition error reported by the broker
int32_t leader; // Leader broker of the partition
int replica_cnt; // Number of brokers in the replica list
int32_t *replicas; // Replica broker list
int isr_cnt; // Number of ISR brokers in the ISR list
int32_t *isrs; // ISR broker list
} rd_kafka_metadata_partition_t;
/**
* @brief Topic information
*/
typedef struct rd_kafka_metadata_topic
{
char *topic; // Topic name
int partition_cnt; // Number of partitions
struct rd_kafka_metadata_partition *partitions; // Partition metadata array
rd_kafka_resp_err_t err; // Topic error reported by the broker
} rd_kafka_metadata_topic_t;
/* Cluster metadata: brokers, topics and the originating broker. */
typedef struct rd_kafka_metadata
{
int broker_cnt; // Number of brokers
struct rd_kafka_metadata_broker *brokers; // Broker metadata
int topic_cnt; // Number of topics
struct rd_kafka_metadata_topic *topics; // Topic metadata
int32_t orig_broker_id; // Broker ID (presumably the broker the metadata originated from - confirm)
char *orig_broker_name; // Broker name
} rd_kafka_metadata_t;
rd_kafka_resp_err_t
rd_kafka_metadata (rd_kafka_t *rk, int all_topics,
rd_kafka_topic_t *only_rkt,
const struct rd_kafka_metadata **metadatap,
int timeout_ms);
请求Meta Data数据,阻塞操作
struct rd_kafka_metadata *
rd_kafka_metadata_copy (const struct rd_kafka_metadata *md, size_t size);
深度拷贝Meta Data
rd_kafka_resp_err_t rd_kafka_parse_Metadata (rd_kafka_broker_t *rkb,
rd_kafka_buf_t *request,
rd_kafka_buf_t *rkbuf,
struct rd_kafka_metadata **mdp);
处理Meta Data请求响应
size_t
rd_kafka_metadata_topic_match (rd_kafka_t *rk, rd_list_t *tinfos,
const rd_kafka_topic_partition_list_t *match);
从当前缓存的Meta Data中查找与match匹配的Topic,并将其加入tinfos
size_t
rd_kafka_metadata_topic_filter (rd_kafka_t *rk, rd_list_t *tinfos,
const rd_kafka_topic_partition_list_t *match);
增加缓存Meta Data中与match匹配的所有Topic到tinfos
rd_kafka_resp_err_t
rd_kafka_metadata_refresh_topics (rd_kafka_t *rk, rd_kafka_broker_t *rkb,
const rd_list_t *topics, int force,
const char *reason);
刷新指定topics的所有Meta Data
rd_kafka_resp_err_t
rd_kafka_metadata_refresh_known_topics (rd_kafka_t *rk, rd_kafka_broker_t *rkb,
int force, const char *reason);
刷新已知Topic的Meta Data
rd_kafka_resp_err_t
rd_kafka_metadata_refresh_brokers (rd_kafka_t *rk, rd_kafka_broker_t *rkb,
const char *reason);
根据Broker刷新Meta Data
rd_kafka_resp_err_t
rd_kafka_metadata_refresh_all (rd_kafka_t *rk, rd_kafka_broker_t *rkb,
const char *reason);
刷新集群中所有Topic的Meta Data
rd_kafka_resp_err_t
rd_kafka_metadata_request (rd_kafka_t *rk, rd_kafka_broker_t *rkb,
const rd_list_t *topics,
const char *reason, rd_kafka_op_t *rko);
Meta Data请求
void rd_kafka_metadata_fast_leader_query (rd_kafka_t *rk);
快速刷新分区Leader的Meta Data
Kafka生产者、消费者客户端对象通过rd_kafka_new函数进行创建,rd_kafka_new源码如下:
/* Abridged excerpt of rd_kafka_new(): creates a producer/consumer handle.
 * Lines consisting of "..." mark code elided from this excerpt. */
rd_kafka_t *rd_kafka_new (rd_kafka_type_t type, rd_kafka_conf_t *app_conf,
char *errstr, size_t errstr_size){
...
// Create a new conf, or use the one supplied by the application
if (!app_conf)
conf = rd_kafka_conf_new();
else
conf = app_conf;
...
/* Call on_new() interceptors */
rd_kafka_interceptors_on_new(rk, &rk->rk_conf);
...
// Create the queues
rk->rk_rep = rd_kafka_q_new(rk);
rk->rk_ops = rd_kafka_q_new(rk);
rk->rk_ops->rkq_serve = rd_kafka_poll_cb;
rk->rk_ops->rkq_opaque = rk;
...
/* Select the delivery report mode: callback, event, or none. */
if (rk->rk_conf.dr_cb || rk->rk_conf.dr_msg_cb)
rk->rk_drmode = RD_KAFKA_DR_MODE_CB;
else if (rk->rk_conf.enabled_events & RD_KAFKA_EVENT_DR)
rk->rk_drmode = RD_KAFKA_DR_MODE_EVENT;
else
rk->rk_drmode = RD_KAFKA_DR_MODE_NONE;
/* Enable the event types that have a corresponding callback or mode. */
if (rk->rk_drmode != RD_KAFKA_DR_MODE_NONE)
rk->rk_conf.enabled_events |= RD_KAFKA_EVENT_DR;
if (rk->rk_conf.rebalance_cb)
rk->rk_conf.enabled_events |= RD_KAFKA_EVENT_REBALANCE;
if (rk->rk_conf.offset_commit_cb)
rk->rk_conf.enabled_events |= RD_KAFKA_EVENT_OFFSET_COMMIT;
if (rk->rk_conf.error_cb)
rk->rk_conf.enabled_events |= RD_KAFKA_EVENT_ERROR;
rk->rk_controllerid = -1;
...
/* Consumers with a non-empty group id get a consumer group object. */
if (type == RD_KAFKA_CONSUMER &&
RD_KAFKAP_STR_LEN(rk->rk_group_id) > 0)
rk->rk_cgrp = rd_kafka_cgrp_new(rk,
rk->rk_group_id,
rk->rk_client_id);
...
// Create the background thread and the background event queue
if (rk->rk_conf.background_event_cb)
{
/* Hold off background thread until thrd_create() is done. */
rd_kafka_wrlock(rk);
rk->rk_background.q = rd_kafka_q_new(rk);
rk->rk_init_wait_cnt++;
if ((thrd_create(&rk->rk_background.thread,
rd_kafka_background_thread_main, rk)) != thrd_success)
...
}
/* Create handler thread */
rk->rk_init_wait_cnt++;
if ((thrd_create(&rk->rk_thread, rd_kafka_thread_main, rk)) != thrd_success)
{
...
}
// Start the internal (logical) broker thread
rk->rk_internal_rkb = rd_kafka_broker_add(rk, RD_KAFKA_INTERNAL,
RD_KAFKA_PROTO_PLAINTEXT,
"", 0, RD_KAFKA_NODEID_UA);
// Add brokers according to the configuration
if (rk->rk_conf.brokerlist)
{
if (rd_kafka_brokers_add0(rk, rk->rk_conf.brokerlist) == 0)
rd_kafka_op_err(rk, RD_KAFKA_RESP_ERR__ALL_BROKERS_DOWN,
"No brokers configured");
}
...
}
rd_kafka_new主要工作如下:
(1)根据配置设置属性;
(2)创建Kafka Handle对象的OP队列;
(3)创建后台线程和后台事件队列;
(4)创建RdKafka主线程,执行rd_kafka_thread_main函数,主线程名称为rdk:main;
(5)创建Broker内部线程;
(6)根据配置创建Broker线程(每个Broker一个线程)。
/**
 * Public entry point for adding brokers; forwards to
 * rd_kafka_brokers_add0().
 *
 * @param rk         Kafka handle.
 * @param brokerlist Broker list string.
 * @returns the value returned by rd_kafka_brokers_add0().
 */
int rd_kafka_brokers_add (rd_kafka_t *rk, const char *brokerlist)
{
        const int cnt = rd_kafka_brokers_add0(rk, brokerlist);

        return cnt;
}
/* Abridged excerpt of rd_kafka_brokers_add0(); "..." marks elided code
 * (including the parsing that yields proto, host and port). */
int rd_kafka_brokers_add0 (rd_kafka_t *rk, const char *brokerlist)
{
...
/* Count a broker that already exists as a configured broker;
 * otherwise add it as a configured broker and count it on success. */
if ((rkb = rd_kafka_broker_find(rk, proto, host, port)) &&
rkb->rkb_source == RD_KAFKA_CONFIGURED)
{
cnt++;
}
else if (rd_kafka_broker_add(rk, RD_KAFKA_CONFIGURED,
proto, host, port,
RD_KAFKA_NODEID_UA) != NULL)
cnt++;
...
}
/* Abridged excerpt of rd_kafka_broker_add(); "..." marks elided code. */
rd_kafka_broker_t *rd_kafka_broker_add (rd_kafka_t *rk,
rd_kafka_confsource_t source,
rd_kafka_secproto_t proto,
const char *name, uint16_t port,
int32_t nodeid)
{
...
/* Each added broker gets its own thread running
 * rd_kafka_broker_thread_main(). */
thrd_create(&rkb->rkb_thread, rd_kafka_broker_thread_main, rkb);
...
}
/* Abridged excerpt of the broker thread entry point; "..." marks elided
 * code (including the declaration of rkb, presumably derived from arg). */
static int rd_kafka_broker_thread_main (void *arg)
{
/* Name the thread after the broker. */
rd_kafka_set_thread_name("%s", rkb->rkb_name);
rd_kafka_set_thread_sysname("rdk:broker%"PRId32, rkb->rkb_nodeid);
...
rd_kafka_broker_serve(rkb, rd_kafka_max_block_ms);
...
/* Serve the broker's ops without blocking. */
rd_kafka_broker_ops_serve(rkb, RD_POLL_NOWAIT);
...
}
(1)rd_kafka_produce
rd_kafka_produce函数位于rdkafka_msg.c文件:
/**
 * Produce a message: converts the application topic handle with
 * rd_kafka_topic_a2i() and forwards everything to rd_kafka_msg_new().
 *
 * @param rkt        Application topic handle.
 * @param partition  Target partition, passed on as force_partition.
 * @param msgflags   Message flags.
 * @param payload    Message payload.
 * @param len        Payload length.
 * @param key        Message key.
 * @param keylen     Key length.
 * @param msg_opaque Per-message opaque.
 * @returns the value returned by rd_kafka_msg_new().
 */
int rd_kafka_produce (rd_kafka_topic_t *rkt, int32_t partition,
                      int msgflags,
                      void *payload, size_t len,
                      const void *key, size_t keylen,
                      void *msg_opaque) {
        rd_kafka_itopic_t *itopic = rd_kafka_topic_a2i(rkt);

        return rd_kafka_msg_new(itopic, partition, msgflags,
                                payload, len,
                                key, keylen,
                                msg_opaque);
}
(2)rd_kafka_msg_new
rd_kafka_msg_new函数位于rdkafka_msg.c文件:
/*
 * Excerpt of rd_kafka_msg_new(); elided code is marked with "...".
 * The visible part creates a message object and then runs the partitioner
 * on it.
 */
int rd_kafka_msg_new (rd_kafka_itopic_t *rkt, int32_t force_partition,
int msgflags,
char *payload, size_t len,
const void *key, size_t keylen,
void *msg_opaque)
{
...
// Create the rd_kafka_msg_t message from the caller's arguments;
// err and errnox are passed by address, rd_clock() as the last argument.
rkm = rd_kafka_msg_new0(rkt, force_partition, msgflags,
payload, len, key, keylen, msg_opaque,
&err, &errnox, NULL, 0, rd_clock());
...
// Assign the message to a partition (third argument, do_lock, is 1)
err = rd_kafka_msg_partitioner(rkt, rkm, 1);
...
}
rd_kafka_msg_new内部通过rd_kafka_msg_new0创建Kafka消息,使用rd_kafka_msg_partitioner对Kafka消息进行分区分配。
(3)rd_kafka_msg_partitioner
rd_kafka_msg_partitioner函数位于rdkafka_msg.c文件:
/*
 * Excerpt of rd_kafka_msg_partitioner(); elided code is marked with "...".
 * The visible part looks up the target partition object, bumps its
 * producer_enq_msgs counter and enqueues the message on that partition.
 */
int rd_kafka_msg_partitioner (rd_kafka_itopic_t *rkt, rd_kafka_msg_t *rkm,
rd_dolock_t do_lock)
{
// Determine the partition number (code elided)
...
// Look up the partition object for that partition number
s_rktp_new = rd_kafka_toppar_get(rkt, partition, 0);
...
rktp_new = rd_kafka_toppar_s2i(s_rktp_new);
// Count this message in the partition's enqueued-messages counter
rd_atomic64_add(&rktp_new->rktp_c.producer_enq_msgs, 1);
/* Update message partition: only when the message still carries
 * RD_KAFKA_PARTITION_UA */
if (rkm->rkm_partition == RD_KAFKA_PARTITION_UA)
rkm->rkm_partition = partition;
// Enqueue the message on the partition's queue
rd_kafka_toppar_enq_msg(rktp_new, rkm);
...
}
rd_kafka_msg_partitioner内部通过rd_kafka_toppar_enq_msg将消息加入分区队列。
(4)rd_kafka_toppar_enq_msg
/*
 * Excerpt of rd_kafka_toppar_enq_msg(); elided code is marked with "...".
 * The visible part picks the enqueue routine for the partition's message
 * queue (rktp_msgq) and stores the routine's return value in queue_len.
 */
void rd_kafka_toppar_enq_msg (rd_kafka_toppar_t *rktp, rd_kafka_msg_t *rkm)
{
...
// Enqueue: plain enqueue when the partition is RD_KAFKA_PARTITION_UA or the
// topic's queuing strategy is RD_KAFKA_QUEUE_FIFO ...
if (rktp->rktp_partition == RD_KAFKA_PARTITION_UA ||
rktp->rktp_rkt->rkt_conf.queuing_strategy == RD_KAFKA_QUEUE_FIFO)
{
queue_len = rd_kafka_msgq_enq(&rktp->rktp_msgq, rkm);
}
// ... otherwise a sorted enqueue.
else
{
queue_len = rd_kafka_msgq_enq_sorted(rktp->rktp_rkt, &rktp->rktp_msgq, rkm);
}
...
}
(5)rd_kafka_msgq_enq
/**
 * Append a message to the tail of a message queue and update the queue's
 * byte and message counters.
 *
 * @param rkmq  Queue to append to.
 * @param rkm   Message to append; payload length plus key length is added
 *              to the queue's byte counter.
 * @return The queue's message count after the insert, cast to int.
 */
static RD_INLINE RD_UNUSED int rd_kafka_msgq_enq (rd_kafka_msgq_t *rkmq,
                                                  rd_kafka_msg_t *rkm)
{
        TAILQ_INSERT_TAIL(&rkmq->rkmq_msgs, rkm, rkm_link);

        /* Account for payload and key bytes. */
        rkmq->rkmq_msg_bytes += rkm->rkm_len + rkm->rkm_key_len;

        rkmq->rkmq_msg_cnt += 1;
        return (int)rkmq->rkmq_msg_cnt;
}
(6)rd_kafka_msgq_enq_sorted
rd_kafka_msgq_enq_sorted函数位于rdkafka_msg.c文件:
/**
 * Insert a message into a queue in sorted position, using the comparator
 * configured on the topic (rkt_conf.msg_order_cmp).
 *
 * A debug assertion requires the message to carry a non-zero producer msgid.
 *
 * @return The value returned by rd_kafka_msgq_enq_sorted0().
 */
int rd_kafka_msgq_enq_sorted (const rd_kafka_itopic_t *rkt,
                              rd_kafka_msgq_t *rkmq,
                              rd_kafka_msg_t *rkm)
{
        int (*topic_cmp) (const void *, const void *) =
                rkt->rkt_conf.msg_order_cmp;

        rd_dassert(rkm->rkm_u.producer.msgid != 0);

        return rd_kafka_msgq_enq_sorted0(rkmq, rkm, topic_cmp);
}
/**
 * Insert a message into a queue at the position determined by order_cmp and
 * update the queue's byte and message counters.
 *
 * @param rkmq       Queue to insert into.
 * @param rkm        Message to insert.
 * @param order_cmp  Comparator handed to TAILQ_INSERT_SORTED.
 * @return The queue's message count after the insert.
 */
int rd_kafka_msgq_enq_sorted0 (rd_kafka_msgq_t *rkmq,
                               rd_kafka_msg_t *rkm,
                               int (*order_cmp) (const void *, const void *))
{
        TAILQ_INSERT_SORTED(&rkmq->rkmq_msgs, rkm, rd_kafka_msg_t *,
                            rkm_link, order_cmp);

        /* Account for payload and key bytes. */
        rkmq->rkmq_msg_bytes += rkm->rkm_len + rkm->rkm_key_len;

        rkmq->rkmq_msg_cnt += 1;
        return rkmq->rkmq_msg_cnt;
}
队列的操作位于rdsysqueue.h文件中。
rd_kafka_broker_add函数位于rdkafka_broker.c文件:
/*
 * Excerpt of rd_kafka_broker_add(); elided code is marked with "...".
 * The visible part allocates a zero-initialised broker object and starts
 * a thread for it.
 */
rd_kafka_broker_t *rd_kafka_broker_add (rd_kafka_t *rk,
rd_kafka_confsource_t source,
rd_kafka_secproto_t proto,
const char *name, uint16_t port,
int32_t nodeid)
{
rd_kafka_broker_t *rkb;
/* One zero-filled rd_kafka_broker_t. */
rkb = rd_calloc(1, sizeof(*rkb));
// Set up the fields of the rd_kafka_broker_t object (code elided)
...
/* Start the broker thread running rd_kafka_broker_thread_main(rkb);
 * the failure branch is elided. */
if (thrd_create(&rkb->rkb_thread, rd_kafka_broker_thread_main, rkb) != thrd_success)
{
...
}
}
rd_kafka_broker_add创建Broker线程,启动执行rd_kafka_broker_thread_main函数。
/*
 * Excerpt of the broker thread entry point; elided code is marked with "...".
 * The visible part names the thread and calls rd_kafka_broker_serve().
 */
static int rd_kafka_broker_thread_main (void *arg)
{
...
/* Thread name is the broker name; the system-level name is "rdk:broker"
 * followed by the broker's node id. */
rd_kafka_set_thread_name("%s", rkb->rkb_name);
rd_kafka_set_thread_sysname("rdk:broker%"PRId32, rkb->rkb_nodeid);
...
/* Serve the broker (timeout argument elided in this excerpt). */
rd_kafka_broker_serve(rkb, ...);
...
}
/*
 * Excerpt of rd_kafka_broker_serve(); elided code is marked with "...".
 * The visible part dispatches to one of three serve routines, passing
 * abs_timeout (its computation is elided).
 */
static void rd_kafka_broker_serve (rd_kafka_broker_t *rkb, int timeout_ms) {
...
/* Broker whose source is RD_KAFKA_INTERNAL: internal serve routine. */
if (rkb->rkb_source == RD_KAFKA_INTERNAL)
rd_kafka_broker_internal_serve(rkb, abs_timeout);
/* Otherwise choose by the type of the owning handle: producer ... */
else if (rkb->rkb_rk->rk_type == RD_KAFKA_PRODUCER)
rd_kafka_broker_producer_serve(rkb, abs_timeout);
/* ... or consumer. */
else if (rkb->rkb_rk->rk_type == RD_KAFKA_CONSUMER)
rd_kafka_broker_consumer_serve(rkb, abs_timeout);
}
/*
 * Excerpt of rd_kafka_broker_producer_serve(). The locals now, next_wakeup
 * and do_timeout_scan are used but not declared in the visible part.
 */
static void rd_kafka_broker_producer_serve (rd_kafka_broker_t *rkb,
rd_ts_t abs_timeout)
{
// Run the per-partition produce step; next_wakeup is passed by address,
// then used as the timeout for serving ops and IO.
rd_kafka_broker_produce_toppars(rkb, now, &next_wakeup,
do_timeout_scan);
rd_kafka_broker_ops_io_serve(rkb, next_wakeup);
}
/*
 * Excerpt of rd_kafka_broker_ops_io_serve(); elided code is marked with "...".
 */
static void rd_kafka_broker_ops_io_serve (rd_kafka_broker_t *rkb,
rd_ts_t abs_timeout)
{
...
/* Serve the broker's ops; the timeout passed is
 * rd_timeout_remains_us(abs_timeout). */
rd_kafka_broker_ops_serve(rkb, rd_timeout_remains_us(abs_timeout));
...
}
/**
 * Pop ops from the broker's op queue (rkb_ops) and hand each one to
 * rd_kafka_broker_op_serve().
 *
 * Only the first pop uses the caller's timeout; once an op has been served
 * successfully, subsequent pops use RD_POLL_NOWAIT. The loop ends when the
 * queue yields no op or when the op handler returns zero.
 *
 * @param rkb         Broker whose op queue is served.
 * @param timeout_us  Timeout for the first pop.
 * @return Number of ops popped, including one whose handler returned zero.
 */
static int rd_kafka_broker_ops_serve (rd_kafka_broker_t *rkb,
                                      rd_ts_t timeout_us)
{
        int served = 0;

        for (;;) {
                rd_kafka_op_t *op = rd_kafka_q_pop(rkb->rkb_ops,
                                                   timeout_us, 0);
                if (!op)
                        break;

                served++;

                if (!rd_kafka_broker_op_serve(rkb, op))
                        break;

                /* Drain whatever else is queued without blocking. */
                timeout_us = RD_POLL_NOWAIT;
        }

        return served;
}
rdkafka_broker.c文件:
/*
 * Excerpt of rd_kafka_broker_send(); elided code is marked with "...".
 */
static ssize_t rd_kafka_broker_send (rd_kafka_broker_t *rkb, rd_slice_t *slice)
{
...
/* Send the slice over the broker's transport; errstr is handed to the
 * transport layer together with its size. */
r = rd_kafka_transport_send(rkb->rkb_transport, slice,
errstr, sizeof(errstr));
...
}
rdkafka_transport.c文件:
/*
 * Excerpt of rd_kafka_transport_send(); elided code is marked with dots.
 */
ssize_t rd_kafka_transport_send (rd_kafka_transport_t *rktrans,
rd_slice_t *slice, char *errstr, size_t errstr_size)
{
..
/* Delegate to the socket send routine with the same slice and error buffer. */
r = rd_kafka_transport_socket_send(rktrans, slice,
errstr, errstr_size);
...
}
/*
 * Send a slice on the transport's socket.
 *
 * When _MSC_VER is not defined the call is forwarded to
 * rd_kafka_transport_socket_sendmsg() and the function returns there, so
 * the rd_kafka_transport_socket_send0() call below is reached only when
 * _MSC_VER is defined.
 */
static ssize_t rd_kafka_transport_socket_send (rd_kafka_transport_t *rktrans,
rd_slice_t *slice,
char *errstr, size_t errstr_size) {
#ifndef _MSC_VER
/* FIXME: Use sendmsg() with iovecs if there's more than one segment
 * remaining, otherwise (or if platform does not have sendmsg)
 * use plain send(). */
return rd_kafka_transport_socket_sendmsg(rktrans, slice,
errstr, errstr_size);
#endif
/* Fallback path: the only path compiled as reachable under _MSC_VER. */
return rd_kafka_transport_socket_send0(rktrans, slice,
errstr, errstr_size);
}
/*
 * Excerpt of rd_kafka_transport_socket_sendmsg(); elided code is marked
 * with "...". The sendmsg() call below is cut off in this excerpt (its
 * remaining flags and closing parenthesis are not shown).
 */
static ssize_t rd_kafka_transport_socket_sendmsg (rd_kafka_transport_t *rktrans,
rd_slice_t *slice,
char *errstr, size_t errstr_size)
{
...
/* sendmsg() on the transport's socket (rktrans_s) with MSG_DONTWAIT set. */
r = sendmsg(rktrans->rktrans_s, &msg, MSG_DONTWAIT
...
}
(1)开启消息消费
RdKafka提供了rd_kafka_consume_start、rd_kafka_consume、rd_kafka_consume_start_queue、rd_kafka_consume_queue接口用于消息消费。
/**
 * Common implementation behind rd_kafka_consume_start() and
 * rd_kafka_consume_start_queue(): validates the arguments, marks the
 * partition as desired and issues a fetch-start op for it.
 *
 * @param rkt        Internal topic object.
 * @param partition  Partition to consume; must not be negative.
 * @param offset     Start offset: RD_KAFKA_OFFSET_BEGINNING,
 *                   RD_KAFKA_OFFSET_END, any value at or below
 *                   RD_KAFKA_OFFSET_TAIL_BASE, RD_KAFKA_OFFSET_STORED, or a
 *                   non-negative offset.
 * @param rkq        Queue passed on to the fetch-start op (callers in this
 *                   file may pass NULL).
 * @return 0 on success with the last error cleared; -1 on failure with the
 *         last error set to __UNKNOWN_PARTITION (negative partition) or
 *         __INVALID_ARG (all other failures).
 */
int rd_kafka_consume_start0 (rd_kafka_itopic_t *rkt, int32_t partition,
                             int64_t offset, rd_kafka_q_t *rkq) {
        shptr_rd_kafka_toppar_t *s_toppar;
        int offset_rejected;

        if (partition < 0) {
                rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION,
                                        ESRCH);
                return -1;
        }

        if (!rd_kafka_simple_consumer_add(rkt->rkt_rk)) {
                rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__INVALID_ARG, EINVAL);
                return -1;
        }

        /* Register the partition as desired while holding the topic
         * write lock. */
        rd_kafka_topic_wrlock(rkt);
        s_toppar = rd_kafka_toppar_desired_add(rkt, partition);
        rd_kafka_topic_wrunlock(rkt);

        /* Classify the requested offset. */
        if (offset == RD_KAFKA_OFFSET_BEGINNING ||
            offset == RD_KAFKA_OFFSET_END ||
            offset <= RD_KAFKA_OFFSET_TAIL_BASE) {
                /* Logical offsets are always accepted. */
                offset_rejected = 0;
        } else if (offset == RD_KAFKA_OFFSET_STORED) {
                /* Stored offsets kept on the broker require a group id. */
                offset_rejected =
                        rkt->rkt_conf.offset_store_method ==
                        RD_KAFKA_OFFSET_METHOD_BROKER &&
                        RD_KAFKAP_STR_IS_NULL(rkt->rkt_rk->rk_group_id);
        } else {
                /* Any other negative value is not a valid offset. */
                offset_rejected = offset < 0;
        }

        if (offset_rejected) {
                rd_kafka_toppar_destroy(s_toppar);
                rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__INVALID_ARG,
                                        EINVAL);
                return -1;
        }

        rd_kafka_toppar_op_fetch_start(rd_kafka_toppar_s2i(s_toppar), offset,
                                       rkq, RD_KAFKA_NO_REPLYQ);

        /* Drop the reference obtained from .._desired_add(). */
        rd_kafka_toppar_destroy(s_toppar);

        rd_kafka_set_last_error(0, 0);
        return 0;
}
/**
 * Start consuming a partition of an application topic handle.
 *
 * Emits a TOPIC debug log line and delegates to rd_kafka_consume_start0()
 * with a NULL queue argument.
 *
 * @return The value returned by rd_kafka_consume_start0().
 */
int rd_kafka_consume_start (rd_kafka_topic_t *app_rkt, int32_t partition,
                            int64_t offset) {
        rd_kafka_itopic_t *itopic = rd_kafka_topic_a2i(app_rkt);

        rd_kafka_dbg(itopic->rkt_rk, TOPIC, "START",
                     "Start consuming partition %"PRId32,partition);

        return rd_kafka_consume_start0(itopic, partition, offset, NULL);
}
/**
 * Start consuming a partition, passing the op queue wrapped by rkqu to
 * rd_kafka_consume_start0().
 *
 * @return The value returned by rd_kafka_consume_start0().
 */
int rd_kafka_consume_start_queue (rd_kafka_topic_t *app_rkt, int32_t partition,
                                  int64_t offset, rd_kafka_queue_t *rkqu) {
        rd_kafka_itopic_t *itopic = rd_kafka_topic_a2i(app_rkt);
        rd_kafka_q_t *dest_q = rkqu->rkqu_q;

        return rd_kafka_consume_start0(itopic, partition, offset, dest_q);
}
/**
 * Pop ops from rkq until one is passed through by rd_kafka_poll_cb(), then
 * convert that op into a message for the application.
 *
 * Ops handled by the callback are consumed silently and the loop goes on
 * with the remaining time. Before returning, rd_kafka_app_polled() is
 * called on every path.
 *
 * @param rk          Kafka handle.
 * @param rkq         Queue to pop ops from.
 * @param timeout_ms  Total time budget; a non-zero value also marks the
 *                    application as blocking in poll.
 * @return The message taken from the op (last error cleared), or NULL with
 *         the last error set to __TIMED_OUT when no op arrived in time, or
 *         __INTR when a yield was requested.
 */
static rd_kafka_message_t *rd_kafka_consume0 (rd_kafka_t *rk,
                                              rd_kafka_q_t *rkq,
                                              int timeout_ms) {
        rd_kafka_op_t *op;
        rd_kafka_message_t *msg = NULL;
        rd_ts_t deadline = rd_timeout_init(timeout_ms);

        if (timeout_ms)
                rd_kafka_app_poll_blocking(rk);

        rd_kafka_yield_thread = 0;

        for (;;) {
                rd_kafka_op_res_t cb_res;

                op = rd_kafka_q_pop(rkq, rd_timeout_remains_us(deadline), 0);
                if (!op) {
                        /* Timeout reached with no op returned. */
                        rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__TIMED_OUT,
                                                ETIMEDOUT);
                        rd_kafka_app_polled(rk);
                        return NULL;
                }

                cb_res = rd_kafka_poll_cb(rk, rkq, op,
                                          RD_KAFKA_Q_CB_RETURN, NULL);

                /* The op is for the caller: stop polling. */
                if (cb_res == RD_KAFKA_OP_RES_PASS)
                        break;

                if (unlikely(cb_res == RD_KAFKA_OP_RES_YIELD ||
                             rd_kafka_yield_thread)) {
                        /* The callback called rd_kafka_yield(): stop
                         * dispatching the queue and return. */
                        rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__INTR,
                                                EINTR);
                        rd_kafka_app_polled(rk);
                        return NULL;
                }

                /* Op was handled by the callback; pop the next one. */
        }

        rd_kafka_assert(rk,
                        op->rko_type == RD_KAFKA_OP_FETCH ||
                        op->rko_type == RD_KAFKA_OP_CONSUMER_ERR);

        /* Extract the message from the op. */
        msg = rd_kafka_message_get(op);

        /* Store offset */
        rd_kafka_op_offset_store(rk, op);

        rd_kafka_set_last_error(0, 0);
        rd_kafka_app_polled(rk);

        return msg;
}
/**
 * Consume one message from a single partition of an application topic.
 *
 * The partition object is looked up under the topic read lock, first via
 * rd_kafka_toppar_get() and, if that finds nothing, via
 * rd_kafka_toppar_desired_get(). The message is then taken from the
 * partition's fetch queue through rd_kafka_consume0().
 *
 * @return The value returned by rd_kafka_consume0(), or NULL with the last
 *         error set to __UNKNOWN_PARTITION when the partition is not known.
 */
rd_kafka_message_t *rd_kafka_consume (rd_kafka_topic_t *app_rkt,
                                      int32_t partition,
                                      int timeout_ms) {
        rd_kafka_itopic_t *itopic = rd_kafka_topic_a2i(app_rkt);
        shptr_rd_kafka_toppar_t *s_toppar;
        rd_kafka_toppar_t *toppar;
        rd_kafka_message_t *msg;

        rd_kafka_topic_rdlock(itopic);
        s_toppar = rd_kafka_toppar_get(itopic, partition, 0/*no ua on miss*/);
        if (unlikely(s_toppar == NULL))
                s_toppar = rd_kafka_toppar_desired_get(itopic, partition);
        rd_kafka_topic_rdunlock(itopic);

        if (unlikely(s_toppar == NULL)) {
                /* No such toppar known */
                rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION,
                                        ESRCH);
                return NULL;
        }

        toppar = rd_kafka_toppar_s2i(s_toppar);
        msg = rd_kafka_consume0(itopic->rkt_rk, toppar->rktp_fetchq,
                                timeout_ms);

        /* Release the reference taken by the lookup above. */
        rd_kafka_toppar_destroy(s_toppar);

        return msg;
}
/**
 * Consume one message from an application-level queue by delegating to
 * rd_kafka_consume0() with the queue's handle and underlying op queue.
 *
 * @return The value returned by rd_kafka_consume0().
 */
rd_kafka_message_t *rd_kafka_consume_queue (rd_kafka_queue_t *rkqu,
                                            int timeout_ms) {
        rd_kafka_t *rk = rkqu->rkqu_rk;
        rd_kafka_q_t *opq = rkqu->rkqu_q;

        return rd_kafka_consume0(rk, opq, timeout_ms);
}
(2)Poll轮询消息队列
/**
 * Serve the handle's reply queue (rk_rep), dispatching ops through
 * rd_kafka_poll_cb in RD_KAFKA_Q_CB_CALLBACK mode.
 *
 * @param rk          Kafka handle.
 * @param timeout_ms  Timeout handed to rd_kafka_q_serve(); a non-zero value
 *                    also marks the application as blocking in poll.
 * @return The value returned by rd_kafka_q_serve().
 */
int rd_kafka_poll (rd_kafka_t *rk, int timeout_ms) {
        int served;

        if (timeout_ms != 0)
                rd_kafka_app_poll_blocking(rk);

        served = rd_kafka_q_serve(rk->rk_rep, timeout_ms, 0,
                                  RD_KAFKA_Q_CB_CALLBACK,
                                  rd_kafka_poll_cb, NULL);

        rd_kafka_app_polled(rk);

        return served;
}
/**
 * Poll the consumer group's queue for one message.
 *
 * When the handle has no consumer group, a newly created message whose err
 * field is RD_KAFKA_RESP_ERR__UNKNOWN_GROUP is returned instead.
 *
 * @return The value returned by rd_kafka_consume0() on the group queue, or
 *         the error message described above.
 */
rd_kafka_message_t *rd_kafka_consumer_poll (rd_kafka_t *rk,
                                            int timeout_ms) {
        rd_kafka_cgrp_t *cgrp = rd_kafka_cgrp_get(rk);

        if (unlikely(cgrp == NULL)) {
                rd_kafka_message_t *errmsg = rd_kafka_message_new();

                errmsg->err = RD_KAFKA_RESP_ERR__UNKNOWN_GROUP;
                return errmsg;
        }

        return rd_kafka_consume0(rk, cgrp->rkcg_q, timeout_ms);
}
/**
 * Poll an application-level queue for one event.
 *
 * Ops are popped and served through rd_kafka_poll_cb in
 * RD_KAFKA_Q_CB_EVENT mode; whatever rd_kafka_q_pop_serve() returns,
 * including NULL, is handed back to the caller.
 *
 * @param rkqu        Queue to poll.
 * @param timeout_ms  Timeout in milliseconds (converted with
 *                    rd_timeout_us()); a non-zero value also marks the
 *                    application as blocking in poll.
 */
rd_kafka_event_t *rd_kafka_queue_poll (rd_kafka_queue_t *rkqu, int timeout_ms) {
        rd_kafka_op_t *rko;

        if (timeout_ms != 0)
                rd_kafka_app_poll_blocking(rkqu->rkqu_rk);

        rko = rd_kafka_q_pop_serve(rkqu->rkqu_q,
                                   rd_timeout_us(timeout_ms), 0,
                                   RD_KAFKA_Q_CB_EVENT,
                                   rd_kafka_poll_cb, NULL);

        rd_kafka_app_polled(rkqu->rkqu_rk);

        return rko;
}
C++ API主要是对RdKafka C API的封装,根据不同的功能模块封装为不同功能类,类定义在rdkafkacpp.h文件中,并使用RdKafka命名空间进行限定,主要类如下Conf、Handle、TopicPartition、Topic、Message、Queue、KafkaConsumer、Consumer、Producer、BrokerMetadata、PartitionMetadata、TopicMetadata、Metadata、DeliveryReportCb、PartitionerCb、PartitionerKeyPointerCb、EventCb、Event、ConsumeCb:Consume、RebalanceCb、OffsetCommitCb、SocketCb、OpenCb。
Consumer对partition、offset有完全的控制能力;KafkaConsumer提供了Topic订阅接口,默认使用latest消费方式,可以通过assign方法指定开始消费的partition和offset。
(1)Producer创建
/**
 * Create a producer from a configuration object.
 *
 * The C configuration held by conf is duplicated and the duplicate is given
 * to rd_kafka_new(). If conf carries a delivery-report callback, it is
 * installed on the duplicate via dr_msg_cb_trampoline and remembered on the
 * producer.
 *
 * @param conf    Configuration object. When it is a ConfImpl it must hold a
 *                C configuration (rk_conf_).
 * @param errstr  Receives a human-readable reason on failure.
 * @return The new producer, or NULL on failure.
 */
RdKafka::Producer *RdKafka::Producer::create (RdKafka::Conf *conf,
                                              std::string &errstr) {
  char errbuf[512];
  RdKafka::ConfImpl *confimpl = dynamic_cast<RdKafka::ConfImpl *>(conf);
  RdKafka::ProducerImpl *producer = new RdKafka::ProducerImpl();
  rd_kafka_conf_t *conf_copy = NULL;

  if (confimpl && !confimpl->rk_conf_) {
    errstr = "Requires RdKafka::Conf::CONF_GLOBAL object";
    delete producer;
    return NULL;
  }

  if (confimpl) {
    producer->set_common_config(confimpl);
    conf_copy = rd_kafka_conf_dup(confimpl->rk_conf_);

    if (confimpl->dr_cb_) {
      rd_kafka_conf_set_dr_msg_cb(conf_copy, dr_msg_cb_trampoline);
      producer->dr_cb_ = confimpl->dr_cb_;
    }
  }

  rd_kafka_t *rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf_copy,
                                errbuf, sizeof(errbuf));
  if (rk) {
    producer->rk_ = rk;
    return producer;
  }

  errstr = errbuf;
  // rd_kafka_new() takes ownership of the configuration only on success,
  // so the duplicate has to be released here.
  if (conf_copy)
    rd_kafka_conf_destroy(conf_copy);
  delete producer;
  return NULL;
}
创建Producer时需要准备好Conf对象。
(2)生产消息
/**
 * Produce a single message to a topic via rd_kafka_produce().
 *
 * @param topic       Topic to produce to; must be a TopicImpl.
 * @param partition   Target partition, passed through unchanged.
 * @param msgflags    Message flags, passed through unchanged.
 * @param payload     Message payload.
 * @param len         Payload length in bytes.
 * @param key         Optional message key; NULL means no key.
 * @param msg_opaque  Per-message opaque pointer, passed through unchanged.
 * @return ERR_NO_ERROR on success; ERR__INVALID_ARG when topic is NULL or
 *         is not a TopicImpl; otherwise the last error reported by the C
 *         library when rd_kafka_produce() returns -1.
 */
RdKafka::ErrorCode RdKafka::ProducerImpl::produce (RdKafka::Topic *topic,
                                                   int32_t partition,
                                                   int msgflags,
                                                   void *payload, size_t len,
                                                   const std::string *key,
                                                   void *msg_opaque) {
  RdKafka::TopicImpl *topicimpl = dynamic_cast<RdKafka::TopicImpl *>(topic);

  // dynamic_cast yields NULL for a NULL topic or a foreign Topic
  // implementation; reject it instead of dereferencing a null pointer.
  if (!topicimpl)
    return RdKafka::ERR__INVALID_ARG;

  if (rd_kafka_produce(topicimpl->rkt_, partition, msgflags,
                       payload, len,
                       key ? key->c_str() : NULL, key ? key->size() : 0,
                       msg_opaque) == -1)
    return static_cast<RdKafka::ErrorCode>(rd_kafka_last_error());

  return RdKafka::ERR_NO_ERROR;
}
生产消息时需要指定Topic对象。
(3)Poll轮询
/**
 * Poll the underlying C handle by calling rd_kafka_poll().
 *
 * @param timeout_ms  Timeout passed through to rd_kafka_poll().
 * @return The value returned by rd_kafka_poll().
 */
int RdKafka::HandleImpl::poll(int timeout_ms)
{
  const int served = rd_kafka_poll(rk_, timeout_ms);
  return served;
}
produce生产消息是异步的,将消息放入到内部队列后会立刻返回,因此需要由poll返回最终写入结果。produce是尽力送达的,会不断重试,直至超过message.timeout.ms才汇报失败。
(1)创建KafkaConsumer
/**
 * Create a high-level consumer from a configuration object.
 *
 * The configuration must be a ConfImpl holding a C configuration with a
 * non-empty "group.id". The C configuration is duplicated and the
 * duplicate is given to rd_kafka_new(); afterwards
 * rd_kafka_poll_set_consumer() is called on the new handle.
 *
 * @param conf    Configuration object.
 * @param errstr  Receives a human-readable reason on failure.
 * @return The new consumer, or NULL on failure.
 */
RdKafka::KafkaConsumer *RdKafka::KafkaConsumer::create (RdKafka::Conf *conf,
                                                        std::string &errstr) {
  char errbuf[512];
  RdKafka::ConfImpl *confimpl = dynamic_cast<RdKafka::ConfImpl *>(conf);
  RdKafka::KafkaConsumerImpl *consumer = new RdKafka::KafkaConsumerImpl();
  rd_kafka_conf_t *conf_copy = NULL;
  size_t grlen;

  const bool conf_usable = confimpl && confimpl->rk_conf_;
  if (!conf_usable) {
    errstr = "Requires RdKafka::Conf::CONF_GLOBAL object";
    delete consumer;
    return NULL;
  }

  // A reported length of 1 means the value is only the terminating null.
  const bool has_group_id =
      rd_kafka_conf_get(confimpl->rk_conf_, "group.id",
                        NULL, &grlen) == RD_KAFKA_CONF_OK &&
      grlen > 1;
  if (!has_group_id) {
    errstr = "\"group.id\" must be configured";
    delete consumer;
    return NULL;
  }

  consumer->set_common_config(confimpl);
  conf_copy = rd_kafka_conf_dup(confimpl->rk_conf_);

  rd_kafka_t *rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf_copy,
                                errbuf, sizeof(errbuf));
  if (!rk) {
    errstr = errbuf;
    // rd_kafka_new() takes ownership of the configuration only on success,
    // so the duplicate has to be released here.
    rd_kafka_conf_destroy(conf_copy);
    delete consumer;
    return NULL;
  }

  consumer->rk_ = rk;

  /* Redirect handle queue to cgrp's queue to provide a single queue point */
  rd_kafka_poll_set_consumer(rk);

  return consumer;
}
(2)订阅Topic
/**
 * Subscribe the consumer to a set of topics.
 *
 * Builds a temporary C topic-partition list with one entry per topic name
 * (partition RD_KAFKA_PARTITION_UA), passes it to rd_kafka_subscribe() and
 * destroys the list afterwards.
 *
 * @param topics  Topic names to subscribe to.
 * @return The result of rd_kafka_subscribe(), converted to ErrorCode.
 */
RdKafka::ErrorCode
RdKafka::KafkaConsumerImpl::subscribe (const std::vector<std::string> &topics) {
  rd_kafka_topic_partition_list_t *c_topics =
      rd_kafka_topic_partition_list_new(static_cast<int>(topics.size()));

  for (std::vector<std::string>::const_iterator it = topics.begin();
       it != topics.end(); ++it)
    rd_kafka_topic_partition_list_add(c_topics, it->c_str(),
                                      RD_KAFKA_PARTITION_UA);

  const rd_kafka_resp_err_t err = rd_kafka_subscribe(rk_, c_topics);

  rd_kafka_topic_partition_list_destroy(c_topics);

  return static_cast<RdKafka::ErrorCode>(err);
}
(3)消费消息
/**
 * Poll the consumer for one message and wrap the result in a MessageImpl.
 *
 * @param timeout_ms  Timeout passed through to rd_kafka_consumer_poll().
 * @return A newly allocated Message. When the poll returns nothing, the
 *         Message is constructed with ERR__TIMED_OUT instead of a C message.
 */
RdKafka::Message *RdKafka::KafkaConsumerImpl::consume (int timeout_ms) {
  rd_kafka_message_t *polled = rd_kafka_consumer_poll(this->rk_, timeout_ms);

  if (polled)
    return new RdKafka::MessageImpl(polled);

  return new RdKafka::MessageImpl(NULL, RdKafka::ERR__TIMED_OUT);
}
RdKafka内部使用多线程对硬件资源进行充分利用,RdKafka API是线程安全的,应用程序可以在任意时间调用其线程内的任意API函数。
每个Producer/Consumer实例会创建线程如下:
(1)应用线程,处理具体应用业务逻辑。
(2)Kafka Handler线程:每创建一个Producer/Consumer即会创建一个Handler线程,即RdKafka主线程,线程名称为rdk:main,线程执行函数为rd_kafka_thread_main。
(3)Kafka Broker线程:对于增加到Producer/Consumer的每个Broker会创建一个线程,负责与Broker通信,线程执行函数为rd_kafka_broker_thread_main,线程名称为rdk:brokerxxx。
(4)Inner Broker线程,用于处理未分配分区的OP操作队列。
(5)后台线程
如果配置对象设置了background_event_cb,Kafka Handler创建时会创建相应的后台线程和后台队列,线程执行函数为rd_kafka_background_thread_main。
Linux查看KafkaConsumer进程的线程的方法:
ps -T -p pid
top -H -p pid
Consumer线程查看结果:
Producer线程查看结果:
标签:enc toc 查找 rgs 端口 时间 部分 利用 mode
原文地址:https://blog.51cto.com/9291927/2504489