一、简介

当今的应用程序有时需要副本数据库、执行搜索操作的搜索索引、加速数据读取的缓存存储以及用于对数据进行复杂分析的数据仓库。

支持不同数据模型和数据访问模式的需求提出了大多数软件 Web 开发人员需要解决的常见问题,而这正是变更数据捕获 (CDC) 来救援的时候!

在本文中,我们将从 CDC 的简要概述开始,我们将重点关注 CDC 常用的平台 Debezium

2. 什么是CDC?

在本节中,我们将了解什么是 CDC、使用它的主要好处以及一些常见用例。

2.1.变更数据捕获

变更数据捕获(CDC)是一种技术和设计模式。我们经常使用它在数据库之间实时复制数据。

我们还可以跟踪写入源数据库的数据更改并自动同步目标数据库。 CDC 支持增量加载并消除批量加载更新的需要

2.2.CDC的优势

如今,大多数公司仍然使用批处理在系统之间同步数据。使用批处理:

  • 数据未立即同步
  • 更多分配的资源用于同步数据库
  • 数据复制仅在指定的批次期间发生

然而,变更数据捕获具有一些优势:

  • 不断跟踪源数据库的变化
  • 即时更新目标数据库
  • 使用流处理来保证即时更改

有了CDC, 不同的数据库就会持续同步 ,批量选择已经成为过去。此外,由于 CDC 仅传输增量更改 ,因此降低了传输数据的成本

2.3.常见的 CDC 用例

CDC 可以帮助我们解决多种用例,例如通过保持不同数据源同步来进行数据复制、更新或使缓存失效、更新搜索索引、微服务中的数据同步等等。

现在我们对 CDC 的功能有了一些了解,让我们看看它是如何在一个著名的开源工具中实现的。

3.Debezium平台

在本节中,我们将介绍Debezium ,详细了解其架构,并了解部署它的不同方法。

3.1.什么是Debezium?

Debezium 是一个构建在Apache Kafka之上的 CDC 开源平台。它的主要用途是在事务日志中 记录提交给每个源数据库表的所有行级更改 。侦听这些事件的每个应用程序都可以根据增量数据更改执行所需的操作。

Debezium 提供了一个连接器库,支持多种数据库,例如 MySQL、MongoDB、PostgreSQL 等。

这些连接器可以监视和记录数据库更改并将其发布到 Kafka 等流服务。

此外, 即使我们的应用程序出现故障,Debezium 也会进行监控 。重新启动后,它将开始消耗上次停止的事件,因此不会丢失任何内容。

3.2. Debezium架构

部署 Debezium 取决于我们拥有的基础设施,但更常见的是,我们经常使用 Apache Kafka Connect。

Kafka Connect 是一个框架,与 Kafka 代理一起作为单独的服务运行。我们用它在 Apache Kafka 和其他系统之间传输数据。

我们还可以定义连接器来将数据传入和传出 Kafka。

下图显示了基于 Debezium 的变更数据捕获管道的不同部分:

Debezium Platform Architecture

首先,在左侧,我们有一个 MySQL 源数据库,我们希望将其数据复制并在目标数据库(如 PostgreSQL 或任何分析数据库)中使用。

其次, Kafka Connect 连接器解析并解释事务日志并将其写入 Kafka 主题。

接下来,Kafka 充当消息代理,将变更集可靠地传输到目标系统。

然后,在右侧,我们有 Kafka 连接器轮询 Kafka 并将更改推送到目标数据库。

Debezium 在其架构中使用 Kafka ,但它还提供其他部署方法来满足我们的基础设施需求。

我们可以将其用作 Debezium 服务器的独立服务器,也可以将其作为库嵌入到我们的应用程序代码中。

我们将在以下部分中看到这些方法。

3.3. Debezium服务器

Debezium 提供了一个独立的服务器 来捕获源数据库的更改。它配置为使用 Debezium 源连接器之一。

此外,这些连接器将更改事件发送到各种消息基础设施,例如 Amazon Kinesis 或 Google Cloud Pub/Sub。

3.4.嵌入式 Debezium

Kafka Connect 在用于部署 Debezium 时提供容错能力和可扩展性。然而,有时我们的应用程序不需要这种级别的可靠性,并且我们希望最大限度地降低基础设施的成本。

值得庆幸的是, 我们可以通过将 Debezium 引擎嵌入到我们的应用程序中来做到这一点 。完成此操作后,我们必须配置连接器。

4. 设置

在本节中,我们将首先从应用程序的体系结构开始。然后,我们将了解如何设置环境并遵循一些基本步骤来集成 Debezium。

让我们首先介绍我们的应用程序。

4.1.示例应用程序的架构

为了使我们的应用程序保持简单,我们将创建一个用于客户管理的 Spring Boot 应用程序。

我们的客户模型具有 ID全名电子邮件 字段。对于数据访问层,我们将使用Spring Data JPA

最重要的是,我们的应用程序将运行 Debezium 的嵌入式版本。让我们想象一下这个应用程序架构:

Springboot Debezium 嵌入式集成

