1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233
| import socket import select import logging import struct from socketserver import ThreadingTCPServer, StreamRequestHandler
logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__)
SOCKS_VERSION = 5 CONNECT = 1 BIND = 2 UDP_ASSOCIATE = 3
ATYP_IPV4 = 1 ATYP_DOMAINNAME = 3 ATYP_IPV6 = 4
class SocksProxy(StreamRequestHandler): def handle(self): """处理SOCKS5连接请求""" if not self.handle_handshake(): return if not self.handle_connect(): return
def handle_handshake(self): """处理SOCKS5握手""" try: version = self.connection.recv(1)
if not version or ord(version) != SOCKS_VERSION: logger.error(f"不支持的SOCKS版本: {version}") return False
nmethods = self.connection.recv(1) if not nmethods: return False methods = self.connection.recv(ord(nmethods)) if not methods: return False
self.connection.sendall(struct.pack('!BB', SOCKS_VERSION, 0)) return True
except Exception as e: logger.error(f"握手阶段错误: {e}") return False
def handle_connect(self): """处理SOCKS5连接请求""" remote = None try: version, cmd, _, address_type = struct.unpack('!BBBB', self.connection.recv(4)) if version != SOCKS_VERSION: logger.error(f"不支持的SOCKS版本: {version}") return False
if cmd != CONNECT: logger.error(f"不支持的命令: {cmd}") self.send_reply(7) return False
if address_type == ATYP_IPV4: target_addr = socket.inet_ntoa(self.connection.recv(4)) elif address_type == ATYP_DOMAINNAME: domain_length = ord(self.connection.recv(1)) target_addr = self.connection.recv(domain_length).decode() elif address_type == ATYP_IPV6: target_addr = socket.inet_ntop(socket.AF_INET6, self.connection.recv(16)) else: logger.error(f"不支持的地址类型: {address_type}") self.send_reply(8) return False
target_port = struct.unpack('!H', self.connection.recv(2))[0]
try: if address_type == ATYP_DOMAINNAME: remote = socket.create_connection((target_addr, target_port), timeout=10) else: remote = socket.socket(socket.AF_INET if address_type == ATYP_IPV4 else socket.AF_INET6) remote.settimeout(10) remote.connect((target_addr, target_port))
bind_address = remote.getsockname() logger.info(f"已连接到 {target_addr}:{target_port}") self.send_reply(0, bind_address[0], bind_address[1])
except Exception as e: logger.error(f"连接目标服务器失败: {e}") self.send_reply(4) return False
self.exchange_loop(self.connection, remote) return True
except Exception as e: logger.error(f"处理连接请求时发生错误: {e}") return False finally: if remote: try: remote.close() except: pass
def send_reply(self, reply_code, bind_addr='0.0.0.0', bind_port=0): """发送SOCKS5响应""" try: if ':' in bind_addr: addr_bytes = socket.inet_pton(socket.AF_INET6, bind_addr) addr_type = ATYP_IPV6 else: addr_bytes = socket.inet_aton(bind_addr) addr_type = ATYP_IPV4
reply = struct.pack('!BBBB', SOCKS_VERSION, reply_code, 0, addr_type) reply += addr_bytes + struct.pack('!H', bind_port) self.connection.sendall(reply) except Exception as e: logger.error(f"发送响应时发生错误: {e}")
def exchange_loop(self, client, remote): """数据交换循环""" try: client.setblocking(False) remote.setblocking(False)
while True: try: r, w, e = select.select([client, remote], [], [], 0.5) except: break
if e: break
for sock in r: try: data = sock.recv(32768) if not data: return
if sock is client: remote.setblocking(True) try: remote.sendall(data) finally: remote.setblocking(False) else: client.setblocking(True) try: client.sendall(data) finally: client.setblocking(False) except: return finally: for sock in [client, remote]: try: sock.close() except: pass
class ThreadingSocksServer(ThreadingTCPServer): allow_reuse_address = True daemon_threads = True def server_close(self): """确保服务器正确关闭""" try: self.socket.shutdown(socket.SHUT_RDWR) except: pass self.socket.close()
def run_proxy(host='0.0.0.0', port=9001): """运行SOCKS5代理服务器""" server = None
def shutdown_server(signum=None, frame=None): nonlocal server if server: logger.info("正在关闭SOCKS5代理服务器...") try: server.server_close() except: pass logger.info("服务器已关闭") import sys sys.exit(0)
try: server = ThreadingSocksServer((host, port), SocksProxy) logger.info(f"SOCKS5代理服务器启动在 {host}:{port}") server.serve_forever() except KeyboardInterrupt: logger.info("收到KeyboardInterrupt信号") shutdown_server() except Exception as e: logger.error(f"服务器运行时发生错误: {e}") shutdown_server()
if __name__ == '__main__': run_proxy()
|