标签:mic dem test app boot esc 分布式应用 解压 count()
一、Kafka介绍
kafka是消息中间件的一种,一个分布式、支持分区的(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统,它的最大的特性就是可以实时的处理大量数据以满足各种需求场景:比如基于hadoop的批处理系统、低延迟的实时系统、storm/Spark流式处理引擎,web/nginx日志、访问日志,消息服务等等,用scala语言编写
1.应用场景
1.名词解译:
producer
:生产者,。consumer
:消费者,。topic
:可以把它理解为标签,生产者每生产出来一个消息就贴上一个标签(topic),消费者可以根据TopicName选择需要消费的消息。broker
:已发布的消息保存在一组服务器中,称之为Kafka集群。集群中的每一个服务器都是一个代理(Broker). 消费者可以订阅一个或多个主题(topic),并从Broker拉数据,从而消费这些已发布的消息。。
Topic写入流程
创建Topic
.\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic tes
.\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test
.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning
13.生产者输入消息可在消费者命令这命令窗口消费到此内容
三、Java操作
1.Pom添加引用
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>1.1.1.RELEASE</version> </dependency> <dependency> <groupId>com.github.danielwegener</groupId> <artifactId>logback-kafka-appender</artifactId> <version>0.2.0-RC1</version> </dependency>
2.创建生产者
package com.answern.openplatform;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
/**
* @description: Kafka生产者
* @author: shenling
* @create: 2019-08-08 12:44
**/
public class KafkaProducerDemo implements Runnable {
private final KafkaProducer<String, String> producer;
private final String topic;
public KafkaProducerDemo(String topicName) {
Properties props = new Properties();
//服务器地址,如果有多个Broker,地址参数中用","分割
props.put("bootstrap.servers", "127.0.0.1:9092");
//acks 参数共有三个值:0、1 和 all。
//0:生产者只要吧消息发送出去,不管消费者是否正确消费,生产者就认为消费者成功消费,
//1:只要Partition Leader接收到消息而且写入本地磁盘了,就认为成功了,不管他其他的Follower有没有同步过去这条消息了。注意:kafka默认配置是为1
//allartition Leader接收到消息之后,还必须要求ISR列表里跟Leader保持同步的那些Follower都要把消息同步过去,才能认为这条消息是写入成功了。:
props.put("acks", "all");
//如果发送请求到Topic失败,重试次数
props.put("retries", 0);
//生产者发送数据大小基本单位,单位为Bytes
props.put("batch.size", 16384);
//对Key进行序列化操作(二进制格式)
props.put("key.serializer", StringSerializer.class.getName());
//对Value进行序列化操作(二进制格式)
props.put("value.serializer", StringSerializer.class.getName());
this.producer = new KafkaProducer<String, String>(props);
this.topic = topicName;
//创建生产者结束
}
@Override
public void run() {
int messageNo = 1;
try {
for(;;) {
String messageStr="你好,这是第"+messageNo+"条数据";
producer.send(new ProducerRecord<String, String>(topic, "Message", messageStr));
//生产了100条就打印
if(messageNo%100==0){
System.out.println("发送的信息:" + messageStr);
}
//生产1000条就退出
if(messageNo%1000==0){
System.out.println("成功发送了"+messageNo+"条");
break;
}
messageNo++;
}
} catch (Exception e) {
e.printStackTrace();
} finally {
producer.close();
}
}
public static void main(String args[]) {
KafkaProducerDemo test = new KafkaProducerDemo("applog");
Thread thread = new Thread(test);
thread.start();
}
}
2.创建消费者
package com.answern.openplatform;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.Arrays;
import java.util.Properties;
/**
* @description: kafka消费者
* @author: shenling
* @create: 2019-08-08 12:42
**/
public class KafkaConsumerDemo implements Runnable {
private final KafkaConsumer<String, String> consumer;
private ConsumerRecords<String, String> msgList;
private final String topic;
private static final String GROUPID = "groupA";
public KafkaConsumerDemo(String topicName) {
Properties props = new Properties();
props.put("bootstrap.servers", "127.0.0.1:9092"); ////kafka地址,多个地址用逗号分割
//相同组内,一条数据只能被消费一次。比如说我起了十个消费者,同时消费一个Topic,如果这些消费者不是在一个组里面那么,
// 会存在一条数据被十个消费者消费,共消费十次的情况
props.put("group.id", GROUPID);
//是否自动提交Offset
props.put("enable.auto.commit", "true");
//然后计算下次的auto commit时间
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("auto.offset.reset", "earliest");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
this.consumer = new KafkaConsumer<String, String>(props);
this.topic = topicName;
this.consumer.subscribe(Arrays.asList(topic));
}
@Override
public void run() {
int messageNo = 1;
System.out.println("---------开始消费---------");
try {
for (;;) {
msgList = consumer.poll(1000);
if(null!=msgList&&msgList.count()>0){
for (ConsumerRecord<String, String> record : msgList) {
//消费100条就打印 ,但打印的数据不一定是这个规律的
if(messageNo%100==0){
System.out.println(messageNo+"=======receive: key = " + record.key() + ", value = " + record.value()+" offset==="+record.offset());
}
//当消费了1000条就退出
if(messageNo%10000==0){
break;
}
messageNo++;
}
}else{
Thread.sleep(1000);
}
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
consumer.close();
}
}
public static void main(String args[]) {
KafkaConsumerDemo test1 = new KafkaConsumerDemo("applog");
Thread thread1 = new Thread(test1);
thread1.start();
}
}
参考网址:
https://www.cnblogs.com/flower1990/p/7466882.html
标签:mic dem test app boot esc 分布式应用 解压 count()
原文地址:https://www.cnblogs.com/shenling/p/11304458.html