This commit is contained in:
2026-05-15 11:06:05 +08:00
commit 304d8dbea3
5 changed files with 290 additions and 0 deletions
+9
View File
@@ -0,0 +1,9 @@
# 不提交
config.yaml
logs/
*.log
__pycache__/
*.pyc
.idea
.vscode
.deepseek
+18
View File
@@ -0,0 +1,18 @@
# Webhook 配置文件
# 每次验证都会读取此文件
# 如果没有设置项目级apikey 就用全局 API Key
access_key: "YOUR_ACCESS_KEY_HERE"
# 项目配置
projects:
xsinfo:
api_key: "YOUR_ACCESS_KEY_HERE"
script: "xsinfo.sh"
enabled: true
# 添加更多项目:
# myblog:
# api_key: "your_blog_api_key"
# script: "myblog.sh"
# enabled: true
+1
View File
@@ -0,0 +1 @@
pyyaml>=6.0
+247
View File
@@ -0,0 +1,247 @@
#!/usr/bin/env python3
"""
Webhook Receiver Server
接收 webhook事件,每次请求动态读取配置文件。
配置文件路径:~/.webhook/config.yaml
"""
import json
import os
import hmac
import hashlib
import sys
import subprocess
from http.server import HTTPServer, BaseHTTPRequestHandler
from urllib.parse import urlparse, parse_qs
import datetime
import yaml
CONFIG_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), "config.yaml")
LOG_DIR = os.path.expanduser("~/.webhook/logs")
os.makedirs(LOG_DIR, exist_ok=True)
# ============ 每次请求重新加载配置 ============
def load_config() -> dict:
"""动态加载配置文件,修改配置无需重启服务。"""
with open(CONFIG_PATH, "r", encoding="utf-8") as f:
return yaml.safe_load(f)
def get_project_config(config: dict, project_name: str):
"""获取指定项目的配置,不存在或未启用则返回 None。"""
projects = config.get("projects", {})
proj = projects.get(project_name)
if not proj:
return None
if not proj.get("enabled", True):
return None
return proj
# ============ 日志 ============
def log_event(event_type: str, data: dict, status: str = "ok"):
"""结构化记录 webhook 事件到每日日志文件。"""
timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
log_file = os.path.join(LOG_DIR, f"webhook_{datetime.date.today().isoformat()}.log")
entry = {
"timestamp": timestamp,
"event": event_type,
"status": status,
**data
}
with open(log_file, "a", encoding="utf-8") as f:
f.write(json.dumps(entry, ensure_ascii=False) + "\n")
def log_script_output(project: str, client_ip: str, user_agent: str, referer: str,
stdout: str, stderr: str, exit_code: int):
"""记录项目脚本执行输出到独立日志文件(如 xsinfo.log)。"""
timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
log_file = os.path.join(LOG_DIR, f"{project}.log")
lines = [
f"[{timestamp}] {project} | ip={client_ip} ua={user_agent} referer={referer}",
f" exit={exit_code}",
]
if stdout:
for line in stdout.strip().split("\n"):
lines.append(f" stdout: {line}")
if stderr:
for line in stderr.strip().split("\n"):
lines.append(f" stderr: {line}")
with open(log_file, "a", encoding="utf-8") as f:
f.write("\n".join(lines) + "\n")
# ============ HTTP Handler ============
class WebhookHandler(BaseHTTPRequestHandler):
def log_message(self, format, *args):
"""抑制默认日志输出。"""
pass
def send_json(self, code: int, data: dict):
self.send_response(code)
self.send_header("Content-Type", "application/json")
self.end_headers()
self.wfile.write(json.dumps(data, ensure_ascii=False).encode())
def do_GET(self):
self._handle_request()
def do_POST(self):
self._handle_request()
def _handle_request(self):
"""统一处理 GET/POST 请求,key 对了就执行脚本"""
parsed = urlparse(self.path)
params = parse_qs(parsed.query)
if parsed.path == "/health":
self.send_json(200, {"status": "ok", "service": "webhook-receiver"})
return
access_key = params.get("access_key", [None])[0]
param_val = params.get("param", [None])[0]
config = load_config()
global_key = config.get("access_key", "")
# 优先项目级 key,否则用全局 key
proj = get_project_config(config, param_val) if param_val else None
if param_val and proj and proj.get("api_key"):
expected_key = proj["api_key"]
else:
expected_key = global_key
if access_key != expected_key:
log_event("unauthorized", {"ip": self.client_address[0], "param": param_val}, "denied")
self.send_json(403, {"error": "invalid access_key"})
return
client_ip = self.client_address[0]
user_agent = self.headers.get("User-Agent", "N/A")
referer = self.headers.get("Referer", "N/A")
# 执行脚本
result, status_code = self._execute_script(proj, param_val, client_ip, user_agent, referer)
script_res = result.get("script", {})
log_event(
"script_executed",
{
"param": param_val,
"project": param_val or "unknown",
"ip": client_ip,
"ua": user_agent,
"referer": referer,
"executed": script_res.get("executed", False),
"status": script_res.get("status", "unknown"),
"exit_code": script_res.get("exit_code"),
},
status=script_res.get("status", "unknown"),
)
self.send_json(status_code, {"ok": status_code == 200, "param": param_val, **result})
def _execute_script(self, proj: dict, param: str, client_ip: str, user_agent: str, referer: str) -> tuple[dict, int]:
"""执行脚本,key 对了就执行,不管什么请求"""
print(f"\n{'='*60}")
print(f"🚀 执行脚本 | param={param}")
print(f"{'='*60}\n")
script_result = {
"executed": False,
"status": "skipped",
"exit_code": None,
"stdout": None,
"stderr": None,
"error": None,
}
if proj and proj.get("script"):
script_path = os.path.expanduser(proj["script"])
script_cwd = os.path.dirname(os.path.abspath(__file__))
if os.path.exists(script_path):
os.chmod(script_path, 0o755)
try:
result = subprocess.run(
[script_path],
cwd=script_cwd,
capture_output=True,
text=True,
timeout=300,
)
script_result["executed"] = True
script_result["exit_code"] = result.returncode
script_result["stdout"] = [l for l in result.stdout.strip().split("\n") if l] if result.stdout.strip() else []
script_result["stderr"] = [l for l in result.stderr.strip().split("\n") if l] if result.stderr.strip() else []
script_result["status"] = "success" if result.returncode == 0 else "failed"
print(f"📜 {proj['script']}: exit={result.returncode}")
if result.stdout:
print(result.stdout)
if result.stderr:
print(f"STDERR: {result.stderr}", file=sys.stderr)
except subprocess.TimeoutExpired:
script_result["executed"] = True
script_result["status"] = "timeout"
script_result["error"] = "script execution timed out after 300 seconds"
print(f"⚠️ 执行超时(300s")
except Exception as e:
script_result["executed"] = True
script_result["status"] = "error"
script_result["error"] = str(e)
print(f"⚠️ 执行失败 - {e}")
else:
script_result["executed"] = True
script_result["status"] = "not_found"
script_result["error"] = f"script not found: {script_path}"
print(f"⚠️ 脚本不存在: {script_path}")
else:
print(f"⚠️ 项目 '{param}' 未配置 script")
# 脚本失败返回 500
status_code = 500 if script_result["executed"] and script_result["status"] in ("failed", "error", "timeout", "not_found") else 200
log_script_output(
project=param or "unknown",
client_ip=client_ip,
user_agent=user_agent,
referer=referer,
stdout=result.stdout if script_result["executed"] else "",
stderr=result.stderr if script_result["executed"] else "",
exit_code=script_result["exit_code"] or -1,
)
return {"script": script_result}, status_code
def main():
# 启动前检查配置文件
if not os.path.exists(CONFIG_PATH):
print(f"❌ 配置文件不存在: {CONFIG_PATH}")
sys.exit(1)
server = HTTPServer(("0.0.0.0", 8099), WebhookHandler)
print(f"""
╔══════════════════════════════════════════════╗
║ 🌐 Webhook Receiver Started ║
╚══════════════════════════════════════════════╝
监听: http://0.0.0.0:8099
健康检查: http://<host>:8099/health
手动测试: http://<host>:8099/test
配置文件: {CONFIG_PATH}
配置变更实时生效(无需重启)
""")
try:
server.serve_forever()
except KeyboardInterrupt:
print("\n👋 Server stopped")
sys.exit(0)
if __name__ == "__main__":
main()
Executable
+15
View File
@@ -0,0 +1,15 @@
#!/bin/bash
# Webhook 触发后执行的脚本
set -e
echo "==> [1/3] 拉取最新代码"
cd ~/web/xsinfo && git pull origin master
echo "==> [2/3] 构建静态网站"
npm run docs:build
echo "==> [3/3] 同步到部署目录"
cp -rf /home/xiaoshuai/web/xsinfo/docs/.vitepress/dist/* /www/xsinfo/
echo "==> 完成"