
Concurrency in Practice

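Batch inspections, API checks, SSH execution, and log processing are often slow. In Python, concurrency mainly cuts the time spent waiting: on the network, on SSH, on disk IO. CPU-bound work is rarely the focus of ops scripts. When heavy computation really is needed, it usually makes sense to switch to a better-suited tool or move the work into a dedicated service.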

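Making a concurrent script faster is only part of the job. It also has to handle rate limiting, timeouts, error collection, output order, and result aggregation. A batch operation with no limits can easily overwhelm remote services, bastion hosts, databases, or APIs.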

1. Concurrency models

Common options:

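| Approach | Suitable for | Notes |
|---|---|---|
| Multithreading | Waiting-bound tasks such as HTTP, SSH, file IO | The most common choice for ops scripts |
| Multiprocessing | CPU work: compression, parsing large amounts of data | Higher overhead per process |
| asyncio | Large numbers of asynchronous HTTP/TCP tasks | Harder to write; libraries must support async |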

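Multithreading is enough for most batch inspections. While one thread waits for a remote response, another thread can keep working. This helps because blocking network and disk calls release the GIL. Threads do not speed up pure-Python computation. More threads are not always better. For a few dozen machines, start with 5 to 20 workers and adjust based on the load on both the remote side and the local machine.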
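A quick way to see why threads help with waiting-bound work is to time the same simulated checks run sequentially and through a pool. This is a minimal sketch. fake_check and its 0.2-second sleep are hypothetical stand-ins for a network call.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_check(host):
    # Hypothetical stand-in for a network call: the thread just waits
    time.sleep(0.2)
    return host


def timed(func):
    # Return (result, elapsed seconds) measured with a monotonic clock
    started = time.monotonic()
    result = func()
    return result, time.monotonic() - started


hosts = [f"web{i:02d}" for i in range(1, 6)]

# One host after another: roughly 5 * 0.2 seconds in total
sequential, sequential_cost = timed(lambda: [fake_check(h) for h in hosts])

# Five workers wait at the same time: roughly 0.2 seconds in total
with ThreadPoolExecutor(max_workers=5) as executor:
    threaded, threaded_cost = timed(lambda: list(executor.map(fake_check, hosts)))

print(f"sequential: {sequential_cost:.2f}s, threaded: {threaded_cost:.2f}s")
```

The gain comes entirely from overlapping the waits. If fake_check did CPU work instead of sleeping, the two timings would be close.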

2. Thread pools

concurrent.futures.ThreadPoolExecutor is the thread pool provided by the standard library.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
import time


def check_host(host):
    # Simulate the latency of a network request
    time.sleep(1)
    return {"host": host, "ok": True}


hosts = ["web01", "web02", "db01"]

with ThreadPoolExecutor(max_workers=3) as executor:
    futures = [executor.submit(check_host, host) for host in hosts]

    for future in as_completed(futures):
        result = future.result()
        print(result)
```

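as_completed() yields futures in the order they finish, so the output order can differ from the input order. If a batch report needs to keep the input order, attach an index to each result and sort at the end.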
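Here are two ways to keep the report in input order, as a sketch. The simulated check_host uses a random delay only so that completion order varies. The first option attaches the input index when submitting and sorts afterwards. The second uses executor.map(), which yields results in input order.

```python
import random
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def check_host(host):
    # Simulated check with a random delay so completion order varies
    time.sleep(random.uniform(0, 0.1))
    return {"host": host, "ok": True}


hosts = ["web01", "web02", "db01"]

# Option 1: carry the input index with each future, then sort by it
with ThreadPoolExecutor(max_workers=3) as executor:
    future_map = {
        executor.submit(check_host, host): index
        for index, host in enumerate(hosts)
    }
    indexed = []
    for future in as_completed(future_map):
        indexed.append((future_map[future], future.result()))

ordered = [result for _, result in sorted(indexed, key=lambda pair: pair[0])]

# Option 2: executor.map() returns results in input order
with ThreadPoolExecutor(max_workers=3) as executor:
    mapped = list(executor.map(check_host, hosts))

print([item["host"] for item in ordered])
print([item["host"] for item in mapped])
```

There is one trade-off. executor.map() re-raises a task's exception when iteration reaches that result, and the remaining results are lost from that iterator. Option 1 is the safer fit when failures must be collected per target.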

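3. Error collection

Exceptions raised in a worker thread are not printed in the main thread automatically. They are re-raised only when you call future.result().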

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def check_host(host):
    if host == "db01":
        raise RuntimeError("ssh timeout")
    return {"host": host, "ok": True, "message": "ok"}


hosts = ["web01", "web02", "db01"]
results = []

with ThreadPoolExecutor(max_workers=3) as executor:
    future_map = {executor.submit(check_host, host): host for host in hosts}

    for future in as_completed(future_map):
        host = future_map[future]
        try:
            results.append(future.result())
        except Exception as exc:
            results.append({"host": host, "ok": False, "message": str(exc)})

print(results)
```

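In batch jobs, errors should become part of the results. The script can then report every success and failure at the end instead of crashing halfway through.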
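Another common arrangement is to catch exceptions inside the worker, so every future returns a result dict, and then summarize at the end. The sketch below does this. safe_check and summarize are hypothetical helper names, and check_host is the same simulated function as in the example above.

```python
from concurrent.futures import ThreadPoolExecutor


def check_host(host):
    # Simulated check: db01 always fails
    if host == "db01":
        raise RuntimeError("ssh timeout")
    return {"host": host, "ok": True, "message": "ok"}


def safe_check(host):
    # Turn any exception into a structured failure result, keeping the exception type
    try:
        return check_host(host)
    except Exception as exc:
        return {"host": host, "ok": False, "message": f"{type(exc).__name__}: {exc}"}


def summarize(results):
    # Counts plus the list of failed hosts, ready for a final report line
    failed = [item["host"] for item in results if not item["ok"]]
    return {"total": len(results), "failed": len(failed), "failed_hosts": failed}


hosts = ["web01", "web02", "db01"]

with ThreadPoolExecutor(max_workers=3) as executor:
    # safe_check catches Exception, so executor.map() will not stop at db01
    results = list(executor.map(safe_check, hosts))

summary = summarize(results)
print(summary)
```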

4. Concurrent HTTP checks

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
import time

import requests


def check_url(target, timeout):
    # monotonic() is unaffected by system clock adjustments, so it is safer for durations
    started = time.monotonic()

    try:
        response = requests.get(target["url"], timeout=timeout)
        ok = response.status_code == 200
        message = f"status_code={response.status_code}"
    except requests.RequestException as exc:
        ok = False
        message = str(exc)

    return {
        "name": target["name"],
        "url": target["url"],
        "ok": ok,
        "message": message,
        "cost_ms": int((time.monotonic() - started) * 1000),
    }


def batch_check(targets, workers=5, timeout=5):
    results = []

    with ThreadPoolExecutor(max_workers=workers) as executor:
        future_map = {
            executor.submit(check_url, target, timeout): target
            for target in targets
        }

        for future in as_completed(future_map):
            results.append(future.result())

    return results
```

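workers controls the concurrency level. An API-check script that simply opens 200 threads may create a new problem on the target service before it has diagnosed the original one.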
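To confirm that max_workers really caps how many checks are in flight at once, a small experiment can count concurrent calls under a lock. This is a sketch. slow_check and the in-flight counter exist only for the demonstration.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

lock = threading.Lock()
in_flight = 0
peak = 0


def slow_check(target):
    # Hypothetical check that records how many calls are running at the same time
    global in_flight, peak
    with lock:
        in_flight += 1
        peak = max(peak, in_flight)
    try:
        time.sleep(0.05)  # stand-in for waiting on the remote side
        return target
    finally:
        with lock:
            in_flight -= 1


targets = [f"svc{i}" for i in range(20)]

with ThreadPoolExecutor(max_workers=3) as executor:
    results = list(executor.map(slow_check, targets))

print(f"checked {len(results)} targets, peak concurrency {peak}")
```

However many targets are submitted, the remote side never sees more than max_workers simultaneous requests from this process.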

5. Rate limiting

A simple way to throttle is to submit tasks in batches:

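```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def check_item(item):
    # Placeholder for the real check, e.g. an HTTP call, SSH command, or API request
    return {"item": item, "ok": True}


def chunks(items, size):
    # Yield consecutive slices of at most `size` items
    for index in range(0, len(items), size):
        yield items[index:index + size]


def batch_run(items, batch_size=10):
    all_results = []

    for batch in chunks(items, batch_size):
        # A new pool per batch: the next batch starts only after this one fully finishes
        with ThreadPoolExecutor(max_workers=batch_size) as executor:
            futures = [executor.submit(check_item, item) for item in batch]
            for future in as_completed(futures):
                all_results.append(future.result())

    return all_results
```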

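check_item in the example above stands for the actual check function. Each batch waits for its slowest task before the next one starts, so at most batch_size requests are in flight at any time. Batching suits cases where the remote side is sensitive to load, such as calls to a CMDB, a bastion-host API, or a cloud provider's API.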
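Batching bounds how many requests run at once, but not how many are sent per second. When an API enforces a per-second quota, a limiter shared by all threads that spaces out calls can be used instead of batching or alongside it. The sketch below is a simple minimum-interval limiter, not a full token bucket. RateLimiter, its per_second parameter, and this check_item are hypothetical.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


class RateLimiter:
    """Allow at most one call every 1/per_second seconds, shared by all threads."""

    def __init__(self, per_second):
        self.interval = 1.0 / per_second
        self.lock = threading.Lock()
        self.next_allowed = time.monotonic()

    def wait(self):
        with self.lock:
            now = time.monotonic()
            # Reserve the next free slot, then sleep outside the lock
            slot = max(now, self.next_allowed)
            self.next_allowed = slot + self.interval
        time.sleep(max(0.0, slot - now))


limiter = RateLimiter(per_second=20)


def check_item(item):
    # Hypothetical check: wait for a slot before calling the remote API
    limiter.wait()
    return {"item": item, "ok": True}


started = time.monotonic()
with ThreadPoolExecutor(max_workers=5) as executor:
    results = list(executor.map(check_item, range(10)))
elapsed = time.monotonic() - started

print(f"{len(results)} calls in {elapsed:.2f}s")
```

With 20 calls per second, ten calls need at least nine 0.05-second gaps, even though five threads are available.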

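6. Timeout control

Set timeouts in the network library itself. The thread pool can additionally limit how long you wait for a result.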

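```python
from concurrent.futures import ThreadPoolExecutor, TimeoutError
import time


def check_slow_target():
    # Simulated hung check: sleeps longer than the 10-second wait below
    time.sleep(15)
    return {"ok": True, "message": "ok"}


executor = ThreadPoolExecutor(max_workers=1)
future = executor.submit(check_slow_target)

try:
    result = future.result(timeout=10)
except TimeoutError:
    result = {"ok": False, "message": "task timeout"}
finally:
    # Don't block on the stuck thread; cancel queued tasks (cancel_futures needs Python 3.9+)
    executor.shutdown(wait=False, cancel_futures=True)
```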

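A thread-pool timeout cannot forcibly kill a thread that is already stuck, so the underlying HTTP, SSH, and database connections still need their own timeouts. The outer timeout only controls how long the main thread waits for the result. Even that has limits. Leaving a with ThreadPoolExecutor(...) block calls shutdown(wait=True), which blocks until the stuck task finishes. The interpreter also waits for pool threads before it exits.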
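For a whole batch, concurrent.futures.wait() with a timeout gives a single overall deadline. Futures still running when it expires are reported as timeouts, so they do not hold up the report. This is a sketch. The simulated check_host and its delays are made up, and cancel_futures needs Python 3.9+.

```python
import time
from concurrent.futures import ThreadPoolExecutor, wait


def check_host(host, delay):
    # Simulated check; db01 is deliberately slow
    time.sleep(delay)
    return {"host": host, "ok": True, "message": "ok"}


delays = {"web01": 0.1, "web02": 0.1, "db01": 2}

executor = ThreadPoolExecutor(max_workers=3)
future_map = {
    executor.submit(check_host, host, delay): host
    for host, delay in delays.items()
}

# Wait at most 0.5 seconds for the whole batch
done, not_done = wait(future_map, timeout=0.5)

results = [future.result() for future in done]
results += [
    {"host": future_map[future], "ok": False, "message": "batch deadline exceeded"}
    for future in not_done
]

# Return immediately; the slow thread keeps running until its own call returns
executor.shutdown(wait=False, cancel_futures=True)

print(sorted(results, key=lambda item: item["host"]))
```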

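7. Multiprocessing

Multiprocessing suits CPU-bound work, such as compressing many files or running heavy computations over very large logs. The standard-library entry point is ProcessPoolExecutor: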

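```python
from concurrent.futures import ProcessPoolExecutor


def count_lines(path):
    count = 0
    # errors="ignore" skips undecodable bytes instead of raising
    with open(path, "r", encoding="utf-8", errors="ignore") as file:
        for _ in file:
            count += 1
    return path, count


def main():
    paths = ["/var/log/messages", "/var/log/secure"]

    with ProcessPoolExecutor(max_workers=2) as executor:
        # map() returns results in the same order as paths
        for path, count in executor.map(count_lines, paths):
            print(path, count)


if __name__ == "__main__":
    # Required when child processes are started with spawn (the default on Windows and macOS)
    main()
```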

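Windows and Linux start child processes differently. On Windows, children are started with spawn, which launches a fresh interpreter that re-imports the main module. In cross-platform scripts, put multiprocessing code under if __name__ == "__main__":. Otherwise each child re-imports the module and tries to create processes again.

8. A quick look at asyncio

asyncio is an asynchronous concurrency model. It suits large numbers of tasks that wait on the network, but it needs libraries that support async, such as aiohttp. The regular requests library is not asynchronous, and calling requests.get() inside a coroutine blocks the whole event loop.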

```python
import asyncio


async def check(name, delay):
    await asyncio.sleep(delay)
    return f"{name} ok"


async def main():
    results = await asyncio.gather(
        check("api", 1),
        check("grafana", 1),
    )
    print(results)


asyncio.run(main())
```

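For everyday ops scripts, multithreading is usually enough. Async fits cases with a very large number of targets where the libraries you depend on support async.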
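The concerns from the threading sections carry over to asyncio: cap concurrency, bound each task's time, and keep errors as results. Below is a sketch that uses only the standard library. asyncio.sleep stands in for a real async client call, and the target names and delays are made up.

```python
import asyncio


async def check(name, delay):
    # Stand-in for an async network call
    await asyncio.sleep(delay)
    if name == "broken":
        raise RuntimeError("connection refused")
    return {"name": name, "ok": True, "message": "ok"}


async def limited_check(semaphore, name, delay, timeout):
    # At most N coroutines are inside this block at once
    async with semaphore:
        try:
            return await asyncio.wait_for(check(name, delay), timeout)
        except asyncio.TimeoutError:
            return {"name": name, "ok": False, "message": "timeout"}
        except Exception as exc:
            return {"name": name, "ok": False, "message": str(exc)}


async def main():
    semaphore = asyncio.Semaphore(10)
    targets = [("api", 0.1), ("grafana", 0.1), ("slow", 5), ("broken", 0.1)]
    tasks = [limited_check(semaphore, name, delay, timeout=0.5) for name, delay in targets]
    # gather() returns results in the same order as the tasks passed in
    return await asyncio.gather(*tasks)


results = asyncio.run(main())
print(results)
```

Unlike a thread-pool timeout, asyncio.wait_for() cancels the coroutine that timed out, so the slow check does not keep running in the background.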

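9. Complete inspection script

The script below checks a set of HTTP endpoints concurrently, prints the results as JSON, and uses the exit code to signal whether every check succeeded.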

```python
#!/usr/bin/env python3
"""Concurrently check the health of HTTP endpoints."""

import argparse
import json
import sys
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path

import requests


def parse_args():
    parser = argparse.ArgumentParser(description="concurrent http health check")
    parser.add_argument("--targets", required=True, help="targets json file")
    parser.add_argument("--workers", type=int, default=5, help="concurrent worker count")
    parser.add_argument("--timeout", type=int, default=5, help="request timeout seconds")
    return parser.parse_args()


def load_targets(path):
    data = json.loads(Path(path).read_text(encoding="utf-8"))

    for item in data:
        if "name" not in item or "url" not in item:
            raise ValueError(f"invalid target item: {item}")

    return data


def check_target(target, timeout):
    # monotonic() is unaffected by system clock adjustments, so it is safer for durations
    started = time.monotonic()

    try:
        response = requests.get(target["url"], timeout=timeout)
        ok = response.status_code == 200
        message = f"status_code={response.status_code}"
    except requests.RequestException as exc:
        ok = False
        message = str(exc)

    return {
        "name": target["name"],
        "url": target["url"],
        "ok": ok,
        "message": message,
        "cost_ms": int((time.monotonic() - started) * 1000),
    }


def run_checks(targets, workers, timeout):
    results = []

    with ThreadPoolExecutor(max_workers=workers) as executor:
        future_map = {
            executor.submit(check_target, target, timeout): target
            for target in targets
        }

        for future in as_completed(future_map):
            target = future_map[future]
            try:
                results.append(future.result())
            except Exception as exc:
                # Record unexpected exceptions as structured results too, so the batch still ends with a summary
                results.append(
                    {
                        "name": target["name"],
                        "url": target["url"],
                        "ok": False,
                        "message": str(exc),
                        "cost_ms": 0,
                    }
                )

    return sorted(results, key=lambda item: item["name"])


def main():
    args = parse_args()
    targets = load_targets(args.targets)
    results = run_checks(targets, args.workers, args.timeout)

    print(json.dumps(results, ensure_ascii=False, indent=2))

    failed = [item for item in results if not item["ok"]]
    if failed:
        print(f"failed targets: {len(failed)}", file=sys.stderr)
        return 1

    return 0


if __name__ == "__main__":
    sys.exit(main())
```

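Targets file:

```json
[
  {"name": "api", "url": "https://example.com/health"},
  {"name": "grafana", "url": "http://127.0.0.1:3000/api/health"}
]
```

Run it:

```bash
uv run python concurrent_http_check.py --targets targets.json --workers 5 --timeout 3
```

A script like this can be plugged into cron, CI, monitoring probes, or an automation platform. What matters is that the output structure is stable, every failure comes with a reason, and the exit code can be read by external systems.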

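10. Checklist before deployment

Before handing over an ops script, run through this checklist:

| Item | What to check |
|---|---|
| Parameters | Can hosts, ports, thresholds, and config file paths be passed in? |
| Timeouts | Do HTTP, SSH, and database connections have timeouts set? |
| Concurrency | Is workers configurable, with a conservative default? |
| Errors | Is a failure on one target recorded as a result while the others continue? |
| Logging | Does the output include the target, status, and failure reason? |
| Exit code | 0 on success, non-zero on failure |
| Dependencies | Are pyproject.toml, a lock file, or installation instructions complete? |
| Secrets | Are passwords and tokens kept out of the code? |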
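For the secrets item, one common approach is to read tokens from environment variables and fail fast when they are missing. The scheduler, a CI secret store, or a protected env file can populate those variables. This is a sketch. The variable name CHECK_API_TOKEN, the helper require_env, exit code 2, and the Bearer header format are all assumptions about the target setup.

```python
import os
import sys


def require_env(name):
    # Read a required secret from the environment; exit with a clear message if it is missing
    value = os.environ.get(name)
    if not value:
        print(f"missing required environment variable: {name}", file=sys.stderr)
        sys.exit(2)
    return value


def auth_headers():
    # Hypothetical variable name; the token itself never appears in the code
    token = require_env("CHECK_API_TOKEN")
    return {"Authorization": f"Bearer {token}"}
```

A cron entry or CI job would then run something like CHECK_API_TOKEN=... python script.py. The error message names the variable, never its value, so logs do not leak the secret.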

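A script that runs is only the first step. Before it becomes part of daily operations, it must work with scheduled jobs, monitoring, and CI, and other people must be able to read it and take it over.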