分钟级快速实践 Debezium MySQL-to-Kafka CDC

使用 Docker、KRaft 和 Kafdrop 分钟级快速实践最小化的 Debezium MySQL-to-Kafka CDC (Change Data Capture)。


  1. 创建 docker-compose 文件

    mkdir debezium-amd64
    cd debezium-amd64
    vim docker-compose.yml
    services:
      kafka:
        image: bitnami/kafka:3.7
        platform: linux/amd64
        ports:
          - "9092:9092"
        environment:
          # enable KRaft (no Zookeeper)
          - KAFKA_ENABLE_KRAFT=yes
          - KAFKA_CFG_NODE_ID=1
          - KAFKA_CFG_PROCESS_ROLES=broker,controller
          - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
          - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092
          - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
          - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka:9093
          - KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true
          - ALLOW_PLAINTEXT_LISTENER=yes
    
      connect:
        image: quay.io/debezium/connect:3.2
        platform: linux/amd64
        ports:
          - "8083:8083"
        environment:
          - BOOTSTRAP_SERVERS=kafka:9092
          - GROUP_ID=1
          - CONFIG_STORAGE_TOPIC=connect_configs
          - OFFSET_STORAGE_TOPIC=connect_offsets
          - STATUS_STORAGE_TOPIC=connect_statuses
          - KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter
          - VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter
          - KEY_CONVERTER_SCHEMAS_ENABLE=false
          - VALUE_CONVERTER_SCHEMAS_ENABLE=false
        depends_on:
          - kafka
    
      kafdrop:
        image: obsidiandynamics/kafdrop:latest
        platform: linux/amd64
        ports:
          - "9000:9000"
        environment:
          - KAFKA_BROKERCONNECT=kafka:9092
        depends_on:
          - kafka
    
      mysql:
        image: mysql:8.0
        platform: linux/amd64
        ports:
          - "3306:3306"
        environment:
          - MYSQL_ROOT_PASSWORD=debezium
          - MYSQL_USER=debezium
          - MYSQL_PASSWORD=dbz
          - MYSQL_DATABASE=inventory
        command: >
          --server-id=857
          --log-bin=binlog
          --binlog_format=ROW
          --binlog_row_image=FULL
          --gtid_mode=ON
          --enforce-gtid-consistency=ON
          --binlog_expire_logs_seconds=600
        volumes:
          - ./mysql-init.sql:/docker-entrypoint-initdb.d/mysql-init.sql
    vim mysql-init.sql
    -- ensure the database exists and is selected
    CREATE DATABASE IF NOT EXISTS inventory;
    USE inventory;
    
    -- minimal demo table (keep it simple for CDC)
    CREATE TABLE IF NOT EXISTS customers (
      id INT PRIMARY KEY AUTO_INCREMENT,
      first_name VARCHAR(50),
      last_name  VARCHAR(50),
      email      VARCHAR(100),
      created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    );
    
    -- seed data for snapshot
    INSERT INTO customers (first_name, last_name, email) VALUES
    ('Alice', 'Smith', 'alice@example.com'),
    ('Bob',   'Johnson', 'bob@example.com');
    vim register-mysql.json
    {
      "name": "mysql-inventory-connector",
      "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "tasks.max": "1",
    
        "database.hostname": "mysql",
        "database.port": "3306",
        "database.user": "debezium",
        "database.password": "dbz",
    
        "database.server.id": "857",
    
        "topic.prefix": "mysql_server",
        "database.include.list": "inventory",
        "table.include.list": "inventory.customers",
    
        "snapshot.mode": "initial",
        "snapshot.locking.mode": "none",
        "include.schema.changes": "false",
        "tombstones.on.delete": "false",
    
        "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
        "schema.history.internal.kafka.topic": "schema-changes.inventory"
      }
    }
    ls -1
    docker-compose.yml
    mysql-init.sql
    register-mysql.json
  2. 启动服务

    docker-compose up -d
    [+] Running 5/5
    ✔ Network debezium-amd64_default      Created
    ✔ Container debezium-amd64-mysql-1    Started
    ✔ Container debezium-amd64-kafka-1    Started
    ✔ Container debezium-amd64-kafdrop-1  Started
    ✔ Container debezium-amd64-connect-1  Started
  3. 授权 Debezium connector 访问 MySQL

    docker-compose exec -T mysql mysql -uroot -pdebezium -e "
    GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT, LOCK TABLES ON *.* TO 'debezium'@'%';
    FLUSH PRIVILEGES;"
  4. 注册 Debezium connector

    curl -s -X POST http://localhost:8083/connectors \
      -H "Content-Type: application/json" \
      -d @register-mysql.json
  5. 更新 Debezium connector 配置(可选)

    jq '.config' register-mysql.json | \
    curl -s -X PUT http://localhost:8083/connectors/mysql-inventory-connector/config \
      -H "Content-Type: application/json" \
      -d @- | jq .
  6. 检查 Debezium connector 状态

    curl -s localhost:8083/connectors/mysql-inventory-connector/status | jq .
    {
      "name": "mysql-inventory-connector",
      "connector": {
        "state": "RUNNING",
        "worker_id": "192.168.107.5:8083"
      },
      "tasks": [
        {
          "id": 0,
          "state": "RUNNING",
          "worker_id": "192.168.107.5:8083"
        }
      ],
      "type": "source"
    }
  7. 触发 CDC 事件

    插入/更新/删除一些数据行来查看//事件

    docker-compose exec -T mysql mysql -udebezium -pdbz -e "USE inventory;
    INSERT INTO customers (first_name,last_name,email) VALUES ('Charlie','Wang','charlie@example.com');
    UPDATE customers SET email='alice_new@example.com' WHERE first_name='Alice';
    DELETE FROM customers WHERE first_name='Bob';"
  8. 在 Kafdrop 上查看主题

    访问 http://localhost:9000

    查看主题 mysql_server.inventory.customers

    查看新事件 "op":"c" / "op":"u" / "op":"d"

    /2025/09/quickstart-debezium-mysql-to-kafka-cdc-in-minutes/kafdrop_topic_msg_c_u_d.png