标签:des style blog http color java 使用 os
root@m1:/home/hadoop/kafka_2.9.2- pwd /home/hadoop/kafka_2.9.2- |
root@m1:/home/hadoop/kafka_2.9.2- cat config/server.properties # Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # see kafka.server.KafkaConfig for additional details and defaults ############################# Server Basics ############################# # The id of the broker. This must be set to a unique integer for each broker. #整数,建议根据ip区分,这里我是使用zookeeper中的id来设置 broker.id=1 ############################# Socket Server Settings ############################# # The port the socket server listens on #broker用于接收producer消息的端口 port=9092 #port=44444 # Hostname the broker will bind to. If not set, the server will bind to all interfaces #broker的hostname host.name=m1 # Hostname the broker will advertise to producers and consumers. If not set, it uses the # value for "host.name" if configured. Otherwise, it will use the value returned from # java.net.InetAddress.getCanonicalHostName(). #这个是配置PRODUCER/CONSUMER连上来的时候使用的地址 advertised.host.name=m1 # The port to publish to ZooKeeper for clients to use. If this is not set, # it will publish the same port that the broker binds to. 
#advertised.port=<port accessible by clients> # The number of threads handling network requests num.network.threads=2 # The number of threads doing disk I/O num.io.threads=8 # The send buffer (SO_SNDBUF) used by the socket server socket.send.buffer.bytes=1048576 # The receive buffer (SO_RCVBUF) used by the socket server socket.receive.buffer.bytes=1048576 # The maximum size of a request that the socket server will accept (protection against OOM) socket.request.max.bytes=104857600 ############################# Log Basics ############################# # A comma separated list of directories under which to store log files #kafka存放消息文件的路径 log.dirs=/home/hadoop/kafka_2.9.2- # The default number of log partitions per topic. More partitions allow greater # parallelism for consumption, but this will also result in more files across # the brokers. #topic的默认分区数 num.partitions=2 ############################# Log Flush Policy ############################# # Messages are immediately written to the filesystem but by default we only fsync() to sync # the OS cache lazily. The following configurations control the flush of data to disk. # There are a few important trade-offs here: # 1. Durability: Unflushed data may be lost if you are not using replication. # 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush. # 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks. # The settings below allow one to configure the flush policy to flush data after a period of time or # every N messages (or both). This can be done globally and overridden on a per-topic basis. 
# The number of messages to accept before forcing a flush of data to disk #log.flush.interval.messages=10000 # The maximum amount of time a message can sit in a log before we force a flush #log.flush.interval.ms=1000 ############################# Log Retention Policy ############################# # The following configurations control the disposal of log segments. The policy can # be set to delete segments after a period of time, or after a given size has accumulated. # A segment will be deleted whenever *either* of these criteria are met. Deletion always happens # from the end of the log. # The minimum age of a log file to be eligible for deletion #kafka接收日志的保留时间(目前我们保存7天数据log.retention.hours=168) log.retention.hours=168 # A size-based retention policy for logs. Segments are pruned from the log as long as the remaining # segments don't drop below log.retention.bytes. #log.retention.bytes=1073741824 # The maximum size of a log segment file. When this size is reached a new log segment will be created. log.segment.bytes=536870912 # The interval at which log segments are checked to see if they can be deleted according # to the retention policies log.retention.check.interval.ms=60000 # By default the log cleaner is disabled and the log retention policy will default to just delete segments after their retention expires. # If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction. log.cleaner.enable=false ############################# Zookeeper ############################# # Zookeeper connection string (see zookeeper docs for details). # This is a comma separated host:port pairs, each corresponding to a zk # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002". # You can also append an optional chroot string to the urls to specify the # root directory for all kafka znodes. zookeeper.connect=m1:2181,m2:2181,s1:2181,s2:2181 # Timeout in ms for connecting to zookeeper zookeeper.connection.timeout.ms=1000000 |
root@m1:/home/hadoop# /home/hadoop/zookeeper-3.4.5/bin/zkServer.sh status JMX enabled by default Using config: /home/hadoop/zookeeper-3.4.5/bin/../conf/zoo.cfg Mode: leader |
root@m1:/home/hadoop# /home/hadoop/kafka_2.9.2- /home/hadoop/kafka_2.9.2- & [1] 31823 root@m1:/home/hadoop# [2014-08-05 10:03:11,210] INFO Verifying properties (kafka.utils.VerifiableProperties) [2014-08-05 10:03:11,261] INFO Property advertised.host.name is overridden to m1 (kafka.utils.VerifiableProperties) [2014-08-05 10:03:11,261] INFO Property broker.id is overridden to 1 (kafka.utils.VerifiableProperties) [2014-08-05 10:03:11,264] INFO Property host.name is overridden to m1 (kafka.utils.VerifiableProperties) [2014-08-05 10:03:11,264] INFO Property log.cleaner.enable is overridden to false (kafka.utils.VerifiableProperties) [2014-08-05 10:03:11,264] INFO Property log.dirs is overridden to /home/hadoop/kafka_2.9.2- (kafka.utils.VerifiableProperties) [2014-08-05 10:03:11,265] INFO Property log.retention.check.interval.ms is overridden to 60000 (kafka.utils.VerifiableProperties) [2014-08-05 10:03:11,265] INFO Property log.retention.hours is overridden to 168 (kafka.utils.VerifiableProperties) [2014-08-05 10:03:11,265] INFO Property log.segment.bytes is overridden to 536870912 (kafka.utils.VerifiableProperties) [2014-08-05 10:03:11,265] INFO Property num.io.threads is overridden to 8 (kafka.utils.VerifiableProperties) [2014-08-05 10:03:11,266] INFO Property num.network.threads is overridden to 2 (kafka.utils.VerifiableProperties) [2014-08-05 10:03:11,266] INFO Property num.partitions is overridden to 2 (kafka.utils.VerifiableProperties) [2014-08-05 10:03:11,267] INFO Property port is overridden to 9092 (kafka.utils.VerifiableProperties) [2014-08-05 10:03:11,267] INFO Property socket.receive.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties) [2014-08-05 10:03:11,268] INFO Property socket.request.max.bytes is overridden to 104857600 (kafka.utils.VerifiableProperties) [2014-08-05 10:03:11,268] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties) [2014-08-05 10:03:11,268] INFO Property 
zookeeper.connect is overridden to m1:2181,m2:2181,s1:2181,s2:2181 (kafka.utils.VerifiableProperties) [2014-08-05 10:03:11,269] INFO Property zookeeper.connection.timeout.ms is overridden to 1000000 (kafka.utils.VerifiableProperties) [2014-08-05 10:03:11,302] INFO [Kafka Server 1], starting (kafka.server.KafkaServer) [2014-08-05 10:03:11,303] INFO [Kafka Server 1], Connecting to zookeeper on m1:2181,m2:2181,s1:2181,s2:2181 (kafka.server.KafkaServer) [2014-08-05 10:03:11,335] INFO Starting ZkClient event thread. (org.I0Itec.zkclient.ZkEventThread) [2014-08-05 10:03:11,348] INFO Client environment:zookeeper.version=3.3.3-1203054, built on 11/17/2011 05:47 GMT (org.apache.zookeeper.ZooKeeper) [2014-08-05 10:03:11,348] INFO Client environment:host.name=m1 (org.apache.zookeeper.ZooKeeper) [2014-08-05 10:03:11,349] INFO Client environment:java.version=1.7.0_65 (org.apache.zookeeper.ZooKeeper) [2014-08-05 10:03:11,349] INFO Client environment:java.vendor=Oracle Corporation (org.apache.zookeeper.ZooKeeper) [2014-08-05 10:03:11,349] INFO Client environment:java.home=/usr/lib/jvm/java-7-oracle/jre (org.apache.zookeeper.ZooKeeper) [2014-08-05 10:03:11,349] INFO Client environment:java.class.path=.:/usr/lib/jvm/java-7-oracle/lib/tools.jar:/usr/lib/jvm/java-7-oracle/lib/dt.jar:/home/hadoop/kafka_2.9.2-*.jar:/home/hadoop/kafka_2.9.2-*.jar:/home/hadoop/kafka_2.9.2-*.jar:/home/hadoop/kafka_2.9.2-*.jar:/home/hadoop/kafka_2.9.2-*.jar:/home/hadoop/kafka_2.9.2-*.jar:/home/hadoop/kafka_2.9.2-*.jar (org.apache.zookeeper.ZooKeeper) [2014-08-05 10:03:11,350] INFO Client environment:java.library.path=:/usr/local/lib:/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib (org.apache.zookeeper.ZooKeeper) [2014-08-05 10:03:11,350] INFO Client environment:java.io.tmpdir=/tmp (org.apache.zookeeper.ZooKeeper) [2014-08-05 10:03:11,350] INFO Client environment:java.compiler=<NA> (org.apache.zookeeper.ZooKeeper) [2014-08-05 10:03:11,350] INFO Client environment:os.name=Linux 
(org.apache.zookeeper.ZooKeeper) [2014-08-05 10:03:11,350] INFO Client environment:os.arch=amd64 (org.apache.zookeeper.ZooKeeper) [2014-08-05 10:03:11,351] INFO Client environment:os.version=3.11.0-15-generic (org.apache.zookeeper.ZooKeeper) [2014-08-05 10:03:11,351] INFO Client environment:user.name=root (org.apache.zookeeper.ZooKeeper) [2014-08-05 10:03:11,351] INFO Client environment:user.home=/root (org.apache.zookeeper.ZooKeeper) [2014-08-05 10:03:11,351] INFO Client environment:user.dir=/home/hadoop (org.apache.zookeeper.ZooKeeper) [2014-08-05 10:03:11,352] INFO Initiating client connection, connectString=m1:2181,m2:2181,s1:2181,s2:2181 sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@51f782b8 (org.apache.zookeeper.ZooKeeper) [2014-08-05 10:03:11,380] INFO Opening socket connection to server m2/ (org.apache.zookeeper.ClientCnxn) [2014-08-05 10:03:11,386] INFO Socket connection established to m2/, initiating session (org.apache.zookeeper.ClientCnxn) [2014-08-05 10:03:11,398] INFO Session establishment complete on server m2/, sessionid = 0x247a3e09b460000, negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn) [2014-08-05 10:03:11,400] INFO zookeeper state changed (SyncConnected) (org.I0Itec.zkclient.ZkClient) [2014-08-05 10:03:11,652] INFO Loading log ‘test-1‘ (kafka.log.LogManager) [2014-08-05 10:03:11,681] INFO Recovering unflushed segment 0 in log test-1. (kafka.log.Log) [2014-08-05 10:03:11,711] INFO Completed load of log test-1 with log end offset 137 (kafka.log.Log) SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". SLF4J: Defaulting to no-operation (NOP) logger implementation SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details. [2014-08-05 10:03:11,747] INFO Loading log ‘idoall.org-0‘ (kafka.log.LogManager) [2014-08-05 10:03:11,748] INFO Recovering unflushed segment 0 in log idoall.org-0. 
(kafka.log.Log) [2014-08-05 10:03:11,754] INFO Completed load of log idoall.org-0 with log end offset 5 (kafka.log.Log) [2014-08-05 10:03:11,760] INFO Loading log ‘test-0‘ (kafka.log.LogManager) [2014-08-05 10:03:11,765] INFO Recovering unflushed segment 0 in log test-0. (kafka.log.Log) [2014-08-05 10:03:11,777] INFO Completed load of log test-0 with log end offset 151 (kafka.log.Log) [2014-08-05 10:03:11,779] INFO Starting log cleanup with a period of 60000 ms. (kafka.log.LogManager) [2014-08-05 10:03:11,782] INFO Starting log flusher with a default period of 9223372036854775807 ms. (kafka.log.LogManager) [2014-08-05 10:03:11,800] INFO Awaiting socket connections on m1:9092. (kafka.network.Acceptor) [2014-08-05 10:03:11,802] INFO [Socket Server on Broker 1], Started (kafka.network.SocketServer) [2014-08-05 10:03:11,890] INFO Will not load MX4J, mx4j-tools.jar is not in the classpath (kafka.utils.Mx4jLoader$) [2014-08-05 10:03:11,919] INFO 1 successfully elected as leader (kafka.server.ZookeeperLeaderElector) [2014-08-05 10:03:12,359] INFO New leader is 1 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener) [2014-08-05 10:03:12,387] INFO Registered broker 1 at path /brokers/ids/1 with address m1:9092. 
(kafka.utils.ZkUtils$) [2014-08-05 10:03:12,392] INFO [Kafka Server 1], started (kafka.server.KafkaServer) [2014-08-05 10:03:12,671] INFO [ReplicaFetcherManager on broker 1] Removed fetcher for partitions [idoall.org,0],[test,0],[test,1] (kafka.server.ReplicaFetcherManager) [2014-08-05 10:03:12,741] INFO [ReplicaFetcherManager on broker 1] Removed fetcher for partitions [idoall.org,0],[test,0],[test,1] (kafka.server.ReplicaFetcherManager) [2014-08-05 10:03:25,327] INFO Partition [test,0] on broker 1: Expanding ISR for partition [test,0] from 1 to 1,2 (kafka.cluster.Partition) [2014-08-05 10:03:25,334] INFO Partition [test,1] on broker 1: Expanding ISR for partition [test,1] from 1 to 1,2 (kafka.cluster.Partition) [2014-08-05 10:03:26,905] INFO Partition [test,1] on broker 1: Expanding ISR for partition [test,1] from 1,2 to 1,2,3 (kafka.cluster.Partition) [2014-08-05 10:03:28,299] INFO Partition [test,0] on broker 1: Expanding ISR for partition [test,0] from 1,2 to 1,2,4 (kafka.cluster.Partition) |
#KAFKA有几个,replication-factor就填几个 root@m1:/home/hadoop# /home/hadoop/kafka_2.9.2- --create --topic idoall_testTopic --replication-factor 4 --partitions 2 --zookeeper m1:2181 Created topic "idoall_testTopic". [2014-08-05 10:08:29,315] INFO [ReplicaFetcherManager on broker 1] Removed fetcher for partitions [idoall_testTopic,0] (kafka.server.ReplicaFetcherManager) [2014-08-05 10:08:29,334] INFO Completed load of log idoall_testTopic-0 with log end offset 0 (kafka.log.Log) [2014-08-05 10:08:29,373] INFO Created log for partition [idoall_testTopic,0] in /home/hadoop/kafka_2.9.2- with properties {segment.index.bytes -> 10485760, file.delete.delay.ms -> 60000, segment.bytes -> 536870912, flush.ms -> 9223372036854775807, delete.retention.ms -> 86400000, index.interval.bytes -> 4096, retention.bytes -> -1, cleanup.policy -> delete, segment.ms -> 604800000, max.message.bytes -> 1000012, flush.messages -> 9223372036854775807, min.cleanable.dirty.ratio -> 0.5, retention.ms -> 604800000}. (kafka.log.LogManager) [2014-08-05 10:08:29,384] WARN Partition [idoall_testTopic,0] on broker 1: No checkpointed highwatermark is found for partition [idoall_testTopic,0] (kafka.cluster.Partition) [2014-08-05 10:08:29,415] INFO Completed load of log idoall_testTopic-1 with log end offset 0 (kafka.log.Log) [2014-08-05 10:08:29,416] INFO Created log for partition [idoall_testTopic,1] in /home/hadoop/kafka_2.9.2- with properties {segment.index.bytes -> 10485760, file.delete.delay.ms -> 60000, segment.bytes -> 536870912, flush.ms -> 9223372036854775807, delete.retention.ms -> 86400000, index.interval.bytes -> 4096, retention.bytes -> -1, cleanup.policy -> delete, segment.ms -> 604800000, max.message.bytes -> 1000012, flush.messages -> 9223372036854775807, min.cleanable.dirty.ratio -> 0.5, retention.ms -> 604800000}. 
(kafka.log.LogManager) [2014-08-05 10:08:29,422] WARN Partition [idoall_testTopic,1] on broker 1: No checkpointed highwatermark is found for partition [idoall_testTopic,1] (kafka.cluster.Partition) [2014-08-05 10:08:29,430] INFO [ReplicaFetcherManager on broker 1] Removed fetcher for partitions [idoall_testTopic,1] (kafka.server.ReplicaFetcherManager) [2014-08-05 10:08:29,438] INFO Truncating log idoall_testTopic-1 to offset 0. (kafka.log.Log) [2014-08-05 10:08:29,473] INFO [ReplicaFetcherManager on broker 1] Added fetcher for partitions ArrayBuffer([[idoall_testTopic,1], initOffset 0 to broker id:2,host:m2,port:9092] ) (kafka.server.ReplicaFetcherManager) [2014-08-05 10:08:29,475] INFO [ReplicaFetcherThread-0-2], Starting (kafka.server.ReplicaFetcherThread) |
root@m1:/home/hadoop# /home/hadoop/kafka_2.9.2- --list --zookeeper m1:2181 idoall_testTopic |
root@m2:/home/hadoop# /home/hadoop/kafka_2.9.2- --broker-list m1:9092 --sync --topic idoall_testTopic SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". SLF4J: Defaulting to no-operation (NOP) logger implementation SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details. hello idoall.org |
root@s1:/home/hadoop# /home/hadoop/kafka_2.9.2- --zookeeper m1:2181 --topic idoall_testTopic --from-beginning SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". SLF4J: Defaulting to no-operation (NOP) logger implementation SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details. hello idoall.org |
root@m1:/home/hadoop# /home/hadoop/kafka_2.9.2- --create --topic idoall --replication-factor 4 --partitions 2 --zookeeper m1:2181 Created topic "idoall". [2014-08-05 10:38:30,862] INFO Completed load of log idoall-1 with log end offset 0 (kafka.log.Log) [2014-08-05 10:38:30,864] INFO Created log for partition [idoall,1] in /home/hadoop/kafka_2.9.2- with properties {segment.index.bytes -> 10485760, file.delete.delay.ms -> 60000, segment.bytes -> 536870912, flush.ms -> 9223372036854775807, delete.retention.ms -> 86400000, index.interval.bytes -> 4096, retention.bytes -> -1, cleanup.policy -> delete, segment.ms -> 604800000, max.message.bytes -> 1000012, flush.messages -> 9223372036854775807, min.cleanable.dirty.ratio -> 0.5, retention.ms -> 604800000}. (kafka.log.LogManager) [2014-08-05 10:38:30,870] WARN Partition [idoall,1] on broker 1: No checkpointed highwatermark is found for partition [idoall,1] (kafka.cluster.Partition) [2014-08-05 10:38:30,878] INFO [ReplicaFetcherManager on broker 1] Removed fetcher for partitions [idoall,1] (kafka.server.ReplicaFetcherManager) [2014-08-05 10:38:30,880] INFO Truncating log idoall-1 to offset 0. (kafka.log.Log) [2014-08-05 10:38:30,885] INFO [ReplicaFetcherManager on broker 1] Added fetcher for partitions ArrayBuffer([[idoall,1], initOffset 0 to broker id:3,host:s1,port:9092] ) (kafka.server.ReplicaFetcherManager) [2014-08-05 10:38:30,887] INFO [ReplicaFetcherThread-0-3], Starting (kafka.server.ReplicaFetcherThread) root@m1:/home/hadoop# /home/hadoop/kafka_2.9.2- --list --zookeeper m1:2181 idoall idoall_testTopic root@m1:/home/hadoop# /home/hadoop/kafka_2.9.2- kafka.admin.DeleteTopicCommand --topic idoall --zookeeper m2:2181 deletion succeeded! root@m1:/home/hadoop# /home/hadoop/kafka_2.9.2- --list --zookeeper m1:2181 idoall_testTopic root@m1:/home/hadoop# |
root@m1:/home/hadoop# /home/hadoop/zookeeper-3.4.5/bin/zkCli.sh Connecting to localhost:2181 2014-08-05 10:15:21,863 [myid:] - INFO [main:Environment@100] - Client environment:zookeeper.version=3.4.5-1392090, built on 09/30/2012 17:52 GMT 2014-08-05 10:15:21,871 [myid:] - INFO [main:Environment@100] - Client environment:host.name=m1 2014-08-05 10:15:21,871 [myid:] - INFO [main:Environment@100] - Client environment:java.version=1.7.0_65 2014-08-05 10:15:21,872 [myid:] - INFO [main:Environment@100] - Client environment:java.vendor=Oracle Corporation 2014-08-05 10:15:21,872 [myid:] - INFO [main:Environment@100] - Client environment:java.home=/usr/lib/jvm/java-7-oracle/jre 2014-08-05 10:15:21,873 [myid:] - INFO [main:Environment@100] - Client environment:java.class.path=/home/hadoop/zookeeper-3.4.5/bin/../build/classes:/home/hadoop/zookeeper-3.4.5/bin/../build/lib/*.jar:/home/hadoop/zookeeper-3.4.5/bin/../lib/slf4j-log4j12-1.6.1.jar:/home/hadoop/zookeeper-3.4.5/bin/../lib/slf4j-api-1.6.1.jar:/home/hadoop/zookeeper-3.4.5/bin/../lib/netty-3.2.2.Final.jar:/home/hadoop/zookeeper-3.4.5/bin/../lib/log4j-1.2.15.jar:/home/hadoop/zookeeper-3.4.5/bin/../lib/jline-0.9.94.jar:/home/hadoop/zookeeper-3.4.5/bin/../zookeeper-3.4.5.jar:/home/hadoop/zookeeper-3.4.5/bin/../src/java/lib/*.jar:/home/hadoop/zookeeper-3.4.5/bin/../conf:.:/usr/lib/jvm/java-7-oracle/lib/tools.jar:/usr/lib/jvm/java-7-oracle/lib/dt.jar 2014-08-05 10:15:21,874 [myid:] - INFO [main:Environment@100] - Client environment:java.library.path=:/usr/local/lib:/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib 2014-08-05 10:15:21,874 [myid:] - INFO [main:Environment@100] - Client environment:java.io.tmpdir=/tmp 2014-08-05 10:15:21,874 [myid:] - INFO [main:Environment@100] - Client environment:java.compiler=<NA> 2014-08-05 10:15:21,875 [myid:] - INFO [main:Environment@100] - Client environment:os.name=Linux 2014-08-05 10:15:21,875 [myid:] - INFO [main:Environment@100] - Client environment:os.arch=amd64 2014-08-05 
10:15:21,876 [myid:] - INFO [main:Environment@100] - Client environment:os.version=3.11.0-15-generic 2014-08-05 10:15:21,876 [myid:] - INFO [main:Environment@100] - Client environment:user.name=root 2014-08-05 10:15:21,877 [myid:] - INFO [main:Environment@100] - Client environment:user.home=/root 2014-08-05 10:15:21,878 [myid:] - INFO [main:Environment@100] - Client environment:user.dir=/home/hadoop 2014-08-05 10:15:21,879 [myid:] - INFO [main:ZooKeeper@438] - Initiating client connection, connectString=localhost:2181 sessionTimeout=30000 watcher=org.apache.zookeeper.ZooKeeperMain$MyWatcher@666c211a Welcome to ZooKeeper! 2014-08-05 10:15:21,920 [myid:] - INFO [main-SendThread(localhost:2181):ClientCnxn$SendThread@966] - Opening socket connection to server localhost/ Will not attempt to authenticate using SASL (unknown error) 2014-08-05 10:15:21,934 [myid:] - INFO [main-SendThread(localhost:2181):ClientCnxn$SendThread@849] - Socket connection established to localhost/, initiating session JLine support is enabled 2014-08-05 10:15:21,966 [myid:] - INFO [main-SendThread(localhost:2181):ClientCnxn$SendThread@1207] - Session establishment complete on server localhost/, sessionid = 0x147a3e1246b0007, negotiated timeout = 30000 WATCHER:: WatchedEvent state:SyncConnected type:None path:null [zk: localhost:2181(CONNECTED) 0] ls / [hbase, hadoop-ha, admin, zookeeper, consumers, config, controller, storm, brokers, controller_epoch] [zk: localhost:2181(CONNECTED) 1] ls /brokers [topics, ids] [zk: localhost:2181(CONNECTED) 2] ls /brokers/topics [idoall_testTopic] |
package idoall.testkafka;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

