1. Overview

In today’s data-driven world, businesses often need both the robust relational structure of traditional databases and the powerful search capabilities of modern search engines. This is where the integration of MySQL and Elasticsearch comes into play.

MySQL, a popular relational database management system, excels at storing and managing structured data. Elasticsearch, on the other hand, is a distributed search and analytics engine known for its fast and scalable full-text search capabilities. By combining these two technologies, we can create a powerful system that leverages the strengths of both.

In this tutorial, we’ll explore how to integrate Elasticsearch with MySQL effectively. We’ll cover the basics of both systems, walk through the setup process, and demonstrate how to synchronize data between MySQL and Elasticsearch. Finally, we’ll delve into querying data using Elasticsearch and combining Elastic Search query with MySQL.

2. Why Integrate Elasticsearch with MySQL?

Integrating Elasticsearch with MySQL combines the strengths of both systems offering several benefits:

  • Enhanced search capabilities: Elasticsearch significantly improves search functionality for MySQL data, especially for full-text searches.
  • Scalable architecture: While MySQL can struggle with large-scale text searches, Elasticsearch handles them efficiently.
  • Real-time analytics: Elasticsearch excels at providing real-time analytics on large datasets, complementing MySQL’s transactional capabilities.
  • Flexible querying: Elasticsearch enables more flexible and complex queries, which might be challenging to implement in MySQL alone.
  • Improved performance: By offloading search and certain analytical tasks to Elasticsearch, we can reduce the load on a MySQL database, potentially improving overall system performance.

By integrating these two systems, we can create a robust architecture that maintains data integrity in MySQL while leveraging Elasticsearch’s powerful search and analytics capabilities.

3. Setting Up the Environment

Let’s set up an environment to integrate Elasticsearch with MySQL. To do so, we walk through the process of installing and configuring both systems on Ubuntu 22.04 LTS, as well as setting up the necessary tools and libraries.

3.1. Installing Elasticsearch

To begin with, we add the Elasticsearch repository to the system via apt by running several commands:

$ sudo apt update && sudo apt install apt-transport-https
$ wget -qO - https://artifacts.elastic.co/GPG-KEY-elasticsearch | sudo gpg --dearmor -o /usr/share/keyrings/elasticsearch-keyring.gpg
$ echo "deb [signed-by=/usr/share/keyrings/elasticsearch-keyring.gpg] https://artifacts.elastic.co/packages/8.x/apt stable main" | sudo tee /etc/apt/sources.list.d/elastic-8.x.list

In particular, these commands do four things:

  • update the package lists
  • install HTTPS support for apt
  • download and add the Elasticsearch GPG key
  • add the Elasticsearch repository to the system

Overall, this prepares the system to install Elasticsearch from the official source.

After adding the repository, we proceed with the installation of Elasticsearch:

$ sudo apt update && sudo apt install elasticsearch

After updating the package lists again to include the new Elasticsearch repository, we install Elasticsearch.

Then, we start and enable the Elasticsearch service:

$ sudo systemctl start elasticsearch && sudo systemctl enable elasticsearch
Created symlink /etc/systemd/system/multi-user.target.wants/elasticsearch.service → /lib/systemd/system/elasticsearch.service.

To verify that Elasticsearch is running correctly, we can then send a simple GET request to its default port:

$ curl -X GET 'http://localhost:9200'
{
  "name" : "Ubuntu",
  "cluster_name" : "elasticsearch",
  "cluster_uuid" : "0W8o4rxdSniXsf_grVvxvQ",
  "version" : {
    "number" : "8.14.1",
    "build_flavor" : "default",
    "build_type" : "deb",
    "build_hash" : "93a57a1a76f556d8aee6a90d1a95b06187501310",
    "build_date" : "2024-06-10T23:35:17.114581191Z",
    "build_snapshot" : false,
    "lucene_version" : "9.10.0",
    "minimum_wire_compatibility_version" : "7.17.0",
    "minimum_index_compatibility_version" : "7.0.0"
  },
  "tagline" : "You Know, for Search"
}

