标签:指定 producer 集群搭建 conf lis 注意 hup 未能 bat
搭建kafka集群的基本环境需要安装JDK和zookeeper(或zookeeper集群)
下载kafaka安装包到服务器端并解压到指定目录
[root@hadoop01 soft]# wget https://www.apache.org/dyn/closer.cgi?path=/kafka/2.1.0/kafka_2.11-2.1.0.tgz [root@hadoop01 soft]# tar zxvf kafka_2.11-2.1.0.tgz -C /usr/local/ |
修改kafka配置,配置文件主要修改broker.id、数据存储目录log.data、zookeeper地址(或zookeeper集群地址)和host.name等这几种配置,修改配置server.properties具体说明如下所示
#broker的全局唯一编号,不能重复 #用来监听链接的端口,producer或consumer将在此端口建立连接 #处理网络请求的线程数量 #用来处理磁盘IO的现成数量 #发送套接字的缓冲区大小 #接受套接字的缓冲区大小 #请求套接字的缓冲区大小 #kafka运行日志存放的路径 #topic在当前broker上的分片个数 #用来恢复和清理data下数据的线程数量 #segment文件保留的最长时间,超时将被删除 #滚动生成新的segment文件的最大时间 #日志文件中每个segment的大小,默认为1G #周期性检查文件大小的时间 #日志清理是否打开 #broker需要使用zookeeper保存meta数据 #zookeeper链接超时时间 #partion buffer中,消息的条数达到阈值,将触发flush到磁盘 #消息buffer的时间,达到阈值,将触发flush到磁盘 #删除topic需要server.properties中设置delete.topic.enable=true否则只是标记删除 #此处的host.name为本机IP(重要),如果不改,则客户端会抛出:Producer connection to localhost:9092 unsuccessful 错误! advertised.host.name=192.168.140.128 |
consumer.properties配置文件说明
# zookeeper连接服务器地址 # zookeeper的session过期时间,默认5000ms,用于检测消费者是否挂掉 #当消费者挂掉,其他消费者要等该指定时间才能检查到并且触发重新负载均衡 # 指定多久消费者更新offset到zookeeper中。注意offset更新时基于time而不是每次获得的消息。一旦在更新zookeeper发生异常并重启,将可能拿到已拿到过的消息 #指定消费
# 当consumer消费一定量的消息之后,将会自动向zookeeper提交offset信息
# 自动更新时间。默认60 * 1000 # 当前consumer的标识,可以设定,也可以有系统生成,主要用来跟踪消息消费情况,便于观察 # 消费者客户端编号,用于区分不同客户端,默认客户端程序自动产生 # 最大取多少块缓存到消费者(默认10) # 当有新的consumer加入到group时,将会reblance,此后将会有partitions的消费端迁移到新 的consumer上,如果一个consumer获得了某个partition的消费权限,那么它将会向zk注册 "Partition Owner registry"节点信息,但是有可能此时旧的consumer尚没有释放此节点, 此值用于控制,注册节点的重试次数. # 获取消息的最大尺寸,broker不会像consumer输出大于此值的消息chunk 每次feth将得到多条消息,此值为总大小,提升此值,将会消耗更多的consumer端内存 # 当消息的尺寸不足时,server阻塞的时间,如果超时,消息将立即发送给consumer # 如果zookeeper没有offset值或offset值超出范围。那么就给个初始的offset。有smallest、largest、anything可选,分别表示给当前最小的offset、当前最大的offset、抛异常。默认largest # 指定序列化处理类 |
producer.properties配置文件说明
#指定kafka节点列表,用于获取metadata,不必全部指定 # 指定分区处理类。默认kafka.producer.DefaultPartitioner,表通过key哈希到对应分区 # 是否压缩,默认0表示不压缩,1表示用gzip压缩,2表示用snappy压缩。压缩后消息中会有头来指明消息压缩类型,故在消费者端消息解压是透明的无需指定。 # 指定序列化处理类 # 如果要压缩消息,这里指定哪些topic要压缩消息,默认empty,表示不压缩。 # 设置发送数据是否需要服务端的反馈,有三个值0,1,-1 # 在向producer发送ack之前,broker允许等待的最大时间
,如果超时,broker将会向producer发送一个error ACK.意味着上一次消息因为某种原因未能成功(比如follower未能同步成功) # 同步还是异步发送消息,默认"sync"表同步,"async"表异步。异步可以提高发送吞吐量, # 在async模式下,当message被缓存的时间超过此值后,将会批量发送给broker,默认为5000ms # 在async模式下,producer端允许buffer的最大消息量 # 如果是异步,指定每次批量发送数据量,默认为200 # 当消息在producer端沉积的条数达到"queue.buffering.max.meesages"后
# 当producer接收到error ACK,或者没有接收到ACK时,允许消息重发的次数
# producer刷新topic metada的时间间隔,producer需要知道partition leader的位置,以及当前topic的情况
|
修改完成后,把修改完整的kafka程序复制到其他服务器上,并修改broker.id(保证其唯一性)和host.name(对应到当前服务器的IP地址),修改完成后,启动各个服务器上的kafka程序,启动命令如下所示,启动kafka之前必须启动配置的zookeeper(或配置的zookeeper集群),并且开启配置的Kafka监听端口(默认端口9092),设置防火墙开启端口。
[root@hadoop01 soft]# nohup ./kafka-server-start.sh ../config/server.properties >/dev/null 2>&1 & |
标签:指定 producer 集群搭建 conf lis 注意 hup 未能 bat
原文地址:https://www.cnblogs.com/starzy/p/10376651.html