/**
 * Message producer: sends ten timestamped test messages to the
 * "idoall_testTopic" topic, using the loop index as the message key.
 *
 * @author 迦壹
 * @Time 2014-08-05
 */
public class Producertest {

    /**
     * Builds the producer configuration, sends the messages and then closes
     * the producer.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        Properties props = new Properties();
        // serializer.class is the class used to serialize message payloads.
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        // metadata.broker.list is how the producer discovers the cluster; list
        // more than one broker so a single broker outage does not block startup.
        props.put("metadata.broker.list", "m1:9092,m2:9092,s1:9092,s2:9092");
        // Optional custom partitioner class to control how keys map to partitions.
        //props.put("partitioner.class", "idoall.testkafka.Partitionertest");
        // ACK mode: the leader broker must acknowledge each send.
        props.put("request.required.acks", "1");
        // The former "zk.connect" (0.7-era producer setting) and "num.partitions"
        // (a broker setting) entries were removed: the 0.8 producer configured
        // through metadata.broker.list does not use them.

        ProducerConfig config = new ProducerConfig(props);
        Producer<String, String> producer = new Producer<String, String>(config);

        // The format is loop-invariant, so build it once instead of per message.
        SimpleDateFormat formatter = new SimpleDateFormat("yyyy年MM月dd日 HH:mm:ss SSS");
        try {
            for (int i = 0; i < 10; i++) {
                // KeyedMessage<K, V>: K is the partition-key type, V the payload type.
                String timestamp = formatter.format(new Date()); // current time
                String msg = "idoall.org" + i + "=" + timestamp;
                String key = String.valueOf(i);
                producer.send(new KeyedMessage<String, String>("idoall_testTopic", key, msg));
            }
        } finally {
            // Release the producer's broker connections even if a send fails.
            producer.close();
        }
    }
}
package idoall.testkafka;

import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

/**
 * Message consumer: a thread that reads one stream of the given topic and
 * prints every message it receives.
 *
 * @author 迦壹
 * @Time 2014-08-05
 */
public class Consumertest extends Thread {

