How to send and receive events through Apache Kafka in Spring Boot?

Traditionally we have been storing data in database tables.

A database row in a table represents the “current state” of an object.

And so this is “state driven” programming.

This has been serving well for us for decades.

It is even a bit inconceivable to think of any other way to store data.

But there are scenarios where “event driven” programming serves better than “state driven” programming.

Here you don’t store the current state of an object and keep updating it if it changes.

You just store the events.

In traditional programming you do CRUD (Create, Read, Update and Delete) operations, so the state of each row in a table can be changed.

In event driven programming you only do CR (Create and Read) operations. Events stored in “event logs” are never updated or deleted; they stay immutable.

For example consider an ecommerce application.

Traditionally, if a user adds items to a cart, you may store those in a “shopping_cart” table with the user id, item id and item quantity.

Any row in the table will represent the current state of the user’s shopping cart.

If the user updates the cart the row in the table will be updated as well.

But in event driven programming you just store user events:

“User with id 233 added two apples to her shopping cart”

“User removed one of the apples from the shopping cart”

“User added three bananas to the shopping cart”

These events are stored instead of the latest state of the shopping cart.

You read those events to query for the data you need.
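
To make this concrete, here is a minimal sketch (not part of the original example) of how the current cart state could be rebuilt by replaying events; the CartEvent record and its field names are hypothetical:

import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CartReplay {

    // Hypothetical event: a positive quantityDelta means "added", negative means "removed"
    record CartEvent(String userId, String item, int quantityDelta) {}

    // Fold the immutable event log into the current cart state
    static Map<String, Integer> currentCart(List<CartEvent> events) {
        Map<String, Integer> cart = new HashMap<>();
        for (CartEvent e : events) {
            cart.merge(e.item(), e.quantityDelta(), Integer::sum);
            cart.remove(e.item(), 0); // drop items whose quantity reached zero
        }
        return cart;
    }

    public static void main(String[] args) {
        var events = List.of(
            new CartEvent("233", "apple", 2),   // user 233 added two apples
            new CartEvent("233", "apple", -1),  // user removed one apple
            new CartEvent("233", "banana", 3)); // user added three bananas
        System.out.println(currentCart(events)); // e.g. {banana=3, apple=1}
    }
}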

There are advantages and disadvantages to this mode of programming, and solutions to overcome the disadvantages.

For now we will look at a platform which enables us to do this kind of programming:

Apache Kafka

Apache Kafka lets “producers” send events

And “consumers” consume those events.

The events are stored in “event logs” instead of tables.

These events can be stored indefinitely too so that you can use them to query for data later.

Let’s see how to send an event from one Spring Boot application to another.

Here are the steps:

STEP1: Set up Apache Kafka

STEP2: Create a producer application

STEP3: Create a consumer application

STEP4: Test

STEP1: Set up Apache Kafka:

To install and run Apache Kafka, refer to this post (for Windows): Install and Set up Apache Kafka

STEP2: Create a producer application:

To set up a producer application follow these steps:

  • Create a Spring Boot application with the Apache Kafka and Spring Web dependencies
  • Configure Apache Kafka server details in application.yml
  • Use KafkaTemplate to send messages to a particular topic

Create a Spring Boot application with the Apache Kafka and Spring Web dependencies:

Create a simple Spring Boot application with the below dependencies:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

We will use the Spring Boot starter web dependency to create a REST API to test our changes.

Configure Apache Kafka server details in application.yml

Spring will take care of configuring Apache Kafka; we just need to tell it where the Kafka server is deployed and which serializers to use for converting the keys and values of the messages to be sent (Kafka uses them internally).

Here is a sample entry:

spring:
  kafka:
    producer:
      bootstrap-servers: localhost:9092
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

By default Apache Kafka listens on port 9092, and we have used the same.
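
As a side note, if the same application both produces and consumes, the broker address can also be configured once at the spring.kafka level instead of separately under producer and consumer; a minimal sketch:

spring:
  kafka:
    bootstrap-servers: localhost:9092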

Use KafkaTemplate to send messages to a particular topic

Once the configuration is done, create a test REST API and publish a message using KafkaTemplate:

package com.example.kafka.producer;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class TestController {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @GetMapping("/publish")
    public String publish() {

        // Publish a plain string message to the topic "my_topic"
        this.kafkaTemplate.send("my_topic", "Hello Developer!");

        return "success";
    }
}

As you can see, I am publishing the message “Hello Developer!” to the topic “my_topic”. Kafka expects messages to be sent to topics, and consumers listen to these topics for messages.
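
Note that send() is asynchronous. If you want to confirm delivery, you can attach a callback to the future it returns; in recent Spring Kafka versions (3.x) this is a CompletableFuture<SendResult<K, V>> (older versions return a ListenableFuture). As a minimal sketch, the send call in publish() could become:

this.kafkaTemplate.send("my_topic", "Hello Developer!")
    .whenComplete((result, ex) -> {
        if (ex != null) {
            // The broker did not acknowledge the message
            System.err.println("Send failed: " + ex.getMessage());
        } else {
            // The record metadata tells us where the event landed
            System.out.println("Sent to partition "
                + result.getRecordMetadata().partition()
                + " at offset " + result.getRecordMetadata().offset());
        }
    });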

I have sent a string message in the above case, but you can also send custom Java objects.
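
For example, Spring Kafka ships a JsonSerializer that turns objects into JSON on the wire. Here is a minimal sketch, assuming a hypothetical OrderEvent record (the class name and fields are made up for illustration); in application.yml the value-serializer would be switched to org.springframework.kafka.support.serializer.JsonSerializer:

package com.example.kafka.producer;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class OrderController {

    // Hypothetical payload; any POJO or record that Jackson can serialize works
    public record OrderEvent(String orderId, int quantity) {}

    @Autowired
    private KafkaTemplate<String, OrderEvent> kafkaTemplate;

    @GetMapping("/publishOrder")
    public String publishOrder() {
        // Sent over the wire as JSON, e.g. {"orderId":"order-1","quantity":2}
        this.kafkaTemplate.send("my_topic", new OrderEvent("order-1", 2));
        return "success";
    }
}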

Start the producer application.

STEP3: Create a consumer application:

To set up a consumer application follow these steps:

  • Create a Spring Boot application with the Apache Kafka and Spring Web dependencies
  • Configure Apache Kafka details in application.yml
  • Use @KafkaListener to listen for messages

Create a Spring Boot application with the Apache Kafka and Spring Web dependencies

Similar to the “producer” application, create a simple Spring Boot application with the same dependencies.

Configure Apache Kafka details in application.yml

Add “consumer”-related configuration in application.yml.

Here is a sample entry:

spring:
  kafka:
    consumer:
      bootstrap-servers: localhost:9092
      group-id: my_group
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

You need to specify the Apache Kafka server in the bootstrap-servers property.

Consumers are organized into groups, and this group id is specified in the group-id property.

auto-offset-reset: earliest means that if the consumer group has no committed offset yet, the consumer starts reading from the earliest event in the topic.

key-deserializer and value-deserializer are used to deserialize the keys and values of the messages sent by the producer.

That’s the minimum configuration you would need.
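
If the producer sends custom Java objects as JSON (as sketched earlier), the consumer side would switch the value-deserializer to Spring Kafka's JsonDeserializer and whitelist the packages it is allowed to deserialize into. A minimal sketch, assuming the event classes live in com.example.kafka:

spring:
  kafka:
    consumer:
      bootstrap-servers: localhost:9092
      group-id: my_group
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring.json.trusted.packages: "com.example.kafka"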

Use @KafkaListener to listen for messages

Now that the configuration is ready, let’s listen for events.

Annotate any method with @KafkaListener to turn it into a listener method.

Here is a sample:

package com.example.kafka.consumer;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumer {

    // Invoked for every message published to "my_topic"
    @KafkaListener(topics = "my_topic")
    public void receive(String message) {
        System.out.println(message);
    }
}

As you can see, I created a simple service class and added a method which listens to the topic “my_topic”. The method takes a single argument representing the event message.

Whenever an event is published to “my_topic”, this listener will receive it.
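
If you also need metadata such as the message key, partition or offset, the listener method can accept a ConsumerRecord instead of the plain payload; a minimal sketch:

package com.example.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class KafkaRecordConsumer {

    @KafkaListener(topics = "my_topic")
    public void receive(ConsumerRecord<String, String> record) {
        // ConsumerRecord exposes the payload plus its metadata
        System.out.println(record.value()
                + " (key=" + record.key()
                + ", partition=" + record.partition()
                + ", offset=" + record.offset() + ")");
    }
}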

Start the consumer application (on a different port).
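
Since both applications will likely run on the same machine, give the consumer its own HTTP port in its application.yml (8081 here is just an arbitrary free port):

server:
  port: 8081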

STEP4: Test

Now let’s test our application.

Let’s hit the test API we created (assuming the producer runs on Spring Boot’s default port 8080): http://localhost:8080/publish

Now let’s look at the console of the “consumer” application:

The message “Hello Developer!” got consumed!

That’s it!

This is a very simplified example of sending an event from a producer to a consumer.

You can refer to the Spring documentation to explore further: Spring for Apache Kafka

Code:

https://github.com/vijaysrj/kafkaproducer

https://github.com/vijaysrj/kafka_consumer

