KafkaProducerの使用方法


  1. Mavenを使用してKafkaクライアントをセットアップする方法:

    <dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka-clients</artifactId>
       <version>2.8.0</version>
    </dependency>
  2. KafkaProducerの初期化と設定:

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import java.util.Properties;
    public class KafkaProducerExample {
       public static void main(String[] args) {
           Properties props = new Properties();
           props.put("bootstrap.servers", "localhost:9092");
           props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
           props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
           KafkaProducer<String, String> producer = new KafkaProducer<>(props);
           String topic = "my_topic";
           String key = "my_key";
           String value = "my_value";
           ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
           producer.send(record);
           producer.close();
       }
    }

    上記のコードでは、Kafkaブローカーのアドレスとシリアライザの設定を行い、KafkaProducerを初期化しています。送信するトピック、キー、および値を指定し、ProducerRecordを作成してから、sendメソッドを使用してメッセージを送信します。

  3. メッセージの送信オプション:

    KafkaProducerでは、メッセージの送信オプションをカスタマイズすることもできます。以下はいくつかの一般的なオプションの例です。

    • 同期的な送信:

      producer.send(record).get();
    • 非同期的な送信:

      producer.send(record, (metadata, exception) -> {
       if (exception != null) {
           exception.printStackTrace();
       } else {
           System.out.println("Message sent successfully");
       }
      });
    • レコードのパーティション指定:

      int partition = 0;
      ProducerRecord<String, String> record = new ProducerRecord<>(topic, partition, key, value);

    これらは一部の例です。詳細なオプションについては、公式ドキュメントを参照してください。

これらのコード例と手順を使用して、KafkaProducerを使用してデータを送信する方法を学ぶことができます。さまざまな用途に応じて、さらに多くのオプションや機能が利用可能です。