标签:curl 生产者 class ack end finish producer 文件路径 err
kafka知识总结
//切换到安装路径命令
cd /home/kafka/kafka_2.11-0.10.2.1/bin
//启动kafka服务,三台主机分别输入此指令:
./kafka-server-start.sh $KAFKA_HOME/config/server.properties &
//以后台的方式启动
nohup ./kafka-server-start.sh $KAFKA_HOME/config/server.properties &
//查看topic名
./kafka-topics.sh --list --zookeeper 10.101.22.41:2181
./kafka-topics.sh --list --zookeeper 10.101.22.41:2181,10.101.22.42:2181,10.101.22.43:2181
//查询topic内容
./kafka-console-consumer.sh --bootstrap-server 10.101.22.41:9092,10.101.22.42:9093,10.101.22.43:9094 --topic oth_cpd_active_realtime_data --from-beginning
//查看某个Topic的详情
./kafka-topics.sh --topic oth_cpd_active_realtime_data --describe --zookeeper 10.101.22.41:2181
./kafka-topics.sh --topic stat_cpd-flow_cv-data --describe --zookeeper 10.101.22.41:2181
//查看消费者
./kafka-consumer-groups.sh --bootstrap-server 10.101.22.41:9092 --list
//查看消息队列生产队列堆积情况
./kafka-consumer-groups.sh --bootstrap-server 10.101.22.41:9092 --group ad-statistics-state-press --describe
//修改分区数
./kafka-topics.sh --zookeeper 10.101.22.41:2181 -alter --partitions 4 --topic oth_cpd_active_realtime_data
//kafka删除topic的数据
修改server.properties,添加以下内容
delete.topic.enable=true
删除命令:./kafka-run-class.sh kafka.admin.TopicCommand --delete --topicstat_cpd-flow_cv-data --zookeeper 10.101.22.41:2181,10.101.22.42:2181,10.101.22.43:2181
删除kafka存储目录(server.properties文件log.dirs配置)相关topic的数据目录。如果有多个分区,要到kafka群里的每台机器上,删除相关topic的数据目录。
进入到zk的bin目录
sh zkCli.sh -server 10.101.22.41:2181,10.101.22.42:2181,10.101.22.43:2181
ls /brokers/topics
rmr /brokers/topics/stat_cpd-flow_cv-data
ls /admin/delete_topics
rmr /admin/delete_topics/stat_cpd-flow_cv-data
ls /config/topics
rmr /config/topics/stat_cpd-flow_cv-dataa
//创建topic
./kafka-topics.sh --create --zookeeper 10.101.22.41:2181 --replication-factor 1 --partitions 4 --topic oth_cpd_active_realtime_data
//kafka重启步骤:
1、kill掉kafka进程(集群中每台机器都要)
ps -ef|grep kafka (查看进程PID)
kill -9 PID (可能需要root权限)
2、清空kafka 的data文件和log文件(集群中每台机器都要)
rm -rf /home/press/kafka/kafka-2.11-Cluster/kafka-2.11-1/kafka-logs/ *
3、 启动三个zookeeper及kafka服务
cd /home/kafka/zookeeper-3.4.10/bin
./zkServer.sh restart
cd /home/kafka/kafka_2.11-0.10.2.1/bin
./kafka-server-start.sh -daemon ../config/server.properties
# 4、查询es进程
ps -ef|grep kafka
踩坑总结:
#问题一:启动kafaka集群,必须先要启动zookeeper集群。
#问题二:kafka端口查看及修改,配置文件路径: vim /home/kafka/kafka_2.11-0.10.2.1/config/server.properties IP+端口号:listeners=PLAINTEXT://10.101.22.41:9092
#问题三:配置中心的kafka配置中ip:port是kafka的端口,不是zookeeper的端口
#问题四:测试topic不同分区的消费的性能时,建议从分区数小开始(分区数不可修改变小,只能删除后新建topic重新分区)
#问题五:消费者数量和topic分区数不是倍数时,会出现消费不均的情况(正常现象)
python生成数据脚本:
#-*- coding:utf-8 -*-
# 向kafka发送数据进行测试
import sys
import time
import json
import threading, logging, time
from kafka import SimpleProducer
from kafka import KafkaProducer
from kafka import KafkaConsumer
from kafka.client import KafkaClient
from kafka.errors import KafkaError
#file=open(‘D:\\My Documents\\Desktop\\t_adId_materialUUID.txt‘)
#dataMat=[]
#for line in file.readlines():
#curLine=line.strip().split("\t")
# floatLine=map(float,curLine)#这里使用的是map函数直接把数据转化成为float类型
#dataMat.append(curLine[0:2])
#print ("dataMat = " +str(len(dataMat)))
# 对应kafka的IP:端口
producer = KafkaProducer(bootstrap_servers=‘10.101.22.41:9092‘)
# 填写对应的topic
topic = ‘oth_cpd_active_realtime_data‘
def test1():
# 只执行一次
a1 = time.strftime("%H:%M:%S", time.localtime())
# 每分钟统计速率
num1 = 0
# 消息总数初始化
countSum = 0
# 发送消息的速率/min
rate = 60
while(1):
t = round(time.time() * 1000) # 当前时间戳
#print("时间戳 = "+str(t))
ideaId = 20009677 #创意id,需参数化
adsResp = ‘{"cfrom": "219","cvTime":‘ + str(t) + ‘,"cvType":7,"ideaId":‘ + str(ideaId) + ‘,"appId": "1695285","appPackage": "com.jzyd.coupon","changeId": "","channelType": 1,"chargeMode": -1,"cp": "23","cpdps": "20190815085536,20384,,72c19c1c35444d5aa6c1297d72d4f53b,tt-300,5.05E-4,59760801D3E3762F,afterDownloadRecommend","cvDate": "20190828","cvTs": 1565923987000,"downloadTime": 1565830560000,"imei": "864092048676953","page": "others","placeType": "afterDownloadRecommend","price": 3.8,"reqId": "72c19c1c35444d5aa6c1297d72d4f53b"}‘
# 循环体
a2 = time.strftime("%H:%M:%S", time.localtime())
if (a1 == a2):
# 每分钟消息总数
if (num1 < rate // 60):
num1 = num1 + 1;
# kafka 发送生产者消息
producer.send(topic, adsResp.encode())
# 消息总数
countSum = countSum + 1
print (‘发送总的消息——计数:‘ + str(countSum))
#print ("end : 当前时间 = " + a2 + ‘__每分钟速率=‘ + str(rate // 60 * 60) + ‘/min__每秒钟速率current=‘ + str(
#num1) + "/秒__当前发送消息总数=" + str(countSum))
else:
print (‘end : 当前时间 = ‘ + a2 + ‘每分钟理论速率=‘ + str(rate // 60 * 60) +‘/min__每秒钟理论速率=‘ +str(rate//60)+‘/秒__每分钟实际速率=‘+ str(num1*60) +‘/min__每秒钟实际速率=‘+str(num1)+‘/秒__当前发送消息总数=‘ + str(countSum))
# 每分钟消息——清零
num1 = 0
print ("start :重新计时 = " + a2 + ‘__清空计数:‘ + str(num1))
# 重新获取当前时间
a1 = time.strftime("%H:%M:%S", time.localtime())
print (‘发送总的消息——计数:‘ + str(countSum))
if __name__ == ‘__main__‘:
# 循环调用
print(‘send to kafka start!----- ‘)
test1()
print(‘send to kafka finished! ----- ‘)
标签:curl 生产者 class ack end finish producer 文件路径 err
原文地址:https://www.cnblogs.com/pshik/p/11440016.html