标签:inter enter bin .repo default etc 其它 ret parent
1. Apache Kafka是一个分布式流平台
1.1 流平台有三个关键功能:
1.2 Kafka通常用于两大类应用:
1.3 有几个特别重要的概念:
Kafka is run as a cluster on one or more servers that can span multiple datacenters.
The Kafka cluster stores streams of records in categories called topics.
Each record consists of a key, a value, and a timestamp.
Kafka作为集群运行在一个或多个可以跨多个数据中心的服务器上
从这句话表达了三个意思:
Kafka集群按分类存储流记录,这个分类叫做主题
这句话表达了以下几个信息:
每个记录由一个键、一个值和一个时间戳组成
1.4 Kafka有四个核心API:
(画外音:我理解这四个核心API其实就是:发布、订阅、转换处理、从第三方采集数据。)
在Kafka中,客户端和服务器之间的通信是使用简单的、高性能的、与语言无关的TCP协议完成的。
2. Topics and Logs(主题和日志)
一个topic是一个分类,或者说是记录被发布的时候的一个名字(画外音:可以理解为记录要被发到哪儿去)。
在Kafka中,topic总是有多个订阅者,因此,一个topic可能有0个,1个或多个订阅该数据的消费者。
对于每个主题,Kafka集群维护一个分区日志,如下图所示:
每个分区都是一个有序的、不可变的记录序列,而且记录会不断的被追加,一条记录就是一个结构化的提交日志(a structured commit log)。
分区中的每条记录都被分配了一个连续的id号,这个id号被叫做offset(偏移量),这个偏移量唯一的标识出分区中的每条记录。(PS:如果把分区比作数据库表的话,那么偏移量就是主键)
Kafka集群持久化所有已发布的记录,无论它们有没有被消费,记录被保留的时间是可以配置的。例如,如果保留策略被设置为两天,那么在记录发布后的两天内,可以使用它,之后将其丢弃以释放空间。在对数据大小方面,Kafka的性能是高效的,恒定常量级的,因此长时间存储数据不是问题。
事实上,唯一维护在每个消费者上的元数据是消费者在日志中的位置或者叫偏移量。偏移量是由消费者控制的:通常消费者在读取记录的时候会线性的增加它的偏移量,但是,事实上,由于位置(偏移量)是由消费者控制的,所有它可以按任意它喜欢的顺序消费记录。例如:一个消费者可以重置到一个较旧的偏移量来重新处理之前已经处理过的数据,或者跳转到最近的记录并从“现在”开始消费。
这种特性意味着消费者非常廉价————他们可以来来去去的消息而不会对集群或者其它消费者造成太大影响。
日志中的分区有几个用途。首先,它们允许日志的规模超出单个服务器的大小。每个独立分区都必须与宿主的服务器相匹配,但一个主题可能有多个分区,所以它可以处理任意数量的数据。第二,它们作为并行的单位——稍后再进一步。
(
画外音:简单地来说,日志分区的作用有两个:一、日志的规模不再受限于单个服务器;二、分区意味着可以并行。
什么意思呢?主题建立在集群之上,每个主题维护了一个分区日志,顾名思义,日志是分区的;每个分区所在的服务器的资源(比如:CPU、内存、带宽、磁盘等)是有限的,如果不分区(可以理解为等同于只有一个)的话,必然受限于这个分区所在的服务器,那么多个分区的话就不一样了,就突破了这种限制,服务器可以随便加,分区也可以随便加。
)
3. Distribution(分布)
日志的分区分布在集群中的服务器上,每个服务器处理数据,并且分区请求是共享的。每个分区被复制到多个服务器上以实现容错,到底复制到多少个服务器上是可以配置的。
Each partition is replicated across a configurable number of servers for fault tolerance.
每个分区都有一个服务器充当“leader”角色,并且有0个或者多个服务器作为“followers”。leader处理对这个分区的所有读和写请求,而followers被动的从leader那里复制数据。如果leader失败,followers中的其中一个会自动变成新的leader。每个服务器充当一些分区的“leader”的同时也是其它分区的“follower”,因此在整个集群中负载是均衡的。
也就是说,每个服务器既是“leader”也是“follower”。我们知道一个主题可能有多个分区,一个分区可能在一个服务器上也可能跨多个服务器,然而这并不以为着一台服务器上只有一个分区,是可能有多个分区的。每个分区中有一个服务器充当“leader”,其余是“follower”。leader负责处理这个它作为leader所负责的分区的所有读写请求,而该分区中的follow只是被动复制leader的数据。这个有点儿像HDFS中的副本机制。例如:分区-1有服务器A和B组成,A是leader,B是follower,有请求要往分区-1中写数据的时候就由A处理,然后A把刚才写的数据同步给B,这样的话正常请求相当于A和B的数据是一样的,都有分区-1的全部数据,如果A宕机了,B成为leader,接替A继续处理对分区-1的读写请求。
需要注意的是,分区是一个虚拟的概念,是一个逻辑单元。
4. Producers(生产者)
生产者发布数据到它们选择的主题中。生产者负责选择将记录投递到哪个主题的哪个分区中。要做这件事情,可以简单地用循环方式以到达负载均衡,或者根据一些语义分区函数(比如:基于记录中的某些key)
5. Consumers(消费者)
消费者用一个消费者组名来标识它们自己(PS:相当于给自己贴一个标签,标签的名字是组名,以表明自己属于哪个组),并且每一条发布到主题中的记录只会投递给每个订阅的消费者组中的其中一个消费者实例。消费者实例可能是单独的进程或者在单独的机器上。
如果所有的消费者实例都使用相同的消费者组,那么记录将会在这些消费者之间有效的负载均衡。
如果所有的消费者实例都使用不同的消费者组,那么每条记录将会广播给所有的消费者进程。
上图中其实那个Kafka Cluster换成Topic会更准确一些
一个Kafka集群有2个服务器,4个分区(P0-P3),有两个消费者组。组A中有2个消费者实例,组B中有4个消费者实例。
通常我们会发现,主题不会有太多的消费者组,每个消费者组是一个“逻辑订阅者”(以消费者组的名义订阅主题,而非以消费者实例的名义去订阅)。每个组由许多消费者实例组成,以实现可扩展性和容错。这仍然是发布/订阅,只不过订阅者是一个消费者群体,而非单个进程。
在Kafka中,这种消费方式是通过用日志中的分区除以使用者实例来实现的,这样可以保证在任意时刻每个消费者都是排它的消费,即“公平共享”。Kafka协议动态的处理维护组中的成员。如果有心的实例加入到组中,它们将从组中的其它成员那里接管一些分区;如果组中有一个实例死了,那么它的分区将会被分给其它实例。
(画外音:什么意思呢?举个例子,在上面的图中,4个分区,组A有2个消费者,组B有4个消费者,那么对A来讲组中的每个消费者负责4/2=2个分区,对组B来说组中的每个消费者负责4/4=1个分区,而且同一时间消息只能被组中的一个实例消费。如果组中的成员数量有变化,则重新分配。)
Kafka只提供分区下的记录的总的顺序,而不提供主题下不同分区的总的顺序。每个分区结合按key划分数据的能力排序对大多数应用来说是足够的。然而,如果你需要主题下总的记录顺序,你可以只使用一个分区,这样做的做的话就意味着每个消费者组中只能有一个消费者实例。
6. 保证
在一个高级别的Kafka给出下列保证:
7. Spring Kafka
Spring提供了一个“模板”作为发送消息的高级抽象。它也通过使用@KafkaListener注释和“监听器容器”提供对消息驱动POJOs的支持。这些库促进了依赖注入和声明式的使用。
7.1 纯Java方式
1 package com.cjs.example.quickstart; 2 3 import org.apache.kafka.clients.consumer.ConsumerConfig; 4 import org.apache.kafka.clients.consumer.ConsumerRecord; 5 import org.apache.kafka.clients.producer.ProducerConfig; 6 import org.apache.kafka.common.serialization.IntegerDeserializer; 7 import org.apache.kafka.common.serialization.IntegerSerializer; 8 import org.apache.kafka.common.serialization.StringDeserializer; 9 import org.apache.kafka.common.serialization.StringSerializer; 10 import org.springframework.kafka.core.*; 11 import org.springframework.kafka.listener.KafkaMessageListenerContainer; 12 import org.springframework.kafka.listener.MessageListener; 13 import org.springframework.kafka.listener.config.ContainerProperties; 14 15 import java.util.HashMap; 16 import java.util.Map; 17 18 public class PureJavaDemo { 19 20 /** 21 * 生产者配置 22 */ 23 private static Map<String, Object> senderProps() { 24 Map<String, Object> props = new HashMap<>(); 25 props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.101.5:9093"); 26 props.put(ProducerConfig.RETRIES_CONFIG, 0); 27 props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); 28 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class); 29 props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); 30 return props; 31 } 32 33 /** 34 * 消费者配置 35 */ 36 private static Map<String, Object> consumerProps() { 37 Map<String, Object> props = new HashMap<>(); 38 props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.101.5:9093"); 39 props.put(ConsumerConfig.GROUP_ID_CONFIG, "hello"); 40 props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); 41 props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100"); 42 props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000"); 43 props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class); 44 props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); 45 return props; 46 } 47 48 /** 49 * 发送模板配置 50 */ 51 private static KafkaTemplate<Integer, String> createTemplate() { 52 Map<String, Object> senderProps = senderProps(); 53 ProducerFactory<Integer, String> producerFactory = new DefaultKafkaProducerFactory<>(senderProps); 54 KafkaTemplate<Integer, String> kafkaTemplate = new KafkaTemplate<>(producerFactory); 55 return kafkaTemplate; 56 } 57 58 /** 59 * 消息监听器容器配置 60 */ 61 private static KafkaMessageListenerContainer<Integer, String> createContainer() { 62 Map<String, Object> consumerProps = consumerProps(); 63 ConsumerFactory<Integer, String> consumerFactory = new DefaultKafkaConsumerFactory<>(consumerProps); 64 ContainerProperties containerProperties = new ContainerProperties("test"); 65 KafkaMessageListenerContainer<Integer, String> container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties); 66 return container; 67 } 68 69 70 public static void main(String[] args) throws InterruptedException { 71 String topic1 = "test"; // 主题 72 73 KafkaMessageListenerContainer container = createContainer(); 74 ContainerProperties containerProperties = container.getContainerProperties(); 75 containerProperties.setMessageListener(new MessageListener<Integer, String>() { 76 @Override 77 public void onMessage(ConsumerRecord<Integer, String> record) { 78 System.out.println("Received: " + record); 79 } 80 }); 81 container.setBeanName("testAuto"); 82 83 container.start(); 84 85 KafkaTemplate<Integer, String> kafkaTemplate = createTemplate(); 86 kafkaTemplate.setDefaultTopic(topic1); 87 88 kafkaTemplate.sendDefault(0, "foo"); 89 kafkaTemplate.sendDefault(2, "bar"); 90 kafkaTemplate.sendDefault(0, "baz"); 91 kafkaTemplate.sendDefault(2, "qux"); 92 93 kafkaTemplate.flush(); 94 container.stop(); 95 96 System.out.println("结束"); 97 } 98 99 }
运行结果:
Received: ConsumerRecord(topic = test, partition = 0, offset = 67, CreateTime = 1533300970788, serialized key size = 4, serialized value size = 3, headers = RecordHeaders(headers = [], isReadOnly = false), key = 0, value = foo) Received: ConsumerRecord(topic = test, partition = 0, offset = 68, CreateTime = 1533300970793, serialized key size = 4, serialized value size = 3, headers = RecordHeaders(headers = [], isReadOnly = false), key = 2, value = bar) Received: ConsumerRecord(topic = test, partition = 0, offset = 69, CreateTime = 1533300970793, serialized key size = 4, serialized value size = 3, headers = RecordHeaders(headers = [], isReadOnly = false), key = 0, value = baz) Received: ConsumerRecord(topic = test, partition = 0, offset = 70, CreateTime = 1533300970793, serialized key size = 4, serialized value size = 3, headers = RecordHeaders(headers = [], isReadOnly = false), key = 2, value = qux)
7.2 更简单一点儿,用SpringBoot
1 package com.cjs.example.quickstart; 2 3 import org.apache.kafka.clients.consumer.ConsumerRecord; 4 import org.springframework.beans.factory.annotation.Autowired; 5 import org.springframework.boot.CommandLineRunner; 6 import org.springframework.context.annotation.Bean; 7 import org.springframework.context.annotation.Configuration; 8 import org.springframework.kafka.annotation.KafkaListener; 9 import org.springframework.kafka.core.KafkaTemplate; 10 11 @Configuration 12 public class JavaConfigurationDemo { 13 14 @KafkaListener(topics = "test") 15 public void listen(ConsumerRecord<String, String> record) { 16 System.out.println("收到消息: " + record); 17 } 18 19 @Bean 20 public CommandLineRunner commandLineRunner() { 21 return new MyRunner(); 22 } 23 24 class MyRunner implements CommandLineRunner { 25 26 @Autowired 27 private KafkaTemplate<String, String> kafkaTemplate; 28 29 @Override 30 public void run(String... args) throws Exception { 31 kafkaTemplate.send("test", "foo1"); 32 kafkaTemplate.send("test", "foo2"); 33 kafkaTemplate.send("test", "foo3"); 34 kafkaTemplate.send("test", "foo4"); 35 } 36 } 37 }
application.properties配置
spring.kafka.bootstrap-servers=192.168.101.5:9092 spring.kafka.consumer.group-id=world
8. 生产者
1 package com.cjs.example.send; 2 3 import org.apache.kafka.clients.producer.ProducerConfig; 4 import org.apache.kafka.common.serialization.IntegerSerializer; 5 import org.apache.kafka.common.serialization.StringSerializer; 6 import org.springframework.context.annotation.Bean; 7 import org.springframework.context.annotation.Configuration; 8 import org.springframework.kafka.core.DefaultKafkaProducerFactory; 9 import org.springframework.kafka.core.KafkaTemplate; 10 import org.springframework.kafka.core.ProducerFactory; 11 12 import java.util.HashMap; 13 import java.util.Map; 14 15 @Configuration 16 public class Config { 17 18 public Map<String, Object> producerConfigs() { 19 Map<String, Object> props = new HashMap<>(); 20 props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.101.5:9092"); 21 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class); 22 props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); 23 return props; 24 } 25 26 public ProducerFactory<Integer, String> producerFactory() { 27 return new DefaultKafkaProducerFactory<>(producerConfigs()); 28 } 29 30 @Bean 31 public KafkaTemplate<Integer, String> kafkaTemplate() { 32 return new KafkaTemplate<Integer, String>(producerFactory()); 33 } 34 35 }
1 package com.cjs.example.send; 2 3 import org.springframework.beans.factory.annotation.Autowired; 4 import org.springframework.boot.CommandLineRunner; 5 import org.springframework.kafka.core.KafkaTemplate; 6 import org.springframework.kafka.support.SendResult; 7 import org.springframework.stereotype.Component; 8 import org.springframework.util.concurrent.ListenableFuture; 9 import org.springframework.util.concurrent.ListenableFutureCallback; 10 11 @Component 12 public class MyCommandLineRunner implements CommandLineRunner { 13 14 @Autowired 15 private KafkaTemplate<Integer, String> kafkaTemplate; 16 17 public void sendTo(Integer key, String value) { 18 ListenableFuture<SendResult<Integer, String>> listenableFuture = kafkaTemplate.send("test", key, value); 19 listenableFuture.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() { 20 @Override 21 public void onFailure(Throwable throwable) { 22 System.out.println("发送失败啦"); 23 throwable.printStackTrace(); 24 } 25 26 @Override 27 public void onSuccess(SendResult<Integer, String> sendResult) { 28 System.out.println("发送成功," + sendResult); 29 } 30 }); 31 } 32 33 @Override 34 public void run(String... args) throws Exception { 35 sendTo(1, "aaa"); 36 sendTo(2, "bbb"); 37 sendTo(3, "ccc"); 38 } 39 40 41 }
运行结果:
发送成功,SendResult [producerRecord=ProducerRecord(topic=test, partition=null, headers=RecordHeaders(headers = [], isReadOnly = true), key=1, value=aaa, timestamp=null), recordMetadata=test-0@37] 发送成功,SendResult [producerRecord=ProducerRecord(topic=test, partition=null, headers=RecordHeaders(headers = [], isReadOnly = true), key=2, value=bbb, timestamp=null), recordMetadata=test-0@38] 发送成功,SendResult [producerRecord=ProducerRecord(topic=test, partition=null, headers=RecordHeaders(headers = [], isReadOnly = true), key=3, value=ccc, timestamp=null), recordMetadata=test-0@39]
9. 消费者@KafkaListener
1 package com.cjs.example.receive; 2 3 import org.apache.kafka.clients.consumer.ConsumerConfig; 4 import org.apache.kafka.clients.consumer.ConsumerRecord; 5 import org.apache.kafka.common.serialization.IntegerDeserializer; 6 import org.apache.kafka.common.serialization.StringDeserializer; 7 import org.springframework.context.annotation.Bean; 8 import org.springframework.context.annotation.Configuration; 9 import org.springframework.kafka.annotation.KafkaListener; 10 import org.springframework.kafka.annotation.TopicPartition; 11 import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; 12 import org.springframework.kafka.config.KafkaListenerContainerFactory; 13 import org.springframework.kafka.core.ConsumerFactory; 14 import org.springframework.kafka.core.DefaultKafkaConsumerFactory; 15 import org.springframework.kafka.listener.AbstractMessageListenerContainer; 16 import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; 17 import org.springframework.kafka.listener.config.ContainerProperties; 18 import org.springframework.kafka.support.Acknowledgment; 19 import org.springframework.kafka.support.KafkaHeaders; 20 import org.springframework.messaging.handler.annotation.Header; 21 import org.springframework.messaging.handler.annotation.Payload; 22 23 import java.util.HashMap; 24 import java.util.List; 25 import java.util.Map; 26 27 @Configuration 28 public class Config2 { 29 30 @Bean 31 public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() { 32 ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); 33 factory.setConsumerFactory(consumerFactory()); 34 factory.setConcurrency(3); 35 ContainerProperties containerProperties = factory.getContainerProperties(); 36 containerProperties.setPollTimeout(2000); 37 // containerProperties.setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE); 38 return factory; 39 } 40 41 private ConsumerFactory<Integer,String> consumerFactory() { 42 return new DefaultKafkaConsumerFactory<>(consumerProps()); 43 } 44 45 private Map<String, Object> consumerProps() { 46 Map<String, Object> props = new HashMap<>(); 47 props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.101.5:9092"); 48 props.put(ConsumerConfig.GROUP_ID_CONFIG, "hahaha"); 49 // props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); 50 props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class); 51 props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); 52 return props; 53 } 54 55 56 @KafkaListener(topics = "test") 57 public void listen(String data) { 58 System.out.println("listen 收到: " + data); 59 } 60 61 62 @KafkaListener(topics = "test", containerFactory = "kafkaListenerContainerFactory") 63 public void listen2(String data, Acknowledgment ack) { 64 System.out.println("listen2 收到: " + data); 65 ack.acknowledge(); 66 } 67 68 @KafkaListener(topicPartitions = {@TopicPartition(topic = "test", partitions = "0")}) 69 public void listen3(ConsumerRecord<?, ?> record) { 70 System.out.println("listen3 收到: " + record.value()); 71 } 72 73 74 @KafkaListener(id = "xyz", topics = "test") 75 public void listen4(@Payload String foo, 76 @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) Integer key, 77 @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition, 78 @Header(KafkaHeaders.RECEIVED_TOPIC) String topic, 79 @Header(KafkaHeaders.OFFSET) List<Long> offsets) { 80 System.out.println("listen4 收到: "); 81 System.out.println(foo); 82 System.out.println(key); 83 System.out.println(partition); 84 System.out.println(topic); 85 System.out.println(offsets); 86 } 87 88 }
9.1 Committing Offsets
如果enable.auto.commit设置为true,那么kafka将自动提交offset。如果设置为false,则支持下列AckMode(确认模式)。
消费者poll()方法将返回一个或多个ConsumerRecords
10. Spring Boot Kafka
10.1 application.properties
spring.kafka.bootstrap-servers=192.168.101.5:9092
10.2 发送消息
1 package com.cjs.example; 2 3 import org.springframework.beans.factory.annotation.Autowired; 4 import org.springframework.kafka.core.KafkaTemplate; 5 import org.springframework.web.bind.annotation.RequestMapping; 6 import org.springframework.web.bind.annotation.RestController; 7 8 import javax.annotation.Resource; 9 10 @RestController 11 @RequestMapping("/msg") 12 public class MessageController { 13 14 @Resource 15 private KafkaTemplate<String, String> kafkaTemplate; 16 17 @RequestMapping("/send") 18 public String send(String topic, String key, String value) { 19 kafkaTemplate.send(topic, key, value); 20 return "ok"; 21 } 22 23 }
10.3 接收消息
1 package com.cjs.example; 2 3 import org.apache.kafka.clients.consumer.ConsumerRecord; 4 import org.springframework.kafka.annotation.KafkaListener; 5 import org.springframework.kafka.annotation.KafkaListeners; 6 import org.springframework.stereotype.Component; 7 8 @Component 9 public class MessageListener { 10 11 /** 12 * 监听订单消息 13 */ 14 @KafkaListener(topics = "ORDER", groupId = "OrderGroup") 15 public void listenToOrder(String data) { 16 System.out.println("收到订单消息:" + data); 17 } 18 19 /** 20 * 监听会员消息 21 */ 22 @KafkaListener(topics = "MEMBER", groupId = "MemberGroup") 23 public void listenToMember(ConsumerRecord<String, String> record) { 24 System.out.println("收到会员消息:" + record); 25 } 26 27 /** 28 * 监听所有消息 29 * 30 * 任意时刻,一条消息只会发给组中的一个消费者 31 * 32 * 消费者组中的成员数量不能超过分区数,这里分区数是1,因此订阅该主题的消费者组成员不能超过1 33 */ 34 // @KafkaListeners({@KafkaListener(topics = "ORDER", groupId = "OrderGroup"), 35 // @KafkaListener(topics = "MEMBER", groupId = "MemberGroup")}) 36 // public void listenToAll(String data) { 37 // System.out.println("啊啊啊"); 38 // } 39 40 }
11. pom.xml
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.cjs.example</groupId> <artifactId>cjs-kafka-example</artifactId> <version>0.0.1-SNAPSHOT</version> <packaging>jar</packaging> <name>cjs-kafka-example</name> <description></description> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.0.4.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
12. 其它
有兴趣的话可以看一下其它几篇:
《Kafka介绍》
13. 参考
http://spring.io/projects/spring-kafka
https://docs.spring.io/spring-boot/docs/2.0.4.RELEASE/reference/htmlsingle/#boot-features-kafka
标签:inter enter bin .repo default etc 其它 ret parent
原文地址:https://www.cnblogs.com/cjsblog/p/9416380.html