If Elasticsearch is running properly, we should see a JSON response containing version information and other details about the Elasticsearch instance.

3.2. Configuring Elasticsearch

After installing Elasticsearch, we may need to adjust its configuration, especially if we encounter issues like receiving an empty reply from the server. To address this, we modify the Elasticsearch configuration file to ensure it’s accessible and not restricted by security settings.

First, let’s open the Elasticsearch configuration file at /etc/elasticsearch/elasticsearch.yml via sudo or as root.

Next, we disable the X-Pack security feature by searching for the line containing xpack.security.enabled and set it to false:

xpack.security.enabled: false

By doing this, we disable the security features that might be preventing access to Elasticsearch but might expose certain aspects of the installation.

Next, we ensure Elasticsearch is listening on the localhost interface by looking for the network.host line and configuring it appropriately:

network.host: 0.0.0.0

Then, we restart the Elasticsearch service to update the configuration:

$ sudo systemctl restart elasticsearch

Lastly, we wait for Elasticsearch to fully initialize and test the configuration by sending the GET request.

Notably, disabling security features is only a troubleshooting step and should not be done in a production environment.

3.3. Configuring MySQL

Next, let’s move on to installing and configuring MySQL.

We start by updating the package lists and installing the MySQL server:

$ sudo apt update && sudo apt install mysql-server

After the installation, it’s important to secure the MySQL setup via the provided mysql_secure_installation script:

$ sudo mysql_secure_installation

Securing the MySQL server deployment.

Connecting to MySQL using a blank password.

...
Press y|Y for Yes, any other key for No: y

There are three levels of password validation policy:

LOW    Length >= 8
MEDIUM Length >= 8, numeric, mixed case, and special characters
STRONG Length >= 8, numeric, mixed case, special characters and dictionary                  file

Please enter 0 = LOW, 1 = MEDIUM and 2 = STRONG: 2

...
Reload privilege tables now? (Press y|Y for Yes, any other key for No) : y
Success.

All done!

Notably, the decisions in the script above are important and should be considered carefully.

Next, we start and enable the MySQL service:

$ sudo systemctl start mysql && sudo systemctl enable mysql
Synchronizing state of mysql.service with SysV service script with /lib/systemd/systemd-sysv-install.
Executing: /lib/systemd/systemd-sysv-install enable mysql

We can then log in as the root user to verify that MySQL is installed and running correctly:

$ sudo mysql -u root -p
Enter password: 
Welcome to the MySQL monitor.  Commands end with ; or \g.
Your MySQL connection id is 10
Server version: 8.0.37-0ubuntu0.22.04.3 (Ubuntu)

Copyright (c) 2000, 2024, Oracle and/or its affiliates.

Oracle is a registered trademark of Oracle Corporation and/or its
affiliates. Other names may be trademarks of their respective
owners.

Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.

mysql> 

If we can log in successfully at the password prompt, MySQL should be installed and configured correctly.

3.4. Required Tools and Libraries

To facilitate the integration between Elasticsearch and MySQL, we use a few additional tools and libraries.

First, let’s install Java, required for some Elasticsearch plugins:

$ sudo apt install default-jdk

Next, we install Python and pip, mainly for synchronization scripts:

$ sudo apt install python3 python3-pip

With Python installed, we can now install the necessary Python libraries for interacting with MySQL and Elasticsearch:

$ pip3 install kafka-python mysql-connector-python elasticsearch

This command uses pip to install Kafka, the MySQL connector for Python, and the Elasticsearch Python client.

Lastly, while optional, Logstash can be very useful for data synchronization between MySQL and Elasticsearch:

$ sudo apt install logstash

With these components in place, the environment is now set up for integrating Elasticsearch with MySQL.

4. Synchronizing Data Between MySQL and Elasticsearch

In this section, we create a book database in MySQL, perform an initial import into Elasticsearch, and set up real-time synchronization.

To achieve this integration, we use Debezium. Notably, Debezium is a powerful open-source platform for change data capture (CDC). Further, it enables us to monitor the MySQL database for changes and propagate those changes to ElasticSearch in near real-time ensuring both systems remain synchronized.

