标签:cti inter font 服务器 nal 使用 过程 问控制 pre
创建过程不再描述,创建后的工程结构如下:
POM文件中要加入几个依赖:
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.1.9.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.zhbf</groupId> <artifactId>springboot</artifactId> <version>0.0.1-SNAPSHOT</version> <name>springboot</name> <description>Demo project for Spring Boot</description> <properties> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <!--引入kafka依赖--> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> <!-- 添加 gson 依赖 --> <dependency> <groupId>com.google.code.gson</groupId> <artifactId>gson</artifactId> <version>2.8.5</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
启动SpringbootApplication.java,出现下图界面则说明工程创建好了:
/** * Kafka消息生产类 */ @Log @Component public class KafkaProducer { @Resource private KafkaTemplate<String, String> kafkaTemplate; @Value("${kafka.topic.user}") private String topicUser;//topic名称 /** * 发送用户消息 * * @param user 用户信息 */ public void sendUserMessage(User user) { GsonBuilder builder = new GsonBuilder(); builder.setPrettyPrinting(); builder.setDateFormat("yyyy-MM-dd HH:mm:ss"); String message = builder.create().toJson(user); kafkaTemplate.send(topicUser, message); log.info("\n生产消息至Kafka\n" + message); } }
启动方式参考上一篇文章,戳这里~
/** * 测试控制器 * PS:@RestController 注解: 该注解是 @Controller 和 @ResponseBody 注解的合体版 */ @RestController @RequestMapping("/kafka") public class KafkaController { @Autowired private User user; @Autowired private KafkaProducer kafkaProducer; @RequestMapping("/createMsg") public void createMsg() { kafkaProducer.sendUserMessage(user); } }
可以看到控制台和消费者窗口都打印了kafka生成的消息。
@Log @Component public class KafkaConsumerDemo { public void consume() { Properties props = new Properties(); // 必须设置的属性 props.put("bootstrap.servers", "127.0.0.1:9092"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("group.id", "group-user"); // 可选设置属性 props.put("enable.auto.commit", "true"); // 自动提交offset,每1s提交一次 props.put("auto.commit.interval.ms", "1000"); props.put("auto.offset.reset", "earliest "); props.put("client.id", "zy_client_id"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); // 订阅test1 topic consumer.subscribe(Collections.singletonList("topic-user")); while (true) { // 从服务器开始拉取数据 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); records.forEach(record -> { System.out.printf("成功消费消息:topic = %s ,partition = %d,offset = %d, key = %s, value = %s%n", record.topic(), record.partition(), record.offset(), record.key(), record.value()); }); } } }
kafka学习(五)Spring Boot 整合 Kafka
标签:cti inter font 服务器 nal 使用 过程 问控制 pre
原文地址:https://www.cnblogs.com/riches/p/11720068.html