See: https://github.com/alibaba/canal.
The content below is compiled from that documentation.
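(... can be viewed with show binlog events)
Canal is a tool for incrementally fetching and parsing the MySQL binlog. It can deliver change records to an MQ system such as Kafka or RocketMQ, so consumers can rely on the MQ's multi-language client support.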
Add the following to the MySQL configuration:
server-id = 1
log_bin = /var/log/mysql/mysql-bin.log
binlog_format=ROW
Then restart MySQL.
Create a user and grant privileges:
mysql> CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
Query OK, 0 rows affected (0.01 sec)
mysql> GRANT SHOW VIEW, SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
Query OK, 0 rows affected (0.00 sec)
mysql> FLUSH PRIVILEGES;
Query OK, 0 rows affected (0.00 sec)
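Download Canal Deployer from the official releases page; this article uses canal-1.1.6. Extract the downloaded archive, then copy it to the following two directories:
/usr/local/canal-1
/usr/local/canal-2
(used to simulate two Canal Server instances)
Perform the following steps in each of the two directories.
1. Edit conf/example/instance.properties: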
canal.instance.master.address=127.0.0.1:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
# mq config
# change as needed
canal.mq.topic=test
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,topic2:mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.enableDynamicQueuePartition=false
#canal.mq.partitionsNum=3
#canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
#canal.mq.partitionHash=test.table:id^name,.*\\..*
2. Edit conf/canal.properties:
# change this to 21111 in the second directory
canal.port = 11111
# change this to 21112 in the second directory
canal.metrics.pull.port = 11112
# change this to 21110 in the second directory
canal.admin.port = 11110
# ZooKeeper address
canal.zkServers = 127.0.0.1:2181
canal.serverMode = kafka
canal.destinations = example
# these two settings must be changed to configure HA
#canal.instance.global.spring.xml = classpath:spring/file-instance.xml
canal.instance.global.spring.xml = classpath:spring/default-instance.xml
# whether to send messages as flat JSON objects
canal.mq.flatMessage = true
# Kafka bootstrap servers address
kafka.bootstrap.servers = 10.2.14.27:9092
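Because the two directories differ only in the three port settings, the second copy of canal.properties can be derived from the first. The following is a minimal Python sketch of that idea, not part of Canal itself; the helper name override_properties and the sample text are hypothetical, and the port values are the ones given in the comments above.

```python
import re

# Port overrides for the second directory, taken from the comments above.
SECOND_INSTANCE_PORTS = {
    "canal.port": "21111",
    "canal.metrics.pull.port": "21112",
    "canal.admin.port": "21110",
}


def override_properties(text: str, overrides: dict) -> str:
    """Return `text` with `key = value` lines rewritten for keys in `overrides`.

    Commented-out lines (starting with '#') and unrelated keys are left as-is.
    """
    out = []
    for line in text.splitlines():
        match = re.match(r"^\s*([A-Za-z0-9_.]+)\s*=", line)
        if match and match.group(1) in overrides:
            key = match.group(1)
            line = f"{key} = {overrides[key]}"
        out.append(line)
    return "\n".join(out)


# Hypothetical excerpt of the first directory's canal.properties.
sample = """canal.port = 11111
canal.metrics.pull.port = 11112
canal.admin.port = 11110
canal.zkServers = 127.0.0.1:2181"""

print(override_properties(sample, SECOND_INSTANCE_PORTS))
```

All other settings (ZooKeeper address, server mode, Kafka bootstrap servers) stay identical in both directories.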
/usr/local/canal-1/bin/startup.sh
Logs are written to /usr/local/canal-1/logs/canal/{canal,canal_stdout}.log. In my test environment, startup failed with the following error:
Unrecognized VM option 'AggressiveOpts'
Error: Could not create the Java Virtual Machine.
Error: A fatal exception has occurred. Program will exit.
The fix is to change this line in startup.sh:
JAVA_OPTS="$JAVA_OPTS -Xss256k -XX:+AggressiveOpts -XX:-UseBiasedLocking -XX:-OmitStackTraceInFastThrow -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=$base/logs"
to:
JAVA_OPTS="$JAVA_OPTS -Xss256k -XX:-UseBiasedLocking -XX:-OmitStackTraceInFastThrow -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=$base/logs"
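The error appears because newer JDKs no longer recognize the AggressiveOpts option, so the only change is dropping that single flag. If the edit needs to be repeated in both directories, it could be scripted roughly as below. This is a sketch only: drop_jvm_flag is a hypothetical helper, and the sample line is shortened from the one in startup.sh.

```python
def drop_jvm_flag(line: str, flag: str) -> str:
    """Remove one space-delimited JVM flag from a JAVA_OPTS line.

    Only exact token matches are removed, so a flag that is glued to the
    closing quote (the last token on the line) would not be matched.
    """
    return " ".join(tok for tok in line.split(" ") if tok != flag)


# Shortened example of the original line in startup.sh.
before = 'JAVA_OPTS="$JAVA_OPTS -Xss256k -XX:+AggressiveOpts -XX:-UseBiasedLocking"'
after = drop_jvm_flag(before, "-XX:+AggressiveOpts")
print(after)
```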
Then start both instances:
/usr/local/canal-1/bin/startup.sh
/usr/local/canal-2/bin/startup.sh
Notes:
- Use the bin/stop.sh script to stop Canal.
An example binlog message in JSON format:
{
  "data": [{
    "id": "2",
    "name": "Tim"
  }],
  "database": "test",
  "es": 1668579597000,
  "id": 8,
  "isDdl": false,
  "mysqlType": {
    "id": "int",
    "name": "varchar(100)"
  },
  "old": [{
    "id": "1"
  }],
  "pkNames": null,
  "sql": "",
  "sqlType": {
    "id": 4,
    "name": 12
  },
  "table": "test",
  "ts": 1668579597617,
  "type": "UPDATE"
}
}
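A consumer reading these messages from Kafka typically needs the before and after image of each row. The sketch below shows one way to derive them from a flat message, assuming (as in the sample above) that "old" lists only the columns whose values changed and lines up index by index with "data". The function name row_changes is hypothetical, and the Kafka client itself is left out.

```python
import json


def row_changes(message: str):
    """Yield (type, table, before, after) for each row in a flat message."""
    msg = json.loads(message)
    rows = msg.get("data") or []
    # "old" is absent or null for non-UPDATE events; pad it so zip() still works.
    olds = msg.get("old") or [None] * len(rows)
    for new, old in zip(rows, olds):
        # Overlay the changed columns' old values on the new row to rebuild
        # the full before-image.
        before = {**new, **old} if old is not None else None
        yield msg["type"], f'{msg["database"]}.{msg["table"]}', before, new


# Trimmed copy of the sample message above.
sample = '''{"data":[{"id":"2","name":"Tim"}],"database":"test","isDdl":false,
"old":[{"id":"1"}],"table":"test","type":"UPDATE"}'''

for change in row_changes(sample):
    print(change)
```

For the sample, the before-image is id "1" with name "Tim", and the after-image is id "2" with the same name.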
In cluster mode, the consumption position is stored in ZooKeeper:
[zk: localhost:2181(CONNECTED) 1] get /otter/canal/destinations/example/1001/cursor
{"@type":"com.alibaba.otter.canal.protocol.position.LogPosition","identity":{"slaveId":-1,"sourceAddress":{"address":"localhost","port":3306}},"postion":{"gtid":"","included":false,"journalName":"mysql-bin.000001","position":11913,"serverId":1,"timestamp":1668579597000}}
In standalone mode, it is stored in the conf/example/meta.dat file:
{"clientDatas":[{"clientIdentity":{"clientId":1001,"destination":"example","filter":""},"cursor":{"identity":{"slaveId":-1,"sourceAddress":{"address":"localhost","port":3306}},"postion":{"gtid":"","included":false,"journalName":"mysql-bin.000001","position":15988,"serverId":1,"timestamp":1668580330000}}}],"destination":"example"}
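To check how far a client has consumed, the binlog file name and offset can be read out of this JSON. The following is a small sketch based only on the structure shown above; binlog_positions is a hypothetical helper, and the key is spelled "postion" because that is how it appears in the stored data.

```python
import json


def binlog_positions(meta_text: str) -> dict:
    """Map clientId -> (journalName, position) from meta.dat content."""
    meta = json.loads(meta_text)
    result = {}
    for client in meta.get("clientDatas", []):
        # Spelled "postion" in the stored JSON shown above.
        pos = client["cursor"]["postion"]
        client_id = client["clientIdentity"]["clientId"]
        result[client_id] = (pos["journalName"], pos["position"])
    return result


# Copy of the meta.dat content shown above.
sample = (
    '{"clientDatas":[{"clientIdentity":{"clientId":1001,"destination":"example","filter":""},'
    '"cursor":{"identity":{"slaveId":-1,"sourceAddress":{"address":"localhost","port":3306}},'
    '"postion":{"gtid":"","included":false,"journalName":"mysql-bin.000001","position":15988,'
    '"serverId":1,"timestamp":1668580330000}}}],"destination":"example"}'
)

print(binlog_positions(sample))
```

The cursor node in ZooKeeper holds the same "postion" object one level up, so the same field names apply there.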
Vagrant.configure("2") do |config|
  config.vm.box = "generic/ubuntu1804"
  vms = Array(101..101)
  vms.each do |seq|
    config.vm.define :"canal-#{seq}" do |vagrant|
      vagrant.vm.hostname = "canal-#{seq}"
      vagrant.vm.network "private_network", ip: "192.168.56.#{seq}"
      vagrant.vm.provider "virtualbox" do |vb|
        vb.customize ["modifyvm", :id, "--name", "canal-#{seq}"]
        vb.gui = false
        vb.memory = "4096"
        vb.cpus = "8"
      end
    end
  end
end