在上一篇博客中,我们了解了几种常见的网络代理。本文将通过python代码来实现网络代理。代码仅做学习研究使用,不进行性能考虑。

HTTP代理

在上一篇博客中我们了解到,http代理分为普通代理和隧道代理。如果目标服务器是http服务,则使用普通代理;如果目标服务器是https服务,则使用隧道代理。

以下是一个简单的http代理实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import socket
import select
import logging
import shutil
from http.server import ThreadingHTTPServer, BaseHTTPRequestHandler
from urllib.parse import urlparse

# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

class ProxyHandler(BaseHTTPRequestHandler):
def do_GET(self):
"""处理 GET 请求"""
self._handle_request('GET')

def do_POST(self):
"""处理 POST 请求"""
self._handle_request('POST')

def do_PUT(self):
"""处理 PUT 请求"""
self._handle_request('PUT')

def do_DELETE(self):
"""处理 DELETE 请求"""
self._handle_request('DELETE')

def do_HEAD(self):
"""处理 HEAD 请求"""
self._handle_request('HEAD')

def do_OPTIONS(self):
"""处理 OPTIONS 请求"""
self._handle_request('OPTIONS')

def do_PATCH(self):
"""处理 PATCH 请求"""
self._handle_request('PATCH')

def _handle_request(self, method):
"""通用请求处理方法"""
try:
# 解析目标URL
url = urlparse(self.path)
if not url.netloc:
self.send_error(400, "Bad Request")
return

# 创建到目标服务器的连接
target_host = url.netloc
target_port = 80
if ':' in target_host:
target_host, target_port = target_host.split(':')
target_port = int(target_port)

# 建立连接
target_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
target_sock.connect((target_host, target_port))

# 读取请求体
content_length = int(self.headers.get('Content-Length', 0))
body = self.rfile.read(content_length) if content_length > 0 else b''

# 构建请求头
request_headers = []
for header, value in self.headers.items():
if header.lower() not in ('proxy-connection', 'connection', 'host'):
request_headers.append(f"{header}: {value}")

# 发送请求到目标服务器
request = f"{method} {self.path} HTTP/1.1\r\n"
request += f"Host: {target_host}\r\n"
request += "Connection: close\r\n"
request += "\r\n".join(request_headers)
request += "\r\n"
if body:
request += f"Content-Length: {len(body)}\r\n"
request += "\r\n"

# 发送请求头和请求体
target_sock.send(request.encode())
if body:
target_sock.send(body)

# 使用shutil.copyfileobj进行数据传输
target_file = target_sock.makefile('rb')
shutil.copyfileobj(target_file, self.wfile, length=8192)
target_file.close()
target_sock.close()

except Exception as e:
logger.error(f"处理 {method} 请求时发生错误: {e}")
self.send_error(500, f"服务器内部错误: {str(e)}")

def do_CONNECT(self):
"""处理 CONNECT 请求(隧道代理)"""
try:
# 解析目标地址
target_host, target_port = self.path.split(':')
target_port = int(target_port)

# 创建到目标服务器的连接
target_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
target_sock.connect((target_host, target_port))

# 发送连接成功响应
self.send_response(200, 'Connection Established')
self.send_header('Proxy-Agent', 'Python-Proxy')
self.end_headers()

# 获取客户端socket
client_sock = self.connection

# 设置非阻塞模式
client_sock.setblocking(False)
target_sock.setblocking(False)

# 使用select进行双向转发
while True:
# 使用select监控两个socket
readable, _, exceptional = select.select(
[client_sock, target_sock],
[],
[client_sock, target_sock],
1
)

# 检查是否有异常
if exceptional:
break

# 处理可读的socket
for sock in readable:
try:
# 确定源和目标socket
if sock is client_sock:
source = client_sock
target = target_sock
else:
source = target_sock
target = client_sock

# 读取数据
data = source.recv(8192)
if not data:
return

# 发送数据
target.send(data)

except Exception as e:
logger.error(f"隧道连接发生错误: {e}")
return

except Exception as e:
logger.error(f"处理 CONNECT 请求时发生错误: {e}")
self.send_error(500, f"服务器内部错误: {str(e)}")
finally:
# 确保连接被关闭
try:
client_sock.close()
except:
pass
try:
target_sock.close()
except:
pass

def log_message(self, format, *args):
"""自定义日志格式"""
logger.info(f"{self.address_string()} - {format % args}")

def run_proxy(host='0.0.0.0', port=9000):
"""运行代理服务器"""
server_address = (host, port)
httpd = ThreadingHTTPServer(server_address, ProxyHandler)
logger.info(f"代理服务器启动于 {host}:{port}")
try:
httpd.serve_forever()
except KeyboardInterrupt:
logger.info("正在关闭代理服务器")
httpd.server_close()

if __name__ == '__main__':
run_proxy()

SCOKS代理

以下是一个简单的socks代理实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
import socket
import select
import logging
import struct
from socketserver import ThreadingTCPServer, StreamRequestHandler

# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# SOCKS5 协议常量
SOCKS_VERSION = 5
CONNECT = 1
BIND = 2
UDP_ASSOCIATE = 3

# 地址类型
ATYP_IPV4 = 1
ATYP_DOMAINNAME = 3
ATYP_IPV6 = 4

class SocksProxy(StreamRequestHandler):
def handle(self):
"""处理SOCKS5连接请求"""
# 处理握手
if not self.handle_handshake():
return

