Go言語におけるFan-inとFan-outの理解


まず、Fan-inの例を見てみましょう。以下のコードでは、3つのソースからデータを収集し、1つのチャネルに集約しています。

package main
import (
    "fmt"
    "sync"
)
func producer(ch chan<- int, wg *sync.WaitGroup, data []int) {
    defer wg.Done()
    for _, val := range data {
        ch <- val
    }
}
func fanIn(channels []<-chan int) <-chan int {
    out := make(chan int)
    var wg sync.WaitGroup
    wg.Add(len(channels))
    for _, ch := range channels {
        go func(c <-chan int) {
            defer wg.Done()
            for val := range c {
                out <- val
            }
        }(ch)
    }
    go func() {
        wg.Wait()
        close(out)
    }()
    return out
}
func main() {
    ch1 := make(chan int)
    ch2 := make(chan int)
    ch3 := make(chan int)
    data1 := []int{1, 2, 3}
    data2 := []int{4, 5, 6}
    data3 := []int{7, 8, 9}
    var wg sync.WaitGroup
    wg.Add(3)
    go producer(ch1, &wg, data1)
    go producer(ch2, &wg, data2)
    go producer(ch3, &wg, data3)
    go func() {
        wg.Wait()
        close(ch1)
        close(ch2)
        close(ch3)
    }()
    result := fanIn([]<-chan int{ch1, ch2, ch3})
    for val := range result {
        fmt.Println(val)
    }
}

次に、Fan-outの例を見てみましょう。以下のコードでは、1つのソースからデータを取得し、4つのゴルーチンで同時に処理します。

package main
import (
    "fmt"
    "sync"
)
func worker(id int, jobs <-chan int, results chan<- int) {
    for job := range jobs {
        fmt.Printf("Worker %d processing job %d\n", id, job)
        // ここで実際の処理を行う
        results <- job * 2 // 処理結果を結果チャネルに送信
    }
}
func main() {
    jobs := make(chan int)
    results := make(chan int)
    // ゴルーチンを4つ起動して処理を並行実行
    const numWorkers = 4
    var wg sync.WaitGroup
    wg.Add(numWorkers)
    for i := 1; i <= numWorkers; i++ {
        go func(id int) {
            defer wg.Done()
            worker(id, jobs, results)
        }(i)
    }
// ジョブを送信
    const numJobs = 10
    for i := 1; i <= numJobs; i++ {
        jobs <- i
    }
    close(jobs)
    // 結果を受信
    go func() {
        wg.Wait()
        close(results)
    }()
    // 結果を表示
    for res := range results {
        fmt.Println(res)
    }
}

Fan-inとFan-outは、Go言語における並行処理と並列処理のパターンです。Fan-inは複数のソースからデータを収集し、1つのチャネルに集約するパターンです。Fan-outは1つのソースからデータを取得し、複数のゴルーチンで同時に処理するパターンです。これらのパターンは、アプリケーションのパフォーマンスやスケーラビリティを向上させるのに役立ちます。

まず、Fan-inの例を見てみましょう。以下のコードでは、3つのソースからデータを収集し、1つのチャネルに集約しています。

package main
import (
    "fmt"
    "sync"
)
func producer(ch chan<- int, wg *sync.WaitGroup, data []int) {
    defer wg.Done()
    for _, val := range data {
        ch <- val
    }
}
func fanIn(channels []<-chan int) <-chan int {
    out := make(chan int)
    var wg sync.WaitGroup
    wg.Add(len(channels))
    for _, ch := range channels {
        go func(c <-chan int) {
            defer wg.Done()
            for val := range c {
                out <- val
            }
        }(ch)
    }
    go func() {
        wg.Wait()
        close(out)
    }()
    return out
}
func main() {
    ch1 := make(chan int)
    ch2 := make(chan int)
    ch3 := make(chan int)
    data1 := []int{1, 2, 3}
    data2 := []int{4, 5, 6}
    data3 := []int{7, 8, 9}
    var wg sync.WaitGroup
    wg.Add(3)
    go producer(ch1, &wg, data1)
    go producer(ch2, &wg, data2)
    go producer(ch3, &wg, data3)
    go func() {
        wg.Wait()
        close(ch1)
        close(ch2)
        close(ch3)
    }()
    result := fanIn([]<-chan int{ch1, ch2, ch3})
    for val := range result {
        fmt.Println(val)
    }
}

次に、Fan-outの例を見てみましょう。以下のコードでは、1つのソースからデータを取得し、4つのゴルーチンで同時に処理します。

package main
import (
    "fmt"
    "sync"
)
func worker(id int, jobs <-chan int, results chan<- int) {
    for job := range jobs {
        fmt.Printf("Worker %d processing job %d\n", id, job)
        // ここで実際の処理を行う
        results <- job * 2 // 処理結果を結果チャネルに送信
    }
}
func main() {
    jobs := make(chan int)
    results := make(chan int)
    // ゴルーチンを4つ起動して処理を並行実行
    const numWorkers = 4
    var wg sync.WaitGroup
    wg.Add(numWorkers)
    for i := 1; i <= numWorkers; i++ {
        go func(id int) {
            defer wg.Done()
            worker(id, jobs, results)
        }(i)
    }
// ジョブを送信
    const numJobs = 10
    for i := 1; i <= numJobs; i++ {
        jobs <- i
    }
    close(jobs)
    // 結果を受信
    go func() {
        wg.Wait()
        close(results)
    }()
    // 結果を表示
    for res := range results {
        fmt.Println(res)
    }
}

このように、Fan-inとFan-outはGo言語で並行処理を行うための便利なパターンです。これらのパターンを活用することで、アプリケーションのパフォーマンスを向上させることができます。