feat: sync httpdns sdk/platform updates without large binaries
This commit is contained in:
347
HttpDNSSDK/sdk/ios/NewHttpDNSTests/Network/mock_server.py
Normal file
347
HttpDNSSDK/sdk/ios/NewHttpDNSTests/Network/mock_server.py
Normal file
@@ -0,0 +1,347 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
HTTP/HTTPS Mock Server for HttpdnsNWHTTPClient Integration Tests
|
||||
|
||||
模拟 httpbin.org 的核心功能,用于替代不稳定的外部依赖。
|
||||
支持 HTTP (端口 11080) 和多个 HTTPS 端口 (11443-11446,自签名证书)。
|
||||
|
||||
使用方法:
|
||||
python3 mock_server.py
|
||||
|
||||
端口配置:
|
||||
- HTTP: 127.0.0.1:11080
|
||||
- HTTPS: 127.0.0.1:11443, 11444, 11445, 11446
|
||||
|
||||
注意:
|
||||
- 使用非特权端口,无需 root 权限
|
||||
- HTTPS 使用自签名证书,测试时需禁用 TLS 验证
|
||||
- 多个 HTTPS 端口用于测试连接池隔离
|
||||
- 按 Ctrl+C 停止服务器
|
||||
"""
|
||||
|
||||
import json
|
||||
import time
|
||||
import uuid
|
||||
import ssl
|
||||
import os
|
||||
import subprocess
|
||||
import signal
|
||||
import sys
|
||||
from http.server import HTTPServer, BaseHTTPRequestHandler
|
||||
from urllib.parse import urlparse
|
||||
from threading import Thread
|
||||
from socketserver import ThreadingMixIn
|
||||
|
||||
|
||||
class ThreadedHTTPServer(ThreadingMixIn, HTTPServer):
|
||||
"""多线程 HTTP 服务器,支持并发请求"""
|
||||
daemon_threads = True
|
||||
allow_reuse_address = True
|
||||
|
||||
|
||||
class MockHTTPHandler(BaseHTTPRequestHandler):
|
||||
"""模拟 httpbin.org 的请求处理器"""
|
||||
|
||||
# 使用 HTTP/1.1 协议(支持 keep-alive)
|
||||
protocol_version = 'HTTP/1.1'
|
||||
|
||||
# 禁用日志输出(可选,便于查看测试输出)
|
||||
def log_message(self, format, *args):
|
||||
# 取消注释以启用详细日志
|
||||
# print(f"[{self.address_string()}] {format % args}")
|
||||
pass
|
||||
|
||||
def do_GET(self):
|
||||
"""处理 GET 请求"""
|
||||
path = urlparse(self.path).path
|
||||
|
||||
if path == '/get':
|
||||
self._handle_get()
|
||||
elif path.startswith('/status/'):
|
||||
self._handle_status(path)
|
||||
elif path.startswith('/stream-bytes/'):
|
||||
self._handle_stream_bytes(path)
|
||||
elif path.startswith('/delay/'):
|
||||
self._handle_delay(path)
|
||||
elif path == '/headers':
|
||||
self._handle_headers()
|
||||
elif path == '/uuid':
|
||||
self._handle_uuid()
|
||||
elif path == '/user-agent':
|
||||
self._handle_user_agent()
|
||||
elif path == '/connection-test':
|
||||
self._handle_connection_test()
|
||||
else:
|
||||
self._handle_not_found()
|
||||
|
||||
def _handle_get(self):
|
||||
"""模拟 /get - 返回请求信息"""
|
||||
data = {
|
||||
'args': {},
|
||||
'headers': dict(self.headers),
|
||||
'origin': self.client_address[0],
|
||||
'url': f'{self.command} {self.path}'
|
||||
}
|
||||
self._send_json(200, data)
|
||||
|
||||
def _handle_status(self, path):
|
||||
"""模拟 /status/{code} - 返回指定状态码"""
|
||||
try:
|
||||
status_code = int(path.split('/')[-1])
|
||||
# 限制状态码范围在 100-599
|
||||
if 100 <= status_code < 600:
|
||||
self._send_json(status_code, {'status': status_code})
|
||||
else:
|
||||
self._send_json(400, {'error': 'Invalid status code'})
|
||||
except (ValueError, IndexError):
|
||||
self._send_json(400, {'error': 'Invalid status code format'})
|
||||
|
||||
def _handle_stream_bytes(self, path):
|
||||
"""模拟 /stream-bytes/{n} - 返回 chunked 编码的 n 字节数据"""
|
||||
try:
|
||||
n = int(path.split('/')[-1])
|
||||
except (ValueError, IndexError):
|
||||
self._send_json(400, {'error': 'Invalid byte count'})
|
||||
return
|
||||
|
||||
# 发送 chunked 响应
|
||||
self.send_response(200)
|
||||
self.send_header('Content-Type', 'application/octet-stream')
|
||||
self.send_header('Transfer-Encoding', 'chunked')
|
||||
self.send_header('Connection', 'keep-alive')
|
||||
self.end_headers()
|
||||
|
||||
# 发送 chunk
|
||||
chunk_data = b'X' * n
|
||||
chunk_size_hex = f'{n:x}\r\n'.encode('utf-8')
|
||||
self.wfile.write(chunk_size_hex)
|
||||
self.wfile.write(chunk_data)
|
||||
self.wfile.write(b'\r\n')
|
||||
|
||||
# 发送最后一个 chunk (size=0)
|
||||
self.wfile.write(b'0\r\n\r\n')
|
||||
self.wfile.flush() # 确保数据发送
|
||||
|
||||
def _handle_delay(self, path):
|
||||
"""模拟 /delay/{seconds} - 延迟指定秒数后返回"""
|
||||
try:
|
||||
seconds = int(path.split('/')[-1])
|
||||
except (ValueError, IndexError):
|
||||
self._send_json(400, {'error': 'Invalid delay value'})
|
||||
return
|
||||
|
||||
# 最多延迟 10 秒(防止意外)
|
||||
seconds = min(seconds, 10)
|
||||
time.sleep(seconds)
|
||||
|
||||
data = {
|
||||
'args': {},
|
||||
'headers': dict(self.headers),
|
||||
'origin': self.client_address[0],
|
||||
'url': f'{self.command} {self.path}',
|
||||
'delayed': seconds
|
||||
}
|
||||
self._send_json(200, data)
|
||||
|
||||
def _handle_headers(self):
|
||||
"""模拟 /headers - 返回所有请求头部"""
|
||||
data = {
|
||||
'headers': dict(self.headers)
|
||||
}
|
||||
self._send_json(200, data)
|
||||
|
||||
def _handle_uuid(self):
|
||||
"""模拟 /uuid - 返回随机 UUID"""
|
||||
data = {
|
||||
'uuid': str(uuid.uuid4())
|
||||
}
|
||||
self._send_json(200, data)
|
||||
|
||||
def _handle_user_agent(self):
|
||||
"""模拟 /user-agent - 返回 User-Agent 头部"""
|
||||
data = {
|
||||
'user-agent': self.headers.get('User-Agent', '')
|
||||
}
|
||||
self._send_json(200, data)
|
||||
|
||||
def _handle_connection_test(self):
|
||||
"""处理 /connection-test - 返回指定的 Connection 头部用于测试"""
|
||||
from urllib.parse import parse_qs
|
||||
|
||||
# 解析查询参数
|
||||
query_string = urlparse(self.path).query
|
||||
params = parse_qs(query_string)
|
||||
mode = params.get('mode', ['keep-alive'])[0]
|
||||
|
||||
data = {
|
||||
'mode': mode,
|
||||
'message': f'Connection header test with mode: {mode}'
|
||||
}
|
||||
|
||||
body = json.dumps(data).encode('utf-8')
|
||||
self.send_response(200)
|
||||
self.send_header('Content-Type', 'application/json')
|
||||
self.send_header('Content-Length', len(body))
|
||||
|
||||
# 根据 mode 参数设置不同的 Connection 头部
|
||||
if mode == 'close':
|
||||
self.send_header('Connection', 'close')
|
||||
elif mode == 'proxy-close':
|
||||
self.send_header('Proxy-Connection', 'close')
|
||||
self.send_header('Connection', 'keep-alive')
|
||||
elif mode == 'close-uppercase':
|
||||
self.send_header('CONNECTION', 'CLOSE')
|
||||
elif mode == 'close-mixed':
|
||||
self.send_header('Connection', 'Close')
|
||||
else: # keep-alive (default)
|
||||
self.send_header('Connection', 'keep-alive')
|
||||
|
||||
self.end_headers()
|
||||
self.wfile.write(body)
|
||||
self.wfile.flush()
|
||||
|
||||
def _handle_not_found(self):
|
||||
"""处理未知路径"""
|
||||
self._send_json(404, {'error': 'Not Found', 'path': self.path})
|
||||
|
||||
def _send_json(self, status_code, data):
|
||||
"""发送 JSON 响应"""
|
||||
try:
|
||||
body = json.dumps(data).encode('utf-8')
|
||||
self.send_response(status_code)
|
||||
self.send_header('Content-Type', 'application/json')
|
||||
self.send_header('Content-Length', len(body))
|
||||
# 支持 HTTP/1.1 keep-alive
|
||||
self.send_header('Connection', 'keep-alive')
|
||||
self.end_headers()
|
||||
self.wfile.write(body)
|
||||
self.wfile.flush() # 确保数据发送
|
||||
except Exception as e:
|
||||
print(f"Error sending response: {e}", file=sys.stderr)
|
||||
|
||||
|
||||
def create_self_signed_cert(cert_file='server.pem'):
|
||||
"""生成自签名证书(如果不存在)"""
|
||||
if os.path.exists(cert_file):
|
||||
print(f"✓ 使用现有证书: {cert_file}")
|
||||
return cert_file
|
||||
|
||||
print(f"正在生成自签名证书: {cert_file} ...")
|
||||
try:
|
||||
subprocess.run([
|
||||
'openssl', 'req', '-x509', '-newkey', 'rsa:2048',
|
||||
'-keyout', cert_file, '-out', cert_file,
|
||||
'-days', '365', '-nodes',
|
||||
'-subj', '/CN=localhost'
|
||||
], check=True, capture_output=True)
|
||||
print(f"✓ 证书生成成功")
|
||||
return cert_file
|
||||
except subprocess.CalledProcessError as e:
|
||||
print(f"✗ 证书生成失败: {e.stderr.decode()}", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
except FileNotFoundError:
|
||||
print("✗ 未找到 openssl 命令,请安装 OpenSSL", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
def run_http_server(port=11080):
|
||||
"""运行 HTTP 服务器"""
|
||||
try:
|
||||
server = ThreadedHTTPServer(('127.0.0.1', port), MockHTTPHandler)
|
||||
print(f"✓ HTTP 服务器运行在 http://127.0.0.1:{port}")
|
||||
server.serve_forever()
|
||||
except OSError as e:
|
||||
if e.errno == 48: # Address already in use
|
||||
print(f"✗ 端口 {port} 已被占用,请关闭占用端口的进程或使用其他端口", file=sys.stderr)
|
||||
else:
|
||||
print(f"✗ HTTP 服务器启动失败: {e}", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
def run_https_server(port=11443, cert_file='server.pem'):
|
||||
"""运行 HTTPS 服务器(使用自签名证书)"""
|
||||
try:
|
||||
server = ThreadedHTTPServer(('127.0.0.1', port), MockHTTPHandler)
|
||||
|
||||
# 配置 SSL 上下文
|
||||
context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
|
||||
context.load_cert_chain(cert_file)
|
||||
|
||||
# 包装 socket
|
||||
server.socket = context.wrap_socket(server.socket, server_side=True)
|
||||
|
||||
print(f"✓ HTTPS 服务器运行在 https://127.0.0.1:{port} (自签名证书)")
|
||||
server.serve_forever()
|
||||
except OSError as e:
|
||||
if e.errno == 48: # Address already in use
|
||||
print(f"✗ 端口 {port} 已被占用,请关闭占用端口的进程或使用其他端口", file=sys.stderr)
|
||||
else:
|
||||
print(f"✗ HTTPS 服务器启动失败: {e}", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
except ssl.SSLError as e:
|
||||
print(f"✗ SSL 配置失败: {e}", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
def signal_handler(sig, frame):
|
||||
"""处理 Ctrl+C 信号"""
|
||||
print("\n\n✓ 服务器已停止")
|
||||
sys.exit(0)
|
||||
|
||||
|
||||
def main():
|
||||
"""主函数"""
|
||||
# 注册信号处理器
|
||||
signal.signal(signal.SIGINT, signal_handler)
|
||||
|
||||
# 生成自签名证书
|
||||
script_dir = os.path.dirname(os.path.abspath(__file__))
|
||||
cert_file = os.path.join(script_dir, 'server.pem')
|
||||
create_self_signed_cert(cert_file)
|
||||
|
||||
print("\n" + "="*60)
|
||||
print(" HttpdnsNWHTTPClient Mock Server")
|
||||
print("="*60)
|
||||
print("\n支持的 endpoints:")
|
||||
print(" GET /get - 返回请求信息")
|
||||
print(" GET /status/{code} - 返回指定状态码")
|
||||
print(" GET /stream-bytes/N - 返回 chunked 编码的 N 字节数据")
|
||||
print(" GET /delay/N - 延迟 N 秒后返回")
|
||||
print(" GET /headers - 返回所有请求头部")
|
||||
print(" GET /uuid - 返回随机 UUID")
|
||||
print(" GET /user-agent - 返回 User-Agent 头部")
|
||||
print(" GET /connection-test?mode={mode}")
|
||||
print(" - 返回指定 Connection 头部")
|
||||
print(" mode: close, keep-alive, proxy-close,")
|
||||
print(" close-uppercase, close-mixed")
|
||||
print("\n按 Ctrl+C 停止服务器\n")
|
||||
print("="*60 + "\n")
|
||||
|
||||
# 启动 HTTP 和 HTTPS 服务器(使用线程)
|
||||
http_thread = Thread(target=run_http_server, args=(11080,), daemon=True)
|
||||
|
||||
# 启动多个 HTTPS 端口用于测试连接复用隔离
|
||||
https_ports = [11443, 11444, 11445, 11446]
|
||||
https_threads = []
|
||||
|
||||
http_thread.start()
|
||||
time.sleep(0.5) # 等待 HTTP 服务器启动
|
||||
|
||||
# 启动所有 HTTPS 服务器
|
||||
for port in https_ports:
|
||||
https_thread = Thread(target=run_https_server, args=(port, cert_file), daemon=True)
|
||||
https_threads.append(https_thread)
|
||||
https_thread.start()
|
||||
time.sleep(0.1) # 错峰启动避免端口冲突
|
||||
|
||||
# 主线程等待(保持服务器运行)
|
||||
try:
|
||||
http_thread.join()
|
||||
for thread in https_threads:
|
||||
thread.join()
|
||||
except KeyboardInterrupt:
|
||||
signal_handler(signal.SIGINT, None)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
Reference in New Issue
Block a user