4.1. Setting Up the Project Structure

Let’s start by creating a dedicated directory and the necessary files for the project:

$ mkdir ~/elasticsearch-mysql-integration
$ cd ~/elasticsearch-mysql-integration
$ mkdir scripts
$ touch docker-compose.yml register-mysql.json
$ touch scripts/import_books.py scripts/sync_elasticsearch.py

This creates the project path tree:

elasticsearch-mysql-integration/
├── docker-compose.yml
├── register-mysql.json
└── scripts/
    ├── import_books.py
    └── sync_elasticsearch.py

Thus, we have a basic structure.

4.2. Setting Up Debezium for Real-time Synchronization

To set up Debezium for data change capturing, we use Docker for simplicity:

$ sudo apt-get update
$ sudo apt-get install docker.io docker-compose

Now, we edit the docker-compose.yml file in the project root:

version: '2'
services:
  zookeeper:
    image: debezium/zookeeper:1.9
    ports:
     - 2181:2181
     - 2888:2888
     - 3888:3888
  kafka:
    image: debezium/kafka:1.9
    ports:
     - 9092:9092
    links:
     - zookeeper
    environment:
     - ZOOKEEPER_CONNECT=zookeeper:2181
  mysql:
    image: debezium/example-mysql:1.9
    ports:
     - 3307:3306
    environment:
     - MYSQL_ROOT_PASSWORD=debezium
     - MYSQL_USER=mysqluser
     - MYSQL_PASSWORD=mysqlpw
  connect:
    image: debezium/connect:1.9
    ports:
     - 8083:8083
    links:
     - kafka
     - mysql
    environment:
     - BOOTSTRAP_SERVERS=kafka:9092
     - GROUP_ID=1
     - CONFIG_STORAGE_TOPIC=my_connect_configs
     - OFFSET_STORAGE_TOPIC=my_connect_offsets
     - STATUS_STORAGE_TOPIC=my_connect_statuses

This file provides a configuration setup for four services:

We configure the services to use specific ports and link to each other, ensuring proper communication between them. We then map the MySQL service to port 3307 on the host machine to avoid conflicts with any local MySQL installation.

Now, we can start the services:

$ cd ~/elasticsearch-mysql-integration
$ sudo docker-compose up -d

Next, we configure the Debezium MySQL connector by editing the register-mysql.json file:

{
    "name": "bookstore-connector",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "tasks.max": "1",
        "database.hostname": "mysql",
        "database.port": "3306",
        "database.user": "debezium",
        "database.password": "dbz",
        "database.server.id": "184054",
        "database.server.name": "dbserver1",
        "database.include.list": "bookstore",
        "database.history.kafka.bootstrap.servers": "kafka:9092",
        "database.history.kafka.topic": "dbhistory.bookstore"
    }
}

Finally, we register the Debezium connector:

$ curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d @register-mysql.json
HTTP/1.1 201 Created
Date: Thu, 27 Jun 2024 07:53:43 GMT
Location: http://localhost:8083/connectors/bookstore-connector
Content-Type: application/json
Content-Length: 490
Server: Jetty(9.4.44.v20210927)

{"name":"bookstore-connector","config":{"connector.class":"io.debezium.connector.mysql.MySqlConnector","tasks.max":"1","database.hostname":"mysql","database.port":"3306","database.user":"debezium","database.password":"dbz","database.server.id":"184054","database.server.name":"dbserver1","database.include.list":"bookstore","database.history.kafka.bootstrap.servers":"kafka:9092","database.history.kafka.topic":"dbhistory.bookstore","name":"bookstore-connector"},"tasks":[],"type":"source"}

This command sends a POST request with the configuration defined in register-mysql.json to the Debezium Connect service. The successful response indicates that the connector is set up and ready to capture changes from the MySQL database and publish them to Kafka.

4.3. Creating and Populating the MySQL Database

Now, let’s create a book database and populate it with some sample data. First, we connect to the MySQL container:

$ sudo docker exec -it elasticsearch-mysql-integration_mysql_1 mysql -uroot -p

