Python 数据库同步:高效实现数据同步和数据备份 (python同步数据库)

在现代企业信息化的生态中,数据同步和数据备份成为了每个企业数据存储管理的必经之路。企业需要高效地同步数据,以确保各地数据的一致性和安全备份。Python 作为一种快速开发语言,在企业信息化中的地位逐渐得到认可。本文将讲述如何使用 Python 实现数据库同步以及数据备份,并提供一些实用技巧。

I. 数据库同步

同步数据库是指将一个数据库的数据迁移到另一个数据库中,从而使两个数据库的数据保持一致,这项工作尤其在分布式应用中显得尤为重要。在实际应用中,同步数据库需要考虑许多方面,例如:双向同步、单向同步、增量同步、全量同步等。

1. 全量同步

全量同步是指将源库所有数据全部迁移到目标库中。这种同步方式可以保证两个数据库的数据一定是一致的,同时由于迁移的是全量数据,所以也能确定地保证迁移的数据是完整的。

我们通过 Python 实现这种方式时,需要使用到 PyMySQL 这个包。接下来,我们就通过使用 PyMySQL 实现 MySQL 数据库之间的全量同步。

使用 PyMySQL 同步数据库

由于 PyMySQL 包主要是针对 MySQL 数据库的,因此我们首先需要安装 PyMySQL 这个包。可以通过 pip 安装,而具体的安装方式如下:

“`python

pip install PyMySQL

“`

在安装 PyMySQL 成功之后,我们需要执行以下步骤:

– 创建源库和目标库表结构

– 训练数据导出

– 训练数据导入目标库

创建源库和目标库表结构

将以下 SQL 语句拷贝到 PyCharm 等 Python IDE 中

“`python

CREATE TABLE student

(

id INT PRIMARY KEY auto_increment,

`name` VARCHAR (20) NOT NULL,

age INT NOT NULL,

sex INT NOT NULL,

province VARCHAR (255) NOT NULL

) ENGINE = innodb

AUTO_INCREMENT = 1

DEFAULT CHARSET = utf8;

“`

运行以上 SQL 语句则可以实现在源库和目标库中自动生成的 student 表。

训练数据导出

将源库数据进行导出,生成 SQL 文件;使用以下 SQL 语句进行操作:

“`python

mysqldump -uroot -p123456 -d -R database_name table_name > table_name.sql

“`

此时,可以在你所在的文件系统中找到 SQL 文件,并将其导入到目标数据库中。

训练数据导入目标库

使用以下 SQL 语句即可将训练数据导入到目标库中:

“`python

source table_name.sql

“`

在以上三个步骤完成之后,我们就成功地完成了 MySQL 数据库之间的全量同步。其中,由于 PyMySQL 包的支持,使得同步数据库变得非常简单。

2. 增量同步

增量同步是指当源库发生数据变化时,将变化的数据同步到目标库中。这种同步方式能够有效地降低同步数据的时间成本,同时也能实现准实时地同步数据。

在实际使用中,增量同步依然是一个复杂的问题,主要包括以下两个方面:

– 在源库发生变化时,如何获取发生变化的数据?

– 如何将获取到的数据进行同步?

PyRedis 是一个非常好的 Python 包,它可以为我们解决上述问题。在 PyRedis 的支持下,增量同步能够实现真正的实时同步,本文接下来将讲述如何使用 PyRedis 实现 MySQL 数据库的增量同步。

使用 PyRedis 实现 MySQL 数据库增量同步

接下来,实现 MySQL 数据库的增量同步。我们在这里使用 Python 实现两个服务,一个是 Redis 服务,另一个是 MySQL 服务。因此,在使用 MySQL 的服务之前,需要先在本机安装 MySQL 服务。

3. 双向同步

双向同步指两个数据库中的数据互相同步,即在两个数据库中进行了增删和改操作之后,将相应的操作同步到另一个数据库。在实际运营的过程中,很多公司也都会经常使用双向同步方式进行数据同步操作,即在公司内网和外网服务器之间进行实时数据同步。而 Python 作为一种快速开发语言,在实现双向数据同步时也非常实用。

使用 Python 实现 MySQL 数据库的双向同步

要实现双向同步的数据同步,需要用到 PyMySQL 包和 Redis 包。

1. Redis 数据库同步

– 安装 Redis

需要在本地安装 Redis。这可以通过以下步骤进行:

“`python

wget http://download.redis.io/releases/redis-3.0.7.tar.gz

tar xzvf redis-3.0.7.tar.gz

cd redis-3.0.7

make

make install

“`

将 Redis 安装在本地之后,就可以启动 Redis 服务。

2. 配置 PyMySQL + Redis 的双向同步

在继续操作之前,请确保已经安装了 PyMySQL 和 Redis。

双向同步是一种非常实用的同步方式,因此我们在本段代码中展示如何使用 Python 实现 MySQL 数据库之间的双向同步。在双向同步的过程中,我们需要保存每个数据的记录信息,然后根据不同的操作类型进行相关的操作。代码如下:

