标签:read alt ret mil pre tar return cat 方式
一、Stream简介
@Data public class User implements Serializable { /** * ID */ private Long id; /** * 用户名称 */ private String name; }
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-kafka</artifactId> </dependency>
import org.springframework.cloud.stream.annotation.Output; import org.springframework.messaging.MessageChannel; /** * @Description: 用户信息输出 * @date: 2019/6/23 13:05 */ public interface UserMessage { @Output("user-message") MessageChannel output(); }
@Autowired private UserMessage userMessage;
@PostMapping("/user/save/message/stream") public boolean saveUserByRabbitMessage(@RequestBody User user){ MessageChannel messageChannel = userMessage.output(); return messageChannel.send(MessageBuilder.withPayload(user).build()); }
## Kafka 生产者配置 spring.kafka.BOOTSTRAP-SERVERS=192.168.100.129:9092 spring.kafka.consumer.group-id= yangk spring.kafka.consumer.clientId=spring-cloud-api-client ## Spring Cloud Stream Binding 配置 ### user-message 为输出管道名称 destination 指定 Topic spring.cloud.stream.bindings.user-message.destination = springCloud
@EnableBinding(UserMessage.class)
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-kafka</artifactId> </dependency>
public interface UserMessage { @Input("user-message") //管道名称 SubscribableChannel input(); }
@Autowired private UserMessage userMessage; @Autowired private UserService userService; @Autowired private ObjectMapper objectMapper; @ServiceActivator(inputChannel = "user-message") public void listen(String data) throws IOException { System.out.println("ServiceActivator实现"+data); saveUser(data); } @StreamListener("user-message") public void onMessage(String data) throws IOException { System.out.println(" @StreamListeners实现"+data); saveUser(data); } private void saveUser(String data) throws IOException { User user = objectMapper.readValue(data, User.class); userService.saveUser(user); } @PostConstruct public void init() { SubscribableChannel subscribableChannel = userMessage.input(); subscribableChannel.subscribe(message -> { System.out.println("SubscribableChannel实现"+message); }); }
#Kafka配置 spring.kafka.BOOTSTRAP-SERVERS=192.168.100.129:9092 spring.kafka.consumer.group-id=yangk spring.kafka.consumer.clientId=spring-cloud-api-client ## Spring Cloud Stream Binding 配置 ### userMessage 为输入管道名称 destination 指定 Topic spring.cloud.stream.bindings.user-message.destination = springCloud
//激活 Stream Binding到UserMessage @EnableBinding(UserMessage.class)
三、验证
项目启动顺序:spring-cloud-eureka-server -> spring-cloud-api-client -> spring-cloud-api-provider
这里乱码的应该是kafka的其他属性没有转换过来,这里我也没有处理这些。
SpringCloud学习第八篇:Stream学习(Greenwich.SR1版本)
标签:read alt ret mil pre tar return cat 方式
原文地址:https://www.cnblogs.com/yangk1996/p/11072712.html