When prompted for a password, we use debezium as specified in the docker-compose.yml.

Then, in the MySQL prompt, we run several commands:

CREATE DATABASE bookstore;
USE bookstore;

CREATE TABLE books (
    id INT AUTO_INCREMENT PRIMARY KEY,
    title VARCHAR(255) NOT NULL,
    author VARCHAR(255) NOT NULL,
    publication_year INT,
    isbn VARCHAR(13)
);

INSERT INTO books (title, author, publication_year, isbn) VALUES
('To Kill a Mockingbird', 'Harper Lee', 1960, '9780446310789'),
('1984', 'George Orwell', 1949, '9780451524935'),
('Pride and Prejudice', 'Jane Austen', 1813, '9780141439518'),
('The Great Gatsby', 'F. Scott Fitzgerald', 1925, '9780743273565');

EXIT;

This way, we create the bookstore database with a books table and inserts four sample books.

Next, we create the user and grant the necessary permissions:

mysql> CREATE USER 'bookstore_user'@'%' IDENTIFIED BY 'our_strong_password';
mysql> GRANT ALL PRIVILEGES ON bookstore.* TO 'bookstore_user'@'%';
mysql> FLUSH PRIVILEGES;
mysql> EXIT;

This sequence creates a user bookstore_user and grants all privileges on the database bookstore to the user. Notably, we replace our_strong_password with the password we desire to use.

4.4. Initial Data Import

Now, let’s create a Python script to import this data into Elasticsearch by editing the ~/elasticsearch-mysql-integration/scripts/import_books.py file:

import mysql.connector
from elasticsearch import Elasticsearch
from elasticsearch.exceptions import ConnectionError, ConnectionTimeout

# Connect to MySQL
try:
    mysql_conn = mysql.connector.connect(
        host="localhost",
        user="bookstore_user",
        password="our_strong_password",
        database="bookstore",
    port=3307
    )
    print("Successfully connected to MySQL")
except mysql.connector.Error as err:
    print(f"Error connecting to MySQL: {err}")
    exit(1)

# Connect to Elasticsearch
try:
    es = Elasticsearch(["http://localhost:9200"])
    if not es.ping():
        raise ConnectionError("Cannot connect to Elasticsearch")
    print("Successfully connected to Elasticsearch")
except (ConnectionError, ConnectionTimeout) as e:
    print(f"Error connecting to Elasticsearch: {e}")
    mysql_conn.close()
    exit(1)

# Create the index if it doesn't exist
try:
    if not es.indices.exists(index="books"):
        es.indices.create(index="books")
        print("Created 'books' index in Elasticsearch")
    else:
        print("'books' index already exists in Elasticsearch")
except Exception as e:
    print(f"Error creating index in Elasticsearch: {e}")
    mysql_conn.close()
    exit(1)

# Fetch data from MySQL
cursor = mysql_conn.cursor(dictionary=True)
cursor.execute("SELECT * FROM books")
books = cursor.fetchall()

# Index data in Elasticsearch
for book in books:
    try:
        es.index(index="books", id=book["id"], body=book)
        print(f"Indexed book: {book['title']}")
    except Exception as e:
        print(f"Error indexing book {book['title']}: {e}")

print(f"Imported {len(books)} books into Elasticsearch.")

mysql_conn.close()

We replace our_strong_password with the password we set when creating the user and then run the script:

$ cd ~/elasticsearch-mysql-integration/scripts
$ python3 import_books.py
Successfully connected to MySQL
Successfully connected to Elasticsearch
Created 'books' index in Elasticsearch
Indexed book: To Kill a Mockingbird
Indexed book: 1984
Indexed book: Pride and Prejudice
Indexed book: The Great Gatsby
Imported 4 books into Elasticsearch.

The script imports the books from MySQL into an Elasticsearch index named books.

4.5. Handling Data Updates and Deletions

To handle updates and deletions, we create a Python script that listens to the Kafka topic and updates Elasticsearch accordingly. Let’s edit the scripts/sync_elasticsearch.py file:

from kafka import KafkaConsumer
from elasticsearch import Elasticsearch
import json