“`python

import ast

import pymysql

import redis

import json

from pymysql.cursors import DictCursor

SET = ‘SET’

GET = ‘GET’

INCR = ‘INCR’

HSET = ‘HSET’

HINCRBY = ‘HINCRBY’

HGETALL = ‘HGETALL’

class Syncer(object):

def __init__(self, origin_conn_config, dest_conn_config, redis_conn_config):

self.origin_conn_config = origin_conn_config

self.dest_conn_config = dest_conn_config

self.redis_conn_config = redis_conn_config

@staticmethod

def handle_set(key, val, cursor):

cursor.execute(‘SELECT id FROM `set` WHERE `key` = %s’, (key,))

res = cursor.fetchone()

if res:

cursor.execute(‘UPDATE `set` SET `val` = %s WHERE `key` = %s’, (val, key))

else:

cursor.execute(‘INSERT INTO `set` (`key`, `val`) VALUES (%s, %s)’, (key, val))

@staticmethod

def handle_incr(key, val, cursor):

cursor.execute(‘SELECT id FROM `incr` WHERE `key` = %s’, (key,))

res = cursor.fetchone()

if res:

cursor.execute(‘UPDATE `incr` SET `val` = `val` + %s WHERE `key` = %s’, (val, key))

else:

cursor.execute(‘INSERT INTO `incr` (`key`, `val`) VALUES (%s, %s)’, (key, val))

@staticmethod

def handle_hset(key, subkey, val, cursor):

cursor.execute(

‘SELECT id FROM `hset` WHERE `key` = %s and `subkey` = %s’, (key, subkey))

res = cursor.fetchone()

if res:

cursor.execute(

‘UPDATE `hset` SET `val` = %s WHERE `key` = %s and `subkey` = %s’, (val, key, subkey))

else:

cursor.execute(

‘INSERT INTO `hset` (`key`, `subkey`, `val`) VALUES (%s, %s, %s)’, (key, subkey, val))

@staticmethod

def handle_hincrby(key, subkey, val, cursor):

cursor.execute(

‘SELECT id FROM `hincrby` WHERE `key` = %s and `subkey` = %s’, (key, subkey))

res = cursor.fetchone()

if res:

cursor.execute(

‘UPDATE `hincrby` SET `val` = `val` + %s WHERE `key` = %s and `subkey` = %s’, (val, key, subkey))

else:

cursor.execute(

‘INSERT INTO `hincrby` (`key`, `subkey`, `val`) VALUES (%s, %s, %s)’, (key, subkey, val))

@staticmethod

def handle_hgetall(key, val, cursor):

for subkey, subval in val.items():

Syncer.handle_hset(key, subkey, subval, cursor)

def sync_set(self, key):

r = redis.Redis(**self.redis_conn_config)

val = r.get(key)

if val:

val = val.decode(‘utf-8’)

val = ast.literal_eval(val) if val.startswith(‘{‘) else val

with self.origin_conn.cursor() as cursor:

Syncer.handle_set(key, val, cursor)

self.origin_conn.commit()

def sync_incr(self, key):

r = redis.Redis(**self.redis_conn_config)

val = r.get(key)

if val:

with self.origin_conn.cursor() as cursor:

Syncer.handle_incr(key, val, cursor)

self.origin_conn.commit()

def sync_hset(self, key, subkey):

r = redis.Redis(**self.redis_conn_config)

val = r.hget(key, subkey)

if val:

val = val.decode(‘utf-8’)

with self.origin_conn.cursor() as cursor:

Syncer.handle_hset(key, subkey, val, cursor)

self.origin_conn.commit()

def sync_hincrby(self, key, subkey):

r = redis.Redis(**self.redis_conn_config)

val = r.hget(key, subkey)

if val:

with self.origin_conn.cursor() as cursor:

Syncer.handle_hincrby(key, subkey, val, cursor)

self.origin_conn.commit()

def sync_hgetall(self, key):

r = redis.Redis(**self.redis_conn_config)

val = r.hgetall(key)

if val:

val = {k.decode(‘utf-8’): v.decode(‘utf-8’) for k, v in val.items()}

with self.origin_conn.cursor() as cursor:

Syncer.handle_hgetall(key, val, cursor)

self.origin_conn.commit()

def sync_all(self):

r = redis.Redis(**self.redis_conn_config)

with self.origin_conn.cursor() as cursor:

cursor.execute(‘TRUNCATE TABLE `set`’)

cursor.execute(‘TRUNCATE TABLE `incr`’)

cursor.execute(‘TRUNCATE TABLE `hset`’)

cursor.execute(‘TRUNCATE TABLE `hincrby`’)

self.origin_conn.commit()

keys = r.keys()

for key in keys:

key = key.decode(‘utf-8’)

val_t = r.type(key)

if val_t == SET:

self.sync_set(key)

elif val_t == INCR:

self.sync_incr(key)

elif val_t == HSET:

subkeys = r.hkeys(key)

for subkey in subkeys:

subkey = subkey.decode(‘utf-8’)

self.sync_hset(key, subkey)

elif val_t == HINCRBY:

subkeys = r.hkeys(key)

for subkey in subkeys:

subkey = subkey.decode(‘utf-8’)

self.sync_hincrby(key, subkey)

elif val_t == HGETALL:

self.sync_hgetall(key)

@property

def origin_conn(self):

if not hasattr(self, ‘_origin_conn’):

self._origin_conn = pymysql.connect(**self.origin_conn_config, cursorclass=DictCursor)

return self._origin_conn

@property

def dest_conn(self):

if not hasattr(self, ‘_dest_conn’):

self._dest_conn = pymysql.connect(**self.dest_conn_config, cursorclass=DictCursor)

return self._dest_conn

“`

