See https://github.com/alibaba/canal.
The content below is compiled from that documentation.

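(viewable with show binlog events)
Canal is a tool for incrementally capturing and parsing the MySQL binlog. It can deliver change records to an MQ system such as Kafka or RocketMQ, so consumers can rely on the MQ's multi-language client support.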
Add the following configuration:

    server-id = 1
    log_bin = /var/log/mysql/mysql-bin.log
    binlog_format=ROW

Then restart MySQL.
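The three settings above are easy to mistype, so a quick check before restarting MySQL can help. The following is a minimal sketch, not part of Canal or MySQL: the `[mysqld]` section header, the `MY_CNF` sample text, and the `check_binlog_settings` helper are all assumptions made for illustration, and the check only looks at the exact key spellings used in this article.

```python
import configparser

# Hypothetical my.cnf fragment. The [mysqld] header is an assumption;
# the article only lists the three settings themselves.
MY_CNF = """
[mysqld]
server-id = 1
log_bin = /var/log/mysql/mysql-bin.log
binlog_format=ROW
"""


def check_binlog_settings(text):
    """Return a list of problems found in the [mysqld] section (empty if none)."""
    # allow_no_value tolerates bare flags; interpolation=None keeps '%' literal.
    parser = configparser.ConfigParser(
        allow_no_value=True, strict=False, interpolation=None
    )
    parser.read_string(text)
    if not parser.has_section("mysqld"):
        return ["missing [mysqld] section"]
    section = parser["mysqld"]
    problems = []
    if not section.get("server-id"):
        problems.append("server-id is not set")
    if not section.get("log_bin"):
        problems.append("log_bin is not set")
    if (section.get("binlog_format") or "").upper() != "ROW":
        problems.append("binlog_format must be ROW")
    return problems


print(check_binlog_settings(MY_CNF))
```

An empty list means the fragment contains all three settings with binlog_format set to ROW.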
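Create a user and grant privileges:

    mysql> CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
    Query OK, 0 rows affected (0.01 sec)

    mysql> GRANT SHOW VIEW, SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
    Query OK, 0 rows affected (0.00 sec)

    mysql> FLUSH PRIVILEGES;
    Query OK, 0 rows affected (0.00 sec)

Download Canal Deployer from the releases page of the official site; this article uses canal-1.1.6. Extract the downloaded archive and copy it into two directories to simulate two Canal Server instances (the startup commands later in this article use /usr/local/canal-1 and /usr/local/canal-2).
Perform the following steps in each of the two directories.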
1. Modify conf/example/instance.properties:

    canal.instance.master.address=127.0.0.1:3306
    canal.instance.dbUsername=canal
    canal.instance.dbPassword=canal

    # mq config
    # change as needed
    canal.mq.topic=test
    # dynamic topic route by schema or table regex
    #canal.mq.dynamicTopic=mytest1.user,topic2:mytest2\\..*,.*\\..*
    canal.mq.partition=0
    # hash partition config
    #canal.mq.enableDynamicQueuePartition=false
    #canal.mq.partitionsNum=3
    #canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
    #canal.mq.partitionHash=test.table:id^name,.*\\..*

2. Modify conf/canal.properties:
    # change this to 21111 in the second directory
    canal.port = 11111
    # change this to 21112 in the second directory
    canal.metrics.pull.port = 11112
    # change this to 21110 in the second directory
    canal.admin.port = 11110

    # ZooKeeper address
    canal.zkServers = 127.0.0.1:2181

    canal.serverMode = kafka

    canal.destinations = example

    # these two settings must be changed to configure HA
    #canal.instance.global.spring.xml = classpath:spring/file-instance.xml
    canal.instance.global.spring.xml = classpath:spring/default-instance.xml

    # whether messages are sent as flat JSON objects
    canal.mq.flatMessage = true

    # Kafka bootstrap servers address
    kafka.bootstrap.servers = 10.2.14.27:9092
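Because both instances run on the same host in this setup, only the three port settings differ between the two copies of canal.properties. The sketch below shows one way to derive the second copy from the first; the `rewrite_ports` helper and the `FIRST` sample text are hypothetical names used only for illustration, and the port values are the ones given in the comments above.

```python
import re

# Port values the article assigns to the second directory.
SECOND_INSTANCE_PORTS = {
    "canal.port": "21111",
    "canal.metrics.pull.port": "21112",
    "canal.admin.port": "21110",
}


def rewrite_ports(properties_text, overrides):
    """Return properties_text with each overridden key set to its new value."""
    out = []
    for line in properties_text.splitlines():
        # Match "key = value" lines; commented-out lines start with '#'
        # and therefore never match.
        match = re.match(r"^\s*([A-Za-z0-9_.]+)\s*=", line)
        if match and match.group(1) in overrides:
            key = match.group(1)
            line = f"{key} = {overrides[key]}"
        out.append(line)
    return "\n".join(out)


# A shortened stand-in for the first directory's conf/canal.properties.
FIRST = """canal.port = 11111
canal.metrics.pull.port = 11112
canal.admin.port = 11110
canal.zkServers = 127.0.0.1:2181"""

print(rewrite_ports(FIRST, SECOND_INSTANCE_PORTS))
```

All other lines, including the ZooKeeper address, pass through unchanged, which matches the intent that both instances share the same ZooKeeper and Kafka settings.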
Start the first instance:

    /usr/local/canal-1/bin/startup.sh

Logs are written to /usr/local/canal-1/logs/canal/{canal,canal_stdout}.log. In my test environment, startup failed with the following error:

    Unrecognized VM option 'AggressiveOpts'
    Error: Could not create the Java Virtual Machine.
    Error: A fatal exception has occurred. Program will exit.

The JVM in use does not recognize the -XX:+AggressiveOpts option, so the fix is to remove it. In startup.sh, change

    JAVA_OPTS="$JAVA_OPTS -Xss256k -XX:+AggressiveOpts -XX:-UseBiasedLocking -XX:-OmitStackTraceInFastThrow -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=$base/logs"

to:

    JAVA_OPTS="$JAVA_OPTS -Xss256k -XX:-UseBiasedLocking -XX:-OmitStackTraceInFastThrow -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=$base/logs"

Then start both instances:

    /usr/local/canal-1/bin/startup.sh
    /usr/local/canal-2/bin/startup.sh

Notes:
- Use the bin/stop.sh script to stop Canal
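The startup.sh change described above has to be made in both directories, so it may be worth scripting. The following is a minimal sketch of the edit on a single line of text; `drop_jvm_flag` is a hypothetical helper, and reading and writing the actual startup.sh file is left out.

```python
def drop_jvm_flag(line, flag="-XX:+AggressiveOpts"):
    """Remove one whitespace-separated JVM flag from a JAVA_OPTS line."""
    # Splitting on whitespace keeps the shell quotes attached to the first
    # and last tokens, so the surrounding JAVA_OPTS="..." stays intact.
    return " ".join(token for token in line.split() if token != flag)


ORIGINAL = (
    'JAVA_OPTS="$JAVA_OPTS -Xss256k -XX:+AggressiveOpts -XX:-UseBiasedLocking '
    "-XX:-OmitStackTraceInFastThrow -XX:+HeapDumpOnOutOfMemoryError "
    '-XX:HeapDumpPath=$base/logs"'
)

print(drop_jvm_flag(ORIGINAL))
```

Matching whole tokens rather than using a plain substring replace avoids leaving a double space behind and avoids touching any other option that merely contains the same text.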
An example binlog event in JSON format:

    {
      "data": [{ "id": "2", "name": "Tim" }],
      "database": "test",
      "es": 1668579597000,
      "id": 8,
      "isDdl": false,
      "mysqlType": { "id": "int", "name": "varchar(100)" },
      "old": [{ "id": "1" }],
      "pkNames": null,
      "sql": "",
      "sqlType": { "id": 4, "name": 12 },
      "table": "test",
      "ts": 1668579597617,
      "type": "UPDATE"
    }

In cluster mode, the consumer position (cursor) is stored in ZooKeeper:
    [zk: localhost:2181(CONNECTED) 1] get /otter/canal/destinations/example/1001/cursor
    {"@type":"com.alibaba.otter.canal.protocol.position.LogPosition","identity":{"slaveId":-1,"sourceAddress":{"address":"localhost","port":3306}},"postion":{"gtid":"","included":false,"journalName":"mysql-bin.000001","position":11913,"serverId":1,"timestamp":1668579597000}}

In standalone mode, the position is stored in the conf/example/meta.dat file:
    {"clientDatas":[{"clientIdentity":{"clientId":1001,"destination":"example","filter":""},"cursor":{"identity":{"slaveId":-1,"sourceAddress":{"address":"localhost","port":3306}},"postion":{"gtid":"","included":false,"journalName":"mysql-bin.000001","position":15988,"serverId":1,"timestamp":1668580330000}}}],"destination":"example"}

Vagrantfile:

    Vagrant.configure("2") do |config|
      config.vm.box = "generic/ubuntu1804"

      vms = Array(101..101)
      vms.each do |seq|
        config.vm.define :"canal-#{seq}" do |vagrant|
          vagrant.vm.hostname = "canal-#{seq}"
          vagrant.vm.network "private_network", ip: "192.168.56.#{seq}"
          vagrant.vm.provider "virtualbox" do |vb|
            vb.customize ["modifyvm", :id, "--name", "canal-#{seq}"]
            vb.gui = false
            vb.memory = "4096"
            vb.cpus = "8"
          end
        end
      end
    end
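Returning to the flat JSON message shown earlier: a consumer typically needs to know which columns changed. The sketch below parses that sample message with the standard library only (no Kafka client is involved) and pairs each row in "data" with the matching entry in "old". It assumes, based on the sample, that "old" carries only the previous values of the changed columns; `changed_columns` is a hypothetical helper name.

```python
import json

# The sample flat message from this article, as it would arrive from the topic.
FLAT_MESSAGE = """
{ "data": [{ "id": "2", "name": "Tim" }], "database": "test",
  "es": 1668579597000, "id": 8, "isDdl": false,
  "mysqlType": { "id": "int", "name": "varchar(100)" },
  "old": [{ "id": "1" }], "pkNames": null, "sql": "",
  "sqlType": { "id": 4, "name": 12 }, "table": "test",
  "ts": 1668579597617, "type": "UPDATE" }
"""


def changed_columns(message):
    """For an UPDATE message, return (column, before, after) tuples."""
    if message.get("type") != "UPDATE":
        return []
    changes = []
    # data[i] is the row after the update; old[i] holds the prior values.
    for after, before in zip(message["data"], message.get("old") or []):
        for column, old_value in before.items():
            changes.append((column, old_value, after.get(column)))
    return changes


msg = json.loads(FLAT_MESSAGE)
print(msg["database"], msg["table"], changed_columns(msg))
```

For the sample message this reports that the id column of test.test changed from "1" to "2"; the name column does not appear because it is absent from "old".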