1. 引言

现代应用常需要多种数据存储:副本数据库、搜索索引、缓存存储和数据仓库等。当系统需要支持不同数据模型和访问模式时,变更数据捕获(CDC) 就派上用场了!

本文将简要介绍 CDC,并重点讲解 Debezium 这个流行的 CDC 平台

2. 什么是 CDC?

2.1. 变更数据捕获概述

CDC 是一种技术模式和设计模式,主要用于实时数据库间数据复制。它能追踪源数据库的变更,自动同步到目标数据库,实现增量加载,避免全量更新

2.2. CDC 的优势

传统批处理同步存在明显缺陷:

  • ❌ 数据非实时同步
  • ❌ 资源消耗大
  • ❌ 仅在指定时段执行

而 CDC 的优势:

  • ✅ 持续追踪源数据库变更
  • ✅ 即时更新目标数据库
  • ✅ 通过流处理保证变更实时性

CDC 让数据库持续同步,全量查询成为历史。同时大幅降低数据传输成本,因为只传输增量变更。

2.3. 常见 CDC 应用场景

CDC 能解决多种问题:

  • 数据源间实时同步
  • 缓存更新/失效
  • 搜索索引维护
  • 微服务数据同步
  • 数据仓库实时加载

了解 CDC 能力后,我们看看知名开源工具如何实现它。

3. Debezium 平台

3.1. Debezium 简介

Debezium 是基于 Apache Kafka 构建的开源 CDC 平台。核心功能是记录源数据库表的所有行级变更到事务日志。监听这些事件的应用可基于增量变更执行操作。

它提供多种连接器库,支持 MySQL、MongoDB、PostgreSQL 等数据库。这些连接器监控变更并发布到 Kafka 等流服务。

即使应用宕机,Debezium 也能持续监控。重启后从断点继续消费,确保不丢失事件。

3.2. Debezium 架构

部署方式取决于基础设施,最常用的是 Apache Kafka Connect。Kafka Connect 是独立服务框架,用于在 Kafka 和其他系统间流式传输数据。

下图展示了基于 Debezium 的 CDC 管道架构:

Debezium Platform Architecture

  1. 左侧是 MySQL 源数据库
  2. Kafka Connect 连接器解析事务日志并写入 Kafka Topic
  3. Kafka 作为消息代理可靠传输变更集
  4. 右侧连接器轮询 Kafka 并推送变更到目标数据库

Debezium 架构依赖 Kafka,但也提供其他部署方式:

  • 独立服务器(Debezium Server)
  • 嵌入应用作为库使用

3.3. Debezium Server

Debezium 提供独立服务器捕获源数据库变更,可配置使用各种连接器,支持将变更事件发送到 Amazon Kinesis 或 Google Cloud Pub/Sub 等消息基础设施。

3.4. 嵌入式 Debezium

Kafka Connect 提供容错和扩展性,但有时应用不需要这种级别可靠性,且想降低基础设施成本。此时可将 Debezium 引擎嵌入应用,然后配置连接器即可。

4. 环境搭建

4.1. 示例应用架构

我们创建一个简单的 Spring Boot 客户管理应用。客户模型包含 ID、全名和邮箱字段,数据层使用 Spring Data JPA。应用将运行嵌入式 Debezium:

Springboot Debezium Embedded Integration

  1. Debezium 引擎监控源 MySQL 数据库的 customer 表事务日志
  2. 当对 customer 表执行增删改时,连接器调用服务方法
  3. 该方法将变更同步到目标 MySQL 数据库(应用主库)

4.2. Maven 依赖

pom.xml 添加核心依赖:

<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-api</artifactId>
    <version>2.5.0.Final</version>
</dependency>
<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-embedded</artifactId>
    <version>2.5.0.Final</version>
</dependency>

添加 MySQL 连接器:

<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-connector-mysql</artifactId>
    <version>2.5.0.Final</version>
</dependency>

4.3. 数据库安装

使用 docker-compose 快速部署两个数据库实例:

version: "3.9"
services:
  # 源数据库
  mysql-1:
    container_name: source-database
    image: mysql
    ports:
      - 3305:3306
    environment:
      MYSQL_ROOT_PASSWORD: root
      MYSQL_USER: user
      MYSQL_PASSWORD: password
      MYSQL_DATABASE: customerdb

  # 目标数据库
  mysql-2:
    container_name: target-database
    image: mysql
    ports:
      - 3306:3306
    environment:
      MYSQL_ROOT_PASSWORD: root
      MYSQL_USER: user
      MYSQL_PASSWORD: password
      MYSQL_DATABASE: customerdb

执行 docker-compose up -d 启动。然后创建 customer 表:

CREATE TABLE customer
(
    id integer NOT NULL,
    fullname character varying(255),
    email character varying(255),
    CONSTRAINT customer_pkey PRIMARY KEY (id)
);

5. 配置详解

5.1. 配置 Debezium 连接器

创建配置 Bean:

@Bean
public io.debezium.config.Configuration customerConnector() {
    return io.debezium.config.Configuration.create()
        .with("name", "customer-mysql-connector")
        .with("connector.class", "io.debezium.connector.mysql.MySqlConnector")
        .with("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore")
        .with("offset.storage.file.filename", "/tmp/offsets.dat")
        .with("offset.flush.interval.ms", "60000")
        .with("database.hostname", customerDbHost)
        .with("database.port", customerDbPort)
        .with("database.user", customerDbUsername)
        .with("database.password", customerDbPassword)
        .with("database.dbname", customerDbName)
        .with("database.include.list", customerDbName)
        .with("include.schema.changes", "false")
        .with("database.server.id", "10181")
        .with("database.server.name", "customer-mysql-db-server")
        .with("database.history", "io.debezium.relational.history.FileDatabaseHistory")
        .with("database.history.file.filename", "/tmp/dbhistory.dat")
        .build();
}