consumer = KafkaConsumer(
    'dbserver1.bookstore.books',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    group_id='my-group',
    value_deserializer=lambda x: json.loads(x.decode('utf-8')) if x is not None else None
)

es = Elasticsearch(["http://localhost:9200"])

def index_book(book):
    es.index(index="books", id=book["id"], body=book)
    print(f"Indexed book: {book.get('title', 'Unknown Title')}")

for message in consumer:
    if message.value is None:
        print("Received null message. Skipping.")
        continue

    if 'payload' not in message.value:
        print("Unexpected message format:", message.value)
        continue
    
    payload = message.value['payload']
    
    if 'op' not in payload:
        print("Unexpected payload format:", payload)
        continue
    
    if payload['op'] in ['c', 'r', 'u']:  # create, read, or update
        if 'after' not in payload or payload['after'] is None:
            print("Missing or null 'after' in payload:", payload)
            continue
        book = payload['after']
        index_book(book)
    elif payload['op'] == 'd':  # delete
        if 'before' not in payload or payload['before'] is None:
            print("Missing or null 'before' in payload:", payload)
            continue
        book_id = payload['before']['id']
        try:
            es.delete(index="books", id=book_id)
            print(f"Deleted book with id: {book_id}")
        except Exception as e:
            print(f"Error deleting book with id {book_id}: {str(e)}")
    else:
        print(f"Unknown operation: {payload['op']}")

This script handles several operations:

  • c: create
  • r: read
  • u: update
  • d: delete

It also uses a separate function index_book() to index books in Elasticsearch.

Next, let’s run this script to start syncing changes:

$ cd ~/elasticsearch-mysql-integration/scripts
$ python3 sync_elasticsearch.py

This way, we continuously listen for changes in the MySQL database and reflect them in Elasticsearch.

4.6. Verification

To test if it’s working for new changes, we can open another terminal and connect to MySQL using debezium as password:

$ docker-compose exec mysql mysql -u root -p

Then, we can make some changes:

mysql> USE bookstore;
mysql> INSERT INTO books (title, author, publication_year, isbn) VALUES ('New Book', 'New Author', 2024, '1234567890123');
mysql> UPDATE books SET publication_year = 2025 WHERE title = 'New Book';
mysql> DELETE FROM books WHERE title = 'New Book';

We should see these changes reflected in the output of the sync_elasticsearch.py script:

$ python3 sync_elasticsearch.py 
Indexed book: New Book
Indexed book: New Book
Deleted book with id: 7
Received null message. Skipping.

By following these steps, we’ve set up a real-time synchronization system between MySQL and Elasticsearch using Debezium as the change monitor. This enables us to maintain an up-to-date search index in Elasticsearch while using MySQL as our primary data store.

5. Querying Data

Now that we have the data synchronized between MySQL and Elasticsearch, let’s explore how to effectively query this data.

5.1. Elasticsearch Query Examples

Elasticsearch provides a powerful query DSL (Domain Specific Language) that enables complex searches.

To demonstrate, let’s create a new Python script called query_elasticsearch.py in the scripts directory:

from elasticsearch import Elasticsearch
import json

es = Elasticsearch(["http://localhost:9200"])

def print_results(results):
    print(f"Total hits: {results['hits']['total']['value']}")
    for hit in results['hits']['hits']:
        print(json.dumps(hit['_source'], indent=2))
    print("\n")

# Basic match query
def basic_match_query(field, value):
    query = {
        "query": {
            "match": {
                field: value
            }
        }
    }
    results = es.search(index="books", body=query)
    print(f"Match query for {field}: {value}")
    print_results(results)

# Range query
def range_query(field, gte, lte):
    query = {
        "query": {
            "range": {
                field: {
                    "gte": gte,
                    "lte": lte
                }
            }
        }
    }
    results = es.search(index="books", body=query)
    print(f"Range query for {field} between {gte} and {lte}")
    print_results(results)

