利用Oracle Canal助力数据库增量同步(oracle canal)
利用Oracle Canal助力数据库增量同步
在现代信息化建设的过程中,数据库作为核心数据存储和管理平台,其可靠性和高效性显得特别重要。为了保证数据的完整性和一致性,通常需要将不同的数据库之间进行同步,但是如果每次都进行全量同步,则会带来较大的性能负担,甚至会导致业务中断。因此,为了解决这一问题,我们可以利用Oracle Canal工具实现数据库的增量同步。
Oracle Canal介绍
Oracle Canal是阿里巴巴开源的一款MySQL二进制日志解析工具。其可以快速获取二进制日志中的增量更新数据,并通过数据格式规范的对比,对更新的数据进行统一管理和同步。在数据增量同步的场景下,Oracle Canal表现出了很好的稳定性和高效性。
原理介绍
Oracle Canal通过读取MySQL主库上生成的二进制日志,将其解析成单条SQL语句,并提供了类似于Trigger的方式,将其同步到其他的MySQL从库中。可以根据业务需求,选择合适的同步方式,整体设计如下图所示:
![Oracle Canal增量同步系统原理图](https://img-blog.csdn.net/20180914200017262?watermark/2/text/aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3Nlcl9tb3Jl/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70/q/75)
其中,Master为主库,需要同步数据的源;Slave为从库,负责接收Master同步的数据。客户端通过Canal Connect协议连接到Canal服务,获取到MySQL主库上的二进制日志,对其进行解析后,再将解析后的数据传递给上层业务处理。
使用方法
下面我们以一个简单的Java程序为例,来介绍如何使用Oracle Canal实现增量同步。
(1)添加依赖库
“`xml
com.alibaba.otter
canal.client
1.1.4
(2)编写Canal客户端
```javapublic class CanalClient {
private static CanalConnector canalConnector; private static SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
public static void mn(String[] args) {
// 创建链接 canalConnector = CanalConnectors.newSingleConnector(
new InetSocketAddress("127.0.0.1", 11111), "example",
"root", "root");
int batchSize = 1000; int emptyCount = 0;
try { canalConnector.connect();
canalConnector.subscribe(".*\\..*"); canalConnector.rollback();
while (true) { Message message = canalConnector.getWithoutAck(batchSize);
long batchId = message.getId(); int size = message.getEntries().size();
if (batchId == -1 || size == 0) { emptyCount++;
System.out.println("empty count : " + emptyCount); try {
Thread.sleep(1000); } catch (InterruptedException e) {
e.printStackTrace(); }
} else { emptyCount = 0;
printSummary(message, batchId); printEntries(message.getEntries());
} canalConnector.ack(batchId); // 提交确认
// canalConnector.rollback(batchId); // 处理失败, 回滚数据 }
} finally { canalConnector.disconnect();
} }
private static void printSummary(Message message, long batchId) { long firstEntryTime = message.getEntries().get(0).getHeader().getExecuteTime();
long lastEntryTime = message.getEntries().get(message.getEntries().size() - 1).getHeader().getExecuteTime();
System.out.println("**********************"); System.out.println("batchId=" + batchId + ", count=" + message.getEntries().size() + ", delay="
+ (System.currentTimeMillis() - lastEntryTime) + "ms, batchTime=" + dateFormat.format(new Date(firstEntryTime)) + "~" + dateFormat.format(new Date(lastEntryTime)));
}
private static void printEntries(List entrys) {
for (Entry entry : entrys) { if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
continue; }
RowChange rowChange = null; try {
rowChange = RowChange.parseFrom(entry.getStoreValue()); } catch (InvalidProtocolBufferException e) {
throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
}
EventType eventType = rowChange.getEventType(); System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(), entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
eventType));
for (RowData rowData : rowChange.getRowDatasList()) { if (eventType == EventType.DELETE) {
printColumn(rowData.getBeforeColumnsList(), true); } else if (eventType == EventType.INSERT) {
printColumn(rowData.getAfterColumnsList(), false); } else {
System.out.println("-------> before"); printColumn(rowData.getBeforeColumnsList(), true);
System.out.println("-------> after"); printColumn(rowData.getAfterColumnsList(), false);
} }
} }
private static void printColumn(List columns, boolean isBefore) {
for (Column column : columns) { System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated() + "(before=" + isBefore + ")");
} }
}
(3)启动Canal服务
启动Canal服务前,需要在MySQL主库上开启二进制日志功能,并设置Canal账户的相关权限,根据历史数据来判断增量更新。
“`shell
./bin/startup.sh
(4)运行客户端程序
在运行上面的Java程序时,请确保Canal服务已经启动,并且能够正确连接到MySQL主库。运行程序后,即可实现MySQL主库上数据的增量同步。
总结
Oracle Canal作为一款实用的数据库增量同步工具,在众多企业数据处理场景中得到了广泛的应用。其中,其优点包括高效、稳定、易用等,为数据转移和同步提供了强有力的技术支持。通过上述实例教程,相信读者对于如何运用Oracle Canal实现数据库增量同步已经有了更详尽的认知。