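Concurrency in Practice
Batch inspections, API checks, SSH execution, and log processing often run into slowness. In Python, concurrency mainly addresses waiting time: waiting on the network, on SSH, on disk IO. CPU-bound work is rarely the focus of ops scripts; when heavy computation is really needed, it usually moves to a better-suited tool or a dedicated service.
The hard part of a concurrent script is not just "running faster". It also covers rate limiting, timeouts, error collection, output ordering, and result aggregation. A batch operation without bounds can easily saturate remote services, bastion hosts, databases, or APIs.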
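1. Concurrency models
Common choices:

| Approach | Best suited for | Notes |
|---|---|---|
| Multithreading | Waiting-bound tasks such as HTTP, SSH, and file IO | The most common choice in ops scripts |
| Multiprocessing | CPU computation, compression, parsing large volumes of data | Higher per-process overhead |
| asyncio | Large numbers of asynchronous HTTP/TCP tasks | More demanding to write; the libraries must support async too |

Multithreading fits most batch inspections. While one thread waits for a remote response, another can keep working. More threads is not always better: for a few dozen machines, start somewhere between 5 and 20 and watch the load on both the remote side and the local machine.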
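To make the waiting-time argument concrete, the following is a minimal sketch, not a benchmark. It assumes a hypothetical fake_check function that only sleeps for 0.2 seconds in place of a real network call, and compares a serial loop with a five-worker thread pool.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_check(host):
    # Hypothetical stand-in for a network call: the thread only waits.
    time.sleep(0.2)
    return host


hosts = [f"host{i:02d}" for i in range(5)]

# Serial: total time is roughly the sum of all waits.
started = time.perf_counter()
serial_results = [fake_check(host) for host in hosts]
serial_seconds = time.perf_counter() - started

# Threaded: the waits overlap, so total time is close to a single wait.
started = time.perf_counter()
with ThreadPoolExecutor(max_workers=5) as executor:
    threaded_results = list(executor.map(fake_check, hosts))
threaded_seconds = time.perf_counter() - started

print(f"serial={serial_seconds:.2f}s threaded={threaded_seconds:.2f}s")
```

The gain comes entirely from overlapping waits, which is why the same pool would not speed up CPU-bound work.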
2. Thread pools
concurrent.futures.ThreadPoolExecutor is the thread pool in the standard library.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
import time


def check_host(host):
    # Simulate the latency of a network request
    time.sleep(1)
    return {"host": host, "ok": True}


hosts = ["web01", "web02", "db01"]

with ThreadPoolExecutor(max_workers=3) as executor:
    futures = [executor.submit(check_host, host) for host in hosts]
    for future in as_completed(futures):
        result = future.result()
        print(result)
```

as_completed() yields futures in the order they finish, so the output order can differ from the input order. When a batch report must keep the input order, carry an index in each result and sort at the end.
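One way to carry an index and sort at the end might look like the sketch below. The index field and the artificial delays are assumptions for illustration; the delays only force the completion order to differ from the input order.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
import time


def check_host(index, host):
    # Simulated check: later hosts finish sooner, so completion order differs.
    time.sleep(0.05 * (3 - index))
    return {"index": index, "host": host, "ok": True}


hosts = ["web01", "web02", "db01"]
results = []

with ThreadPoolExecutor(max_workers=3) as executor:
    futures = [
        executor.submit(check_host, index, host)
        for index, host in enumerate(hosts)
    ]
    for future in as_completed(futures):
        results.append(future.result())

# Restore the input order before building the report.
results.sort(key=lambda item: item["index"])
print([item["host"] for item in results])
```

When results do not need to be handled as they finish, executor.map() is an alternative, since it returns results in input order.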
3. Collecting errors
An exception raised inside a worker thread is not printed in the main thread automatically; it is re-raised only when future.result() is called.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def check_host(host):
    if host == "db01":
        raise RuntimeError("ssh timeout")
    return {"host": host, "ok": True, "message": "ok"}


hosts = ["web01", "web02", "db01"]
results = []

with ThreadPoolExecutor(max_workers=3) as executor:
    # Map each future back to its host so a failure can be attributed
    future_map = {executor.submit(check_host, host): host for host in hosts}
    for future in as_completed(future_map):
        host = future_map[future]
        try:
            results.append(future.result())
        except Exception as exc:
            results.append({"host": host, "ok": False, "message": str(exc)})

print(results)
```

In a batch task, errors should become part of the results. That way the script can report every success and failure at the end instead of blowing up midway.
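Once failures are stored as ordinary results, a final summary is a small step. The sketch below assumes the result shape used above (host, ok, message); summarize is a hypothetical helper name, and the sample data is made up.

```python
def summarize(results):
    """Condense structured results into a short summary dictionary."""
    failed = [item for item in results if not item["ok"]]
    return {
        "total": len(results),
        "ok": len(results) - len(failed),
        "failed": len(failed),
        "failed_hosts": sorted(item["host"] for item in failed),
    }


# Sample data in the same shape as the results collected above.
sample = [
    {"host": "web01", "ok": True, "message": "ok"},
    {"host": "web02", "ok": True, "message": "ok"},
    {"host": "db01", "ok": False, "message": "ssh timeout"},
]
summary = summarize(sample)
print(summary)
```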
4. Concurrent HTTP checks

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
import time

import requests


def check_url(target, timeout):
    # perf_counter is monotonic, so the measured cost is not affected by clock changes
    started = time.perf_counter()
    try:
        response = requests.get(target["url"], timeout=timeout)
        ok = response.status_code == 200
        message = f"status_code={response.status_code}"
    except requests.RequestException as exc:
        ok = False
        message = str(exc)
    return {
        "name": target["name"],
        "url": target["url"],
        "ok": ok,
        "message": message,
        "cost_ms": int((time.perf_counter() - started) * 1000),
    }


def batch_check(targets, workers=5, timeout=5):
    results = []
    with ThreadPoolExecutor(max_workers=workers) as executor:
        future_map = {
            executor.submit(check_url, target, timeout): target
            for target in targets
        }
        for future in as_completed(future_map):
            results.append(future.result())
    return results
```

workers controls the concurrency. If an API check script opens 200 threads right away, it may push the target service into new trouble before the original problem has been diagnosed.
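A conservative bound can also be enforced in code rather than left to the caller. The sketch below uses a hypothetical pick_workers helper and an assumed hard cap of 20; both are illustrative and should be tuned to what the target can tolerate.

```python
def pick_workers(requested, target_count, hard_cap=20):
    """Clamp the worker count to at least 1 and at most hard_cap."""
    if target_count <= 0:
        return 1
    # Never use more workers than there are targets or than the cap allows.
    return max(1, min(requested, target_count, hard_cap))


print(pick_workers(200, 50))  # prints 20
```

The returned value can be passed directly as max_workers, which must be a positive integer.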
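5. Rate limiting
A simple form of rate limiting is to submit tasks in batches:

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def chunks(items, size):
    for index in range(0, len(items), size):
        yield items[index:index + size]


def batch_run(items, batch_size=10):
    all_results = []
    for batch in chunks(items, batch_size):
        with ThreadPoolExecutor(max_workers=batch_size) as executor:
            # check_item is a placeholder for the actual check function
            futures = [executor.submit(check_item, item) for item in batch]
            for future in as_completed(futures):
                all_results.append(future.result())
    return all_results
```

In the example above, check_item stands for the actual check function. Each batch waits for its slowest task before the next batch starts. Batching suits scenarios that are sensitive to remote load, such as calls to a CMDB, a bastion host API, or a cloud provider API.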
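Batching caps how many tasks run together, but not how quickly new requests start. If the remote side cares about request rate, one possible approach is a small interval limiter shared by all workers. IntervalLimiter and limited_check below are hypothetical names, and the 0.05-second interval is an arbitrary choice for illustration.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


class IntervalLimiter:
    """Allow at most one task to start every `interval` seconds."""

    def __init__(self, interval):
        self.interval = interval
        self._lock = threading.Lock()
        self._next_allowed = time.monotonic()

    def wait(self):
        # Reserve the next start slot under the lock, then sleep outside it.
        with self._lock:
            now = time.monotonic()
            start_at = max(now, self._next_allowed)
            self._next_allowed = start_at + self.interval
        delay = start_at - now
        if delay > 0:
            time.sleep(delay)


limiter = IntervalLimiter(interval=0.05)  # roughly 20 task starts per second


def limited_check(item):
    limiter.wait()
    # A real check (for example an API call with its own timeout) would go here.
    return {"item": item, "started": time.monotonic()}


items = list(range(6))
with ThreadPoolExecutor(max_workers=3) as executor:
    results = list(executor.map(limited_check, items))

starts = sorted(result["started"] for result in results)
print(f"spread={starts[-1] - starts[0]:.2f}s")
```

The pool still bounds how many checks run at once; the limiter only spaces out when each one begins.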
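6. Timeout control
The network library itself must set a timeout, and a wait timeout can also be set at the thread pool level.

```python
from concurrent.futures import ThreadPoolExecutor, TimeoutError

# check_slow_target is a placeholder for the actual slow check function
with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(check_slow_target)
    try:
        result = future.result(timeout=10)
    except TimeoutError:
        result = {"ok": False, "message": "task timeout"}
```

A thread pool timeout cannot forcibly kill a thread that is already stuck, so the underlying HTTP, SSH, and database connections still need their own timeouts. The outer timeout only controls how long the main thread waits for the result. Leaving the with block still waits for the running task, because the executor shuts down with wait=True.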
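The sketch below illustrates why the outer timeout is not a kill switch. slow_task is a hypothetical stand-in for a call without its own timeout, and the short delays are chosen only to keep the demo quick.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError

finished = threading.Event()


def slow_task():
    # Hypothetical stand-in for a call that has no timeout of its own.
    time.sleep(0.5)
    finished.set()
    return "done"


executor = ThreadPoolExecutor(max_workers=1)
future = executor.submit(slow_task)
try:
    future.result(timeout=0.1)
    timed_out = False
except TimeoutError:
    timed_out = True

# The caller gave up, but the worker thread is still running the task.
still_running_after_timeout = not finished.is_set()
cancelled = future.cancel()

# shutdown(wait=True) blocks until the worker finishes on its own.
executor.shutdown(wait=True)
print(timed_out, still_running_after_timeout, cancelled, finished.is_set())
```

future.cancel() returns False here because a task that is already running cannot be cancelled.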
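7. Multiprocessing
Multiprocessing suits CPU-bound work, such as compressing many files or running heavy computations over very large logs. The standard library entry point is ProcessPoolExecutor.

```python
from concurrent.futures import ProcessPoolExecutor


def count_lines(path):
    count = 0
    with open(path, "r", encoding="utf-8", errors="ignore") as file:
        for _ in file:
            count += 1
    return path, count


if __name__ == "__main__":
    paths = ["/var/log/messages", "/var/log/secure"]
    with ProcessPoolExecutor(max_workers=2) as executor:
        for path, count in executor.map(count_lines, paths):
            print(path, count)
```

Windows and Linux start worker processes differently: Windows uses spawn, which re-imports the main module in each child, while Linux has traditionally defaulted to fork. In cross-platform scripts, put the multiprocessing code under if __name__ == "__main__": so that the re-import in a child process does not create processes again.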
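8. A brief look at asyncio
asyncio is an asynchronous concurrency model. It suits large numbers of network-waiting tasks, but it requires libraries that support async, such as aiohttp. Plain requests is not an async library.

```python
import asyncio


async def check(name, delay):
    await asyncio.sleep(delay)
    return f"{name} ok"


async def main():
    results = await asyncio.gather(
        check("api", 1),
        check("grafana", 1),
    )
    print(results)


asyncio.run(main())
```

For everyday ops scripts, multithreading is usually enough. Async is a better fit when the number of targets is very large and the libraries involved support async.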
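When the number of targets grows, asyncio code usually needs its own concurrency bound as well. A minimal sketch, assuming asyncio.sleep as a stand-in for a real async client call and a hypothetical limit of 10 concurrent checks (run_all and the target names are made up):

```python
import asyncio


async def check(name, delay, limiter):
    # The semaphore caps how many checks run at the same time.
    async with limiter:
        await asyncio.sleep(delay)  # stand-in for an async network call
        return {"name": name, "ok": True}


async def run_all(names, max_concurrent=10):
    limiter = asyncio.Semaphore(max_concurrent)
    tasks = [check(name, 0.01, limiter) for name in names]
    # gather returns results in the same order as the awaitables passed in.
    return await asyncio.gather(*tasks)


names = [f"target{i:03d}" for i in range(100)]
results = asyncio.run(run_all(names))
print(len(results), results[0]["name"], results[-1]["name"])
```

The semaphore plays the same role here that max_workers plays for a thread pool.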
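9. A complete inspection script
The script below checks a set of HTTP URLs concurrently, prints the results as JSON, and uses the exit code to signal whether every check succeeded.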
```python
#!/usr/bin/env python3
"""Concurrently check the health of HTTP endpoints."""
import argparse
import json
import sys
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path

import requests


def parse_args():
    parser = argparse.ArgumentParser(description="concurrent http health check")
    parser.add_argument("--targets", required=True, help="targets json file")
    parser.add_argument("--workers", type=int, default=5, help="concurrent worker count")
    parser.add_argument("--timeout", type=int, default=5, help="request timeout seconds")
    return parser.parse_args()


def load_targets(path):
    data = json.loads(Path(path).read_text(encoding="utf-8"))
    for item in data:
        if "name" not in item or "url" not in item:
            raise ValueError(f"invalid target item: {item}")
    return data


def check_target(target, timeout):
    # perf_counter is monotonic, so the measured cost is not affected by clock changes
    started = time.perf_counter()
    try:
        response = requests.get(target["url"], timeout=timeout)
        ok = response.status_code == 200
        message = f"status_code={response.status_code}"
    except requests.RequestException as exc:
        ok = False
        message = str(exc)
    return {
        "name": target["name"],
        "url": target["url"],
        "ok": ok,
        "message": message,
        "cost_ms": int((time.perf_counter() - started) * 1000),
    }


def run_checks(targets, workers, timeout):
    results = []
    with ThreadPoolExecutor(max_workers=workers) as executor:
        future_map = {
            executor.submit(check_target, target, timeout): target
            for target in targets
        }
        for future in as_completed(future_map):
            target = future_map[future]
            try:
                results.append(future.result())
            except Exception as exc:
                # Record unexpected exceptions as structured results too, so an
                # interrupted batch still produces a summary
                results.append(
                    {
                        "name": target["name"],
                        "url": target["url"],
                        "ok": False,
                        "message": str(exc),
                        "cost_ms": 0,
                    }
                )
    # Sort by name so the output order is stable across runs
    return sorted(results, key=lambda item: item["name"])


def main():
    args = parse_args()
    targets = load_targets(args.targets)
    results = run_checks(targets, args.workers, args.timeout)
    print(json.dumps(results, ensure_ascii=False, indent=2))
    failed = [item for item in results if not item["ok"]]
    if failed:
        print(f"failed targets: {len(failed)}", file=sys.stderr)
        return 1
    return 0


if __name__ == "__main__":
    sys.exit(main())
```

Target file:
```json
[
  {"name": "api", "url": "https://example.com/health"},
  {"name": "grafana", "url": "http://127.0.0.1:3000/api/health"}
]
```

Run:
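```bash
uv run python concurrent_http_check.py --targets targets.json --workers 5 --timeout 3
```

A script like this can be plugged into cron, CI, monitoring probes, or an automation platform. What matters is a stable output structure, a reason attached to every failure, and an exit code that external systems can interpret.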
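From the caller's side, consuming such a script could look roughly like this. To keep the sketch self-contained, a one-line inline program stands in for the real script and mimics its contract (JSON on stdout, exit code 1 on failure); fake_script and its sample output are made up for illustration.

```python
import json
import subprocess
import sys

# Hypothetical stand-in for the check script: prints JSON and exits with 1.
fake_script = (
    "import json, sys;"
    "print(json.dumps([{'name': 'api', 'ok': False, 'message': 'status_code=503'}]));"
    "sys.exit(1)"
)

completed = subprocess.run(
    [sys.executable, "-c", fake_script],
    capture_output=True,
    text=True,
    timeout=30,  # the caller should bound the whole run as well
)

# The exit code says whether anything failed; stdout says what failed.
results = json.loads(completed.stdout)
failed_names = [item["name"] for item in results if not item["ok"]]
print(completed.returncode, failed_names)
```

Because the script above writes its failure count to stderr, stdout stays valid JSON for the caller to parse.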
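10. Pre-delivery checklist
Before handing over an ops script, walk through this checklist:

| Item | What to check |
|---|---|
| Parameters | Can hosts, ports, thresholds, and config file paths be passed in? |
| Timeouts | Do HTTP, SSH, and database connections set timeouts? |
| Concurrency | Is workers configurable, with a conservative default? |
| Errors | Is a single target's failure recorded while the run continues? |
| Logging | Does the output include the target, status, and failure reason? |
| Exit code | 0 on success, non-zero on failure |
| Dependencies | Are pyproject.toml, the lock file, or install instructions complete? |
| Secrets | Are passwords and tokens kept out of the source code? |

A script that runs is only the first layer. To become part of day-to-day operations, it also has to work with scheduled jobs, monitoring, and CI, and be readable enough for someone else to take over.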