How to implement SAGA design pattern in Spring Boot?

Contents:

  1. Problem
  2. Solution
  3. Implementation
  4. Code
  5. Testing
  6. Advantages
  7. Disadvantages
  8. Conclusion

Problem:

Microservices come with their own advantages and disadvantages.

One such disadvantage is managing distributed transactions.

Let’s say your transaction spans 4 different microservices.

How do you ensure that the transaction either commits with all 4 tasks succeeding, or fails cleanly when any task cannot be completed (with the already completed tasks rolled back)?

Spring Boot provides the annotation @Transactional to manage transactions.

But this covers only a local transaction inside a single application and its database; it cannot span calls to other microservices.

Solution:

There is a design pattern which solves the issue with distributed transactions in microservices.

It was originally proposed by the computer scientists Hector Garcia-Molina and Kenneth Salem in their 1987 paper “Sagas”.

As the paper explains, they designed it for Long Lived Transactions (LLTs), not for microservices, but it serves microservices well.

A long lived transaction is one that takes a long time to finish: minutes, hours or even days. You can’t hold database locks until every task in such a transaction completes, because that would severely affect the performance of the application. Hence they came up with the SAGA design pattern (probably so named because a saga is a very long story, and the pattern deals with long transactions).

It goes like this :

If your transaction contains, let’s say, 4 tasks, you create a compensating task for each task except the last.

If any task fails, you run the compensating tasks of the previous tasks to roll back their effects.

So if there are four tasks T1, T2, T3 and T4, you have three corresponding compensating tasks C1, C2 and C3.

If, for example, T1 and T2 succeed and T3 fails, you run C2 and C1 to roll back the effects of T2 and T1 in Last In First Out order.

So the sequence of the transaction goes like this: T1 -> T2 -> T3 (failed) -> C2 -> C1.

You don’t need a compensating task for the last task because if the last task fails you just need to roll back the previous tasks.

This is called Backward Recovery since you go back and execute the compensating tasks of already completed successful tasks.

You can also try Forward Recovery by retrying T3 and then T4 if your business use case requires it.

Backward Recovery is more common though.
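The backward recovery sequence described above can be sketched in a few lines of plain Java. This is only a minimal illustration and not part of the example project: the class SagaSketch, its Step type and the demo method are hypothetical names, tasks are modelled as functions that return false on failure, and compensations are only recorded in a trace instead of undoing real work.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BooleanSupplier;

class SagaSketch {

    // One saga step: a task plus the name of the compensating task that undoes it.
    // compensationName is null for the last step, which needs no compensation.
    static final class Step {
        final String name;
        final BooleanSupplier task; // returns false when the task fails
        final String compensationName;

        Step(String name, BooleanSupplier task, String compensationName) {
            this.name = name;
            this.task = task;
            this.compensationName = compensationName;
        }
    }

    // Runs the steps in order and returns a trace of everything that was executed.
    // On the first failure, the completed steps are compensated in reverse (LIFO) order.
    static List<String> run(List<Step> steps) {
        List<String> trace = new ArrayList<>();
        List<Step> completed = new ArrayList<>();
        for (Step step : steps) {
            if (step.task.getAsBoolean()) {
                trace.add(step.name);
                completed.add(step);
                continue;
            }
            trace.add(step.name + " (failed)");
            for (int i = completed.size() - 1; i >= 0; i--) {
                // A real compensation would undo the effect of the step here.
                trace.add(completed.get(i).compensationName);
            }
            break;
        }
        return trace;
    }

    // Builds T1..T4 with compensations C1..C3; pass failAt = 0 for a run without failures.
    static List<String> demo(int failAt) {
        List<Step> steps = new ArrayList<>();
        for (int n = 1; n <= 4; n++) {
            final int id = n;
            steps.add(new Step("T" + n, () -> id != failAt, n < 4 ? "C" + n : null));
        }
        return run(steps);
    }

    public static void main(String[] args) {
        System.out.println(String.join(" -> ", demo(3)));
    }
}
```

Calling demo(3) reproduces the sequence T1 -> T2 -> T3 (failed) -> C2 -> C1, while demo(4) shows that a failure of the last task runs C3, C2 and C1.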

At a high level, this is what the SAGA design pattern is.

It can be implemented in two ways:

  • Choreography
  • Orchestration

Choreography means there is no central coordinator. Once a task completes, it triggers the next task in the sequence. If that next task fails, it triggers the compensating task of the previous task, which in turn triggers the one before it.

Orchestration means the tasks are invoked by a separate parent task.

It plays the role of an orchestrator.

It calls each task in sequence and, based on the response, decides whether to call the next task or the compensating tasks.

Let us implement SAGA using Choreography in this example.

It is simpler and neater than Orchestration for a small flow like this one.
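Before moving to real services, the choreography style can be pictured with a tiny in-memory event bus standing in for a message broker. The sketch below is an illustration under stated assumptions: the EventBus class, the topic names (t1-done, t2-done, undo-t2, undo-t1) and the synchronous delivery are all made up for this example, whereas a real broker delivers events asynchronously.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

class ChoreographySketch {

    // Minimal synchronous stand-in for a message broker: topic name -> subscribers.
    static final class EventBus {
        private final Map<String, List<Consumer<String>>> subscribers = new HashMap<>();

        void subscribe(String topic, Consumer<String> handler) {
            subscribers.computeIfAbsent(topic, t -> new ArrayList<>()).add(handler);
        }

        void publish(String topic, String payload) {
            for (Consumer<String> handler : subscribers.getOrDefault(topic, List.of())) {
                handler.accept(payload);
            }
        }
    }

    // Wires three tasks together purely through events and returns the execution trace.
    // failT3 makes the third task fail so that it triggers the compensating tasks.
    static List<String> run(boolean failT3) {
        EventBus bus = new EventBus();
        List<String> trace = new ArrayList<>();

        // T2 reacts to the completion of T1 and announces its own completion.
        bus.subscribe("t1-done", id -> {
            trace.add("T2");
            bus.publish("t2-done", id);
        });

        // T3 reacts to the completion of T2; on failure it asks T2 to compensate.
        bus.subscribe("t2-done", id -> {
            if (failT3) {
                trace.add("T3 (failed)");
                bus.publish("undo-t2", id);
            } else {
                trace.add("T3");
            }
        });

        // Each compensating task undoes its own task, then triggers the previous one.
        bus.subscribe("undo-t2", id -> {
            trace.add("C2");
            bus.publish("undo-t1", id);
        });
        bus.subscribe("undo-t1", id -> trace.add("C1"));

        // T1 starts the saga; nothing calls T2 or T3 directly.
        trace.add("T1");
        bus.publish("t1-done", "saga-1");
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(String.join(" -> ", run(true)));
    }
}
```

Note that no component in this sketch knows the whole flow: each handler only knows which event it listens for and which event it publishes next.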

Implementation:

Let’s consider the example of an e-commerce application.

A customer places an order and the order gets shipped.

This is the business use case.

Let’s say there are four different microservices to take care of this flow.

An order microservice which handles the customer orders.

A payment microservice which handles payments for the orders.

An inventory microservice which updates the inventory once orders are placed.

A shipping microservice which deals with delivering the orders.

Note that in a real project, separating these functions into four different apps may not be a good design. If your user base is small, you can do all of the above in a single monolith instead of four microservices, which add network calls and infrastructure cost. Handling transactions in a monolith is also much easier.

For this example, though, we will go with this design to understand the SAGA design pattern.

Now let’s consider the below functions in each microservice when a customer places an order:

  1. createOrder() – Order microservice
  2. processPayment() – Payment microservice
  3. updateInventory() – Inventory microservice
  4. shipOrder() – Shipping microservice

If a customer places an order and createOrder() and processPayment() succeed but updateInventory() fails, the system is left in an inconsistent state: the customer has paid, but the inventory was never updated. And the customer won’t get her order shipped!

So all these tasks have to be part of a single transaction.

You can use the SAGA design pattern to implement this distributed transaction.

To resolve the above issue, you can roll back the entire transaction using backward recovery.

You can have a compensating task for each of the tasks above except the last.

Here are the compensating tasks:

  1. reverseOrder() – Order microservice
  2. reversePayment() – Payment microservice
  3. reverseInventory() – Inventory microservice

Here is the updated flow:

Now if updateInventory() method fails, then you call reversePayment() and then reverseOrder() and the order will be rolled back!

Code:

ORDER MICROSERVICE:

Let’s first look at the order microservice.

Here is the controller class to add new orders:

package com.order.microservice;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class OrderController {

    @Autowired
    private OrderRepository repository;

    @Autowired
    private KafkaTemplate<String, OrderEvent> kafkaTemplate;

    @PostMapping("/orders")
    public void createOrder(@RequestBody CustomerOrder customerOrder) {

        OrderEntity order = new OrderEntity();
        try {
            // save order in database

            order.setAmount(customerOrder.getAmount());
            order.setItem(customerOrder.getItem());
            order.setQuantity(customerOrder.getQuantity());
            order.setStatus("CREATED");
            order = this.repository.save(order);

            customerOrder.setOrderId(order.getId());

            // publish order created event for payment microservice to consume.

            OrderEvent event = new OrderEvent();
            event.setOrder(customerOrder);
            event.setType("ORDER_CREATED");
            this.kafkaTemplate.send("new-orders", event);
        } catch (Exception e) {

            order.setStatus("FAILED");
            this.repository.save(order);

        }

    }


}

As you see, when an order is placed we create an entry in the database with the status “CREATED”.

Then we publish an order created event using Apache Kafka so that the payment microservice can receive it and process the payment.

If the above logic fails we mark the order status as failed and save it to the database.
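The controller relies on two classes that are not listed in this post, CustomerOrder and OrderEvent. A minimal sketch of what they might look like, inferred only from the getters and setters called in the code here, is shown below. The field types (for example Long for the order id and double for the amount) are assumptions, and EventSketchDemo is a hypothetical helper added just to show how an event is assembled; the linked repository has the actual definitions.

```java
// Hypothetical order payload, inferred from the getters and setters the services call.
class CustomerOrder {
    private String item;
    private int quantity;
    private double amount;      // assumed type
    private String paymentMode;
    private Long orderId;       // assumed type; filled in after the order row is saved
    private String address;

    public String getItem() { return item; }
    public void setItem(String item) { this.item = item; }
    public int getQuantity() { return quantity; }
    public void setQuantity(int quantity) { this.quantity = quantity; }
    public double getAmount() { return amount; }
    public void setAmount(double amount) { this.amount = amount; }
    public String getPaymentMode() { return paymentMode; }
    public void setPaymentMode(String paymentMode) { this.paymentMode = paymentMode; }
    public Long getOrderId() { return orderId; }
    public void setOrderId(Long orderId) { this.orderId = orderId; }
    public String getAddress() { return address; }
    public void setAddress(String address) { this.address = address; }
}

// Hypothetical event envelope: a type such as "ORDER_CREATED" plus the order it refers to.
class OrderEvent {
    private String type;
    private CustomerOrder order;

    public String getType() { return type; }
    public void setType(String type) { this.type = type; }
    public CustomerOrder getOrder() { return order; }
    public void setOrder(CustomerOrder order) { this.order = order; }
}

class EventSketchDemo {

    // Wraps an order in an ORDER_CREATED event, as the controller does before publishing.
    static OrderEvent orderCreated(CustomerOrder order) {
        OrderEvent event = new OrderEvent();
        event.setType("ORDER_CREATED");
        event.setOrder(order);
        return event;
    }

    public static void main(String[] args) {
        CustomerOrder order = new CustomerOrder();
        order.setItem("books");
        order.setQuantity(10);
        order.setOrderId(1L);
        OrderEvent event = orderCreated(order);
        System.out.println(event.getType() + " for order " + event.getOrder().getOrderId());
    }
}
```

Plain classes with a default constructor and getter/setter pairs like these are the shape that JSON serialization with Jackson handles without extra configuration.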

The Compensating Task:

Now let’s see the compensating task for order microservice.

If the payment microservice publishes an event to reverse the order, this task is executed.

Let’s create a class to implement this:

package com.order.microservice;

import java.util.Optional;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import com.fasterxml.jackson.databind.ObjectMapper;

@Component
public class ReverseOrder {

    @Autowired
    private OrderRepository repository;

    @KafkaListener(topics = "reversed-orders", groupId = "orders-group")
    public void reverseOrder(String event) {

        try {
            OrderEvent orderEvent = new ObjectMapper().readValue(event, OrderEvent.class);

            Optional<OrderEntity> order = this.repository.findById(orderEvent.getOrder().getOrderId());

            order.ifPresent(o -> {
                o.setStatus("FAILED");
                this.repository.save(o);
            });
        } catch (Exception e) {

            e.printStackTrace();
        }

    }
}

As you see, we listen on the “reversed-orders” topic in Apache Kafka. When an event arrives on that topic, we extract the order details from it and mark the order as failed in the database. (You could also remove the row entirely, but marking it as failed leaves room to retry the order later and to run analytics.)

For our example, we are using an in-memory H2 database and Apache Kafka running locally.

Here is the configuration for order microservice:

spring.h2.console.enabled=true
spring.datasource.url=jdbc:h2:mem:ordersdb
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=orders-group
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer

You need to add the required dependencies for the in-memory database and Apache Kafka in pom.xml.

Here is the link to the entire code:

https://github.com/vijaysrj/ordermicroservice

PAYMENT MICROSERVICE:

Once an order event is published, the payment microservice picks it up, extracts the order details and the payment information from it, and stores the payment in the payment database.

It then publishes a payment event, which will be picked up by the inventory microservice.

Here is the code for it:

package com.payment.microservice;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Controller;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;

@Controller
public class PaymentController {

    @Autowired
    private PaymentRepository repository;

    @Autowired
    private KafkaTemplate<String, PaymentEvent> kafkaTemplate;

    @Autowired
    private KafkaTemplate<String, OrderEvent> kafkaOrderTemplate;

    @KafkaListener(topics = "new-orders", groupId = "orders-group")
    public void processPayment(String event) throws JsonMappingException, JsonProcessingException {

        System.out.println("Received event: " + event);
        OrderEvent orderEvent = new ObjectMapper().readValue(event, OrderEvent.class);

        CustomerOrder order = orderEvent.getOrder();
        Payment payment = new Payment();
        try {

            // save payment details in db
            payment.setAmount(order.getAmount());
            payment.setMode(order.getPaymentMode());
            payment.setOrderId(order.getOrderId());
            payment.setStatus("SUCCESS");
            this.repository.save(payment);

            // publish payment created event for inventory microservice to consume.

            PaymentEvent paymentEvent = new PaymentEvent();
            paymentEvent.setOrder(orderEvent.getOrder());
            paymentEvent.setType("PAYMENT_CREATED");
            this.kafkaTemplate.send("new-payments", paymentEvent);
        } catch (Exception e) {

            payment.setOrderId(order.getOrderId());
            payment.setStatus("FAILED");
            repository.save(payment);

            // reverse previous task
            OrderEvent oe = new OrderEvent();
            oe.setOrder(order);
            oe.setType("ORDER_REVERSED");
            this.kafkaOrderTemplate.send("reversed-orders", oe);

        }

    }

}

As you see, if the payment fails for some reason, we mark the status as “FAILED” and save it to the database.

We also trigger the compensating task of the order service by publishing an event to Kafka.

The Compensating Task:

To reverse a payment we create a compensating task.

Here is the code:

package com.payment.microservice;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

import com.fasterxml.jackson.databind.ObjectMapper;

@Component
public class ReversePayment {

    @Autowired
    private PaymentRepository repository;

    @Autowired
    private KafkaTemplate<String, OrderEvent> kafkaTemplate;

    @KafkaListener(topics = "reversed-payments", groupId = "payments-group")
    public void reversePayment(String event) {

        try {

            PaymentEvent paymentEvent = new ObjectMapper().readValue(event, PaymentEvent.class);

            CustomerOrder order = paymentEvent.getOrder();

            // do refund..

            // update status as failed

            Iterable<Payment> payments = this.repository.findByOrderId(order.getOrderId());

            payments.forEach(p -> {

                p.setStatus("FAILED");
                this.repository.save(p);
            });

            // reverse previous task
            OrderEvent orderEvent = new OrderEvent();
            orderEvent.setOrder(paymentEvent.getOrder());
            orderEvent.setType("ORDER_REVERSED");
            this.kafkaTemplate.send("reversed-orders", orderEvent);

        } catch (Exception e) {

            e.printStackTrace();

        }

    }

}

When the inventory microservice fails, it triggers the compensating task above, and we mark the payment for that order as failed in the payment database.

We also need to reverse the order created for this payment, so we trigger the compensating task of the order service as well.

Here is the link to the code:

https://github.com/vijaysrj/paymentsmicroservice

INVENTORY MICROSERVICE:

Similar to the payment microservice, the inventory microservice listens for the payment event, picks it up, extracts the order details and updates the inventory in the database (we decrease the stock by the quantity of items bought).

If the item does not exist in the database, we throw an exception.

We can use this error scenario later to test whether the compensating tasks work as expected.

Once the inventory is updated, we publish a new event for the shipment microservice to pick up.

To add new stock, let us also expose a REST API:

package com.inventory.microservice;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;

import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;

@RestController
public class InventoryController {

    @Autowired
    private InventoryRepository repository;

    @Autowired
    private KafkaTemplate<String, InventoryEvent> kafkaTemplate;

    @Autowired
    private KafkaTemplate<String, PaymentEvent> kafkaPaymentTemplate;

    @KafkaListener(topics = "new-payments", groupId = "payments-group")
    public void updateInventory(String paymentEvent) throws JsonMappingException, JsonProcessingException {

        InventoryEvent event = new InventoryEvent();

        PaymentEvent p = new ObjectMapper().readValue(paymentEvent, PaymentEvent.class);
        CustomerOrder order = p.getOrder();

        try {

            // update stock in database
            Iterable<Inventory> inventories = this.repository.findByItem(order.getItem());

            boolean exists = inventories.iterator().hasNext();

            if (!exists)
                throw new Exception("Stock not available");

            inventories.forEach(
                    i -> {
                        i.setQuantity(i.getQuantity() - order.getQuantity());

                        this.repository.save(i);
                    });

            event.setType("INVENTORY_UPDATED");
            event.setOrder(p.getOrder());
            this.kafkaTemplate.send("new-inventory", event);

        } catch (Exception e) {

            // reverse previous task
            PaymentEvent pe = new PaymentEvent();
            pe.setOrder(order);
            pe.setType("PAYMENT_REVERSED");
            this.kafkaPaymentTemplate.send("reversed-payments", pe);

        }

    }

    @PostMapping("/inventory")
    public void addInventory(@RequestBody Stock stock) {

        Iterable<Inventory> items = this.repository.findByItem(stock.getItem());

        if (items.iterator().hasNext()) {

            items.forEach(i -> {

                i.setQuantity(stock.getQuantity() + i.getQuantity());
                this.repository.save(i);
            });
        } else {

            Inventory i = new Inventory();
            i.setItem(stock.getItem());
            i.setQuantity(stock.getQuantity());
            this.repository.save(i);
        }
    }

  
}

If an exception occurs while updating the inventory, we trigger the compensating task of the payment service by publishing an event to Kafka.

The Compensating Task:

To reverse the update made to the inventory in case the next task fails, we create the compensating task below in the inventory service.

We increase the stock here, since we had decreased it earlier.

We also trigger the next compensating task in the chain, the reverse payment task, which does the refund, reverses the payment and in turn triggers the reverse order compensating task.

package com.inventory.microservice;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;

import org.springframework.stereotype.Component;

import com.fasterxml.jackson.databind.ObjectMapper;

@Component
public class ReverseInventory {

    @Autowired
    private InventoryRepository repository;

    @Autowired
    private KafkaTemplate<String, PaymentEvent> kafkaTemplate;

    @KafkaListener(topics = "reversed-inventory", groupId = "inventory-group")
    public void reverseInventory(String event) {

        try {

            InventoryEvent inventoryEvent = new ObjectMapper().readValue(event, InventoryEvent.class);

            Iterable<Inventory> inv = this.repository.findByItem(inventoryEvent.getOrder().getItem());

            inv.forEach(i -> {

                i.setQuantity(i.getQuantity() + inventoryEvent.getOrder().getQuantity());

                this.repository.save(i);
            });

            // reverse previous task
            PaymentEvent paymentEvent = new PaymentEvent();
            paymentEvent.setOrder(inventoryEvent.getOrder());
            paymentEvent.setType("PAYMENT_REVERSED");
            this.kafkaTemplate.send("reversed-payments", paymentEvent);

        } catch (Exception e) {

            e.printStackTrace();

        }
    }
}
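The effect of the inventory task and its compensating task on the stock can be illustrated without Spring or a database. The following is a simplified sketch that keeps the stock in a map; the class InventorySketch and its method signatures are hypothetical, and unlike the services above it publishes no events.

```java
import java.util.HashMap;
import java.util.Map;

class InventorySketch {

    // Item name -> quantity in stock; stands in for the inventory table.
    private final Map<String, Integer> stock = new HashMap<>();

    // Adds new stock, creating the item if it does not exist yet.
    void addStock(String item, int quantity) {
        stock.merge(item, quantity, Integer::sum);
    }

    // Task: decrease the stock by the ordered quantity.
    // Fails when the item was never added, mirroring the "Stock not available" case.
    void updateInventory(String item, int quantity) {
        if (!stock.containsKey(item)) {
            throw new IllegalStateException("Stock not available");
        }
        stock.merge(item, -quantity, Integer::sum);
    }

    // Compensating task: add back exactly what the task removed.
    void reverseInventory(String item, int quantity) {
        stock.computeIfPresent(item, (name, current) -> current + quantity);
    }

    int quantityOf(String item) {
        return stock.getOrDefault(item, 0);
    }

    public static void main(String[] args) {
        InventorySketch inventory = new InventorySketch();
        inventory.addStock("books", 200);
        inventory.updateInventory("books", 10);
        System.out.println("after order: " + inventory.quantityOf("books"));
        inventory.reverseInventory("books", 10);
        System.out.println("after compensation: " + inventory.quantityOf("books"));
    }
}
```

Because the compensating task adds back the same quantity the task removed, running both leaves the stock at its original value.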

Here is the link to the code:

https://github.com/vijaysrj/inventorymicroservice

SHIPMENT MICROSERVICE:

Once the inventory is updated, the final service ships the items to the customer.

Here is the code:

package com.shipment.microservice;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;

import org.springframework.stereotype.Controller;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;

@Controller
public class ShipmentController {

    @Autowired
    private ShipmentRepository repository;

    @Autowired
    private KafkaTemplate<String, InventoryEvent> kafkaTemplate;

    @KafkaListener(topics = "new-inventory", groupId = "inventory-group")
    public void shipOrder(String event) throws JsonMappingException, JsonProcessingException {

        Shipment shipment = new Shipment();
        InventoryEvent inventoryEvent = new ObjectMapper().readValue(event, InventoryEvent.class);
        CustomerOrder order = inventoryEvent.getOrder();
        try {

            if (order.getAddress() == null) {
                throw new Exception("Address not present");
            }

            shipment.setAddress(order.getAddress());
            shipment.setOrderId(order.getOrderId());

            shipment.setStatus("success");

            this.repository.save(shipment);

            // do other shipment logic ..

        } catch (Exception e) {
            shipment.setOrderId(order.getOrderId());
            shipment.setStatus("failed");
            this.repository.save(shipment);

            InventoryEvent reverseEvent = new InventoryEvent();

            reverseEvent.setType("INVENTORY_REVERSED");
            System.out.println(order);
            reverseEvent.setOrder(order);
            this.kafkaTemplate.send("reversed-inventory", reverseEvent);

        }
    }

}

We check the customer’s address. If it is missing, we throw an exception, which marks the shipment status as failed in the database and triggers the compensating task of the inventory service.

If the shipment fails for any other reason, we do the same.

There is no compensating task required for the shipment service since it is the last service in the flow.

If it fails we just trigger the compensating task of the previous service (inventory service).

Here is the link to the code:

https://github.com/vijaysrj/shipmentmicroservice

Testing:

To test, let’s first set up and run Apache Kafka.

Refer to the link below to do the same:

Once Kafka is running, let us start all the microservices.

SUCCESS SCENARIO:

Once they have started, let’s first add some stock to our inventory using the /inventory REST API of the inventory microservice:

We can verify this in the in-memory database of the inventory microservice:

Now that we have some books in our inventory, let’s try to buy some using the /orders REST API of the order microservice:

It is a simple request with the item name, quantity, amount, delivery address of the customer and the payment mode chosen by the customer.

Now let’s check the databases of our microservices.

Orders DB:

Payments DB:

Inventory DB:

Initially the stock was 200 books. Since we placed an order for 10 books, the stock has been reduced by 10.

Shipment DB:

The entire flow was a success, as indicated by the status column.

The distributed transaction completed successfully for all the four tasks.

FAILURE SCENARIO:

Let’s place an order for an item which is not in the inventory.

Remember, we throw an exception in the inventory microservice if the item is not in stock.

So when the order is placed, it goes through the order microservice and the payment microservice, and fails in the inventory microservice.

So we need to roll back the changes in the payment microservice and the order microservice.

In other words, we need to invoke the compensating tasks in those services.

Let’s test it:

I am placing an order for shoes, which have not yet been added to the inventory:

Now let me check the orders db. It should have been rolled back:

As you see, the shoes order has been marked as failed.

Let’s check payments db:

The payment for order id 2 has been marked as failed as well:

There are no entries for shoes in the inventory db, as expected.

Also, since the flow stopped at the inventory microservice, there is no entry in the shipment db either.

The compensating tasks in the order and payment microservices were executed successfully!

Let’s test one more case.

Let’s place an order for books, which exist in the inventory, but leave out the address.

Now the order should go all the way to the shipment microservice, fail there, and then the compensating tasks of the other microservices should be executed.

Let’s fire a request without an address:

and check the databases:

Orders DB:

The order has been marked as failed as expected.

Payments DB:

Order has been marked as failed here as well.

Inventory DB:

Notice that the stock of books remains the same. (It was initially reduced by the 5 books we ordered, but the compensating task later restored it to the original value.)

Shipment DB:

Order id 3 has been marked as failed!
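The three scenarios above can also be replayed without Kafka or a database. The sketch below is a hypothetical in-memory simulation, not the code of the example project: it reuses the topic names and status values from the services above, but the OrderSagaSimulation class, its Order payload, the synchronous event delivery and the sample address are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

class OrderSagaSimulation {

    // Hypothetical order payload; the real services exchange JSON events over Kafka.
    static final class Order {
        final int id;
        final String item;
        final int quantity;
        final String address;

        Order(int id, String item, int quantity, String address) {
            this.id = id;
            this.item = item;
            this.quantity = quantity;
            this.address = address;
        }
    }

    // In-memory stand-ins for the Kafka topics and the four databases.
    final Map<String, List<Consumer<Order>>> topics = new HashMap<>();
    final Map<Integer, String> orders = new HashMap<>();
    final Map<Integer, String> payments = new HashMap<>();
    final Map<Integer, String> shipments = new HashMap<>();
    final Map<String, Integer> stock = new HashMap<>();

    OrderSagaSimulation() {
        // Payment service: task and compensating task.
        on("new-orders", o -> { payments.put(o.id, "SUCCESS"); send("new-payments", o); });
        on("reversed-payments", o -> { payments.put(o.id, "FAILED"); send("reversed-orders", o); });

        // Inventory service: the task fails when the item was never stocked.
        on("new-payments", o -> {
            if (!stock.containsKey(o.item)) {
                send("reversed-payments", o);
                return;
            }
            stock.merge(o.item, -o.quantity, Integer::sum);
            send("new-inventory", o);
        });
        on("reversed-inventory", o -> {
            stock.merge(o.item, o.quantity, Integer::sum);
            send("reversed-payments", o);
        });

        // Shipment service: last task, so it has no compensating task of its own.
        on("new-inventory", o -> {
            if (o.address == null) {
                shipments.put(o.id, "failed");
                send("reversed-inventory", o);
            } else {
                shipments.put(o.id, "success");
            }
        });

        // Order service: compensating task.
        on("reversed-orders", o -> orders.put(o.id, "FAILED"));
    }

    void on(String topic, Consumer<Order> handler) {
        topics.computeIfAbsent(topic, t -> new ArrayList<>()).add(handler);
    }

    void send(String topic, Order order) {
        for (Consumer<Order> handler : topics.getOrDefault(topic, List.of())) {
            handler.accept(order);
        }
    }

    // Order service: first task of the saga.
    void placeOrder(Order order) {
        orders.put(order.id, "CREATED");
        send("new-orders", order);
    }

    public static void main(String[] args) {
        OrderSagaSimulation sim = new OrderSagaSimulation();
        sim.stock.put("books", 200);
        sim.placeOrder(new Order(1, "books", 10, "12 Main St")); // succeeds
        sim.placeOrder(new Order(2, "shoes", 1, "12 Main St"));  // fails in inventory
        sim.placeOrder(new Order(3, "books", 5, null));          // fails in shipment
        System.out.println(sim.orders + " " + sim.payments + " " + sim.shipments + " " + sim.stock);
    }
}
```

In this simulation order 1 stays “CREATED” with a successful shipment, order 2 ends as failed in orders and payments with no shipment entry, and order 3 ends as failed everywhere while the stock of books returns to 190.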

Using SAGA we are able to implement a distributed transaction across different microservices!

What we saw here is an example of a choreographed SAGA. There is also the orchestrated SAGA discussed earlier, which we are not going to look into in this post.

Advantages:

  • SAGA lets you run a series of tasks spanning different microservices as one logical transaction
  • SAGA runs the transaction asynchronously and without holding locks across services, which can improve application performance

Disadvantages:

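  • SAGA makes your code more complex to implement and understand, since every task except the last needs a compensating task
  • SAGA only ensures eventual consistency of data, so intermediate states are visible while the transaction runs; if your tasks need strict atomicity or isolation, it is not an option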

Conclusion:

We saw how SAGA can help in implementing distributed transactions with sample code.

Check out other microservice design patterns here:

CQRS

Event Sourcing

Service Registry

Transactional Outbox

Centralized Configuration

Client Side load balancing

Server Side load balancing

Distributed Tracing
