Files
webhook/webhook_server.py
T
2026-05-15 11:51:50 +08:00

264 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
Webhook Receiver Server
接收 webhook事件,每次请求动态读取配置文件。
配置文件路径:~/.webhook/config.yaml
"""
import json
import os
import hmac
import hashlib
import sys
import subprocess
from http.server import HTTPServer, BaseHTTPRequestHandler
from urllib.parse import urlparse, parse_qs
import datetime
import yaml
CONFIG_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), "config.yaml")
LOG_DIR = os.path.expanduser("~/.webhook/logs")
os.makedirs(LOG_DIR, exist_ok=True)
# ============ 每次请求重新加载配置 ============
def load_config() -> dict:
"""动态加载配置文件,修改配置无需重启服务。"""
with open(CONFIG_PATH, "r", encoding="utf-8") as f:
return yaml.safe_load(f)
def get_project_config(config: dict, project_name: str):
"""获取指定项目的配置,不存在或未启用则返回 None。"""
projects = config.get("projects", {})
proj = projects.get(project_name)
if not proj:
return None
if not proj.get("enabled", True):
return None
return proj
# ============ 日志 ============
def log_event(event_type: str, data: dict, status: str = "ok"):
"""结构化记录 webhook 事件到每日日志文件。"""
timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
log_file = os.path.join(LOG_DIR, f"webhook_{datetime.date.today().isoformat()}.log")
entry = {
"timestamp": timestamp,
"event": event_type,
"status": status,
**data
}
with open(log_file, "a", encoding="utf-8") as f:
f.write(json.dumps(entry, ensure_ascii=False) + "\n")
def log_script_output(project: str, client_ip: str, user_agent: str, referer: str,
stdout: str, stderr: str, exit_code: int):
"""记录项目脚本执行输出到独立日志文件(如 xsinfo.log)。"""
timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
log_file = os.path.join(LOG_DIR, f"{project}.log")
lines = [
f"[{timestamp}] {project} | ip={client_ip} ua={user_agent} referer={referer}",
f" exit={exit_code}",
]
if stdout:
for line in stdout.strip().split("\n"):
lines.append(f" stdout: {line}")
if stderr:
for line in stderr.strip().split("\n"):
lines.append(f" stderr: {line}")
with open(log_file, "a", encoding="utf-8") as f:
f.write("\n".join(lines) + "\n")
# ============ HTTP Handler ============
class WebhookHandler(BaseHTTPRequestHandler):
def log_message(self, format, *args):
"""抑制默认日志输出。"""
pass
def send_json(self, code: int, data: dict):
self.send_response(code)
self.send_header("Content-Type", "application/json")
self.end_headers()
self.wfile.write(json.dumps(data, ensure_ascii=False).encode())
def do_GET(self):
self._handle_request()
def do_POST(self):
self._handle_request()
def _handle_request(self):
"""统一处理 GET/POST 请求,key 对了就执行脚本"""
parsed = urlparse(self.path)
params = parse_qs(parsed.query)
if parsed.path == "/health":
self.send_json(200, {"status": "ok", "service": "webhook-receiver"})
return
access_key = params.get("access_key", [None])[0]
param_val = params.get("param", [None])[0]
config = load_config()
global_key = config.get("access_key", "")
show_log = config.get("show_log", True)
# 优先项目级 key,否则用全局 key
proj = get_project_config(config, param_val) if param_val else None
if param_val and proj and proj.get("api_key"):
expected_key = proj["api_key"]
else:
expected_key = global_key
if access_key != expected_key:
log_event("unauthorized", {"ip": self.client_address[0], "param": param_val}, "denied")
self.send_json(403, {"error": "invalid access_key"})
return
client_ip = self.client_address[0]
user_agent = self.headers.get("User-Agent", "N/A")
referer = self.headers.get("Referer", "N/A")
# 执行脚本
result, status_code = self._execute_script(show_log, proj, param_val, client_ip, user_agent, referer)
script_res = result.get("script", {})
if show_log:
log_event(
"script_executed",
{
"param": param_val,
"project": param_val or "unknown",
"ip": client_ip,
"ua": user_agent,
"referer": referer,
"executed": script_res.get("executed", False),
"status": script_res.get("status", "unknown"),
"exit_code": script_res.get("exit_code"),
},
status=script_res.get("status", "unknown"),
)
self.send_json(status_code, {"ok": status_code == 200, "param": param_val, **result})
def _execute_script(self, show_log: bool, proj: dict, param: str, client_ip: str, user_agent: str, referer: str) -> tuple[dict, int]:
"""执行脚本,key 对了就执行,不管什么请求"""
print(f"\n{'='*60}")
print(f"🚀 执行脚本 | param={param}")
print(f"{'='*60}\n")
script_result = {
"executed": False,
"status": "skipped",
"exit_code": None,
"stdout": None,
"stderr": None,
"error": None,
}
if proj and proj.get("script"):
script_path = os.path.expanduser(proj["script"])
script_cwd = os.path.dirname(os.path.abspath(__file__))
if os.path.exists(script_path):
if not show_log:
# 秒回复模式:不等待执行,不记录日志,直接返回成功
self._quick_run(script_path, script_cwd)
return {"script": {"executed": False, "status": "background", "exit_code": None, "stdout": None, "stderr": None, "error": None}}, 200
os.chmod(script_path, 0o755)
try:
result = subprocess.run(
[script_path],
cwd=script_cwd,
capture_output=True,
text=True,
timeout=300,
)
script_result["executed"] = True
script_result["exit_code"] = result.returncode
script_result["stdout"] = [l for l in result.stdout.strip().split("\n") if l] if result.stdout.strip() else []
script_result["stderr"] = [l for l in result.stderr.strip().split("\n") if l] if result.stderr.strip() else []
script_result["status"] = "success" if result.returncode == 0 else "failed"
print(f"📜 {proj['script']}: exit={result.returncode}")
if result.stdout:
print(result.stdout)
if result.stderr:
print(f"STDERR: {result.stderr}", file=sys.stderr)
except subprocess.TimeoutExpired:
script_result["executed"] = True
script_result["status"] = "timeout"
script_result["error"] = "script execution timed out after 300 seconds"
print(f"⚠️ 执行超时(300s")
except Exception as e:
script_result["executed"] = True
script_result["status"] = "error"
script_result["error"] = str(e)
print(f"⚠️ 执行失败 - {e}")
else:
# 文件不存在时即使 show_log=False 也返回错误
script_result["executed"] = True
script_result["status"] = "not_found"
script_result["error"] = f"script not found: {script_path}"
print(f"⚠️ 脚本不存在: {script_path}")
else:
print(f"⚠️ 项目 '{param}' 未配置 script")
# 脚本失败返回 500
status_code = 500 if script_result["executed"] and script_result["status"] in ("failed", "error", "timeout", "not_found") else 200
log_script_output(
project=param or "unknown",
client_ip=client_ip,
user_agent=user_agent,
referer=referer,
stdout=result.stdout if script_result["executed"] else "",
stderr=result.stderr if script_result["executed"] else "",
exit_code=script_result["exit_code"] or -1,
)
return {"script": script_result}, status_code
def _quick_run(self, script_path: str, script_cwd: str):
"""后台静默执行脚本,不等待,不记录日志。"""
import threading
def _run():
os.chmod(script_path, 0o755)
subprocess.run([script_path], cwd=script_cwd, capture_output=True, text=True)
threading.Thread(target=_run, daemon=True).start()
def main():
# 启动前检查配置文件
if not os.path.exists(CONFIG_PATH):
print(f"❌ 配置文件不存在: {CONFIG_PATH}")
sys.exit(1)
server = HTTPServer(("0.0.0.0", 8099), WebhookHandler)
print(f"""
╔══════════════════════════════════════════════╗
║ 🌐 Webhook Receiver Started ║
╚══════════════════════════════════════════════╝
监听: http://0.0.0.0:8099
健康检查: http://<host>:8099/health
手动测试: http://<host>:8099/test
配置文件: {CONFIG_PATH}
配置变更实时生效(无需重启)
""")
try:
server.serve_forever()
except KeyboardInterrupt:
print("\n👋 Server stopped")
sys.exit(0)
if __name__ == "__main__":
main()