Contents

Quickstart Flink SQL Unified Batch and Streaming

Set up a minimal CDC (Change Data Capture) and Unified Batch and Streaming processing using Docker, MySQL, and Flink SQL.


This article demonstrates how to quickly set up a minimal CDC (Change Data Capture) and Unified Batch and Streaming processing using Docker, MySQL, and Flink SQL, with the following features:

  • The entire process requires no Java/Scala code, only Flink SQL
  • Flink CDC captures MySQL data changes
  • Flink streaming processing writes to Kafka + Paimon (Lakehouse Storage Engine)
  • Flink batch processing writes statistics to CSV
  • Visualization tools: Adminer (MySQL) and Kafdrop (Kafka)

It is designed to help operations engineers and data analysts quickly get hands-on experience with Flink Data Source → CDC Capture → Batch-Stream Analysis → Downstream Results.

Setup Services

Structure

.
├── flink
│   ├── conf
│   │   └── sql-client-defaults.yaml
│   └── lib
│       ├── flink-connector-jdbc-3.2.0-1.19.jar
│       ├── flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
│       ├── flink-sql-connector-kafka-3.2.0-1.19.jar
│       ├── flink-sql-connector-mysql-cdc-3.2.0.jar
│       ├── mysql-connector-j-8.4.0.jar
│       ├── paimon-flink-1.19-1.2.0.jar
│       └── paimon-format-1.2.0.jar
├── output
│   └── checkpoints
├── sql
│   ├── 01_kafka_cdc.sql
│   ├── 02_paimon_cdc.sql
│   ├── 03_topn_batch.sql
│   └── 04_paimon_read.sql
├── docker-compose.yml
└── mysql-init.sql

Download JARs

mkdir flink-amd64
cd flink-amd64
mkdir -p output/checkpoints
mkdir -p flink/lib
cd flink/lib

wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.2.0-1.19/flink-connector-jdbc-3.2.0-1.19.jar
wget https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka/3.2.0-1.19/flink-sql-connector-kafka-3.2.0-1.19.jar
wget https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-mysql-cdc/3.2.0/flink-sql-connector-mysql-cdc-3.2.0.jar
wget https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/8.4.0/mysql-connector-j-8.4.0.jar
wget https://repo1.maven.org/maven2/org/apache/paimon/paimon-flink-1.19/1.2.0/paimon-flink-1.19-1.2.0.jar
wget https://repo1.maven.org/maven2/org/apache/paimon/paimon-format/1.2.0/paimon-format-1.2.0.jar
wget https://repo1.maven.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.8.3-10.0/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar

Create docker-compose.yml

