#!/usr/bin/env python3
"""Webhook receiver server.

Accepts webhook requests over HTTP (GET or POST) and runs the script that is
configured for the requested project.  The configuration file is re-read on
every request, so edits take effect without restarting the server.

Config file: ``config.yaml`` in the same directory as this script.
Log files:   ``~/.webhook/logs``
"""

import datetime
import hashlib
import hmac
import json
import os
import subprocess
import sys
import threading
from http.server import BaseHTTPRequestHandler, HTTPServer
from typing import Optional
from urllib.parse import parse_qs, urlparse

try:
    import yaml
except ImportError:
    # Keep the module importable without PyYAML; main() refuses to start and
    # load_config() raises a clear error instead.
    yaml = None

CONFIG_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), "config.yaml")
LOG_DIR = os.path.expanduser("~/.webhook/logs")
os.makedirs(LOG_DIR, exist_ok=True)

HOST = "0.0.0.0"
PORT = 8099
SCRIPT_TIMEOUT_SECONDS = 300


# ============ Configuration (reloaded on every request) ============

def load_config() -> dict:
    """Load the configuration file from disk.

    Called once per request so that config edits apply without a restart.

    Returns:
        The parsed configuration mapping; an empty file yields ``{}``.

    Raises:
        RuntimeError: PyYAML is not installed.
        OSError: the config file cannot be opened.
        yaml.YAMLError: the file is not valid YAML.
        ValueError: the document root is not a mapping.
    """
    if yaml is None:
        raise RuntimeError("PyYAML is required to read the config file")
    with open(CONFIG_PATH, "r", encoding="utf-8") as f:
        config = yaml.safe_load(f)
    if config is None:
        # safe_load returns None for an empty document.
        return {}
    if not isinstance(config, dict):
        raise ValueError(
            f"config root must be a mapping, got {type(config).__name__}"
        )
    return config


def get_project_config(config: dict, project_name: str) -> Optional[dict]:
    """Return the config mapping of *project_name*, or None.

    None is returned when the project is missing, is not a non-empty mapping,
    or has ``enabled`` set to a falsy value (``enabled`` defaults to True).
    """
    # "projects: null" in YAML yields None rather than a missing key.
    projects = config.get("projects") or {}
    if not isinstance(projects, dict):
        return None
    proj = projects.get(project_name)
    if not proj or not isinstance(proj, dict):
        return None
    if not proj.get("enabled", True):
        return None
    return proj


# ============ Logging ============

def _output_lines(text) -> list:
    """Split captured process output into lines; blank output gives []."""
    stripped = (text or "").strip()
    return stripped.split("\n") if stripped else []


def _safe_log_name(project: str) -> str:
    """Reduce *project* to a bare file name so it cannot leave LOG_DIR.

    The project name comes from the request query string, so directory
    components and NUL bytes are dropped before it is used in a path.
    """
    cleaned = str(project).replace("\\", "/").replace("\x00", "")
    name = os.path.basename(cleaned)
    return name if name not in ("", ".", "..") else "unknown"


def log_event(event_type: str, data: dict, status: str = "ok"):
    """Append one JSON line describing a webhook event to the daily log.

    Args:
        event_type: short event name stored under ``"event"``.
        data: extra fields merged into the entry (they override the
            ``timestamp``/``event``/``status`` fields on key collision).
        status: outcome label stored under ``"status"``.
    """
    # Read the clock once so the timestamp and the file date always agree.
    now = datetime.datetime.now()
    log_file = os.path.join(LOG_DIR, f"webhook_{now.date().isoformat()}.log")
    entry = {
        "timestamp": now.strftime("%Y-%m-%d %H:%M:%S"),
        "event": event_type,
        "status": status,
        **data,
    }
    with open(log_file, "a", encoding="utf-8") as f:
        f.write(json.dumps(entry, ensure_ascii=False) + "\n")


def log_script_output(project: str, client_ip: str, user_agent: str, referer: str,
                      stdout: str, stderr: str, exit_code: int):
    """Append a script run's output to the project's own log file.

    The file is ``<LOG_DIR>/<project>.log``; the project name is reduced to a
    bare file name first so a crafted name cannot write outside LOG_DIR.

    Args:
        project: project name (also used for the log file name).
        client_ip: address of the requesting client.
        user_agent: value of the request's User-Agent header.
        referer: value of the request's Referer header.
        stdout: captured standard output ("" when there is none).
        stderr: captured standard error ("" when there is none).
        exit_code: process exit code to record.
    """
    timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    log_file = os.path.join(LOG_DIR, f"{_safe_log_name(project)}.log")
    lines = [
        f"[{timestamp}] {project} | ip={client_ip} ua={user_agent} referer={referer}",
        f" exit={exit_code}",
    ]
    lines.extend(f" stdout: {line}" for line in _output_lines(stdout))
    lines.extend(f" stderr: {line}" for line in _output_lines(stderr))
    with open(log_file, "a", encoding="utf-8") as f:
        f.write("\n".join(lines) + "\n")


