Quick Hands-On: Unified Stream and Batch Processing with Flink SQL

This article demonstrates how to use Docker, MySQL, and Flink to build a minimal, hands-on pipeline for real-time data capture and unified stream/batch analytics. Highlights:
- No Java/Scala code anywhere; the whole pipeline is expressed in Flink SQL
- Flink CDC captures MySQL data changes
- Flink streaming jobs write to Kafka and to Paimon (a lakehouse storage engine)
- A Flink batch job aggregates the data and writes the result to CSV
- Adminer (for MySQL) and Kafdrop (for Kafka) are included as visual inspection tools
It is a good fit for ops engineers and data analysts who want a quick end-to-end feel for Flink: data source → CDC capture → stream/batch analytics → downstream results.
Setting Up the Services
Directory Layout
.
├── flink
│   ├── conf
│   │   └── sql-client-defaults.yaml
│   └── lib
│       ├── flink-connector-jdbc-3.2.0-1.19.jar
│       ├── flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
│       ├── flink-sql-connector-kafka-3.2.0-1.19.jar
│       ├── flink-sql-connector-mysql-cdc-3.2.0.jar
│       ├── mysql-connector-j-8.4.0.jar
│       ├── paimon-flink-1.19-1.2.0.jar
│       └── paimon-format-1.2.0.jar
├── output
│   └── checkpoints
├── sql
│   ├── 01_kafka_cdc.sql
│   ├── 02_paimon_cdc.sql
│   ├── 03_topn_batch.sql
│   └── 04_paimon_read.sql
├── docker-compose.yml
└── mysql-init.sql
Download the JAR Files
mkdir flink-amd64
cd flink-amd64
mkdir -p output/checkpoints
mkdir -p flink/lib
cd flink/lib
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.2.0-1.19/flink-connector-jdbc-3.2.0-1.19.jar
wget https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka/3.2.0-1.19/flink-sql-connector-kafka-3.2.0-1.19.jar
wget https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-mysql-cdc/3.2.0/flink-sql-connector-mysql-cdc-3.2.0.jar
wget https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/8.4.0/mysql-connector-j-8.4.0.jar
wget https://repo1.maven.org/maven2/org/apache/paimon/paimon-flink-1.19/1.2.0/paimon-flink-1.19-1.2.0.jar
wget https://repo1.maven.org/maven2/org/apache/paimon/paimon-format/1.2.0/paimon-format-1.2.0.jar
wget https://repo1.maven.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.8.3-10.0/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
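If any of these downloads failed, job submission will later fail with a missing-factory or ClassNotFoundException error, so a quick sanity check is worthwhile while still inside flink/lib (expect exactly the seven JARs from the directory layout above):
# count and list the connector JARs; re-run the matching wget for anything missing
ls -1 | wc -l
ls -lh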
Create docker-compose.yml
cd ../../
vim docker-compose.yml
services:
  mysql:
    image: mysql:8.0
    platform: linux/amd64
    ports:
      - "3306:3306"
    environment:
      - MYSQL_ROOT_PASSWORD=rootpw
      - MYSQL_DATABASE=sales
    command: >
      --server-id=857
      --log-bin=binlog
      --binlog_format=ROW
      --binlog_row_image=FULL
      --gtid_mode=ON
      --enforce-gtid-consistency=ON
      --binlog_expire_logs_seconds=600
    volumes:
      - ./mysql-init.sql:/docker-entrypoint-initdb.d/mysql-init.sql

  kafka:
    image: bitnami/kafka:3.7
    platform: linux/amd64
    ports:
      - "9092:9092"
    environment:
      - KAFKA_ENABLE_KRAFT=yes
      - KAFKA_CFG_NODE_ID=1
      - KAFKA_CFG_PROCESS_ROLES=broker,controller
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka:9093
      - KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true
      - ALLOW_PLAINTEXT_LISTENER=yes

  kafdrop:
    image: obsidiandynamics/kafdrop:4.2.0
    platform: linux/amd64
    ports:
      - "9000:9000"
    environment:
      - KAFKA_BROKERCONNECT=kafka:9092
    depends_on:
      - kafka

  adminer:
    image: adminer:5.4.0
    platform: linux/amd64
    ports:
      - "8080:8080"

  jobmanager:
    image: flink:1.19.1-scala_2.12-java11
    platform: linux/amd64
    command: jobmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
    ports:
      - "8081:8081"
    volumes:
      - ./flink/conf/sql-client-defaults.yaml:/opt/flink/conf/sql-client-defaults.yaml:ro
      - ./flink/lib/flink-sql-connector-mysql-cdc-3.2.0.jar:/opt/flink/lib/flink-sql-connector-mysql-cdc-3.2.0.jar:ro
      - ./flink/lib/flink-sql-connector-kafka-3.2.0-1.19.jar:/opt/flink/lib/flink-sql-connector-kafka-3.2.0-1.19.jar:ro
      - ./flink/lib/flink-connector-jdbc-3.2.0-1.19.jar:/opt/flink/lib/flink-connector-jdbc-3.2.0-1.19.jar:ro
      - ./flink/lib/mysql-connector-j-8.4.0.jar:/opt/flink/lib/mysql-connector-j-8.4.0.jar:ro
      - ./flink/lib/paimon-flink-1.19-1.2.0.jar:/opt/flink/lib/paimon-flink-1.19-1.2.0.jar:ro
      - ./flink/lib/paimon-format-1.2.0.jar:/opt/flink/lib/paimon-format-1.2.0.jar:ro
      # self-contained and isolated set of Hadoop client libraries
      - ./flink/lib/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar:/opt/flink/lib/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar:ro
      - ./sql:/opt/sql:ro
      - ./output:/opt/flink/output

  taskmanager:
    image: flink:1.19.1-scala_2.12-java11
    platform: linux/amd64
    command: taskmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
    depends_on:
      - jobmanager
    volumes:
      - ./flink/conf/sql-client-defaults.yaml:/opt/flink/conf/sql-client-defaults.yaml:ro
      - ./flink/lib/flink-sql-connector-mysql-cdc-3.2.0.jar:/opt/flink/lib/flink-sql-connector-mysql-cdc-3.2.0.jar:ro
      - ./flink/lib/flink-sql-connector-kafka-3.2.0-1.19.jar:/opt/flink/lib/flink-sql-connector-kafka-3.2.0-1.19.jar:ro
      - ./flink/lib/flink-connector-jdbc-3.2.0-1.19.jar:/opt/flink/lib/flink-connector-jdbc-3.2.0-1.19.jar:ro
      - ./flink/lib/mysql-connector-j-8.4.0.jar:/opt/flink/lib/mysql-connector-j-8.4.0.jar:ro
      - ./flink/lib/paimon-flink-1.19-1.2.0.jar:/opt/flink/lib/paimon-flink-1.19-1.2.0.jar:ro
      - ./flink/lib/paimon-format-1.2.0.jar:/opt/flink/lib/paimon-format-1.2.0.jar:ro
      # self-contained and isolated set of Hadoop client libraries
      - ./flink/lib/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar:/opt/flink/lib/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar:ro
      - ./sql:/opt/sql:ro
      - ./output:/opt/flink/output
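Compose only rejects malformed YAML at startup; to catch indentation mistakes earlier, you can validate the file first. `docker-compose config -q` parses and resolves the file, printing nothing on success and the error otherwise:
# validate docker-compose.yml without starting anything
docker-compose config -q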
Create mysql-init.sql
vim mysql-init.sql
CREATE DATABASE IF NOT EXISTS sales;
USE sales;
CREATE TABLE orders (
  order_id INT PRIMARY KEY,
  customer_id INT,
  region VARCHAR(10),
  amount DOUBLE,
  status VARCHAR(10),
  order_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
INSERT INTO orders VALUES
(1001, 1, 'US', 20.5, 'NEW', NOW()),
(1002, 2, 'EU', 35.2, 'NEW', NOW()),
(1003, 3, 'CN', 66.6, 'NEW', NOW()),
(1004, 4, 'UK', 38.9, 'NEW', NOW()),
(1005, 5, 'AU', 25.3, 'NEW', NOW()),
(1006, 6, 'JP', 33.8, 'NEW', NOW());
CREATE USER 'flink'@'%' IDENTIFIED BY 'flinkpw';
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'flink'@'%';
FLUSH PRIVILEGES;
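Flink CDC requires a ROW-format binlog, and this setup also enables GTID mode. Once the stack is running (see below), a quick check that the server options set under command: in docker-compose.yml actually took effect:
docker-compose exec -T mysql mysql -uroot -prootpw -e "SHOW VARIABLES WHERE Variable_name IN ('log_bin','binlog_format','gtid_mode');"
# expected: log_bin=ON, binlog_format=ROW, gtid_mode=ON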
Create the Flink Configuration File
mkdir -p flink/conf
vim flink/conf/sql-client-defaults.yaml
execution:
  type: streaming
  result-mode: table
  parallelism: 1

configuration:
  execution.checkpointing.interval: 5 s
  execution.checkpointing.mode: EXACTLY_ONCE
  execution.checkpointing.min-pause: 1 s
  execution.checkpointing.timeout: 5 min
  state.checkpoints.dir: file:///opt/flink/output/checkpoints
  restart-strategy: fixed-delay
  restart-strategy.fixed-delay.attempts: 10
  restart-strategy.fixed-delay.delay: 5 s
  table.exec.sink.not-null-enforcer: drop
  table.exec.sink.upsert-materialize: none
  table.exec.source.idle-timeout: 5 s

deployment:
  response-timeout: 10000
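Whether these defaults were actually picked up can be checked from inside the client itself: in the Flink SQL client, a bare SET; statement lists every effective property. A quick sketch, run after the stack is up:
docker-compose exec jobmanager /opt/flink/bin/sql-client.sh
Flink SQL> SET;
-- scan the output for execution.checkpointing.interval and state.checkpoints.dir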
Create 01_kafka_cdc.sql
mkdir -p sql
vim sql/01_kafka_cdc.sql
-- 01_kafka_cdc.sql
CREATE TABLE orders_cdc (
  order_id INT,
  customer_id INT,
  region STRING,
  amount DOUBLE,
  status STRING,
  order_time TIMESTAMP(3),
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'mysql',
  'port' = '3306',
  'username' = 'flink',
  'password' = 'flinkpw',
  'database-name' = 'sales',
  'table-name' = 'orders',
  'server-id' = '985',
  'scan.startup.mode' = 'initial'
);
CREATE TABLE orders_kafka (
  order_id INT,
  customer_id INT,
  region STRING,
  amount DOUBLE,
  status STRING,
  order_time TIMESTAMP(3),
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'orders_topic',
  'properties.bootstrap.servers' = 'kafka:9092',
  'key.format' = 'json',
  'value.format' = 'json'
);
INSERT INTO orders_kafka
SELECT * FROM orders_cdc;
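The upsert-kafka sink writes a changelog keyed by order_id: inserts and updates become new values for that key, and deletes become tombstones (null values). Besides Kafdrop, you can watch this from the command line; a sketch, assuming the standard script path inside the bitnami/kafka image:
docker-compose exec -T kafka /opt/bitnami/kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server kafka:9092 --topic orders_topic \
  --from-beginning --property print.key=true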
Create 02_paimon_cdc.sql
vim sql/02_paimon_cdc.sql
-- 02_paimon_cdc.sql
SET 'execution.runtime-mode' = 'streaming';
SET 'execution.checkpointing.interval' = '5 s';
SET 'execution.checkpointing.mode' = 'EXACTLY_ONCE';
SET 'execution.checkpointing.min-pause' = '1 s';
SET 'execution.checkpointing.timeout' = '5 min';
CREATE TABLE orders_cdc (
  order_id INT,
  customer_id INT,
  region STRING,
  amount DOUBLE,
  status STRING,
  order_time TIMESTAMP(3),
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'mysql',
  'port' = '3306',
  'username' = 'flink',
  'password' = 'flinkpw',
  'database-name' = 'sales',
  'table-name' = 'orders',
  'server-id' = '996',
  'scan.startup.mode' = 'initial'
);
CREATE CATALOG paimon_catalog WITH (
  'type' = 'paimon',
  'warehouse' = 'file:///opt/flink/output/warehouse'
);
USE CATALOG paimon_catalog;
CREATE DATABASE IF NOT EXISTS dwd;
USE dwd;
CREATE TABLE IF NOT EXISTS orders_paimon (
  order_id INT,
  customer_id INT,
  region STRING,
  amount DOUBLE,
  status STRING,
  order_time TIMESTAMP(3),
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'paimon',
  'changelog-producer' = 'input',
  'bucket' = '1'
);
USE CATALOG default_catalog;
INSERT INTO paimon_catalog.dwd.orders_paimon SELECT * FROM orders_cdc;
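Note that this script uses server-id 996 while 01_kafka_cdc.sql used 985: every mysql-cdc source must present a distinct server id to MySQL, or the two binlog connections will conflict. Once the job has committed a few checkpoints, you can inspect what Paimon recorded through its $snapshots system table; a sketch, run later from a batch session in the same catalog:
-- from a batch session, after: USE CATALOG paimon_catalog; USE dwd;
SELECT snapshot_id, commit_kind, commit_time
FROM orders_paimon$snapshots;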
Create 03_topn_batch.sql
vim sql/03_topn_batch.sql
-- 03_topn_batch.sql
SET 'execution.runtime-mode' = 'batch';
SET 'sql-client.execution.result-mode' = 'TABLEAU';
SET 'table.dml-sync' = 'true';
CREATE TABLE orders_jdbc (
  order_id INT,
  customer_id INT,
  region STRING,
  amount DOUBLE,
  status STRING,
  order_time TIMESTAMP(3),
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://mysql:3306/sales',
  'table-name' = 'orders',
  'username' = 'flink',
  'password' = 'flinkpw'
);
CREATE TABLE top_customers (
  customer_id INT,
  total_amount DOUBLE
) WITH (
  'connector' = 'filesystem',
  'path' = '/opt/flink/output/top_customers',
  'format' = 'csv'
);
INSERT OVERWRITE top_customers
SELECT customer_id,
       CAST(SUM(amount) AS DOUBLE) AS total_amount
FROM orders_jdbc
GROUP BY customer_id
ORDER BY total_amount DESC
LIMIT 5;
SELECT * FROM top_customers;
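Two details worth noting: ORDER BY on a non-time attribute is only legal here because the script runs in batch mode, and INSERT OVERWRITE replaces the sink's previous contents on each run. Since top_customers is a filesystem sink on the mounted volume, the result can also be read straight from the host once the job finishes:
# the part file name is generated per task; match it with a glob
cat output/top_customers/part-*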
Create 04_paimon_read.sql
vim sql/04_paimon_read.sql
-- 04_paimon_read.sql
SET 'sql-client.execution.result-mode' = 'TABLEAU';
SET 'execution.runtime-mode' = 'batch';
SET 'table.dml-sync' = 'true';
CREATE CATALOG paimon_catalog WITH (
  'type' = 'paimon',
  'warehouse' = 'file:///opt/flink/output/warehouse'
);
USE CATALOG paimon_catalog;
USE dwd;
SELECT COUNT(*) AS cnt FROM orders_paimon;
SELECT * FROM orders_paimon;
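Because each successful checkpoint produced a new Paimon snapshot, earlier states of the table remain queryable. Paimon supports this through dynamic table options; a sketch, assuming snapshot 1 has not yet been expired:
-- time travel: read the table as of its first commit instead of the latest snapshot
SELECT * FROM orders_paimon /*+ OPTIONS('scan.snapshot-id' = '1') */;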
Unified Stream and Batch Analytics
Run docker-compose
docker-compose pull
docker-compose up -d --scale taskmanager=4
[+] Running 10/10
✔ Network flink-amd64_default Created
✔ Container flink-amd64-jobmanager-1 Started
✔ Container flink-amd64-kafka-1 Started
✔ Container flink-amd64-adminer-1 Started
✔ Container flink-amd64-mysql-1 Started
✔ Container flink-amd64-kafdrop-1 Started
✔ Container flink-amd64-taskmanager-1 Started
✔ Container flink-amd64-taskmanager-2 Started
✔ Container flink-amd64-taskmanager-3 Started
✔ Container flink-amd64-taskmanager-4 Started
docker-compose ps --format "table {{.Name}}\t{{.Service}}\t{{.Status}}"
NAME SERVICE STATUS
flink-amd64-adminer-1 adminer Up 3 minutes
flink-amd64-jobmanager-1 jobmanager Up 3 minutes
flink-amd64-kafdrop-1 kafdrop Up 3 minutes
flink-amd64-kafka-1 kafka Up 3 minutes
flink-amd64-mysql-1 mysql Up 3 minutes
flink-amd64-taskmanager-1 taskmanager Up 3 minutes
flink-amd64-taskmanager-2 taskmanager Up 3 minutes
flink-amd64-taskmanager-3 taskmanager Up 3 minutes
flink-amd64-taskmanager-4 taskmanager Up 3 minutes
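The jobmanager's REST API offers a scripted alternative to the web UI; its /overview endpoint reports registered taskmanagers and available slots, so you can confirm all four taskmanagers have joined before submitting jobs:
# expect "taskmanagers": 4 once registration completes
curl -s http://localhost:8081/overview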
Run 01_kafka_cdc.sql
docker-compose exec -T jobmanager /opt/flink/bin/sql-client.sh -f /opt/sql/01_kafka_cdc.sql
...
Flink SQL>
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 7537135a66b3ead1213fcdbc334fa192
Flink SQL>
Shutting down the session...
done.
Run 02_paimon_cdc.sql
docker-compose exec -T jobmanager /opt/flink/bin/sql-client.sh -f /opt/sql/02_paimon_cdc.sql
...
Flink SQL>
[INFO] Execute statement succeed.
Flink SQL>
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 852a7286763bdf109cdf36aa1b9c3627
Flink SQL>
Shutting down the session...
done.
Verify Streaming CDC
- Insert a new row:
docker-compose exec -T mysql mysql -uroot -prootpw -e "USE sales; INSERT INTO orders VALUES (3001, 31, 'US', 12.3, 'PAID', NOW());"
- Check the Flink jobs: open http://localhost:8081 and confirm the two streaming jobs are running.
- Check the MySQL table: open http://localhost:8080 (Adminer), log in with Server: mysql, Username: root, Password: rootpw, Database: sales, and browse the sales.orders table.
- Check the Kafka events: open http://localhost:9000 (Kafdrop) and inspect the orders_topic topic.
Verify the Batch Job
docker-compose exec -T jobmanager /opt/flink/bin/sql-client.sh -f /opt/sql/03_topn_batch.sql
...
Flink SQL>
> INSERT OVERWRITE top_customers
> SELECT customer_id,
> CAST(SUM(amount) AS DOUBLE)
[INFO] Complete execution of the SQL update statement.
Flink SQL>
+-------------+--------------+
| customer_id | total_amount |
+-------------+--------------+
| 3 | 66.6 |
| 4 | 38.9 |
| 2 | 35.2 |
| 6 | 33.8 |
| 5 | 25.3 |
+-------------+--------------+
5 rows in set (0.94 seconds)
Flink SQL>
Shutting down the session...
done.
Verify the Paimon Table
docker-compose exec -T mysql mysql -uroot -prootpw -e "
INSERT INTO sales.orders
(order_id, customer_id, region, amount, status, order_time)
VALUES
(6001, 61, 'US', 18.5, 'NEW', NOW()),
(6002, 62, 'EU', 27.0, 'PAID', NOW()),
(6003, 63, 'APAC',33.3, 'NEW', NOW());
UPDATE sales.orders SET amount = amount + 5, status='PAID' WHERE order_id IN (6001, 6002);
DELETE FROM sales.orders WHERE order_id = 6003;
"
docker-compose exec -T jobmanager /opt/flink/bin/sql-client.sh -f /opt/sql/04_paimon_read.sql
...
Flink SQL>
[INFO] Execute statement succeed.
Flink SQL>
> SELECT COUNT(*)
+-----+
| cnt |
+-----+
| 9 |
+-----+
1 row in set (7.52 seconds)
Flink SQL>
+----------+-------------+--------+--------+--------+-------------------------+
| order_id | customer_id | region | amount | status | order_time |
+----------+-------------+--------+--------+--------+-------------------------+
| 1001 | 1 | US | 20.5 | NEW | 2025-09-23 02:23:53.000 |
| 1002 | 2 | EU | 35.2 | NEW | 2025-09-23 02:23:53.000 |
| 1003 | 3 | CN | 66.6 | NEW | 2025-09-23 02:23:53.000 |
| 1004 | 4 | UK | 38.9 | NEW | 2025-09-23 02:23:53.000 |
| 1005 | 5 | AU | 25.3 | NEW | 2025-09-23 02:23:53.000 |
| 1006 | 6 | JP | 33.8 | NEW | 2025-09-23 02:23:53.000 |
| 3001 | 31 | US | 12.3 | PAID | 2025-09-23 02:27:03.000 |
| 6001 | 61 | US | 23.5 | PAID | 2025-09-23 02:30:49.000 |
| 6002 | 62 | EU | 32.0 | PAID | 2025-09-23 02:30:49.000 |
+----------+-------------+--------+--------+--------+-------------------------+
9 rows in set (0.92 seconds)
Flink SQL>
Shutting down the session...
done.
Check the finished Flink jobs at http://localhost:8081.

Inspect the Output Artifacts
tree output
output
├── checkpoints
├── top_customers
│   └── part-b36f808b-b393-4359-bfb0-05eaf732ee2f-task-0-file-0
└── warehouse
    ├── default.db
    └── dwd.db
        └── orders_paimon
            ├── bucket-0
            │   ├── changelog-1e5892ad-7d3b-4dbb-8801-deb523e8bb9c-0.parquet
            │   ├── changelog-49f7184f-cbd9-4ecb-bc93-6479fe5dd88d-0.parquet
            │   ├── changelog-7a66b3ac-64bf-454a-8ca8-3185e281d5d0-0.parquet
            │   ├── data-1e5892ad-7d3b-4dbb-8801-deb523e8bb9c-1.parquet
            │   ├── data-49f7184f-cbd9-4ecb-bc93-6479fe5dd88d-1.parquet
            │   └── data-7a66b3ac-64bf-454a-8ca8-3185e281d5d0-1.parquet
            ├── manifest
            │   ├── manifest-18b7fa7b-7b72-48cf-93e9-3bc50d57dd3b-0
            │   ├── manifest-18b7fa7b-7b72-48cf-93e9-3bc50d57dd3b-1
            │   ├── manifest-18b7fa7b-7b72-48cf-93e9-3bc50d57dd3b-2
            │   ├── manifest-18b7fa7b-7b72-48cf-93e9-3bc50d57dd3b-3
            │   ├── manifest-18b7fa7b-7b72-48cf-93e9-3bc50d57dd3b-4
            │   ├── manifest-18b7fa7b-7b72-48cf-93e9-3bc50d57dd3b-5
            │   ├── manifest-list-770c3f24-9fbe-48b4-b29f-c3eabe5c8d35-0
            │   ├── manifest-list-770c3f24-9fbe-48b4-b29f-c3eabe5c8d35-1
            │   ├── manifest-list-770c3f24-9fbe-48b4-b29f-c3eabe5c8d35-10
            │   ├── manifest-list-770c3f24-9fbe-48b4-b29f-c3eabe5c8d35-11
            │   ├── manifest-list-770c3f24-9fbe-48b4-b29f-c3eabe5c8d35-12
            │   ├── manifest-list-770c3f24-9fbe-48b4-b29f-c3eabe5c8d35-13
            │   ├── manifest-list-770c3f24-9fbe-48b4-b29f-c3eabe5c8d35-14
            │   ├── manifest-list-770c3f24-9fbe-48b4-b29f-c3eabe5c8d35-15
            │   ├── manifest-list-770c3f24-9fbe-48b4-b29f-c3eabe5c8d35-16
            │   ├── manifest-list-770c3f24-9fbe-48b4-b29f-c3eabe5c8d35-17
            │   ├── manifest-list-770c3f24-9fbe-48b4-b29f-c3eabe5c8d35-18
            │   ├── manifest-list-770c3f24-9fbe-48b4-b29f-c3eabe5c8d35-19
            │   ├── manifest-list-770c3f24-9fbe-48b4-b29f-c3eabe5c8d35-2
            │   ├── manifest-list-770c3f24-9fbe-48b4-b29f-c3eabe5c8d35-20
            │   ├── manifest-list-770c3f24-9fbe-48b4-b29f-c3eabe5c8d35-3
            │   ├── manifest-list-770c3f24-9fbe-48b4-b29f-c3eabe5c8d35-4
            │   ├── manifest-list-770c3f24-9fbe-48b4-b29f-c3eabe5c8d35-5
            │   ├── manifest-list-770c3f24-9fbe-48b4-b29f-c3eabe5c8d35-6
            │   ├── manifest-list-770c3f24-9fbe-48b4-b29f-c3eabe5c8d35-7
            │   ├── manifest-list-770c3f24-9fbe-48b4-b29f-c3eabe5c8d35-8
            │   └── manifest-list-770c3f24-9fbe-48b4-b29f-c3eabe5c8d35-9
            ├── schema
            │   └── schema-0
            └── snapshot
                ├── EARLIEST
                ├── LATEST
                ├── snapshot-1
                ├── snapshot-2
                ├── snapshot-3
                ├── snapshot-4
                ├── snapshot-5
                ├── snapshot-6
                ├── snapshot-7
                ├── snapshot-8
                └── snapshot-9
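The snapshot directory explains the query results above: EARLIEST and LATEST are small pointer files holding snapshot ids, and each snapshot-N file is a JSON document describing one commit. A quick look from the host, using the paths produced by this run:
# LATEST should point at the newest snapshot (9 in the run above)
cat output/warehouse/dwd.db/orders_paimon/snapshot/LATEST
cat output/warehouse/dwd.db/orders_paimon/snapshot/snapshot-9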