-
Mavenを使用してKafkaクライアントをセットアップする方法:
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.8.0</version> </dependency>
-
KafkaProducerの初期化と設定:
import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Properties; public class KafkaProducerExample { public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); KafkaProducer<String, String> producer = new KafkaProducer<>(props); String topic = "my_topic"; String key = "my_key"; String value = "my_value"; ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value); producer.send(record); producer.close(); } }
上記のコードでは、Kafkaブローカーのアドレスとシリアライザの設定を行い、KafkaProducerを初期化しています。送信するトピック、キー、および値を指定し、ProducerRecordを作成してから、sendメソッドを使用してメッセージを送信します。
-
メッセージの送信オプション:
KafkaProducerでは、メッセージの送信オプションをカスタマイズすることもできます。以下はいくつかの一般的なオプションの例です。
-
同期的な送信:
producer.send(record).get();
-
非同期的な送信:
producer.send(record, (metadata, exception) -> { if (exception != null) { exception.printStackTrace(); } else { System.out.println("Message sent successfully"); } });
-
レコードのパーティション指定:
int partition = 0; ProducerRecord<String, String> record = new ProducerRecord<>(topic, partition, key, value);
これらは一部の例です。詳細なオプションについては、公式ドキュメントを参照してください。
-
これらのコード例と手順を使用して、KafkaProducerを使用してデータを送信する方法を学ぶことができます。さまざまな用途に応じて、さらに多くのオプションや機能が利用可能です。