DataX - 数据库离线数据采集工具使用详解2(样例1:MySQL之间数据同步)
二、实现 MySQL 之间数据同步
1,准备工作
(1)首先我们需要开放 MySQL 的远程访问权限,这样 DataX 可以连接远程机器上的 MySQL 服务。具体操作步骤可以参考我之前写的文章:
(2)接着我们在 MySQL 中创建 user 和 user2 这两张表,它们的表结构是一样的:
(3)为了能够自动生成一些模拟数据,我们定义一个存储过程:
CREATE TABLE `user` ( `id` int(10) NOT NULL AUTO_INCREMENT, `name` varchar(64) DEFAULT NULL, `phone` varchar(64) DEFAULT NULL, PRIMARY KEY (`id`) ); CREATE TABLE `user2` ( `id` int(10) NOT NULL AUTO_INCREMENT, `name` varchar(64) DEFAULT NULL, `phone` varchar(64) DEFAULT NULL, PRIMARY KEY (`id`) );
(3)为了能够自动生成一些模拟数据,我们定义一个存储过程:
DELIMITER $$ CREATE PROCEDURE test() BEGIN DECLARE A INT DEFAULT 1; WHILE (A < 100000) DO INSERT INTO `user` VALUES (A, CONCAT("hangge", A), CONCAT("8888", A)); SET A = A + 1; END WHILE; END $$ DELIMITER ;
(4)然后调用这个存储过程:
call test();
(5)可以看到 user 表中已经生成了 99999 条数据:
2,全表同步样例
(1)首先,我们创建一个任务 json 文件:vi /usr/local/datax/job/myjob.json
(2)文件内容如下:
{ "job": { "content": [ { "reader": { "name": "mysqlreader", "parameter": { "username": "root", "password": "hangge1234", "splitPk": "id", "column": [ "*" ], "connection": [ { "jdbcUrl": [ "jdbc:mysql://192.168.60.1:3306/hangge?useUnicode=true&characterEncoding=utf8" ], "table": [ "user" ] } ] } }, "writer": { "name": "mysqlwriter", "parameter": { "username": "root", "password": "hangge1234", "column": [ "*" ], "connection": [ { "jdbcUrl": "jdbc:mysql://192.168.60.1:3306/hangge?useUnicode=true&characterEncoding=utf8", "table": [ "user2" ] } ], "preSql": [ "truncate user2" ], "session": [ "set session sql_mode='ANSI'" ], "writeMode": "insert" } } } ], "setting": { "speed": { "channel": "5" } } } }
(3)接着进入 DataX 的 bin 目录:
cd /usr/local/datax/bin/
(4)执行如下命令启动任务:
python datax.py ../job/myjob.json
(5)任务执行完毕后控制台会显示如下信息:
3,增量同步样例
(1)当数据量较大时,完全同步可能中途会出现中断或者错误,因此我们有时需要采用增量同步方式。即每次只选择部分数据进行同步,并且如果数据存在则更新,如果数据不存在则插入。假设我们对 user2 表进行如下修改,添加如下 3 条数据:
- 通过 where 配置选择只同步 id 大于 1 小于 6 的数据。
- 通过 writeMode 配置成 replace 根据 id 判断数据进行更新还是插入操作。
- 删除 preSql 配置避免清空表中原有的数据
{ "job": { "content": [ { "reader": { "name": "mysqlreader", "parameter": { "username": "root", "password": "hangge1234", "splitPk": "id", "column": [ "*" ], "where": "id > 1 AND id < 6", "connection": [ { "jdbcUrl": [ "jdbc:mysql://192.168.60.1:3306/hangge?useUnicode=true&characterEncoding=utf8" ], "table": [ "user" ] } ] } }, "writer": { "name": "mysqlwriter", "parameter": { "username": "root", "password": "hangge1234", "column": [ "*" ], "connection": [ { "jdbcUrl": "jdbc:mysql://192.168.60.1:3306/hangge?useUnicode=true&characterEncoding=utf8", "table": [ "user2" ] } ], "session": [ "set session sql_mode='ANSI'" ], "writeMode": "replace" } } } ], "setting": { "speed": { "channel": "5" } } } }
(3)接着进入 DataX 的 bin 目录:
cd /usr/local/datax/bin/
(4)执行如下命令启动任务:
python datax.py ../job/myjob.json
(5)任务执行完毕后控制台会显示如下信息: