标签:read alt ret mil pre tar return cat 方式
一、Stream简介
@Data
public class User implements Serializable {
/**
* ID
*/
private Long id;
/**
* 用户名称
*/
private String name;
}
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
/**
* @Description: 用户信息输出
* @date: 2019/6/23 13:05
*/
public interface UserMessage {
@Output("user-message")
MessageChannel output();
}
@Autowired
private UserMessage userMessage;
@PostMapping("/user/save/message/stream")
public boolean saveUserByRabbitMessage(@RequestBody User user){
MessageChannel messageChannel = userMessage.output();
return messageChannel.send(MessageBuilder.withPayload(user).build());
}
## Kafka 生产者配置 spring.kafka.BOOTSTRAP-SERVERS=192.168.100.129:9092 spring.kafka.consumer.group-id= yangk spring.kafka.consumer.clientId=spring-cloud-api-client ## Spring Cloud Stream Binding 配置 ### user-message 为输出管道名称 destination 指定 Topic spring.cloud.stream.bindings.user-message.destination = springCloud
@EnableBinding(UserMessage.class)
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
public interface UserMessage {
@Input("user-message") //管道名称
SubscribableChannel input();
}
@Autowired
private UserMessage userMessage;
@Autowired
private UserService userService;
@Autowired
private ObjectMapper objectMapper;
@ServiceActivator(inputChannel = "user-message")
public void listen(String data) throws IOException {
System.out.println("ServiceActivator实现"+data);
saveUser(data);
}
@StreamListener("user-message")
public void onMessage(String data) throws IOException {
System.out.println(" @StreamListeners实现"+data);
saveUser(data);
}
private void saveUser(String data) throws IOException {
User user = objectMapper.readValue(data, User.class);
userService.saveUser(user);
}
@PostConstruct
public void init() {
SubscribableChannel subscribableChannel = userMessage.input();
subscribableChannel.subscribe(message -> {
System.out.println("SubscribableChannel实现"+message);
});
}
#Kafka配置 spring.kafka.BOOTSTRAP-SERVERS=192.168.100.129:9092 spring.kafka.consumer.group-id=yangk spring.kafka.consumer.clientId=spring-cloud-api-client ## Spring Cloud Stream Binding 配置 ### userMessage 为输入管道名称 destination 指定 Topic spring.cloud.stream.bindings.user-message.destination = springCloud
//激活 Stream Binding到UserMessage @EnableBinding(UserMessage.class)
三、验证
项目启动顺序:spring-cloud-eureka-server -> spring-cloud-api-client -> spring-cloud-api-provider






这里乱码的应该是kafka的其他属性没有转换过来,这里我也没有处理这些。
SpringCloud学习第八篇:Stream学习(Greenwich.SR1版本)
标签:read alt ret mil pre tar return cat 方式
原文地址:https://www.cnblogs.com/yangk1996/p/11072712.html