首先,Debezium 引擎将跟踪源 MySQL 数据库(来自另一个系统或应用程序)上的 客户 表的事务日志。

其次,每当我们对 客户 表执行插入/更新/删除等数据库操作时,Debezium 连接器都会调用一个服务方法。

最后,根据这些事件,该方法会将 客户 表的数据同步到目标 MySQL 数据库(我们应用程序的主数据库)。

4.2. Maven 依赖项

让我们首先将所需的依赖项添加到 pom.xml 中:

<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-api</artifactId>
    <version>1.4.2.Final</version>
</dependency>
<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-embedded</artifactId>
    <version>1.4.2.Final</version>
</dependency>

同样,我们为应用程序将使用的每个 Debezium 连接器添加依赖项。

在我们的例子中,我们将使用MySQL 连接器

<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-connector-mysql</artifactId>
    <version>1.4.2.Final</version>
</dependency>

4.3.安装数据库

我们可以手动安装和配置我们的数据库。但是,为了加快速度,我们将使用 docker-compose 文件:

version: "3.9"
services:
  # Install Source MySQL DB and setup the Customer database
  mysql-1:
    container_name: source-database
    image: mysql
    ports:
      - 3305:3306
    environment:
      MYSQL_ROOT_PASSWORD: root
      MYSQL_USER: user
      MYSQL_PASSWORD: password
      MYSQL_DATABASE: customerdb

  # Install Target MySQL DB and setup the Customer database
  mysql-2:
    container_name: target-database
    image: mysql
    ports:
      - 3306:3306
    environment:
      MYSQL_ROOT_PASSWORD: root
      MYSQL_USER: user
      MYSQL_PASSWORD: password
      MYSQL_DATABASE: customerdb

该文件将在不同端口上运行两个数据库实例。

我们可以使用命令 docker-compose up -d 运行此文件。

现在,让我们通过运行 SQL 脚本来创建 客户 表:

CREATE TABLE customer
(
    id integer NOT NULL,
    fullname character varying(255),
    email character varying(255),
    CONSTRAINT customer_pkey PRIMARY KEY (id)
);

5. 配置

在本节中,我们将配置 Debezium MySQL 连接器并了解如何运行嵌入式 Debezium 引擎。

5.1.配置 Debezium 连接器

为了配置 Debezium MySQL 连接器,我们将创建一个 Debezium 配置 bean:

@Bean
public io.debezium.config.Configuration customerConnector() {
    return io.debezium.config.Configuration.create()
        .with("name", "customer-mysql-connector")
        .with("connector.class", "io.debezium.connector.mysql.MySqlConnector")
        .with("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore")
        .with("offset.storage.file.filename", "/tmp/offsets.dat")
        .with("offset.flush.interval.ms", "60000")
        .with("database.hostname", customerDbHost)
        .with("database.port", customerDbPort)
        .with("database.user", customerDbUsername)
        .with("database.password", customerDbPassword)
        .with("database.dbname", customerDbName)
        .with("database.include.list", customerDbName)
        .with("include.schema.changes", "false")
        .with("database.server.id", "10181")
        .with("database.server.name", "customer-mysql-db-server")
        .with("database.history", "io.debezium.relational.history.FileDatabaseHistory")
        .with("database.history.file.filename", "/tmp/dbhistory.dat")
        .build();
}

让我们更详细地检查此配置。

该 bean 中的 create 方法 使用构建器来创建 Properties 对象

无论首选连接器如何,此构建器都会设置引擎所需的多个属性。为了跟踪源 MySQL 数据库,我们使用 MySqlConnector 类。

当此连接器运行时,它开始跟踪源中的更改并记录“偏移量”以确定 它从事务日志中处理了多少数据

有多种方法可以保存这些偏移量,但在本例中,我们将使用类 FileOffsetBackingStore 在本地文件系统上存储偏移量。

连接器的最后几个参数是 MySQL 数据库属性。

现在我们已经有了配置,我们可以创建我们的引擎了。

5.2.运行 Debezium 引擎

DebeziumEngine 充当我们的 MySQL 连接器的包装器。让我们使用连接器配置创建引擎:

private DebeziumEngine<RecordChangeEvent<SourceRecord>> debeziumEngine;

public DebeziumListener(Configuration customerConnectorConfiguration, CustomerService customerService) {

    this.debeziumEngine = DebeziumEngine.create(ChangeEventFormat.of(Connect.class))
      .using(customerConnectorConfiguration.asProperties())
      .notifying(this::handleEvent)
      .build();

    this.customerService = customerService;
}

更重要的是,引擎将为每个数据更改调用一个方法 - 在我们的示例中为 handleChangeEvent

在此方法中,首先, 我们将根据 调用 create() 时指定的格式解析每个事件。

然后,我们找到我们进行的操作并调用 CustomerService 在目标数据库上执行创建/更新/删除功能:

