Mengenal Apache Kafka dan Integrasi dengan Spring Boot 2

Pada artikel sebelumnya kita sudah menginstal berbagai kebutuhan dari Kafka, Zookeeper, Kakfa Magic dan melengkapi keperluan konfigurasi pada application.properties , maka pada bagian ini kita siap berkomunikasi dengan Kafka. Maka sekarang, kita dapat melanjutkan dengan menambahkan Producer dan Consumer yang lebih kompleks, sesuai dengan kebutuhan sistem Anda.


Berikut adalah contoh implementasi bagian pengujian koneksi Kafka dengan class dan atribut yang lebih kompleks di dalam proyek YBoilerplate yang telah dikonfigurasi dengan Kafka. Kita akan membuat Producer dan Consumer yang menggunakan objek dengan beberapa atribut, alih-alih hanya string sederhana, untuk menggambarkan komunikasi yang lebih realistis antar komponen.

Langkah 1: Definisikan Model Data (MessageObject)

Buat class sederhana untuk merepresentasikan pesan yang akan dikirim dan diterima melalui Kafka. Misalkan kita memiliki objek MessageObject dengan beberapa atribut:

package com.mhyusuf.yboilerplate.model.kafka;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class MessageObject {
private String id;
private String content;
private String sender;
private long timestamp;
}
Nama Class yang digunakan bisa apa saja dalam kasus ini kita kasih saja nama MessageObject.

  • @Data: Menggunakan Lombok untuk secara otomatis menghasilkan getter dan setter.
  • @NoArgsConstructor dan @AllArgsConstructor: Menghasilkan konstruktor tanpa argumen dan konstruktor dengan semua argumen.


  • Langkah 2: Konfigurasi Kafka (KafkaConfig)

    Buat class konfigurasi Kafka untuk men-serialize dan de-serialize MessageObject:

    Buat class sederhana untuk merepresentasikan pesan yang akan dikirim dan diterima melalui Kafka. Misalkan kita memiliki objek MessageObject dengan beberapa atribut:

    package com.mhyusuf.yboilerplate.config;

    import com.mhyusuf.yboilerplate.model.kafka.MessageObject;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.annotation.EnableKafka;
    import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
    import org.springframework.kafka.core.*;
    import org.springframework.kafka.support.serializer.JsonDeserializer;
    import org.springframework.kafka.support.serializer.JsonSerializer;

    import java.util.HashMap;
    import java.util.Map;

    /**
    * Penjelasan:
    * ProducerConfig: Menggunakan JsonSerializer untuk serialisasi MessageObject menjadi JSON saat mengirimkan pesan.
    * ConsumerConfig: Menggunakan JsonDeserializer untuk mendeserialize JSON yang diterima menjadi objek MessageObject.
    * KafkaTemplate: Komponen ini digunakan untuk mengirim pesan dari producer.
    */
    @EnableKafka
    @Configuration
    public class KafkaConfig {

    @Bean
    public ProducerFactory<String, MessageObject> producerFactory() {
    Map<String, Object> config = new HashMap<>();

    config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

    return new DefaultKafkaProducerFactory<>(config);
    }

    @Bean
    public KafkaTemplate<String, MessageObject> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    public ConsumerFactory<String, MessageObject> consumerFactory() {
    Map<String, Object> config = new HashMap<>();

    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    config.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
    config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    config.put(JsonDeserializer.TRUSTED_PACKAGES, "*");

    return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(),
    new JsonDeserializer<>(MessageObject.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, MessageObject> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, MessageObject> factory =
    new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    return factory;
    }
    }

    Penjelasan:

    • ProducerConfig: Menggunakan JsonSerializer untuk serialisasi MessageObject menjadi JSON saat mengirimkan pesan.
    • ConsumerConfig: Menggunakan JsonDeserializer untuk mendeserialize JSON yang diterima menjadi objek MessageObject.
    • KafkaTemplate: Komponen ini digunakan untuk mengirim pesan dari producer.



    Langkah 3: Implementasi Kafka Producer (MessageProducer)

    Buat class untuk mengirim pesan ke Kafka:

    package com.mhyusuf.yboilerplate.service;
    import com.mhyusuf.yboilerplate.model.kafka.MessageObject;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.stereotype.Service;

    /**
    * Penjelasan:
    * KafkaTemplate digunakan untuk mengirimkan objek MessageObject ke topik Kafka.
    * TOPIC: Nama topik tempat pesan akan dikirim, dalam hal ini my_topic.
    */
    @Service
    public class MessageProducer {

    private static final String TOPIC = "my_topic";

    @Autowired
    private KafkaTemplate<String, MessageObject> kafkaTemplate;

    public void sendMessage(MessageObject message) {
    kafkaTemplate.send(TOPIC, message);
    System.out.println("Sent message: " + message.toString());
    }
    }

    Penjelasan:

    • KafkaTemplate digunakan untuk mengirimkan objek MessageObject ke topik Kafka.
    • TOPIC: Nama topik tempat pesan akan dikirim, dalam hal ini my_topic.



    Langkah 4: Implementasi Kafka Consumer (MessageConsumer)

    Buat consumer untuk menerima dan memproses pesan dari Kafka:

    package com.mhyusuf.yboilerplate.service;

    import com.mhyusuf.yboilerplate.model.kafka.MessageObject;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.stereotype.Service;

    /**
    * Penjelasan:
    * @KafkaListener: Mendengarkan topik my_topic dan menerima pesan dari Kafka.
    * consume: Method yang menerima MessageObject dan menampilkannya di konsol.
    */
    @Service
    public class MessageConsumer {

    @KafkaListener(topics = "my_topic", groupId = "my-group")
    public void consume(MessageObject message) {
    System.out.println("Consumed message: " + message.toString());
    }
    }

    Penjelasan:

    • @KafkaListener: Mendengarkan topik my_topic dan menerima pesan dari Kafka.
    • consume: Method yang menerima MessageObject dan menampilkannya di konsol.



    Langkah 5: Controller untuk Menguji Pengiriman Pesan

    Buat controller di Spring Boot untuk mengirimkan pesan melalui MessageProducer:

    package com.mhyusuf.yboilerplate.controller;


    import com.mhyusuf.yboilerplate.model.kafka.MessageObject;
    import com.mhyusuf.yboilerplate.service.MessageProducer;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.*;

    import java.time.Instant;

    @RestController
    @RequestMapping("/kafka")
    public class KafkaTestController {

    @Autowired
    private MessageProducer messageProducer;

    @PostMapping("/publish")
    public String publishMessage(@RequestParam("content") String content, @RequestParam("sender") String sender) {
    MessageObject message = new MessageObject();
    message.setId(String.valueOf(Instant.now().toEpochMilli()));
    message.setContent(content);
    message.setSender(sender);
    message.setTimestamp(Instant.now().toEpochMilli());

    messageProducer.sendMessage(message);
    return "Message published successfully!";
    }
    }

    Penjelasan:

    • /kafka/publish: Endpoint untuk mengirim pesan ke Kafka dengan dua parameter (content dan sender).
    • Setiap kali endpoint diakses, sebuah MessageObject baru dikirimkan ke Kafka dengan timestamp.



    Langkah 6: Pengujian

    • Jalankan aplikasi Spring Boot Anda.

    • Kirimkan request POST ke endpoint /kafka/publish dengan parameter content dan sender untuk mengirim pesan ke Kafka:

    $ curl -X POST "http://localhost:8080/kafka/publish?content=HelloKafka&sender=YBoilerplate" 

     

    Consumer menerima pesan


    Branch Code Ini Di Repo

    https://github.com/yoesoff/YBoilerplate.git 



    Comments

    Popular posts from this blog

    Numpang Kerja Remote dari Bandung Creative Hub

    Numpang Kerja Remote dari Bandung Digital Valley

    Cara Decompile berkas Dex dan Apk Android