cd ../../
vim docker-compose.yml
services:
  mysql:
    image: mysql:8.0
    platform: linux/amd64
    ports:
      - "3306:3306"
    environment:
      - MYSQL_ROOT_PASSWORD=rootpw
      - MYSQL_DATABASE=sales
    command: >
      --server-id=857
      --log-bin=binlog
      --binlog_format=ROW
      --binlog_row_image=FULL
      --gtid_mode=ON
      --enforce-gtid-consistency=ON
      --binlog_expire_logs_seconds=600
    volumes:
      - ./mysql-init.sql:/docker-entrypoint-initdb.d/mysql-init.sql

  kafka:
    image: bitnami/kafka:3.7
    platform: linux/amd64
    ports:
      - "9092:9092"
    environment:
      - KAFKA_ENABLE_KRAFT=yes
      - KAFKA_CFG_NODE_ID=1
      - KAFKA_CFG_PROCESS_ROLES=broker,controller
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka:9093
      - KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true
      - ALLOW_PLAINTEXT_LISTENER=yes

  kafdrop:
    image: obsidiandynamics/kafdrop:4.2.0
    platform: linux/amd64
    ports:
      - "9000:9000"
    environment:
      - KAFKA_BROKERCONNECT=kafka:9092
    depends_on:
      - kafka

  adminer:
    image: adminer:5.4.0
    platform: linux/amd64
    ports:
      - "8080:8080"

  jobmanager:
    image: flink:1.19.1-scala_2.12-java11
    platform: linux/amd64
    command: jobmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
    ports:
      - "8081:8081"
    volumes:
      - ./flink/conf/sql-client-defaults.yaml:/opt/flink/conf/sql-client-defaults.yaml:ro
      - ./flink/lib/flink-sql-connector-mysql-cdc-3.2.0.jar:/opt/flink/lib/flink-sql-connector-mysql-cdc-3.2.0.jar:ro
      - ./flink/lib/flink-sql-connector-kafka-3.2.0-1.19.jar:/opt/flink/lib/flink-sql-connector-kafka-3.2.0-1.19.jar:ro
      - ./flink/lib/flink-connector-jdbc-3.2.0-1.19.jar:/opt/flink/lib/flink-connector-jdbc-3.2.0-1.19.jar:ro
      - ./flink/lib/mysql-connector-j-8.4.0.jar:/opt/flink/lib/mysql-connector-j-8.4.0.jar:ro
      - ./flink/lib/paimon-flink-1.19-1.2.0.jar:/opt/flink/lib/paimon-flink-1.19-1.2.0.jar:ro
      - ./flink/lib/paimon-format-1.2.0.jar:/opt/flink/lib/paimon-format-1.2.0.jar:ro
      # self-contained and isolated set of Hadoop client libraries
      - ./flink/lib/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar:/opt/flink/lib/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar:ro
      - ./sql:/opt/sql:ro
      - ./output:/opt/flink/output

  taskmanager:
    image: flink:1.19.1-scala_2.12-java11
    platform: linux/amd64
    command: taskmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
    depends_on:
      - jobmanager
    volumes:
      - ./flink/conf/sql-client-defaults.yaml:/opt/flink/conf/sql-client-defaults.yaml:ro
      - ./flink/lib/flink-sql-connector-mysql-cdc-3.2.0.jar:/opt/flink/lib/flink-sql-connector-mysql-cdc-3.2.0.jar:ro
      - ./flink/lib/flink-sql-connector-kafka-3.2.0-1.19.jar:/opt/flink/lib/flink-sql-connector-kafka-3.2.0-1.19.jar:ro
      - ./flink/lib/flink-connector-jdbc-3.2.0-1.19.jar:/opt/flink/lib/flink-connector-jdbc-3.2.0-1.19.jar:ro
      - ./flink/lib/mysql-connector-j-8.4.0.jar:/opt/flink/lib/mysql-connector-j-8.4.0.jar:ro
      - ./flink/lib/paimon-flink-1.19-1.2.0.jar:/opt/flink/lib/paimon-flink-1.19-1.2.0.jar:ro
      - ./flink/lib/paimon-format-1.2.0.jar:/opt/flink/lib/paimon-format-1.2.0.jar:ro
      # self-contained and isolated set of Hadoop client libraries
      - ./flink/lib/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar:/opt/flink/lib/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar:ro
      - ./sql:/opt/sql:ro
      - ./output:/opt/flink/output

Create mysql-init.sql

vim mysql-init.sql
CREATE DATABASE IF NOT EXISTS sales;
USE sales;