private void handleChangeEvent(RecordChangeEvent<SourceRecord> sourceRecordRecordChangeEvent) {
    SourceRecord sourceRecord = sourceRecordRecordChangeEvent.record();
    Struct sourceRecordChangeValue= (Struct) sourceRecord.value();

    if (sourceRecordChangeValue != null) {
        Operation operation = Operation.forCode((String) sourceRecordChangeValue.get(OPERATION));

        if(operation != Operation.READ) {
            String record = operation == Operation.DELETE ? BEFORE : AFTER;
            Struct struct = (Struct) sourceRecordChangeValue.get(record);
            Map<String, Object> payload = struct.schema().fields().stream()
              .map(Field::name)
              .filter(fieldName -> struct.get(fieldName) != null)
              .map(fieldName -> Pair.of(fieldName, struct.get(fieldName)))
              .collect(toMap(Pair::getKey, Pair::getValue));

            this.customerService.replicateData(payload, operation);
        }
    }
}

现在我们已经配置了 DebeziumEngine 对象,让我们使用服务执行器异步启动它:

private final Executor executor = Executors.newSingleThreadExecutor();

@PostConstruct
private void start() {
    this.executor.execute(debeziumEngine);
}

@PreDestroy
private void stop() throws IOException {
    if (this.debeziumEngine != null) {
        this.debeziumEngine.close();
    }
}

6. Debezium 的实际应用

要查看我们的代码的实际效果,让我们对源数据库的 客户 表进行一些数据更改。

6.1.插入记录

要将新记录添加到 customer 表中,我们将进入 MySQL shell 并运行:

INSERT INTO customerdb.customer (id, fullname, email) VALUES (1, 'John Doe', '[email protected]')

运行此查询后,我们将看到应用程序的相应输出:

23:57:57.897 [pool-1-thread-1] INFO  c.b.l.d.listener.DebeziumListener - Key = 'Struct{id=1}' value = 'Struct{after=Struct{id=1,fullname=John Doe,[email protected]},source=Struct{version=1.4.2.Final,connector=mysql,name=customer-mysql-db-server,ts_ms=1617746277000,db=customerdb,table=customer,server_id=1,file=binlog.000007,pos=703,row=0,thread=19},op=c,ts_ms=1617746277422}'
Hibernate: insert into customer (email, fullname, id) values (?, ?, ?)
23:57:58.095 [pool-1-thread-1] INFO  c.b.l.d.listener.DebeziumListener - Updated Data: {fullname=John Doe, id=1, [email protected]} with Operation: CREATE

最后,我们检查一条新记录是否已插入到我们的目标数据库中:

id  fullname   email
1  John Doe   [email protected]

6.2.更新记录

现在,让我们尝试更新最后插入的客户并检查会发生什么:

UPDATE customerdb.customer t SET t.email = '[email protected]' WHERE t.id = 1

之后,我们将得到与插入相同的输出,除了操作类型更改为“UPDATE”,当然,Hibernate 使用的查询是“更新”查询:

00:08:57.893 [pool-1-thread-1] INFO  c.b.l.d.listener.DebeziumListener - Key = 'Struct{id=1}' value = 'Struct{before=Struct{id=1,fullname=John Doe,[email protected]},after=Struct{id=1,fullname=John Doe,[email protected]},source=Struct{version=1.4.2.Final,connector=mysql,name=customer-mysql-db-server,ts_ms=1617746937000,db=customerdb,table=customer,server_id=1,file=binlog.000007,pos=1040,row=0,thread=19},op=u,ts_ms=1617746937703}'
Hibernate: update customer set email=?, fullname=? where id=?
00:08:57.938 [pool-1-thread-1] INFO  c.b.l.d.listener.DebeziumListener - Updated Data: {fullname=John Doe, id=1, [email protected]} with Operation: UPDATE

我们可以验证目标数据库中约翰的电子邮件已更改:

id  fullname   email
1  John Doe   [email protected]

6.3.删除记录

现在,我们可以通过执行以下命令删除 客户 表中的条目:

DELETE FROM customerdb.customer WHERE id = 1

同样,这里我们操作发生变化,再次查询:

00:12:16.892 [pool-1-thread-1] INFO  c.b.l.d.listener.DebeziumListener - Key = 'Struct{id=1}' value = 'Struct{before=Struct{id=1,fullname=John Doe,[email protected]},source=Struct{version=1.4.2.Final,connector=mysql,name=customer-mysql-db-server,ts_ms=1617747136000,db=customerdb,table=customer,server_id=1,file=binlog.000007,pos=1406,row=0,thread=19},op=d,ts_ms=1617747136640}'
Hibernate: delete from customer where id=?
00:12:16.951 [pool-1-thread-1] INFO  c.b.l.d.listener.DebeziumListener - Updated Data: {fullname=John Doe, id=1, [email protected]} with Operation: DELETE

我们可以验证目标数据库上的数据已被删除:

select * from customerdb.customer where id= 1
0 rows retrieved

七、结论

在这篇文章中,我们看到了 CDC 的好处以及它可以解决哪些问题。我们还了解到,如果没有它,我们将不得不批量加载数据,这既耗时又昂贵。

我们还看到了Debezium,一个优秀的开源平台,可以帮助我们轻松解决CDC用例。

与往常一样,本文的完整源代码可以在 GitHub 上获取。