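Kafka is a distributed, partitioned, replicated commit log service. It provides the functionality of a messaging system, but with a unique design.
What does all that mean?
First, let's review some basic messaging terminology: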
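Kafka maintains feeds of messages in categories called topics.
Processes that publish messages to a Kafka topic are called producers.
Processes that subscribe to topics and process the feed of published messages are called consumers.
Kafka runs as a cluster of one or more servers, each of which is called a broker.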
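So, at a high level, producers send messages over the network to the Kafka cluster, which in turn serves them to consumers.
Communication between the clients and the servers uses a simple, high-performance, language-agnostic TCP protocol. A Java client is provided for Kafka, but clients are available in many languages.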
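Let's first look at the high-level abstraction Kafka provides: the topic.
A topic is a category or feed name to which messages are published. For each topic, the Kafka cluster maintains a partitioned log.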
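Each partition is an ordered, immutable sequence of messages that is continually appended to: a commit log. Each message in a partition is assigned a sequential ID number, called the offset, that uniquely identifies the message within that partition.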
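The append-only behaviour described above can be sketched in a few lines of Python. This is only a toy in-memory model, not how Kafka stores data on disk; the class name `PartitionLog` and its methods are invented for illustration.

```python
class PartitionLog:
    """Toy in-memory stand-in for one partition (hypothetical name)."""

    def __init__(self):
        self._messages = []

    def append(self, message):
        """Append a message at the end and return the offset assigned to it."""
        self._messages.append(message)
        return len(self._messages) - 1

    def read(self, offset):
        """Return the message stored at the given offset."""
        if not 0 <= offset < len(self._messages):
            raise IndexError("offset out of range")
        return self._messages[offset]

    def __len__(self):
        return len(self._messages)


log = PartitionLog()
offsets = [log.append(m) for m in ("a", "b", "c")]
print(offsets)      # [0, 1, 2]
print(log.read(1))  # b
```

Existing entries are never modified, so an offset always refers to the same message for as long as that message is retained.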
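The Kafka cluster retains all published messages, whether or not they have been consumed, for a configurable period of time. For example, if log retention is set to two days, a message is available for consumption for two days after it is published, after which it is discarded to free up space. Kafka's performance is effectively constant with respect to data size, so retaining large amounts of data is not a problem.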
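As a rough illustration of time-based retention, the sketch below drops messages once they are older than the retention window, regardless of whether anyone has read them. It is a simplification with invented names (`RetainedLog`, `expire`); a real broker removes whole log segments rather than individual messages.

```python
from collections import deque

TWO_DAYS_SECONDS = 2 * 24 * 60 * 60


class RetainedLog:
    """Toy log that forgets messages older than a retention window (hypothetical)."""

    def __init__(self, retention_seconds):
        self.retention_seconds = retention_seconds
        self._entries = deque()  # (publish_time_seconds, message), oldest first

    def publish(self, message, now):
        self._entries.append((now, message))

    def expire(self, now):
        """Discard messages older than the window, consumed or not."""
        while self._entries and now - self._entries[0][0] > self.retention_seconds:
            self._entries.popleft()

    def available(self):
        return [message for _, message in self._entries]


retained = RetainedLog(TWO_DAYS_SECONDS)
retained.publish("old", now=0)
retained.publish("new", now=100_000)
retained.expire(now=TWO_DAYS_SECONDS + 1)
print(retained.available())  # ['new']
```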
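In fact, the only metadata retained on a per-consumer basis is the consumer's position in the log, called the "offset". The offset is controlled by the consumer: normally a consumer advances its offset linearly as it reads messages, but because the position is under the consumer's control, it can consume messages in any order it likes. For example, a consumer can reset to an older offset to reprocess messages.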
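A minimal sketch of a consumer-controlled offset follows. The `Consumer` class, `poll` and `seek` are illustrative names for this toy model and are not the Kafka client API.

```python
class Consumer:
    """Toy consumer that tracks its own position in a shared log (hypothetical)."""

    def __init__(self, log):
        self.log = log    # any sequence of messages
        self.offset = 0   # the only per-consumer state

    def poll(self):
        """Return the next message and advance, or None at the end of the log."""
        if self.offset >= len(self.log):
            return None
        message = self.log[self.offset]
        self.offset += 1
        return message

    def seek(self, offset):
        """Move to any offset, for example an older one to reprocess messages."""
        self.offset = offset


messages = ["m0", "m1", "m2"]
consumer = Consumer(messages)
first_pass = [consumer.poll() for _ in range(3)]
consumer.seek(1)
replayed = [consumer.poll() for _ in range(2)]
print(first_pass)  # ['m0', 'm1', 'm2']
print(replayed)    # ['m1', 'm2']
```

Because the log itself is untouched by reading, a second `Consumer` over the same list would start at offset 0 and be unaffected by the first.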
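This combination of features means that Kafka consumers are very cheap: they can come and go without much impact on the cluster or on other consumers. For example, you can use the command-line tools to "tail" the contents of any topic without changing what is consumed by any existing consumer.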
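The partitions in the log serve several purposes. First, they allow the log to scale beyond a size that fits on a single server. Each individual partition must fit on the servers that host it, but a topic may have many partitions, so it can handle an arbitrary amount of data. Second, they act as the unit of parallelism; more on that below.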
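The partitions of the log are distributed over the servers in the Kafka cluster, with each server handling data and requests for a share of the partitions. Each partition is replicated across a configurable number of servers for fault tolerance. Each partition has one server that acts as the "leader" and zero or more servers that act as "followers". The leader handles all read and write requests for the partition, while the followers passively replicate the leader. If the leader fails, one of the followers automatically becomes the new leader. Each server acts as leader for some of its partitions and as follower for others, so load is well balanced within the cluster.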
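The leader and follower roles can be pictured with the toy model below. It simply promotes the first surviving replica when the leader fails; this is an assumption made for illustration, and the real election logic in Kafka is more involved. `PartitionReplicas` and `fail` are invented names.

```python
class PartitionReplicas:
    """Toy model of one partition's replicas (hypothetical, not Kafka's election logic)."""

    def __init__(self, broker_ids):
        self.replicas = list(broker_ids)
        self.leader = self.replicas[0]

    @property
    def followers(self):
        return [b for b in self.replicas if b != self.leader]

    def fail(self, broker_id):
        """Remove a failed broker; if it was the leader, promote a follower."""
        self.replicas.remove(broker_id)
        if broker_id == self.leader:
            self.leader = self.replicas[0] if self.replicas else None


partition = PartitionReplicas([0, 1, 2])
partition.fail(0)
print(partition.leader, partition.followers)  # 1 [2]
```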
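Producers publish data to the topics of their choice. The producer is responsible for choosing which partition within the topic each message is assigned to. This can be done in a round-robin fashion simply to balance load, or according to some semantic partition function (for example, based on a key in the message). More on the use of partitioning below.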
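The two partitioning strategies mentioned above might look like this in Python. Both functions are illustrative sketches; the CRC32 hash is an arbitrary choice made here, not the hash function used by any Kafka client.

```python
import itertools
import zlib


def round_robin_partitioner(num_partitions):
    """Return a chooser that cycles through partitions to balance load."""
    counter = itertools.count()

    def choose(_message=None):
        return next(counter) % num_partitions

    return choose


def key_partitioner(num_partitions):
    """Return a chooser that maps equal keys to the same partition."""

    def choose(key):
        # CRC32 is used only because it is stable across runs (an assumption).
        return zlib.crc32(key.encode("utf-8")) % num_partitions

    return choose


rr = round_robin_partitioner(3)
rr_choices = [rr() for _ in range(5)]
print(rr_choices)  # [0, 1, 2, 0, 1]

by_key = key_partitioner(4)
print(by_key("user-1") == by_key("user-1"))  # True
```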
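Messaging traditionally has two models: queuing and publish-subscribe. In a queue, a pool of consumers may read from a server and each message goes to one of them; in publish-subscribe, each message is broadcast to all consumers. Kafka offers a single consumer abstraction that generalizes both of these: the consumer group.
Consumers label themselves with a consumer group name, and each message published to a topic is delivered to one consumer instance within each subscribing consumer group. Consumer instances can be in separate processes or on separate machines.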
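If all the consumer instances have the same consumer group, this works just like a traditional queue balancing load over the consumers.
If all the consumer instances have different consumer groups, this works like publish-subscribe and all messages are broadcast to all consumers.
More commonly, however, topics have a small number of consumer groups, one for each "logical subscriber". Each group is composed of many consumer instances for scalability and fault tolerance. This is nothing more than publish-subscribe semantics in which the subscriber is a cluster of consumers instead of a single process.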
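The consumer group rule (each message goes to exactly one instance in every subscribing group) can be sketched as follows. The function `deliver` is invented for illustration and spreads messages round-robin inside a group, which is a simplification: in Kafka the spreading happens per partition rather than per message.

```python
from collections import defaultdict


def deliver(messages, consumers):
    """Deliver each message to one instance per group.

    `consumers` is a list of (group_name, instance_name) pairs.
    Returns a dict mapping instance_name to the messages it received.
    """
    groups = defaultdict(list)
    for group, instance in consumers:
        groups[group].append(instance)

    received = {instance: [] for _, instance in consumers}
    for i, message in enumerate(messages):
        for members in groups.values():
            received[members[i % len(members)]].append(message)
    return received


# Same group everywhere: behaves like a queue.
queue_result = deliver([1, 2, 3, 4], [("g", "a"), ("g", "b")])
print(queue_result)   # {'a': [1, 3], 'b': [2, 4]}

# A different group per instance: behaves like publish-subscribe.
pubsub_result = deliver([1, 2], [("g1", "a"), ("g2", "b")])
print(pubsub_result)  # {'a': [1, 2], 'b': [1, 2]}
```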
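Figure: A two-server Kafka cluster hosting four partitions (P0-P3) with two consumer groups. Consumer group A has two consumer instances and group B has four.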
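Kafka also has stronger ordering guarantees than a traditional messaging system.
A traditional queue retains messages in order on the server, and if multiple consumers consume from the queue, the server hands out messages in the order they are stored. However, although the server hands out messages in order, the messages are delivered asynchronously to consumers, so they may arrive out of order on different consumers. This effectively means the ordering of the messages is lost in the presence of parallel consumption. Messaging systems often work around this with a notion of "exclusive consumer" that allows only one process to consume from a queue, but of course this means there is no parallelism in processing.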
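Kafka does this better. By having a notion of parallelism within topics, the partition, Kafka is able to provide both ordering guarantees and load balancing over a pool of consumer processes. This is achieved by assigning the partitions in the topic to the consumers in the consumer group so that each partition is consumed by exactly one consumer in the group. This ensures that the consumer is the only reader of that partition and consumes the data in order. Since there are many partitions, this still balances the load over many consumer instances. Note, however, that there cannot be more consumer instances than partitions.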
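A sketch of assigning partitions to the consumers of one group is shown below. The round-robin strategy and the name `assign_partitions` are assumptions made for illustration; the actual assignment algorithm of the Kafka consumer is not reproduced here. The point is that each partition has exactly one owner, and that consumers beyond the partition count end up with nothing to read.

```python
def assign_partitions(partitions, consumers):
    """Give every partition to exactly one consumer in the group (round-robin)."""
    assignment = {consumer: [] for consumer in consumers}
    for i, partition in enumerate(partitions):
        assignment[consumers[i % len(consumers)]].append(partition)
    return assignment


partitions = ["P0", "P1", "P2", "P3"]

group_a = assign_partitions(partitions, ["C1", "C2"])
print(group_a)  # {'C1': ['P0', 'P2'], 'C2': ['P1', 'P3']}

# With five consumers and four partitions, the fifth consumer stays idle.
oversized = assign_partitions(partitions, ["C1", "C2", "C3", "C4", "C5"])
print(oversized["C5"])  # []
```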
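Kafka only provides a total order over messages within a partition, not between different partitions in a topic. Per-partition ordering, combined with the ability to partition data by key, is sufficient for most applications. However, if you require a total order over all messages in a topic, this can be achieved with a topic that has only one partition, though this means only one consumer process.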
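To see why per-partition ordering plus partitioning by key is usually enough, the sketch below routes events to partitions by key and then reads back the events of one key. The event data, the CRC32 hash and the helper names are all made up for illustration. Whatever partition a key lands in, its events keep their relative order, while no order is defined between events in different partitions.

```python
import zlib
from collections import defaultdict

NUM_PARTITIONS = 2


def partition_for(key):
    """Map a key to a partition; CRC32 is an arbitrary, stable choice."""
    return zlib.crc32(key.encode("utf-8")) % NUM_PARTITIONS


events = [
    ("alice", "login"),
    ("bob", "login"),
    ("alice", "buy"),
    ("bob", "logout"),
    ("alice", "logout"),
]

# Each partition is an append-only list, so arrival order is preserved within it.
topic = defaultdict(list)
for key, action in events:
    topic[partition_for(key)].append((key, action))


def actions_for(key):
    """Read one key's events back from the single partition that holds them."""
    return [action for k, action in topic[partition_for(key)] if k == key]


print(actions_for("alice"))  # ['login', 'buy', 'logout']
print(actions_for("bob"))    # ['login', 'logout']
```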
More details on these guarantees are given in the design section of the documentation.
[root@rs229 ~]# wget -c -P /root http://mirrors.cnnic.cn/apache/kafka/0.8.1.1/kafka_2.9.2-0.8.1.1.tgz
[root@rs229 kafka]# pwd
/usr/local/adsit/yting/apache/kafka
[root@rs229 kafka]# ll
total 14716
-rw-r--r-- 1 root root 15067175 Jun 26 21:50 kafka_2.9.2-0.8.1.1.tgz
[root@rs229 kafka]# tar -zxvf kafka_2.9.2-0.8.1.1.tgz
[root@rs229 kafka]# ll
total 14720
drwxr-xr-x 5 root root 4096 Apr 23 03:37 kafka_2.9.2-0.8.1.1
-rw-r--r-- 1 root root 15067175 Jun 26 21:50 kafka_2.9.2-0.8.1.1.tgz
[root@rs229 kafka]# cd kafka_2.9.2-0.8.1.1
[root@rs229 kafka_2.9.2-0.8.1.1]# ll
total 28
drwxr-xr-x 3 root root 4096 Apr 23 03:37 bin
drwxr-xr-x 2 root root 4096 Apr 23 03:37 config
drwxr-xr-x 2 root root 4096 Apr 23 03:37 libs
-rw-rw-r-- 1 root root 11358 Apr 23 02:37 LICENSE
-rw-rw-r-- 1 root root 162 Apr 23 02:37 NOTICE (a very lean layout at first glance!)
[root@rs229 bin]# pwd
/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/bin
[root@rs229 bin]# vi /etc/profile.d/yting.sh
# yousmile
# env configure start
JAVA_HOME=/usr/local/adsit/yting/jdk/jdk1.7.0_60
HADOOP_HOME=/usr/local/adsit/yting/apache/hadoop/hadoop-2.2.0
HBASE_HOME=/usr/local/adsit/yting/apache/hbase/hbase-0.96.2-hadoop2
ZOOKEEPER_HOME=/usr/local/adsit/yting/apache/zookeeper/zookeeper-3.4.6
HIVE_HOME=/usr/local/adsit/yting/apache/hive/apache-hive-0.13.1-bin
MAVEN_HOME=/usr/local/adsit/yting/apache/maven/apache-maven-3.2.1
MAHOUT_HOME=/usr/local/adsit/yting/apache/mahout/mahout-distribution-0.9
MAHOUT_LOCAL=MAHOUT_HOME
SCALA_HOME=/usr/local/adsit/yting/apache/scala/scala-2.10.4
STORM_HOME=/usr/local/adsit/yting/apache/storm/apache-storm-0.9.1-incubating
REDIS_HOME=/usr/local/adsit/yting/apache/redis/redis-2.8.9
FLUME_HOME=/usr/local/adsit/yting/apache/flume/apache-flume-1.5.0-bin/
KAFKA_HOME=/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1
PATH=.:$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$HBASE_HOME/bin:$ZOOKEEPER_HOME/bin:$HIVE_HOME/bin:$MAVEN_HOME/bin:$MAHOUT_HOME/bin:$SCALA_HOME/bin:$STORM_HOME/bin:$REDIS_HOME/bin:$FLUME_HOME/bin:$KAFKA_HOME/bin
export JAVA_HOME HADOOP_HOME ZOOKEEPER_HOME HIVE_HOME MAVEN_HOME MAHOUT_HOME MAHOUT_LOCAL SCALA_HOME STORM_HOME REDIS_HOME FLUME_HOME KAFKA_HOME PATH
export HADOOP_HOME_WARN_SUPPRESS=1
# env configure end
# alias configure start
alias cls='clear'
alias wai='who am i'
alias cdd='cd ..'
alias cdp='cd -'
alias sep='source /etc/profile'
alias yousister='reboot'
# alias configure end
The last English sentence of this simple example is a real trap. The original reads:
If you have each of the above commands running in a different terminal then you should now be able to type messages into the producer terminal and see them appear in the consumer terminal.
All of the command line tools have additional options; running the command with no arguments will display usage information documenting them in more detail.
Anyone unfamiliar with Linux who keeps running all of the commands above in a single terminal will hit all kinds of errors and feel like smashing the computer!
Here the servers are started in the background, because otherwise they stop as soon as the terminal exits. (I'm a bit lazy...)
[root@rs229 kafka_2.9.2-0.8.1.1]# bin/zookeeper-server-start.sh config/zookeeper.properties &
[2014-06-26 22:07:43,287] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2014-06-26 22:07:43,288] WARN Either no config or no quorum defined in config, running in standalone mode (org.apache.zookeeper.server.quorum.QuorumPeerMain)
[2014-06-26 22:07:43,316] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2014-06-26 22:07:43,317] INFO Starting server (org.apache.zookeeper.server.ZooKeeperServerMain)
[2014-06-26 22:07:43,326] INFO Server environment:zookeeper.version=3.3.3-1203054, built on 11/17/2011 05:47 GMT (org.apache.zookeeper.server.ZooKeeperServer)
[2014-06-26 22:07:43,326] INFO Server environment:host.name=rs229 (org.apache.zookeeper.server.ZooKeeperServer)
[2014-06-26 22:07:43,327] INFO Server environment:java.version=1.7.0_60 (org.apache.zookeeper.server.ZooKeeperServer)
[2014-06-26 22:07:43,327] INFO Server environment:java.vendor=Oracle Corporation (org.apache.zookeeper.server.ZooKeeperServer)
[2014-06-26 22:07:43,327] INFO Server environment:java.home=/usr/local/adsit/yting/jdk/jdk1.7.0_60/jre (org.apache.zookeeper.server.ZooKeeperServer)
[2014-06-26 22:07:43,327] INFO Server environment:java.class.path=:/…/kafka_2.8.0*.jar (org.apache.zookeeper.server.ZooKeeperServer)
[2014-06-26 22:07:43,327] INFO Server environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib (org.apache.zookeeper.server.ZooKeeperServer)
[2014-06-26 22:07:43,327] INFO Server environment:java.io.tmpdir=/tmp (org.apache.zookeeper.server.ZooKeeperServer)
[2014-06-26 22:07:43,327] INFO Server environment:java.compiler=<NA> (org.apache.zookeeper.server.ZooKeeperServer)
[2014-06-26 22:07:43,327] INFO Server environment:os.name=Linux (org.apache.zookeeper.server.ZooKeeperServer)
[2014-06-26 22:07:43,327] INFO Server environment:os.arch=amd64 (org.apache.zookeeper.server.ZooKeeperServer)
[2014-06-26 22:07:43,327] INFO Server environment:os.version=2.6.32-279.el6.x86_64 (org.apache.zookeeper.server.ZooKeeperServer)
[2014-06-26 22:07:43,328] INFO Server environment:user.name=root (org.apache.zookeeper.server.ZooKeeperServer)
[2014-06-26 22:07:43,328] INFO Server environment:user.home=/root (org.apache.zookeeper.server.ZooKeeperServer)
[2014-06-26 22:07:43,328] INFO Server environment:user.dir=/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1 (org.apache.zookeeper.server.ZooKeeperServer)
[2014-06-26 22:07:43,338] INFO tickTime set to 3000 (org.apache.zookeeper.server.ZooKeeperServer)
[2014-06-26 22:07:43,338] INFO minSessionTimeout set to -1 (org.apache.zookeeper.server.ZooKeeperServer)
[2014-06-26 22:07:43,338] INFO maxSessionTimeout set to -1 (org.apache.zookeeper.server.ZooKeeperServer)
[2014-06-26 22:07:43,358] INFO binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxn)
[2014-06-26 22:07:43,379] INFO Snapshotting: 0 (org.apache.zookeeper.server.persistence.FileTxnSnapLog)
---------------------------------------------------------------------------------------------------------------------------------
[root@rs229 kafka_2.9.2-0.8.1.1]# ps -ef | grep zookeeper
root 12602 11904 3 22:13 pts/0 00:00:00 /usr/local/adsit/yting/jdk/jdk1.7.0_60/bin/java -Xmx512M -Xms512M -server -XX:+UseCompressedOops -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:+CMSClassUnloadingEnabled -XX:+CMSScavengeBeforeRemark -XX:+DisableExplicitGC -Djava.awt.headless=true -Xloggc:/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/bin/../logs/zookeeper-gc.log -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dkafka.logs.dir=/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/bin/../logs -Dlog4j.configuration=file:bin/../config/log4j.properties -cp :/…/kafka_2.8.0*.jar org.apache.zookeeper.server.quorum.QuorumPeerMain config/zookeeper.properties
root 12649 11904 0 22:13 pts/0 00:00:00 grep zookeeper
[root@rs229 kafka_2.9.2-0.8.1.1]# bin/kafka-server-start.sh config/server.properties &
[2] 12678
[root@rs229 kafka_2.9.2-0.8.1.1]# [2014-06-26 22:17:10,854] INFO Verifying properties (kafka.utils.VerifiableProperties)
[2014-06-26 22:17:10,907] INFO Property broker.id is overridden to 0 (kafka.utils.VerifiableProperties)
[2014-06-26 22:17:10,907] INFO Property log.cleaner.enable is overridden to false (kafka.utils.VerifiableProperties)
[2014-06-26 22:17:10,907] INFO Property log.dirs is overridden to /tmp/kafka-logs (kafka.utils.VerifiableProperties)
[2014-06-26 22:17:10,907] INFO Property log.retention.check.interval.ms is overridden to 60000 (kafka.utils.VerifiableProperties)
[2014-06-26 22:17:10,908] INFO Property log.retention.hours is overridden to 168 (kafka.utils.VerifiableProperties)
[2014-06-26 22:17:10,908] INFO Property log.segment.bytes is overridden to 536870912 (kafka.utils.VerifiableProperties)
[2014-06-26 22:17:10,908] INFO Property num.io.threads is overridden to 8 (kafka.utils.VerifiableProperties)
[2014-06-26 22:17:10,908] INFO Property num.network.threads is overridden to 2 (kafka.utils.VerifiableProperties)
[2014-06-26 22:17:10,908] INFO Property num.partitions is overridden to 2 (kafka.utils.VerifiableProperties)
[2014-06-26 22:17:10,909] INFO Property port is overridden to 9092 (kafka.utils.VerifiableProperties)
[2014-06-26 22:17:10,909] INFO Property socket.receive.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
[2014-06-26 22:17:10,909] INFO Property socket.request.max.bytes is overridden to 104857600 (kafka.utils.VerifiableProperties)
[2014-06-26 22:17:10,909] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
[2014-06-26 22:17:10,909] INFO Property zookeeper.connect is overridden to localhost:2181 (kafka.utils.VerifiableProperties)
[2014-06-26 22:17:10,909] INFO Property zookeeper.connection.timeout.ms is overridden to 1000000 (kafka.utils.VerifiableProperties)
[2014-06-26 22:17:10,925] INFO [Kafka Server 0], starting (kafka.server.KafkaServer)
[2014-06-26 22:17:10,927] INFO [Kafka Server 0], Connecting to zookeeper on localhost:2181 (kafka.server.KafkaServer)
[2014-06-26 22:17:10,938] INFO Starting ZkClient event thread. (org.I0Itec.zkclient.ZkEventThread)
[2014-06-26 22:17:10,945] INFO Client environment:zookeeper.version=3.3.3-1203054, built on 11/17/2011 05:47 GMT (org.apache.zookeeper.ZooKeeper)
[2014-06-26 22:17:10,945] INFO Client environment:host.name=rs229 (org.apache.zookeeper.ZooKeeper)
[2014-06-26 22:17:10,945] INFO Client environment:java.version=1.7.0_60 (org.apache.zookeeper.ZooKeeper)
[2014-06-26 22:17:10,945] INFO Client environment:java.vendor=Oracle Corporation (org.apache.zookeeper.ZooKeeper)
[2014-06-26 22:17:10,945] INFO Client environment:java.home=/usr/local/adsit/yting/jdk/jdk1.7.0_60/jre (org.apache.zookeeper.ZooKeeper)
[2014-06-26 22:17:10,945] INFO Client environment:java.class.path=:/…/kafka_2.8.0*.jar (org.apache.zookeeper.ZooKeeper)
[2014-06-26 22:17:10,945] INFO Client environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib (org.apache.zookeeper.ZooKeeper)
[2014-06-26 22:17:10,946] INFO Client environment:java.io.tmpdir=/tmp (org.apache.zookeeper.ZooKeeper)
[2014-06-26 22:17:10,946] INFO Client environment:java.compiler=<NA> (org.apache.zookeeper.ZooKeeper)
[2014-06-26 22:17:10,946] INFO Client environment:os.name=Linux (org.apache.zookeeper.ZooKeeper)
[2014-06-26 22:17:10,946] INFO Client environment:os.arch=amd64 (org.apache.zookeeper.ZooKeeper)
[2014-06-26 22:17:10,946] INFO Client environment:os.version=2.6.32-279.el6.x86_64 (org.apache.zookeeper.ZooKeeper)
[2014-06-26 22:17:10,946] INFO Client environment:user.name=root (org.apache.zookeeper.ZooKeeper)
[2014-06-26 22:17:10,946] INFO Client environment:user.home=/root (org.apache.zookeeper.ZooKeeper)
[2014-06-26 22:17:10,946] INFO Client environment:user.dir=/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1 (org.apache.zookeeper.ZooKeeper)
[2014-06-26 22:17:10,947] INFO Initiating client connection, connectString=localhost:2181 sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@7c8b7ac9 (org.apache.zookeeper.ZooKeeper)
[2014-06-26 22:17:10,960] INFO Opening socket connection to server localhost/127.0.0.1:2181 (org.apache.zookeeper.ClientCnxn)
[2014-06-26 22:17:10,967] INFO Socket connection established to localhost/127.0.0.1:2181, initiating session (org.apache.zookeeper.ClientCnxn)
[2014-06-26 22:17:10,969] INFO Accepted socket connection from /127.0.0.1:43490 (org.apache.zookeeper.server.NIOServerCnxn)
[2014-06-26 22:17:10,976] INFO Client attempting to establish new session at /127.0.0.1:43490 (org.apache.zookeeper.server.NIOServerCnxn)
[2014-06-26 22:17:10,980] INFO Creating new log file: log.1 (org.apache.zookeeper.server.persistence.FileTxnLog)
[2014-06-26 22:17:10,994] INFO Established session 0x146d885fce70000 with negotiated timeout 6000 for client /127.0.0.1:43490 (org.apache.zookeeper.server.NIOServerCnxn)
[2014-06-26 22:17:10,997] INFO Session establishment complete on server localhost/127.0.0.1:2181, sessionid = 0x146d885fce70000, negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn)
[2014-06-26 22:17:11,001] INFO zookeeper state changed (SyncConnected) (org.I0Itec.zkclient.ZkClient)
[2014-06-26 22:17:11,042] INFO Got user-level KeeperException when processing sessionid:0x146d885fce70000 type:create cxid:0x4 zxid:0xfffffffffffffffe txntype:unknown reqpath:n/a Error Path:/brokers Error:KeeperErrorCode = NoNode for /brokers (org.apache.zookeeper.server.PrepRequestProcessor)
[2014-06-26 22:17:11,057] INFO Got user-level KeeperException when processing sessionid:0x146d885fce70000 type:create cxid:0xa zxid:0xfffffffffffffffe txntype:unknown reqpath:n/a Error Path:/config Error:KeeperErrorCode = NoNode for /config (org.apache.zookeeper.server.PrepRequestProcessor)
[2014-06-26 22:17:11,064] INFO Got user-level KeeperException when processing sessionid:0x146d885fce70000 type:create cxid:0x10 zxid:0xfffffffffffffffe txntype:unknown reqpath:n/a Error Path:/admin Error:KeeperErrorCode = NoNode for /admin (org.apache.zookeeper.server.PrepRequestProcessor)
[2014-06-26 22:17:11,119] INFO Log directory '/tmp/kafka-logs' not found, creating it. (kafka.log.LogManager)
[2014-06-26 22:17:11,133] INFO Starting log cleanup with a period of 60000 ms. (kafka.log.LogManager)
[2014-06-26 22:17:11,137] INFO Starting log flusher with a default period of 9223372036854775807 ms. (kafka.log.LogManager)
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
[2014-06-26 22:17:11,169] INFO Awaiting socket connections on 0.0.0.0:9092. (kafka.network.Acceptor)
[2014-06-26 22:17:11,171] INFO [Socket Server on Broker 0], Started (kafka.network.SocketServer)
[2014-06-26 22:17:11,240] INFO Will not load MX4J, mx4j-tools.jar is not in the classpath (kafka.utils.Mx4jLoader$)
[2014-06-26 22:17:11,275] INFO 0 successfully elected as leader (kafka.server.ZookeeperLeaderElector)
[2014-06-26 22:17:11,281] INFO Got user-level KeeperException when processing sessionid:0x146d885fce70000 type:setData cxid:0x19 zxid:0xfffffffffffffffe txntype:unknown reqpath:n/a Error Path:/controller_epoch Error:KeeperErrorCode = NoNode for /controller_epoch (org.apache.zookeeper.server.PrepRequestProcessor)
[2014-06-26 22:17:11,347] INFO Got user-level KeeperException when processing sessionid:0x146d885fce70000 type:delete cxid:0x27 zxid:0xfffffffffffffffe txntype:unknown reqpath:n/a Error Path:/admin/preferred_replica_election Error:KeeperErrorCode = NoNode for /admin/preferred_replica_election (org.apache.zookeeper.server.PrepRequestProcessor)
[2014-06-26 22:17:11,369] INFO Registered broker 0 at path /brokers/ids/0 with address rs229:9092. (kafka.utils.ZkUtils$)
[2014-06-26 22:17:11,380] INFO [Kafka Server 0], started (kafka.server.KafkaServer)
[2014-06-26 22:17:11,470] INFO New leader is 0 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
---------------------------------------------------------------------------------------------------------------------------------
[root@rs229 kafka_2.9.2-0.8.1.1]# ps -ef | grep kafka
root 12602 11904 0 22:13 pts/0 00:00:01 /usr/local/adsit/yting/jdk/jdk1.7.0_60/bin/java -Xmx512M -Xms512M -server -XX:+UseCompressedOops -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:+CMSClassUnloadingEnabled -XX:+CMSScavengeBeforeRemark -XX:+DisableExplicitGC -Djava.awt.headless=true -Xloggc:/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/bin/../logs/zookeeper-gc.log -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dkafka.logs.dir=/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/bin/../logs -Dlog4j.configuration=file:bin/../config/log4j.properties -cp :/…/kafka_2.8.0*.jar org.apache.zookeeper.server.quorum.QuorumPeerMain config/zookeeper.properties
root 12678 11904 0 22:17 pts/0 00:00:03 /usr/local/adsit/yting/jdk/jdk1.7.0_60/bin/java -Xmx1G -Xms1G -server -XX:+UseCompressedOops -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:+CMSClassUnloadingEnabled -XX:+CMSScavengeBeforeRemark -XX:+DisableExplicitGC -Djava.awt.headless=true -Xloggc:/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/bin/../logs/kafkaServer-gc.log -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dkafka.logs.dir=/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/bin/../logs -Dlog4j.configuration=file:bin/../config/log4j.properties -cp :/…/kafka_2.8.0*.jar kafka.Kafka config/server.properties
root 12811 11904 0 22:27 pts/0 00:00:00 grep kafka
[root@rs229 kafka_2.9.2-0.8.1.1]#
[root@rs229 kafka_2.9.2-0.8.1.1]# bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test &
[3] 12867
[root@rs229 kafka_2.9.2-0.8.1.1]# [2014-06-26 22:35:26,593] INFO Accepted socket connection from /127.0.0.1:43553 (org.apache.zookeeper.server.NIOServerCnxn)
[2014-06-26 22:35:26,598] INFO Client attempting to establish new session at /127.0.0.1:43553 (org.apache.zookeeper.server.NIOServerCnxn)
[2014-06-26 22:35:26,600] INFO Established session 0x146d885fce70001 with negotiated timeout 30000 for client /127.0.0.1:43553 (org.apache.zookeeper.server.NIOServerCnxn)
[2014-06-26 22:35:26,741] INFO Got user-level KeeperException when processing sessionid:0x146d885fce70001 type:setData cxid:0x3 zxid:0xfffffffffffffffe txntype:unknown reqpath:n/a Error Path:/config/topics/test Error:KeeperErrorCode = NoNode for /config/topics/test (org.apache.zookeeper.server.PrepRequestProcessor)
[2014-06-26 22:35:26,753] INFO Got user-level KeeperException when processing sessionid:0x146d885fce70001 type:create cxid:0x4 zxid:0xfffffffffffffffe txntype:unknown reqpath:n/a Error Path:/config/topics Error:KeeperErrorCode = NodeExists for /config/topics (org.apache.zookeeper.server.PrepRequestProcessor)
Created topic "test".
[2014-06-26 22:35:26,768] INFO Processed session termination for sessionid: 0x146d885fce70001 (org.apache.zookeeper.server.PrepRequestProcessor)
[2014-06-26 22:35:26,770] INFO Closed socket connection for client /127.0.0.1:43553 which had sessionid 0x146d885fce70001 (org.apache.zookeeper.server.NIOServerCnxn)
[2014-06-26 22:35:26,816] INFO Got user-level KeeperException when processing sessionid:0x146d885fce70000 type:create cxid:0x3b zxid:0xfffffffffffffffe txntype:unknown reqpath:n/a Error Path:/brokers/topics/test/partitions/0 Error:KeeperErrorCode = NoNode for /brokers/topics/test/partitions/0 (org.apache.zookeeper.server.PrepRequestProcessor)
[2014-06-26 22:35:26,818] INFO Got user-level KeeperException when processing sessionid:0x146d885fce70000 type:create cxid:0x3c zxid:0xfffffffffffffffe txntype:unknown reqpath:n/a Error Path:/brokers/topics/test/partitions Error:KeeperErrorCode = NoNode for /brokers/topics/test/partitions (org.apache.zookeeper.server.PrepRequestProcessor)
[2014-06-26 22:35:26,904] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions [test,0] (kafka.server.ReplicaFetcherManager)
[2014-06-26 22:35:26,956] INFO Completed load of log test-0 with log end offset 0 (kafka.log.Log)
[2014-06-26 22:35:26,968] INFO Created log for partition [test,0] in /tmp/kafka-logs with properties {segment.index.bytes -> 10485760, file.delete.delay.ms -> 60000, segment.bytes -> 536870912, flush.ms -> 9223372036854775807, delete.retention.ms -> 86400000, index.interval.bytes -> 4096, retention.bytes -> -1, cleanup.policy -> delete, segment.ms -> 604800000, max.message.bytes -> 1000012, flush.messages -> 9223372036854775807, min.cleanable.dirty.ratio -> 0.5, retention.ms -> 604800000}. (kafka.log.LogManager)
[2014-06-26 22:35:26,969] WARN Partition [test,0] on broker 0: No checkpointed highwatermark is found for partition [test,0] (kafka.cluster.Partition)
[root@rs229 kafka_2.9.2-0.8.1.1]# bin/kafka-topics.sh --list --zookeeper localhost:2181
test
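The raw numbers in the broker startup and topic creation logs are easier to read after a unit conversion. The short check below uses only values that appear in those logs (log.retention.hours 168, retention.ms 604800000, segment.bytes 536870912, delete.retention.ms 86400000); the helper names are made up for illustration.

```python
MS_PER_HOUR = 60 * 60 * 1000
BYTES_PER_MIB = 1024 * 1024


def hours_to_ms(hours):
    return hours * MS_PER_HOUR


def bytes_to_mib(num_bytes):
    return num_bytes / BYTES_PER_MIB


retention_ms = hours_to_ms(168)               # log.retention.hours = 168 (7 days)
segment_mib = bytes_to_mib(536870912)         # log.segment.bytes
delete_retention_hours = 86400000 / MS_PER_HOUR

print(retention_ms)            # 604800000
print(segment_mib)             # 512.0
print(delete_retention_hours)  # 24.0
```

So the topic keeps data for seven days, rolls log segments at 512 MiB, and uses a one-day delete retention.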
Type a message (This is a message: The you smile until forever), then press Ctrl+C to exit the producer.
[root@rs229 kafka_2.9.2-0.8.1.1]# bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message: The you smile until forever
[2014-06-26 22:43:29,572] INFO Closing socket connection to /127.0.0.1. (kafka.network.Processor)
^C[2014-06-26 22:43:30,528] INFO Closing socket connection to /116.255.224.229. (kafka.network.Processor)
After the command is entered, some log output is printed, and the message typed earlier is shown at the end: This is a message: The you smile until forever
[root@rs229 kafka_2.9.2-0.8.1.1]# bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
This is a message: The you smile until forever
[2014-06-26 22:43:29,572] INFO Closing socket connection to /127.0.0.1. (kafka.network.Processor)
^C[2014-06-26 22:43:30,528] INFO Closing socket connection to /116.255.224.229. (kafka.network.Processor)
[root@rs229 kafka_2.9.2-0.8.1.1]# bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
[2014-06-26 22:45:14,066] INFO Accepted socket connection from /127.0.0.1:43617 (org.apache.zookeeper.server.NIOServerCnxn)
[2014-06-26 22:45:14,070] INFO Client attempting to establish new session at /127.0.0.1:43617 (org.apache.zookeeper.server.NIOServerCnxn)
[2014-06-26 22:45:14,072] INFO Established session 0x146d885fce70010 with negotiated timeout 6000 for client /127.0.0.1:43617 (org.apache.zookeeper.server.NIOServerCnxn)
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
[2014-06-26 22:45:14,110] INFO Accepted socket connection from /127.0.0.1:43618 (org.apache.zookeeper.server.NIOServerCnxn)
[2014-06-26 22:45:14,110] INFO Client attempting to establish new session at /127.0.0.1:43618 (org.apache.zookeeper.server.NIOServerCnxn)
[2014-06-26 22:45:14,111] INFO Established session 0x146d885fce70011 with negotiated timeout 30000 for client /127.0.0.1:43618 (org.apache.zookeeper.server.NIOServerCnxn)
[2014-06-26 22:45:14,125] INFO Processed session termination for sessionid: 0x146d885fce70011 (org.apache.zookeeper.server.PrepRequestProcessor)
[2014-06-26 22:45:14,126] INFO Closed socket connection for client /127.0.0.1:43618 which had sessionid 0x146d885fce70011 (org.apache.zookeeper.server.NIOServerCnxn)
[2014-06-26 22:45:14,225] INFO Got user-level KeeperException when processing sessionid:0x146d885fce70010 type:create cxid:0x2 zxid:0xfffffffffffffffe txntype:unknown reqpath:n/a Error Path:/consumers/console-consumer-55455/ids Error:KeeperErrorCode = NoNode for /consumers/console-consumer-55455/ids (org.apache.zookeeper.server.PrepRequestProcessor)
[2014-06-26 22:45:14,227] INFO Got user-level KeeperException when processing sessionid:0x146d885fce70010 type:create cxid:0x3 zxid:0xfffffffffffffffe txntype:unknown reqpath:n/a Error Path:/consumers/console-consumer-55455 Error:KeeperErrorCode = NoNode for /consumers/console-consumer-55455 (org.apache.zookeeper.server.PrepRequestProcessor)
[2014-06-26 22:45:14,485] INFO Got user-level KeeperException when processing sessionid:0x146d885fce70010 type:create cxid:0x16 zxid:0xfffffffffffffffe txntype:unknown reqpath:n/a Error Path:/consumers/console-consumer-55455/owners/test Error:KeeperErrorCode = NoNode for /consumers/console-consumer-55455/owners/test (org.apache.zookeeper.server.PrepRequestProcessor)
[2014-06-26 22:45:14,487] INFO Got user-level KeeperException when processing sessionid:0x146d885fce70010 type:create cxid:0x17 zxid:0xfffffffffffffffe txntype:unknown reqpath:n/a Error Path:/consumers/console-consumer-55455/owners Error:KeeperErrorCode = NoNode for /consumers/console-consumer-55455/owners (org.apache.zookeeper.server.PrepRequestProcessor)
[2014-06-26 22:45:14,623] INFO Closing socket connection to /116.255.224.229. (kafka.network.Processor)
This is a message: The you smile until forever
This is a single-machine example, not a cluster.
[root@rs229 config]# pwd
/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/config
[root@rs229 config]# cat zookeeper.properties
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# the directory where the snapshot is stored.
dataDir=/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/zookeeper
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0
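For checking such a file from a script, a minimal reader for simple `key=value` lines could look like the sketch below. It is not a full implementation of the Java properties format (no `:` separators, `!` comments or line continuations), and `parse_properties` is an invented helper; the sample text repeats the three settings shown above.

```python
def parse_properties(text):
    """Parse simple key=value lines, skipping blank lines and # comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


sample = """\
# the directory where the snapshot is stored.
dataDir=/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/zookeeper
# the port at which the clients will connect
clientPort=2181
maxClientCnxns=0
"""

config = parse_properties(sample)
print(config["clientPort"])           # 2181
print(int(config["maxClientCnxns"]))  # 0
```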
[root@rs229 kafka_2.9.2-0.8.1.1]# pwd
/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1
[root@rs229 kafka_2.9.2-0.8.1.1]# bin/zookeeper-server-start.sh config/zookeeper.properties
[2014-06-29 21:43:05,531] INFO Reading configurationfrom: config/zookeeper.properties(org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2014-06-29 21:43:05,532] WARN Either no config or noquorum defined in config, running instandalone mode (org.apache.zookeeper.server.quorum.QuorumPeerMain)
[2014-06-29 21:43:05,561] INFO Reading configurationfrom: config/zookeeper.properties(org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2014-06-29 21:43:05,561] INFO Starting server(org.apache.zookeeper.server.ZooKeeperServerMain)
[2014-06-29 21:43:05,570] INFO Serverenvironment:zookeeper.version=3.3.3-1203054, built on 11/17/2011 05:47 GMT(org.apache.zookeeper.server.ZooKeeperServer)
[2014-06-29 21:43:05,571] INFO Serverenvironment:host.name=rs229 (org.apache.zookeeper.server.ZooKeeperServer)
[2014-06-29 21:43:05,571] INFO Serverenvironment:java.version=1.7.0_60 (org.apache.zookeeper.server.ZooKeeperServer)
[2014-06-29 21:43:05,571] INFO Serverenvironment:java.vendor=Oracle Corporation (org.apache.zookeeper.server.ZooKeeperServer)
[2014-06-29 21:43:05,571] INFO Serverenvironment:java.home=/usr/local/adsit/yting/jdk/jdk1.7.0_60/jre(org.apache.zookeeper.server.ZooKeeperServer)
[2014-06-29 21:43:05,571] INFO Serverenvironment:java.class.path=:/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/bin/../core/build/dependant-libs-2.8.0/*.jar:/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/bin/../perf/build/libs//kafka-perf_2.8.0*.jar:/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/bin/../clients/build/libs//kafka-clients*.jar:/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/bin/../examples/build/libs//kafka-examples*.jar:/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/bin/../contrib/hadoop-consumer/build/libs//kafka-hadoop-consumer*.jar:/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/bin/../contrib/hadoop-producer/build/libs//kafka-hadoop-producer*.jar:/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/bin/../libs/jopt-simple-3.2.jar:/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/bin/../libs/kafka_2.9.2-0.8.1.1.jar:/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/bin/../libs/kafka_2.9.2-0.8.1.1-javadoc.jar:/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/bin/../libs/kafka_2.9.2-0.8.1.1-scaladoc.jar:/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/bin/../libs/kafka_2.9.2-0.8.1.1-sources.jar:/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/bin/../libs/log4j-1.2.15.jar:/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/bin/../libs/metrics-core-2.2.0.jar:/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/bin/../libs/scala-library-2.9.2.jar:/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/bin/../libs/slf4j-api-1.7.2.jar:/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/bin/../libs/snappy-java-1.0.5.jar:/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/bin/../libs/zkclient-0.3.jar:/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/bin/../libs/zookeeper-3.3.4.jar:/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/bin/../core/build/libs/kafka_2.8.0*.jar(org.apache.zookeeper.server.ZooKeeperServer)
[2014-06-29 21:43:05,571] INFO Server environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib (org.apache.zookeeper.server.ZooKeeperServer)
[2014-06-29 21:43:05,571] INFO Server environment:java.io.tmpdir=/tmp (org.apache.zookeeper.server.ZooKeeperServer)
[2014-06-29 21:43:05,571] INFO Server environment:java.compiler=<NA> (org.apache.zookeeper.server.ZooKeeperServer)
[2014-06-29 21:43:05,571] INFO Server environment:os.name=Linux (org.apache.zookeeper.server.ZooKeeperServer)
[2014-06-29 21:43:05,571] INFO Server environment:os.arch=amd64 (org.apache.zookeeper.server.ZooKeeperServer)
[2014-06-29 21:43:05,571] INFO Server environment:os.version=2.6.32-279.el6.x86_64 (org.apache.zookeeper.server.ZooKeeperServer)
[2014-06-29 21:43:05,572] INFO Server environment:user.name=root (org.apache.zookeeper.server.ZooKeeperServer)
[2014-06-29 21:43:05,572] INFO Server environment:user.home=/root (org.apache.zookeeper.server.ZooKeeperServer)
[2014-06-29 21:43:05,572] INFO Server environment:user.dir=/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1 (org.apache.zookeeper.server.ZooKeeperServer)
[2014-06-29 21:43:05,582] INFO tickTime set to 3000 (org.apache.zookeeper.server.ZooKeeperServer)
[2014-06-29 21:43:05,582] INFO minSessionTimeout set to -1 (org.apache.zookeeper.server.ZooKeeperServer)
[2014-06-29 21:43:05,582] INFO maxSessionTimeout set to -1 (org.apache.zookeeper.server.ZooKeeperServer)
[2014-06-29 21:43:05,602] INFO binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxn)
[2014-06-29 21:43:05,621] INFO Reading snapshot /usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/zookeeper/version-2/snapshot.0 (org.apache.zookeeper.server.persistence.FileSnap)
[2014-06-29 21:43:05,650] INFO Snapshotting: 40 (org.apache.zookeeper.server.persistence.FileTxnSnapLog)
[2014-06-29 21:43:12,000] INFO Expiring session 0x146e71987810001, timeout of 6000ms exceeded (org.apache.zookeeper.server.ZooKeeperServer)
[2014-06-29 21:43:12,002] INFO Processed session termination for sessionid: 0x146e71987810001 (org.apache.zookeeper.server.PrepRequestProcessor)
[2014-06-29 21:43:12,003] INFO Creating new log file: log.41 (org.apache.zookeeper.server.persistence.FileTxnLog)
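The log above reports tickTime set to 3000 while minSessionTimeout and maxSessionTimeout are left at -1. In ZooKeeper, -1 means the defaults of 2 x tickTime and 20 x tickTime apply, and a client's requested session timeout is clamped into that range. The sketch below only illustrates that arithmetic; the class and method names are hypothetical and this is not ZooKeeper's own code.

```java
/** Sketch of how a ZooKeeper server bounds a client's requested session timeout. */
final class ZkSessionTimeoutSketch {

    /**
     * Clamps the requested timeout into [2 * tickTime, 20 * tickTime],
     * the bounds used when minSessionTimeout/maxSessionTimeout are -1.
     */
    static int negotiate(int requestedMs, int tickTimeMs) {
        int min = 2 * tickTimeMs;
        int max = 20 * tickTimeMs;
        return Math.max(min, Math.min(max, requestedMs));
    }

    public static void main(String[] args) {
        int tickTime = 3000; // from the log line "tickTime set to 3000"
        System.out.println(negotiate(6000, tickTime));    // within bounds, kept as requested
        System.out.println(negotiate(1000, tickTime));    // below the minimum, raised
        System.out.println(negotiate(1000000, tickTime)); // above the maximum, lowered
    }
}
```

With tickTime at 3000 the lower bound is 6000 ms, which is consistent with the "timeout of 6000ms exceeded" line in the log.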
[root@rs229 config]# cat server.properties
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# see kafka.server.KafkaConfig for additional details and defaults
############################# Server Basics #############################
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0
############################# Socket Server Settings #############################
# The port the socket server listens on
port=9092
# Hostname the broker will bind to. If not set, the server will bind to all interfaces
#host.name=localhost
# Hostname the broker will advertise to producers and consumers. If not set, it uses the
# value for "host.name" if configured. Otherwise, it will use the value returned from
# java.net.InetAddress.getCanonicalHostName().
#advertised.host.name=<hostname routable by clients>
# The port to publish to ZooKeeper for clients to use. If this is not set,
# it will publish the same port that the broker binds to.
#advertised.port=<port accessible by clients>
# The number of threads handling network requests
num.network.threads=2
# The number of threads doing disk I/O
num.io.threads=8
# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=1048576
# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=1048576
# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600
############################# Log Basics #############################
# A comma separated list of directories under which to store log files
#log.dirs=/tmp/kafka-logs
log.dirs=/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/kafka-logs
# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=2
############################# Log Flush Policy #############################
# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
# 1. Durability: Unflushed data may be lost if you are not using replication.
# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.
# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000
# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000
############################# Log Retention Policy #############################
# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.
# The minimum age of a log file to be eligible for deletion
log.retention.hours=168
# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
# segments don't drop below log.retention.bytes.
#log.retention.bytes=1073741824
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=536870912
# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=60000
# By default the log cleaner is disabled and the log retention policy will default to just delete segments after their retention expires.
# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction.
log.cleaner.enable=false
############################# Zookeeper #############################
# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=localhost:2181
# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=1000000
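Several values in server.properties are raw byte or hour counts that are hard to read at a glance. As a rough aid, the sketch below loads a few of the settings shown above with java.util.Properties and converts them to friendlier units. The class name and the inlined sample text are illustrative assumptions; a real check would read config/server.properties from disk.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

/** Converts a few broker settings into human-readable units. */
final class ServerPropertiesUnits {

    // A few settings copied from the server.properties listing.
    static final String SAMPLE =
            "log.retention.hours=168\n"
            + "log.segment.bytes=536870912\n"
            + "socket.request.max.bytes=104857600\n"
            + "socket.send.buffer.bytes=1048576\n";

    /** Parses properties-format text into a Properties object. */
    static Properties load(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
        return props;
    }

    /** Interprets the value as a byte count and returns whole mebibytes. */
    static long mebibytes(Properties props, String key) {
        return Long.parseLong(props.getProperty(key)) / (1024L * 1024L);
    }

    /** Interprets the value as an hour count and returns whole days. */
    static long days(Properties props, String key) {
        return Long.parseLong(props.getProperty(key)) / 24L;
    }

    public static void main(String[] args) {
        Properties props = load(SAMPLE);
        System.out.println("retention (days): " + days(props, "log.retention.hours"));
        System.out.println("segment (MiB): " + mebibytes(props, "log.segment.bytes"));
        System.out.println("max request (MiB): " + mebibytes(props, "socket.request.max.bytes"));
        System.out.println("send buffer (MiB): " + mebibytes(props, "socket.send.buffer.bytes"));
    }
}
```

In other words, this configuration keeps log segments for 7 days, rolls a new segment at 512 MiB, and accepts requests of up to 100 MiB.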
[root@rs229 kafka_2.9.2-0.8.1.1]# bin/kafka-server-start.sh config/server.properties
[2014-06-29 21:44:30,062] INFO Verifying properties (kafka.utils.VerifiableProperties)
[2014-06-29 21:44:30,125] INFO Property broker.id is overridden to 0 (kafka.utils.VerifiableProperties)
[2014-06-29 21:44:30,126] INFO Property log.cleaner.enable is overridden to false (kafka.utils.VerifiableProperties)
[2014-06-29 21:44:30,126] INFO Property log.dirs is overridden to /usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/logs (kafka.utils.VerifiableProperties)
[2014-06-29 21:44:30,126] INFO Property log.retention.check.interval.ms is overridden to 60000 (kafka.utils.VerifiableProperties)
[2014-06-29 21:44:30,127] INFO Property log.retention.hours is overridden to 168 (kafka.utils.VerifiableProperties)
[2014-06-29 21:44:30,127] INFO Property log.segment.bytes is overridden to 536870912 (kafka.utils.VerifiableProperties)
[2014-06-29 21:44:30,127] INFO Property num.io.threads is overridden to 8 (kafka.utils.VerifiableProperties)
[2014-06-29 21:44:30,127] INFO Property num.network.threads is overridden to 2 (kafka.utils.VerifiableProperties)
[2014-06-29 21:44:30,128] INFO Property num.partitions is overridden to 2 (kafka.utils.VerifiableProperties)
[2014-06-29 21:44:30,128] INFO Property port is overridden to 9092 (kafka.utils.VerifiableProperties)
[2014-06-29 21:44:30,128] INFO Property socket.receive.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
[2014-06-29 21:44:30,129] INFO Property socket.request.max.bytes is overridden to 104857600 (kafka.utils.VerifiableProperties)
[2014-06-29 21:44:30,129] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
[2014-06-29 21:44:30,129] INFO Property zookeeper.connect is overridden to localhost:2181 (kafka.utils.VerifiableProperties)
[2014-06-29 21:44:30,130] INFO Property zookeeper.connection.timeout.ms is overridden to 1000000 (kafka.utils.VerifiableProperties)
[2014-06-29 21:44:30,156] INFO [Kafka Server 0], starting (kafka.server.KafkaServer)
[2014-06-29 21:44:30,158] INFO [Kafka Server 0], Connecting to zookeeper on localhost:2181 (kafka.server.KafkaServer)
[2014-06-29 21:44:30,170] INFO Starting ZkClient event thread. (org.I0Itec.zkclient.ZkEventThread)
[2014-06-29 21:44:30,177] INFO Client environment:zookeeper.version=3.3.3-1203054, built on 11/17/2011 05:47 GMT (org.apache.zookeeper.ZooKeeper)
[2014-06-29 21:44:30,177] INFO Client environment:host.name=rs229 (org.apache.zookeeper.ZooKeeper)
[2014-06-29 21:44:30,177] INFO Client environment:java.version=1.7.0_60 (org.apache.zookeeper.ZooKeeper)
[2014-06-29 21:44:30,177] INFO Client environment:java.vendor=Oracle Corporation (org.apache.zookeeper.ZooKeeper)
[2014-06-29 21:44:30,177] INFO Client environment:java.home=/usr/local/adsit/yting/jdk/jdk1.7.0_60/jre (org.apache.zookeeper.ZooKeeper)
[2014-06-29 21:44:30,177] INFO Client environment:java.class.path=:/…/kafka_2.8.0*.jar (org.apache.zookeeper.ZooKeeper)
[2014-06-29 21:44:30,177] INFO Client environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib (org.apache.zookeeper.ZooKeeper)
[2014-06-29 21:44:30,177] INFO Client environment:java.io.tmpdir=/tmp (org.apache.zookeeper.ZooKeeper)
[2014-06-29 21:44:30,177] INFO Client environment:java.compiler=<NA> (org.apache.zookeeper.ZooKeeper)
[2014-06-29 21:44:30,177] INFO Client environment:os.name=Linux (org.apache.zookeeper.ZooKeeper)
[2014-06-29 21:44:30,178] INFO Client environment:os.arch=amd64 (org.apache.zookeeper.ZooKeeper)
[2014-06-29 21:44:30,178] INFO Client environment:os.version=2.6.32-279.el6.x86_64 (org.apache.zookeeper.ZooKeeper)
[2014-06-29 21:44:30,178] INFO Client environment:user.name=root (org.apache.zookeeper.ZooKeeper)
[2014-06-29 21:44:30,178] INFO Client environment:user.home=/root (org.apache.zookeeper.ZooKeeper)
[2014-06-29 21:44:30,178] INFO Client environment:user.dir=/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1 (org.apache.zookeeper.ZooKeeper)
[2014-06-29 21:44:30,178] INFO Initiating client connection, connectString=localhost:2181 sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@7c8b7ac9 (org.apache.zookeeper.ZooKeeper)
[2014-06-29 21:44:30,191] INFO Opening socket connection to server localhost/127.0.0.1:2181 (org.apache.zookeeper.ClientCnxn)
[2014-06-29 21:44:30,198] INFO Socket connection established to localhost/127.0.0.1:2181, initiating session (org.apache.zookeeper.ClientCnxn)
[2014-06-29 21:44:30,215] INFO Session establishment complete on server localhost/127.0.0.1:2181, sessionid = 0x146e7dd68ba0000, negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn)
[2014-06-29 21:44:30,218] INFO zookeeper state changed (SyncConnected) (org.I0Itec.zkclient.ZkClient)
[2014-06-29 21:44:30,441] INFO Loading log 'page_visits-1' (kafka.log.LogManager)
[2014-06-29 21:44:30,464] INFO Recovering unflushed segment 0 in log page_visits-1. (kafka.log.Log)
[2014-06-29 21:44:30,472] INFO Completed load of log page_visits-1 with log end offset 0 (kafka.log.Log)
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
[2014-06-29 21:44:30,485] INFO Loading log 'page_visits-0' (kafka.log.LogManager)
[2014-06-29 21:44:30,486] INFO Recovering unflushed segment 0 in log page_visits-0. (kafka.log.Log)
[2014-06-29 21:44:30,486] INFO Completed load of log page_visits-0 with log end offset 0 (kafka.log.Log)
[2014-06-29 21:44:30,487] INFO Loading log 'test-0' (kafka.log.LogManager)
[2014-06-29 21:44:30,488] INFO Recovering unflushed segment 0 in log test-0. (kafka.log.Log)
[2014-06-29 21:44:30,488] INFO Completed load of log test-0 with log end offset 0 (kafka.log.Log)
[2014-06-29 21:44:30,489] INFO Loading log 'page_visits-4' (kafka.log.LogManager)
[2014-06-29 21:44:30,490] INFO Recovering unflushed segment 0 in log page_visits-4. (kafka.log.Log)
[2014-06-29 21:44:30,491] INFO Completed load of log page_visits-4 with log end offset 0 (kafka.log.Log)
[2014-06-29 21:44:30,492] INFO Loading log 'page_visits-3' (kafka.log.LogManager)
[2014-06-29 21:44:30,493] INFO Recovering unflushed segment 0 in log page_visits-3. (kafka.log.Log)
[2014-06-29 21:44:30,493] INFO Completed load of log page_visits-3 with log end offset 0 (kafka.log.Log)
[2014-06-29 21:44:30,494] INFO Loading log 'page_visits-2' (kafka.log.LogManager)
[2014-06-29 21:44:30,495] INFO Recovering unflushed segment 0 in log page_visits-2. (kafka.log.Log)
[2014-06-29 21:44:30,495] INFO Completed load of log page_visits-2 with log end offset 0 (kafka.log.Log)
[2014-06-29 21:44:30,496] INFO Starting log cleanup with a period of 60000 ms. (kafka.log.LogManager)
[2014-06-29 21:44:30,500] INFO Starting log flusher with a default period of 9223372036854775807 ms. (kafka.log.LogManager)
[2014-06-29 21:44:30,517] INFO Awaiting socket connections on 0.0.0.0:9092. (kafka.network.Acceptor)
[2014-06-29 21:44:30,518] INFO [Socket Server on Broker 0], Started (kafka.network.SocketServer)
[2014-06-29 21:44:30,588] INFO Will not load MX4J, mx4j-tools.jar is not in the classpath (kafka.utils.Mx4jLoader$)
[2014-06-29 21:44:30,614] INFO 0 successfully elected as leader (kafka.server.ZookeeperLeaderElector)
[2014-06-29 21:44:31,038] INFO New leader is 0 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
[2014-06-29 21:44:31,042] INFO Registered broker 0 at path /brokers/ids/0 with address rs229:9092. (kafka.utils.ZkUtils$)
[2014-06-29 21:44:31,059] INFO [Kafka Server 0], started (kafka.server.KafkaServer)
[2014-06-29 21:44:31,319] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions [page_visits,4],[page_visits,2],[page_visits,0],[page_visits,3],[page_visits,1],[test,0] (kafka.server.ReplicaFetcherManager)
[2014-06-29 21:44:31,392] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions [page_visits,4],[page_visits,2],[page_visits,0],[page_visits,3],[page_visits,1],[test,0] (kafka.server.ReplicaFetcherManager)
[root@rs229 kafka_2.9.2-0.8.1.1]# kafka-run-class.sh kafka.admin.DeleteTopicCommand --topic yting_page_visits --zookeeper localhost:2181
deletion succeeded!
The command in the red box above is the one given on the official site. It appears to be for version 0.8.0 and does not work in 0.8.1, which was frustrating! So I used kafka-run-class.sh, as shown, to delete the topic instead.
Open the ZooKeeper shell to confirm that the topic has been removed from under /brokers/topics.
[root@rs229 kafka_2.9.2-0.8.1.1]# zookeeper-shell.sh
USAGE: /usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/bin/zookeeper-shell.sh zookeeper_host:port[/path] [args...]
[root@rs229 kafka_2.9.2-0.8.1.1]# zookeeper-shell.sh localhost:2181
Connecting to localhost:2181
Welcome to ZooKeeper!
JLine support is disabled
WATCHER::
WatchedEvent state:SyncConnected type:None path:null
ls /
[consumers, config, controller, zookeeper, brokers, admin, controller_epoch]
ls /brokers
[topics, ids]
ls /brokers/topics
[test]
This confirms that the topic's entry under /brokers/topics in ZooKeeper has been deleted as well. OK.
[root@rs229 kafka_2.9.2-0.8.1.1]# bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 5 --topic yting_page_visits
Created topic "yting_page_visits".
[root@rs229 kafka_2.9.2-0.8.1.1]#
[root@rs229 kafka_2.9.2-0.8.1.1]# bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 5 --topic yting_page_visits
Error while executing topic command replication factor: 3 larger than available brokers: 1
kafka.admin.AdminOperationException: replication factor: 3 larger than available brokers: 1
    at kafka.admin.AdminUtils$.assignReplicasToBrokers(AdminUtils.scala:70)
    at kafka.admin.AdminUtils$.createTopic(AdminUtils.scala:155)
    at kafka.admin.TopicCommand$.createTopic(TopicCommand.scala:86)
    at kafka.admin.TopicCommand$.main(TopicCommand.scala:50)
    at kafka.admin.TopicCommand.main(TopicCommand.scala)
Cause:
This is a single-machine test. The topic was created with --replication-factor 3, but I had started only one broker on this machine, as the error message "replication factor: 3 larger than available brokers: 1" makes clear.
Fix:
Change --replication-factor 3 to --replication-factor 1.
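The rule behind this error is that each replica of a partition has to live on a different broker, so the replication factor cannot exceed the number of live brokers. A minimal sketch of that check follows. The class and method are hypothetical and only mimic the message seen above; they are not Kafka's AdminUtils code.

```java
/** Illustrative pre-check for a topic's replication factor. */
final class ReplicationFactorCheck {

    /** Throws if the requested replication factor cannot be satisfied. */
    static void validate(int replicationFactor, int availableBrokers) {
        if (replicationFactor <= 0) {
            throw new IllegalArgumentException("replication factor must be larger than 0");
        }
        if (replicationFactor > availableBrokers) {
            throw new IllegalArgumentException("replication factor: " + replicationFactor
                    + " larger than available brokers: " + availableBrokers);
        }
    }

    public static void main(String[] args) {
        validate(1, 1); // passes: one replica on the single broker
        try {
            validate(3, 1); // fails: three replicas need three brokers
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

To actually use a replication factor of 3, two more brokers would have to be started, each with its own broker.id, port, and log directory.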
Console output:
log4j:WARN No appenders could be found for logger (kafka.utils.VerifiableProperties).
log4j:WARN Please initialize the log4j system properly.
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Run the following command in a Linux shell to see the data produced by the Producer.
Note that what is shown here is the data from all 5 partitions.
[root@rs229 kafka_2.9.2-0.8.1.1]# bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic yting_page_visits --from-beginning
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
id---> 3 --->1404281896546,www.ytingxmei1129.com,13.14.20.179
id---> 6 --->1404281897616,www.ytingxmei1129.com,13.14.20.114
id---> 8 --->1404281897786,www.ytingxmei1129.com,13.14.20.64
id---> 19 --->1404281898657,www.ytingxmei1129.com,13.14.20.54
id---> 24 --->1404281898874,www.ytingxmei1129.com,13.14.20.19
id---> 30 --->1404281899132,www.ytingxmei1129.com,13.14.20.189
id---> 42 --->1404281899657,www.ytingxmei1129.com,13.14.20.99
id---> 48 --->1404281900467,www.ytingxmei1129.com,13.14.20.154
id---> 62 --->1404281901328,www.ytingxmei1129.com,13.14.20.129
id---> 63 --->1404281901374,www.ytingxmei1129.com,13.14.20.29
id---> 65 --->1404281901461,www.ytingxmei1129.com,13.14.20.139
id---> 74 --->1404281901856,www.ytingxmei1129.com,13.14.20.69
id---> 80 --->1404281902158,www.ytingxmei1129.com,13.14.20.94
id---> 81 --->1404281902204,www.ytingxmei1129.com,13.14.20.74
id---> 83 --->1404281902289,www.ytingxmei1129.com,13.14.20.204
id---> 84 --->1404281902330,www.ytingxmei1129.com,13.14.20.59
id---> 87 --->1404281902462,www.ytingxmei1129.com,13.14.20.154
id---> 89 --->1404281902548,www.ytingxmei1129.com,13.14.20.229
id---> 92 --->1404281902676,www.ytingxmei1129.com,13.14.20.104
id---> 98 --->1404281903106,www.ytingxmei1129.com,13.14.20.49
id---> 10 --->1404281897959,www.ytingxmei1129.com,13.14.20.118
id---> 12 --->1404281898130,www.ytingxmei1129.com,13.14.20.173
id---> 17 --->1404281898517,www.ytingxmei1129.com,13.14.20.98
id---> 18 --->1404281898610,www.ytingxmei1129.com,13.14.20.173
id---> 21 --->1404281898742,www.ytingxmei1129.com,13.14.20.158
id---> 34 --->1404281899306,www.ytingxmei1129.com,13.14.20.83
id---> 35 --->1404281899353,www.ytingxmei1129.com,13.14.20.58
id---> 40 --->1404281899571,www.ytingxmei1129.com,13.14.20.53
id---> 46 --->1404281900298,www.ytingxmei1129.com,13.14.20.118
id---> 53 --->1404281900848,www.ytingxmei1129.com,13.14.20.198
id---> 54 --->1404281900933,www.ytingxmei1129.com,13.14.20.118
id---> 58 --->1404281901154,www.ytingxmei1129.com,13.14.20.228
id---> 60 --->1404281901238,www.ytingxmei1129.com,13.14.20.223
id---> 61 --->1404281901287,www.ytingxmei1129.com,13.14.20.8
id---> 67 --->1404281901550,www.ytingxmei1129.com,13.14.20.18
id---> 71 --->1404281901728,www.ytingxmei1129.com,13.14.20.133
id---> 75 --->1404281901901,www.ytingxmei1129.com,13.14.20.133
id---> 78 --->1404281902070,www.ytingxmei1129.com,13.14.20.58
id---> 82 --->1404281902247,www.ytingxmei1129.com,13.14.20.68
id---> 85 --->1404281902374,www.ytingxmei1129.com,13.14.20.48
id---> 86 --->1404281902416,www.ytingxmei1129.com,13.14.20.183
id---> 90 --->1404281902592,www.ytingxmei1129.com,13.14.20.143
id---> 91 --->1404281902634,www.ytingxmei1129.com,13.14.20.183
id---> 94 --->1404281902764,www.ytingxmei1129.com,13.14.20.228
id---> 95 --->1404281902850,www.ytingxmei1129.com,13.14.20.233
id---> 96 --->1404281902934,www.ytingxmei1129.com,13.14.20.143
id---> 1 --->1404281895240,www.ytingxmei1129.com,13.14.20.16
id---> 2 --->1404281896467,www.ytingxmei1129.com,13.14.20.96
id---> 7 --->1404281897700,www.ytingxmei1129.com,13.14.20.246
id---> 14 --->1404281898302,www.ytingxmei1129.com,13.14.20.1
id---> 16 --->1404281898472,www.ytingxmei1129.com,13.14.20.116
id---> 25 --->1404281898916,www.ytingxmei1129.com,13.14.20.106
id---> 28 --->1404281899047,www.ytingxmei1129.com,13.14.20.121
id---> 29 --->1404281899090,www.ytingxmei1129.com,13.14.20.141
id---> 36 --->1404281899395,www.ytingxmei1129.com,13.14.20.86
id---> 44 --->1404281899789,www.ytingxmei1129.com,13.14.20.56
id---> 49 --->1404281900511,www.ytingxmei1129.com,13.14.20.111
id---> 51 --->1404281900679,www.ytingxmei1129.com,13.14.20.111
id---> 72 --->1404281901770,www.ytingxmei1129.com,13.14.20.21
id---> 77 --->1404281902028,www.ytingxmei1129.com,13.14.20.6
id---> 79 --->1404281902116,www.ytingxmei1129.com,13.14.20.16
id---> 97 --->1404281903019,www.ytingxmei1129.com,13.14.20.21
id---> 4 --->1404281896631,www.ytingxmei1129.com,13.14.20.62
id---> 9 --->1404281897873,www.ytingxmei1129.com,13.14.20.27
id---> 20 --->1404281898701,www.ytingxmei1129.com,13.14.20.167
id---> 22 --->1404281898785,www.ytingxmei1129.com,13.14.20.182
id---> 32 --->1404281899220,www.ytingxmei1129.com,13.14.20.32
id---> 33 --->1404281899263,www.ytingxmei1129.com,13.14.20.32
id---> 37 --->1404281899438,www.ytingxmei1129.com,13.14.20.177
id---> 38 --->1404281899485,www.ytingxmei1129.com,13.14.20.92
id---> 39 --->1404281899529,www.ytingxmei1129.com,13.14.20.72
id---> 41 --->1404281899616,www.ytingxmei1129.com,13.14.20.47
id---> 45 --->1404281899874,www.ytingxmei1129.com,13.14.20.252
id---> 52 --->1404281900765,www.ytingxmei1129.com,13.14.20.217
id---> 55 --->1404281901017,www.ytingxmei1129.com,13.14.20.47
id---> 57 --->1404281901110,www.ytingxmei1129.com,13.14.20.177
id---> 64 --->1404281901419,www.ytingxmei1129.com,13.14.20.242
id---> 68 --->1404281901596,www.ytingxmei1129.com,13.14.20.62
id---> 70 --->1404281901684,www.ytingxmei1129.com,13.14.20.37
id---> 88 --->1404281902506,www.ytingxmei1129.com,13.14.20.72
id---> 5 --->1404281897529,www.ytingxmei1129.com,13.14.20.40
id---> 11 --->1404281898044,www.ytingxmei1129.com,13.14.20.110
id---> 13 --->1404281898215,www.ytingxmei1129.com,13.14.20.20
id---> 15 --->1404281898387,www.ytingxmei1129.com,13.14.20.165
id---> 23 --->1404281898832,www.ytingxmei1129.com,13.14.20.75
id---> 26 --->1404281898959,www.ytingxmei1129.com,13.14.20.30
id---> 27 --->1404281899004,www.ytingxmei1129.com,13.14.20.55
id---> 31 ---> 1404281899176,www.ytingxmei1129.com,13.14.20.180
id---> 43 --->1404281899703,www.ytingxmei1129.com,13.14.20.200
id---> 47 --->1404281900381,www.ytingxmei1129.com,13.14.20.40
id---> 50 --->1404281900596,www.ytingxmei1129.com,13.14.20.95
id---> 56 --->1404281901064,www.ytingxmei1129.com,13.14.20.20
id---> 59 --->1404281901196,www.ytingxmei1129.com,13.14.20.30
id---> 66 --->1404281901502,www.ytingxmei1129.com,13.14.20.155
id---> 69 --->1404281901640,www.ytingxmei1129.com,13.14.20.55
id---> 73 --->1404281901811,www.ytingxmei1129.com,13.14.20.120
id---> 76 --->1404281901944,www.ytingxmei1129.com,13.14.20.125
id---> 93 --->1404281902719,www.ytingxmei1129.com,13.14.20.0
id---> 99 --->1404281903191,www.ytingxmei1129.com,13.14.20.65
id---> 100 --->1404281903277,www.ytingxmei1129.com,13.14.20.75
^C[2014-07-02 14:18:59,463] WARN Reconnect due to socket error: null (kafka.consumer.SimpleConsumer)
Consumed 100 messages
[root@rs229 kafka_2.9.2-0.8.1.1]#
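First: the data is not in order across the topic as a whole.
Second: work this one out yourself (hint: look at the TestSimplePartitioner.java class).
20+16+18+26+20 adds up to exactly 100 messages, so nothing was lost! The partitions also look fairly even in size; with consistent hashing they would probably be even more balanced.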
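The hint points at the partitioning rule: TestSimplePartitioner takes the number after the last dot of the key (the IP address) modulo the partition count, and in the run above the messages came out grouped by partition. Order is therefore preserved inside each group but not across groups. The sketch below applies that rule to the first IP of each of the five groups in the output. The class name is an assumption; the arithmetic mirrors the partitioner.

```java
/** Reproduces the key-to-partition arithmetic used in this walkthrough. */
final class LastOctetPartition {

    /** Number after the last '.' of the key, modulo the partition count. */
    static int partitionFor(String key, int numPartitions) {
        int partition = 0;
        int offset = key.lastIndexOf('.');
        if (offset > 0) {
            partition = Integer.parseInt(key.substring(offset + 1)) % numPartitions;
        }
        return partition;
    }

    public static void main(String[] args) {
        // First IP address of each group in the console consumer output.
        String[] firstIpOfEachGroup = {
            "13.14.20.179", "13.14.20.118", "13.14.20.16", "13.14.20.62", "13.14.20.40"
        };
        for (String ip : firstIpOfEachGroup) {
            System.out.println(ip + " -> partition " + partitionFor(ip, 5));
        }
    }
}
```

The five groups map to partitions 4, 3, 1, 2 and 0 respectively. Listed in partition order 0 to 4, the group sizes are 20, 16, 18, 26 and 20, which is the sum quoted above.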
[2014-07-01 22:54:07,583] ERROR Closing socket for /116.255.224.229 because of error (kafka.network.Processor)
java.io.IOException: Connection reset by peer
    at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
    at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
    at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
    at sun.nio.ch.IOUtil.write(IOUtil.java:65)
    at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:487)
    at kafka.api.PartitionDataSend.writeTo(FetchResponse.scala:67)
    at kafka.network.MultiSend.writeTo(Transmission.scala:102)
    at kafka.api.TopicDataSend.writeTo(FetchResponse.scala:124)
    at kafka.network.MultiSend.writeTo(Transmission.scala:102)
    at kafka.api.FetchResponseSend.writeTo(FetchResponse.scala:219)
    at kafka.network.Processor.write(SocketServer.scala:375)
    at kafka.network.Processor.run(SocketServer.scala:247)
    at java.lang.Thread.run(Thread.java:745)
[2014-07-01 22:53:22,561] INFO Got user-level KeeperException when processing sessionid:0x146f265ce780006 type:setData cxid:0x37 zxid:0xfffffffffffffffe txntype:unknown reqpath:n/a Error Path:/consumers/console-consumer-84796/offsets/yting_page_visits/4 Error:KeeperErrorCode = NoNode for /consumers/console-consumer-84796/offsets/yting_page_visits/4 (org.apache.zookeeper.server.PrepRequestProcessor)
[2014-07-01 22:53:22,563] INFO Got user-level KeeperException when processing sessionid:0x146f265ce780006 type:create cxid:0x38 zxid:0xfffffffffffffffe txntype:unknown reqpath:n/a Error Path:/consumers/console-consumer-84796/offsets/yting_page_visits Error:KeeperErrorCode = NodeExists for /consumers/console-consumer-84796/offsets/yting_page_visits (org.apache.zookeeper.server.PrepRequestProcessor)
[2014-07-01 22:54:07,589] INFO Processed session termination for sessionid: 0x146f265ce780006 (org.apache.zookeeper.server.PrepRequestProcessor)
[2014-07-01 22:54:07,591] INFO Closed socket connection for client /127.0.0.1:38575 which had sessionid 0x146f265ce780006 (org.apache.zookeeper.server.NIOServerCnxn)
[2014-07-01 22:54:07,594] INFO Accepted socket connection from /127.0.0.1:38586 (org.apache.zookeeper.server.NIOServerCnxn)
[2014-07-01 22:54:07,594] INFO Client attempting to establish new session at /127.0.0.1:38586 (org.apache.zookeeper.server.NIOServerCnxn)
[2014-07-01 22:54:07,596] INFO Established session 0x146f265ce780008 with negotiated timeout 30000 for client /127.0.0.1:38586 (org.apache.zookeeper.server.NIOServerCnxn)
[2014-07-01 22:54:07,628] INFO Processed session termination for sessionid: 0x146f265ce780008 (org.apache.zookeeper.server.PrepRequestProcessor)
[2014-07-01 22:54:07,629] INFO Closed socket connection for client /127.0.0.1:38586 which had sessionid 0x146f265ce780008 (org.apache.zookeeper.server.NIOServerCnxn)
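Analysis: of the two log excerpts above, one was printed by the Kafka server and the other by the ZooKeeper instance bundled with Kafka. The Kafka server message appears only because I pressed Ctrl+C after running bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic yting_page_visits --from-beginning, which reset the connection. As for the ZooKeeper messages, I do not fully understand them yet. They are logged at INFO level and look like the console consumer trying to update an offset node that does not exist yet (NoNode) and then creating the path (NodeExists), so they are probably harmless. I will come back to this later.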
package com.yting.cloud.kafka.producer;

import java.util.*;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

/**
 * Producer example from the official Kafka site. I ran it from Eclipse on my
 * local machine against the remote server, so some of the code was modified.
 *
 * @Author 王扬庭
 * @Time 2014-07-01
 *
 */
public class TestProducer {
    public static void main(String[] args) {
        // long events = Long.parseLong(args[0]);
        long events = 100;
        Random rnd = new Random();

        Properties props = new Properties();
        // props.put("metadata.broker.list", "broker1:9092,broker2:9092");
        // When running from Eclipse, rs229 must also be in the local hosts file; an IP address works too.
        props.put("metadata.broker.list", "rs229:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("partitioner.class", "com.yting.cloud.kafka.partition.TestSimplePartitioner");
        props.put("request.required.acks", "1");

        ProducerConfig config = new ProducerConfig(props);
        Producer<String, String> producer = new Producer<String, String>(config);

        for (long nEvents = 0; nEvents < events; nEvents++) {
            long runtime = new Date().getTime();
            String ip = "13.14.20." + rnd.nextInt(255);
            String msg = runtime + ",www.ytingxmei1129.com," + ip;
            // The IP address is the message key, so it decides the target partition.
            KeyedMessage<String, String> data = new KeyedMessage<String, String>("yting_page_visits", ip, msg);
            producer.send(data);
        }
        producer.close();
    }
}
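Each message built by the producer above is a comma-separated string of the form timestamp,site,ip. A consumer-side sketch for splitting such a payload back into fields might look like the following. PageVisit is a hypothetical helper introduced for illustration, not part of the Kafka API.

```java
/** Hypothetical value object for one "timestamp,site,ip" message. */
final class PageVisit {
    final long timestampMs;
    final String site;
    final String ip;

    PageVisit(long timestampMs, String site, String ip) {
        this.timestampMs = timestampMs;
        this.site = site;
        this.ip = ip;
    }

    /** Splits a payload into its three fields, rejecting malformed input. */
    static PageVisit parse(String payload) {
        String[] parts = payload.split(",", 3);
        if (parts.length != 3) {
            throw new IllegalArgumentException("expected timestamp,site,ip but got: " + payload);
        }
        return new PageVisit(Long.parseLong(parts[0].trim()), parts[1], parts[2]);
    }

    public static void main(String[] args) {
        PageVisit visit = parse("1404281896546,www.ytingxmei1129.com,13.14.20.179");
        System.out.println(visit.timestampMs + " | " + visit.site + " | " + visit.ip);
    }
}
```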
package com.yting.cloud.kafka.partition;

import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;

/**
 * SimplePartitioner example from the official Kafka site. The official example
 * targets 0.8.0, which differs from 0.8.1, so it was adapted here.
 *
 * @Author 王扬庭
 * @Time 2014-07-01
 *
 */
public class TestSimplePartitioner implements Partitioner {
    public TestSimplePartitioner(VerifiableProperties props) {
    }

    // 0.8.0 version of the method, kept for reference:
    // public int partition(String key, int a_numPartitions) {
    //     int partition = 0;
    //     int offset = key.lastIndexOf('.');
    //     if (offset > 0) {
    //         partition = Integer.parseInt(key.substring(offset + 1)) %
    //                 a_numPartitions;
    //     }
    //     return partition;
    // }

    @Override
    public int partition(Object obj, int a_numPartitions) {
        String key = obj.toString();
        int partition = 0;
        // Use the number after the last '.' of the key (the last octet of the IP).
        int offset = key.lastIndexOf('.');
        if (offset > 0) {
            partition = Integer.parseInt(key.substring(offset + 1))
                    % a_numPartitions;
        }
        return partition;
    }
}
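The partitioner above assumes every key ends in a dot followed by digits, as the generated IP addresses do. A key whose suffix after the last dot is not a number would make Integer.parseInt throw, and keys without any dot all land in partition 0. One possible defensive variant is sketched here without the Kafka interfaces so that it runs standalone. The class name and the hash-based fallback are assumptions, not what the original code does.

```java
/** Standalone sketch of a more forgiving suffix-based partitioner. */
final class SafeSuffixPartitioner {

    /** Non-negative remainder, written out so the sketch also compiles on Java 7. */
    private static int nonNegativeMod(int value, int modulus) {
        return ((value % modulus) + modulus) % modulus;
    }

    static int partition(Object key, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        String text = String.valueOf(key); // null becomes the string "null"
        int dot = text.lastIndexOf('.');
        if (dot >= 0 && dot < text.length() - 1) {
            try {
                int suffix = Integer.parseInt(text.substring(dot + 1));
                return nonNegativeMod(suffix, numPartitions);
            } catch (NumberFormatException ignored) {
                // Suffix is not numeric: fall through to the hash-based fallback.
            }
        }
        return nonNegativeMod(text.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        System.out.println(partition("13.14.20.179", 5));
        System.out.println(partition("www.example.com", 5));
        System.out.println(partition("no-dot-key", 5));
    }
}
```

For numeric suffixes such as the IP addresses used in this walkthrough, the result is the same as with TestSimplePartitioner; only the unusual keys are treated differently.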
See: https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example
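The core idea of the SimpleConsumer approach is that the client, not the broker, remembers the next offset to read. The standalone simulation below illustrates only that bookkeeping, using an in-memory list in place of a partition. Every name in it is hypothetical and no Kafka classes are involved.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** In-memory illustration of client-side offset tracking. */
final class OffsetTrackingSketch {

    /** Returns up to maxMessages entries starting at fromOffset, like one fetch call. */
    static List<String> fetch(List<String> log, long fromOffset, int maxMessages) {
        if (fromOffset < 0 || fromOffset > log.size()) {
            throw new IndexOutOfBoundsException("offset out of range: " + fromOffset);
        }
        int start = (int) fromOffset;
        int end = Math.min(log.size(), start + maxMessages);
        return new ArrayList<String>(log.subList(start, end));
    }

    /** Reads the whole log in batches, advancing the client-side offset per message. */
    static List<String> readAll(List<String> log, int batchSize) {
        List<String> seen = new ArrayList<String>();
        long readOffset = 0; // comparable to starting from the earliest offset
        while (true) {
            List<String> batch = fetch(log, readOffset, batchSize);
            if (batch.isEmpty()) {
                break; // caught up with the end of the log
            }
            for (String message : batch) {
                seen.add(readOffset + ": " + message);
                readOffset++; // the next offset to request
            }
        }
        return seen;
    }

    public static void main(String[] args) {
        List<String> log = Arrays.asList("a", "b", "c", "d", "e");
        for (String line : readAll(log, 2)) {
            System.out.println(line);
        }
    }
}
```

Because the offset lives in the client, re-reading old data is just a matter of starting the loop from a smaller offset.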
package com.yting.cloud.kafka.consumer;
import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.ErrorMapping;
import kafka.common.TopicAndPartition;
import kafka.javaapi.*;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
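/**
 * SimpleConsumer example from the official Kafka site. I ran it from Eclipse on
 * my local machine against the remote server, so some of the code was modified.
 *
 * @Author 王扬庭
 * @Time 2014-06-29 15:09:21
 *
 */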
public class TestSimpleConsumer {
    public static void main(String args[]) {
        TestSimpleConsumer example = new TestSimpleConsumer();
        // long maxReads = Long.parseLong(args[0]);
        // String topic = args[1];
        // int partition = Integer.parseInt(args[2]);
        // seeds.add(args[3]);
        // int port = Integer.parseInt(args[4]);
        long maxReads = 100;
        String topic = "yting_page_visits";
        int partition = 1;
        List<String> seeds = new ArrayList<String>();
        seeds.add("rs229");
        int port = Integer.parseInt("9092");
        try {
            example.run(maxReads, topic, partition, seeds, port);
        } catch (Exception e) {
            System.out.println("Oops:" + e);
            e.printStackTrace();
        }
    }

    private List<String> m_replicaBrokers = new ArrayList<String>();

    public TestSimpleConsumer() {
        m_replicaBrokers = new ArrayList<String>();
    }
    public void run(long a_maxReads, String a_topic, int a_partition, List<String> a_seedBrokers, int a_port) throws Exception {
        // find the meta data about the topic and partition we are interested in
        //
        PartitionMetadata metadata = findLeader(a_seedBrokers, a_port, a_topic,
                a_partition);
        if (metadata == null) {
            System.out
                    .println("Can't find metadata for Topic and Partition. Exiting");
            return;
        }
        if (metadata.leader() == null) {
            System.out
                    .println("Can't find Leader for Topic and Partition. Exiting");
            return;
        }
        String leadBroker = metadata.leader().host();
        String clientName = "Client_" + a_topic + "_" + a_partition;
        SimpleConsumer consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);
        long readOffset = getLastOffset(consumer, a_topic, a_partition,
                kafka.api.OffsetRequest.EarliestTime(), clientName);
        int numErrors = 0;
        while (a_maxReads > 0) {
            if (consumer == null) {
                consumer = new SimpleConsumer(leadBroker, a_port, 100000,
                        64 * 1024, clientName);
            }
            // Note: this fetchSize of 100000 might need to be increased if large batches are written to Kafka
            FetchRequest req = new FetchRequestBuilder().clientId(clientName)
                    .addFetch(a_topic, a_partition, readOffset, 100000)
                    .build();
            FetchResponse fetchResponse = consumer.fetch(req);
            if (fetchResponse.hasError()) {
                numErrors++;
                // Something went wrong!
                short code = fetchResponse.errorCode(a_topic, a_partition);
                System.out.println("Error fetching data from the Broker:"
                        + leadBroker + " Reason: " + code);
                if (numErrors > 5)
                    break;
                if (code == ErrorMapping.OffsetOutOfRangeCode()) {
                    // We asked for an invalid offset. For simple case ask for
                    // the last element to reset
                    readOffset = getLastOffset(consumer, a_topic, a_partition,
                            kafka.api.OffsetRequest.LatestTime(), clientName);
                    continue;
                }
                consumer.close();
                consumer = null;
                leadBroker = findNewLeader(leadBroker, a_topic, a_partition,
                        a_port);
                continue;
            }
            numErrors = 0;
            long numRead = 0;
            for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(
                    a_topic, a_partition)) {
                long currentOffset = messageAndOffset.offset();
                if (currentOffset < readOffset) {
                    System.out.println("Found an old offset: " + currentOffset
                            + " Expecting: " + readOffset);
                    continue;
                }
                readOffset = messageAndOffset.nextOffset();
                ByteBuffer payload = messageAndOffset.message().payload();
                byte[] bytes = new byte[payload.limit()];
                payload.get(bytes);
                System.out.println(String.valueOf(messageAndOffset.offset())
                        + ": " + new String(bytes, "UTF-8"));
                numRead++;
                a_maxReads--;
            }
            if (numRead == 0) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ie) {
                }
            }
        }
        if (consumer != null)
            consumer.close();
    }
    public static long getLastOffset(SimpleConsumer consumer, String topic,
            int partition, long whichTime, String clientName) {
        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo =
                new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
        // ask for at most one offset before the given time
        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
        kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
                requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
        OffsetResponse response = consumer.getOffsetsBefore(request);
        if (response.hasError()) {
            System.out.println("Error fetching offset data from the Broker. Reason: "
                    + response.errorCode(topic, partition));
            return 0;
        }
        long[] offsets = response.offsets(topic, partition);
        return offsets[0];
    }
    private String findNewLeader(String a_oldLeader, String a_topic,
            int a_partition, int a_port) throws Exception {
        for (int i = 0; i < 3; i++) {
            boolean goToSleep = false;
            PartitionMetadata metadata = findLeader(m_replicaBrokers, a_port, a_topic, a_partition);
            if (metadata == null) {
                goToSleep = true;
            } else if (metadata.leader() == null) {
                goToSleep = true;
            } else if (a_oldLeader.equalsIgnoreCase(metadata.leader().host()) && i == 0) {
                // first time through, if the leader hasn't changed, give
                // ZooKeeper a second to recover
                // second time, assume the broker did recover before failover,
                // or it was a non-Broker issue
                goToSleep = true;
            } else {
                return metadata.leader().host();
            }
            if (goToSleep) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ie) {
                }
            }
        }
        System.out.println("Unable to find new leader after Broker failure. Exiting");
        throw new Exception("Unable to find new leader after Broker failure. Exiting");
    }
    private PartitionMetadata findLeader(List<String> a_seedBrokers,
            int a_port, String a_topic, int a_partition) {
        PartitionMetadata returnMetaData = null;
        loop:
        for (String seed : a_seedBrokers) {
            SimpleConsumer consumer = null;
            try {
                consumer = new SimpleConsumer(seed, a_port, 100000, 64 * 1024, "leaderLookup");
                List<String> topics = Collections.singletonList(a_topic);
                TopicMetadataRequest req = new TopicMetadataRequest(topics);
                kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);
                List<TopicMetadata> metaData = resp.topicsMetadata();
                for (TopicMetadata item : metaData) {
                    for (PartitionMetadata part : item.partitionsMetadata()) {
                        if (part.partitionId() == a_partition) {
                            returnMetaData = part;
                            break loop;
                        }
                    }
                }
            } catch (Exception e) {
                System.out.println("Error communicating with Broker [" + seed
                        + "] to find Leader for [" + a_topic + ", "
                        + a_partition + "] Reason: " + e);
            } finally {
                if (consumer != null)
                    consumer.close();
            }
        }
        if (returnMetaData != null) {
            // remember the replicas so findNewLeader can query them after a failure
            m_replicaBrokers.clear();
            for (kafka.cluster.Broker replica : returnMetaData.replicas()) {
                m_replicaBrokers.add(replica.host());
            }
        }
        return returnMetaData;
    }
}
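The offset handling in the fetch loop of run() is easy to miss among the Kafka calls, so here is a minimal sketch of just that logic, without any Kafka dependency. FetchedMessage and OffsetTrackingSketch are hypothetical names made up for this illustration; they are not part of the Kafka API. The idea is that a fetch may hand back messages older than the requested offset (with the 0.8 SimpleConsumer this can happen, for example, when a whole compressed block is returned), so the loop skips those and only advances readOffset for messages it actually processes.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for one fetched message (not a Kafka class). */
final class FetchedMessage {
    final long offset;      // position of this message in the partition
    final long nextOffset;  // position of the message that follows it
    final String payload;

    FetchedMessage(long offset, String payload) {
        this.offset = offset;
        this.nextOffset = offset + 1;
        this.payload = payload;
    }
}

/** Sketch of the offset bookkeeping done inside the fetch loop. */
final class OffsetTrackingSketch {
    long readOffset;                                   // next offset we want to read
    final List<String> delivered = new ArrayList<>();  // what the consumer "printed"

    OffsetTrackingSketch(long startOffset) {
        this.readOffset = startOffset;
    }

    /** Processes one fetched batch and returns how many messages were delivered. */
    int process(List<FetchedMessage> batch) {
        int numRead = 0;
        for (FetchedMessage m : batch) {
            if (m.offset < readOffset) {
                continue;                 // older than what we asked for: skip it
            }
            readOffset = m.nextOffset;    // advance only past messages we consume
            delivered.add(m.offset + ": " + m.payload);
            numRead++;
        }
        return numRead;
    }

    public static void main(String[] args) {
        // We asked for offset 2, but the batch starts at offset 0.
        OffsetTrackingSketch sketch = new OffsetTrackingSketch(2);
        List<FetchedMessage> batch = new ArrayList<>();
        for (long o = 0; o < 5; o++) {
            batch.add(new FetchedMessage(o, "m" + o));
        }
        int n = sketch.process(batch);
        System.out.println(n + " delivered, next offset " + sketch.readOffset);
    }
}
```

With a start offset of 2 and a batch covering offsets 0 to 4, the two leading messages are skipped, three are delivered, and the next fetch would start at offset 5. In the real consumer a batch that delivers nothing is what triggers the one-second sleep before the next fetch.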
[root@rs229 config]# pwd
/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/config
[root@rs229 config]# cat zookeeper.properties
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# the directory where the snapshot is stored.
dataDir=/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/zookeeper
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0
[root@rs229 config]# cat server.properties
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# see kafka.server.KafkaConfig for additional details and defaults
############################# Server Basics #############################
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0
############################# Socket Server Settings #############################
# The port the socket server listens on
port=9092
# Hostname the broker will bind to. If not set, the server will bind to all interfaces
#host.name=localhost
# Hostname the broker will advertise to producers and consumers. If not set, it uses the
# value for "host.name" if configured. Otherwise, it will use the value returned from
# java.net.InetAddress.getCanonicalHostName().
#advertised.host.name=<hostname routable by clients>
# The port to publish to ZooKeeper for clients to use. If this is not set,
# it will publish the same port that the broker binds to.
#advertised.port=<port accessible by clients>
# The number of threads handling network requests
num.network.threads=2
# The number of threads doing disk I/O
num.io.threads=8
# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=1048576
# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=1048576
# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600
############################# Log Basics #############################
# A comma separated list of directories under which to store log files
#log.dirs=/tmp/kafka-logs
log.dirs=/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/kafka-logs
# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=2
############################# Log Flush Policy #############################
# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
# 1. Durability: Unflushed data may be lost if you are not using replication.
# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.
# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000
# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000
############################# Log Retention Policy #############################
# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.
# The minimum age of a log file to be eligible for deletion
log.retention.hours=168
# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
# segments don't drop below log.retention.bytes.
#log.retention.bytes=1073741824
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=536870912
# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=60000
# By default the log cleaner is disabled and the log retention policy will default to just delete segments after their retention expires.
# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction.
log.cleaner.enable=false
############################# Zookeeper #############################
# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=localhost:2181
# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=1000000
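Several values in server.properties are given in raw hours, bytes, or milliseconds, which makes them hard to read at a glance. The sketch below is only an illustration of how to sanity-check them: it parses a few lines copied from the file above with the standard java.util.Properties class and converts them to friendlier units. BrokerConfigSketch and its helper methods are hypothetical names for this example; this is not how the broker itself reads its configuration.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

/** Hypothetical helper for reading a few values out of server.properties text. */
final class BrokerConfigSketch {

    /** Parses properties-file text; lines starting with '#' are treated as comments. */
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    /** log.retention.hours expressed in whole days. */
    static long retentionDays(Properties props) {
        return Long.parseLong(props.getProperty("log.retention.hours")) / 24;
    }

    /** A byte-valued setting expressed in whole MiB (1 MiB = 1024 * 1024 bytes). */
    static long asMiB(Properties props, String key) {
        return Long.parseLong(props.getProperty(key)) / (1024L * 1024L);
    }

    public static void main(String[] args) {
        // A few lines copied from the server.properties listing above.
        String text = String.join("\n",
                "log.retention.hours=168",
                "#log.retention.bytes=1073741824",
                "log.segment.bytes=536870912",
                "socket.request.max.bytes=104857600");
        Properties props = parse(text);

        System.out.println("retention days: " + retentionDays(props));
        System.out.println("segment MiB: " + asMiB(props, "log.segment.bytes"));
        System.out.println("max request MiB: " + asMiB(props, "socket.request.max.bytes"));
        // The size-based retention line is commented out, so the key is absent.
        System.out.println("size-based retention set: "
                + (props.getProperty("log.retention.bytes") != null));
    }
}
```

With the values from this file, 168 hours of retention is 7 days, each log segment rolls at 512 MiB, and the largest accepted request is 100 MiB. Because log.retention.bytes is commented out, only the time-based retention rule applies in this setup.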
_00017 Introduction to Kafka's Architecture and Kafka Getting-Started Examples (Basic Example + Using the Java API)
Original article: http://blog.csdn.net/u012185296/article/details/36418827