Skip to content

Go并发基础

Go 的并发常用于批量探测主机、并行请求接口、后台处理任务和控制超时。运维工具里最常见的并发模型不是复杂计算,而是“一批目标同时检查,但整体要有上限、有超时、有结果汇总”。

Go 并发主要围绕 goroutinechannelselectsynccontext 展开。goroutine 负责跑任务,channel 负责传递结果,select 负责等待多个事件,sync 负责等待和加锁,context 负责取消和超时。

一、串行和并发

串行执行是一件事做完再做下一件。批量检查 3 台机器时,如果每次检查都要等 1 秒,串行执行大约需要 3 秒:

go
for _, host := range hosts {
	check(host)
}

并发执行是把多个检查任务同时发出去,让它们在等待网络、磁盘或远端响应的时候交错运行。HTTP 探活、TCP 端口探测、SSH 执行命令这类任务,大部分时间都在等外部系统返回,并发能减少整批任务的总等待时间。

并发不等于无限同时跑。1000 个目标同时请求,可能把本机连接数、DNS、出口网关或目标服务打满。Go 提供的是并发工具,具体并发多少、多久超时、失败后是否重试,仍然要由程序明确控制。

二、goroutine

goroutine 是 Go 运行时管理的轻量执行单元。写法是在函数调用前加 go

go
package main

import (
	"fmt"
	"time"
)

func check(name string) {
	time.Sleep(1 * time.Second)
	fmt.Println(name, "ok")
}

func main() {
	go check("web01")
	go check("web02")

	// 示例里临时等待,真实程序通常用 sync.WaitGroup 或 channel 等待结果
	time.Sleep(2 * time.Second)
}

go check("web01") 启动任务后会立刻返回,主流程不会等 check 执行完成。上面示例里的 time.Sleep(2 * time.Second) 只是为了让进程临时停住,方便看到输出。

main 函数返回后,进程会直接退出,未执行完的 goroutine 也会结束。因此并发任务需要明确等待方式,不能只把函数前面加一个 go 就算完成。

三、WaitGroup

sync.WaitGroup 用来等待一组 goroutine 全部结束。它只负责等待,不负责传数据。

go
package main

import (
	"fmt"
	"sync"
	"time"
)

func check(name string) {
	time.Sleep(1 * time.Second)
	fmt.Println(name, "ok")
}

func main() {
	hosts := []string{"web01", "web02", "db01"}

	var wg sync.WaitGroup
	for _, host := range hosts {
		wg.Add(1)

		go func(name string) {
			defer wg.Done() // 无论检查成功还是失败,都要减少等待计数
			check(name)
		}(host)
	}

	wg.Wait()
	fmt.Println("all checks finished")
}

WaitGroup 内部可以近似看成一个计数器。Add(1) 表示多一个任务要等,Done() 表示一个任务结束,Wait() 会一直等到计数归零。Done() 少调用一次,程序会一直卡在 Wait()Done() 调多了,计数变成负数会触发 panic。

循环里启动 goroutine 时,把循环变量作为参数传进去更稳。旧代码里如果直接在闭包里使用外层循环变量,容易出现所有任务读到同一个值的问题。新版本 Go 对 range 变量做过调整,但显式传参仍然更清楚。

四、channel

channel 用来在 goroutine 之间传数据。批量巡检时,任务函数通常把结果写入 channel,主流程从 channel 里汇总。

go
package main

import (
	"fmt"
	"time"
)

type Result struct {
	Host string
	OK   bool
}

func check(host string, results chan<- Result) {
	time.Sleep(500 * time.Millisecond)
	results <- Result{Host: host, OK: true}
}

func main() {
	hosts := []string{"web01", "web02", "db01"}
	results := make(chan Result)

	for _, host := range hosts {
		go check(host, results)
	}

	for range hosts {
		result := <-results
		fmt.Println(result.Host, result.OK)
	}
}

这里用了无缓冲 channel。无缓冲 channel 的发送和接收会互相等待:发送方执行 results <- value 时,如果没有接收方在读,它会停住;接收方执行 <-results 时,如果还没有发送方写入,它也会停住。

上面示例里有 3 个目标,所以主流程读取 3 次结果。读取次数少了,某个 goroutine 可能卡在发送结果;读取次数多了,主流程会卡在等待不存在的结果。

chan<- Result 表示这个参数只能写入,<-chan Result 表示只能读取。方向写清楚后,函数职责更明确。

带缓冲 channel:

go
results := make(chan Result, 10)

缓冲区能短暂存放结果。没有接收方时,写满缓冲区后发送方仍然会阻塞。channel 不是无限队列,批量任务里仍然要控制并发数量。

五、关闭 channel

生产方全部结束后,可以关闭 channel。读取方用 range 读取,直到 channel 被关闭。

go
package main

import (
	"fmt"
	"sync"
)

type Result struct {
	Host string
	OK   bool
}

func main() {
	hosts := []string{"web01", "web02", "db01"}
	results := make(chan Result)

	var wg sync.WaitGroup
	for _, host := range hosts {
		wg.Add(1)
		go func(name string) {
			defer wg.Done()
			results <- Result{Host: name, OK: true}
		}(host)
	}

	go func() {
		wg.Wait()
		close(results) // 所有发送方结束后再关闭,避免向已关闭 channel 写数据
	}()

	for result := range results {
		fmt.Println(result.Host, result.OK)
	}
}

关闭 channel 表示“后面不会再有新值”。已经写入 channel 的值仍然可以继续读完,读完后 range 自动结束。

通常由发送方关闭 channel。接收方不知道后面还有没有发送者,贸然关闭容易触发 send on closed channel。channel 也不需要每发送一个值就关闭一次,一个结果队列通常只在所有发送方结束后关闭。

