在现在的系统开发中,为了提高查询效率 , 以及搜索的精准度, 会大量的使用 redis 、memcache 等 nosql 系统的数据库 , 以及 solr 、 elasticsearch 类似的全文检索服务。
那么这个时候, 就又有一个问题需要我们来考虑, 就是数据同步的问题, 如何将实时变化的数据库中的数据同步到 MySQL数据库、solr 的索引库中或者 redis 中呢 ?
同步数据一般会考虑以下几个方法:
在以上方法中,最容易实现的就是业务代码手动同步,但它的缺点也很明显,代码耦合度很高并且执行效率低下;定时任务方法的优点是能够与业务代码解耦,但缺点是数据的实时性不高;MQ消息同步的话可以做到代码解耦,伪准时,但是已然需要添加MQ相关的代码。
而Canal
就解决了以上问题,它可以做到业务代码完全解耦,API完全解耦,可以做到准实时。
Canal 是阿里知名的开源项目,主要用途是基于 MySQL 数据库增量日志(binlog)解析,提供增量数据订阅和消费。
下面就来具体学习一下Canal同步原理以及同步MySQL增量数据到ES的实现。
Canal是基于MySQL的binlog实现同步的,因此先了解一下MySQL主从同步的原:
从上层来看,主从复制分成三步:
上图就是Canal的内部原理图,大致原理如下:
EventParser在向mysql发送dump命令之前会先从Log Position中获取上次解析成功的位置(如果是第一次启动,则获取初始指定位置或者当前数据段binlog位点)。mysql接受到dump命令后,由EventParser从mysql上pull binlog数据进行解析并传递给EventSink(传递给EventSink模块进行数据存储,是一个阻塞操作,直到存储成功),传送成功之后更新Log Position。工作流程如下图所示:
EventSink起到一个类似channel的功能,可以对数据进行过滤、分发/路由(1:n)、归并(n:1)和加工。EventSink是连接EventParser和EventStore的桥梁。
EventStore实现模式是内存模式,内存结构为环形队列,由三个指针(Put、Get和Ack)标识数据存储和读取的位置。
MetaManager是增量订阅&消费信息管理器,增量订阅和消费之间的协议包括get/ack/rollback,分别为:
真实场景中,canal 高可用依赖 zookeeper ,我将客户端模式可以简单划分为:TCP 模式 和 MQ 模式 。
实战中我们经常会使用 MQ 模式 。因为 MQ 模式的优势在于解耦 ,canal server 将数据变更信息发送到消息队列 kafka 或者 RocketMQ ,消费者消费消息,顺序执行相关逻辑即可。
顺序消费
对于指定的一个 Topic ,所有消息根据 Sharding Key 进行区块分区,同一个分区内的消息按照严格的先进先出(FIFO)原则进行发布和消费。同一分区内的消息保证顺序,不同分区之间的消息顺序不做要求。
1、对于自建 MySQL , 需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,my.cnf 中配置如下
[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
2、授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant 。
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;
3、创建数据库商品表 t_product
。
CREATE TABLE `t_product` (
`id` BIGINT ( 20 ) NOT NULL AUTO_INCREMENT,
`name` VARCHAR ( 255 ) COLLATE utf8mb4_bin NOT NULL,
`price` DECIMAL ( 10, 2 ) NOT NULL,
`status` TINYINT ( 4 ) NOT NULL,
`create_time` datetime NOT NULL,
`update_time` datetime NOT NULL,
PRIMARY KEY ( `id` )
) ENGINE = INNODB DEFAULT CHARSET = utf8mb4 COLLATE = utf8mb4_bin
使用 Kibana
创建商品索引 。
PUT /t_product
{
"settings": {
"number_of_shards": 2,
"number_of_replicas": 1
},
"mappings": {
"properties": {
"id": {
"type":"keyword"
},
"name": {
"type":"text"
},
"price": {
"type":"double"
},
"status": {
"type":"integer"
},
"createTime": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
},
"updateTime": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
}
}
}
}
执行完成,如图所示 :
这里我是用的是RocketMQ,创建主题:product-syn-topic
,canal 会将 Binlog 的变化数据发送到该主题。
我选取 canal 版本 1.1.6
,进入 conf
目录。
1、配置 canal.properties
#集群模式 zk地址
canal.zkServers = localhost:2181
#本质是MQ模式和tcp模式 tcp, kafka, rocketMQ, rabbitMQ, pulsarMQ
canal.serverMode = rocketMQ
#instance 列表
canal.destinations = product-syn
#conf root dir
canal.conf.dir = ../conf
#全局的spring配置方式的组件文件 生产环境,集群化部署
canal.instance.global.spring.xml = classpath:spring/default-instance.xml
###### 以下部分是默认值 展示出来
# Canal的batch size, 默认50K, 由于kafka最大消息体限制请勿超过1M(900K以下)
canal.mq.canalBatchSize = 50
# Canal get数据的超时时间, 单位: 毫秒, 空为不限超时
canal.mq.canalGetTimeout = 100
# 是否为 flat json格式对象
canal.mq.flatMessage = true
2、instance 配置文件
在 conf
目录下创建实例目录 product-syn
, 在 product-syn
目录创建配置文件 :instance.properties
。
# 按需修改成自己的数据库信息
#################################################
...
canal.instance.master.address=192.168.1.20:3306
# username/password,数据库的用户名和密码
...
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
...
# table regex
canal.instance.filter.regex=mytest.t_product
# mq config
canal.mq.topic=product-syn-topic
# 针对库名或者表名发送动态topic
#canal.mq.dynamicTopic=mytest,.*,mytest.user,mytest\..*,.*\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#库名.表名: 唯一主键,多个表之间用逗号分隔
#canal.mq.partitionHash=mytest.person:id,mytest.role:id
#################################################
3、服务启动
启动两个 canal 服务,我们从 zookeeper gui 中查看服务运行情况 。
修改一条 t_product
表记录,可以从 RocketMQ 控制台中观测到新的消息。
1、产品索引操作服务
2、消费监听器
消费者逻辑重点有两点:
data
节点中获取表最新数据(批量操作可能是多条)。然后根据操作类型 UPDATE
、 INSERT
、DELETE
执行产品索引操作服务的方法。Canal 是一个非常有趣的开源项目,实际生产中很多公司使用 Canal 构建数据传输服务( Data Transmission Service ,简称 DTS ) 。
推荐大家阅读这个开源项目,你可以从中学习到网络编程、多线程模型、高性能队列 Disruptor、 流程模型抽象等。
参与评论
手机查看
返回顶部