码迷,mamicode.com
首页 > 其他好文 > 详细

canal的使用

时间:2020-12-09 11:33:40      阅读:4      评论:0      收藏:0      [点我收藏+]

标签:启动   and   重启   port   core   protoc   conf   enc   tar   

  • mysql开启binlog模式
  • 查看mysql是否开启binlog模式

SHOW VARIABLES LIKE ‘%log_bin%‘ </DI< div>

  • 修改/etc/my.cnf 需要开启binlog模式

[mysqld]

log-bin=mysql-bin

binlog-format=ROW

server_id=1 </DI< div>

<dependency>

<groupId>com.xpand</groupId>

<artifactId>starter-canal</artifactId>

<version>0.0.1-SNAPSHOT</version>

</dependency>

canal使用

  • 启动类添加
@EnableCanalClient
  • 使用
package com.changgou.canal.listener;

import com.alibaba.otter.canal.protocol.CanalEntry;
import com.changgou.canal.config.RabbitMQConfig;
import com.xpand.starter.canal.annotation.CanalEventListener;
import com.xpand.starter.canal.annotation.ListenPoint;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;

@CanalEventListener //声明当前的类是canal的监听类
public class BusinessListener {

@Autowired
private RabbitTemplate rabbitTemplate;

/**
*
* @param eventType 当前操作数据库的类型
* @param rowData 当前操作数据库的数据
*/
@ListenPoint(schema = "changgou_business",table = "tb_ad")
public void adUpdate(CanalEntry.EventType eventType,CanalEntry.RowData rowData){
System.out.println("广告表数据发生改变");
//获取改变之前的数据
//rowData.getBeforeColumnsList().forEach((c)-> System.out.println("改变前的数据:"+c.getName()+"::"+c.getValue()));

//获取改变之后的数据
//rowData.getAfterColumnsList().forEach((c)-> System.out.println("改变之后的数据:"+c.getName()+"::"+c.getValue()));

for (CanalEntry.Column column : rowData.getAfterColumnsList()) {
if ("position".equals(column.getName())){
System.out.println("发送最新的数据到MQ:"+column.getValue());

//发送消息
rabbitTemplate.convertAndSend("", RabbitMQConfig.AD_UPDATE_QUEUE,column.getValue());
}
}
}
}

canal的使用

标签:启动   and   重启   port   core   protoc   conf   enc   tar   

原文地址:https://www.cnblogs.com/lgee/p/14085432.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!