六、select 和超时

select 用来同时等待多个 channel。常见场景是等待结果、超时和取消信号。

go
package main

import (
	"fmt"
	"time"
)

func main() {
	result := make(chan string, 1)

	go func() {
		time.Sleep(2 * time.Second)
		result <- "ok"
	}()

	select {
	case value := <-result:
		fmt.Println("result:", value)
	case <-time.After(1 * time.Second):
		fmt.Println("timeout")
	}
}

select 会阻塞到某个 case 可以执行。上面例子里,如果 1 秒内 result 没有值,就会走 time.After 对应的超时分支;如果 1 秒内结果先回来,就会打印结果。

time.After 适合小示例。长期运行的循环里频繁创建 time.After 会产生额外计时器对象,定时循环通常用 time.NewTimertime.NewTicker 更好控制。

七、context 控制取消

context.Context 在并发程序里负责传递取消信号和超时。HTTP 请求、数据库请求、Kubernetes Client 调用都支持 context。

go
package main

import (
	"context"
	"fmt"
	"time"
)

func check(ctx context.Context, host string) error {
	select {
	case <-time.After(2 * time.Second):
		fmt.Println(host, "ok")
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
	defer cancel() // 释放和这个 context 相关的计时器资源

	if err := check(ctx, "web01"); err != nil {
		fmt.Println("check failed:", err)
	}
}

context 不会强行杀掉一个 goroutine。它只是通过 ctx.Done() 这个 channel 发出“已经取消”或“已经超时”的信号。函数内部监听 ctx.Done(),收到信号后返回,取消才会实际生效。

批量请求接口时,如果整体任务已经超时,继续等待每个请求意义不大。context 能把“整批任务取消”传给下游函数,避免后台请求继续占连接和 goroutine。

八、sync 包

sync 包处理共享状态。并发写同一个 map、切片或结构体字段时,需要锁或其他同步方式。

go
package main

import (
	"fmt"
	"sync"
)

func main() {
	counts := map[string]int{}

	var mu sync.Mutex
	var wg sync.WaitGroup

	for _, service := range []string{"nginx", "nginx", "mysql"} {
		wg.Add(1)
		go func(name string) {
			defer wg.Done()

			mu.Lock()
			counts[name]++
			mu.Unlock()
		}(service)
	}

	wg.Wait()
	fmt.Println(counts)
}

普通 map 并发写会触发 fatal error: concurrent map writes。这种错误通常在压测或目标数量变多时暴露,单机少量测试不一定复现。

常用同步对象:

类型用途
sync.WaitGroup等待一组任务结束
sync.Mutex保护共享变量
sync.RWMutex读多写少时区分读锁和写锁
sync.Once只初始化一次
sync.Map特定并发读写场景下的 map

数据能通过 channel 汇总时,通常比共享 map 再加锁更清楚。需要频繁读写公共状态时,再考虑 Mutex

九、限制并发数量

批量巡检 1000 台机器时,如果同时发起 1000 个请求,可能把本机端口、DNS、目标服务或中间网关打满。常见做法是用 worker pool 控制并发数。

go
package main

import (
	"fmt"
	"sync"
	"time"
)

type Result struct {
	Host string
	OK   bool
}

func check(host string) Result {
	time.Sleep(300 * time.Millisecond)
	return Result{Host: host, OK: true}
}

func worker(jobs <-chan string, results chan<- Result, wg *sync.WaitGroup) {
	defer wg.Done()

	for host := range jobs {
		results <- check(host)
	}
}

func main() {
	hosts := []string{"web01", "web02", "web03", "db01", "db02"}
	concurrency := 2

	jobs := make(chan string)
	results := make(chan Result)

	var wg sync.WaitGroup
	for i := 0; i < concurrency; i++ {
		wg.Add(1)
		go worker(jobs, results, &wg)
	}

	go func() {
		for _, host := range hosts {
			jobs <- host
		}
		close(jobs)
	}()

	go func() {
		wg.Wait()
		close(results)
	}()

	for result := range results {
		fmt.Println(result.Host, result.OK)
	}
}

这个结构里 jobs 是任务队列,results 是结果队列,worker 数量就是并发上限。并发数通常根据目标服务承受能力、网络环境和超时时间调整。内网 HTTP 健康检查可以稍高,SSH 登录、数据库探测这类操作通常更保守。

执行顺序可以拆开看:

  1. 主流程启动固定数量 worker。
  2. 投递协程把所有 host 写入 jobs
  3. worker 从 jobs 取任务,执行 check,把结果写入 results
  4. jobs 关闭后,worker 的 for host := range jobs 结束。
  5. 所有 worker 结束后关闭 results
  6. 主流程读取完 results 后退出。

jobsresults 的关闭顺序很关键。jobs 由投递方关闭,表示没有新任务;results 由等待 worker 结束的协程关闭,表示没有新结果。提前关闭 results,worker 再写结果会 panic。

十、常见并发问题

现象常见原因排查入口
程序提前退出main 没等待 goroutineWaitGroup、channel 读取逻辑
任务一直卡住channel 没有接收方、没有关闭、发送数量和接收数量不匹配打印任务进入和退出日志
CPU 不高但程序不结束goroutine 阻塞在网络请求或 channel给请求加 context 超时
concurrent map writes多个 goroutine 同时写 map加锁或改成 channel 汇总
目标服务被打满并发数过高、没有超时和重试间隔worker 数量、请求超时、服务端连接数

并发程序的日志通常会带任务 ID、目标名、开始时间、耗时和错误原因。只打印 failed 很难判断是哪个目标卡住,批量检查时尤其明显。