Sunday, 27 April 2025

Spring Boot Kafka JsonSerializer and JsonDeserializer Example


package com.app.jsonserializer.config;

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;


/**
 * Kafka topic configuration for the JSON serializer example.
 */
@Configuration
public class KafkaConfig {

    /**
     * Declares the example's topic as a bean.
     *
     * @return a {@link NewTopic} named {@link AppConstants#JSON_SERIALIZER_TOPIC_NAME},
     *         built with the {@link TopicBuilder} defaults
     */
    @Bean
    public NewTopic topic() {
        final String topicName = AppConstants.JSON_SERIALIZER_TOPIC_NAME;
        return TopicBuilder.name(topicName).build();
    }
}

==============================================================

package com.app.jsonserializer.config;

/**
 * Constants shared by the JSON serializer example.
 *
 * <p>This is a pure constant holder, so it is {@code final} and cannot be instantiated.
 */
public final class AppConstants {

    /** Name of the Kafka topic the example produces to and consumes from. */
    public static final String JSON_SERIALIZER_TOPIC_NAME = "json-serializer-topic";

    /** Consumer group id used by the example's listener. */
    public static final String GROUP_ID = "groupid-1";

    private AppConstants() {
        // Constant holder; not meant to be instantiated.
    }
}


=====================================================================

package com.app.jsonserializer.payload;

/**
 * Mutable bean with an id, a first name and a last name.
 *
 * <p>It keeps the implicit no-argument constructor and exposes a getter/setter pair
 * for every field.
 */
public class User {

    private int id;
    private String firstName;
    private String lastName;

    /** @return the user's id ({@code 0} until set) */
    public int getId() {
        return this.id;
    }

    /** @param id the user's id */
    public void setId(int id) {
        this.id = id;
    }

    /** @return the first name, or {@code null} if it was never set */
    public String getFirstName() {
        return this.firstName;
    }

    /** @param firstName the first name */
    public void setFirstName(String firstName) {
        this.firstName = firstName;
    }

    /** @return the last name, or {@code null} if it was never set */
    public String getLastName() {
        return this.lastName;
    }

    /** @param lastName the last name */
    public void setLastName(String lastName) {
        this.lastName = lastName;
    }

    /**
     * Renders the user as {@code User{firstName='..', lastName='..', id=..}}.
     *
     * @return the textual form of this user
     */
    @Override
    public String toString() {
        final StringBuilder text = new StringBuilder("User{");
        text.append("firstName='").append(firstName).append('\'');
        text.append(", lastName='").append(lastName).append('\'');
        text.append(", id=").append(id);
        text.append('}');
        return text.toString();
    }
}

============================================================================

package com.app.jsonserializer.service;


import com.app.jsonserializer.config.AppConstants;
import com.app.jsonserializer.payload.User;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;

/**
 * Publishes {@link User} payloads to {@link AppConstants#JSON_SERIALIZER_TOPIC_NAME}.
 */
@Service
public class KafkaProducer {

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducer.class);

    @Autowired
    private KafkaTemplate<String, User> kafkaTemplate;

    /**
     * Wraps {@code data} in a message whose {@link KafkaHeaders#TOPIC} header is
     * {@link AppConstants#JSON_SERIALIZER_TOPIC_NAME} and hands it to the template.
     *
     * <p>The value returned by {@code kafkaTemplate.send} is not inspected, so this
     * method does not report whether the send succeeded.
     *
     * @param data the user to publish
     */
    public void sendMsg(User data) {
        // Parameterized logging: the message is only formatted when INFO is enabled,
        // and the logger performs the string conversion itself.
        LOGGER.info("Message sent -> {}", data);

        Message<User> message = MessageBuilder
                .withPayload(data)
                .setHeader(KafkaHeaders.TOPIC, AppConstants.JSON_SERIALIZER_TOPIC_NAME)
                .build();
        kafkaTemplate.send(message);
    }
}

=======================================================================================


package com.app.jsonserializer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * Entry point of the JSON serializer example application.
 */
@SpringBootApplication
public class JsonserializerApplication {

    /**
     * Starts the application by delegating to {@code SpringApplication.run}.
     *
     * @param args command-line arguments, passed through unchanged
     */
    public static void main(String[] args) {
        final Class<?> primarySource = JsonserializerApplication.class;
        SpringApplication.run(primarySource, args);
    }
}

========================================================================


# application.properties for the jsonserializer example
spring.application.name=jsonserializer

# Producer: String keys, JSON-serialized values
spring.kafka.producer.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer

# Consumer: String keys, JSON-deserialized values
spring.kafka.consumer.bootstrap-servers=localhost:9092
# Kept identical to AppConstants.GROUP_ID, which the @KafkaListener passes as groupId
spring.kafka.consumer.group-id=groupid-1
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
# Trust only the package that holds the payload class (User) instead of every package (*)
spring.kafka.consumer.properties.spring.json.trusted.packages=com.app.jsonserializer.payload

=======================================================================================

package com.app.jsonserializer.service;


import com.app.jsonserializer.config.AppConstants;
import com.app.jsonserializer.payload.User;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

/**
 * Listens on {@link AppConstants#JSON_SERIALIZER_TOPIC_NAME} and logs every
 * {@link User} it receives.
 */
@Service
public class KafkaConsumer {

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumer.class);

    /**
     * Logs the received payload.
     *
     * @param data the payload handed to the listener
     */
    @KafkaListener(
            topics = AppConstants.JSON_SERIALIZER_TOPIC_NAME,
            groupId = AppConstants.GROUP_ID)
    public void consume(User data) {
        // Parameterized logging avoids the eager String.format and does not throw
        // if the payload is null (the original called data.toString() directly).
        LOGGER.info("Message received -> {}", data);
    }
}




 

Apache Kafka Basic Concepts:

1. Start Zookeeper
C:\ApacheKafka\kafka_2.12-3.5.0>bin\windows\zookeeper-server-start.bat config\zookeeper.properties
2. Start Kafka Server
C:\ApacheKafka\kafka_2.12-3.5.0>bin\windows\kafka-server-start.bat config\server.properties
3. Create a Topic
C:\ApacheKafka\kafka_2.12-3.5.0>bin\windows\kafka-topics.bat --create --topic employee-topic --bootstrap-server localhost:9092
Created topic employee-topic.
4. Produce Messages to the Topic
C:\ApacheKafka\kafka_2.12-3.5.0>bin\windows\kafka-console-producer.bat --topic employee-topic --bootstrap-server localhost:9092
5. Consume Messages from the Topic
C:\ApacheKafka\kafka_2.12-3.5.0>bin\windows\kafka-console-consumer.bat --topic employee-topic --from-beginning --bootstrap-server localhost:9092












                                                                     Apache Kafka Example

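Example: Apache Kafka with two Spring Boot applications
deliveryboyapp - produces location updates to the topic
enduserapp - consumes location updates from the topic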
package com.app.deliveryboy.config;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;
/**
 * Kafka topic configuration for the delivery-boy (producer) application.
 */
@Configuration
public class KafkaConfig {

    /**
     * Declares the location-update topic as a bean.
     *
     * @return a {@link NewTopic} named {@link AppConstants#LOCATION_TOPIC_NAME}
     */
    @Bean
    public NewTopic topic() {
        // Use the shared constant so the topic declared here cannot drift from the
        // one the producer sends to. Partition and replica counts are left at the
        // TopicBuilder defaults; chain .partitions(n) / .replicas(n) before build()
        // to set them explicitly.
        return TopicBuilder.name(AppConstants.LOCATION_TOPIC_NAME).build();
    }
}
=========================================
package com.app.deliveryboy.service;
import com.app.deliveryboy.config.AppConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
/**
 * Sends location updates to {@link AppConstants#LOCATION_TOPIC_NAME}.
 */
@Service
public class KafkaService {

    // One shared, immutable logger per class instead of a mutable per-instance field.
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaService.class);

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Hands {@code location} to the Kafka template for the location topic.
     *
     * <p>The value returned by {@code kafkaTemplate.send} is not inspected, so the
     * result of this method does not reflect whether the send succeeded.
     *
     * @param location the location text to publish
     * @return always {@code true}
     */
    public boolean updateLocation(String location) {
        kafkaTemplate.send(AppConstants.LOCATION_TOPIC_NAME, location);
        LOGGER.info("message Produced");
        return true;
    }
}
========================================
package com.app.deliveryboy.config;
/**
 * Constants for the delivery-boy (producer) application.
 *
 * <p>This is a pure constant holder, so it is {@code final} and cannot be instantiated.
 */
public final class AppConstants {

    /** Name of the Kafka topic that location updates are published to. */
    public static final String LOCATION_TOPIC_NAME = "location-update-topic";

    private AppConstants() {
        // Constant holder; not meant to be instantiated.
    }
}
==========================================
package com.app.deliveryboy.controller;
import com.app.deliveryboy.service.KafkaService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.Map;
/**
 * REST endpoint that publishes a burst of random location updates through
 * {@link KafkaService}.
 */
@RestController
@RequestMapping("/location")
public class LocationController {

    /** Number of location updates published per request. */
    private static final int UPDATE_COUNT = 200_000;

    @Autowired
    private KafkaService kafkaService;

    /**
     * Publishes {@link #UPDATE_COUNT} random locations, then responds with 200 OK.
     *
     * <p>The original loop ran {@code i <= 200000} from zero, which sent 200,001
     * messages; the bound is now exclusive so exactly {@code UPDATE_COUNT} are sent.
     *
     * @return a 200 response whose body is {@code {"message": "location updated"}}
     */
    @PostMapping("/update")
    public ResponseEntity<?> updateLocation() {
        for (int i = 0; i < UPDATE_COUNT; i++) {
            kafkaService.updateLocation(randomLocation());
        }
        return new ResponseEntity<>(Map.of("message", "location updated"), HttpStatus.OK);
    }

    /** Builds a {@code "(longitude,latitude)"} string with both values drawn from 0..100. */
    private static String randomLocation() {
        long longitude = Math.round(Math.random() * 100);
        long latitude = Math.round(Math.random() * 100);
        return "(" + longitude + "," + latitude + ")";
    }
}
===========================================
package com.app.deliveryboy;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
 * Entry point of the delivery-boy (producer) application.
 */
@SpringBootApplication
public class DeliveryboyappApplication {

    /**
     * Starts the application by delegating to {@code SpringApplication.run}.
     *
     * @param args command-line arguments, passed through unchanged
     */
    public static void main(String[] args) {
        final Class<?> primarySource = DeliveryboyappApplication.class;
        SpringApplication.run(primarySource, args);
    }
}
========================================
# application.properties for deliveryboyapp (producer side)
spring.application.name=deliveryboyapp
server.port=8080

# Producer: String keys and String values
spring.kafka.producer.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
============================================
package com.app.enduserapp;
import com.app.enduserapp.config.AppConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListener;
/**
 * Listener for the end-user (consumer) application: receives location updates
 * from {@link AppConstants#LOCATION_TOPIC_NAME}.
 */
@Configuration
public class KafkaConfig {

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConfig.class);

    /**
     * Logs each location update received from the topic.
     *
     * @param value the message value handed to the listener
     */
    @KafkaListener(topics = AppConstants.LOCATION_TOPIC_NAME, groupId = AppConstants.GROUP_ID)
    public void updateLocation(String value) {
        // Route output through the logger rather than System.out so it follows the
        // application's logging configuration.
        LOGGER.info("{}", value);
    }
}
============================================
package com.app.enduserapp.config;
/**
 * Constants for the end-user (consumer) application.
 *
 * <p>This is a pure constant holder, so it is {@code final} and cannot be instantiated.
 */
public final class AppConstants {

    /** Name of the Kafka topic that location updates are read from. */
    public static final String LOCATION_TOPIC_NAME = "location-update-topic";

    /** Consumer group id used by the location listener. */
    public static final String GROUP_ID = "groupid-1";

    private AppConstants() {
        // Constant holder; not meant to be instantiated.
    }
}
=======================================
# application.properties for enduserapp (consumer side)
spring.application.name=enduserapp
server.port=8081

# Consumer: String keys and String values, reading from the earliest offset
spring.kafka.consumer.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=groupid-1
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
=========================================
package com.app.enduserapp;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class EnduserappApplication {

    /**
     * Starts the end-user (consumer) application by delegating to
     * {@code SpringApplication.run}.
     *
     * @param args command-line arguments, passed through unchanged
     */
    public static void main(String[] args) {
        final Class<?> primarySource = EnduserappApplication.class;
        SpringApplication.run(primarySource, args);
    }
}