Quickstart: Flink SQL Unified Batch and Streaming

This article demonstrates how to quickly set up a minimal CDC (Change Data Capture) and unified batch-and-streaming pipeline using Docker, MySQL, and Flink SQL, with the following features:
- The entire process requires no Java/Scala code, only Flink SQL
- Flink CDC captures MySQL data changes
- Flink streaming processing writes to Kafka + Paimon (Lakehouse Storage Engine)
- Flink batch processing writes statistics to CSV
- Visualization tools: Adminer (MySQL) and Kafdrop (Kafka)
It is designed to help operations engineers and data analysts quickly get hands-on experience with the full path: Flink Data Source → CDC Capture → Batch-Stream Analysis → Downstream Results.
Setup Services
Structure
.
├── flink
│   ├── conf
│   │   └── sql-client-defaults.yaml
│   └── lib
│       ├── flink-connector-jdbc-3.2.0-1.19.jar
│       ├── flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
│       ├── flink-sql-connector-kafka-3.2.0-1.19.jar
│       ├── flink-sql-connector-mysql-cdc-3.2.0.jar
│       ├── mysql-connector-j-8.4.0.jar
│       ├── paimon-flink-1.19-1.2.0.jar
│       └── paimon-format-1.2.0.jar
├── output
│   └── checkpoints
├── sql
│   ├── 01_kafka_cdc.sql
│   ├── 02_paimon_cdc.sql
│   ├── 03_topn_batch.sql
│   └── 04_paimon_read.sql
├── docker-compose.yml
└── mysql-init.sql
Download JARs
mkdir flink-amd64
cd flink-amd64
mkdir -p output/checkpoints
mkdir -p flink/lib
cd flink/lib
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.2.0-1.19/flink-connector-jdbc-3.2.0-1.19.jar
wget https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka/3.2.0-1.19/flink-sql-connector-kafka-3.2.0-1.19.jar
wget https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-mysql-cdc/3.2.0/flink-sql-connector-mysql-cdc-3.2.0.jar
wget https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/8.4.0/mysql-connector-j-8.4.0.jar
wget https://repo1.maven.org/maven2/org/apache/paimon/paimon-flink-1.19/1.2.0/paimon-flink-1.19-1.2.0.jar
wget https://repo1.maven.org/maven2/org/apache/paimon/paimon-format/1.2.0/paimon-format-1.2.0.jar
wget https://repo1.maven.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.8.3-10.0/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
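A quick size check helps catch truncated downloads before they surface as a ClassNotFoundException at runtime:
# all seven JARs should be present; the shaded Hadoop uber JAR is by far the largest (tens of MB)
ls -lh *.jar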
Create docker-compose.yml
cd ../../
vim docker-compose.yml
services:
  mysql:
    image: mysql:8.0
    platform: linux/amd64
    ports:
      - "3306:3306"
    environment:
      - MYSQL_ROOT_PASSWORD=rootpw
      - MYSQL_DATABASE=sales
    command: >
      --server-id=857
      --log-bin=binlog
      --binlog_format=ROW
      --binlog_row_image=FULL
      --gtid_mode=ON
      --enforce-gtid-consistency=ON
      --binlog_expire_logs_seconds=600
    volumes:
      - ./mysql-init.sql:/docker-entrypoint-initdb.d/mysql-init.sql
  kafka:
    image: bitnami/kafka:3.7
    platform: linux/amd64
    ports:
      - "9092:9092"
    environment:
      - KAFKA_ENABLE_KRAFT=yes
      - KAFKA_CFG_NODE_ID=1
      - KAFKA_CFG_PROCESS_ROLES=broker,controller
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka:9093
      - KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true
      - ALLOW_PLAINTEXT_LISTENER=yes
  kafdrop:
    image: obsidiandynamics/kafdrop:4.2.0
    platform: linux/amd64
    ports:
      - "9000:9000"
    environment:
      - KAFKA_BROKERCONNECT=kafka:9092
    depends_on:
      - kafka
  adminer:
    image: adminer:5.4.0
    platform: linux/amd64
    ports:
      - "8080:8080"
  jobmanager:
    image: flink:1.19.1-scala_2.12-java11
    platform: linux/amd64
    command: jobmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
    ports:
      - "8081:8081"
    volumes:
      - ./flink/conf/sql-client-defaults.yaml:/opt/flink/conf/sql-client-defaults.yaml:ro
      - ./flink/lib/flink-sql-connector-mysql-cdc-3.2.0.jar:/opt/flink/lib/flink-sql-connector-mysql-cdc-3.2.0.jar:ro
      - ./flink/lib/flink-sql-connector-kafka-3.2.0-1.19.jar:/opt/flink/lib/flink-sql-connector-kafka-3.2.0-1.19.jar:ro
      - ./flink/lib/flink-connector-jdbc-3.2.0-1.19.jar:/opt/flink/lib/flink-connector-jdbc-3.2.0-1.19.jar:ro
      - ./flink/lib/mysql-connector-j-8.4.0.jar:/opt/flink/lib/mysql-connector-j-8.4.0.jar:ro
      - ./flink/lib/paimon-flink-1.19-1.2.0.jar:/opt/flink/lib/paimon-flink-1.19-1.2.0.jar:ro
      - ./flink/lib/paimon-format-1.2.0.jar:/opt/flink/lib/paimon-format-1.2.0.jar:ro
      # self-contained and isolated set of Hadoop client libraries
      - ./flink/lib/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar:/opt/flink/lib/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar:ro
      - ./sql:/opt/sql:ro
      - ./output:/opt/flink/output
  taskmanager:
    image: flink:1.19.1-scala_2.12-java11
    platform: linux/amd64
    command: taskmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
    depends_on:
      - jobmanager
    volumes:
      - ./flink/conf/sql-client-defaults.yaml:/opt/flink/conf/sql-client-defaults.yaml:ro
      - ./flink/lib/flink-sql-connector-mysql-cdc-3.2.0.jar:/opt/flink/lib/flink-sql-connector-mysql-cdc-3.2.0.jar:ro
      - ./flink/lib/flink-sql-connector-kafka-3.2.0-1.19.jar:/opt/flink/lib/flink-sql-connector-kafka-3.2.0-1.19.jar:ro
      - ./flink/lib/flink-connector-jdbc-3.2.0-1.19.jar:/opt/flink/lib/flink-connector-jdbc-3.2.0-1.19.jar:ro
      - ./flink/lib/mysql-connector-j-8.4.0.jar:/opt/flink/lib/mysql-connector-j-8.4.0.jar:ro
      - ./flink/lib/paimon-flink-1.19-1.2.0.jar:/opt/flink/lib/paimon-flink-1.19-1.2.0.jar:ro
      - ./flink/lib/paimon-format-1.2.0.jar:/opt/flink/lib/paimon-format-1.2.0.jar:ro
      # self-contained and isolated set of Hadoop client libraries
      - ./flink/lib/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar:/opt/flink/lib/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar:ro
      - ./sql:/opt/sql:ro
      - ./output:/opt/flink/output
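The mysql service's command flags are what make CDC possible: a ROW-format binlog with FULL row image gives Flink CDC complete before/after images of every change, and GTID mode keeps resumed reads consistent. One host-side caveat: the Flink images run as a non-root flink user, so if checkpoint or warehouse writes later fail with permission errors, a quick local-sandbox fix (not for production) is to loosen permissions on the bind-mounted directory:
# only needed if the Flink containers cannot write to ./output
chmod -R a+rwX output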
Create mysql-init.sql
vim mysql-init.sql
CREATE DATABASE IF NOT EXISTS sales;
USE sales;
CREATE TABLE orders (
  order_id INT PRIMARY KEY,
  customer_id INT,
  region VARCHAR(10),
  amount DOUBLE,
  status VARCHAR(10),
  order_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
INSERT INTO orders VALUES
  (1001, 1, 'US', 20.5, 'NEW', NOW()),
  (1002, 2, 'EU', 35.2, 'NEW', NOW()),
  (1003, 3, 'CN', 66.6, 'NEW', NOW()),
  (1004, 4, 'UK', 38.9, 'NEW', NOW()),
  (1005, 5, 'AU', 25.3, 'NEW', NOW()),
  (1006, 6, 'JP', 33.8, 'NEW', NOW());
CREATE USER 'flink'@'%' IDENTIFIED BY 'flinkpw';
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'flink'@'%';
FLUSH PRIVILEGES;
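The flink user needs exactly these replication privileges for CDC to read the binlog. Once the stack is up (see Run docker-compose below), a quick sanity check:
# expects SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.*
docker-compose exec -T mysql mysql -uroot -prootpw -e "SHOW GRANTS FOR 'flink'@'%';"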
Create Flink Configuration
mkdir -p flink/conf
vim flink/conf/sql-client-defaults.yaml
execution:
  type: streaming
  result-mode: table
  parallelism: 1
configuration:
  execution.checkpointing.interval: 5 s
  execution.checkpointing.mode: EXACTLY_ONCE
  execution.checkpointing.min-pause: 1 s
  execution.checkpointing.timeout: 5 min
  state.checkpoints.dir: file:///opt/flink/output/checkpoints
  restart-strategy: fixed-delay
  restart-strategy.fixed-delay.attempts: 10
  restart-strategy.fixed-delay.delay: 5 s
  table.exec.sink.not-null-enforcer: drop
  table.exec.sink.upsert-materialize: none
  table.exec.source.idle-timeout: 5 s
deployment:
  response-timeout: 10000
Create 01_kafka_cdc.sql
mkdir -p sql
vim sql/01_kafka_cdc.sql
-- 01_kafka_cdc.sql
CREATE TABLE orders_cdc (
  order_id INT,
  customer_id INT,
  region STRING,
  amount DOUBLE,
  status STRING,
  order_time TIMESTAMP(3),
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'mysql',
  'port' = '3306',
  'username' = 'flink',
  'password' = 'flinkpw',
  'database-name' = 'sales',
  'table-name' = 'orders',
  'server-id' = '985',
  'scan.startup.mode' = 'initial'
);
CREATE TABLE orders_kafka (
  order_id INT,
  customer_id INT,
  region STRING,
  amount DOUBLE,
  status STRING,
  order_time TIMESTAMP(3),
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'orders_topic',
  'properties.bootstrap.servers' = 'kafka:9092',
  'key.format' = 'json',
  'value.format' = 'json'
);
INSERT INTO orders_kafka
SELECT * FROM orders_cdc;
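upsert-kafka is used instead of the plain kafka connector because the CDC stream carries updates and deletes, which upsert-kafka encodes as keyed records: an UPDATE arrives as a new value for the same key, a DELETE as a key with a null value (tombstone). Once the job is running you can watch this encoding from the command line; a sketch assuming the Bitnami image keeps the standard Kafka CLI scripts on the PATH:
# print key and value for every record on the topic
docker-compose exec -T kafka kafka-console-consumer.sh \
  --bootstrap-server kafka:9092 --topic orders_topic \
  --from-beginning --property print.key=true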
Create 02_paimon_cdc.sql
vim sql/02_paimon_cdc.sql
-- 02_paimon_cdc.sql
SET 'execution.runtime-mode' = 'streaming';
SET 'execution.checkpointing.interval' = '5 s';
SET 'execution.checkpointing.mode' = 'EXACTLY_ONCE';
SET 'execution.checkpointing.min-pause' = '1 s';
SET 'execution.checkpointing.timeout' = '5 min';
CREATE TABLE orders_cdc (
  order_id INT,
  customer_id INT,
  region STRING,
  amount DOUBLE,
  status STRING,
  order_time TIMESTAMP(3),
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'mysql',
  'port' = '3306',
  'username' = 'flink',
  'password' = 'flinkpw',
  'database-name' = 'sales',
  'table-name' = 'orders',
  'server-id' = '996',
  'scan.startup.mode' = 'initial'
);
CREATE CATALOG paimon_catalog WITH (
  'type' = 'paimon',
  'warehouse' = 'file:///opt/flink/output/warehouse'
);
USE CATALOG paimon_catalog;
CREATE DATABASE IF NOT EXISTS dwd;
USE dwd;
CREATE TABLE IF NOT EXISTS orders_paimon (
  order_id INT,
  customer_id INT,
  region STRING,
  amount DOUBLE,
  status STRING,
  order_time TIMESTAMP(3),
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'paimon',
  'changelog-producer' = 'input',
  'bucket' = '1'
);
USE CATALOG default_catalog;
INSERT INTO paimon_catalog.dwd.orders_paimon SELECT * FROM orders_cdc;
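Note the 'server-id' here (996) differs from the one in 01_kafka_cdc.sql (985) and from MySQL's own --server-id (857): each CDC source connects to MySQL as a replication client and must use a unique ID. After both scripts are submitted (next section), both streaming jobs should show as RUNNING:
# list running jobs from the CLI instead of the web UI
docker-compose exec -T jobmanager /opt/flink/bin/flink list -r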
Create 03_topn_batch.sql
vim sql/03_topn_batch.sql
-- 03_topn_batch.sql
SET 'execution.runtime-mode' = 'batch';
SET 'sql-client.execution.result-mode' = 'TABLEAU';
SET 'table.dml-sync' = 'true';
CREATE TABLE orders_jdbc (
  order_id INT,
  customer_id INT,
  region STRING,
  amount DOUBLE,
  status STRING,
  order_time TIMESTAMP(3),
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://mysql:3306/sales',
  'table-name' = 'orders',
  'username' = 'flink',
  'password' = 'flinkpw'
);
CREATE TABLE top_customers (
  customer_id INT,
  total_amount DOUBLE
) WITH (
  'connector' = 'filesystem',
  'path' = '/opt/flink/output/top_customers',
  'format' = 'csv'
);
INSERT OVERWRITE top_customers
SELECT customer_id,
       CAST(SUM(amount) AS DOUBLE) AS total_amount
FROM orders_jdbc
GROUP BY customer_id
ORDER BY total_amount DESC
LIMIT 5;
SELECT * FROM top_customers;
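Because ./output is bind-mounted into the containers, the CSV written by this job can be read straight from the host once the batch run finishes:
# one CSV part file per sink task; parallelism is 1 here, so expect a single file
cat output/top_customers/part-*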
Create 04_paimon_read.sql
vim sql/04_paimon_read.sql
-- 04_paimon_read.sql
SET 'sql-client.execution.result-mode' = 'TABLEAU';
SET 'execution.runtime-mode' = 'batch';
SET 'table.dml-sync' = 'true';
CREATE CATALOG paimon_catalog WITH (
  'type' = 'paimon',
  'warehouse' = 'file:///opt/flink/output/warehouse'
);
USE CATALOG paimon_catalog;
USE dwd;
SELECT COUNT(*) AS cnt FROM orders_paimon;
SELECT * FROM orders_paimon;
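Paimon keeps every snapshot (see the snapshot/ directory in the output tree later in this article), which also enables time travel. A sketch, assuming Paimon's scan.snapshot-id read option, that writes one extra script into the mounted sql/ directory and reads the table as of its first snapshot:
cat > sql/05_paimon_time_travel.sql <<'EOF'
SET 'sql-client.execution.result-mode' = 'TABLEAU';
SET 'execution.runtime-mode' = 'batch';
CREATE CATALOG paimon_catalog WITH (
  'type' = 'paimon',
  'warehouse' = 'file:///opt/flink/output/warehouse'
);
USE CATALOG paimon_catalog;
USE dwd;
-- read the table as of snapshot 1 (the initial CDC snapshot)
SELECT * FROM orders_paimon /*+ OPTIONS('scan.snapshot-id' = '1') */;
EOF
docker-compose exec -T jobmanager /opt/flink/bin/sql-client.sh -f /opt/sql/05_paimon_time_travel.sql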
Launch Batch and Streaming Jobs
Run docker-compose
docker-compose pull
docker-compose up -d --scale taskmanager=4
[+] Running 10/10
✔ Network flink-amd64_default Created
✔ Container flink-amd64-jobmanager-1 Started
✔ Container flink-amd64-kafka-1 Started
✔ Container flink-amd64-adminer-1 Started
✔ Container flink-amd64-mysql-1 Started
✔ Container flink-amd64-kafdrop-1 Started
✔ Container flink-amd64-taskmanager-1 Started
✔ Container flink-amd64-taskmanager-2 Started
✔ Container flink-amd64-taskmanager-3 Started
✔ Container flink-amd64-taskmanager-4 Started
docker-compose ps --format "table {{.Name}}\t{{.Service}}\t{{.Status}}"
NAME                        SERVICE       STATUS
flink-amd64-adminer-1       adminer       Up 3 minutes
flink-amd64-jobmanager-1    jobmanager    Up 3 minutes
flink-amd64-kafdrop-1       kafdrop       Up 3 minutes
flink-amd64-kafka-1         kafka         Up 3 minutes
flink-amd64-mysql-1         mysql         Up 3 minutes
flink-amd64-taskmanager-1   taskmanager   Up 3 minutes
flink-amd64-taskmanager-2   taskmanager   Up 3 minutes
flink-amd64-taskmanager-3   taskmanager   Up 3 minutes
flink-amd64-taskmanager-4   taskmanager   Up 3 minutes
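Before submitting any jobs, it is worth confirming that the binlog flags from docker-compose.yml actually took effect:
# all three should come back as binlog ON, format ROW, GTID mode ON
docker-compose exec -T mysql mysql -uroot -prootpw -e \
  "SHOW VARIABLES WHERE Variable_name IN ('log_bin','binlog_format','gtid_mode');"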
Run 01_kafka_cdc.sql
docker-compose exec -T jobmanager /opt/flink/bin/sql-client.sh -f /opt/sql/01_kafka_cdc.sql
...
Flink SQL>
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 7537135a66b3ead1213fcdbc334fa192
Flink SQL>
Shutting down the session...
done.
Run 02_paimon_cdc.sql
docker-compose exec -T jobmanager /opt/flink/bin/sql-client.sh -f /opt/sql/02_paimon_cdc.sql
...
Flink SQL>
[INFO] Execute statement succeed.
Flink SQL>
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 852a7286763bdf109cdf36aa1b9c3627
Flink SQL>
Shutting down the session...
done.
Verify Streaming CDC
- Insert a new row:
docker-compose exec -T mysql mysql -uroot -prootpw -e "USE sales; INSERT INTO orders VALUES (3001, 31, 'US', 12.3, 'PAID', NOW());"
- Check Flink jobs: visit http://localhost:8081 and confirm both streaming jobs are running.
- Check the MySQL table: visit http://localhost:8080, log in with Server: mysql, Username: root, Password: rootpw, Database: sales, and inspect the sales.orders table.
- Check Kafka events: visit http://localhost:9000 and inspect the orders_topic topic.
Verify Batch Job
docker-compose exec -T jobmanager /opt/flink/bin/sql-client.sh -f /opt/sql/03_topn_batch.sql
...
Flink SQL>
> INSERT OVERWRITE top_customers
> SELECT customer_id,
> CAST(SUM(amount) AS DOUBLE)
[INFO] Complete execution of the SQL update statement.
Flink SQL>
+-------------+--------------+
| customer_id | total_amount |
+-------------+--------------+
| 3 | 66.6 |
| 4 | 38.9 |
| 2 | 35.2 |
| 6 | 33.8 |
| 5 | 25.3 |
+-------------+--------------+
5 rows in set (0.94 seconds)
Flink SQL>
Shutting down the session...
done.
Verify Paimon Table
docker-compose exec -T mysql mysql -uroot -prootpw -e "
INSERT INTO sales.orders
(order_id, customer_id, region, amount, status, order_time)
VALUES
(6001, 61, 'US', 18.5, 'NEW', NOW()),
(6002, 62, 'EU', 27.0, 'PAID', NOW()),
(6003, 63, 'APAC',33.3, 'NEW', NOW());
UPDATE sales.orders SET amount = amount + 5, status='PAID' WHERE order_id IN (6001, 6002);
DELETE FROM sales.orders WHERE order_id = 6003;
"
docker-compose exec -T jobmanager /opt/flink/bin/sql-client.sh -f /opt/sql/04_paimon_read.sql
...
Flink SQL>
[INFO] Execute statement succeed.
Flink SQL>
> SELECT COUNT(*)
+-----+
| cnt |
+-----+
| 9 |
+-----+
1 row in set (7.52 seconds)
Flink SQL>
+----------+-------------+--------+--------+--------+-------------------------+
| order_id | customer_id | region | amount | status | order_time |
+----------+-------------+--------+--------+--------+-------------------------+
| 1001 | 1 | US | 20.5 | NEW | 2025-09-23 02:23:53.000 |
| 1002 | 2 | EU | 35.2 | NEW | 2025-09-23 02:23:53.000 |
| 1003 | 3 | CN | 66.6 | NEW | 2025-09-23 02:23:53.000 |
| 1004 | 4 | UK | 38.9 | NEW | 2025-09-23 02:23:53.000 |
| 1005 | 5 | AU | 25.3 | NEW | 2025-09-23 02:23:53.000 |
| 1006 | 6 | JP | 33.8 | NEW | 2025-09-23 02:23:53.000 |
| 3001 | 31 | US | 12.3 | PAID | 2025-09-23 02:27:03.000 |
| 6001 | 61 | US | 23.5 | PAID | 2025-09-23 02:30:49.000 |
| 6002 | 62 | EU | 32.0 | PAID | 2025-09-23 02:30:49.000 |
+----------+-------------+--------+--------+--------+-------------------------+
9 rows in set (0.92 seconds)
Flink SQL>
Shutting down the session...
done.
Check the finished batch jobs under Completed Jobs at http://localhost:8081.
Check Output artifacts
tree output
output
├── checkpoints
├── top_customers
│   └── part-b36f808b-b393-4359-bfb0-05eaf732ee2f-task-0-file-0
└── warehouse
    ├── default.db
    └── dwd.db
        └── orders_paimon
            ├── bucket-0
            │   ├── changelog-1e5892ad-7d3b-4dbb-8801-deb523e8bb9c-0.parquet
            │   ├── changelog-49f7184f-cbd9-4ecb-bc93-6479fe5dd88d-0.parquet
            │   ├── changelog-7a66b3ac-64bf-454a-8ca8-3185e281d5d0-0.parquet
            │   ├── data-1e5892ad-7d3b-4dbb-8801-deb523e8bb9c-1.parquet
            │   ├── data-49f7184f-cbd9-4ecb-bc93-6479fe5dd88d-1.parquet
            │   └── data-7a66b3ac-64bf-454a-8ca8-3185e281d5d0-1.parquet
            ├── manifest
            │   ├── manifest-18b7fa7b-7b72-48cf-93e9-3bc50d57dd3b-0
            │   ├── manifest-18b7fa7b-7b72-48cf-93e9-3bc50d57dd3b-1
            │   ├── manifest-18b7fa7b-7b72-48cf-93e9-3bc50d57dd3b-2
            │   ├── manifest-18b7fa7b-7b72-48cf-93e9-3bc50d57dd3b-3
            │   ├── manifest-18b7fa7b-7b72-48cf-93e9-3bc50d57dd3b-4
            │   ├── manifest-18b7fa7b-7b72-48cf-93e9-3bc50d57dd3b-5
            │   ├── manifest-list-770c3f24-9fbe-48b4-b29f-c3eabe5c8d35-0
            │   ├── manifest-list-770c3f24-9fbe-48b4-b29f-c3eabe5c8d35-1
            │   ├── manifest-list-770c3f24-9fbe-48b4-b29f-c3eabe5c8d35-10
            │   ├── manifest-list-770c3f24-9fbe-48b4-b29f-c3eabe5c8d35-11
            │   ├── manifest-list-770c3f24-9fbe-48b4-b29f-c3eabe5c8d35-12
            │   ├── manifest-list-770c3f24-9fbe-48b4-b29f-c3eabe5c8d35-13
            │   ├── manifest-list-770c3f24-9fbe-48b4-b29f-c3eabe5c8d35-14
            │   ├── manifest-list-770c3f24-9fbe-48b4-b29f-c3eabe5c8d35-15
            │   ├── manifest-list-770c3f24-9fbe-48b4-b29f-c3eabe5c8d35-16
            │   ├── manifest-list-770c3f24-9fbe-48b4-b29f-c3eabe5c8d35-17
            │   ├── manifest-list-770c3f24-9fbe-48b4-b29f-c3eabe5c8d35-18
            │   ├── manifest-list-770c3f24-9fbe-48b4-b29f-c3eabe5c8d35-19
            │   ├── manifest-list-770c3f24-9fbe-48b4-b29f-c3eabe5c8d35-2
            │   ├── manifest-list-770c3f24-9fbe-48b4-b29f-c3eabe5c8d35-20
            │   ├── manifest-list-770c3f24-9fbe-48b4-b29f-c3eabe5c8d35-3
            │   ├── manifest-list-770c3f24-9fbe-48b4-b29f-c3eabe5c8d35-4
            │   ├── manifest-list-770c3f24-9fbe-48b4-b29f-c3eabe5c8d35-5
            │   ├── manifest-list-770c3f24-9fbe-48b4-b29f-c3eabe5c8d35-6
            │   ├── manifest-list-770c3f24-9fbe-48b4-b29f-c3eabe5c8d35-7
            │   ├── manifest-list-770c3f24-9fbe-48b4-b29f-c3eabe5c8d35-8
            │   └── manifest-list-770c3f24-9fbe-48b4-b29f-c3eabe5c8d35-9
            ├── schema
            │   └── schema-0
            └── snapshot
                ├── EARLIEST
                ├── LATEST
                ├── snapshot-1
                ├── snapshot-2
                ├── snapshot-3
                ├── snapshot-4
                ├── snapshot-5
                ├── snapshot-6
                ├── snapshot-7
                ├── snapshot-8
                └── snapshot-9
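When you are done experimenting, tear the sandbox down; the bind-mounted output/ directory (checkpoints, CSV, Paimon warehouse) stays on the host:
docker-compose down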