Appearance
Go并发基础
Go 的并发常用于批量探测主机、并行请求接口、后台处理任务和控制超时。运维工具里最常见的并发模型不是复杂计算,而是“一批目标同时检查,但整体要有上限、有超时、有结果汇总”。
Go 并发主要围绕 goroutine、channel、select、sync 和 context 展开。goroutine 负责跑任务,channel 负责传递结果,select 负责等待多个事件,sync 负责等待和加锁,context 负责取消和超时。
一、串行和并发
串行执行是一件事做完再做下一件。批量检查 3 台机器时,如果每次检查都要等 1 秒,串行执行大约需要 3 秒:
go
for _, host := range hosts {
check(host)
}并发执行是把多个检查任务同时发出去,让它们在等待网络、磁盘或远端响应的时候交错运行。HTTP 探活、TCP 端口探测、SSH 执行命令这类任务,大部分时间都在等外部系统返回,并发能减少整批任务的总等待时间。
并发不等于无限同时跑。1000 个目标同时请求,可能把本机连接数、DNS、出口网关或目标服务打满。Go 提供的是并发工具,具体并发多少、多久超时、失败后是否重试,仍然要由程序明确控制。
二、goroutine
goroutine 是 Go 运行时管理的轻量执行单元。写法是在函数调用前加 go:
go
package main
import (
"fmt"
"time"
)
func check(name string) {
time.Sleep(1 * time.Second)
fmt.Println(name, "ok")
}
func main() {
go check("web01")
go check("web02")
// 示例里临时等待,真实程序通常用 sync.WaitGroup 或 channel 等待结果
time.Sleep(2 * time.Second)
}go check("web01") 启动任务后会立刻返回,主流程不会等 check 执行完成。上面示例里的 time.Sleep(2 * time.Second) 只是为了让进程临时停住,方便看到输出。
main 函数返回后,进程会直接退出,未执行完的 goroutine 也会结束。因此并发任务需要明确等待方式,不能只把函数前面加一个 go 就算完成。
三、WaitGroup
sync.WaitGroup 用来等待一组 goroutine 全部结束。它只负责等待,不负责传数据。
go
package main
import (
"fmt"
"sync"
"time"
)
func check(name string) {
time.Sleep(1 * time.Second)
fmt.Println(name, "ok")
}
func main() {
hosts := []string{"web01", "web02", "db01"}
var wg sync.WaitGroup
for _, host := range hosts {
wg.Add(1)
go func(name string) {
defer wg.Done() // 无论检查成功还是失败,都要减少等待计数
check(name)
}(host)
}
wg.Wait()
fmt.Println("all checks finished")
}WaitGroup 内部可以近似看成一个计数器。Add(1) 表示多一个任务要等,Done() 表示一个任务结束,Wait() 会一直等到计数归零。Done() 少调用一次,程序会一直卡在 Wait();Done() 调多了,计数变成负数会触发 panic。
循环里启动 goroutine 时,把循环变量作为参数传进去更稳。旧代码里如果直接在闭包里使用外层循环变量,容易出现所有任务读到同一个值的问题。新版本 Go 对 range 变量做过调整,但显式传参仍然更清楚。
四、channel
channel 用来在 goroutine 之间传数据。批量巡检时,任务函数通常把结果写入 channel,主流程从 channel 里汇总。
go
package main
import (
"fmt"
"time"
)
type Result struct {
Host string
OK bool
}
func check(host string, results chan<- Result) {
time.Sleep(500 * time.Millisecond)
results <- Result{Host: host, OK: true}
}
func main() {
hosts := []string{"web01", "web02", "db01"}
results := make(chan Result)
for _, host := range hosts {
go check(host, results)
}
for range hosts {
result := <-results
fmt.Println(result.Host, result.OK)
}
}这里用了无缓冲 channel。无缓冲 channel 的发送和接收会互相等待:发送方执行 results <- value 时,如果没有接收方在读,它会停住;接收方执行 <-results 时,如果还没有发送方写入,它也会停住。
上面示例里有 3 个目标,所以主流程读取 3 次结果。读取次数少了,某个 goroutine 可能卡在发送结果;读取次数多了,主流程会卡在等待不存在的结果。
chan<- Result 表示这个参数只能写入,<-chan Result 表示只能读取。方向写清楚后,函数职责更明确。
带缓冲 channel:
go
results := make(chan Result, 10)缓冲区能短暂存放结果。没有接收方时,写满缓冲区后发送方仍然会阻塞。channel 不是无限队列,批量任务里仍然要控制并发数量。
五、关闭 channel
生产方全部结束后,可以关闭 channel。读取方用 range 读取,直到 channel 被关闭。
go
package main
import (
"fmt"
"sync"
)
type Result struct {
Host string
OK bool
}
func main() {
hosts := []string{"web01", "web02", "db01"}
results := make(chan Result)
var wg sync.WaitGroup
for _, host := range hosts {
wg.Add(1)
go func(name string) {
defer wg.Done()
results <- Result{Host: name, OK: true}
}(host)
}
go func() {
wg.Wait()
close(results) // 所有发送方结束后再关闭,避免向已关闭 channel 写数据
}()
for result := range results {
fmt.Println(result.Host, result.OK)
}
}关闭 channel 表示“后面不会再有新值”。已经写入 channel 的值仍然可以继续读完,读完后 range 自动结束。
通常由发送方关闭 channel。接收方不知道后面还有没有发送者,贸然关闭容易触发 send on closed channel。channel 也不需要每发送一个值就关闭一次,一个结果队列通常只在所有发送方结束后关闭。
六、select 和超时
select 用来同时等待多个 channel。常见场景是等待结果、超时和取消信号。
go
package main
import (
"fmt"
"time"
)
func main() {
result := make(chan string, 1)
go func() {
time.Sleep(2 * time.Second)
result <- "ok"
}()
select {
case value := <-result:
fmt.Println("result:", value)
case <-time.After(1 * time.Second):
fmt.Println("timeout")
}
}select 会阻塞到某个 case 可以执行。上面例子里,如果 1 秒内 result 没有值,就会走 time.After 对应的超时分支;如果 1 秒内结果先回来,就会打印结果。
time.After 适合小示例。长期运行的循环里频繁创建 time.After 会产生额外计时器对象,定时循环通常用 time.NewTimer 或 time.NewTicker 更好控制。
七、context 控制取消
context.Context 在并发程序里负责传递取消信号和超时。HTTP 请求、数据库请求、Kubernetes Client 调用都支持 context。
go
package main
import (
"context"
"fmt"
"time"
)
func check(ctx context.Context, host string) error {
select {
case <-time.After(2 * time.Second):
fmt.Println(host, "ok")
return nil
case <-ctx.Done():
return ctx.Err()
}
}
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel() // 释放和这个 context 相关的计时器资源
if err := check(ctx, "web01"); err != nil {
fmt.Println("check failed:", err)
}
}context 不会强行杀掉一个 goroutine。它只是通过 ctx.Done() 这个 channel 发出“已经取消”或“已经超时”的信号。函数内部监听 ctx.Done(),收到信号后返回,取消才会实际生效。
批量请求接口时,如果整体任务已经超时,继续等待每个请求意义不大。context 能把“整批任务取消”传给下游函数,避免后台请求继续占连接和 goroutine。
八、sync 包
sync 包处理共享状态。并发写同一个 map、切片或结构体字段时,需要锁或其他同步方式。
go
package main
import (
"fmt"
"sync"
)
func main() {
counts := map[string]int{}
var mu sync.Mutex
var wg sync.WaitGroup
for _, service := range []string{"nginx", "nginx", "mysql"} {
wg.Add(1)
go func(name string) {
defer wg.Done()
mu.Lock()
counts[name]++
mu.Unlock()
}(service)
}
wg.Wait()
fmt.Println(counts)
}普通 map 并发写会触发 fatal error: concurrent map writes。这种错误通常在压测或目标数量变多时暴露,单机少量测试不一定复现。
常用同步对象:
| 类型 | 用途 |
|---|---|
sync.WaitGroup | 等待一组任务结束 |
sync.Mutex | 保护共享变量 |
sync.RWMutex | 读多写少时区分读锁和写锁 |
sync.Once | 只初始化一次 |
sync.Map | 特定并发读写场景下的 map |
数据能通过 channel 汇总时,通常比共享 map 再加锁更清楚。需要频繁读写公共状态时,再考虑 Mutex。
九、限制并发数量
批量巡检 1000 台机器时,如果同时发起 1000 个请求,可能把本机端口、DNS、目标服务或中间网关打满。常见做法是用 worker pool 控制并发数。
go
package main
import (
"fmt"
"sync"
"time"
)
type Result struct {
Host string
OK bool
}
func check(host string) Result {
time.Sleep(300 * time.Millisecond)
return Result{Host: host, OK: true}
}
func worker(jobs <-chan string, results chan<- Result, wg *sync.WaitGroup) {
defer wg.Done()
for host := range jobs {
results <- check(host)
}
}
func main() {
hosts := []string{"web01", "web02", "web03", "db01", "db02"}
concurrency := 2
jobs := make(chan string)
results := make(chan Result)
var wg sync.WaitGroup
for i := 0; i < concurrency; i++ {
wg.Add(1)
go worker(jobs, results, &wg)
}
go func() {
for _, host := range hosts {
jobs <- host
}
close(jobs)
}()
go func() {
wg.Wait()
close(results)
}()
for result := range results {
fmt.Println(result.Host, result.OK)
}
}这个结构里 jobs 是任务队列,results 是结果队列,worker 数量就是并发上限。并发数通常根据目标服务承受能力、网络环境和超时时间调整。内网 HTTP 健康检查可以稍高,SSH 登录、数据库探测这类操作通常更保守。
执行顺序可以拆开看:
- 主流程启动固定数量 worker。
- 投递协程把所有 host 写入
jobs。 - worker 从
jobs取任务,执行check,把结果写入results。 jobs关闭后,worker 的for host := range jobs结束。- 所有 worker 结束后关闭
results。 - 主流程读取完
results后退出。
jobs 和 results 的关闭顺序很关键。jobs 由投递方关闭,表示没有新任务;results 由等待 worker 结束的协程关闭,表示没有新结果。提前关闭 results,worker 再写结果会 panic。
十、常见并发问题
| 现象 | 常见原因 | 排查入口 |
|---|---|---|
| 程序提前退出 | main 没等待 goroutine | 看 WaitGroup、channel 读取逻辑 |
| 任务一直卡住 | channel 没有接收方、没有关闭、发送数量和接收数量不匹配 | 打印任务进入和退出日志 |
| CPU 不高但程序不结束 | goroutine 阻塞在网络请求或 channel | 给请求加 context 超时 |
concurrent map writes | 多个 goroutine 同时写 map | 加锁或改成 channel 汇总 |
| 目标服务被打满 | 并发数过高、没有超时和重试间隔 | worker 数量、请求超时、服务端连接数 |
并发程序的日志通常会带任务 ID、目标名、开始时间、耗时和错误原因。只打印 failed 很难判断是哪个目标卡住,批量检查时尤其明显。