在上述代码中,我们通过 Redis 存储每个数据的记录信息,并通过 MySQL 存储所同步的数据。针对不同类型的数据,我们也有不同的处理方式。对应的操作类型包括:SET、GET、INCR、HSET、HINCRBY 和 HGETALL 等。同时,这种方式也能够支持单向同步和双向同步。

4. 单向同步

单项同步是指数据只能从源服务同步到目标服务,不支持其他方式,比如从目标服务向源服务同步数据。在单向同步的实际使用中,由于源数据是需要持续不断地更新的,因此我们需要实现一个数据同步机制,保证数据能够长时间保持一致。

使用 Python 实现 MySQL 数据库的单向同步

单项同步在使用基于 MySQL 数据库和 PyMySQL 包的时候非常容易实现。具体代码如下:

“`python

import time

import pymysql.cursors

class MySqlSync:

def __init__(self, source_config, target_config, interval=3):

self.source_config = source_config

self.target_config = target_config

self.interval = interval

def sync_data(self):

while True:

source_conn = pymysql.connect(**self.source_config)

target_conn = pymysql.connect(**self.target_config)

try:

with source_conn.cursor() as source_cursor:

with target_conn.cursor() as target_cursor:

sql = ‘SELECT * FROM student’

source_cursor.execute(sql)

rows = list(source_cursor.fetchall())

for row in rows:

target_cursor.execute(

‘INSERT INTO student (`name`, `age`, `sex`,`province`) VALUES (%s,%s,%s,%s)’,

(row[‘name’], row[‘age’], row[‘sex’], row[‘province’]))

target_conn.commit()

except Exception as e:

print(e)

finally:

source_conn.close()

target_conn.close()

time.sleep(self.interval)

“`

在以上代码中,我们使用 pymysql.connect() 方法来连接 MySQL 数据库。如果连接成功,就可以使用 cursor 方法进行数据的查询和插入等操作。这样,使用 PyMySQL 包配合 MySQL 数据库,就可以非常简单地实现 Python 代码的数据库同步操作。

II. 数据库备份

对于数据库的备份,实际用途很大程度上是为了避免数据丢失和数据恢复等问题。在实际备份时,我们可以考虑备份整个数据库,也可以单独备份其中的一些表。具体而言,数据库备份的方式可以包括以下几种:

– 备份数据库的二进制文件

– 将数据库导出为 SQL 文件或 CSV 文件

– 备份数据库的纯文本文件

– 使用第三方工具进行数据备份

在工作中,每一种数据备份方法都有其自身的优缺点。在本文中,我们选择将数据库导出为 SQL 文件的方式。

使用 Python 实现数据库备份

Python 中有一个叫做 PyMySQL 这个包,它是一个纯 Python 的 MySQL 客户端库。在实际备份时,我们可以使用 PyMySQL 备份指定数据库或者指定表。具体代码如下:

“`python

import sys

import os

import pymysql

db_host = ‘127.0.0.1’

db_user = ‘root’

db_password = ‘root’

db_name = ‘test_db’

table_name = ‘test_table’

def backup_table(conn, table_name, backup_path):

path = os.path.join(backup_path, table_name + ‘.sql’)

with conn.cursor() as cursor:

cursor.execute(‘SELECT * FROM {}’.format(table_name))

data = cursor.fetchall()

create_statement = ‘SHOW CREATE TABLE {}’.format(table_name)

with conn.cursor() as cursor:

cursor.execute(create_statement)

create_sql = cursor.fetchone()[1]

with open(path, ‘w’) as f:

f.write(‘{};\n’.format(create_sql))

for row in data:

insert_statement = ‘INSERT INTO {} VALUES {}’.format(table_name, row)

f.write(‘{};\n’.format(insert_statement))

def backup_all_tables(conn, backup_path):

with conn.cursor() as cursor:

cursor.execute(‘SHOW TABLES’)

tables = [x[0] for x in cursor.fetchall()]

for table_name in tables:

backup_table(conn, table_name, backup_path)

def backup_database(conn, backup_path):

with conn.cursor() as cursor:

cursor.execute(‘SELECT DATABASE()’)

database_name = cursor.fetchone()[0]

path = os.path.join(backup_path, database_name + ‘.sql’)

with conn.cursor() as cursor:

cursor.execute(‘SHOW CREATE DATABASE {}’.format(database_name))

create_sql = cursor.fetchone()[1]

with open(path, ‘w’) as f:

f.write(‘{};\n


数据运维技术 » Python 数据库同步:高效实现数据同步和数据备份 (python同步数据库)