# 处理连接请求
if not self.handle_connect():
return

def handle_handshake(self):
"""处理SOCKS5握手"""
try:
# 读取客户端支持的认证方法
version = self.connection.recv(1)

if not version or ord(version) != SOCKS_VERSION:
logger.error(f"不支持的SOCKS版本: {version}")
return False

# 读取认证方法数量
nmethods = self.connection.recv(1)
if not nmethods:
return False

# 读取所有支持的认证方法
methods = self.connection.recv(ord(nmethods))
if not methods:
return False

# 目前我们不需要认证,直接返回 NO AUTHENTICATION REQUIRED (0x00)
self.connection.sendall(struct.pack('!BB', SOCKS_VERSION, 0))
return True

except Exception as e:
logger.error(f"握手阶段错误: {e}")
return False

def handle_connect(self):
"""处理SOCKS5连接请求"""
remote = None
try:
# 读取版本和命令
version, cmd, _, address_type = struct.unpack('!BBBB', self.connection.recv(4))

if version != SOCKS_VERSION:
logger.error(f"不支持的SOCKS版本: {version}")
return False

# 目前只支持CONNECT命令
if cmd != CONNECT:
logger.error(f"不支持的命令: {cmd}")
self.send_reply(7) # Command not supported
return False

# 解析目标地址
if address_type == ATYP_IPV4:
# IPv4
target_addr = socket.inet_ntoa(self.connection.recv(4))
elif address_type == ATYP_DOMAINNAME:
# 域名
domain_length = ord(self.connection.recv(1))
target_addr = self.connection.recv(domain_length).decode()
elif address_type == ATYP_IPV6:
# IPv6
target_addr = socket.inet_ntop(socket.AF_INET6, self.connection.recv(16))
else:
logger.error(f"不支持的地址类型: {address_type}")
self.send_reply(8) # Address type not supported
return False

# 读取端口号
target_port = struct.unpack('!H', self.connection.recv(2))[0]

# 尝试连接到目标服务器
try:
if address_type == ATYP_DOMAINNAME:
remote = socket.create_connection((target_addr, target_port), timeout=10)
else:
remote = socket.socket(socket.AF_INET if address_type == ATYP_IPV4 else socket.AF_INET6)
remote.settimeout(10)
remote.connect((target_addr, target_port))

bind_address = remote.getsockname()
logger.info(f"已连接到 {target_addr}:{target_port}")

# 发送成功响应
self.send_reply(0, bind_address[0], bind_address[1])

except Exception as e:
logger.error(f"连接目标服务器失败: {e}")
self.send_reply(4) # Host unreachable
return False

# 开始转发数据
self.exchange_loop(self.connection, remote)
return True

except Exception as e:
logger.error(f"处理连接请求时发生错误: {e}")
return False
finally:
if remote:
try:
remote.close()
except:
pass

def send_reply(self, reply_code, bind_addr='0.0.0.0', bind_port=0):
"""发送SOCKS5响应"""
try:
# 将IP地址转换为网络字节序
if ':' in bind_addr: # IPv6
addr_bytes = socket.inet_pton(socket.AF_INET6, bind_addr)
addr_type = ATYP_IPV6
else: # IPv4
addr_bytes = socket.inet_aton(bind_addr)
addr_type = ATYP_IPV4

reply = struct.pack('!BBBB', SOCKS_VERSION, reply_code, 0, addr_type)
reply += addr_bytes + struct.pack('!H', bind_port)
self.connection.sendall(reply)
except Exception as e:
logger.error(f"发送响应时发生错误: {e}")

def exchange_loop(self, client, remote):
"""数据交换循环"""
try:
client.setblocking(False)
remote.setblocking(False)

while True:
try:
r, w, e = select.select([client, remote], [], [], 0.5)
except:
break

if e:
break

for sock in r:
try:
data = sock.recv(32768)
if not data:
return

if sock is client:
remote.setblocking(True)
try:
remote.sendall(data)
finally:
remote.setblocking(False)
else:
client.setblocking(True)
try:
client.sendall(data)
finally:
client.setblocking(False)
except:
return
finally:
for sock in [client, remote]:
try:
sock.close()
except:
pass

class ThreadingSocksServer(ThreadingTCPServer):
allow_reuse_address = True
daemon_threads = True # 设置为守护线程,这样主线程退出时它们会自动退出

def server_close(self):
"""确保服务器正确关闭"""
try:
self.socket.shutdown(socket.SHUT_RDWR)
except:
pass
self.socket.close()

def run_proxy(host='0.0.0.0', port=9001):
"""运行SOCKS5代理服务器"""
server = None

def shutdown_server(signum=None, frame=None):
nonlocal server
if server:
logger.info("正在关闭SOCKS5代理服务器...")
try:
server.server_close() # 关闭服务器和所有连接
except:
pass
logger.info("服务器已关闭")
import sys
sys.exit(0) # 强制退出程序

try:
server = ThreadingSocksServer((host, port), SocksProxy)
logger.info(f"SOCKS5代理服务器启动在 {host}:{port}")
server.serve_forever()
except KeyboardInterrupt:
logger.info("收到KeyboardInterrupt信号")
shutdown_server()
except Exception as e:
logger.error(f"服务器运行时发生错误: {e}")
shutdown_server()

if __name__ == '__main__':
run_proxy()