    /**
     * Charset used to decode payloads. The matching producer serializes with
     * kafka.serializer.StringEncoder (UTF-8 unless serializer.encoding is
     * overridden), so decode explicitly instead of relying on the platform
     * default charset.
     */
    private static final Charset UTF8 = Charset.forName("UTF-8");

    private final ConsumerConnector consumer;
    private final String topic;

    /**
     * Starts a single consumer thread on "idoall_testTopic".
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        Consumertest consumerThread = new Consumertest("idoall_testTopic");
        consumerThread.start();
    }

    /**
     * Creates the consumer connector for the given topic.
     *
     * @param topic name of the topic to consume
     */
    public Consumertest(String topic) {
        this.consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig());
        this.topic = topic;
    }

    /**
     * Builds the consumer configuration.
     *
     * @return the configuration passed to the consumer connector
     */
    private static ConsumerConfig createConsumerConfig() {
        Properties props = new Properties();
        // ZooKeeper connection string.
        props.put("zookeeper.connect", "m1:2181,m2:2181,s1:2181,s2:2181");
        // Consumer group id.
        props.put("group.id", "1");
        // The group's consumed offsets are stored in ZooKeeper, but not in real
        // time; this is the interval at which they are committed.
        props.put("auto.commit.interval.ms", "1000");
        props.put("zookeeper.session.timeout.ms", "10000");
        return new ConsumerConfig(props);
    }

