Mengenal Apache Kafka dan Integrasi dengan Spring Boot 2
Pada artikel sebelumnya kita sudah menginstal berbagai kebutuhan dari Kafka, Zookeeper, Kakfa Magic dan melengkapi keperluan konfigurasi pada application.properties
, maka pada bagian ini kita siap berkomunikasi dengan Kafka. Maka sekarang, kita dapat melanjutkan dengan menambahkan Producer dan Consumer yang lebih kompleks, sesuai dengan kebutuhan sistem Anda.
Berikut adalah contoh implementasi bagian pengujian koneksi Kafka dengan class dan atribut yang lebih kompleks di dalam proyek YBoilerplate yang telah dikonfigurasi dengan Kafka. Kita akan membuat Producer dan Consumer yang menggunakan objek dengan beberapa atribut, alih-alih hanya string sederhana, untuk menggambarkan komunikasi yang lebih realistis antar komponen.
Langkah 1: Definisikan Model Data (MessageObject)
Buat class sederhana untuk merepresentasikan pesan yang akan dikirim dan diterima melalui Kafka. Misalkan kita memiliki objek MessageObject dengan beberapa atribut:
package com.mhyusuf.yboilerplate.model.kafka;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@NoArgsConstructor
@AllArgsConstructor
public class MessageObject {
private String id;
private String content;
private String sender;
private long timestamp;
}
@Data
: Menggunakan Lombok untuk secara otomatis menghasilkan getter dan setter.@NoArgsConstructor
dan @AllArgsConstructor
: Menghasilkan konstruktor tanpa argumen dan konstruktor dengan semua argumen.Langkah 2: Konfigurasi Kafka (KafkaConfig)
Buat class konfigurasi Kafka untuk men-serialize dan de-serialize MessageObject:
Buat class sederhana untuk merepresentasikan pesan yang akan dikirim dan diterima melalui Kafka. Misalkan kita memiliki objek MessageObject dengan beberapa atribut:
package com.mhyusuf.yboilerplate.config;
import com.mhyusuf.yboilerplate.model.kafka.MessageObject;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerializer;
import java.util.HashMap;
import java.util.Map;
/**
* Penjelasan:
* ProducerConfig: Menggunakan JsonSerializer untuk serialisasi MessageObject menjadi JSON saat mengirimkan pesan.
* ConsumerConfig: Menggunakan JsonDeserializer untuk mendeserialize JSON yang diterima menjadi objek MessageObject.
* KafkaTemplate: Komponen ini digunakan untuk mengirim pesan dari producer.
*/
@EnableKafka
@Configuration
public class KafkaConfig {
@Bean
public ProducerFactory<String, MessageObject> producerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return new DefaultKafkaProducerFactory<>(config);
}
@Bean
public KafkaTemplate<String, MessageObject> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
public ConsumerFactory<String, MessageObject> consumerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
config.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(),
new JsonDeserializer<>(MessageObject.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, MessageObject> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, MessageObject> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
Penjelasan:
- ProducerConfig: Menggunakan
JsonSerializer
untuk serialisasi MessageObject menjadi JSON saat mengirimkan pesan. - ConsumerConfig: Menggunakan
JsonDeserializer
untuk mendeserialize JSON yang diterima menjadi objek MessageObject. - KafkaTemplate: Komponen ini digunakan untuk mengirim pesan dari producer.
Langkah 3: Implementasi Kafka Producer (MessageProducer)
Buat class untuk mengirim pesan ke Kafka:
package com.mhyusuf.yboilerplate.service;
import com.mhyusuf.yboilerplate.model.kafka.MessageObject;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
/**
* Penjelasan:
* KafkaTemplate digunakan untuk mengirimkan objek MessageObject ke topik Kafka.
* TOPIC: Nama topik tempat pesan akan dikirim, dalam hal ini my_topic.
*/
@Service
public class MessageProducer {
private static final String TOPIC = "my_topic";
@Autowired
private KafkaTemplate<String, MessageObject> kafkaTemplate;
public void sendMessage(MessageObject message) {
kafkaTemplate.send(TOPIC, message);
System.out.println("Sent message: " + message.toString());
}
}
Penjelasan:
KafkaTemplate
digunakan untuk mengirimkan objek MessageObject ke topik Kafka.TOPIC
: Nama topik tempat pesan akan dikirim, dalam hal inimy_topic
.
Langkah 4: Implementasi Kafka Consumer (MessageConsumer)
Buat consumer untuk menerima dan memproses pesan dari Kafka:
package com.mhyusuf.yboilerplate.service;
import com.mhyusuf.yboilerplate.model.kafka.MessageObject;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
/**
* Penjelasan:
* @KafkaListener: Mendengarkan topik my_topic dan menerima pesan dari Kafka.
* consume: Method yang menerima MessageObject dan menampilkannya di konsol.
*/
@Service
public class MessageConsumer {
@KafkaListener(topics = "my_topic", groupId = "my-group")
public void consume(MessageObject message) {
System.out.println("Consumed message: " + message.toString());
}
}
Penjelasan:
@KafkaListener
: Mendengarkan topikmy_topic
dan menerima pesan dari Kafka.consume
: Method yang menerima MessageObject dan menampilkannya di konsol.
Langkah 6: Pengujian
Jalankan aplikasi Spring Boot Anda.
Kirimkan request POST ke endpoint
/kafka/publish
dengan parametercontent
dansender
untuk mengirim pesan ke Kafka:
$ curl -X POST "http://localhost:8080/kafka/publish?content=HelloKafka&sender=YBoilerplate"
Comments
Post a Comment