How to implement Transactional Outbox design pattern in Spring Boot Microservices?

Let’s say you have created a microservice.

One of the APIs in the microservice does two operations:

  • update a database
  • send a message to another service

How can you make sure both are transactional?

In other words, if the database update fails, don’t send the message to the other service; and if the message sending fails, roll back the database update.

In Spring you handle transactions using the @Transactional annotation.

But this works only at the database level.

If you are sending a message to another service, preferably in an asynchronous way, the annotation won’t work.

Distributed transactions (XA) may not work either, since messaging systems like Apache Kafka don’t support them.

A solution to the above problem is the Transactional Outbox pattern.

Let’s study this pattern in this post.

Contents:

  1. A use case explaining the problem
  2. What is the Transactional Outbox pattern?
  3. Transactional outbox with Polling Publisher
  4. Transactional outbox with Transaction Log Tailing
  5. Implementation in Spring Boot

A Use Case:

Let’s say you run a coffee shop.

You have an application to take orders for coffee.

A customer places an order at the entrance and then goes to the barista to collect it.

You have a bunch of microservices to manage your coffee shop.

And one of them takes orders.

The “Order Service” stores an order in the database as soon as it is placed and sends an asynchronous message to the barista “Delivery Service” to prepare the coffee and hand it to the customer.

The hand-off to the barista (the “Delivery Service”) is kept asynchronous for scalability.

Now,

Let’s say a customer places an order and the order is inserted into the orders database.

While sending the message to the “Delivery Service”, an exception occurs and the message is not sent.

The order entry is still in the database, though, leaving the system in an inconsistent state.

Ideally you would roll back the entry in the orders database, since placing the order and sending an event to the delivery service are part of the same transaction.

But how do you implement a transaction across two different types of systems: a database and a messaging service?

Such a scenario is quite common in the microservices world.

If the two operations were both database operations, handling the transaction would be easy: just use the @Transactional annotation provided by Spring, as in the sketch below.
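For instance, a minimal sketch of that database-only case (Payment and PaymentRepo are hypothetical names, used only to show two writes sharing one transaction):

package com.example.orders;

import javax.transaction.Transactional;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class TwoWritesService {

	@Autowired
	private OrdersRepo ordersRepo;

	@Autowired
	private PaymentRepo paymentRepo; // hypothetical second repository

	// Both inserts commit together; if either save throws, both roll back.
	@Transactional
	public void placeOrder(CustomerOrder order, Payment payment) {
		this.ordersRepo.save(order);
		this.paymentRepo.save(payment);
	}
}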

Here the scenario is different.

And hence the solution is:

Use the Transactional Outbox pattern.

What is the Transactional Outbox pattern?

The Transactional Outbox pattern mandates that you create an “Outbox” table to keep track of the asynchronous messages. For every asynchronous message, you make an entry in the “Outbox” table. You then perform the business database operation and the “Outbox” insert as part of the same transaction.

This way, if an error happens in either of the two operations, the transaction is rolled back.

You then pick up the messages from the “Outbox” and deliver them to your messaging system, such as Apache Kafka.

Once a message is delivered, delete its entry from the Outbox so that it is not processed again.

So let’s say you perform two different operations in the below order as part of a single transaction:

  1. Database Insert
  2. Asynchronous Message (Insert into Outbox table)

If step 1 fails, an exception is thrown and step 2 won’t happen.

If step 1 succeeds and step 2 (the insert into the outbox table) fails, the transaction is rolled back.

If the order of operations is reversed:

  1. Asynchronous Message (Insert into Outbox table)
  2. Database Insert

Then, if step 1 fails, an exception is thrown (as in the previous case) and step 2 won’t happen.

If step 1 succeeds and step 2 (the database insert) fails, the transaction is rolled back and the entry in the Outbox table is removed. Since both operations are part of the same transaction, the earlier insert into the Outbox table was never committed, and hence the asynchronous message won’t be sent.

In our case,

When a customer places an order, we make an entry in the orders database and another entry in the Outbox table.

Once the above transaction completes, we pick up the message from the Outbox table and send it to the “Delivery Service”. Notice that if some error happens and the “Delivery Service” does not receive the message, a messaging system like Apache Kafka retains the message, so the consumer can pick it up again.

That summarizes the Outbox pattern.

Now there are two ways to pick up the messages from the Outbox and deliver them to the external service:

  1. Polling Publisher
  2. Transaction Log Tailing

Let’s see each of them.

Polling Publisher

In the Polling Publisher pattern you periodically poll the “Outbox” table, pick up the messages, deliver them to the messaging service and delete the entries from the Outbox table.

You can use Spring Batch or the Spring scheduler (the @Scheduled annotation) to implement this; a minimal sketch follows.
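This is not used in the final implementation below (we use log tailing instead), but a minimal polling publisher sketch with @Scheduled could look like this, assuming the order service also declared the spring-kafka dependency and reusing the Outbox entity and OutboxRepo shown later in this post (the topic name “orders” and the OutboxPoller class itself are made up for this sketch):

package com.example.orders;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@Component
public class OutboxPoller {

	@Autowired
	private OutboxRepo outboxRepo;

	@Autowired
	private KafkaTemplate<String, String> kafkaTemplate;

	// Runs every 5 seconds (requires @EnableScheduling on a configuration class)
	@Scheduled(fixedDelay = 5000)
	public void publishPendingMessages() {
		for (Outbox outbox : this.outboxRepo.findAll()) {
			try {
				// block until the send completes, so we never delete an unsent message;
				// in real code you would serialize the payload as JSON
				this.kafkaTemplate.send("orders", String.valueOf(outbox.getEventId()),
						outbox.getPayload().toString()).get();
				this.outboxRepo.delete(outbox);
			} catch (Exception e) {
				// leave the row in place; it will be retried on the next poll
				e.printStackTrace();
			}
		}
	}
}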

The drawback with this method is that polling is an expensive operation.

Also, you block the table while polling it.

And if you use a non-relational database like MongoDB, polling can get complicated.

Hence the second way, “Transaction Log Tailing”, is often the better option.

Transaction Log Tailing

In Transaction Log Tailing, instead of polling the table, you read the database transaction logs.

Every commit made to a table is written to the database transaction log.

So instead of reading the table, you read the log as soon as an entry is made.

This way the table is not blocked and you avoid expensive database polling.

Tools like “Debezium” help in capturing database transaction logs.

Let’s see how to implement the Transactional Outbox pattern with Transaction Log Tailing in Spring Boot, using Debezium.

Implementation:

Let’s create two microservices:

“orderservice”

“deliveryservice”

Let’s create an order through the order service, and then publish an event that the order has been created. Both of these need to be part of the same transaction.

The delivery service will read this event and perform the necessary delivery logic. We will not implement real delivery logic; for this example we will just read the message sent by the order service.

The order service

Create a Spring Boot application with spring-boot-starter-web, spring-boot-starter-data-jpa and mysql-connector-java (since we are connecting to a MySQL database in this example):

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.6.7</version>
		<relativePath/> <!-- lookup parent from repository -->
	</parent>
	<groupId>com.example</groupId>
	<artifactId>orders</artifactId>
	<version>1.0.0</version>
	<name>orders</name>
	<description>Demo for Trans Outbox Design Pattern</description>
	<properties>
		<java.version>11</java.version>
	</properties>
	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-data-jpa</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>

		<dependency>
			<groupId>mysql</groupId>
			<artifactId>mysql-connector-java</artifactId>
			<scope>runtime</scope>
		</dependency>
		
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>

</project>

Here are the properties:

spring:
  jpa:
    database: mysql
    database-platform: org.hibernate.dialect.MySQL5Dialect
  datasource:
    driverClassName: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://host.docker.internal:3307/orders
    username: root
    password: root

I used Docker to run the MySQL server, hence the host.docker.internal hostname to access the MySQL server inside the container from outside (from my local machine).

Create an API to create orders:

package com.example.orders;

import java.util.Map;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class OrdersController {

	@Autowired
	private OrdersService service;

	@PostMapping("/order")
	public Map<String,Object> createOrder(@RequestBody Map<String,Object> order) {

		return this.service.createOrder(order);

	}
}

A sample input would be (the values below are the ones used in the test run at the end of this post):
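{
    "name": "Latte Coffee",
    "quantity": 5
}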

Here is the service class which creates the order and also saves the event in an outbox table:

package com.example.orders;

import java.time.LocalDateTime;
import java.util.Map;

import javax.transaction.Transactional;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import com.fasterxml.jackson.databind.ObjectMapper;

@Service
public class OrdersService {

	@Autowired
	private OrdersRepo repo;

	@Autowired
	private OutboxRepo outboxRepo;

	
	@Transactional
	public Map<String,Object> createOrder(Map<String,Object> orderMap) {

		
		CustomerOrder order = new CustomerOrder();
		order.setName(orderMap.get("name").toString());
		order.setQuantity(Integer.parseInt(String.valueOf(orderMap.get("quantity"))));
		this.repo.save(order);

		Outbox outbox = new Outbox();

		outbox.setEvent("order_created");
		outbox.setEventId(order.getId());

		outbox.setPayload(orderMap);

		outbox.setCreatedAt(LocalDateTime.now());

		System.out.println(outbox);
		this.outboxRepo.save(outbox);
		
		
		//delete the entry immediately - it will still be picked up from the
		//transaction logs, because the insert on the previous line was logged
		
		this.outboxRepo.delete(outbox);
		
		
		return Map.of("orderId",order.getId());

	}
}

As you can see, we create a CustomerOrder and store it in the customer_order table.

And then we store the event in the outbox table.

The outbox entry consists of the order id and the order details (in the payload field).

The entry in the outbox table is deleted immediately. This way the same message will not be picked up twice by Apache Kafka (our messaging system in this example), and the table won’t keep growing and occupying storage. Even though the entry is deleted immediately, the insert operation is still recorded in the database transaction log, and that is all we need.

Here is the CustomerOrder entity class:

package com.example.orders;

import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;
import javax.persistence.Table;

@Entity
@Table(name = "customer_order")
public class CustomerOrder {
	
	
	@Id
	@GeneratedValue(strategy = GenerationType.IDENTITY)
	private int id;
	
	private int quantity;
	
	private String name;

	public int getId() {
		return id;
	}

	public void setId(int id) {
		this.id = id;
	}

	public int getQuantity() {
		return quantity;
	}

	public void setQuantity(int quantity) {
		this.quantity = quantity;
	}

	public String getName() {
		return name;
	}

	public void setName(String name) {
		this.name = name;
	}

	@Override
	public String toString() {
		return "Order [id=" + id + ", quantity=" + quantity + ", name=" + name + "]";
	}
	
	

}

Here is the Outbox entity class:

package com.example.orders;

import java.time.LocalDateTime;
import java.util.Map;

import javax.persistence.Column;
import javax.persistence.Convert;
import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;

@Entity
public class Outbox {

	@Id
	@GeneratedValue(strategy = GenerationType.IDENTITY)
	private int id;

	private String event;

	@Column(name = "event_id")
	private int eventId;

	
	@Convert(converter = JsonToMapConverter.class)
	private Map<String, Object> payload;

	@Column(name = "created_at")
	private LocalDateTime createdAt;

	public int getId() {
		return id;
	}

	public void setId(int id) {
		this.id = id;
	}

	public String getEvent() {
		return event;
	}

	public void setEvent(String event) {
		this.event = event;
	}

	public int getEventId() {
		return eventId;
	}

	public void setEventId(int eventId) {
		this.eventId = eventId;
	}

	public Map<String, Object> getPayload() {
		return payload;
	}

	public void setPayload(Map<String, Object> payload) {
		this.payload = payload;
	}

	public LocalDateTime getCreatedAt() {
		return createdAt;
	}

	public void setCreatedAt(LocalDateTime createdAt) {
		this.createdAt = createdAt;
	}

	@Override
	public String toString() {
		return "Outbox [id=" + id + ", event=" + event + ", eventId=" + eventId + ", payload=" + payload
				+ ", createdAt=" + createdAt + "]";
	}
	
	

}

Here is the SQL statement which was used to create the outbox table:

create table orders.outbox(id int AUTO_INCREMENT primary key, event varchar(1000),event_id int,payload json,created_at timestamp);

A similar statement was used to create the customer_order table:

create table orders.customer_order(id int AUTO_INCREMENT primary key, name varchar(1000),quantity int);

That summarizes the orderservice.

Notice that there is no code written to send an event!

This is because we follow the Transaction Log Tailing pattern here.

Whenever an insert is made into the outbox table, a Kafka event will be generated.

Let’s see how this is done.

Setting up Debezium:

Let’s set up Debezium, a tool which reads transaction logs and generates Kafka events.

You can refer to this post to learn about Debezium and how to set it up.

Before setting up Debezium, we need Apache Kafka for sending the events and Kafka Connect to connect our MySQL database with Apache Kafka.

To set up Apache Kafka, you also need to set up Apache Zookeeper.

You can set these up by simply running the respective Docker images:

Apache Zookeeper:

docker run -it --rm --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 quay.io/debezium/zookeeper:1.9

Apache Kafka:

docker run -it --rm --name kafka -p 9092:9092 --link zookeeper:zookeeper quay.io/debezium/kafka:1.9

MySQL:

 docker run -it --rm --name mysql -p 3307:3306 -e MYSQL_ROOT_PASSWORD=root mysql

MySQL Client:

 docker run -it --rm --name mysqlterm --link mysql --rm mysql sh -c 'exec mysql -h"$MYSQL_PORT_3306_TCP_ADDR" -P"$MYSQL_PORT_3306_TCP_PORT" -uroot -p"$MYSQL_ENV_MYSQL_ROOT_PASSWORD"'

Once the MySQL client is up, you can create the customer_order and outbox tables.

Kafka Connect:

 docker run -it --rm --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my_connect_configs -e OFFSET_STORAGE_TOPIC=my_connect_offsets -e STATUS_STORAGE_TOPIC=my_connect_statuses --link zookeeper:zookeeper --link kafka:kafka --link mysql:mysql quay.io/debezium/connect:1.9

Once Kafka Connect is set up, you need to register the Debezium connector.

To do that, you just need to POST the below request to the Kafka Connect REST API (http://localhost:8083/connectors); a sample curl command follows the request body:

{
    "name": "orders-connecter",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "tasks.max": "1",
        "database.hostname": "host.docker.internal",
        "database.port": "3307",
        "database.user": "root",
        "database.password": "root",
        "database.server.id": "100",
        "database.server.name": "orders_server",
        "database.include.list": "orders",
        "table.include.list":"orders.outbox",
        "database.history.kafka.bootstrap.servers": "kafka:9092",
        "database.history.kafka.topic": "schema_changes.orders",
        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState"
    }
}
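For example, assuming the request body above is saved in a file named orders-connector.json (a file name chosen just for this example):

curl -i -X POST -H "Content-Type: application/json" --data @orders-connector.json http://localhost:8083/connectors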

As you can see in the request, I have included the “orders” database and the “orders.outbox” table for transaction log tailing, using the “database.include.list” and “table.include.list” properties respectively. You give your own name to the database server (“orders_server” in this case).

Once you make the request, Debezium will keep reading the database transaction logs and, through Kafka Connect, publish an event to Apache Kafka for every database operation on the outbox table.

Now you need to listen for these events in your “deliveryservice” on the topic “orders_server.orders.outbox” (server name + database name + table name).

The Delivery Service:

Create a Spring Boot application with the spring-kafka dependency.

Here is a sample pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.6.7</version>
		<relativePath/> <!-- lookup parent from repository -->
	</parent>
	<groupId>com.example</groupId>
	<artifactId>delivery</artifactId>
	<version>1.0.0</version>
	<name>kafkaconsumer</name>
	<description>Demo project for Transactional Messaging</description>
	<properties>
		<java.version>11</java.version>
	</properties>
	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.kafka</groupId>
			<artifactId>spring-kafka</artifactId>
		</dependency>

	
	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>

</project>

Create a configuration class to configure the Kafka server details and the deserializer (how to deserialize the messages sent by Kafka):

package com.example.delivery;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;

@Configuration
@EnableKafka
public class ReceiverConfig {

	@Bean
	public Map<String, Object> consumerConfigs() {
		Map<String, Object> props = new HashMap<>();
		props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "host.docker.internal:9092");
		props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
		props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
		props.put(ConsumerConfig.GROUP_ID_CONFIG, "json");

		return props;
	}

	@Bean
	public ConsumerFactory<String, KafkaMessage> consumerFactory() {
		return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(),
				new JsonDeserializer<>(KafkaMessage.class));
	}

	@Bean
	public ConcurrentKafkaListenerContainerFactory<String, KafkaMessage> kafkaListenerContainerFactory() {
		ConcurrentKafkaListenerContainerFactory<String, KafkaMessage> factory = new ConcurrentKafkaListenerContainerFactory<>();
		factory.setConsumerFactory(consumerFactory());

		return factory;
	}

}

Create a service class which listens for the messages sent by Kafka:

package com.example.delivery;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class DeliveryService {

	
	@KafkaListener(topics = "orders_server.orders.outbox")
	public void receive(KafkaMessage message) {
		
		
		System.out.println(message);
	}
}

Notice the topic we are listening on. We are just printing the message here; in a real application we would perform the delivery logic, along the lines of the sketch below.
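Here is a rough sketch of what that delivery logic might look like (Delivery, DeliveryRepo and its existsByEventId query method are hypothetical names, not part of this example’s code):

	@Autowired
	private DeliveryRepo deliveryRepo; // hypothetical repository

	@KafkaListener(topics = "orders_server.orders.outbox")
	public void receive(KafkaMessage message) {

		PayLoad payload = message.getPayload();

		// Kafka delivers messages at least once, so guard against duplicates
		// by using the event id as an idempotency key
		if (!this.deliveryRepo.existsByEventId(payload.getEventId())) {
			this.deliveryRepo.save(new Delivery(payload.getEventId(), payload.getPayload()));
			// ...then start preparing the coffee for this order
		}
	}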

Here is the KafkaMessage domain object which represents the Kafka message:

package com.example.delivery;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;

@JsonIgnoreProperties(ignoreUnknown = true)
public class KafkaMessage {
	
	
	private PayLoad payload;

	public PayLoad getPayload() {
		return payload;
	}

	public void setPayload(PayLoad payload) {
		this.payload = payload;
	}

	@Override
	public String toString() {
		return "KafkaMessage [payload=" + payload + "]";
	}
	
	
	

}

@JsonIgnoreProperties(ignoreUnknown = true)
 class PayLoad{
	
	
	int id;
	
	String event;
	
	@JsonProperty("event_id")
	int eventId;
	
	
	String payload;
	
	@JsonProperty("created_at")
	String createdAt;

	public int getId() {
		return id;
	}

	public void setId(int id) {
		this.id = id;
	}

	public String getEvent() {
		return event;
	}

	public void setEvent(String event) {
		this.event = event;
	}

	public int getEventId() {
		return eventId;
	}

	public void setEventId(int eventId) {
		this.eventId = eventId;
	}

	public String getPayload() {
		return payload;
	}

	public void setPayload(String payload) {
		this.payload = payload;
	}

	public String getCreatedAt() {
		return createdAt;
	}

	public void setCreatedAt(String createdAt) {
		this.createdAt = createdAt;
	}

	@Override
	public String toString() {
		return "PayLoad [id=" + id + ", event=" + event + ", eventId=" + eventId + ", payload=" + payload
				+ ", createdAt=" + createdAt + "]";
	}
	
	
	
}


The full Kafka message contains a lot of information; we are interested only in the “payload” object, which is mapped in the above domain class.
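For illustration, a received message has roughly the following shape (abbreviated; depending on your converter settings the real message also carries a “schema” section describing the field types, which we ignore):

{
  "payload": {
    "id": 1,
    "event": "order_created",
    "event_id": 1,
    "payload": "{\"name\":\"Latte Coffee\",\"quantity\":5}",
    "created_at": "2022-05-19T15:03:41Z"
  }
}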

Since the above app interacts with Kafka, and I used Docker images for Kafka, I built a Docker image for this service as well (it gets complicated to reach Kafka inside a Docker container from outside).

The below command builds a Docker image for the above Spring Boot app:

mvnw clean install spring-boot:build-image

It created an image named deliveryservice:1.0.0.

The below command runs the docker image:

docker run -it --rm deliveryservice:1.0.0

Testing

Now let’s test our changes.

Create a new order by hitting the orderservice API:
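For example, with curl (assuming the order service runs on the default Spring Boot port 8080):

curl -X POST -H "Content-Type: application/json" -d '{"name": "Latte Coffee", "quantity": 5}' http://localhost:8080/order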

And notice the message printed in the delivery service logs!

KafkaMessage [payload=PayLoad [id=1, event=order_created, eventId=1, payload={"name":"Latte Coffee","quantity":5}, createdAt=2022-05-19T15:03:41Z]]

The delivery service can then store these orders internally in its own tables, prepare the coffee for each order and deliver it in its own time.

Links to the order service and delivery service:

https://github.com/vijaysrj/orderservice

https://github.com/vijaysrj/deliveryservice

That’s it!

Comments

4 responses to “How to implement Transactional Outbox design pattern in Spring Boot Microservices?”

  1. Lol Nice

    You are incorrect in a part of your post. Take this example:

    1) You run an insert statement against a database
    2) You send a message to a queue
    3) Commit the database transaction
    4) Commit the message

    If step #4 fails, we are left in an inconsistent state, because the database transaction has already been committed.

  2. Anonymous

    We don’t commit the message; it will be in the Apache Kafka queue, and Kafka will take care of redelivering undelivered messages if it was not able to deliver them for some reason, like a network failure.

    But there can be an issue at the receiver end while receiving the message which, as you say, leaves the system in an inconsistent state.

    So the problem is not completely solved here, as you say, but the pattern increases the probability of consistency.

  3. Anonymous

    Also, if there is some issue at the receiver end, the message will still be in the Kafka queue and you need to implement some logic there, like the dead letter queue pattern.

  4. Anonymous

    Could you please explain when to use the SAGA pattern?
