当前位置: > > > Canal - MySQL实时数据采集工具使用详解(附:采集实时数据至Kafka)

Canal - MySQL实时数据采集工具使用详解(附:采集实时数据至Kafka)

一、基本介绍

1,什么是 Canal?

(1)Canal 由阿里巴巴开源的一个基于 MySQL 数据库的增量日志(Binary Log)解析工具,可以提供增量数据订阅和消费,支持将 MySQL 中的增量数据采集到 KafkaRabbitMQElasticsearchHBase 中。
以下业务可以基于日志增量订阅和消费来实现:
  • 数据库镜像
  • 数据库实时备份
  • 索引构建和实时维护(拆分异构索引、倒排索引等)
  • 业务 cache 刷新
  • 带业务逻辑的增量数据处理
(2)当前的 Canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x

2,Canal 的工作原理

(1)MySQL 主备复制原理
  • MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件 binary log events,可以通过 show binlog events 进行查看)
  • MySQL slavemasterbinary log events 拷贝到它的中继日志(relay log
  • MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据

(2)Canal 工作原理
  • Canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave,向 MySQL master 发送 dump 协议
  • MySQL master 收到 dump 请求,开始推送增量日志(Binary Log)给 Canal
  • Canal 解析增量日志(Binary Log),再将其发送到指定目的地,例如 MySQLKafkaElasticsearch 等。

3,Canal 的架构设计

(1)Canal 内部组件架构如下:

(2)各个组件的功能如下:
  • Server:一个 Canal 运行实例,对应于一个 JVM
  • Instance:一个数据队列。一个数据队列由 EventParserEventSinkEventStoreMetaManager 组成。
  • EventParser:接入数据源,并模拟 MySQL Slave 的交互协议和 MySQL Master 节点进行交互,解析协议。
  • EventSinkParserStore 的链接器,进行数据过滤、加工和分发工作。
  • EventStore:存储数据。支持存储到内存、本地文件及 Zookeeper 中。

二、安装配置

1,JDK 安装

    Canal 是基于 Java 语言开发的,所以需要依赖 JDK 环境。因此我们首先需要安装 JDK 1.8。具体可以参考我之前写的文章:

2,下载安装包

(1)访问 CanalGitHub 主页(点击访问),获取下载地址并下载,这里我使用的是 1.1.7 版本:
wget https://github.com/alibaba/canal/releases/download/canal-1.1.7/canal.deployer-1.1.7.tar.gz

(2)接着创建一个存放 Canal 的文件夹,并将压缩包复制进来(这个可以根据习惯调整目录的位置):
mkdir -p /usr/local/canal
cp canal.deployer-1.1.7.tar.gz /usr/local/canal

(3)接着进入该文件夹,并将 Canal 安装包解压:
cd /usr/local/canal
tar -zxvf canal.deployer-1.1.7.tar.gz

三、使用样例

1,样例说明

(1)下面我们使用 CancalMySQL 数据库的实时数据采集到 Kafka 中:

(2)在 MySQL 的实时数据(Binary Log)进入 Kafka 后,可以在 Kafka 后对接一个消费者程序,判断收到的数据类型是 INSERTDELETE,还是 UPDATE,这样就可以在第三方应用中实时维护 MySQL 数据了。

2,准备工作

(1)首先我们需要开放 MySQL 的远程访问权限,这样 Canal 可以连接远程机器上的 MySQL 服务。具体操作步骤可以参考我之前写的文章:

(2)接着我们还要开启 MySQLBinlog 功能,执行以下命令查看目前 Binlog 的状态:
show global variables like 'log_bin';
  • 如果返回的是 ON,则说明 Binlog 已开启。

  • 如果返回的是 OFF,则说明 Binlog 没有开启。我们需要修改 MySQL 的“/etc/my.cnf”文件:
vi /etc/my.cnf

  • [mysqld] 参数下添加以下配置,然后重启 MySQL 数据库:
[mysqld]
server_id=1
log-bin=master
binlog_format=row
log_bin=/var/lib/mysql/bin-log
log_bin_index=/var/lib/mysql/mysql-bin.index

(3)由于 Canal 需要权限来伪装自己为 MySQLSlave 节点。所以我们执行如下命令为其单独在 MySQL 中创建一个用户,并配置一定的权限。
提示:虽然直接使用 root 用户也是可以的,但是生产上为了安全,还是建议为其创建一个专门的账户。
CREATE USER canal IDENTIFIED BY 'hangge1234';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;

(4)最后我们在 MySQL 中创建一张 user 表:
CREATE TABLE `user` (
  `id` int(10) NOT NULL AUTO_INCREMENT,
  `name` varchar(64) DEFAULT NULL,
  `phone` varchar(64) DEFAULT NULL,
  PRIMARY KEY (`id`)
);

3,修改配置文件

(1)首先编辑编辑 Canalinstance.properties 文件:
vi /usr/local/canal/conf/example/instance.properties

(2)主要修改其中的数据库连接地址、用户名和密码、Kafkatopic

(3)接着修改 Canalcanal.properties 文件:
vi /usr/local/canal/conf/canal.properties

(4)主要修改其中的 serverMode 设置为 kafka,并且设置 kafka 地址:
canal.serverMode = kafka
kafka.bootstrap.servers = 192.168.60.9:9092

4,启动 Canal

(1)进入 Canal 目录:
cd /usr/local/canal

(2)执行如下命令启动 Canal
bin/startup.sh

(3)查看 canal 日志,检查启动是否正常:
cat logs/canal/canal.log

(4)查看 instance 的日志,看到数据库连接也是正常:
cat logs/example/example.log

5,开始测试

(1)我们首先对 user 表数据进行操作,首先插入两条数据、接着更新一条数据、最后删除一条数据。
INSERT INTO `user` (`id`, `name`, `phone`) VALUES (1, 'hangge', '1234567890');
INSERT INTO `user` (`id`, `name`, `phone`) VALUES (2, 'baidu', '13362623365');
UPDATE `user` SET `name` = '航歌' WHERE `id` = 1;
DELETE FROM `user` WHERE `id` = 2;

(2)查看 Kafkatest 主题的数据变化,说明 Canal 正常工作。
评论0