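Networking and Batch Management
Python scripts for batch operations work often call HTTP APIs, log in to remote hosts over SSH, query MySQL or Redis for status, and then compile the results into a report. Network scripts are more prone than local scripts to timeouts, authentication failures, and partial success, so timeouts, retries, error logging, and result summaries need to be handled explicitly.
Batch management is more than looping over a list of hosts. Every target needs its own result: success or failure, the failure reason, elapsed time, and a suggested follow-up. That way, once the script finishes, you know which machines were processed and which still need individual attention.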
1. HTTP Requests
requests is the most commonly used HTTP client:
```bash
uv add requests
```
A GET request:
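```python
import requests

response = requests.get("https://example.com/health", timeout=5)
print(response.status_code)
print(response.text)
```
Always set a timeout. requests has no default timeout, so a request without one can hang indefinitely, and a scheduled job that hangs will pile up run after run.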
Checking the status code:
```python
import requests


def check_url(url):
    """Return (ok, message) for a single health URL."""
    try:
        response = requests.get(url, timeout=5)
    except requests.RequestException as exc:
        return False, f"request failed: {exc}"
    if response.status_code != 200:
        return False, f"unexpected status_code={response.status_code}"
    return True, "ok"


ok, message = check_url("https://example.com/health")
print(ok, message)
```
POST JSON:
```python
import requests

payload = {
    "hostname": "web01",
    "status": "ok",
}
# json= serializes the payload and sets the Content-Type header.
response = requests.post(
    "https://example.com/api/report",
    json=payload,
    timeout=5,
)
print(response.status_code)
```
Token authentication:
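```python
import os

import requests

# Read the token from the environment instead of hard-coding it.
token = os.environ["API_TOKEN"]
headers = {
    "Authorization": f"Bearer {token}",
}
response = requests.get("https://example.com/api/hosts", headers=headers, timeout=5)
print(response.json())
```
response.json() parses a JSON response body into a Python dict or list. If the server returns something that is not JSON, it raises an exception; whether the script should catch it depends on how stable the API is.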
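When the API is not fully trusted (for example, a proxy may answer with an HTML error page), the parse step can be wrapped so that a bad body becomes a recorded failure rather than a crash. The following is a minimal sketch: `parse_json_body` and `FakeResponse` are hypothetical names introduced here, and the fake class only stands in for a real `requests.Response` so the snippet runs without network access.

```python
import json


def parse_json_body(response):
    """Return (data, error) for a requests-style response; error is None on success."""
    try:
        return response.json(), None
    except ValueError as exc:
        # The JSON decode errors raised by requests are ValueError subclasses.
        snippet = response.text[:200]
        return None, f"invalid JSON ({exc}); body starts with: {snippet!r}"


class FakeResponse:
    """Hypothetical stand-in for requests.Response, used only for this demo."""

    def __init__(self, text):
        self.text = text

    def json(self):
        return json.loads(self.text)


data, error = parse_json_body(FakeResponse('{"hosts": ["web01"]}'))
print(data, error)
data, error = parse_json_body(FakeResponse("<html>502 Bad Gateway</html>"))
print(data, error)
```

Keeping the first part of the body in the error message usually makes it easier to tell a gateway error page from a genuinely malformed API response.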
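2. HTTP Retries
Network jitter, a brief 502 from the service, or a reset connection can all clear up on their own. Retries must cap the number of attempts and space them out, so they do not add load to an API that is already failing.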
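```python
import time

import requests


def get_with_retry(url, retries=3, timeout=5):
    """GET url, making up to `retries` attempts with a linear backoff."""
    last_error = None
    for attempt in range(1, retries + 1):
        try:
            response = requests.get(url, timeout=timeout)
            if response.status_code == 200:
                return response
            last_error = f"status_code={response.status_code}"
        except requests.RequestException as exc:
            last_error = str(exc)
        if attempt < retries:
            # Simple backoff: wait 1 s after the 1st attempt, 2 s after the 2nd.
            time.sleep(attempt)
    raise RuntimeError(f"request failed after {retries} attempts: {last_error}")
```
Retries suit network jitter and transient errors, not deterministic ones such as authentication failures or bad parameters. A 401, 403, or 400 should generally be recorded as a failure right away. For simplicity, the function above retries every non-200 response.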
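One way to separate transient from deterministic failures is to retry only on a fixed set of status codes. The sketch below is an illustration under stated assumptions: `call_with_retry`, `RETRYABLE_STATUS`, and `FakeResponse` are hypothetical names, the set of retryable codes is a guess that should be adjusted per API, and the request itself is passed in as a callable so the logic can be exercised without a network.

```python
import time

# Assumption: these statuses are treated as transient; adjust per API.
RETRYABLE_STATUS = {429, 500, 502, 503, 504}


def call_with_retry(fetch, retries=3, base_delay=1.0, sleep=time.sleep):
    """Call fetch() until it returns 200, a non-retryable status, or attempts run out.

    fetch is any zero-argument callable returning an object with .status_code,
    for example: lambda: requests.get(url, timeout=5).
    Returns (response_or_None, error_or_None).
    """
    last_error = None
    for attempt in range(1, retries + 1):
        try:
            response = fetch()
        except OSError as exc:
            # requests.RequestException derives from IOError (an alias of OSError),
            # so connection errors and timeouts land here.
            last_error = str(exc)
        else:
            if response.status_code == 200:
                return response, None
            last_error = f"status_code={response.status_code}"
            if response.status_code not in RETRYABLE_STATUS:
                # Deterministic failure (401, 403, 400, ...): stop immediately.
                return response, last_error
        if attempt < retries:
            sleep(base_delay * attempt)
    return None, f"gave up after {retries} attempts: {last_error}"


class FakeResponse:
    """Hypothetical stand-in for requests.Response, used only for this demo."""

    def __init__(self, status_code):
        self.status_code = status_code


responses = iter([FakeResponse(502), FakeResponse(200)])
response, error = call_with_retry(lambda: next(responses), sleep=lambda seconds: None)
print(response.status_code, error)
```

Returning an error string instead of raising keeps the function easy to use inside a batch loop, where each target's failure reason is recorded and the loop moves on.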
3. Batch Execution over SSH
paramiko can run remote commands over SSH:
```bash
uv add paramiko
```
Running a command on a single host:
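```python
import paramiko


def run_ssh_command(host, username, password, command):
    """Run one command over SSH and return (exit_code, stdout, stderr)."""
    client = paramiko.SSHClient()
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    try:
        client.connect(hostname=host, username=username, password=password, timeout=5)
        stdin, stdout, stderr = client.exec_command(command, timeout=10)
        # Drain the output before asking for the exit status; waiting on the
        # status first can hang when the command prints a lot of data.
        out = stdout.read().decode()
        err = stderr.read().decode()
        exit_code = stdout.channel.recv_exit_status()
        return exit_code, out, err
    finally:
        client.close()


code, out, err = run_ssh_command("192.168.10.11", "root", "123", "hostname")
print(code, out, err)
```
AutoAddPolicy() automatically trusts the fingerprint of any new host, which is acceptable in a controlled test environment. In production it is safer to maintain known_hosts, so the script does not connect to a host that has been swapped out.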
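A stricter connection setup might look like the sketch below. It assumes key-based login and an existing known_hosts file; `connect_strict` and its parameters are hypothetical names, and paramiko is imported inside the function only so the snippet can be loaded on a machine where it is not installed.

```python
import os


def connect_strict(host, username, key_filename, known_hosts=None, timeout=5):
    """Open an SSH connection that refuses hosts missing from known_hosts."""
    # Imported here so the module can be loaded where paramiko is absent.
    import paramiko

    client = paramiko.SSHClient()
    if known_hosts:
        client.load_host_keys(os.path.expanduser(known_hosts))
    else:
        client.load_system_host_keys()  # reads ~/.ssh/known_hosts
    # RejectPolicy raises SSHException for an unknown host instead of trusting it.
    client.set_missing_host_key_policy(paramiko.RejectPolicy())
    client.connect(
        hostname=host,
        username=username,
        key_filename=os.path.expanduser(key_filename),
        timeout=timeout,
    )
    return client


# Hypothetical usage; the account name and key path are placeholders:
# client = connect_strict("192.168.10.11", "ops", "~/.ssh/id_ed25519")
```

With this setup, a host whose key is missing from known_hosts fails the connection, and in a batch run that failure shows up as one more per-host error to review.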
Batch execution:
```python
hosts = ["192.168.10.11", "192.168.10.12", "192.168.10.13"]
results = []
for host in hosts:
    try:
        code, out, err = run_ssh_command(host, "root", "123", "uptime")
        results.append({"host": host, "ok": code == 0, "stdout": out.strip(), "stderr": err.strip()})
    except Exception as exc:
        # Record the failure and move on to the next host.
        results.append({"host": host, "ok": False, "error": str(exc)})
for result in results:
    print(result)
```
In a batch script, one failing machine must not abort the whole job. Record the failure for that machine, keep processing the rest, and summarize the failures at the end.
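The same loop can be packaged as a reusable function that also records elapsed time per host. This is a sketch, not a fixed recipe: `run_batch` and `fake_runner` are hypothetical names, and the runner is injected so the example executes without SSH access. In a real script the runner would wrap the run_ssh_command function shown earlier.

```python
import time


def run_batch(hosts, run_one):
    """Run run_one(host) for every host and never let one failure stop the loop.

    run_one is any callable returning (exit_code, stdout, stderr).
    """
    results = []
    for host in hosts:
        started = time.monotonic()
        try:
            code, out, err = run_one(host)
            entry = {"host": host, "ok": code == 0, "stdout": out.strip(), "stderr": err.strip()}
        except Exception as exc:
            entry = {"host": host, "ok": False, "error": f"{type(exc).__name__}: {exc}"}
        entry["cost_ms"] = int((time.monotonic() - started) * 1000)
        results.append(entry)
    return results


def fake_runner(host):
    """Hypothetical runner for the demo: one host times out."""
    if host == "192.168.10.12":
        raise TimeoutError("ssh timeout")
    return 0, " 10:00:00 up 3 days\n", ""


results = run_batch(["192.168.10.11", "192.168.10.12", "192.168.10.13"], fake_runner)
failed = [item["host"] for item in results if not item["ok"]]
print(f"total={len(results)} failed={len(failed)}")
for host in failed:
    print(f"needs a manual look: {host}")
```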
4. MySQL Queries
Use pymysql to connect to MySQL:
```bash
uv add pymysql
```
Querying the version:
```python
import pymysql

connection = pymysql.connect(
    host="127.0.0.1",
    port=3306,
    user="root",
    password="password",
    database="mysql",
    connect_timeout=5,
    read_timeout=10,
)
try:
    with connection.cursor() as cursor:
        cursor.execute("SELECT VERSION()")
        row = cursor.fetchone()
        print(row[0])
finally:
    connection.close()
```
Returning rows as dictionaries:
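```python
import pymysql

connection = pymysql.connect(
    host="127.0.0.1",
    user="root",
    password="password",
    connect_timeout=5,
    read_timeout=10,
    # DictCursor returns each row as a dict keyed by column name.
    cursorclass=pymysql.cursors.DictCursor,
)
try:
    with connection.cursor() as cursor:
        cursor.execute("SHOW GLOBAL STATUS LIKE 'Threads_connected'")
        row = cursor.fetchone()
        print(row["Variable_name"], row["Value"])
finally:
    connection.close()
```
Operations scripts that execute SQL should distinguish read-only queries from changes. Batch changes need clear input, logging, and a rollback path before they run; read-only inspection scripts also need timeouts, so that a slow query does not hurt the database itself.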
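An inspection script can refuse anything that does not look like a read-only statement before it reaches the database. The check below is a rough heuristic and not a security boundary (a database account with read-only privileges is the real safeguard); `READ_ONLY_KEYWORDS`, `is_read_only`, and `run_inspection_query` are hypothetical names, and the keyword list is an assumption.

```python
# Assumption: statements starting with these keywords are treated as read-only.
READ_ONLY_KEYWORDS = {"SELECT", "SHOW", "DESCRIBE", "DESC"}


def is_read_only(sql):
    """Heuristic: True when the statement's first keyword is a read-only one."""
    stripped = sql.strip()
    if not stripped:
        return False
    if ";" in stripped.rstrip(";"):
        return False  # refuse multi-statement input
    first_word = stripped.split(None, 1)[0].upper()
    return first_word in READ_ONLY_KEYWORDS


def run_inspection_query(cursor, sql, params=None):
    """Execute sql on a DB-API cursor only if it passes the read-only check."""
    if not is_read_only(sql):
        raise ValueError(f"refusing non-read-only statement: {sql!r}")
    cursor.execute(sql, params)
    return cursor.fetchall()


for statement in ("SELECT VERSION()", "DELETE FROM sessions", "SELECT 1; DROP TABLE t"):
    print(statement, "->", is_read_only(statement))
```

The check is deliberately conservative: a semicolon inside a string literal is also rejected, which is usually acceptable for a fixed set of inspection queries.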
5. Redis Queries
Connecting to Redis:
```bash
uv add redis
```
Reading INFO:
```python
import redis

client = redis.Redis(
    host="127.0.0.1",
    port=6379,
    socket_connect_timeout=3,
    socket_timeout=5,
    decode_responses=True,
)
info = client.info("replication")
print(info["role"])
```
Replication lag can be checked with the offsets from Redis master-replica replication (see Master-Replica Replication for the basics):
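```python
import redis


def get_replication_offset(host, port=6379):
    """Collect the replication fields needed to compare master and replica offsets."""
    client = redis.Redis(
        host=host,
        port=port,
        socket_connect_timeout=3,
        socket_timeout=5,
        decode_responses=True,
    )
    info = client.info("replication")
    # Fields that do not apply to this node's role come back as None.
    return {
        "host": host,
        "role": info.get("role"),
        "master_repl_offset": info.get("master_repl_offset"),
        "slave_repl_offset": info.get("slave_repl_offset"),
        "master_link_status": info.get("master_link_status"),
    }
```
6. Batch Result Structure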
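Use one result structure across batch tasks:
```python
result = {
    "target": "192.168.10.11",
    "ok": True,
    "message": "uptime ok",
    "data": {"load1": 0.12},
}
```
A uniform structure means the results can be dumped as JSON, written to CSV, posted to an API, or rendered into a Markdown report without further conversion.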
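As an illustration of the CSV and Markdown cases, the sketch below renders a list of such results with the standard library only. `results_to_csv`, `results_to_markdown`, and the `sample` data are hypothetical names introduced for this example, and the column choice is an assumption.

```python
import csv
import io


def results_to_csv(results, fields=("target", "ok", "message")):
    """Render result dicts as CSV text; keys outside `fields` are ignored."""
    buffer = io.StringIO()
    writer = csv.DictWriter(buffer, fieldnames=fields, extrasaction="ignore", lineterminator="\n")
    writer.writeheader()
    writer.writerows(results)
    return buffer.getvalue()


def results_to_markdown(results):
    """Render result dicts as a Markdown table."""
    lines = ["| target | ok | message |", "| --- | --- | --- |"]
    for item in results:
        status = "yes" if item["ok"] else "no"
        lines.append(f"| {item['target']} | {status} | {item['message']} |")
    return "\n".join(lines)


sample = [
    {"target": "web01", "ok": True, "message": "ok"},
    {"target": "db01", "ok": False, "message": "ssh timeout"},
]
print(results_to_csv(sample))
print(results_to_markdown(sample))
```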
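Dumping the list as JSON:
```python
import json

results = [
    {"target": "web01", "ok": True, "message": "ok"},
    {"target": "db01", "ok": False, "message": "ssh timeout"},
]
# ensure_ascii=False keeps non-ASCII messages readable in the output.
print(json.dumps(results, ensure_ascii=False, indent=2))
```
Collecting the failed items: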
```python
# `results` is the list built in the previous example.
failed = [item for item in results if not item["ok"]]
if failed:
    print(f"failed count={len(failed)}")
    for item in failed:
        print(f"- {item['target']}: {item['message']}")
```
7. Loading the Target List
A JSON inventory:
```json
[
  {"name": "web01", "host": "192.168.10.11", "role": "web"},
  {"name": "db01", "host": "192.168.10.21", "role": "mysql"}
]
```
Loading it:
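```python
import json
from pathlib import Path


def load_targets(path):
    """Load the target list and fail fast if an entry has no host."""
    data = json.loads(Path(path).read_text(encoding="utf-8"))
    targets = []
    for item in data:
        if "host" not in item:
            raise ValueError(f"missing host field: {item}")
        targets.append(item)
    return targets
```
Validate the list as early as possible. Discovering a missing field halfway through a batch run leaves a partial result that is awkward to interpret.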
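A slightly more thorough check can report every problem in the inventory at once instead of stopping at the first one. The sketch below assumes that `name` and `host` are the required fields and that names must be unique; `REQUIRED_FIELDS` and `validate_targets` are hypothetical names.

```python
REQUIRED_FIELDS = ("name", "host")  # assumption: adjust to the fields your tasks need


def validate_targets(data):
    """Return a list of readable problems; an empty list means the inventory is usable."""
    if not isinstance(data, list):
        return ["top-level JSON value must be a list"]
    problems = []
    seen_names = set()
    for index, item in enumerate(data):
        if not isinstance(item, dict):
            problems.append(f"entry {index}: expected an object, got {type(item).__name__}")
            continue
        for field in REQUIRED_FIELDS:
            if not item.get(field):
                problems.append(f"entry {index}: missing or empty field '{field}'")
        name = item.get("name")
        if isinstance(name, str) and name:
            if name in seen_names:
                problems.append(f"entry {index}: duplicate name '{name}'")
            seen_names.add(name)
    return problems


problems = validate_targets([
    {"name": "web01", "host": "192.168.10.11", "role": "web"},
    {"name": "db01", "role": "mysql"},
    {"name": "web01", "host": "192.168.10.12"},
])
for problem in problems:
    print(problem)
```

A script can print these problems and exit before touching any host, so a bad inventory never produces a half-finished run.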
8. Batch HTTP Health Check Script
```python
#!/usr/bin/env python3
"""Batch-check HTTP health endpoints."""
import argparse
import json
import sys
import time
from pathlib import Path

import requests


def parse_args():
    parser = argparse.ArgumentParser(description="batch check http health")
    parser.add_argument("--targets", required=True, help="targets json file")
    parser.add_argument("--timeout", type=int, default=5, help="request timeout seconds")
    return parser.parse_args()


def load_targets(path):
    return json.loads(Path(path).read_text(encoding="utf-8"))


def check_target(target, timeout):
    """Check one target and return a result dict with the elapsed time."""
    started = time.monotonic()
    url = target["url"]
    try:
        response = requests.get(url, timeout=timeout)
    except requests.RequestException as exc:
        return {
            "name": target["name"],
            "url": url,
            "ok": False,
            "message": str(exc),
            "cost_ms": int((time.monotonic() - started) * 1000),
        }
    return {
        "name": target["name"],
        "url": url,
        "ok": response.status_code == 200,
        "message": f"status_code={response.status_code}",
        "cost_ms": int((time.monotonic() - started) * 1000),
    }


def main():
    args = parse_args()
    targets = load_targets(args.targets)
    results = [check_target(target, args.timeout) for target in targets]
    print(json.dumps(results, ensure_ascii=False, indent=2))
    # If any target fails, exit non-zero so cron, CI, or monitoring can detect it.
    if any(not item["ok"] for item in results):
        return 1
    return 0


if __name__ == "__main__":
    sys.exit(main())
```
Target file:
```json
[
  {"name": "api", "url": "https://example.com/health"},
  {"name": "grafana", "url": "http://127.0.0.1:3000/api/health"}
]
```
Running it:
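```bash
uv run python batch_http_check.py --targets targets.json
```
The script has no concurrency, which keeps it easy to follow when there are only a few targets. Once the target count grows, consider a thread pool or async I/O.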
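For the thread-pool option, a minimal sketch with the standard library could look like this. `check_all`, `fake_check`, and `demo_targets` are hypothetical names, the worker count of 8 is an arbitrary assumption, and the check function is injected so the example runs without network access.

```python
from concurrent.futures import ThreadPoolExecutor


def check_all(targets, check_one, max_workers=8):
    """Run check_one(target) concurrently and return results in input order."""
    if not targets:
        return []
    workers = min(max_workers, len(targets))
    with ThreadPoolExecutor(max_workers=workers) as executor:
        # executor.map yields results in the same order as `targets`.
        return list(executor.map(check_one, targets))


def fake_check(target):
    """Hypothetical stand-in for check_target(target, timeout)."""
    return {"name": target["name"], "ok": target["name"] != "grafana"}


demo_targets = [{"name": "api"}, {"name": "grafana"}]
print(check_all(demo_targets, fake_check))
```

In the script above, `functools.partial(check_target, timeout=args.timeout)` could be passed as `check_one`. Because check_target already turns request errors into result dicts, one unreachable target does not interrupt the others, although an entry without a `url` key would still raise.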