Canal - MySQL实时数据采集工具使用详解(附:采集实时数据至Kafka)
一、基本介绍
1,什么是 Canal?
(1)Canal 由阿里巴巴开源的一个基于 MySQL 数据库的增量日志(Binary Log)解析工具,可以提供增量数据订阅和消费,支持将 MySQL 中的增量数据采集到 Kafka、RabbitMQ、Elasticsearch 及 HBase 中。
以下业务可以基于日志增量订阅和消费来实现:
- 数据库镜像
- 数据库实时备份
- 索引构建和实时维护(拆分异构索引、倒排索引等)
- 业务 cache 刷新
- 带业务逻辑的增量数据处理
(2)当前的 Canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x
2,Canal 的工作原理
(1)MySQL 主备复制原理
- MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件 binary log events,可以通过 show binlog events 进行查看)
- MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
- MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据
(2)Canal 工作原理
- Canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave,向 MySQL master 发送 dump 协议
- MySQL master 收到 dump 请求,开始推送增量日志(Binary Log)给 Canal。
- Canal 解析增量日志(Binary Log),再将其发送到指定目的地,例如 MySQL、Kafka、Elasticsearch 等。
3,Canal 的架构设计
(1)Canal 内部组件架构如下:
(2)各个组件的功能如下:
- Server:一个 Canal 运行实例,对应于一个 JVM。
- Instance:一个数据队列。一个数据队列由 EventParser、EventSink、EventStore 和 MetaManager 组成。
- EventParser:接入数据源,并模拟 MySQL Slave 的交互协议和 MySQL Master 节点进行交互,解析协议。
- EventSink:Parser 和 Store 的链接器,进行数据过滤、加工和分发工作。
- EventStore:存储数据。支持存储到内存、本地文件及 Zookeeper 中。
二、安装配置
1,JDK 安装
Canal 是基于 Java 语言开发的,所以需要依赖 JDK 环境。因此我们首先需要安装 JDK 1.8。具体可以参考我之前写的文章:
2,下载安装包
wget https://github.com/alibaba/canal/releases/download/canal-1.1.7/canal.deployer-1.1.7.tar.gz
(2)接着创建一个存放 Canal 的文件夹,并将压缩包复制进来(这个可以根据习惯调整目录的位置):
mkdir -p /usr/local/canal cp canal.deployer-1.1.7.tar.gz /usr/local/canal
(3)接着进入该文件夹,并将 Canal 安装包解压:
(3)由于 Canal 需要权限来伪装自己为 MySQL 的 Slave 节点。所以我们执行如下命令为其单独在 MySQL 中创建一个用户,并配置一定的权限。
(4)最后我们在 MySQL 中创建一张 user 表:
cd /usr/local/canal tar -zxvf canal.deployer-1.1.7.tar.gz
三、使用样例
1,样例说明
(1)下面我们使用 Cancal 将 MySQL 数据库的实时数据采集到 Kafka 中:
(2)在 MySQL 的实时数据(Binary Log)进入 Kafka 后,可以在 Kafka 后对接一个消费者程序,判断收到的数据类型是 INSERT、DELETE,还是 UPDATE,这样就可以在第三方应用中实时维护 MySQL 数据了。
2,准备工作
(1)首先我们需要开放 MySQL 的远程访问权限,这样 Canal 可以连接远程机器上的 MySQL 服务。具体操作步骤可以参考我之前写的文章:
(2)接着我们还要开启 MySQL 的 Binlog 功能,执行以下命令查看目前 Binlog 的状态:
show global variables like 'log_bin';
- 如果返回的是 ON,则说明 Binlog 已开启。
- 如果返回的是 OFF,则说明 Binlog 没有开启。我们需要修改 MySQL 的“/etc/my.cnf”文件:
vi /etc/my.cnf
- 在 [mysqld] 参数下添加以下配置,然后重启 MySQL 数据库:
[mysqld] server_id=1 log-bin=master binlog_format=row log_bin=/var/lib/mysql/bin-log log_bin_index=/var/lib/mysql/mysql-bin.index
(3)由于 Canal 需要权限来伪装自己为 MySQL 的 Slave 节点。所以我们执行如下命令为其单独在 MySQL 中创建一个用户,并配置一定的权限。
提示:虽然直接使用 root 用户也是可以的,但是生产上为了安全,还是建议为其创建一个专门的账户。
CREATE USER canal IDENTIFIED BY 'hangge1234'; GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'; FLUSH PRIVILEGES;
(4)最后我们在 MySQL 中创建一张 user 表:
CREATE TABLE `user` ( `id` int(10) NOT NULL AUTO_INCREMENT, `name` varchar(64) DEFAULT NULL, `phone` varchar(64) DEFAULT NULL, PRIMARY KEY (`id`) );
3,修改配置文件
(1)首先编辑编辑 Canal 的 instance.properties 文件:
(2)主要修改其中的数据库连接地址、用户名和密码、Kafka 的 topic。
vi /usr/local/canal/conf/example/instance.properties
(2)主要修改其中的数据库连接地址、用户名和密码、Kafka 的 topic。
(3)接着修改 Canal 的 canal.properties 文件:
(4)主要修改其中的 serverMode 设置为 kafka,并且设置 kafka 地址:
(2)执行如下命令启动 Canal:
(3)查看 canal 日志,检查启动是否正常:
(4)查看 instance 的日志,看到数据库连接也是正常:
(2)查看 Kafka 中 test 主题的数据变化,说明 Canal 正常工作。
vi /usr/local/canal/conf/canal.properties
(4)主要修改其中的 serverMode 设置为 kafka,并且设置 kafka 地址:
canal.serverMode = kafka kafka.bootstrap.servers = 192.168.60.9:9092
4,启动 Canal
(1)进入 Canal 目录:
cd /usr/local/canal
(2)执行如下命令启动 Canal:
bin/startup.sh
(3)查看 canal 日志,检查启动是否正常:
cat logs/canal/canal.log
(4)查看 instance 的日志,看到数据库连接也是正常:
cat logs/example/example.log
5,开始测试
(1)我们首先对 user 表数据进行操作,首先插入两条数据、接着更新一条数据、最后删除一条数据。
INSERT INTO `user` (`id`, `name`, `phone`) VALUES (1, 'hangge', '1234567890'); INSERT INTO `user` (`id`, `name`, `phone`) VALUES (2, 'baidu', '13362623365'); UPDATE `user` SET `name` = '航歌' WHERE `id` = 1; DELETE FROM `user` WHERE `id` = 2;
(2)查看 Kafka 中 test 主题的数据变化,说明 Canal 正常工作。