# Full-text search
def full_text_search(search_term):
    query = {
        "query": {
            "multi_match": {
                "query": search_term,
                "fields": ["title", "author"]
            }
        }
    }
    results = es.search(index="books", body=query)
    print(f"Full-text search for '{search_term}'")
    print_results(results)

# Run queries
basic_match_query("author", "George Orwell")
range_query("publication_year", 1900, 1950)
full_text_search("kill mockingbird")

Next, we run this script:

$ python3 ~/elasticsearch-mysql-integration/scripts/query_elasticsearch.py
Match query for author: George Orwell
Total hits: 1
{
  "id": 2,
  "title": "1984",
  "author": "George Orwell",
  "publication_year": 1949,
  "isbn": "9780451524935"
}


Range query for publication_year between 1900 and 1950
Total hits: 2
{
  "id": 2,
  "title": "1984",
  "author": "George Orwell",
  "publication_year": 1949,
  "isbn": "9780451524935"
}
{
  "id": 4,
  "title": "The Great Gatsby",
  "author": "F. Scott Fitzgerald",
  "publication_year": 1925,
  "isbn": "9780743273565"
}


Full-text search for 'kill mockingbird'
Total hits: 1
{
  "id": 1,
  "title": "To Kill a Mockingbird",
  "author": "Harper Lee",
  "publication_year": 1960,
  "isbn": "9780446310789"
}

Indeed, this script demonstrates three types of Elasticsearch queries:

  1. basic match query to find books by a specific author
  2. range query to find books published within a certain period
  3. full-text search across title and author fields

Further, the results of each query are printed showing the total hits and detailed information for each matching document.

5.2. Combining MySQL and Elasticsearch Queries

While Elasticsearch is great for full-text search and analytics, there might be scenarios where we need to combine data from Elasticsearch with data from MySQL. So, let’s create a script called combined_query.py that demonstrates this:

from elasticsearch import Elasticsearch
import mysql.connector
import json

es = Elasticsearch(["http://localhost:9200"])

mysql_conn = mysql.connector.connect(
    host="localhost",
    user="bookstore_user",
    password="our_strong_password",
    database="bookstore",
    port=3307
)
cursor = mysql_conn.cursor(dictionary=True)

def search_books_by_year(year):
    # Elasticsearch query
    es_query = {
        "query": {
            "match": {
                "publication_year": year
            }
        }
    }
    es_results = es.search(index="books", body=es_query)
    
    print(f"Books published in {year}:")
    for hit in es_results['hits']['hits']:
        book = hit['_source']
        print(f"- {book['title']} by {book['author']}")
        
        # Fetch additional info from MySQL
        cursor.execute("SELECT isbn FROM books WHERE id = %s", (book['id'],))
        mysql_result = cursor.fetchone()
        if mysql_result:
            print(f"  ISBN: {mysql_result['isbn']}")
        print()

# Example usage
search_books_by_year(1949)

mysql_conn.close()

Then, we run this script:

$ python3 ~/elasticsearch-mysql-integration/scripts/combined_query.py
Books published in 1949:
- 1984 by George Orwell
  ISBN: 9780451524935

This script shows two concepts:

  • use Elasticsearch to quickly find books published in a specific year
  • for each book found, fetch additional information (in this case, the ISBN) from MySQL

This way, we leverage Elasticsearch’s fast search capabilities while still enabling access to data that might only be stored in MySQL. Thus, we get the benefits of Elasticsearch’s fast full-text search and analytics capabilities and still maintain the robustness and relational capabilities of MySQL.

6. Conclusion

In this article, we’ve explored the powerful synergy between MySQL and Elasticsearch, creating an integrated system that harnesses the strengths of both technologies.

Initially, we walked through the process of setting up the environment, implementing real-time data synchronization, and leveraging advanced querying capabilities. This integration enables us to maintain MySQL’s robust transactional features while tapping into Elasticsearch’s superior full-text search and analytics capabilities.

Further, we established a foundation for building applications that can handle complex data storage needs alongside rapid, sophisticated search functionalities. In this case, the real-time synchronization ensures that data remains consistent across both systems. However, maintaining this integrated system may require regular performance monitoring, careful handling of schema changes, and robust error management.