在渗透测试中,爆破、爬虫、注入等等往往是必不可少的环节之一
而网站为了防止此类问题带来的危害很多选择封禁ip。
所以就想自己动手实现一个可自动切换ip的代理
1.打造之前,确定需要什么?
(1)一个可用的ip池
(2)一个代理脚本
(3)一个vps(这个可有可无,看个人情况而定)
2.第一步,获取ip、存储ip、验证ip
2.1获取免费的ip代理
因为网上有太多ip池项目,具体可参考大佬们的代码https://github.com/search?q=ip%E6%B1%A0
我们从github、google等等网站找到可以获取代理的网站。
整理了一些代理网站
http://free-proxy.cz/zh/proxylist/country/CN/socks5/date/level1
https://proxy.mimvp.com/freeopen?proxy=in_socks
http://proxydb.net/?protocol=socks4&protocol=socks5&anonlvl=2&anonlvl=4&country=CN
https://www.my-proxy.com/free-socks-5-proxy.html
http://spys.one/en/socks-proxy-list/
2.2存储代理
这里为了方便,选择sqllite。
说明:
(1)ip: 必填值,记录代理ip。
(2) port: 必填值,记录代理port。
(3) anonymousType: 可选,代理的匿名类型,分为四种:transparent(透明)、anonymous(匿名)、distorting(混淆)、elite(高匿)。
(4) protocolType: 必填值,代理的协议类型。
(5) isDemostic: 必填值,是否为国内代理,值为1和0。
(6) rumtime: 必填值,测试代理访问花费的时间。
2.3验证代理有效性即时效性
因为是爬取的免费代理,所以这些代理ip在有效性以及时效性上肯定会大打折扣(毕竟白嫖)。
所以为了在使用上获得“良好体验”,我们需要实现一个验证步骤,来对代理ip进行“过滤”
startime = time.time()
is_ok=0
if "Socks5" in pooltype:
proxies = {'http': 'socks5://' + ip + ':' +port+ '', 'https': 'socks5://' + ip+ ':' + port+ ''}
if "Socks4" in pooltype:
proxies = {'http': 'socks4://' + ip + ':' + port+ '', 'https': 'socks4://' + ip+ ':' +port+ ''}
try:
html = requests.get("http://pv.sohu.com/cityjson?ie=utf-8", timeout=5, proxies=proxies)
if html.status_code == 200:
if ip in html.text:
is_ok=1
except:
is_ok = 0
endtime = time.time()
3.socks使用代理
关于socks的讲解网上也有不少,可以百度。
可参考:https://www.jianshu.com/p/d2fc24f76090、https://www.cnblogs.com/woaixuexi9999/p/9360581.html
这里我们要使用socks代理只需设置对应的类型(这里使用socks4举例)、ip等参数
socks.set_default_proxy(socks.PROXY_TYPE_SOCKS4,'ip', int(port), True)
socket.socket = socks.socksocket
这里我们使用现成的代码,来自
https://hatboy.github.io/2018/04/28/Python%E7%BC%96%E5%86%99socks5%E6%9C%8D%E5%8A%A1%E5%99%A8/
#!/usr/bin/python
# -*- coding: utf-8 -*-
import logging
import select
import socket
import struct
import socks
import random
from socketserver import StreamRequestHandler, ThreadingTCPServer
logging.basicConfig(level=logging.DEBUG)
SOCKS_VERSION = 5
class SocksProxy(StreamRequestHandler):
username = 'user'
password = 'pass'
def handle(self):
logging.info('Accepting connection from %s:%s' % self.client_address)
# 协商
# 从客户端读取并解包两个字节的数据
header = self.connection.recv(2)
version, nmethods = struct.unpack("!BB", header)
# 设置socks5协议,METHODS字段的数目大于0
assert version == SOCKS_VERSION
assert nmethods > 0
# 接受支持的方法
methods = self.get_available_methods(nmethods)
# 检查是否支持用户名/密码认证方式,不支持则断开连接
if 2 not in set(methods):
self.server.close_request(self.request)
return
# 发送协商响应数据包
self.connection.sendall(struct.pack("!BB", SOCKS_VERSION, 2))
# 校验用户名和密码
if not self.verify_credentials():
return
# 请求
version, cmd, _, address_type = struct.unpack("!BBBB", self.connection.recv(4))
assert version == SOCKS_VERSION
if address_type == 1: # IPv4
address = socket.inet_ntoa(self.connection.recv(4))
elif address_type == 3: # 域名
domain_length = self.connection.recv(1)[0]
address = self.connection.recv(domain_length)
elif address_type == 4: # IPv6
addr_ip = self.connection.recv(16)
address = socket.inet_ntop(socket.AF_INET6, addr_ip)
else:
self.server.close_request(self.request)
return
port = struct.unpack('!H', self.connection.recv(2))[0]
# 响应,只支持CONNECT请求
try:
if cmd == 1: # CONNECT
#这里进行数据库查询并随机取一条记录
ips=sql_select("SELECT * FROM ip where runtime<=5 and time_out<=5 ORDER BY RANDOM() limit 1")
if ips == None:
print("无数据")
else:
if "Socks4" in ips[4]:
socks.set_default_proxy(socks.PROXY_TYPE_SOCKS4, ips[1], int(ips[2]), True)
socket.socket = socks.socksocket
if "Socks5" in ips[4]:
socks.set_default_proxy(socks.PROXY_TYPE_SOCKS5, ips[1], int(ips[2]), True)
socket.socket = socks.socksocket
remote = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
remote.connect((address, port))
bind_address = remote.getsockname()
logging.info('Connected to %s %s' % (address, port))
else:
self.server.close_request(self.request)
addr = struct.unpack("!I", socket.inet_aton(bind_address[0]))[0]
port = bind_address[1]
reply = struct.pack("!BBBBIH", SOCKS_VERSION, 0, 0, 1, addr, port)
except Exception as err:
logging.error(err)
# 响应拒绝连接的错误
reply = self.generate_failed_reply(address_type, 5)
self.connection.sendall(reply)
# 建立连接成功,开始交换数据
if reply[1] == 0 and cmd == 1:
self.exchange_loop(self.connection, remote)
self.server.close_request(self.request)
def get_available_methods(self, n):
methods = []
for i in range(n):
methods.append(ord(self.connection.recv(1)))
return methods
def verify_credentials(self):
"""校验用户名和密码"""
version = ord(self.connection.recv(1))
assert version == 1
username_len = ord(self.connection.recv(1))
username = self.connection.recv(username_len).decode('utf-8')
password_len = ord(self.connection.recv(1))
password = self.connection.recv(password_len).decode('utf-8')
if username == self.username and password == self.password:
# 验证成功, status = 0
response = struct.pack("!BB", version, 0)
self.connection.sendall(response)
return True
# 验证失败, status != 0
response = struct.pack("!BB", version, 0xFF)
self.connection.sendall(response)
self.server.close_request(self.request)
return False
def generate_failed_reply(self, address_type, error_number):
return struct.pack("!BBBBIH", SOCKS_VERSION, error_number, 0, address_type, 0, 0)
def exchange_loop(self, client, remote):
while True:
# 等待数据
r, w, e = select.select([client, remote], [], [])
if client in r:
data = client.recv(4096)
if remote.send(data) <= 0:
break
if remote in r:
data = remote.recv(4096)
if client.send(data) <= 0:
break
if __name__ == '__main__':
# 使用socketserver库的多线程服务器ThreadingTCPServer启动代理
with ThreadingTCPServer(('0.0.0.0', 1080), SocksProxy) as server:
server.serve_forever()
4.使用效果
4.1在burp中使用,设置socks代理
浏览器设置代理走burp
4.2在python中使用
小结:
(1)在使用过程中,难免会遇见代理不稳定情况,并且数据增量的情况下,为了减少资源浪费可以在数据库增加字段记录验证不通过次数,达到自己设定的阀值就抛弃该条记录测试,但是这样又会出现另一个问题,在数据库ip不变的情况下,可用代理会越来越少,甚至会减少为0,这不是我们所期望的结果,解决方案为每天把阀值减为0,让验证脚本重新验证所有ip。
扫描下方二维码,关注我查看其他文章
原文始发于微信公众号(安全学习与分享):使用python实现代理池
- 左青龙
- 微信扫一扫
-
- 右白虎
- 微信扫一扫
-
评论