标签:tag oid 发送 uda manage start rollback discover required
==看官方RocketMQ指导==
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.3</version>
</dependency>
// RocketMQ template; auditById calls sendMessageInTransaction on it to send the transactional message.
private final RocketMQTemplate rocketMQTemplate;
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.3</version>
</dependency>
-- Local transaction log for RocketMQ transactional messages.
-- A row is inserted together with the business update, and the transaction
-- check-back looks a row up by transaction_id to decide between commit and rollback.
CREATE TABLE `rocketmq_transaction_log` (
  `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主键',
  -- Lower-case name, consistent with the `transaction_id` lookup used by the check-back query.
  `transaction_id` varchar(45) NOT NULL COMMENT '事务',
  `log` varchar(45) NOT NULL COMMENT '日志',
  PRIMARY KEY (`id`),
  -- The check-back query filters on transaction_id; index it to avoid a full table scan.
  KEY `idx_transaction_id` (`transaction_id`)
);
/**
 * Audits the share identified by {@code id}.
 *
 * <p>If the audit result is PASS, a RocketMQ transactional message carrying the bonus
 * payload is sent and this method performs no database update itself. For any other
 * result the audit is written to the database through {@code auditByIdInDB}.
 *
 * @param id       primary key of the share to audit
 * @param auditDTO audit result and reason
 * @return the share exactly as it was loaded at the start of this method
 * @throws IllegalArgumentException if the share does not exist or its audit status is
 *                                  not {@code NOT_YET}
 */
public Share auditById(Integer id, ShareAuditDTO auditDTO) {
    // 1. Load the share; reject it when missing or when it has already been audited.
    Share share = this.shareMapper.selectByPrimaryKey(id);
    if (share == null) {
        throw new IllegalArgumentException("参数非法!该分享不存在!");
    }
    if (!"NOT_YET".equals(share.getAuditStatus())) {
        throw new IllegalArgumentException("参数非法!该分享已审核通过或审核不通过!");
    }

    // Anything other than PASS is written straight to the database.
    if (!AuditStatusEnum.PASS.equals(auditDTO.getAuditStatusEnum())) {
        this.auditByIdInDB(id, auditDTO);
        return share;
    }

    // 3. PASS: send a transactional (half) message so the user center can add bonus
    //    points for the author of the share.
    String transactionId = UUID.randomUUID().toString();
    UserAddBonusMsgDTO bonusPayload = UserAddBonusMsgDTO.builder()
            .userId(share.getUserId())
            .bonus(50)
            .build();
    this.rocketMQTemplate.sendMessageInTransaction(
            "tx-add-bonus-group",
            "add-bonus",
            MessageBuilder
                    .withPayload(bonusPayload)
                    // Headers carry extra data alongside the payload.
                    .setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId)
                    .setHeader("share_id", id)
                    .setHeader("dto", JSON.toJSONString(auditDTO))
                    .build(),
            // Passed through as the extra argument of the transactional send.
            auditDTO
    );
    return share;
}
/**
 * Writes the audit result for a share to the database.
 *
 * <p>Only the id, audit status and reason are set on the entity handed to
 * {@code updateByPrimaryKeySelective}.
 *
 * @param id       primary key of the share to update
 * @param auditDTO source of the new audit status and reason
 */
@Transactional(rollbackFor = Exception.class)
public void auditByIdInDB(Integer id, ShareAuditDTO auditDTO) {
    String newStatus = auditDTO.getAuditStatusEnum().toString();
    Share changes = Share.builder()
            .id(id)
            .auditStatus(newStatus)
            .reason(auditDTO.getReason())
            .build();
    this.shareMapper.updateByPrimaryKeySelective(changes);
    // 4. TODO: write the share to the cache (no code for this step here).
}
/**
 * Applies the audit result and records a transaction-log row for it.
 *
 * <p>Both writes happen inside this method, which is annotated {@code @Transactional}.
 *
 * @param id            primary key of the share being audited
 * @param auditDTO      audit result and reason
 * @param transactionId transaction id stored in the log row
 */
@Transactional(rollbackFor = Exception.class)
public void auditByIdWithRocketMqLog(Integer id, ShareAuditDTO auditDTO, String transactionId) {
    this.auditByIdInDB(id, auditDTO);

    RocketmqTransactionLog logEntry = RocketmqTransactionLog.builder()
            .transactionId(transactionId)
            .log("审核分享...")
            .build();
    this.rocketmqTransactionLogMapper.insertSelective(logEntry);
}
/**
 * Local-transaction listener registered for the {@code tx-add-bonus-group} producer group.
 *
 * <p>{@link #executeLocalTransaction} performs the audit together with its transaction-log
 * insert; {@link #checkLocalTransaction} derives the transaction state from whether that
 * log row exists.
 */
@RocketMQTransactionListener(txProducerGroup = "tx-add-bonus-group")
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class AddBonusTransactionListener implements RocketMQLocalTransactionListener {
    private final ShareService shareService;
    private final RocketmqTransactionLogMapper rocketmqTransactionLogMapper;

    /**
     * Runs the local transaction for a message.
     *
     * @param msg message whose headers hold the transaction id and {@code share_id}
     * @param arg the {@code ShareAuditDTO} supplied as the extra send argument
     * @return COMMIT when the audit and log insert succeed, ROLLBACK when they throw
     */
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        MessageHeaders messageHeaders = msg.getHeaders();
        String txId = (String) messageHeaders.get(RocketMQHeaders.TRANSACTION_ID);
        // share_id is read back as a String and parsed to an Integer.
        Integer shareId = Integer.valueOf((String) messageHeaders.get("share_id"));
        ShareAuditDTO auditDTO = (ShareAuditDTO) arg;

        RocketMQLocalTransactionState outcome;
        try {
            this.shareService.auditByIdWithRocketMqLog(shareId, auditDTO, txId);
            outcome = RocketMQLocalTransactionState.COMMIT;
        } catch (Exception e) {
            // Any failure of the local transaction is reported as a rollback.
            outcome = RocketMQLocalTransactionState.ROLLBACK;
        }
        return outcome;
    }

    /**
     * Determines the transaction state from the transaction log.
     *
     * @param msg message whose headers hold the transaction id
     * @return COMMIT if a log row with that transaction id exists, otherwise ROLLBACK
     */
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        String txId = (String) msg.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);

        // Equivalent to: select * from xxx where transaction_id = xxx
        RocketmqTransactionLog probe = RocketmqTransactionLog.builder()
                .transactionId(txId)
                .build();
        RocketmqTransactionLog found = this.rocketmqTransactionLogMapper.selectOne(probe);

        return found != null
                ? RocketMQLocalTransactionState.COMMIT
                : RocketMQLocalTransactionState.ROLLBACK;
    }
}
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.3</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>
==@EnableBinding({Source.class})==
// Tells MyBatis which package to scan for mapper interfaces
@MapperScan("com.itmuch.contentcenter.dao")
@SpringBootApplication
@EnableFeignClients// (defaultConfiguration = GlobalFeignConfiguration.class)
// Enables the Spring Cloud Stream binding declared by Source
@EnableBinding({Source.class})
public class ContentCenterApplication {
==修改com.alibaba.nacos日志级别==
logging:
  level:
    com.itmuch.contentcenter.feignclient.UserCenterFeignClient: debug
    com.alibaba.nacos: error
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.3</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>
==@EnableBinding({Sink.class})==
// Tells MyBatis which package to scan for mapper interfaces
@MapperScan("com.itmuch.usercenter.dao")
@SpringBootApplication
//@EnableDiscoveryClient
// Enables the Spring Cloud Stream binding declared by Sink
@EnableBinding({Sink.class})
public class UserCenterApplication {
// Application entry point: starts the Spring application with the given command-line arguments.
public static void main(String[] args) {
SpringApplication.run(UserCenterApplication.class, args);
}
}
==修改com.alibaba.nacos日志级别==
logging:
  level:
    com.alibaba.nacos: error
如下==三个链接查看SpringCloudStream==的监控
management:
  endpoints:
    web:
      exposure:
        include: '*'
  endpoint:
    health:
      show-details: always
// Listener on Processor.INPUT that always throws, so every received message fails.
// NOTE(review): presumably a demo used to trigger the error handling path; confirm before reuse.
@StreamListener(value = Processor.INPUT)
public void handle(String body) {
throw new RuntimeException("x");
}
/**
 * Listener on {@code errorChannel} that prints the received error message to standard output.
 *
 * @param message incoming message; it is cast to {@code ErrorMessage} before printing
 */
@StreamListener("errorChannel")
public void error(Message<?> message) {
    System.out.println("Handling ERROR: " + (ErrorMessage) message);
}
spring:
  cloud:
    stream:
      rocketmq:
        binder:
          name-server: 127.0.0.1:9876
        bindings:
          output:
            producer:
              transactional: true
              group: tx-add-bonus-group
      bindings:
        output:
          # 用来指定topic
          destination: add-bonus
==@EnableBinding({Source.class})==
// Scans com.itmuch.contentcenter.dao for MyBatis mapper interfaces
@MapperScan("com.itmuch.contentcenter.dao")
@SpringBootApplication
@EnableFeignClients// (defaultConfiguration = GlobalFeignConfiguration.class)
// Enables the Spring Cloud Stream binding declared by Source
@EnableBinding({Source.class})
public class ContentCenterApplication {
/**
 * Audits the share identified by {@code id}, publishing through the stream output channel.
 *
 * <p>If the audit result is PASS, a message carrying the bonus payload is sent on
 * {@code source.output()} and this method performs no database update itself. For any
 * other result the audit is written to the database through {@code auditByIdInDB}.
 *
 * @param id       primary key of the share to audit
 * @param auditDTO audit result and reason
 * @return the share exactly as it was loaded at the start of this method
 * @throws IllegalArgumentException if the share does not exist or its audit status is
 *                                  not {@code NOT_YET}
 */
public Share auditById(Integer id, ShareAuditDTO auditDTO) {
    // 1. Load the share; reject it when missing or when it has already been audited.
    Share share = this.shareMapper.selectByPrimaryKey(id);
    if (share == null) {
        throw new IllegalArgumentException("参数非法!该分享不存在!");
    }
    boolean awaitingAudit = Objects.equals("NOT_YET", share.getAuditStatus());
    if (!awaitingAudit) {
        throw new IllegalArgumentException("参数非法!该分享已审核通过或审核不通过!");
    }

    boolean passed = AuditStatusEnum.PASS.equals(auditDTO.getAuditStatusEnum());
    if (!passed) {
        // Not a PASS: persist the audit result directly.
        this.auditByIdInDB(id, auditDTO);
        return share;
    }

    // 3. PASS: send the (half) message so the user center can add bonus points
    //    for the author of the share.
    String transactionId = UUID.randomUUID().toString();
    this.source.output()
            .send(
                    MessageBuilder
                            .withPayload(
                                    UserAddBonusMsgDTO.builder()
                                            .userId(share.getUserId())
                                            .bonus(50)
                                            .build()
                            )
                            // Headers carry extra data alongside the payload; the audit
                            // DTO is attached as a JSON string.
                            .setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId)
                            .setHeader("share_id", id)
                            .setHeader("dto", JSON.toJSONString(auditDTO))
                            .build()
            );
    return share;
}
/**
 * Persists an audit decision for one share.
 *
 * <p>Builds an entity holding only the id, the new audit status and the reason, then
 * passes it to {@code updateByPrimaryKeySelective}.
 *
 * @param id       primary key of the share to update
 * @param auditDTO provides the audit status and the reason to store
 */
@Transactional(rollbackFor = Exception.class)
public void auditByIdInDB(Integer id, ShareAuditDTO auditDTO) {
    String statusText = auditDTO.getAuditStatusEnum().toString();
    Share update = Share.builder()
            .id(id)
            .auditStatus(statusText)
            .reason(auditDTO.getReason())
            .build();
    this.shareMapper.updateByPrimaryKeySelective(update);
    // 4. TODO: write the share to the cache (no code for this step here).
}
/**
 * Persists the audit decision and then inserts a transaction-log row carrying
 * {@code transactionId}.
 *
 * @param id            primary key of the share being audited
 * @param auditDTO      audit result and reason
 * @param transactionId transaction id to store in the log row
 */
@Transactional(rollbackFor = Exception.class)
public void auditByIdWithRocketMqLog(Integer id, ShareAuditDTO auditDTO, String transactionId) {
    this.auditByIdInDB(id, auditDTO);

    RocketmqTransactionLog record = RocketmqTransactionLog.builder()
            .transactionId(transactionId)
            .log("审核分享...")
            .build();
    this.rocketmqTransactionLogMapper.insertSelective(record);
}
/**
 * Local-transaction listener registered for the {@code tx-add-bonus-group} producer group.
 *
 * <p>This variant rebuilds the audit DTO from the JSON string in the {@code dto} header
 * and does not use the {@code arg} parameter.
 */
@RocketMQTransactionListener(txProducerGroup = "tx-add-bonus-group")
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class AddBonusTransactionListener implements RocketMQLocalTransactionListener {
    private final ShareService shareService;
    private final RocketmqTransactionLogMapper rocketmqTransactionLogMapper;

    /**
     * Runs the local transaction for a message.
     *
     * @param msg message whose headers hold the transaction id, {@code share_id} and the
     *            JSON-serialized audit DTO ({@code dto})
     * @param arg not used by this implementation
     * @return COMMIT when the audit and log insert succeed, ROLLBACK when they throw
     */
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        MessageHeaders hdrs = msg.getHeaders();
        String txId = (String) hdrs.get(RocketMQHeaders.TRANSACTION_ID);
        Integer shareId = Integer.valueOf((String) hdrs.get("share_id"));
        // The DTO travels as a JSON string in the "dto" header.
        ShareAuditDTO auditDTO = JSON.parseObject((String) hdrs.get("dto"), ShareAuditDTO.class);

        try {
            this.shareService.auditByIdWithRocketMqLog(shareId, auditDTO, txId);
        } catch (Exception e) {
            // Any failure of the local transaction is reported as a rollback.
            return RocketMQLocalTransactionState.ROLLBACK;
        }
        return RocketMQLocalTransactionState.COMMIT;
    }

    /**
     * Determines the transaction state from the transaction log.
     *
     * @param msg message whose headers hold the transaction id
     * @return COMMIT if a log row with that transaction id exists, otherwise ROLLBACK
     */
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        MessageHeaders hdrs = msg.getHeaders();
        String txId = (String) hdrs.get(RocketMQHeaders.TRANSACTION_ID);

        // Equivalent to: select * from xxx where transaction_id = xxx
        RocketmqTransactionLog existing = this.rocketmqTransactionLogMapper.selectOne(
                RocketmqTransactionLog.builder()
                        .transactionId(txId)
                        .build()
        );
        if (existing == null) {
            return RocketMQLocalTransactionState.ROLLBACK;
        }
        return RocketMQLocalTransactionState.COMMIT;
    }
}
spring:
  cloud:
    stream:
      rocketmq:
        binder:
          name-server: 127.0.0.1:9876
      bindings:
        input:
          destination: add-bonus
          group: binder-group
==@EnableBinding({Sink.class})==
// Tells MyBatis which package to scan for mapper interfaces
@MapperScan("com.itmuch.usercenter.dao")
@SpringBootApplication
//@EnableDiscoveryClient
// Enables the Spring Cloud Stream binding declared by Sink
@EnableBinding({Sink.class})
public class UserCenterApplication {
// Application entry point: starts the Spring application with the given command-line arguments.
public static void main(String[] args) {
SpringApplication.run(UserCenterApplication.class, args);
}
}
/**
 * Stream consumer bound to {@code Sink.INPUT}: it stamps each incoming bonus message
 * with a fixed event and description, then passes it to {@code UserService.addBonus}.
 */
@Service
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
@Slf4j
public class AddBonusStreamConsumer {
    /** Event value set on every received message. */
    private static final String CONTRIBUTE_EVENT = "CONTRIBUTE";
    /** Description set on every received message. */
    private static final String CONTRIBUTE_DESCRIPTION = "投稿加积分..";

    private final UserService userService;

    /**
     * Handles one message from the input channel.
     *
     * @param message bonus message; its event and description are overwritten before it
     *                is handed to the user service
     */
    @StreamListener(Sink.INPUT)
    public void receive(UserAddBonusMsgDTO message) {
        message.setEvent(CONTRIBUTE_EVENT);
        message.setDescription(CONTRIBUTE_DESCRIPTION);
        this.userService.addBonus(message);
    }
}
/**
 * Adds bonus points to a user and records the change in the bonus event log.
 *
 * <p>Both writes happen inside this method, which is annotated {@code @Transactional}.
 *
 * @param msgDTO carries the user id, the bonus amount, and the event and description
 *               stored in the log row
 * @throws IllegalArgumentException if no user exists for the id in {@code msgDTO}
 */
@Transactional(rollbackFor = Exception.class)
public void addBonus(UserAddBonusMsgDTO msgDTO) {
    // 1. Add the bonus to the user.
    Integer userId = msgDTO.getUserId();
    Integer bonus = msgDTO.getBonus();
    User user = this.userMapper.selectByPrimaryKey(userId);
    if (user == null) {
        // Fail with a descriptive error instead of a bare NullPointerException below.
        throw new IllegalArgumentException("参数非法!该用户不存在!userId = " + userId);
    }
    // NOTE(review): read-modify-write; concurrent calls for the same user could lose
    // an update unless the mapper or database guards against it. Confirm.
    user.setBonus(user.getBonus() + bonus);
    this.userMapper.updateByPrimaryKeySelective(user);

    // 2. Record the change in the bonus_event_log table.
    this.bonusEventLogMapper.insert(
            BonusEventLog.builder()
                    .userId(userId)
                    .value(bonus)
                    .event(msgDTO.getEvent())
                    .createTime(new Date())
                    .description(msgDTO.getDescription())
                    .build()
    );
    log.info("积分添加完毕...");
}
5.【Spring Cloud Alibaba】消息驱动的微服务-SpringCloudAlibabaRocketMQ
标签:tag oid 发送 uda manage start rollback discover required
原文地址:https://www.cnblogs.com/xjknight/p/12349104.html