利用Oracle Canal助力数据库增量同步(oracle canal)

利用Oracle Canal助力数据库增量同步

在现代信息化建设的过程中,数据库作为核心数据存储和管理平台,其可靠性和高效性显得特别重要。为了保证数据的完整性和一致性,通常需要将不同的数据库之间进行同步,但是如果每次都进行全量同步,则会带来较大的性能负担,甚至会导致业务中断。因此,为了解决这一问题,我们可以利用Oracle Canal工具实现数据库的增量同步。

Oracle Canal介绍

Oracle Canal是阿里巴巴开源的一款MySQL二进制日志解析工具。其可以快速获取二进制日志中的增量更新数据,并通过数据格式规范的对比,对更新的数据进行统一管理和同步。在数据增量同步的场景下,Oracle Canal表现出了很好的稳定性和高效性。

原理介绍

Oracle Canal通过读取MySQL主库上生成的二进制日志,将其解析成单条SQL语句,并提供了类似于Trigger的方式,将其同步到其他的MySQL从库中。可以根据业务需求,选择合适的同步方式,整体设计如下图所示:

![Oracle Canal增量同步系统原理图](https://img-blog.csdn.net/20180914200017262?watermark/2/text/aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3Nlcl9tb3Jl/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70/q/75)

其中,Master为主库,需要同步数据的源;Slave为从库,负责接收Master同步的数据。客户端通过Canal Connect协议连接到Canal服务,获取到MySQL主库上的二进制日志,对其进行解析后,再将解析后的数据传递给上层业务处理。

使用方法

下面我们以一个简单的Java程序为例,来介绍如何使用Oracle Canal实现增量同步。

(1)添加依赖库

“`xml

com.alibaba.otter

canal.client

1.1.4


(2)编写Canal客户端

```java
public class CanalClient {
private static CanalConnector canalConnector;
private static SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
public static void mn(String[] args) {

// 创建链接
canalConnector = CanalConnectors.newSingleConnector(
new InetSocketAddress("127.0.0.1", 11111),
"example",
"root",
"root");
int batchSize = 1000;
int emptyCount = 0;
try {
canalConnector.connect();
canalConnector.subscribe(".*\\..*");
canalConnector.rollback();
while (true) {
Message message = canalConnector.getWithoutAck(batchSize);
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
emptyCount++;
System.out.println("empty count : " + emptyCount);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
emptyCount = 0;
printSummary(message, batchId);
printEntries(message.getEntries());
}
canalConnector.ack(batchId); // 提交确认
// canalConnector.rollback(batchId); // 处理失败, 回滚数据
}
} finally {
canalConnector.disconnect();
}
}
private static void printSummary(Message message, long batchId) {
long firstEntryTime = message.getEntries().get(0).getHeader().getExecuteTime();
long lastEntryTime = message.getEntries().get(message.getEntries().size() - 1).getHeader().getExecuteTime();

System.out.println("**********************");
System.out.println("batchId=" + batchId + ", count=" + message.getEntries().size() + ", delay="
+ (System.currentTimeMillis() - lastEntryTime) + "ms, batchTime=" + dateFormat.format(new Date(firstEntryTime))
+ "~" + dateFormat.format(new Date(lastEntryTime)));
}

private static void printEntries(List entrys) {
for (Entry entry : entrys) {
if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
continue;
}
RowChange rowChange = null;
try {
rowChange = RowChange.parseFrom(entry.getStoreValue());
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
e);
}

EventType eventType = rowChange.getEventType();
System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
eventType));

for (RowData rowData : rowChange.getRowDatasList()) {
if (eventType == EventType.DELETE) {
printColumn(rowData.getBeforeColumnsList(), true);
} else if (eventType == EventType.INSERT) {
printColumn(rowData.getAfterColumnsList(), false);
} else {
System.out.println("-------> before");
printColumn(rowData.getBeforeColumnsList(), true);
System.out.println("-------> after");
printColumn(rowData.getAfterColumnsList(), false);
}
}
}
}
private static void printColumn(List columns, boolean isBefore) {
for (Column column : columns) {
System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated() + "(before=" + isBefore + ")");
}
}
}

(3)启动Canal服务

启动Canal服务前,需要在MySQL主库上开启二进制日志功能,并设置Canal账户的相关权限,根据历史数据来判断增量更新。

“`shell

./bin/startup.sh


(4)运行客户端程序

在运行上面的Java程序时,请确保Canal服务已经启动,并且能够正确连接到MySQL主库。运行程序后,即可实现MySQL主库上数据的增量同步。

总结

Oracle Canal作为一款实用的数据库增量同步工具,在众多企业数据处理场景中得到了广泛的应用。其中,其优点包括高效、稳定、易用等,为数据转移和同步提供了强有力的技术支持。通过上述实例教程,相信读者对于如何运用Oracle Canal实现数据库增量同步已经有了更详尽的认知。

数据运维技术 » 利用Oracle Canal助力数据库增量同步(oracle canal)