解析MySQL的变化数据流Canal(mysql中canal)
解析MySQL的变化数据流——Canal
Canal是淘宝金融开源的一款基于MySQL数据库的增量数据同步工具,它实现了MySQL的binlog协议,将增量数据解析成数据操作语句或者数据变更事件,发送到MQ,Kafka或者其他异步处理系统中,从而实现异地数据共享、多级缓存等场景。下面将介绍Canal的基本原理和使用方法。
一、Canal的原理
Canal的基本原理是模拟MySQL从Slave读取Binlog,并解析其内容获取增量数据的过程,因此需要在MySQL中开启Binlog功能,并授权Canal的账号访问MySQL中的Binlog,并通过Canal的配置文件进行配置。Canal监听了MySQL的Binlog文件流,将其解析成增量数据,并使用MQ进行异步传输。
Canal的工作流程如下图所示:
![image.png](https://cdn.nlark.com/yuque/0/2021/png/97322/1635351589653-441d43f2-dc83-48f3-84b2-84b8d793f44c.png#clientId=u8455a5d5-5e5c-4&from=paste&height=274&id=u7466c167&margin=%5Bobject%20Object%5D&name=image.png&originHeight=548&originWidth=758&originalType=binary&ratio=1.562907268170426&size=120593&status=done&style=none&taskId=uc0daba54-6c31-44f7-be6c-6381b57ee6b&width=379)
Canal由三个主要组件组成:
1. Canal Server:用于从MySQL的binlog接收增量数据,并以自定义格式发送数据到不同的消费者。Canal Server可以纯粹地运行在内存中,只需要很少的资源和CPU。
2. Canal Client:作为Canal Server的消费者,接收从Canal Server传送过来的增量数据,并进行解析和处理。Canal Client可以通过自定义的处理逻辑将增量数据转换为状态消息、数据变更事件或数据操作语句。
3. Canal Admin:提供Web界面和API,用于管理Canal Server和Canal Client的配置、启动和关闭。
二、Canal的使用
1. 下载Canal Server并解压:http://github.com/alibaba/canal/releases
2. 修改Canal Server的配置文件:conf/canal.properties,其中需要配置MySQL的主机名、端口、用户名、密码等信息。也可以在该配置文件中启用其他功能,如过滤Binlog的表、字段、DML操作类型、DDL操作类型等。
3. 启动Canal Server:bin/startup.sh
4. 下载Canal Client的依赖包:http://maven.aliyun.com/nexus/content/groups/public/com/alibaba/otter/canal/。如果使用的是Maven,则可以在pom.xml文件中添加以下依赖:
com.alibaba.otter canal.client
${canal.version}
5. 编写Canal Client程序:Canal提供了基于Java、Python、C++等语言的客户端SDK。以Java为例,可以按照以下步骤进行:
5.1. 定义Canal Client的配置文件:
canal.instance.master.address=127.0.0.1:11111
canal.instance.dbUsername=rootcanal.instance.dbPassword=123456
canal.instance.connectionCharset=UTF-8canal.instance.tsdb.enable=true
canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb?useSSL=falsecanal.instance.gtidon=false
canal.instance.filter.regex=.*\\..*
5.2. 定义Canal Client的监听器:
public class CanalClientTest {
public static void mn(String[] args) {
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("127.0.0.1", 11111), "example", "root", "123456");
int batchSize = 1000; int emptyCount = 0;
try { connector.connect();
connector.subscribe(".*\\..*"); connector.rollback();
int totalEmptyCount = 1200; while (emptyCount
Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据 long batchId = message.getId();
int size = message.getEntries().size(); if (batchId == -1 || size == 0) {
emptyCount++; System.out.println("empty count : " + emptyCount);
try { Thread.sleep(1000);
} catch (InterruptedException e) { }
} else { emptyCount = 0;
// 解析并处理更新数据 printEntry(message.getEntries());
}
connector.ack(batchId); // 提交确认 // connector.rollback(batchId); // 处理失败, 回滚数据
} } finally {
connector.disconnect(); }
}
private static void printEntry(List entrys) {
for (Entry entry : entrys) { if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
continue; }
RowChange rowChage = null;
try { rowChage = RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) { throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
e); }
EventType eventType = rowChage.getEventType();
System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s", entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType));
for (RowData rowData : rowChage.getRowDatasList()) { if (eventType == EventType.DELETE) {
printColumn(rowData.getBeforeColumnsList()); } else if (eventType == EventType.INSERT) {
printColumn(rowData.getAfterColumnsList()); } else {
System.out.println("-------> before"); printColumn(rowData.getBeforeColumnsList());
System.out.println("-------> after"); printColumn(rowData.getAfterColumnsList());
} }
} }
private static void printColumn(List columns) {
for (Column column : columns) { System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
} }
}
5.3. 启动Canal Client程序即可监听、解析和处理MySQL的增量数据。
Canal是一个非常强大的基于MySQL的增量数据同步工具,它可以在跨机房、跨数据中心的异构系统之间实现高效的数据同步。在实际应用中,我们只需要编写Canal Client程序即可实现对MySQL的增量数据进行灵活、高效的处理。