读取 ZooKeeper 中保存的 Kafka topic 元数据
Table of Contents
1 有以下问题
- 需要使用producer才能获得元数据
- 当producer和consumer共用一些对象时会出现无法读取数据的问题
2 解决方法
用一个独立的类（KafkaHelper）封装获取元数据的代码，使其与读取数据的 consumer 不再共用任何变量或配置对象。
3 代码
3.1 KafkaHelper类
#ifndef KAFKA_HELPER_H_ #define KAFKA_HELPER_H_ #include <string> using std::string; #include "librdkafka/rdkafkacpp.h" #include "librdkafka/rdkafka.h" #include <zookeeper/zookeeper.h> #include <zookeeper/zookeeper.jute.h> #include <jansson.h> #define BROKER_PATH "/brokers/ids" static rd_kafka_t *rk; class KafkaHelper { public: static string Brokers(string const& zookeeper) { zhandle_t * zh = initialize_zookeeper(zookeeper); char brokers[1024]; set_brokerlist_from_zookeeper(zh, brokers); return brokers; } static void PrintTopicMeta(string const& topic_name) { /* * Create producer using accumulated global configuration. */ RdKafka::Conf * global_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL); string zookeeper("localhost:2181"); string brokers = KafkaHelper::Brokers(zookeeper); string errstr; global_conf->set("metadata.broker.list", brokers, errstr); RdKafka::Conf * topic_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC); RdKafka::Producer *producer = RdKafka::Producer::create(global_conf, errstr); if (!producer) { std::cerr << "Failed to create producer: " << errstr << std::endl; exit(1); } std::cout << "% Created producer " << producer->name() << std::endl; /* * Create topic handle. */ RdKafka::Topic * topic = RdKafka::Topic::create(producer, topic_name, topic_conf, errstr); if (!topic) { std::cerr << "Failed to create topic: " << errstr << std::endl; exit(1); } bool run = true; while (run) { class RdKafka::Metadata *metadata; // Fetch metadata RdKafka::ErrorCode err = producer->metadata(topic!=NULL, topic, &metadata, 5000); if (err != RdKafka::ERR_NO_ERROR) { std::cerr << "%% Failed to acquire metadata: " << RdKafka::err2str(err) << std::endl; run = 0; break; } KafkaHelper::PrintMeta(topic_name, metadata); delete metadata; run = 0; } } static void PrintMeta(string const & topic, const RdKafka::Metadata *metadata) { std::cout << "Metadata for " << (topic.empty() ? 
"" : "all topics") << "(orig broker id from broker " << metadata->orig_broker_id() << ":" << metadata->orig_broker_name() << std::endl; /* Iterate brokers */ std::cout << " " << metadata->brokers()->size() << " brokers:" << std::endl; RdKafka::Metadata::BrokerMetadataIterator ib; for (ib = metadata->brokers()->begin(); ib != metadata->brokers()->end(); ++ib) { std::cout << " broker " << (*ib)->id() << " at " << *(*ib)->host() << ":" << (*ib)->port() << std::endl; } /* Iterate topics */ std::cout << metadata->topics()->size() << " topics:" << std::endl; RdKafka::Metadata::TopicMetadataIterator it; for (it = metadata->topics()->begin(); it != metadata->topics()->end(); ++it) { std::cout << " topic "<< *(*it)->topic() << " with " << (*it)->partitions()->size() << " partitions" << std::endl; if ((*it)->err() != RdKafka::ERR_NO_ERROR) { std::cout << " " << err2str((*it)->err()); if ((*it)->err() == RdKafka::ERR_LEADER_NOT_AVAILABLE) { std::cout << " (try again)"; } } std::cout << std::endl; /* Iterate topic‘s partitions */ RdKafka::TopicMetadata::PartitionMetadataIterator ip; for (ip = (*it)->partitions()->begin(); ip != (*it)->partitions()->end(); ++ip) { std::cout << " partition " << (*ip)->id() << " leader " << (*ip)->leader() << ", replicas: "; /* Iterate partition‘s replicas */ RdKafka::PartitionMetadata::ReplicasIterator ir; for (ir = (*ip)->replicas()->begin(); ir != (*ip)->replicas()->end() ; ++ir) { std::cout << (ir == (*ip)->replicas()->begin() ? ",":"") << *ir; } /* Iterate partition‘s ISRs */ std::cout << ", isrs: "; RdKafka::PartitionMetadata::ISRSIterator iis; for (iis = (*ip)->isrs()->begin(); iis != (*ip)->isrs()->end() ; ++iis) std::cout << (iis == (*ip)->isrs()->begin() ? 
",":"") << *iis; if ((*ip)->err() != RdKafka::ERR_NO_ERROR) std::cout << ", " << RdKafka::err2str((*ip)->err()) << std::endl; else std::cout << std::endl; } } } private: static void watcher(zhandle_t *zh, int type, int state, const char *path, void *watcherCtx) { char brokers[1024]; if (type == ZOO_CHILD_EVENT && strncmp(path, BROKER_PATH, sizeof(BROKER_PATH) - 1) == 0) { brokers[0] = ‘\0‘; set_brokerlist_from_zookeeper(zh, brokers); if (brokers[0] != ‘\0‘ && rk != NULL) { rd_kafka_brokers_add(rk, brokers); rd_kafka_poll(rk, 10); } } } static zhandle_t* initialize_zookeeper(string const& zookeeper) { zhandle_t * zh = zookeeper_init(zookeeper.c_str(), watcher, 10000, 0, 0, 0); if (zh == NULL) { fprintf(stderr, "Zookeeper connection not established."); exit(1); } return zh; } static void set_brokerlist_from_zookeeper(zhandle_t *zzh, char *brokers) { if (zzh) { struct String_vector brokerlist; if (zoo_get_children(zzh, BROKER_PATH, 1, &brokerlist) != ZOK) { fprintf(stderr, "No brokers found on path %s\n", BROKER_PATH); return; } int i; char *brokerptr = brokers; for (i = 0; i < brokerlist.count; i++) { char path[255], cfg[1024]; sprintf(path, "/brokers/ids/%s", brokerlist.data[i]); int len = sizeof(cfg); zoo_get(zzh, path, 0, cfg, &len, NULL); if (len > 0) { cfg[len] = ‘\0‘; json_error_t jerror; json_t *jobj = json_loads(cfg, 0, &jerror); if (jobj) { json_t *jhost = json_object_get(jobj, "host"); json_t *jport = json_object_get(jobj, "port"); if (jhost && jport) { const char *host = json_string_value(jhost); const int port = json_integer_value(jport); sprintf(brokerptr, "%s:%d", host, port); brokerptr += strlen(brokerptr); if (i < brokerlist.count - 1) { *brokerptr++ = ‘,‘; } } json_decref(jobj); } } } deallocate_String_vector(&brokerlist); printf("Found brokers %s\n", brokers); } } }; #endif
3.2 main.cc完整代码
其中除了打印元数据，还包含了从 topic 读取（消费）数据的代码。
#include <iostream> #include <string> #include <cstdlib> #include <cstdio> #include <csignal> #include <cstring> #include <getopt.h> #include <list> #include "helper/kafka_helper.h" using std::string; using std::list; using std::cout; using std::endl; static bool run = true; static bool exit_eof = true; class MyEventCb : public RdKafka::EventCb { public: void event_cb (RdKafka::Event &event) { switch (event.type()) { case RdKafka::Event::EVENT_ERROR: std::cerr << "ERROR (" << RdKafka::err2str(event.err()) << "): " << event.str() << std::endl; if (event.err() == RdKafka::ERR__ALL_BROKERS_DOWN) run = false; break; case RdKafka::Event::EVENT_STATS: std::cerr << "\"STATS\": " << event.str() << std::endl; break; case RdKafka::Event::EVENT_LOG: fprintf(stderr, "LOG-%i-%s: %s\n", event.severity(), event.fac().c_str(), event.str().c_str()); break; default: std::cerr << "EVENT " << event.type() << " (" << RdKafka::err2str(event.err()) << "): " << event.str() << std::endl; break; } } }; void msg_consume(RdKafka::Message* message, void* opaque) { switch (message->err()) { case RdKafka::ERR__TIMED_OUT: break; case RdKafka::ERR_NO_ERROR: /* Real message */ std::cout << "Read msg at offset " << message->offset() << std::endl; if (message->key()) { std::cout << "Key: " << *message->key() << std::endl; } cout << static_cast<const char *>(message->payload()) << endl; break; case RdKafka::ERR__PARTITION_EOF: cout << "reach last message" << endl; /* Last message */ if (exit_eof) { run = false; } break; case RdKafka::ERR__UNKNOWN_TOPIC: case RdKafka::ERR__UNKNOWN_PARTITION: std::cerr << "Consume failed: " << message->errstr() << std::endl; run = false; break; default: /* Errors */ std::cerr << "Consume failed: " << message->errstr() << std::endl; run = false; } } class MyConsumeCb : public RdKafka::ConsumeCb { public: void consume_cb (RdKafka::Message &msg, void *opaque) { msg_consume(&msg, opaque); } }; static void sigterm (int sig) { run = false; } int main (int argc, char **argv) { 
/* * Process kill signal, quit from the loop */ signal(SIGINT, sigterm); signal(SIGTERM, sigterm); /* * Get broker list from zookeeper */ string zookeeper("localhost:2181"); string brokers = KafkaHelper::Brokers(zookeeper); cout << "brokers from zookeeper is: " << brokers << endl; string topic_name = "test2"; /* * Print topic meta */ KafkaHelper::PrintTopicMeta(topic_name); /* * Global conf objects */ RdKafka::Conf * global_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL); string errstr; global_conf->set("metadata.broker.list", brokers, errstr); MyEventCb ex_event_cb; global_conf->set("event_cb", &ex_event_cb, errstr); /* * Topic conf objects */ RdKafka::Conf * topic_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC); /* * Create consumer using accumulated global configuration. */ RdKafka::Consumer *consumer = RdKafka::Consumer::create(global_conf, errstr); if (!consumer) { std::cerr << "Failed to create consumer: " << errstr << std::endl; exit(1); } std::cout << "% Created consumer " << consumer->name() << std::endl; /* * Start consumer for topic+partition at start offset */ int32_t partition = 0; int64_t start_offset = RdKafka::Topic::OFFSET_BEGINNING; RdKafka::Topic *topic2 = RdKafka::Topic::create(consumer, topic_name, topic_conf, errstr); RdKafka::ErrorCode resp = consumer->start(topic2, 0, start_offset); if (resp != RdKafka::ERR_NO_ERROR) { std::cerr << "Failed to start consumer: " << RdKafka::err2str(resp) << std::endl; exit(1); } /* * Consume messages */ MyConsumeCb ex_consume_cb; int use_ccb = 0; while (run) { if (use_ccb) { consumer->consume_callback(topic2, partition, 1000, &ex_consume_cb, &use_ccb); } else { RdKafka::Message *msg = consumer->consume(topic2, partition, 1000); msg_consume(msg, NULL); delete msg; } consumer->poll(0); } /* * Stop consumer */ consumer->stop(topic2, partition); consumer->poll(1000); delete topic2; delete consumer; /* * Wait for RdKafka to decommission. 
* This is not strictly needed (when check outq_len() above), but * allows RdKafka to clean up all its resources before the application * exits so that memory profilers such as valgrind wont complain about * memory leaks. */ RdKafka::wait_destroyed(5000); return 0; }