关键配置说明:

  • 使用 MySqlConnector 监控 MySQL
  • FileOffsetBackingStore 在本地文件存储偏移量
  • MySQL 连接参数(主机/端口/用户等)
  • database.server.id 需唯一,避免冲突

5.2. 运行 Debezium 引擎

创建引擎并绑定事件处理器:

private DebeziumEngine<RecordChangeEvent<SourceRecord>> debeziumEngine;

public DebeziumListener(Configuration customerConnectorConfiguration, CustomerService customerService) {

    this.debeziumEngine = DebeziumEngine.create(ChangeEventFormat.of(Connect.class))
      .using(customerConnectorConfiguration.asProperties())
      .notifying(this::handleEvent)
      .build();

    this.customerService = customerService;
}

事件处理逻辑:

private void handleChangeEvent(RecordChangeEvent<SourceRecord> sourceRecordRecordChangeEvent) {
    SourceRecord sourceRecord = sourceRecordRecordChangeEvent.record();
    Struct sourceRecordChangeValue= (Struct) sourceRecord.value();

    if (sourceRecordChangeValue != null) {
        Operation operation = Operation.forCode((String) sourceRecordChangeValue.get(OPERATION));

        if(operation != Operation.READ) {
            String record = operation == Operation.DELETE ? BEFORE : AFTER;
            Struct struct = (Struct) sourceRecordChangeValue.get(record);
            Map<String, Object> payload = struct.schema().fields().stream()
              .map(Field::name)
              .filter(fieldName -> struct.get(fieldName) != null)
              .map(fieldName -> Pair.of(fieldName, struct.get(fieldName)))
              .collect(toMap(Pair::getKey, Pair::getValue));

            this.customerService.replicateData(payload, operation);
        }
    }
}

启动/停止引擎:

private final Executor executor = Executors.newSingleThreadExecutor();

@PostConstruct
private void start() {
    this.executor.execute(debeziumEngine);
}

@PreDestroy
private void stop() throws IOException {
    if (this.debeziumEngine != null) {
        this.debeziumEngine.close();
    }
}

6. 实战演示

6.1. 插入记录

在源数据库执行:

INSERT INTO customerdb.customer (id, fullname, email) VALUES (1, 'John Doe', 'john.doe@example.com')

应用输出:

23:57:57.897 [pool-1-thread-1] INFO  c.b.l.d.listener.DebeziumListener - Key = 'Struct{id=1}' value = 'Struct{after=Struct{id=1,fullname=John Doe,email=john.doe@example.com},source=Struct{version=1.4.2.Final,connector=mysql,name=customer-mysql-db-server,ts_ms=1617746277000,db=customerdb,table=customer,server_id=1,file=binlog.000007,pos=703,row=0,thread=19},op=c,ts_ms=1617746277422}'
Hibernate: insert into customer (email, fullname, id) values (?, ?, ?)
23:57:58.095 [pool-1-thread-1] INFO  c.b.l.d.listener.DebeziumListener - Updated Data: {fullname=John Doe, id=1, email=john.doe@example.com} with Operation: CREATE

目标数据库验证:

id  fullname   email
1  John Doe   john.doe@example.com

6.2. 更新记录

执行更新:

UPDATE customerdb.customer t SET t.email = 'john.new@example.com' WHERE t.id = 1

应用输出:

00:08:57.893 [pool-1-thread-1] INFO  c.b.l.d.listener.DebeziumListener - Key = 'Struct{id=1}' value = 'Struct{before=Struct{id=1,fullname=John Doe,email=john.doe@example.com},after=Struct{id=1,fullname=John Doe,email=john.new@example.com},source=Struct{version=1.4.2.Final,connector=mysql,name=customer-mysql-db-server,ts_ms=1617746937000,db=customerdb,table=customer,server_id=1,file=binlog.000007,pos=1040,row=0,thread=19},op=u,ts_ms=1617746937703}'
Hibernate: update customer set email=?, fullname=? where id=?
00:08:57.938 [pool-1-thread-1] INFO  c.b.l.d.listener.DebeziumListener - Updated Data: {fullname=John Doe, id=1, email=john.new@example.com} with Operation: UPDATE

目标数据库验证:

id  fullname   email
1  John Doe   john.new@example.com

6.3. 删除记录

执行删除:

DELETE FROM customerdb.customer WHERE id = 1

应用输出:

00:12:16.892 [pool-1-thread-1] INFO  c.b.l.d.listener.DebeziumListener - Key = 'Struct{id=1}' value = 'Struct{before=Struct{id=1,fullname=John Doe,email=john.new@example.com},source=Struct{version=1.4.2.Final,connector=mysql,name=customer-mysql-db-server,ts_ms=1617747136000,db=customerdb,table=customer,server_id=1,file=binlog.000007,pos=1406,row=0,thread=19},op=d,ts_ms=1617747136640}'
Hibernate: delete from customer where id=?
00:12:16.951 [pool-1-thread-1] INFO  c.b.l.d.listener.DebeziumListener - Updated Data: {fullname=John Doe, id=1, email=john.new@example.com} with Operation: DELETE

目标数据库验证:

select * from customerdb.customer where id= 1
0 rows retrieved

7. 总结

本文展示了 CDC 的优势及解决的问题。没有 CDC 时,我们只能依赖耗时且昂贵的全量数据加载。

Debezium 作为优秀的开源平台,能轻松解决各类 CDC 场景。踩坑提醒:生产环境需注意偏移量存储的容错性,避免文件存储方案导致数据丢失。

完整源码可在 GitHub 获取。


原始标题:Introduction to Debezium | Baeldung