KafkaとGolangを使用した基本的なパブサブアプリケーションの構築方法


パブサブアーキテクチャは、メッセージの発行者(パブリッシャ)がメッセージをトピックに送信し、購読者(サブスクライバ)がそのトピックを購読してメッセージを受け取る仕組みです。Kafkaは、このアーキテクチャを実現するための強力なツールです。

まず、GolangでKafkaを使用するためには、kafka-goというライブラリをインストールする必要があります。以下のコマンドを使用してインストールします。

go get github.com/segmentio/kafka-go

インストールが完了したら、次にKafkaのブローカーへの接続を設定します。ブローカーのアドレスとポート番号を指定し、接続を確立します。

package main
import (
    "context"
    "fmt"
    "log"
    "github.com/segmentio/kafka-go"
)
func main() {
    // Kafka brokerのアドレスとポートを指定
    brokerAddress := "localhost:9092"
    // Kafkaへの接続を確立
    conn, err := kafka.DialContext(context.Background(), "tcp", brokerAddress)
    if err != nil {
        log.Fatal("Error connecting to Kafka broker:", err)
    }
    fmt.Println("Connected to Kafka broker:", brokerAddress)
    // 接続をクローズ
    err = conn.Close()
    if err != nil {
        log.Fatal("Error closing connection:", err)
    }
}

上記のコードでは、localhost:9092で実行されているKafkaブローカーに接続しています。

次に、メッセージのパブリッシュとサブスクライブの方法を説明します。

メッセージのパブリッシュ:

package main
import (
    "context"
    "fmt"
    "log"
    "github.com/segmentio/kafka-go"
)
func main() {
    // Kafka brokerのアドレスとポートを指定
    brokerAddress := "localhost:9092"
    // Kafkaへの接続を確立
    conn, err := kafka.DialContext(context.Background(), "tcp", brokerAddress)
    if err != nil {
        log.Fatal("Error connecting to Kafka broker:", err)
    }
// メッセージをパブリッシュするトピックを指定
    topic := "my-topic"
    // パブリッシャを作成
    writer := kafka.NewWriter(kafka.WriterConfig{
        Brokers: []string{brokerAddress},
        Topic:   topic,
    })
    // メッセージをパブリッシュ
    err = writer.WriteMessages(context.Background(),
        kafka.Message{
            Key:   []byte("key-1"),
            Value: []byte("Hello, Kafka!"),
        },
        kafka.Message{
            Key:   []byte("key-2"),
            Value: []byte("Welcome to Kafka!"),
        },
    )
    if err != nil {
        log.Fatal("Error publishing messages:", err)
    }
    fmt.Println("Messages published to topic:", topic)
    // パブリッシャをクローズ
    err = writer.Close()
    if err != nil {
        log.Fatal("Error closing writer:", err)
    }
// 接続をクローズ
    err = conn.Close()
    if err != nil {
        log.Fatal("Error closing connection:", err)
    }
}

上記のコードでは、my-topicというトピックに対して2つのメッセージをパブリッシュしています。kafka.NewWriterを使用してパブリッシャを作成し、WriteMessagesを使用してメッセージを送信しています。

メッセージのサブスクライブ:

package main
import (
    "context"
    "fmt"
    "log"
    "github.com/segmentio/kafka-go"
)
func main() {
    // Kafka brokerのアドレスとポートを指定
    brokerAddress := "localhost:9092"
    // Kafkaへの接続を確立
    conn, err := kafka.DialContext(context.Background(), "tcp", brokerAddress)
    if err != nil {
        log.Fatal("Error connecting to Kafka broker:", err)
    }
// 購読するトピックを指定
    topic := "my-topic"
    // グループIDを指定(同じグループIDのサブスクライバはメッセージを共有します)
    groupID := "my-group"
    // サブスクライバを作成
    reader := kafka.NewReader(kafka.ReaderConfig{
        Brokers: []string{brokerAddress},
        Topic:   topic,
        GroupID: groupID,
    })
    // メッセージをサブスクライブ
    for {
        msg, err := reader.ReadMessage(context.Background())
        if err != nil {
            log.Fatal("Error reading message:", err)
        }
        fmt.Println("Received message:", string(msg.Value))
    }
// サブスクライバをクローズ
    err = reader.Close()
    if err != nil {
        log.Fatal("Error closing reader:", err)
    }
// 接続をクローズ
    err = conn.Close()
    if err != nil {
        log.Fatal("Error closing connection:", err)
    }
}

上記のコードでは、my-topicというトピックをmy-groupというグループIDで購読しています。kafka.NewReaderを使用してサブスクライバを作成し、ReadMessageを使用してメッセージを受信しています。

以上で、KafkaとGolangを使用して基本的なパブサブアプリケーションを構築する方法を説明しました。これにより、メッセージのパブリッシュとサブスクライブが可能になります。このアプリケーションを拡張することで、いくつでもトピックとメッセージを追加できます。