How to implement Change Data Capture (CDC) using Debezium?


Let’s say you have created an application which writes some data to the database.

And you want to track all operations done on the database.

How can you do this in a non-intrusive way, without blocking the user?

And that too without adding any extra code to the application?

CDC (Change Data Capture) is one solution for this.

You can capture every data change and use it to audit user activity, do ETL (Extract, Transform and Load), or implement the Transactional Outbox pattern (a design pattern for microservices).

Let’s see how to do it.

We will use the Debezium tool to implement this.

Debezium is an open source distributed platform for Change Data Capture.

It streams database changes as events into Apache Kafka.

Debezium captures data changes by reading the database’s transaction log (the binlog, in MySQL’s case), so the user is not blocked and there is no noticeable performance hit. At the same time, the capture happens in near real time.

Let’s see the steps involved:

STEP 1: Start Apache Zookeeper

STEP 2: Start Apache Kafka

STEP 3: Start MySQL server

STEP 4: Start MySQL client

STEP 5: Start Apache Kafka Connect

STEP 6: Activate Debezium Connector

STEP 7: Start Kafdrop

STEP 8: Test using a Spring Boot App

Debezium uses Apache Kafka to stream data changes as events, so you need to install Apache Kafka, Apache Zookeeper (used to manage Apache Kafka servers) and Apache Kafka Connect for this to work.

Apache Kafka Connect is needed since we don’t want to write any extra code in our application to capture the data changes. It is a data integration system which automatically captures changes from databases and moves data between Kafka and many other systems like Elasticsearch, AWS S3 etc.

In addition, since we are capturing changes to a database in this example, we need to install a MySQL server. We will also install a MySQL client to view our tables.

Next, we will install Kafdrop, a UI tool to monitor Kafka topics. This is to test whether the data changes are reflected as messages in Apache Kafka topics.

Finally, we will create a simple Spring Boot application which performs CRUD operations on the database to test our changes.

Since installing all these tools manually can be time-consuming, we will just download Docker images and run them.

Prerequisite: Docker is installed on your system.

STEP 1: Start Apache Zookeeper:

Run the below docker command to start Apache Zookeeper:

docker run -it --rm --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 quay.io/debezium/zookeeper:1.9

The parameters are not explained here for simplicity. You can refer to the link at the end of the post to learn about them.

Zookeeper is now running as a docker container.

STEP 2: Start Apache Kafka

Use the below docker command to start Apache Kafka:

docker run -it --rm --name kafka -p 9092:9092 --link zookeeper:zookeeper quay.io/debezium/kafka:1.9

Apache Kafka is now running and it listens at the IP 172.17.0.3 and port 9092, as shown in the container logs (the exact IP may differ on your machine). We will need this IP later to set up Kafdrop to monitor the messages in Kafka topics.
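If you missed the log line, one way to look up the container’s IP afterwards is docker inspect:

docker inspect -f '{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}' kafka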

STEP 3: Start MySQL server:

Start MySQL server using the below docker command:

docker run -it --rm --name mysql -p 3307:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw quay.io/debezium/example-mysql:1.9

Notice that the server is deployed within the container at port 3306 but it is exposed to the outside world (outside the container) at port 3307. This is because I already had a MySQL server on my local machine at port 3306.

The above Docker image for MySQL contains some prebuilt tables as well, which we can use for CDC (Change Data Capture).

STEP 4: Start MySQL client:

Start MySQL client by running the below docker command:

docker run -it --rm --name mysqlterm --link mysql mysql:8.0 sh -c 'exec mysql -h"$MYSQL_PORT_3306_TCP_ADDR" -P"$MYSQL_PORT_3306_TCP_PORT" -uroot -p"$MYSQL_ENV_MYSQL_ROOT_PASSWORD"'

Once you run this command, a MySQL command-line client starts in a new container and connects to the server.

You can then use the client to query the tables.

First, switch to the database “inventory” which comes built into the MySQL server image above.

Use the command “use inventory”.

Then you can list the tables using the “show tables” command:
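For reference, here are the exact client commands; the comment lists the tables the Debezium example image typically ships with:

use inventory;
show tables;

-- Tables in the example “inventory” database (may vary by image version):
-- addresses, customers, geom, orders, products, products_on_hand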

There are 6 tables in the above database.

We will just monitor “customers” table.

First let’s look at all the records in this table:
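Here is the query, along with the sample rows the example image typically contains (the exact data may vary by version):

select * from customers;

-- +------+------------+-----------+-----------------------+
-- | id   | first_name | last_name | email                 |
-- +------+------------+-----------+-----------------------+
-- | 1001 | Sally      | Thomas    | sally.thomas@acme.com |
-- | 1002 | George     | Bailey    | gbailey@foobar.com    |
-- | 1003 | Edward     | Walker    | ed@walker.com         |
-- | 1004 | Anne       | Kretchmar | annek@noanswer.org    |
-- +------+------------+-----------+-----------------------+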

There are four records in the table.

STEP 5: Start Apache Kafka Connect

Use the below command to start Apache Kafka Connect:

docker run -it --rm --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my_connect_configs -e OFFSET_STORAGE_TOPIC=my_connect_offsets -e STATUS_STORAGE_TOPIC=my_connect_statuses --link zookeeper:zookeeper --link kafka:kafka --link mysql:mysql quay.io/debezium/connect:1.9

As you notice, it is linked to the other containers using the --link attribute so that it has access to Zookeeper, Kafka and the MySQL server.

You can then go to localhost:8083 to see if Kafka Connect is running:
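A quick way to check from the command line (the response fields are indicative; they vary a little by Kafka version):

curl -s http://localhost:8083/
# a running instance answers with something like:
# {"version":"...","commit":"...","kafka_cluster_id":"..."}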

Kafka Connect is our data integration tool.

It is going to monitor the database for changes.

But for this you need a “connector”.

Kafka Connect uses various connectors to connect to different data sources.

To connect to our database and capture data changes, we need a Debezium connector.

This connector does not need to be installed or configured separately; it already ships with the Debezium Connect image.

You just need to make a REST API call to activate it, as shown in the next step.

STEP 6: Activate Debezium Connector

Most of our tools are now ready.

You now need to tell Apache Kafka Connect to monitor our specific database “inventory” deployed on a particular server.

You can do this by calling a REST API with the configuration details in a JSON request.

Here is the request:

{
  "name": "inventory-connector",  
  "config": {  
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",  
    "database.hostname": "mysql",  
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz",
    "database.server.id": "184054",  
    "database.server.name": "dbserver1",  
    "database.include.list": "inventory",  
    "database.history.kafka.bootstrap.servers": "kafka:9092",  
    "database.history.kafka.topic": "schema-changes.inventory"  
  }
}

The connector name is “inventory-connector”.

And we are using the Debezium MySQL connector as the connector class. These connectors come built into the Debezium Connect image, already deployed in Apache Kafka Connect. We just need to load one by passing this request to a REST API.

tasks.max indicates the maximum number of connector tasks that should be created; for the MySQL connector this should be 1, since there is only a single binlog to read.

Database connection details are given in database.hostname, database.port, database.user and database.password.

database.server.id is a unique numeric ID with which the connector joins the MySQL cluster (similar to a replica), and database.server.name is a logical name for the database server which is also used as the prefix of the Kafka topic names.

You can configure which databases to monitor for CDC using the database.include.list property.

The last two properties tell the connector which Kafka brokers to use and which topic to record database schema changes in.

The above request needs to be sent to the URL:

http://localhost:8083/connectors

I used Postman to send the request and the connector instance got created.

You can verify it by hitting localhost:8083/connectors in the browser:
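If you prefer the command line to Postman, here is a curl sketch (it assumes the JSON above is saved in a file named inventory-connector.json; the filename is just an example):

curl -i -X POST -H "Accept: application/json" -H "Content-Type: application/json" http://localhost:8083/connectors/ -d @inventory-connector.json

# verify the registration; this should return ["inventory-connector"]
curl -s http://localhost:8083/connectors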

Our connector is now ready.

This is the connector which is going to monitor the database “inventory” and send messages to Apache Kafka topics. The topic name is the logical server name sent in the above request (database.server.name), followed by the database name, followed by the table name. For example dbserver1.inventory.customers.

For each table a different topic will be generated.

And messages will be sent to each topic for the corresponding table change.
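With the example database, you would therefore expect one topic per table, along these lines:

dbserver1.inventory.addresses
dbserver1.inventory.customers
dbserver1.inventory.orders
dbserver1.inventory.products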

Now that our connector is ready, let’s install a UI tool to look at the actual messages sent to Apache Kafka.

STEP 7: Start Kafdrop:

As mentioned earlier, Kafdrop is a UI tool to monitor Kafka messages.

Use the below docker command to install it:

docker run -it -p 9000:9000 -e KAFKA_BROKERCONNECT=172.17.0.3:9092 obsidiandynamics/kafdrop

The address 172.17.0.3:9092 in the above command is the Apache Kafka server address we got from the container logs while starting the Apache Kafka server (substitute the IP of your own Kafka container if it differs).

You can then load localhost:9000 in the browser to see the UI:

Let’s explore one of the topics above (you need to click the View Messages button to see the messages):

There are already some messages in the topic because the table “addresses” already contains some records.

As soon as the connector is activated, it first generates events for each existing record in the tables, so that you get a snapshot of the current database.

Now let’s create a small Spring Boot application and do CRUD operations on a particular table, “customers”.

And then we will look at the topic “dbserver1.inventory.customers” for new messages.

STEP 8: Test using a Spring Boot App

I created a simple Spring Boot App with the below three dependencies:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-rest</artifactId>
</dependency>

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <scope>runtime</scope>
</dependency>

Spring Data JPA is used to interact with the MySQL database.

Spring Boot Starter Data REST exposes the tables automatically as REST resources.

mysql-connector-java is the JDBC driver needed to connect to MySQL from Spring.

I created the below entity class to represent the “customers” table:

package com.example.cdc;

import javax.persistence.Entity;
import javax.persistence.Id;
import javax.persistence.Table;

@Entity
@Table(name="customers")
public class Customer {

	@Id
	private int id;
	
	private String first_name;
	
	private String last_name;
	
	private String email;

	public int getId() {
		return id;
	}

	public void setId(int id) {
		this.id = id;
	}

	public String getFirst_name() {
		return first_name;
	}

	public void setFirst_name(String first_name) {
		this.first_name = first_name;
	}

	public String getLast_name() {
		return last_name;
	}

	public void setLast_name(String last_name) {
		this.last_name = last_name;
	}

	public String getEmail() {
		return email;
	}

	public void setEmail(String email) {
		this.email = email;
	}
	
	
}

To expose the above entity as a REST resource, I created the below CRUD repository and annotated it with @RepositoryRestResource:

package com.example.cdc;

import org.springframework.data.repository.CrudRepository;
import org.springframework.data.rest.core.annotation.RepositoryRestResource;


@RepositoryRestResource
public interface CustomersRepo extends CrudRepository<Customer, Integer> {

}

Below are the database configuration details in application.yml:

spring:
  jpa:
    database: mysql
    # database-platform is the property that sets the Hibernate dialect
    database-platform: org.hibernate.dialect.MySQL5Dialect
  datasource:
    # com.mysql.cj.jdbc.Driver is the current driver class in mysql-connector-java 8
    driverClassName: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://host.docker.internal:3307/inventory
    username: mysqluser
    password: mysqlpw

As you notice, the hostname for the database is host.docker.internal because the MySQL server runs inside a Docker container and this is how it can be reached from outside the container on Docker Desktop (localhost:3307 would also work, since the port is published to the host). The port 3307 is the port I exposed to the outside world while running the MySQL server image. The username and password are the same ones we gave while running the MySQL Docker image.

Now if you start the application, the entity Customer will be automatically exposed as a REST resource and you can perform CRUD operations.

Load the home URL in the browser (localhost:8080). This will give the list of REST resources in the application:
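The response is a HAL document listing the available resources; it should look roughly like this:

{
  "_links": {
    "customers": {
      "href": "http://localhost:8080/customers"
    },
    "profile": {
      "href": "http://localhost:8080/profile"
    }
  }
}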

Now let’s retrieve all the customer records and modify one of them.

Let’s update the record with id 1003, as seen above, using Postman:
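If you prefer the command line, here is the same update as a curl sketch (the new email value is just an example):

curl -X PUT http://localhost:8080/customers/1003 -H "Content-Type: application/json" -d '{"first_name":"Edward","last_name":"Walker","email":"ed@example.com"}'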

Let’s confirm that the record has been updated:

It got updated.

Now let’s see if there is a new message in the Kafka topic “dbserver1.inventory.customers”:

Click on the View Messages button shown below:

You can see a new message at the end. Expand the blue arrow to see the change:

Look at the payload object shown above; it reflects the change.
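For orientation, the payload of a Debezium update event has this general shape (a trimmed, illustrative sketch; the values are made up):

"payload": {
  "before": { "id": 1003, "first_name": "Edward", "last_name": "Walker", "email": "ed@walker.com" },
  "after":  { "id": 1003, "first_name": "Edward", "last_name": "Walker", "email": "ed@example.com" },
  "source": { "...": "connector, server and binlog position metadata" },
  "op": "u",
  "ts_ms": 1655382623000
}

The op field tells you the operation type: “c” for create, “u” for update, “d” for delete and “r” for the initial snapshot reads.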

We have got a new message in Apache Kafka!

In short, we have captured the data change.

And this change has been read by Debezium directly from the MySQL binlog, so it doesn’t block the user and it happens in near real time.

Now that we have captured the event in Apache Kafka, applications can subscribe to it and perform various operations like replicating the database on another server, auditing database operations, performing ETL etc.

That’s it!

Reference:

Tutorial :: Debezium Documentation
