Networking and Batch Management

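Python ops batch jobs often call HTTP APIs, log in to remote hosts over SSH, query MySQL or Redis for status, and then compile the results into a report. Network scripts are more prone than local scripts to timeouts, authentication failures, and partial success, so timeouts, retries, error logging, and result summaries need to be handled explicitly.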

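Batch management is more than looping over a list of hosts: every target needs its own result, covering success or failure, the failure reason, elapsed time, and a suggested follow-up. That way, once the script finishes, you know which machines were handled and which still need individual attention.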
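One way to make "every target has its own result" concrete is a small record type plus a wrapper that always returns one, whether the action succeeds or raises. This is a minimal sketch: `TargetResult`, `run_for_target`, and the field names are illustrative names chosen here, not part of any library.

```python
import time
from dataclasses import dataclass, asdict


@dataclass
class TargetResult:
    """One record per target (hypothetical field names)."""
    target: str
    ok: bool
    reason: str = ""      # failure reason, empty on success
    cost_ms: int = 0      # elapsed time in milliseconds
    suggestion: str = ""  # follow-up hint for whoever reads the report


def run_for_target(target, action):
    """Run action(target) and always return a TargetResult, even on error."""
    started = time.monotonic()
    try:
        action(target)
    except Exception as exc:  # broad on purpose: one bad target must not stop the batch
        return TargetResult(
            target=target,
            ok=False,
            reason=f"{type(exc).__name__}: {exc}",
            cost_ms=int((time.monotonic() - started) * 1000),
            suggestion="check this target manually",
        )
    return TargetResult(
        target=target,
        ok=True,
        cost_ms=int((time.monotonic() - started) * 1000),
    )
```

Calling `asdict(result)` turns a record into a plain dict, which is convenient when the results are later dumped as JSON.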

1. HTTP Requests

requests is the most commonly used HTTP client:

bash
uv add requests

GET request:

python
import requests

response = requests.get("https://example.com/health", timeout=5)

print(response.status_code)
print(response.text)

Always set a timeout. An HTTP request without one can hang indefinitely, and scheduled runs will pile up behind it.

Check the status code:

python
import requests


def check_url(url):
    try:
        response = requests.get(url, timeout=5)
    except requests.RequestException as exc:
        return False, f"request failed: {exc}"

    if response.status_code != 200:
        return False, f"unexpected status_code={response.status_code}"

    return True, "ok"


ok, message = check_url("https://example.com/health")
print(ok, message)

POST JSON:

python
import requests

payload = {
    "hostname": "web01",
    "status": "ok",
}

response = requests.post(
    "https://example.com/api/report",
    json=payload,
    timeout=5,
)

print(response.status_code)

Token authentication:

python
import os
import requests

token = os.environ["API_TOKEN"]

headers = {
    "Authorization": f"Bearer {token}",
}

response = requests.get("https://example.com/api/hosts", headers=headers, timeout=5)
print(response.json())

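response.json() parses a JSON response into a Python dict or list. If the server returns something other than JSON, it raises an exception; whether the script should catch it depends on how stable the API is.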
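If the interface is not stable enough to trust, the parse step can be wrapped so that a non-JSON body becomes an ordinary failure message instead of a traceback. The helper below is a sketch; `parse_json_response` is a hypothetical name, and it relies only on the response object having `.json()` and `.text`. It catches `ValueError`, which the JSON decoding errors raised by requests derive from.

```python
def parse_json_response(response):
    """Return (data, error); exactly one of the two is None."""
    try:
        return response.json(), None
    except ValueError as exc:
        # Keep a short prefix of the body: an HTML error page from a proxy
        # is a common cause and is easy to recognize from its first bytes.
        snippet = response.text[:200]
        return None, f"invalid JSON: {exc}; body starts with: {snippet!r}"
```

The error string can go straight into the per-target result, so a gateway error page shows up in the report as a readable reason.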

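2. HTTP Retries

Network jitter, a brief 502 from the service, or a connection reset may all clear up on their own. Cap the number of retries and space them out so that you do not pile more load onto an endpoint that is already failing.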

python
import time
import requests


def get_with_retry(url, retries=3, timeout=5):
    last_error = None

    for attempt in range(1, retries + 1):
        try:
            response = requests.get(url, timeout=timeout)
            if response.status_code == 200:
                return response
            last_error = f"status_code={response.status_code}"
        except requests.RequestException as exc:
            last_error = str(exc)

        if attempt < retries:
            time.sleep(attempt)  # simple backoff: wait 1s after the 1st attempt, 2s after the 2nd

    raise RuntimeError(f"request failed after {retries} retries: {last_error}")

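Retries suit network jitter and transient errors, not deterministic ones such as authentication failures or bad parameters. For 401, 403, and 400, it is usually better to record the failure reason right away.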
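The distinction between transient and deterministic errors can be written down as an explicit set of retryable status codes. The sketch below is an assumption-laden illustration: the set `RETRYABLE_STATUS` is one plausible choice rather than a standard, and `send` is a hypothetical zero-argument callable that returns a status code, standing in for a real HTTP call.

```python
import time

# Assumption: these codes are treated as transient; adjust per API.
RETRYABLE_STATUS = {429, 500, 502, 503, 504}


def is_retryable_status(status_code):
    return status_code in RETRYABLE_STATUS


def call_with_retry(send, retries=3, base_delay=1.0, sleep=time.sleep):
    """Call send() until it returns 200, a non-retryable status, or attempts run out.

    Returns (ok, last_status_code, attempts_used).
    """
    status = None
    for attempt in range(1, retries + 1):
        status = send()
        if status == 200:
            return True, status, attempt
        if not is_retryable_status(status):
            # 400/401/403 and similar will not fix themselves: stop now.
            return False, status, attempt
        if attempt < retries:
            sleep(base_delay * attempt)  # linear backoff, no wait after the last try
    return False, status, retries
```

Passing `sleep` as a parameter keeps the function easy to exercise without real delays.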

3. Batch Execution over SSH

paramiko can run commands on remote hosts over SSH:

bash
uv add paramiko

Single host:

python
import paramiko


def run_ssh_command(host, username, password, command):
    client = paramiko.SSHClient()
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())

    try:
        client.connect(hostname=host, username=username, password=password, timeout=5)
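        stdin, stdout, stderr = client.exec_command(command, timeout=10)
        # Read the output before waiting for the exit status; waiting first
        # can hang when the command writes a large amount of output.
        out = stdout.read().decode()
        err = stderr.read().decode()
        exit_code = stdout.channel.recv_exit_status()
        return exit_code, out, err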
    finally:
        client.close()


code, out, err = run_ssh_command("192.168.10.11", "root", "123", "hostname")
print(code, out, err)

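AutoAddPolicy() automatically trusts the fingerprint of any new host, which is acceptable in a controlled test environment. In production it is safer to maintain known_hosts so that you do not connect to a host that has been swapped out.

Batch execution: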

python
hosts = ["192.168.10.11", "192.168.10.12", "192.168.10.13"]
results = []

for host in hosts:
    try:
        code, out, err = run_ssh_command(host, "root", "123", "uptime")
        results.append({"host": host, "ok": code == 0, "stdout": out.strip(), "stderr": err.strip()})
    except Exception as exc:
        results.append({"host": host, "ok": False, "error": str(exc)})

for result in results:
    print(result)

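In a batch script, one failed machine should not abort the whole job. A better approach is to record the failure for that machine, continue with the rest, and summarize the failed list at the end.

4. MySQL Queries

pymysql can be used to connect to MySQL: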

bash
uv add pymysql

Query the version:

python
import pymysql

connection = pymysql.connect(
    host="127.0.0.1",
    port=3306,
    user="root",
    password="password",
    database="mysql",
    connect_timeout=5,
    read_timeout=10,
)

try:
    with connection.cursor() as cursor:
        cursor.execute("SELECT VERSION()")
        row = cursor.fetchone()
        print(row[0])
finally:
    connection.close()

Return rows as dictionaries:

python
import pymysql

connection = pymysql.connect(
    host="127.0.0.1",
    user="root",
    password="password",
    connect_timeout=5,
    read_timeout=10,
    cursorclass=pymysql.cursors.DictCursor,
)

try:
    with connection.cursor() as cursor:
        cursor.execute("SHOW GLOBAL STATUS LIKE 'Threads_connected'")
        row = cursor.fetchone()
        print(row["Variable_name"], row["Value"])
finally:
    connection.close()

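When an ops script runs SQL, distinguish read-only queries from changes. Before a batch change, make sure there are clear inputs, logs, and a rollback plan. Read-only inspection scripts should set timeouts too, so that a slow query does not affect the database itself.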
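An inspection script can refuse anything that does not look like a read-only statement before it ever reaches the database. The check below is a rough heuristic, not a security boundary: `is_read_only` and `READ_ONLY_PREFIXES` are names made up for this sketch, and a dedicated read-only database account remains the more reliable safeguard.

```python
# Assumption: statements starting with these keywords are treated as read-only.
READ_ONLY_PREFIXES = ("SELECT", "SHOW", "DESCRIBE", "DESC", "EXPLAIN")


def is_read_only(sql):
    """Heuristic: a single statement whose first keyword is on the allow list."""
    statement = sql.strip().rstrip(";").strip()
    if not statement or ";" in statement:
        # Empty input or several statements in one string: refuse.
        return False
    first_word = statement.split(None, 1)[0].upper()
    return first_word in READ_ONLY_PREFIXES
```

The check is deliberately conservative: a semicolon inside a string literal is also rejected, which is an acceptable trade-off for an inspection script.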

5. Redis Queries

Install the client used to connect to Redis:

bash
uv add redis

Read INFO:

python
import redis

client = redis.Redis(
    host="127.0.0.1",
    port=6379,
    socket_connect_timeout=3,
    socket_timeout=5,
    decode_responses=True,
)

info = client.info("replication")
print(info["role"])

To check replication lag, use the offset concept from Redis master-replica replication (for background, see Master-Replica Replication):

python
import redis


def get_replication_offset(host, port=6379):
    client = redis.Redis(
        host=host,
        port=port,
        socket_connect_timeout=3,
        socket_timeout=5,
        decode_responses=True,
    )
    info = client.info("replication")
    return {
        "host": host,
        "role": info.get("role"),
        "master_repl_offset": info.get("master_repl_offset"),
        "slave_repl_offset": info.get("slave_repl_offset"),
        "master_link_status": info.get("master_link_status"),
    }
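Given one such dict from the master and one from a replica, the lag can be estimated as the difference between the two offsets. The function below is a sketch under stated assumptions: `replication_lag_bytes` is a hypothetical helper, the inputs are dicts shaped like the return value above, and a link that is not "up" is reported as unknown rather than as a number.

```python
def replication_lag_bytes(master_info, replica_info):
    """Return how many bytes the replica is behind, or None if unknown."""
    master_offset = master_info.get("master_repl_offset")
    replica_offset = replica_info.get("slave_repl_offset")
    if master_offset is None or replica_offset is None:
        return None
    if replica_info.get("master_link_status") != "up":
        # Design choice for this sketch: a broken link is reported as unknown
        # so the caller looks at the link status first.
        return None
    # The two INFO calls are not taken at the same instant, so clamp at zero.
    return max(0, int(master_offset) - int(replica_offset))
```

The result is a byte count in the replication stream, not a time; what counts as too far behind depends on the write volume of the instance.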

6. Batch Result Structure

Use one consistent result structure across batch jobs:

python
result = {
    "target": "192.168.10.11",
    "ok": True,
    "message": "uptime ok",
    "data": {"load1": 0.12},
}

With a consistent structure, the results can be emitted directly as JSON, written to CSV, posted to an API, or rendered as a Markdown report.

python
import json

results = [
    {"target": "web01", "ok": True, "message": "ok"},
    {"target": "db01", "ok": False, "message": "ssh timeout"},
]

print(json.dumps(results, ensure_ascii=False, indent=2))
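The same list of dicts can be written as CSV with the standard library. This is a minimal sketch; `results_to_csv` and its default column list are assumptions made for the example, and missing keys are written as empty cells.

```python
import csv
import io


def results_to_csv(results, fields=("target", "ok", "message")):
    """Render a list of result dicts as CSV text with a header row."""
    buffer = io.StringIO()
    writer = csv.DictWriter(buffer, fieldnames=fields, lineterminator="\n")
    writer.writeheader()
    for item in results:
        # Pick only the known columns so extra keys such as "data" are skipped.
        writer.writerow({name: item.get(name, "") for name in fields})
    return buffer.getvalue()
```

Writing to a `StringIO` buffer keeps the function independent of where the text ends up: a file, standard output, or an HTTP upload.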

Count the failures:

python
failed = [item for item in results if not item["ok"]]

if failed:
    print(f"failed count={len(failed)}")
    for item in failed:
        print(f"- {item['target']}: {item['message']}")

7. Reading the Target Inventory

JSON inventory:

json
[
  {"name": "web01", "host": "192.168.10.11", "role": "web"},
  {"name": "db01", "host": "192.168.10.21", "role": "mysql"}
]

Load it:

python
import json
from pathlib import Path


def load_targets(path):
    data = json.loads(Path(path).read_text(encoding="utf-8"))

    targets = []
    for item in data:
        if "host" not in item:
            raise ValueError(f"missing host field: {item}")
        targets.append(item)

    return targets

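Validate the inventory as early as possible. If the script discovers a missing field halfway through a batch, the results are left half-finished and hard to interpret.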
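A variation on early validation is to collect every problem in one pass instead of stopping at the first, so the inventory can be fixed in a single edit. The sketch below assumes `name` and `host` are the required fields and that duplicate hosts are unwanted; both rules, and the name `validate_targets`, are illustrative.

```python
REQUIRED_FIELDS = ("name", "host")  # assumption: adjust to the real inventory schema


def validate_targets(data):
    """Return a list of error strings; an empty list means the inventory is usable."""
    if not isinstance(data, list):
        return ["top-level JSON value must be a list"]

    errors = []
    seen_hosts = set()
    for index, item in enumerate(data):
        if not isinstance(item, dict):
            errors.append(f"item {index}: expected an object")
            continue
        for field in REQUIRED_FIELDS:
            if not item.get(field):
                errors.append(f"item {index}: missing {field}")
        host = item.get("host")
        if host in seen_hosts:
            errors.append(f"item {index}: duplicate host {host}")
        elif host:
            seen_hosts.add(host)
    return errors
```

A caller would typically print the errors and exit with a non-zero status before touching any target.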

8. Batch HTTP Health Check Script

python
#!/usr/bin/env python3
"""Batch-check HTTP health endpoints."""

import argparse
import json
import sys
import time
from pathlib import Path

import requests


def parse_args():
    parser = argparse.ArgumentParser(description="batch check http health")
    parser.add_argument("--targets", required=True, help="targets json file")
    parser.add_argument("--timeout", type=int, default=5, help="request timeout seconds")
    return parser.parse_args()


def load_targets(path):
    return json.loads(Path(path).read_text(encoding="utf-8"))


def check_target(target, timeout):
    started = time.time()
    url = target["url"]

    try:
        response = requests.get(url, timeout=timeout)
    except requests.RequestException as exc:
        return {
            "name": target["name"],
            "url": url,
            "ok": False,
            "message": str(exc),
            "cost_ms": int((time.time() - started) * 1000),
        }

    return {
        "name": target["name"],
        "url": url,
        "ok": response.status_code == 200,
        "message": f"status_code={response.status_code}",
        "cost_ms": int((time.time() - started) * 1000),
    }


def main():
    args = parse_args()
    targets = load_targets(args.targets)
    results = [check_target(target, args.timeout) for target in targets]

    print(json.dumps(results, ensure_ascii=False, indent=2))

    # If any target fails, the script as a whole exits non-zero so that cron, CI, or monitoring can detect it
    if any(not item["ok"] for item in results):
        return 1

    return 0


if __name__ == "__main__":
    sys.exit(main())

Targets file:

json
[
  {"name": "api", "url": "https://example.com/health"},
  {"name": "grafana", "url": "http://127.0.0.1:3000/api/health"}
]

Run:

bash
uv run python batch_http_check.py --targets targets.json

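This script has no concurrency, which keeps it easy to follow when there are only a few targets. Once the target count grows, consider a thread pool or async.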
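For the thread pool route, the standard library's `concurrent.futures` is usually enough. The helper below is a sketch: `check_all` is a hypothetical name, `check` is any function that takes one target and returns a result dict, and the default of 8 workers is an arbitrary starting point rather than a recommendation.

```python
from concurrent.futures import ThreadPoolExecutor


def check_all(targets, check, max_workers=8):
    """Run check(target) for every target in a thread pool.

    Results come back in the same order as targets. check() should catch
    its own errors and return a failed result; an exception that escapes
    it is re-raised here and aborts the batch.
    """
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(check, targets))
```

In the script above, the list comprehension in `main()` could be swapped for something like `check_all(targets, lambda t: check_target(t, args.timeout))`, leaving the JSON output and exit code logic unchanged.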