码迷,mamicode.com
首页 > 其他好文 > 详细

(三)Kafka

时间:2020-07-15 01:03:51      阅读:81      评论:0      收藏:0      [点我收藏+]

标签:callback   max   partition   发送   memory   使用   ssi   不用   内容   

课程内容:

1. 简单的操作一下集群

2. 简单的介绍几个工具(企业)

3. Producer的原理(核心,重点)

4. 简单kafka的代码

5. 介绍里面的核心参数(重点)

========================

消费者原理

--replica-factor 2

--partitions 2我们一般设置分区数,建议是节点的倍数

=========================================

Producer的原理

*******************kafka************************************

topic:TopicA
多个分区
p0:leader parititon hdp1
p1:leader partition hdp2
需要把数据发送到leader partition

生产者,生产数据,需要把数据封装成ProducerRecord
①ProducerRecord
②序列化
③partitioner(获取元数据,找到一个broker就可以
知道有多少个分区,并清除哪个是leader partition,并把数据发送到哪)

存入
缓冲区***
④Sender (一个线程)从缓冲区取出消息,封装为一个批次(Batch)
Batch
Batch
Batch




--------------------------------------------
zookeeper


------------------------------------------------
broker(去zookeeper注册,选举controller)
controller(监听zookeeper元数据,动态变化,进行同步)
(hdp1)
去zookeeper同步元数据信息,(并分发给其他节点)
p0



-------------------------------------------------
broker(去zookeeper注册,选举controller)
(hdp2)
p1

---------------------------------------------------


 

//创建了一个配置文件的对象
Properties props = new Properties();
//这个参数,目的就是为了获取kafka集群的元数据
//我们写一个主机也行,写多个的话,更安全
//使用的是主机名,原因是server.properties里面填进去的是主机名,必须配置hosts文件
props.put("bootstrap.servers","hadoop1:9092,hadoop2:9092,hadoop3:9092");
//设置序列化器==》kafka的数据是用的网路传输的,所以里面都是二进制的数据
//我们发送消息的时候,默认的情况下就是发送一个消息就可以了
//但是你也可以给你的每条消息都指定一个key也是可以的
props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer")
//调优的参数,后面解释
// acks
//-1: 如何判断一条消息发送成功?首先消息要写入leader partition,这些消息还需要被另外的所有的这个分区的副本同步了,才算发送成功
// 1: 发送的消息写到leader partition 就算写入成功,然后服务器端就返回响应就可以了,默认就是这个参数,有可能会丢数据
// 0: 消息主要发送出去了,就认为是成功的(允许丢数据,只是处理一些不重要的日志,不需要得到准确的数据)
// kafka里面的分区是有副本的,比如一个主题TopicA,这个主题有两个parittion,每个partition有三个副本
// p0: leader parittion ,follower parititon,follower partition
// p1: leader partition,follower partition,follower partition
props.put("acks","-1")

//重试次数,网络抖动(5-10次)
props.put("retries",3)
//每隔多久重试一次 2s
props.put("retry.backoff.ms",2000)
//提升消息吞入量
//设置压缩格式,lz4,
props.put("compression.type","lz4")
//适当增大缓冲区大小,32M(基本上这个参数不需要设置)
props.put("buffer.memory",33554432)
//批次大小,默认16k,这里设置32k;设置这个批次的大小 还跟我们的消息的大小有关
//假设一条消息1k==》设置100k
props.put("batch.size",323840)
//比如我们设置的一个批次的大小是32k,但是size没有满,无论如何到了这个时间都要把消息发送出去了
//默认是0,100ms
props.put("linger.ms",100)

//这个值,默认是1M,代表的是,生产者发送消息的时候,最大的一条消息(注意说的不是批次)
// byte,如果消息超过1M,程序会报错,可以设置为10M
props.put("max.request.size",1024*1024*10)

// 消息发送出去后,多久没有响应,默认为超时
// 如果网络不稳定,可以适当增大
props.put("request.timeout.ms",3000)

//创建生产者
KafkaProducer<String,String> producer = new KafkaProducer<String,String>(props);
//创建消息
ProducerRecord<String,String> record = new ProducerRecord<String,String>(
"xoxo","this_is_key","this_is_value")

//发送消息,
//异步发送:性能比较好,也是我们生产里面使用的方式
//同步发送:等到返回响应,发送下一条,性能不好,我们生产里面一般不用

producer.send(record,new Callback(){
  public void onCompletion(RecordMetadata metadata,Exception exception){
    if(exception == null){
      sout("success")
    } else {
      sout("error")
    }
  }
})

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

(三)Kafka

标签:callback   max   partition   发送   memory   使用   ssi   不用   内容   

原文地址:https://www.cnblogs.com/hanchaoyue/p/13302910.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!