标签:des style blog http os 使用 java io ar
假设我们有集群中,需要配置4个broker,形成下面图表的Kafka集群。
配置所有的Kafka的Producer文件,其中,brokerpid的值是独一无二的数字值。几个核心属性如下:
<span style="font-size:18px;"># The id of the broker.This must be set to a unique integer for each broker. broker.id=11 # The port the socketserver listens on port=9092 # Hostname the broker willbind to. If not set, the server will bind to all interfaces host.name=hadoop-master #zookeeper 集群 zookeeper.connect=machine-1:2222,machine-2:2222,machine-0:2222</span>
由于Kafka依赖于Zookeeper集群,所以,必须先启动Zookeeper集群。这里不作具体介绍。
在机器machine-0和Hadoop-master,配置好对应broker配置文件,两个机器上的配置属性相同。
Hadoop-master 上,Kafka的server.properties配置:
<span style="font-size:18px;"># The id of the broker.This must be set to a unique integer for each broker. broker.id=11 #############################Socket Server Settings ############################# # The port the socketserver listens on port=9092 # Hostname the broker willbind to. If not set, the server will bind to all interfaces host.name=hadoop-master # Hostname the broker willadvertise to producers and consumers. If not set, it uses the # value for"host.name" if configured. Otherwise, it will use the value returned from #java.net.InetAddress.getCanonicalHostName(). #advertised.host.name=<hostnameroutable by clients> # The port to publish toZooKeeper for clients to use. If this is not set, # it will publish the sameport that the broker binds to. #advertised.port=<portaccessible by clients> # The number of threadshandling network requests num.network.threads=3 # The number of threadsdoing disk I/O num.io.threads=8 # The send buffer(SO_SNDBUF) used by the socket server socket.send.buffer.bytes=1048576 # The receive buffer(SO_RCVBUF) used by the socket server socket.receive.buffer.bytes=1048576 # The maximum size of arequest that the socket server will accept (protection against OOM) socket.request.max.bytes=104857600 #############################Log Basics ############################# # A comma seperated list ofdirectories under which to store log files log.dirs=/opt/kafka/logs # The default number of logpartitions per topic. More partitions allow greater # parallelism forconsumption, but this will also result in more files across # the brokers. num.partitions=3 #############################Log Flush Policy ############################# # Messages are immediatelywritten to the filesystem but by default we only fsync() to sync # the OS cache lazily. Thefollowing configurations control the flush of data to disk. # There are a few importanttrade-offs here: # 1. Durability: Unflushed data may be lostif you are not using replication. # 2. Latency: Very large flush intervals maylead to latency spikes when the flush does occur as there will be a lot of datato flush. # 3. Throughput: The flush is generally themost expensive operation, and a small flush interval may lead to exceessiveseeks. # The settings below allowone to configure the flush policy to flush data after a period of time or # every N messages (orboth). This can be done globally and overridden on a per-topic basis. # The number of messages toaccept before forcing a flush of data to disk #log.flush.interval.messages=10000 # The maximum amount oftime a message can sit in a log before we force a flush #log.flush.interval.ms=1000 #############################Log Retention Policy ############################# # The followingconfigurations control the disposal of log segments. The policy can # be set to delete segmentsafter a period of time, or after a given size has accumulated. # A segment will be deletedwhenever *either* of these criteria are met. Deletion always happens # from the end of the log. # The minimum age of a logfile to be eligible for deletion log.retention.hours=168 # A size-based retentionpolicy for logs. Segments are pruned from the log as long as the remaining # segments don't drop belowlog.retention.bytes. #log.retention.bytes=1073741824 # The maximum size of a logsegment file. When this size is reached a new log segment will be created. log.segment.bytes=536870912 # The interval at which logsegments are checked to see if they can be deleted according # to the retention policies log.retention.check.interval.ms=60000 # By default the logcleaner is disabled and the log retention policy will default to just deletesegments after their retention expires. # Iflog.cleaner.enable=true is set the cleaner will be enabled and individual logscan then be marked for log compaction. log.cleaner.enable=false #############################Zookeeper ############################# # Zookeeper connectionstring (see zookeeper docs for details). # This is a comma separatedhost:port pairs, each corresponding to a zk # server. e.g."127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002". # You can also append anoptional chroot string to the urls to specify the # root directory for allkafka znodes. zookeeper.connect=machine-1:2222,machine-2:2222,machine-0:2222 #server.1=machine-0:2888:3888 #server.2=machine-1:2888:3888 #server.3=machine-2:2888:3888 # Timeout in ms forconnecting to zookeeper zookeeper.connection.timeout.ms=1000000 </span>
Hadoop-master 上,Kafka的server-1.properties配置:
<span style="font-size:18px;"># The id of the broker.This must be set to a unique integer for each broker. broker.id=12 #############################Socket Server Settings ############################# # The port the socketserver listens on port=9093 # Hostname the broker willbind to. If not set, the server will bind to all interfaces host.name=hadoop-master # Hostname the broker willadvertise to producers and consumers. If not set, it uses the # value for"host.name" if configured. Otherwise, it will use the value returned from #java.net.InetAddress.getCanonicalHostName(). #advertised.host.name=<hostnameroutable by clients> # The port to publish toZooKeeper for clients to use. If this is not set, # it will publish the sameport that the broker binds to. #advertised.port=<portaccessible by clients> # The number of threadshandling network requests num.network.threads=3 # The number of threadsdoing disk I/O num.io.threads=8 # The send buffer(SO_SNDBUF) used by the socket server socket.send.buffer.bytes=1048576 # The receive buffer(SO_RCVBUF) used by the socket server socket.receive.buffer.bytes=1048576 # The maximum size of arequest that the socket server will accept (protection against OOM) socket.request.max.bytes=104857600 #############################Log Basics ############################# # A comma seperated list ofdirectories under which to store log files log.dirs=/opt/kafka/logs-1 # The default number of logpartitions per topic. More partitions allow greater # parallelism forconsumption, but this will also result in more files across # the brokers. num.partitions=3 #############################Log Flush Policy ############################# # Messages are immediatelywritten to the filesystem but by default we only fsync() to sync # the OS cache lazily. Thefollowing configurations control the flush of data to disk. # There are a few importanttrade-offs here: # 1. Durability: Unflushed data may be lostif you are not using replication. # 2. Latency: Very large flush intervals maylead to latency spikes when the flush does occur as there will be a lot of datato flush. # 3. Throughput: The flush is generally themost expensive operation, and a small flush interval may lead to exceessiveseeks. # The settings below allowone to configure the flush policy to flush data after a period of time or # every N messages (orboth). This can be done globally and overridden on a per-topic basis. # The number of messages toaccept before forcing a flush of data to disk #log.flush.interval.messages=10000 # The maximum amount oftime a message can sit in a log before we force a flush #log.flush.interval.ms=1000 #############################Log Retention Policy ############################# # The followingconfigurations control the disposal of log segments. The policy can # be set to delete segmentsafter a period of time, or after a given size has accumulated. # A segment will be deletedwhenever *either* of these criteria are met. Deletion always happens # from the end of the log. # The minimum age of a logfile to be eligible for deletion log.retention.hours=168 # A size-based retentionpolicy for logs. Segments are pruned from the log as long as the remaining # segments don't drop belowlog.retention.bytes. #log.retention.bytes=1073741824 # The maximum size of a logsegment file. When this size is reached a new log segment will be created. log.segment.bytes=536870912 # The interval at which logsegments are checked to see if they can be deleted according # to the retention policies log.retention.check.interval.ms=60000 # By default the logcleaner is disabled and the log retention policy will default to just deletesegments after their retention expires. # Iflog.cleaner.enable=true is set the cleaner will be enabled and individual logscan then be marked for log compaction. log.cleaner.enable=false #############################Zookeeper ############################# # Zookeeper connectionstring (see zookeeper docs for details). # This is a comma separatedhost:port pairs, each corresponding to a zk # server. e.g."127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002". # You can also append anoptional chroot string to the urls to specify the # root directory for allkafka znodes. zookeeper.connect=machine-1:2222,machine-2:2222,machine-0:2222 #server.1=machine-0:2888:3888 #server.2=machine-1:2888:3888 #server.3=machine-2:2888:3888 # Timeout in ms forconnecting to zookeeper zookeeper.connection.timeout.ms=1000000</span>
在两台机器上,启动对应的脚本:
<span style="font-size:18px;">bin/kafka-server-start.shconfig/server.properties bin/kafka-server-start.shconfig/server-1.properties</span>
创建topic:
<span style="font-size:18px;">##创建topic cluster_topic bin/kafka-topics.sh --create--zookeeper machine-1:2222,machine-0:2222,machine-2:2222 --replication-factor 3 --partitions 3 --topiccluster_topic</span>
下面,将执行Producer和Consumer端的shell脚本。发送端和接受端的交互,展现订阅发布的过程。启动一个Producer,发布消息。
<span style="font-size:18px;">bin/kafka-console-producer.sh--broker-list hadoop-master:9092,hadoop-master:9093,machine-0:9092,machine-0:9093--topic cluster_topic</span>
发送的消息将会被发送到指定的4个broker中,下面将启动Consumer接受消息。
<span style="font-size:18px;"> bin/kafka-console-consumer.sh --zookeeper machine-1:2222,machine-0:2222,machine-2:2222--topic cluster_topic --from-beginning</span>
上面的脚本接受来自主题为cluster_topic的消息,这就意味着,所有发送给cluster_topic的消息,将会被这个Consumer接受。发送下面图表中显示的消息,Consumer端也会打印出对应的消息。
假设,让machine-0当掉。在使用这个Producer发送消息,你会发现,消息照样会被接受到。这也说明,集群实现了容错功能。
下面,将介绍几个有用的脚本,帮助我们监控Kafka的有关信息。
列出topic
<span style="font-size:18px;">bin/kafka-topics.sh--list --zookeeper machine-1:2222,machine-0:2222,machine-2:2222</span>
查询某个配置文件的执行线程,比如,下面查询所执行配置文件为server-1.properties的进程信息。
<span style="font-size:18px;">ps | grep server-1.properties</span>
<span style="font-size:18px;">root 21765 21436 0 09:48 pts/0 00:00:00 grep server.properties root 23156 1 0 Aug27 ? 00:06:11 /usr/java/latest/bin/java -Xmx1G -Xms1G -server -XX:+UseCompressedOops -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:+CMSClassUnloadingEnabled -XX:+CMSScavengeBeforeRemark -XX:+DisableExplicitGC -Djava.awt.headless=true -Xloggc:/root/kafka/bin/../logs/kafkaServer-gc.log -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dkafka.logs.dir=/root/kafka/bin/../logs -Dlog4j.configuration=file:/root/kafka/bin/../config/log4j.properties -cp :/root/kafka/bin/../core/build/dependant-libs-2.8.0/*.jar:/root/kafka/bin/../perf/build/libs//kafka-perf_2.8.0*.jar:/root/kafka/bin/../clients/build/libs//kafka-clients*.jar:/root/kafka/bin/../examples/build/libs//kafka-examples*.jar:/root/kafka/bin/../contrib/hadoop-consumer/build/libs//kafka-hadoop-consumer*.jar:/root/kafka/bin/../contrib/hadoop-producer/build/libs//kafka-hadoop-producer*.jar:/root/kafka/bin/../libs/jopt-simple-3.2.jar:/root/kafka/bin/../libs/kafka_2.10-0.8.1.1.jar:/root/kafka/bin/../libs/kafka_2.10-0.8.1.1-javadoc.jar:/root/kafka/bin/../libs/kafka_2.10-0.8.1.1-scaladoc.jar:/root/kafka/bin/../libs/kafka_2.10-0.8.1.1-sources.jar:/root/kafka/bin/../libs/log4j-1.2.15.jar:/root/kafka/bin/../libs/metrics-core-2.2.0.jar:/root/kafka/bin/../libs/scala-library-2.10.1.jar:/root/kafka/bin/../libs/slf4j-api-1.7.2.jar:/root/kafka/bin/../libs/snappy-java-1.0.5.jar:/root/kafka/bin/../libs/zkclient-0.3.jar:/root/kafka/bin/../libs/zookeeper-3.3.4.jar:/root/kafka/bin/../core/build/libs/kafka_2.8.0*.jar kafka.Kafka /root/kafka/bin/../config/server.properties</span>
标签:des style blog http os 使用 java io ar
原文地址:http://blog.csdn.net/john_f_lau/article/details/38919991