Apache Kafka Example
Example Apache Kafka
Producer application: deliveryboyapp
Consumer application: enduserapp
package com.app.deliveryboy.config;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;
/**
 * Kafka configuration for the delivery-boy application: declares the topic
 * that location updates are written to.
 */
@Configuration
public class KafkaConfig {

    /**
     * Declares the location-update topic as a bean.
     *
     * <p>The name is taken from {@link AppConstants#LOCATION_TOPIC_NAME} rather than a
     * repeated string literal, so the declared topic and the topic the service
     * publishes to cannot drift apart. No partition or replica count is set here,
     * so whatever defaults {@code TopicBuilder} applies are used.
     *
     * @return the topic definition produced by {@code TopicBuilder}
     */
    @Bean
    public NewTopic topic() {
        return TopicBuilder.name(AppConstants.LOCATION_TOPIC_NAME).build();
    }
}
=========================================
package com.app.deliveryboy.service;
import com.app.deliveryboy.config.AppConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
/**
 * Publishes location updates to Kafka through the injected {@code KafkaTemplate}.
 */
@Service
public class KafkaService {

    // One shared logger for the class instead of one per instance.
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaService.class);

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Hands one location value to the template for the topic named by
     * {@code AppConstants.LOCATION_TOPIC_NAME}, then logs that a message was produced.
     *
     * @param location the location payload to send as the record value
     * @return always {@code true}; the result returned by {@code send} is not
     *         inspected, so this value does not confirm delivery
     */
    public boolean updateLocation(String location) {
        kafkaTemplate.send(AppConstants.LOCATION_TOPIC_NAME, location);
        LOGGER.info("message Produced");
        return true;
    }
}
========================================
package com.app.deliveryboy.config;
/**
 * Shared constants for the delivery-boy application.
 *
 * <p>This class only holds constants, so it is final and cannot be instantiated.
 */
public final class AppConstants {

    /** Name of the Kafka topic that carries location updates. */
    public static final String LOCATION_TOPIC_NAME = "location-update-topic";

    private AppConstants() {
        // Constants holder; no instances.
    }
}
==========================================
package com.app.deliveryboy.controller;
import com.app.deliveryboy.service.KafkaService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.Map;
/**
 * REST endpoint that publishes a burst of randomly generated location updates.
 */
@RestController
@RequestMapping("/location")
public class LocationController {

    /** Number of location messages published per request. */
    private static final int UPDATE_COUNT = 200_000;

    @Autowired
    private KafkaService kafkaService;

    /**
     * Handles {@code POST /location/update} by generating {@link #UPDATE_COUNT}
     * random coordinate pairs and passing each one to the Kafka service as a
     * string of the form {@code (longitude,latitude)}.
     *
     * <p>The loop runs exactly {@code UPDATE_COUNT} times; the earlier bound of
     * {@code i <= 200000} starting from zero published one message too many.
     *
     * @return a 200 OK response whose body is {@code {"message": "location updated"}}
     */
    @PostMapping("/update")
    public ResponseEntity<?> updateLocation() {
        for (int i = 0; i < UPDATE_COUNT; i++) {
            // Each coordinate is a whole number between 0 and 100 inclusive.
            long longitude = Math.round(Math.random() * 100);
            long latitude = Math.round(Math.random() * 100);
            kafkaService.updateLocation("(" + longitude + "," + latitude + ")");
        }
        return new ResponseEntity<>(Map.of("message", "location updated"), HttpStatus.OK);
    }
}
===========================================
package com.app.deliveryboy;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
 * Entry point of the delivery-boy application.
 */
@SpringBootApplication
public class DeliveryboyappApplication {

    /**
     * Starts the application by handing this class, as the primary source,
     * and the command-line arguments to {@code SpringApplication}.
     *
     * @param args command-line arguments, forwarded unchanged
     */
    public static void main(String[] args) {
        SpringApplication application = new SpringApplication(DeliveryboyappApplication.class);
        application.run(args);
    }
}
========================================
# Application name and HTTP port.
spring.application.name=deliveryboyapp
server.port=8080

# Kafka producer: broker address and String serializers for key and value.
spring.kafka.producer.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
============================================
package com.app.enduserapp;
import com.app.enduserapp.config.AppConstants;
import org.slf4j.Logger;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListener;
@Configuration
public class KafkaConfig {
@KafkaListener(topics=AppConstants.LOCATION_TOPIC_NAME,groupId = AppConstants.GROUP_ID)
public void updateLocation(String value)
{
System.out.println(value);
}
}
============================================
package com.app.enduserapp.config;
/**
 * Shared constants for the end-user application.
 *
 * <p>This class only holds constants, so it is final and cannot be instantiated.
 */
public final class AppConstants {

    /** Name of the Kafka topic that carries location updates. */
    public static final String LOCATION_TOPIC_NAME = "location-update-topic";

    /** Consumer group id used by the location listener. */
    public static final String GROUP_ID = "groupid-1";

    private AppConstants() {
        // Constants holder; no instances.
    }
}
=======================================
# Application name and HTTP port.
spring.application.name=enduserapp
server.port=8081

# Kafka consumer: broker address, consumer group, and offset reset setting.
spring.kafka.consumer.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=groupid-1
spring.kafka.consumer.auto-offset-reset=earliest

# String deserializers for record key and value.
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
=========================================
package com.app.enduserapp;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
 * Entry point of the end-user application.
 */
@SpringBootApplication
public class EnduserappApplication {

    /**
     * Starts the application by handing this class, as the primary source,
     * and the command-line arguments to {@code SpringApplication}.
     *
     * @param args command-line arguments, forwarded unchanged
     */
    public static void main(String[] args) {
        SpringApplication application = new SpringApplication(EnduserappApplication.class);
        application.run(args);
    }
}
No comments:
Post a Comment