    /**
     * Consumes the topic's first stream, printing each message and pausing one
     * second between messages, until the stream ends or the thread is
     * interrupted. The connector is always shut down on exit.
     */
    @Override
    public void run() {
        // Topic => number-of-streams mapping used to build the message streams.
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, 1);
        try {
            Map<String, List<KafkaStream<byte[], byte[]>>> streamMap =
                    consumer.createMessageStreams(topicCountMap);
            KafkaStream<byte[], byte[]> stream = streamMap.get(topic).get(0);
            ConsumerIterator<byte[], byte[]> it = stream.iterator();
            System.out.println("*********Results********");
            while (it.hasNext()) {
                System.err.println("get data:" + new String(it.next().message(), UTF8));
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // Preserve the interrupt status and stop consuming rather than
                    // swallowing the request to terminate.
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        } finally {
            // Release the connector's ZooKeeper session and fetcher threads.
            consumer.shutdown();
        }
    }
}
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>idoall.testkafka</groupId> <artifactId>idoall.testkafka</artifactId> <version>0.0.1-SNAPSHOT</version> <packaging>jar</packaging> <name>idoall.testkafka</name> <url>http://maven.apache.org</url> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> </properties> <dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>3.8.1</version> <scope>test</scope> </dependency> <dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> <version>1.2.14</version> </dependency> <dependency> <groupId>com.sksamuel.kafka</groupId> <artifactId>kafka_2.10</artifactId> <version>0.8.0-beta1</version> </dependency> </dependencies> <build> <finalName>idoall.testkafka</finalName> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>2.0.2</version> <configuration> |