# ============ HTTP Handler ============

class WebhookHandler(BaseHTTPRequestHandler):
    """Request handler: authenticates by access key, then runs the project script."""

    def log_message(self, format, *args):
        """Suppress the default per-request logging of the base class."""
        pass

    def send_json(self, code: int, data: dict):
        """Send *data* as a UTF-8 JSON response with HTTP status *code*."""
        body = json.dumps(data, ensure_ascii=False).encode("utf-8")
        self.send_response(code)
        self.send_header("Content-Type", "application/json; charset=utf-8")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def do_GET(self):
        self._handle_request()

    def do_POST(self):
        self._handle_request()

    @staticmethod
    def _keys_match(provided, expected) -> bool:
        """Return True when the request key equals the configured key.

        A missing or empty key on either side never matches, so an unset or
        null key in the config cannot be satisfied by omitting the parameter.
        Both values are compared as text (a numeric key in YAML still works)
        using a constant-time comparison.
        """
        if provided is None or expected is None:
            return False
        provided_bytes = str(provided).encode("utf-8")
        expected_bytes = str(expected).encode("utf-8")
        if not provided_bytes or not expected_bytes:
            return False
        return hmac.compare_digest(provided_bytes, expected_bytes)

    def _handle_request(self):
        """Handle GET and POST alike: run the script if the key is valid.

        Query parameters: ``access_key`` (required) and ``param`` (project
        name).  ``/health`` answers without authentication or config access.
        """
        parsed = urlparse(self.path)
        params = parse_qs(parsed.query)

        if parsed.path == "/health":
            self.send_json(200, {"status": "ok", "service": "webhook-receiver"})
            return

        client_ip = self.client_address[0]
        access_key = params.get("access_key", [None])[0]
        param_val = params.get("param", [None])[0]

        # Request boundary: any failure to read the config becomes a JSON 500
        # instead of an unhandled exception that drops the connection.
        try:
            config = load_config()
        except Exception as exc:
            print(f"⚠️ 配置加载失败 - {exc}", file=sys.stderr)
            log_event("config_error", {"ip": client_ip, "error": str(exc)}, "error")
            self.send_json(500, {"error": "failed to load config"})
            return

        global_key = config.get("access_key", "")
        show_log = config.get("show_log", True)

        # A project-level key takes precedence; otherwise use the global key.
        proj = get_project_config(config, param_val) if param_val else None
        if proj and proj.get("api_key"):
            expected_key = proj["api_key"]
        else:
            expected_key = global_key

        if not self._keys_match(access_key, expected_key):
            log_event("unauthorized", {"ip": client_ip, "param": param_val}, "denied")
            self.send_json(403, {"error": "invalid access_key"})
            return

        user_agent = self.headers.get("User-Agent", "N/A")
        referer = self.headers.get("Referer", "N/A")

        # Run the script.
        result, status_code = self._execute_script(
            show_log, proj, param_val, client_ip, user_agent, referer
        )

        script_res = result.get("script", {})
        if show_log:
            log_event(
                "script_executed",
                {
                    "param": param_val,
                    "project": param_val or "unknown",
                    "ip": client_ip,
                    "ua": user_agent,
                    "referer": referer,
                    "executed": script_res.get("executed", False),
                    "status": script_res.get("status", "unknown"),
                    "exit_code": script_res.get("exit_code"),
                },
                status=script_res.get("status", "unknown"),
            )

        self.send_json(status_code, {"ok": status_code == 200, "param": param_val, **result})

    def _execute_script(self, show_log: bool, proj: dict, param: str, client_ip: str,
                        user_agent: str, referer: str) -> tuple[dict, int]:
        """Run the project's script and report the outcome.

        Args:
            show_log: when False and the script exists, it is started in the
                background and the method returns immediately.
            proj: project config mapping, or None when there is no project.
            param: project name from the request (used for logging).
            client_ip: address of the requesting client.
            user_agent: request User-Agent header value.
            referer: request Referer header value.

        Returns:
            ``({"script": result}, http_status)`` where ``result["status"]``
            is one of ``skipped``, ``background``, ``success``, ``failed``,
            ``timeout``, ``error`` or ``not_found``.  The HTTP status is 500
            for ``failed``/``error``/``timeout``/``not_found``, else 200.
        """
        print(f"\n{'='*60}")
        print(f"🚀 执行脚本 | param={param}")
        print(f"{'='*60}\n")

        script_result = {
            "executed": False,
            "status": "skipped",
            "exit_code": None,
            "stdout": None,
            "stderr": None,
            "error": None,
        }
        # Raw output for the project log; stays empty on every path where the
        # process produced no completed result.
        stdout_text = ""
        stderr_text = ""

        script = proj.get("script") if proj else None
        if not script:
            print(f"⚠️ 项目 '{param}' 未配置 script")
        else:
            script_path = os.path.expanduser(str(script))
            script_cwd = os.path.dirname(os.path.abspath(__file__))

            if not os.path.exists(script_path):
                # A missing file is reported as an error even when show_log is False.
                script_result["executed"] = True
                script_result["status"] = "not_found"
                script_result["error"] = f"script not found: {script_path}"
                print(f"⚠️ 脚本不存在: {script_path}")
            elif not show_log:
                # Quick-reply mode: do not wait and do not log, answer at once.
                self._quick_run(script_path, script_cwd)
                return {"script": {"executed": False, "status": "background",
                                   "exit_code": None, "stdout": None,
                                   "stderr": None, "error": None}}, 200
            else:
                try:
                    # chmod is inside the try so a permission problem is
                    # reported as an "error" result rather than crashing.
                    os.chmod(script_path, 0o755)
                    completed = subprocess.run(
                        [script_path],
                        cwd=script_cwd,
                        capture_output=True,
                        text=True,
                        timeout=SCRIPT_TIMEOUT_SECONDS,
                    )
                except subprocess.TimeoutExpired:
                    script_result["executed"] = True
                    script_result["status"] = "timeout"
                    script_result["error"] = (
                        f"script execution timed out after {SCRIPT_TIMEOUT_SECONDS} seconds"
                    )
                    print(f"⚠️ 执行超时({SCRIPT_TIMEOUT_SECONDS}s)")
                except Exception as e:
                    script_result["executed"] = True
                    script_result["status"] = "error"
                    script_result["error"] = str(e)
                    print(f"⚠️ 执行失败 - {e}")
                else:
                    stdout_text = completed.stdout
                    stderr_text = completed.stderr
                    script_result["executed"] = True
                    script_result["exit_code"] = completed.returncode
                    script_result["stdout"] = [ln for ln in _output_lines(stdout_text) if ln]
                    script_result["stderr"] = [ln for ln in _output_lines(stderr_text) if ln]
                    script_result["status"] = "success" if completed.returncode == 0 else "failed"

                    print(f"📜 {proj['script']}: exit={completed.returncode}")
                    if stdout_text:
                        print(stdout_text)
                    if stderr_text:
                        print(f"STDERR: {stderr_text}", file=sys.stderr)

        # A failed script is reported to the client as HTTP 500.
        failed = script_result["status"] in ("failed", "error", "timeout", "not_found")
        status_code = 500 if failed else 200

        # Compare against None: a successful exit code 0 must not become -1.
        exit_code = script_result["exit_code"]
        log_script_output(
            project=param or "unknown",
            client_ip=client_ip,
            user_agent=user_agent,
            referer=referer,
            stdout=stdout_text,
            stderr=stderr_text,
            exit_code=-1 if exit_code is None else exit_code,
        )

        return {"script": script_result}, status_code

    def _quick_run(self, script_path: str, script_cwd: str):
        """Start the script in a daemon thread without waiting for it.

        Output is discarded and nothing is written to the log files; a
        failure to launch is only reported on stderr.
        """
        def _run():
            try:
                os.chmod(script_path, 0o755)
                subprocess.run(
                    [script_path],
                    cwd=script_cwd,
                    stdout=subprocess.DEVNULL,
                    stderr=subprocess.DEVNULL,
                )
            except Exception as e:
                print(f"⚠️ 执行失败 - {e}", file=sys.stderr)

        threading.Thread(target=_run, daemon=True).start()


def main():
    """Check prerequisites, then serve requests until interrupted."""
    if yaml is None:
        print("❌ 缺少依赖 PyYAML: pip install pyyaml")
        sys.exit(1)

    # Refuse to start without a config file.
    if not os.path.exists(CONFIG_PATH):
        print(f"❌ 配置文件不存在: {CONFIG_PATH}")
        sys.exit(1)

    server = HTTPServer((HOST, PORT), WebhookHandler)
    print(f"""
╔══════════════════════════════════════════════╗
║        🌐 Webhook Receiver Started           ║
╚══════════════════════════════════════════════╝

监听: http://{HOST}:{PORT}
健康检查: http://<host>:{PORT}/health
手动测试: http://<host>:{PORT}/test?access_key=<key>&param=<project>
配置文件: {CONFIG_PATH}
配置变更实时生效(无需重启)
""")
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        print("\n👋 Server stopped")
        sys.exit(0)
    finally:
        server.server_close()


if __name__ == "__main__":
    main()