分钟级快速实践 Debezium MySQL-to-Kafka CDC

使用 Docker、KRaft 和 Kafdrop 分钟级快速实践最小化的 Debezium MySQL-to-Kafka CDC (Change Data Capture)。
-
创建 docker-compose 文件
mkdir debezium-amd64 cd debezium-amd64
vim docker-compose.yml
services: kafka: image: bitnami/kafka:3.7 platform: linux/amd64 ports: - "9092:9092" environment: # enable KRaft (no Zookeeper) - KAFKA_ENABLE_KRAFT=yes - KAFKA_CFG_NODE_ID=1 - KAFKA_CFG_PROCESS_ROLES=broker,controller - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092 - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka:9093 - KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true - ALLOW_PLAINTEXT_LISTENER=yes connect: image: quay.io/debezium/connect:3.2 platform: linux/amd64 ports: - "8083:8083" environment: - BOOTSTRAP_SERVERS=kafka:9092 - GROUP_ID=1 - CONFIG_STORAGE_TOPIC=connect_configs - OFFSET_STORAGE_TOPIC=connect_offsets - STATUS_STORAGE_TOPIC=connect_statuses - KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter - VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter - KEY_CONVERTER_SCHEMAS_ENABLE=false - VALUE_CONVERTER_SCHEMAS_ENABLE=false depends_on: - kafka kafdrop: image: obsidiandynamics/kafdrop:latest platform: linux/amd64 ports: - "9000:9000" environment: - KAFKA_BROKERCONNECT=kafka:9092 depends_on: - kafka mysql: image: mysql:8.0 platform: linux/amd64 ports: - "3306:3306" environment: - MYSQL_ROOT_PASSWORD=debezium - MYSQL_USER=debezium - MYSQL_PASSWORD=dbz - MYSQL_DATABASE=inventory command: > --server-id=857 --log-bin=binlog --binlog_format=ROW --binlog_row_image=FULL --gtid_mode=ON --enforce-gtid-consistency=ON --binlog_expire_logs_seconds=600 volumes: - ./mysql-init.sql:/docker-entrypoint-initdb.d/mysql-init.sql
vim mysql-init.sql
-- ensure the database exists and is selected CREATE DATABASE IF NOT EXISTS inventory; USE inventory; -- minimal demo table (keep it simple for CDC) CREATE TABLE IF NOT EXISTS customers ( id INT PRIMARY KEY AUTO_INCREMENT, first_name VARCHAR(50), last_name VARCHAR(50), email VARCHAR(100), created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ); -- seed data for snapshot INSERT INTO customers (first_name, last_name, email) VALUES ('Alice', 'Smith', 'alice@example.com'), ('Bob', 'Johnson', 'bob@example.com');
vim register-mysql.json
{ "name": "mysql-inventory-connector", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "mysql", "database.port": "3306", "database.user": "debezium", "database.password": "dbz", "database.server.id": "857", "topic.prefix": "mysql_server", "database.include.list": "inventory", "table.include.list": "inventory.customers", "snapshot.mode": "initial", "snapshot.locking.mode": "none", "include.schema.changes": "false", "tombstones.on.delete": "false", "schema.history.internal.kafka.bootstrap.servers": "kafka:9092", "schema.history.internal.kafka.topic": "schema-changes.inventory" } }
ls -1
docker-compose.yml mysql-init.sql register-mysql.json
-
启动服务
docker-compose up -d
[+] Running 5/5 ✔ Network debezium-amd64_default Created ✔ Container debezium-amd64-mysql-1 Started ✔ Container debezium-amd64-kafka-1 Started ✔ Container debezium-amd64-kafdrop-1 Started ✔ Container debezium-amd64-connect-1 Started
-
授权 Debezium connector 访问 MySQL
docker-compose exec -T mysql mysql -uroot -pdebezium -e " GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT, LOCK TABLES ON *.* TO 'debezium'@'%'; FLUSH PRIVILEGES;"
-
注册 Debezium connector
curl -s -X POST http://localhost:8083/connectors \ -H "Content-Type: application/json" \ -d @register-mysql.json
-
更新 Debezium connector 配置(可选)
jq '.config' register-mysql.json | \ curl -s -X PUT http://localhost:8083/connectors/mysql-inventory-connector/config \ -H "Content-Type: application/json" \ -d @- | jq .
-
检查 Debezium connector 状态
curl -s localhost:8083/connectors/mysql-inventory-connector/status | jq .
{ "name": "mysql-inventory-connector", "connector": { "state": "RUNNING", "worker_id": "192.168.107.5:8083" }, "tasks": [ { "id": 0, "state": "RUNNING", "worker_id": "192.168.107.5:8083" } ], "type": "source" }
-
触发 CDC 事件
插入
/更新
/删除
一些数据行来查看增
/改
/删
事件docker-compose exec -T mysql mysql -udebezium -pdbz -e "USE inventory; INSERT INTO customers (first_name,last_name,email) VALUES ('Charlie','Wang','charlie@example.com'); UPDATE customers SET email='alice_new@example.com' WHERE first_name='Alice'; DELETE FROM customers WHERE first_name='Bob';"
-
在 Kafdrop 上查看主题
访问
http://localhost:9000
查看主题
mysql_server.inventory.customers
查看新事件
"op":"c"
/"op":"u"
/"op":"d"