パブサブアーキテクチャは、メッセージの発行者(パブリッシャ)がメッセージをトピックに送信し、購読者(サブスクライバ)がそのトピックを購読してメッセージを受け取る仕組みです。Kafkaは、このアーキテクチャを実現するための強力なツールです。
まず、GolangでKafkaを使用するためには、kafka-goというライブラリをインストールする必要があります。以下のコマンドを使用してインストールします。
go get github.com/segmentio/kafka-go
インストールが完了したら、次にKafkaのブローカーへの接続を設定します。ブローカーのアドレスとポート番号を指定し、接続を確立します。
package main
import (
"context"
"fmt"
"log"
"github.com/segmentio/kafka-go"
)
func main() {
// Kafka brokerのアドレスとポートを指定
brokerAddress := "localhost:9092"
// Kafkaへの接続を確立
conn, err := kafka.DialContext(context.Background(), "tcp", brokerAddress)
if err != nil {
log.Fatal("Error connecting to Kafka broker:", err)
}
fmt.Println("Connected to Kafka broker:", brokerAddress)
// 接続をクローズ
err = conn.Close()
if err != nil {
log.Fatal("Error closing connection:", err)
}
}
上記のコードでは、localhost:9092
で実行されているKafkaブローカーに接続しています。
次に、メッセージのパブリッシュとサブスクライブの方法を説明します。
メッセージのパブリッシュ:
package main
import (
"context"
"fmt"
"log"
"github.com/segmentio/kafka-go"
)
func main() {
// Kafka brokerのアドレスとポートを指定
brokerAddress := "localhost:9092"
// Kafkaへの接続を確立
conn, err := kafka.DialContext(context.Background(), "tcp", brokerAddress)
if err != nil {
log.Fatal("Error connecting to Kafka broker:", err)
}
// メッセージをパブリッシュするトピックを指定
topic := "my-topic"
// パブリッシャを作成
writer := kafka.NewWriter(kafka.WriterConfig{
Brokers: []string{brokerAddress},
Topic: topic,
})
// メッセージをパブリッシュ
err = writer.WriteMessages(context.Background(),
kafka.Message{
Key: []byte("key-1"),
Value: []byte("Hello, Kafka!"),
},
kafka.Message{
Key: []byte("key-2"),
Value: []byte("Welcome to Kafka!"),
},
)
if err != nil {
log.Fatal("Error publishing messages:", err)
}
fmt.Println("Messages published to topic:", topic)
// パブリッシャをクローズ
err = writer.Close()
if err != nil {
log.Fatal("Error closing writer:", err)
}
// 接続をクローズ
err = conn.Close()
if err != nil {
log.Fatal("Error closing connection:", err)
}
}
上記のコードでは、my-topic
というトピックに対して2つのメッセージをパブリッシュしています。kafka.NewWriter
を使用してパブリッシャを作成し、WriteMessages
を使用してメッセージを送信しています。
メッセージのサブスクライブ:
package main
import (
"context"
"fmt"
"log"
"github.com/segmentio/kafka-go"
)
func main() {
// Kafka brokerのアドレスとポートを指定
brokerAddress := "localhost:9092"
// Kafkaへの接続を確立
conn, err := kafka.DialContext(context.Background(), "tcp", brokerAddress)
if err != nil {
log.Fatal("Error connecting to Kafka broker:", err)
}
// 購読するトピックを指定
topic := "my-topic"
// グループIDを指定(同じグループIDのサブスクライバはメッセージを共有します)
groupID := "my-group"
// サブスクライバを作成
reader := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{brokerAddress},
Topic: topic,
GroupID: groupID,
})
// メッセージをサブスクライブ
for {
msg, err := reader.ReadMessage(context.Background())
if err != nil {
log.Fatal("Error reading message:", err)
}
fmt.Println("Received message:", string(msg.Value))
}
// サブスクライバをクローズ
err = reader.Close()
if err != nil {
log.Fatal("Error closing reader:", err)
}
// 接続をクローズ
err = conn.Close()
if err != nil {
log.Fatal("Error closing connection:", err)
}
}
上記のコードでは、my-topic
というトピックをmy-group
というグループIDで購読しています。kafka.NewReader
を使用してサブスクライバを作成し、ReadMessage
を使用してメッセージを受信しています。
以上で、KafkaとGolangを使用して基本的なパブサブアプリケーションを構築する方法を説明しました。これにより、メッセージのパブリッシュとサブスクライブが可能になります。このアプリケーションを拡張することで、いくつでもトピックとメッセージを追加できます。