1. 概述

在当今数据驱动的世界中,企业往往需要同时具备传统数据库的结构化优势和现代搜索引擎的强大查询能力。这时,将 MySQLElasticsearch 进行集成就显得尤为重要。

MySQL 是一个广泛使用的开源关系型数据库,擅长结构化数据的存储和事务处理。而 Elasticsearch 是一个分布式的搜索和分析引擎,以其高效的全文搜索和实时分析能力著称。将两者结合,可以构建出一个兼具稳定性和高性能的系统。

本文将带你一步步完成 MySQL 与 Elasticsearch 的集成。我们会讲解基础概念、环境搭建、数据同步方案、查询方式等内容,并附有完整代码示例,方便你快速上手。

2. 为何要将 Elasticsearch 与 MySQL 集成?

将 Elasticsearch 与 MySQL 集成,能充分发挥两者的优点,带来以下核心优势:

更强的搜索能力:Elasticsearch 能显著提升 MySQL 数据的搜索性能,尤其是全文搜索场景。
可扩展的架构:面对大规模文本搜索,MySQL 可能会遇到性能瓶颈,而 Elasticsearch 能高效处理。
实时分析能力:Elasticsearch 支持对海量数据进行实时分析,弥补 MySQL 在这方面能力的不足。
更灵活的查询方式:Elasticsearch 提供了比 SQL 更丰富的查询语法,适合复杂搜索逻辑。
减轻 MySQL 压力:将搜索和部分分析任务交给 Elasticsearch,有助于降低 MySQL 的负载,提升整体性能。

通过这种集成方式,我们可以在 MySQL 中保证数据的完整性与事务性,同时利用 Elasticsearch 实现高性能的搜索和分析能力。

3. 环境搭建

我们以 Ubuntu 22.04 LTS 为例,介绍如何安装和配置 Elasticsearch 和 MySQL,并安装必要的依赖库。

3.1 安装 Elasticsearch

首先添加 Elasticsearch 的官方源:

$ sudo apt update && sudo apt install apt-transport-https
$ wget -qO - https://artifacts.elastic.co/GPG-KEY-elasticsearch | sudo gpg --dearmor -o /usr/share/keyrings/elasticsearch-keyring.gpg
$ echo "deb [signed-by=/usr/share/keyrings/elasticsearch-keyring.gpg] https://artifacts.elastic.co/packages/8.x/apt stable main" | sudo tee /etc/apt/sources.list.d/elastic-8.x.list

然后安装 Elasticsearch:

$ sudo apt update && sudo apt install elasticsearch

启动并启用服务:

$ sudo systemctl start elasticsearch && sudo systemctl enable elasticsearch

验证是否运行正常:

$ curl -X GET 'http://localhost:9200'

正常情况下你会看到包含版本信息的 JSON 输出。

3.2 配置 Elasticsearch

编辑配置文件 /etc/elasticsearch/elasticsearch.yml,做以下修改:

xpack.security.enabled: false
network.host: 0.0.0.0

重启服务:

$ sudo systemctl restart elasticsearch

⚠️ 注意:关闭安全功能仅用于测试环境,生产环境中应启用安全配置。

3.3 配置 MySQL

更新软件包列表并安装 MySQL:

$ sudo apt update && sudo apt install mysql-server

运行安全配置脚本:

$ sudo mysql_secure_installation

启动服务并设置开机启动:

$ sudo systemctl start mysql && sudo systemctl enable mysql

登录验证:

$ sudo mysql -u root -p

3.4 安装依赖库

为了后续数据同步和查询脚本的运行,我们需要安装以下组件:

$ sudo apt install default-jdk python3 python3-pip
$ pip3 install kafka-python mysql-connector-python elasticsearch
$ sudo apt install logstash

至此,我们的环境已经搭建完成,可以开始数据同步流程。

4. MySQL 与 Elasticsearch 数据同步

4.1 使用 Debezium 实现实时同步

我们使用 Debezium 来实现 MySQL 数据变更的捕获,并将变更事件发送到 Kafka,再由消费者同步到 Elasticsearch。

创建项目目录结构:

$ mkdir ~/elasticsearch-mysql-integration
$ cd ~/elasticsearch-mysql-integration
$ mkdir scripts
$ touch docker-compose.yml register-mysql.json
$ touch scripts/import_books.py scripts/sync_elasticsearch.py

4.2 配置 Debezium

编辑 docker-compose.yml 文件:

version: '2'
services:
  zookeeper:
    image: debezium/zookeeper:1.9
    ports:
     - 2181:2181
  kafka:
    image: debezium/kafka:1.9
    ports:
     - 9092:9092
    links:
     - zookeeper
    environment:
     - ZOOKEEPER_CONNECT=zookeeper:2181
  mysql:
    image: debezium/example-mysql:1.9
    ports:
     - 3307:3306
    environment:
     - MYSQL_ROOT_PASSWORD=debezium
     - MYSQL_USER=mysqluser
     - MYSQL_PASSWORD=mysqlpw
  connect:
    image: debezium/connect:1.9
    ports:
     - 8083:8083
    links:
     - kafka
     - mysql
    environment:
     - BOOTSTRAP_SERVERS=kafka:9092

启动服务:

$ sudo docker-compose up -d

注册 MySQL 连接器(register-mysql.json):

{
    "name": "bookstore-connector",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "tasks.max": "1",
        "database.hostname": "mysql",
        "database.port": "3306",
        "database.user": "debezium",
        "database.password": "dbz",
        "database.server.id": "184054",
        "database.server.name": "dbserver1",
        "database.include.list": "bookstore",
        "database.history.kafka.bootstrap.servers": "kafka:9092",
        "database.history.kafka.topic": "dbhistory.bookstore"
    }
}

注册连接器:

$ curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d @register-mysql.json

4.3 创建 MySQL 数据库并初始化数据

连接到 MySQL 容器:

$ sudo docker exec -it elasticsearch-mysql-integration_mysql_1 mysql -uroot -p

执行建库建表语句:

CREATE DATABASE bookstore;
USE bookstore;

CREATE TABLE books (
    id INT AUTO_INCREMENT PRIMARY KEY,
    title VARCHAR(255) NOT NULL,
    author VARCHAR(255) NOT NULL,
    publication_year INT,
    isbn VARCHAR(13)
);

INSERT INTO books (title, author, publication_year, isbn) VALUES
('To Kill a Mockingbird', 'Harper Lee', 1960, '9780446310789'),
('1984', 'George Orwell', 1949, '9780451524935'),
('Pride and Prejudice', 'Jane Austen', 1813, '9780141439518'),
('The Great Gatsby', 'F. Scott Fitzgerald', 1925, '9780743273565');

CREATE USER 'bookstore_user'@'%' IDENTIFIED BY 'our_strong_password';
GRANT ALL PRIVILEGES ON bookstore.* TO 'bookstore_user'@'%';
FLUSH PRIVILEGES;

4.4 初始化数据导入 Elasticsearch

编写 import_books.py 脚本实现初始化导入:

import mysql.connector
from elasticsearch import Elasticsearch

# 省略连接部分,请参考原文
...

# 创建索引
...

# 查询 MySQL 并写入 Elasticsearch
...

运行脚本:

$ python3 import_books.py

4.5 处理更新与删除事件

编写 sync_elasticsearch.py 监听 Kafka 消息:

from kafka import KafkaConsumer
from elasticsearch import Elasticsearch
import json

# 省略初始化部分,请参考原文
...

# 处理增删改操作
...

运行脚本:

$ python3 sync_elasticsearch.py

4.6 验证同步效果

在另一个终端执行以下 SQL:

INSERT INTO books (title, author, publication_year, isbn) VALUES ('New Book', 'New Author', 2024, '1234567890123');
UPDATE books SET publication_year = 2025 WHERE title = 'New Book';
DELETE FROM books WHERE title = 'New Book';

观察 sync_elasticsearch.py 的输出是否正确反映了这些变更。

5. 查询数据

5.1 Elasticsearch 查询示例

创建 query_elasticsearch.py 实现以下查询:

  • ✅ 匹配查询(Match Query):按作者名搜索
  • ✅ 范围查询(Range Query):按出版年份范围搜索
  • ✅ 全文搜索(Multi Match Query):跨字段搜索

运行结果示例:

Match query for author: George Orwell
{
  "id": 2,
  "title": "1984",
  "author": "George Orwell",
  ...
}

5.2 结合 MySQL 与 Elasticsearch 查询

创建 combined_query.py,实现:

  • ✅ 使用 Elasticsearch 快速查找某年出版的书籍
  • ✅ 通过 MySQL 获取 ISBN 等详细信息

输出示例:

Books published in 1949:
- 1984 by George Orwell
  ISBN: 9780451524935

这种方式结合了 Elasticsearch 的搜索效率与 MySQL 的事务能力,适用于复杂业务场景。

6. 总结

本文详细介绍了如何将 MySQL 与 Elasticsearch 集成,涵盖环境搭建、数据同步、查询实现等关键步骤。

通过 Debezium 实现了 MySQL 数据变更的实时捕获,并借助 Kafka 将变更同步到 Elasticsearch,构建了实时搜索系统。同时,我们也展示了如何结合两者进行高效查询。

这套方案适用于需要同时支持事务处理和复杂搜索的业务场景,如电商商品搜索、日志分析、内容管理平台等。当然,集成系统也需要注意性能监控、Schema 管理、异常处理等运维层面的问题。


原始标题:Integration of ElasticSearch With MySQL