3 并发编程之并发模式与设计

在上一篇中,我们深入探讨了 Go 语言中的 Channel,并理解了它在协程间通信中的重要作用。现在,我们将继续展开我们的并发编程知识,讨论 Go 语言中常见的并发模式与设计。这些模式是实现高效且易于维护的并发程序的基石。

并发模式概述

在 Go 语言的并发编程中,有数种常用的模式,每种模式都有其特定的用途和适用场景。以下是一些主要的并发模式:

  1. Worker Pool(工作池模式)
  2. Pipeline(管道模式)
  3. Publish/Subscribe(发布/订阅模式)
  4. Fan-Out, Fan-In(多输出,多输入模式)

接下来,我们将一一深入探讨这些模式。

1. Worker Pool(工作池模式)

工作池是一种通过限制并发工作的数量来控制资源消耗的模式。在这一模式中,我们会创建一个固定数量的工作协程,它们从一个共享队列中获取任务并执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package main

import (
"fmt"
"sync"
"time"
)

func worker(id int, jobs <-chan int, wg *sync.WaitGroup) {
defer wg.Done()
for job := range jobs {
fmt.Printf("Worker %d processing job %d\n", id, job)
time.Sleep(time.Second) // 模拟工作
}
}

func main() {
var wg sync.WaitGroup
jobs := make(chan int, 100) // 创建一个缓冲channel

// 启动 3 个工人
for w := 1; w <= 3; w++ {
wg.Add(1)
go worker(w, jobs, &wg)
}

// 发送 9 个工作到工作池
for j := 1; j <= 9; j++ {
jobs <- j
}
close(jobs) // 关闭渠道,表示没有更多的工作

wg.Wait() // 等待所有工人完成
}

在上面的例子中,我们创建了 3 个工作协程,每个协程从 jobs 通道中获取任务并处理。这种模式有助于控制并发的数量,从而避免过多的资源竞争。

2. Pipeline(管道模式)

管道模式可以理解为将多个处理步骤以流水线的形式连接起来。每个步骤都可以独立处理数据,数据通过 Channel 流动,从一个步骤传递到下一个步骤。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package main

import (
"fmt"
)

// 定义一个生产者
func produce(nums []int, jobs chan<- int) {
for _, n := range nums {
jobs <- n // 将数据发送到 jobs channel
}
close(jobs)
}

// 定义一个处理者
func square(jobs <-chan int, results chan<- int) {
for job := range jobs {
results <- job * job // 处理并返回结果
}
close(results)
}

func main() {
jobs := make(chan int, 5)
results := make(chan int, 5)

go produce([]int{1, 2, 3, 4, 5}, jobs)
go square(jobs, results)

// 提取并打印结果
for result := range results {
fmt.Println(result)
}
}

在这个例子中,我们定义了一个 produce 函数来生产数据,和一个 square 函数来处理数据。数据从生产者通过 jobs 通道流入处理者,最后处理结果通过 results 通道传递给主程序。这样使得每个处理步骤都可以独立运行,提升了模块化和可维护性。

3. Publish/Subscribe(发布/订阅模式)

发布/订阅模式允许发布者和订阅者之间的解耦。发布者将消息发布到一个通道,多个订阅者接收这些消息并做出响应。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package main

import (
"fmt"
"sync"
)

// 定义订阅者
func subscriber(id int, messages <-chan string, wg *sync.WaitGroup) {
defer wg.Done()
for msg := range messages {
fmt.Printf("Subscriber %d received: %s\n", id, msg)
}
}

func main() {
var wg sync.WaitGroup
messages := make(chan string)

// 启动 2 个订阅者
for i := 1; i <= 2; i++ {
wg.Add(1)
go subscriber(i, messages, &wg)
}

// 发布消息
messages <- "Hello World"
messages <- "Go Concurrency"
close(messages) // 关闭channel,表示没有更多消息

wg.Wait() // 等待所有订阅者完成
}

在上面的示例中,多个订阅者从同一个 messages 通道中接收消息,这样可以简单地扩展系统,使得消息的发送方与接收方之间不再直接耦合。

4. Fan-Out, Fan-In(多输出,多输入模式)

Fan-Out 是指多个协程并行处理任务,而 Fan-In 是将多个协程的结果集中到一个通道中。这种模式能有效利用 CPU,提高程序的并发性能。

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package main

import (
"fmt"
"sync"
)

// 定义计算平方的函数
func square(num int, results chan<- int) {
results <- num * num
}

func main() {
var wg sync.WaitGroup
numbers := []int{1, 2, 3, 4, 5}
results := make(chan int)

// Fan-Out
for _, num := range numbers {
wg.Add(1)
go func(n int) {
defer wg.Done()
square(n, results) // 将计算后的结果发送到 results channel
}(num)
}

// 启动一个 goroutine 来关闭 results channel
go func() {
wg.Wait()
close(results)
}()

// Fan-In
for result := range results {
fmt.Println(result)
}
}

在这个示例中,我们先启动多个协程来处理数字的平方(Fan-Out),然后通过一个额外的 for 循环从 results 通道中接收所有结果(Fan-In)。

总结

通过以上对并发模式的深入解析,我们可以看到,合理的并发模式设计能够有效提升 Go 程序的性能与可读性。在实际开发中,选择合适的并发模式可以帮助我们更好地管理复杂的并发操作,并提高系统的

3 并发编程之并发模式与设计

https://zglg.work/go-one/3/

作者

AI免费学习网(郭震)

发布于

2024-08-10

更新于

2024-08-11

许可协议

分享转发

交流

更多教程加公众号

更多教程加公众号

加入星球获取PDF

加入星球获取PDF

打卡评论