CREATE TABLE orders (
  order_id INT PRIMARY KEY,
  customer_id INT,
  region VARCHAR(10),
  amount DOUBLE,
  status VARCHAR(10),
  order_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

INSERT INTO orders VALUES
  (1001, 1, 'US', 20.5, 'NEW', NOW()),
  (1002, 2, 'EU', 35.2, 'NEW', NOW()),
  (1003, 3, 'CN', 66.6, 'NEW', NOW()),
  (1004, 4, 'UK', 38.9, 'NEW', NOW()),
  (1005, 5, 'AU', 25.3, 'NEW', NOW()),
  (1006, 6, 'JP', 33.8, 'NEW', NOW());

CREATE USER 'flink'@'%' IDENTIFIED BY 'flinkpw';
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'flink'@'%';
FLUSH PRIVILEGES;
mkdir -p flink/conf
vim flink/conf/sql-client-defaults.yaml
execution:
  type: streaming
  result-mode: table
  parallelism: 1

configuration:
  execution.checkpointing.interval: 5 s
  execution.checkpointing.mode: EXACTLY_ONCE
  execution.checkpointing.min-pause: 1 s
  execution.checkpointing.timeout: 5 min

  state.checkpoints.dir: file:///opt/flink/output/checkpoints

  restart-strategy: fixed-delay
  restart-strategy.fixed-delay.attempts: 10
  restart-strategy.fixed-delay.delay: 5 s

  table.exec.sink.not-null-enforcer: drop
  table.exec.sink.upsert-materialize: none
  table.exec.source.idle-timeout: 5 s

deployment:
  response-timeout: 10000

Create 01_kafka_cdc.sql

mkdir -p sql
vim sql/01_kafka_cdc.sql
-- 01_kafka_cdc.sql

CREATE TABLE orders_cdc (
  order_id INT,
  customer_id INT,
  region STRING,
  amount DOUBLE,
  status STRING,
  order_time TIMESTAMP(3),
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'mysql',
  'port' = '3306',
  'username' = 'flink',
  'password' = 'flinkpw',
  'database-name' = 'sales',
  'table-name' = 'orders',
  'server-id' = '985',
  'scan.startup.mode' = 'initial'
);

CREATE TABLE orders_kafka (
  order_id INT,
  customer_id INT,
  region STRING,
  amount DOUBLE,
  status STRING,
  order_time TIMESTAMP(3),
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'orders_topic',
  'properties.bootstrap.servers' = 'kafka:9092',
  'key.format' = 'json',
  'value.format' = 'json'
);

INSERT INTO orders_kafka
SELECT * FROM orders_cdc;

Create 02_paimon_cdc.sql

vim sql/02_paimon_cdc.sql
-- 02_paimon_cdc.sql

SET 'execution.runtime-mode' = 'streaming';
SET 'execution.checkpointing.interval' = '5 s';
SET 'execution.checkpointing.mode' = 'EXACTLY_ONCE';
SET 'execution.checkpointing.min-pause' = '1 s';
SET 'execution.checkpointing.timeout' = '5 min';

CREATE TABLE orders_cdc (
  order_id INT,
  customer_id INT,
  region STRING,
  amount DOUBLE,
  status STRING,
  order_time TIMESTAMP(3),
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'mysql',
  'port' = '3306',
  'username' = 'flink',
  'password' = 'flinkpw',
  'database-name' = 'sales',
  'table-name' = 'orders',
  'server-id' = '996',
  'scan.startup.mode' = 'initial'
);

CREATE CATALOG paimon_catalog WITH (
  'type' = 'paimon',
  'warehouse' = 'file:///opt/flink/output/warehouse'
);

USE CATALOG paimon_catalog;
CREATE DATABASE IF NOT EXISTS dwd;
USE dwd;

CREATE TABLE IF NOT EXISTS orders_paimon (
  order_id INT,
  customer_id INT,
  region STRING,
  amount DOUBLE,
  status STRING,
  order_time TIMESTAMP(3),
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'paimon',
  'changelog-producer' = 'input',
  'bucket' = '1'
);

USE CATALOG default_catalog;

INSERT INTO paimon_catalog.dwd.orders_paimon SELECT * FROM orders_cdc;

Create 03_topn_batch.sql

vim sql/03_topn_batch.sql
-- 03_topn_batch.sql

SET 'execution.runtime-mode' = 'batch';
SET 'sql-client.execution.result-mode' = 'TABLEAU';
SET 'table.dml-sync' = 'true';

CREATE TABLE orders_jdbc (
  order_id INT,
  customer_id INT,
  region STRING,
  amount DOUBLE,
  status STRING,
  order_time TIMESTAMP(3),
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://mysql:3306/sales',
  'table-name' = 'orders',
  'username' = 'flink',
  'password' = 'flinkpw'
);

CREATE TABLE top_customers (
  customer_id INT,
  total_amount DOUBLE
) WITH (
  'connector' = 'filesystem',
  'path' = '/opt/flink/output/top_customers',
  'format' = 'csv'
);

INSERT OVERWRITE top_customers
SELECT customer_id,
       CAST(SUM(amount) AS DOUBLE) AS total_amount
FROM orders_jdbc
GROUP BY customer_id
ORDER BY total_amount DESC
LIMIT 5;

SELECT * FROM top_customers;

Create 04_paimon_read.sql

vim sql/04_paimon_read.sql
-- 04_paimon_read.sql

SET 'sql-client.execution.result-mode' = 'TABLEAU';
SET 'execution.runtime-mode' = 'batch';
SET 'table.dml-sync' = 'true';

CREATE CATALOG paimon_catalog WITH (
  'type' = 'paimon',
  'warehouse' = 'file:///opt/flink/output/warehouse'
);

USE CATALOG paimon_catalog;
USE dwd;

SELECT COUNT(*) AS cnt FROM orders_paimon;
SELECT * FROM orders_paimon;

Launch Batch and Streaming Jobs

Run docker-compose

docker-compose pull
docker-compose up -d --scale taskmanager=4
[+] Running 7/7
 ✔ Network flink-amd64_default          Created
 ✔ Container flink-amd64-jobmanager-1   Started
 ✔ Container flink-amd64-kafka-1        Started
 ✔ Container flink-amd64-adminer-1      Started
 ✔ Container flink-amd64-mysql-1        Started
 ✔ Container flink-amd64-kafdrop-1      Started
 ✔ Container flink-amd64-taskmanager-1  Started
 ✔ Container flink-amd64-taskmanager-2  Started
 ✔ Container flink-amd64-taskmanager-3  Started
 ✔ Container flink-amd64-taskmanager-4  Started
docker-compose ps --format "table {{.Name}}\t{{.Service}}\t{{.Status}}"
NAME                        SERVICE       STATUS
flink-amd64-adminer-1       adminer       Up 3 minutes
flink-amd64-jobmanager-1    jobmanager    Up 3 minutes
flink-amd64-kafdrop-1       kafdrop       Up 3 minutes
flink-amd64-kafka-1         kafka         Up 3 minutes
flink-amd64-mysql-1         mysql         Up 3 minutes
flink-amd64-taskmanager-1   taskmanager   Up 3 minutes
flink-amd64-taskmanager-2   taskmanager   Up 3 minutes
flink-amd64-taskmanager-3   taskmanager   Up 3 minutes
flink-amd64-taskmanager-4   taskmanager   Up 3 minutes

Run 01_kafka_cdc.sql

docker-compose exec -T jobmanager /opt/flink/bin/sql-client.sh -f /opt/sql/01_kafka_cdc.sql
...

Flink SQL>
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 7537135a66b3ead1213fcdbc334fa192

Flink SQL>
Shutting down the session...
done.

Run 02_paimon_cdc.sql

docker-compose exec -T jobmanager /opt/flink/bin/sql-client.sh -f /opt/sql/02_paimon_cdc.sql
...

Flink SQL>
[INFO] Execute statement succeed.

Flink SQL>
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 852a7286763bdf109cdf36aa1b9c3627

Flink SQL>
Shutting down the session...
done.

Verify Streaming CDC

  1. Insert a new data

    docker-compose exec -T mysql mysql -uroot -prootpw -e \
    "USE sales; INSERT INTO orders VALUES (3001, 31, 'US', 12.3, 'PAID', NOW());"
  2. Check Flink jobs

    Visit http://localhost:8081

    Check Flink running jobs

    flink_running_jobs
  3. Check MySQL table

    Visit http://localhost:8080

    Login as:

    Server: mysql
    Username: root
    Password: rootpw
    Database: sales
    flink_mysql_adminer_login

    Check the table sales.orders

    flink_mysql_adminer_sales_orders
  4. Check Kafka events

    Visit http://localhost:9000

    Check the topic orders_topic

    flink_kafdrop_cdc_orders_topic

Verify Batch Job

docker-compose exec -T jobmanager /opt/flink/bin/sql-client.sh -f /opt/sql/03_topn_batch.sql
...

Flink SQL>
> INSERT OVERWRITE top_customers
> SELECT customer_id,
>        CAST(SUM(amount) AS DOUBLE)
[INFO] Complete execution of the SQL update statement.

Flink SQL>
+-------------+--------------+
| customer_id | total_amount |
+-------------+--------------+
|           3 |         66.6 |
|           4 |         38.9 |
|           2 |         35.2 |
|           6 |         33.8 |
|           5 |         25.3 |
+-------------+--------------+
5 rows in set (0.94 seconds)

Flink SQL>
Shutting down the session...
done.

Verify Paimon Table

docker-compose exec -T mysql mysql -uroot -prootpw -e "
INSERT INTO sales.orders
  (order_id, customer_id, region, amount, status, order_time)
VALUES
  (6001, 61, 'US',  18.5, 'NEW',  NOW()),
  (6002, 62, 'EU',  27.0, 'PAID', NOW()),
  (6003, 63, 'APAC',33.3, 'NEW',  NOW());
UPDATE sales.orders SET amount = amount + 5, status='PAID' WHERE order_id IN (6001, 6002);
DELETE FROM sales.orders WHERE order_id = 6003;
"
docker-compose exec -T jobmanager /opt/flink/bin/sql-client.sh -f /opt/sql/04_paimon_read.sql
...

Flink SQL>
[INFO] Execute statement succeed.

Flink SQL>
> SELECT COUNT(*)
+-----+
| cnt |
+-----+
|   9 |
+-----+
1 row in set (7.52 seconds)

Flink SQL>
+----------+-------------+--------+--------+--------+-------------------------+
| order_id | customer_id | region | amount | status |              order_time |
+----------+-------------+--------+--------+--------+-------------------------+
|     1001 |           1 |     US |   20.5 |    NEW | 2025-09-23 02:23:53.000 |
|     1002 |           2 |     EU |   35.2 |    NEW | 2025-09-23 02:23:53.000 |
|     1003 |           3 |     CN |   66.6 |    NEW | 2025-09-23 02:23:53.000 |
|     1004 |           4 |     UK |   38.9 |    NEW | 2025-09-23 02:23:53.000 |
|     1005 |           5 |     AU |   25.3 |    NEW | 2025-09-23 02:23:53.000 |
|     1006 |           6 |     JP |   33.8 |    NEW | 2025-09-23 02:23:53.000 |
|     3001 |          31 |     US |   12.3 |   PAID | 2025-09-23 02:27:03.000 |
|     6001 |          61 |     US |   23.5 |   PAID | 2025-09-23 02:30:49.000 |
|     6002 |          62 |     EU |   32.0 |   PAID | 2025-09-23 02:30:49.000 |
+----------+-------------+--------+--------+--------+-------------------------+
9 rows in set (0.92 seconds)

Flink SQL>
Shutting down the session...
done.

Check Flink finished jobs

flink_finished_jobs

Check Output artifacts

tree output
output
├── checkpoints
├── top_customers
│   └── part-b36f808b-b393-4359-bfb0-05eaf732ee2f-task-0-file-0
└── warehouse
    ├── default.db
    └── dwd.db
        └── orders_paimon
            ├── bucket-0
            │   ├── changelog-1e5892ad-7d3b-4dbb-8801-deb523e8bb9c-0.parquet
            │   ├── changelog-49f7184f-cbd9-4ecb-bc93-6479fe5dd88d-0.parquet
            │   ├── changelog-7a66b3ac-64bf-454a-8ca8-3185e281d5d0-0.parquet
            │   ├── data-1e5892ad-7d3b-4dbb-8801-deb523e8bb9c-1.parquet
            │   ├── data-49f7184f-cbd9-4ecb-bc93-6479fe5dd88d-1.parquet
            │   └── data-7a66b3ac-64bf-454a-8ca8-3185e281d5d0-1.parquet
            ├── manifest
            │   ├── manifest-18b7fa7b-7b72-48cf-93e9-3bc50d57dd3b-0
            │   ├── manifest-18b7fa7b-7b72-48cf-93e9-3bc50d57dd3b-1
            │   ├── manifest-18b7fa7b-7b72-48cf-93e9-3bc50d57dd3b-2
            │   ├── manifest-18b7fa7b-7b72-48cf-93e9-3bc50d57dd3b-3
            │   ├── manifest-18b7fa7b-7b72-48cf-93e9-3bc50d57dd3b-4
            │   ├── manifest-18b7fa7b-7b72-48cf-93e9-3bc50d57dd3b-5
            │   ├── manifest-list-770c3f24-9fbe-48b4-b29f-c3eabe5c8d35-0
            │   ├── manifest-list-770c3f24-9fbe-48b4-b29f-c3eabe5c8d35-1
            │   ├── manifest-list-770c3f24-9fbe-48b4-b29f-c3eabe5c8d35-10
            │   ├── manifest-list-770c3f24-9fbe-48b4-b29f-c3eabe5c8d35-11
            │   ├── manifest-list-770c3f24-9fbe-48b4-b29f-c3eabe5c8d35-12
            │   ├── manifest-list-770c3f24-9fbe-48b4-b29f-c3eabe5c8d35-13
            │   ├── manifest-list-770c3f24-9fbe-48b4-b29f-c3eabe5c8d35-14
            │   ├── manifest-list-770c3f24-9fbe-48b4-b29f-c3eabe5c8d35-15
            │   ├── manifest-list-770c3f24-9fbe-48b4-b29f-c3eabe5c8d35-16
            │   ├── manifest-list-770c3f24-9fbe-48b4-b29f-c3eabe5c8d35-17
            │   ├── manifest-list-770c3f24-9fbe-48b4-b29f-c3eabe5c8d35-18
            │   ├── manifest-list-770c3f24-9fbe-48b4-b29f-c3eabe5c8d35-19
            │   ├── manifest-list-770c3f24-9fbe-48b4-b29f-c3eabe5c8d35-2
            │   ├── manifest-list-770c3f24-9fbe-48b4-b29f-c3eabe5c8d35-20
            │   ├── manifest-list-770c3f24-9fbe-48b4-b29f-c3eabe5c8d35-3
            │   ├── manifest-list-770c3f24-9fbe-48b4-b29f-c3eabe5c8d35-4
            │   ├── manifest-list-770c3f24-9fbe-48b4-b29f-c3eabe5c8d35-5
            │   ├── manifest-list-770c3f24-9fbe-48b4-b29f-c3eabe5c8d35-6
            │   ├── manifest-list-770c3f24-9fbe-48b4-b29f-c3eabe5c8d35-7
            │   ├── manifest-list-770c3f24-9fbe-48b4-b29f-c3eabe5c8d35-8
            │   └── manifest-list-770c3f24-9fbe-48b4-b29f-c3eabe5c8d35-9
            ├── schema
            │   └── schema-0
            └── snapshot
                ├── EARLIEST
                ├── LATEST
                ├── snapshot-1
                ├── snapshot-2
                ├── snapshot-3
                ├── snapshot-4
                ├── snapshot-5
                ├── snapshot-6
                ├── snapshot-7
                ├── snapshot-8
                └── snapshot-9