读取zookeeper保存的topic元数据
Table of Contents
1 有以下问题
- 需要使用producer才能获得元数据
- 当producer和consumer共用一些对象时会出现无法读取数据的问题
2 解决方法
用独立的类封装获取元数据的代码,避免共用变量
3 代码
3.1 KafkaHelper类
#ifndef KAFKA_HELPER_H_
#define KAFKA_HELPER_H_
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <string>
using std::string;
#include "librdkafka/rdkafkacpp.h"
#include "librdkafka/rdkafka.h"
#include <zookeeper/zookeeper.h>
#include <zookeeper/zookeeper.jute.h>
#include <jansson.h>
#define BROKER_PATH "/brokers/ids"
static rd_kafka_t *rk;
class KafkaHelper {
public:
static string Brokers(string const& zookeeper) {
zhandle_t * zh = initialize_zookeeper(zookeeper);
char brokers[1024];
set_brokerlist_from_zookeeper(zh, brokers);
return brokers;
}
static void PrintTopicMeta(string const& topic_name) {
/*
* Create producer using accumulated global configuration.
*/
RdKafka::Conf * global_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
string zookeeper("localhost:2181");
string brokers = KafkaHelper::Brokers(zookeeper);
string errstr;
global_conf->set("metadata.broker.list", brokers, errstr);
RdKafka::Conf * topic_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
RdKafka::Producer *producer = RdKafka::Producer::create(global_conf, errstr);
if (!producer) {
std::cerr << "Failed to create producer: " << errstr << std::endl;
exit(1);
}
std::cout << "% Created producer " << producer->name() << std::endl;
/*
* Create topic handle.
*/
RdKafka::Topic * topic = RdKafka::Topic::create(producer, topic_name, topic_conf, errstr);
if (!topic) {
std::cerr << "Failed to create topic: " << errstr << std::endl;
exit(1);
}
bool run = true;
while (run) {
class RdKafka::Metadata *metadata;
// Fetch metadata
RdKafka::ErrorCode err = producer->metadata(topic!=NULL, topic, &metadata, 5000);
if (err != RdKafka::ERR_NO_ERROR) {
std::cerr << "%% Failed to acquire metadata: "
<< RdKafka::err2str(err) << std::endl;
run = 0;
break;
}
KafkaHelper::PrintMeta(topic_name, metadata);
delete metadata;
run = 0;
}
}
static void PrintMeta(string const & topic, const RdKafka::Metadata *metadata) {
std::cout << "Metadata for " << (topic.empty() ? "" : "all topics")
<< "(orig broker id from broker " << metadata->orig_broker_id()
<< ":" << metadata->orig_broker_name() << std::endl;
/* Iterate brokers */
std::cout << " " << metadata->brokers()->size() << " brokers:" << std::endl;
RdKafka::Metadata::BrokerMetadataIterator ib;
for (ib = metadata->brokers()->begin(); ib != metadata->brokers()->end(); ++ib) {
std::cout << " broker " << (*ib)->id() << " at "
<< *(*ib)->host() << ":" << (*ib)->port() << std::endl;
}
/* Iterate topics */
std::cout << metadata->topics()->size() << " topics:" << std::endl;
RdKafka::Metadata::TopicMetadataIterator it;
for (it = metadata->topics()->begin(); it != metadata->topics()->end(); ++it) {
std::cout << " topic "<< *(*it)->topic() << " with "
<< (*it)->partitions()->size() << " partitions" << std::endl;
if ((*it)->err() != RdKafka::ERR_NO_ERROR) {
std::cout << " " << err2str((*it)->err());
if ((*it)->err() == RdKafka::ERR_LEADER_NOT_AVAILABLE) {
std::cout << " (try again)";
}
}
std::cout << std::endl;
/* Iterate topic‘s partitions */
RdKafka::TopicMetadata::PartitionMetadataIterator ip;
for (ip = (*it)->partitions()->begin(); ip != (*it)->partitions()->end(); ++ip) {
std::cout << " partition " << (*ip)->id()
<< " leader " << (*ip)->leader()
<< ", replicas: ";
/* Iterate partition‘s replicas */
RdKafka::PartitionMetadata::ReplicasIterator ir;
for (ir = (*ip)->replicas()->begin();
ir != (*ip)->replicas()->end() ;
++ir) {
std::cout << (ir == (*ip)->replicas()->begin() ? ",":"") << *ir;
}
/* Iterate partition‘s ISRs */
std::cout << ", isrs: ";
RdKafka::PartitionMetadata::ISRSIterator iis;
for (iis = (*ip)->isrs()->begin(); iis != (*ip)->isrs()->end() ; ++iis)
std::cout << (iis == (*ip)->isrs()->begin() ? ",":"") << *iis;
if ((*ip)->err() != RdKafka::ERR_NO_ERROR)
std::cout << ", " << RdKafka::err2str((*ip)->err()) << std::endl;
else
std::cout << std::endl;
}
}
}
private:
static void watcher(zhandle_t *zh, int type, int state, const char *path, void *watcherCtx) {
char brokers[1024];
if (type == ZOO_CHILD_EVENT && strncmp(path, BROKER_PATH, sizeof(BROKER_PATH) - 1) == 0)
{
brokers[0] = ‘\0‘;
set_brokerlist_from_zookeeper(zh, brokers);
if (brokers[0] != ‘\0‘ && rk != NULL)
{
rd_kafka_brokers_add(rk, brokers);
rd_kafka_poll(rk, 10);
}
}
}
static zhandle_t* initialize_zookeeper(string const& zookeeper) {
zhandle_t * zh = zookeeper_init(zookeeper.c_str(), watcher, 10000, 0, 0, 0);
if (zh == NULL) {
fprintf(stderr, "Zookeeper connection not established.");
exit(1);
}
return zh;
}
static void set_brokerlist_from_zookeeper(zhandle_t *zzh, char *brokers) {
if (zzh) {
struct String_vector brokerlist;
if (zoo_get_children(zzh, BROKER_PATH, 1, &brokerlist) != ZOK) {
fprintf(stderr, "No brokers found on path %s\n", BROKER_PATH);
return;
}
int i;
char *brokerptr = brokers;
for (i = 0; i < brokerlist.count; i++) {
char path[255], cfg[1024];
sprintf(path, "/brokers/ids/%s", brokerlist.data[i]);
int len = sizeof(cfg);
zoo_get(zzh, path, 0, cfg, &len, NULL);
if (len > 0) {
cfg[len] = ‘\0‘;
json_error_t jerror;
json_t *jobj = json_loads(cfg, 0, &jerror);
if (jobj) {
json_t *jhost = json_object_get(jobj, "host");
json_t *jport = json_object_get(jobj, "port");
if (jhost && jport) {
const char *host = json_string_value(jhost);
const int port = json_integer_value(jport);
sprintf(brokerptr, "%s:%d", host, port);
brokerptr += strlen(brokerptr);
if (i < brokerlist.count - 1) {
*brokerptr++ = ‘,‘;
}
}
json_decref(jobj);
}
}
}
deallocate_String_vector(&brokerlist);
printf("Found brokers %s\n", brokers);
}
}
};
#endif
3.2 main.cc完整代码
这里包含了读取数据的代码
#include <iostream>
#include <string>
#include <cstdlib>
#include <cstdio>
#include <csignal>
#include <cstring>
#include <getopt.h>
#include <list>
#include "helper/kafka_helper.h"
using std::string;
using std::list;
using std::cout;
using std::endl;
static bool run = true;
static bool exit_eof = true;
class MyEventCb : public RdKafka::EventCb {
public:
void event_cb (RdKafka::Event &event) {
switch (event.type())
{
case RdKafka::Event::EVENT_ERROR:
std::cerr << "ERROR (" << RdKafka::err2str(event.err()) << "): " <<
event.str() << std::endl;
if (event.err() == RdKafka::ERR__ALL_BROKERS_DOWN)
run = false;
break;
case RdKafka::Event::EVENT_STATS:
std::cerr << "\"STATS\": " << event.str() << std::endl;
break;
case RdKafka::Event::EVENT_LOG:
fprintf(stderr, "LOG-%i-%s: %s\n",
event.severity(), event.fac().c_str(), event.str().c_str());
break;
default:
std::cerr << "EVENT " << event.type() <<
" (" << RdKafka::err2str(event.err()) << "): " <<
event.str() << std::endl;
break;
}
}
};
void msg_consume(RdKafka::Message* message, void* opaque) {
switch (message->err()) {
case RdKafka::ERR__TIMED_OUT:
break;
case RdKafka::ERR_NO_ERROR:
/* Real message */
std::cout << "Read msg at offset " << message->offset() << std::endl;
if (message->key()) {
std::cout << "Key: " << *message->key() << std::endl;
}
cout << static_cast<const char *>(message->payload()) << endl;
break;
case RdKafka::ERR__PARTITION_EOF:
cout << "reach last message" << endl;
/* Last message */
if (exit_eof) {
run = false;
}
break;
case RdKafka::ERR__UNKNOWN_TOPIC:
case RdKafka::ERR__UNKNOWN_PARTITION:
std::cerr << "Consume failed: " << message->errstr() << std::endl;
run = false;
break;
default:
/* Errors */
std::cerr << "Consume failed: " << message->errstr() << std::endl;
run = false;
}
}
class MyConsumeCb : public RdKafka::ConsumeCb {
public:
void consume_cb (RdKafka::Message &msg, void *opaque) {
msg_consume(&msg, opaque);
}
};
static void sigterm (int sig) {
run = false;
}
int main (int argc, char **argv) {
/*
* Process kill signal, quit from the loop
*/
signal(SIGINT, sigterm);
signal(SIGTERM, sigterm);
/*
* Get broker list from zookeeper
*/
string zookeeper("localhost:2181");
string brokers = KafkaHelper::Brokers(zookeeper);
cout << "brokers from zookeeper is: " << brokers << endl;
string topic_name = "test2";
/*
* Print topic meta
*/
KafkaHelper::PrintTopicMeta(topic_name);
/*
* Global conf objects
*/
RdKafka::Conf * global_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
string errstr;
global_conf->set("metadata.broker.list", brokers, errstr);
MyEventCb ex_event_cb;
global_conf->set("event_cb", &ex_event_cb, errstr);
/*
* Topic conf objects
*/
RdKafka::Conf * topic_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
/*
* Create consumer using accumulated global configuration.
*/
RdKafka::Consumer *consumer = RdKafka::Consumer::create(global_conf, errstr);
if (!consumer) {
std::cerr << "Failed to create consumer: " << errstr << std::endl;
exit(1);
}
std::cout << "% Created consumer " << consumer->name() << std::endl;
/*
* Start consumer for topic+partition at start offset
*/
int32_t partition = 0;
int64_t start_offset = RdKafka::Topic::OFFSET_BEGINNING;
RdKafka::Topic *topic2 = RdKafka::Topic::create(consumer, topic_name, topic_conf, errstr);
RdKafka::ErrorCode resp = consumer->start(topic2, 0, start_offset);
if (resp != RdKafka::ERR_NO_ERROR) {
std::cerr << "Failed to start consumer: " <<
RdKafka::err2str(resp) << std::endl;
exit(1);
}
/*
* Consume messages
*/
MyConsumeCb ex_consume_cb;
int use_ccb = 0;
while (run) {
if (use_ccb) {
consumer->consume_callback(topic2, partition, 1000, &ex_consume_cb, &use_ccb);
} else {
RdKafka::Message *msg = consumer->consume(topic2, partition, 1000);
msg_consume(msg, NULL);
delete msg;
}
consumer->poll(0);
}
/*
* Stop consumer
*/
consumer->stop(topic2, partition);
consumer->poll(1000);
delete topic2;
delete consumer;
/*
* Wait for RdKafka to decommission.
* This is not strictly needed (when check outq_len() above), but
* allows RdKafka to clean up all its resources before the application
* exits so that memory profilers such as valgrind wont complain about
* memory leaks.
*/
RdKafka::wait_destroyed(5000);
return 0;
}