Spring Boot Kafka JsonSerializer and JsonDeserializer Example
package com.app.jsonserializer.config;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;
@Configuration
public class KafkaConfig {
@Bean
public NewTopic topic()
{
return TopicBuilder.name(AppConstants.JSON_SERIALIZER_TOPIC_NAME).build();
}
}
==============================================================
package com.app.jsonserializer.config;
public class AppConstants {
public static final String JSON_SERIALIZER_TOPIC_NAME="json-serializer-topic";
public static final String GROUP_ID="groupid-1";
}
=====================================================================
package com.app.jsonserializer.payload;
public class User {
private int id;
private String firstName;
private String lastName;
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getFirstName() {
return firstName;
}
public void setFirstName(String firstName) {
this.firstName = firstName;
}
public String getLastName() {
return lastName;
}
public void setLastName(String lastName) {
this.lastName = lastName;
}
@Override
public String toString() {
return "User{" +
"firstName='" + firstName + '\'' +
", lastName='" + lastName + '\'' +
", id=" + id +
'}';
}
}
============================================================================
package com.app.jsonserializer.service;
import com.app.jsonserializer.config.AppConstants;
import com.app.jsonserializer.payload.User;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
@Service
public class KafkaProducer {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducer.class);
@Autowired
private KafkaTemplate<String, User> kafkaTemplate;
public void sendMsg(User data)
{
LOGGER.info(String.format("Message sent -> %s", data.toString()));
Message<User> message=MessageBuilder.withPayload(data).setHeader(KafkaHeaders.TOPIC, AppConstants.JSON_SERIALIZER_TOPIC_NAME).build();
kafkaTemplate.send(message);
}
}=======================================================================================package com.app.jsonserializer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class JsonserializerApplication {
public static void main(String[] args) {
SpringApplication.run(JsonserializerApplication.class, args);
}
}========================================================================spring.application.name=jsonserializer
spring.kafka.producer.bootstrap-servers: localhost:9092
spring.kafka.producer.key-serializer: org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.consumer.bootstrap-servers: localhost:9092
spring.kafka.consumer.group-id: group-id1
spring.kafka.consumer.auto-offset-reset: earliest
spring.kafka.consumer.key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=*
=======================================================================================
package com.app.jsonserializer.service;
import com.app.jsonserializer.config.AppConstants;
import com.app.jsonserializer.payload.User;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
@Service
public class KafkaConsumer {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumer.class);
@KafkaListener(topics = com.app.jsonserializer.config.AppConstants.JSON_SERIALIZER_TOPIC_NAME,
groupId = com.app.jsonserializer.config.AppConstants.GROUP_ID)
public void consume(com.app.jsonserializer.payload.User data){
LOGGER.info(String.format("Message received -> %s", data.toString()));
}
}