赛制
赛制启动模式
赛事启动模式分为两种:
第一种
1、选手设备、服务器、平台互相联通
2、选手设备与服务器互相联通、选手设备及服务器均无法连同平台
3、选手设备、服务器、平台均不互相联通
4、选手设备无法联通其他,服务器、平台互相联通
第二种
1、选手设备、服务器、平台互相联通
2、选手设备无法联通其他,服务器、平台互相联通
3、选手设备、服务器、平台均不互相联通
4、选手设备无法联通其他,服务器、平台互相联通
这两者最大的区别是第二步,关键在于先独立平台,还是先独立选手设备,这两个要根据比赛赛前调试来修改。
平台交互
提交内容
RHG:flag
{
user:"team1"
pwd:"pwd"
answer:"flag{1234}"
}
autopwn需要提交的是让elf崩溃的payload,payload需要base64编码一下。
AutoPwn:payload
{
RoundID:1
user:"team1"
pwd:"pwd"
Payload:{
ChallengeID:"dc87311d"
Crash:"QUFBQUFBQUFBQQ=="
#Popcal:"QUFBQUFBQUFBQQ=="
}
}
结构框架
安装部署
docker-compose
#docker-compose.yml
version: '3'
services:
robotcontainter:
build:
context: .
dockerfile: Dockerfile
privileged: true #root权限
Dockerfile
FROM ubuntu:16.04
RUN sed -i "s/http://archive.ubuntu.com/http://mirrors.tuna.tsinghua.edu.cn/g" /etc/apt/sources.list
RUN sed -i "s/http://security.ubuntu.com/http://mirrors.tuna.tsinghua.edu.cn/g" /etc/apt/sources.list
RUN apt-get update && apt-get -y dist-upgrade
RUN apt-get install -y lib32z1 xinetd
RUN apt-get install -y openssh-server vim zsh git curl wget
RUN apt-get install -y python python-dev python-pip python-numpy
RUN apt-get install -y libtool libtool-bin automake bison libglib2.0-dev
#更改pip源
RUN mkdir /root/.pip
RUN echo "[global]nindex-url = https://pypi.tuna.tsinghua.edu.cn/simplen[install]ntrusted-host=mirrors.aliyun.com" > /root/.pip/pip.conf
RUN pip install requests numpy
# oh-my-zsh
RUN sh -c "$(curl -fsSL https://gitee.com/awwwj/zsh/raw/master/install.sh)"
# 指定文件夹
WORKDIR /home/ctf
# 写入配置文件
RUN mkdir /home/ctf/robot
COPY ./robot /home/ctf/robot
COPY ./start.sh /
# 配置afl-fuzz
RUN chmod 755 /home/ctf/robot/afl-fuzz/setup.sh
WORKDIR /home/ctf/robot/afl-fuzz
RUN sh -v -c /home/ctf/robot/afl-fuzz/setup.sh
# 给start.sh可执行权限
RUN chmod 755 /start.sh
#启动docker
CMD ["/start.sh"]
接口分析
平台题目信息分析
下面是RHG平台返回的题目信息
{"AiChallenge":
[{"challengeID":1
"vm_ip":"172.16.7.13"
"vm_name":"Defcon-AI-test-Release-1-ZegaKdFm"
"question_port":"9001"
"binaryUrl":"https://172.20.1.1/resources/file/bff4819cbce2f8e6"
"flag_path":"/home/flag1.txt"
"first_blood":"3"
"current_time":1616917575
"attacks_count":"7"
"current_score":100
"score":130
"question_type":"1"},
{"challengeID":2,"vm_ip":"172.16.7.13","vm_name":"Defcon-AI-test-Release-1-ZegaKdFm","question_port":"9002","binaryUrl":"https://172.20.1.1/resources/file/7bb1dc381ac3e13a43","flag_path":"/home/flag2.txt","first_blood":"1","current_time":1616917575,"attacks_count":"1","current_score":130,"score":0,"question_type":"1"}],
"PointsInfo":{"aiPoints":"280"},
"status":1}
curl -k -X GET –user user:pwd https://rhg.ichunqiu.com/rhg/api/get_question_status
def get_question(self):
try:
basic = self.username+':'+self.password
auth=base64.b64encode(basic.encode("utf-8")).decode()
header = {"Authorization": "Basic "+auth,'User-Agent':'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/52.0.2743.116 Safari/537.36 Edge/15.15063'}
resp = requests.get(self.url_get_question, headers=header, timeout=5, verify=False)
self.gamestatus = json.loads(resp.content)['status']
self.challenge = json.loads(resp.content)["AiChallenge"]
except Exception as e:
self.log(str(e))
self.challenge = 0
self.gamestatus = 0
下面是AutoPwn平台返回的题目信息
{"CurrentRound":1,
"CurrentChallenge":[
{"cb_id":"061837cd",
"score":3,
"score_method":"Crash",
"cb_url":"https://anquan.baidu.com/bctf/bctf_games/061837cd.tar",
"cb_provider":"team_a"},
{"cb_id":"0f3abf1d","score":3,"score_method":"Crash","cb_url":"https://anquan.baidu.com/bctf/bctf_games/1/ad8d413d/1458.tar","cb_provider":"team_a"}],
"scoreboard":[
{"score":99,"first_blood":15,"bugs":20,"rank":1,"team":"test3"},
{"score":63,"first_blood":2,"bugs":4,"rank":3,"team":"test1"}]}
curl https://anquan.baidu.com/bctf/latest_round
def get_question(self):
try:
header = {'User-Agent':'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/52.0.2743.116 Safari/537.36 Edge/15.15063'}
resp = requests.get(self.url_get_question, headers=header, timeout=5, verify=False)
ret_con = json.loads(resp.content)
self.challenge = ret_con['CurrentChallenge']
self.round = ret_con['CurrentRound']
return True
except Exception as e:
log(str(e))
return False
心跳包接口
import requests
from requests.auth import HTTPBasicAuth
import base64
import json
import time
import threading
def run(url_heartbeat,username,password):
while 1:
try:
...
resp = requests.get(url_heartbeat,headers=header,timeout=5, verify=False)
...
def heartbeat(url_heartbeat='api/heartbeat',username='student01',password='KayVdf'):
t1 = threading.Thread(target=run,args=[url_heartbeat,username,password])
t1.start()
if __name__ == '__main__':
heartbeat(url_heartbeat='api/heartbeat',username='student01',password='KayVdf')
下载接口
RHG
def download(self):
try:
self.dir = "attachments/"+str(self.id)
os.system("wget {url} -O {dir} --no-check-certificate --timeout=10 --tries=3".format(url = self.download_url,dir = self.dir ))
if os.path.isfile(self.dir):
self.log(self.dir + " download ok")
else:
self.log(self.dir + " download failed")
except Exception as e:
self.log(self.dir + " download failed")
AutoPwn
def download(self):
try:
self.dir = "attachments/"+str(self.id)
self.bin_dir = self.dir+"/cb"
self.bin_in = self.dir+"/in"
self.bin_out = self.dir+"/out"
self.submited_data = self.dir+"/submited.txt"
file_addr = self.dir +"/"+ str(self.id)
# 先检测本地是否已经下载
if not os.path.isfile(self.bin_dir):
os.system("mkdir "+self.dir )
os.system("wget {url} -O {dir} --no-check-certificate --timeout=10 --tries=3".format(url = self.download_url,dir = file_addr ))
os.system("tar xvf {file_addr} -C {dir}/ --strip-components=1".format(file_addr = file_addr,dir = self.dir ))
if not os.path.isfile(self.bin_dir):
os.system("cp "+ self.dir+"/bin/cb " + self.bin_dir)
if not os.path.isdir(self.bin_out):
os.system("mkdir "+ self.bin_out)
if not os.path.isfile(self.submited_data):
os.system("touch "+ self.submited_data)
if not os.path.isdir(self.bin_in):
os.system("cp -r "+ self.dir+"/seed " + self.bin_in)
#拷贝以前的seed
os.system("cp ./seed/* "+self.bin_in)
#可能还需要解压功能
if os.path.isfile(self.bin_dir):
self.log(self.dir + " download ok")
return True
else:
self.log(self.dir + " download failed")
return False
except Exception as e:
self.log(self.dir + " download failed")
return False
➜ test git:(master) ✗ file 061837cd
061837cd: POSIX tar archive (GNU)
➜ test git:(master) ✗ tar xf 061837cd
➜ test git:(master) ✗ tree
.
├── 061837cd
└── crashme_bctf_01
├── Dockerfile
├── bin
│ └── cb
└── seed
└── sample.in
3 directories, 4 files
➜ test git:(master) ✗ tar xf 061837cd --strip-components=1
➜ test git:(master) ✗ tree
.
├── 061837cd
├── Dockerfile
├── bin
│ └── cb
└── seed
└── sample.in
2 directories, 4 files
AFL Fuzz 接口
AFL类
class AFL(object):
def __init__(self, id, binary, afl='/home/robot/afl-2.52b/afl-fuzz', debug=False):
self.__id = id
self.bin_addr = binary
self.afl_bin_addr = afl
self.afl_dir = os.path.dirname(afl)
self.bin_dir = os.path.dirname(binary)
self.in_dir = os.path.join(self.bin_dir, 'in')
self.out_dir = os.path.join(self.bin_dir, 'out')
self.dic_dir = os.path.join(self.bin_dir, 'dic')
self.__debug = debug
首先定义了AFL的一个类,参数中有afl的绝对地址,尽量不要用相对地址,否则查半天报错都找不到原因,。其他几个就是二进制的位置和输入输出文件夹,dic是一些字典,可以自定义
AFL Fuzz 接口
def start(self):
if not os.path.exists(self.in_dir):
os.mkdir(self.in_dir)
with open(os.path.join(self.in_dir, 'NEURON.txt'), 'w') as f:
f.write('NEURON')
if os.path.exists(self.out_dir+"/crashes"):
self.in_dir = '-'
os.chmod(self.bin_addr, 0775)
else:
self.__afl_process = subprocess.Popen(["screen -dmS {id} bash -c '{afl_path} -i {input} -o {output} -x {dic} -m none -Q -- {bin}'".format(id=self.__id,afl_path=self.afl_bin_addr,input=self.in_dir,output=self.out_dir,dic=self.dic_dir,bin=self.bin_addr )],shell=True, stdout=subprocess.PIPE)
start方法定义了fuzz启动程序,首先判断input是否为空,防止主办方发来的seed文件夹里面是空的,空的会导致无法fuzz成功,为空的话就随便写点东西进去。
第二个判断先前是否fuzz出crash,如果有的话,将input_dir
替换为-
AFL Fuzz Qemu安装
#!/bin/sh
tar xvf ./afl-latest.tgz;
cd ./afl-2.52b;
make;
make install;
cd ./qemu_mode;
apt-get install libtool libtool-bin automake bison libglib2.0-dev -y
export CPU_TARGET=i386;
./build_qemu_support.sh;
#AFL Fuzz 接口
def main(id,file_name):
try:
afl_path = "/home/ctf/afl-2.52b/afl-fuzz"
start_time = time.time()
max_run_time = 7200
afl = AFL(id,file_name, afl=afl_path, debug=False )
afl.start()
while True:
if time.time() - start_time >max_run_time:
break
time.sleep(10)
self.__afl_process.kill()
except Exception as e:
print(str(e))
print("fuzz failed")
这个是afl fuzz 主函数,定义的延时是2个小时,afl.start()启动后,会一直等待时间结束,时间到了之后便会强制结束fuzz的进程
这里需要提一下平台赛题发放的一个坑。AutoPwn平台每10分钟为一轮,更新轮次和题目内容,但是平台并不是每一轮都会更新题目的。这个也是autopwn和rhg的一个大差别,rhg一轮一个小时左右,所以rhg对轮次并不敏感。但是autopwn每10分钟一轮,如果我们的robot也是10分钟重新下载题目,停止掉上一轮fuzz的进程,再启动新的fuzz进程,就会浪费非常多的时间在fuzz进程的启动和重启上。平台是将近2个小时才会真正更新一轮题目。所以这里robot中的fuzz和crash检测是异步结构,这也是为什么10分钟一轮,fuzz的超时却设置成了2个小时
同步处理crashes
def main(id,file_name):
try:
afl_path = "/home/ctf/afl-2.52b/afl-fuzz"
start_time = time.time()
max_run_time = 7200
afl = AFL(id,file_name, afl=afl_path, debug=False )
afl.start()
self.crashes = afl.crashes()
while True:
if time.time() - start_time >max_run_time:
break
time.sleep(10)
tmp = afl.crashes()
if self.crashes != tmp:
self.crashes = tmp
self.__afl_process.kill()
except Exception as e:
print(str(e))
print("fuzz failed")
上面是以前同步处理crashes的方法:这里获取crashes的方法是嵌入fuzz的方法里的,crash会在fuzz结束后检测并返回给robot。这样的写法问题非常多,比如:robot异常结束后,重启robot无法获取到crashes,除非重启整个fuzz进程;返回的时候是全部crashes返回,而不是只返回新增加的crashes,这在提交flag时也是一个坑
异步处理crashes
def main(id,file_name):
try:
afl_path = "/home/ctf/afl-2.52b/afl-fuzz"
start_time = time.time()
max_run_time = 7200
afl = AFL(id,file_name, afl=afl_path, debug=False )
afl.start()
while True:
if time.time() - start_time >max_run_time:
break
time.sleep(10)
self.__afl_process.kill()
except Exception as e:
print(str(e))
print("fuzz failed")
重新设计的结构是异步结构,fuzz只负责fuzz功能,crash由其他功能负责。
#异步处理crashes
def get_crashes(self):
try:
if not os.path.exists(self.bin_out+ "/" +"crashes"):
return []
crashes=[]
for crashes_dir in os.listdir(self.bin_out):
if crashes_dir.find("crashes") != -1:
for crash in os.listdir(self.bin_out+ "/" +crashes_dir):
if crash != 'README.txt':
crashes.append(self.bin_out+ "/" +crashes_dir+ "/" +crash)
if crashes != self.crashes:
last_crashes = self.crashes
self.crashes = crashes
for crash in last_crashes:
crashes.remove(crash)
self.new_crashes = crashes
else:
self.new_crashes = []
except Exception as e:
print(str(e))
上面是异步获取crashes的方法:首先判断是否生成了crashes文件夹,如果没有,说明没有fuzz出crashes,直接结束该方法;如果有该文件夹,就开始遍历out文件夹里所有的crashes文件夹,再遍历每个crash文件夹里的payload,还得排除掉readme文件。这里第一层遍历的是out文件夹,而不是直接遍历crashes文件夹的原因是:当第一次fuzz出crash之后,fuzz进程被停止,第二次重新fuzz后,会将原crashes文件夹重命名为crashes+时间,会形成多个crashes文件夹
➜ out ls
crashes
crashes.2021-06-04-05:43:06
crashes.2021-06-04-06:12:22
crashes.2021-06-04-08:13:14
crashes.2021-06-04-09:11:09
再往下会对比新获取的crashes和之前的crashes是否相同,如果相同就更新self.crashes,并更新new_crashes,这个new_crashes会用在后面的submit功能里。
异步处理crashes
def get_pid(self,pid_name):
try:
p = subprocess.Popen(["ps -ef | grep -v 'grep' | grep '"+pid_name+"' | awk '{print $2}' "], shell=True,close_fds = True,stdout=subprocess.PIPE)
list = p.stdout.readlines()
if len(list) != 0:
return True
else:
return False
except Exception as e:
return False
def fuzz(self):
try:
if not self.get_pid(self.id):
self.log(self.dir + " start fuzz")
print(self.bin_dir,self.bin_in,self.bin_out)
fuzz_afl.main(self.id,self.bin_dir)
else:
self.log(self.dir + " is fuzzing in backend")
except Exception as e:
print(str(e))
刚刚是异步获取crashes的方法,还需要的就是一直fuzz的异步功能,这里的get_pid和fuzz是bin类的方法,先看get_pid,参数为pid名称,使用ps -ef来获取匹配进程,其中需要注意的是grep -v 'grep',这是排除掉grep本身命令对搜索进程的影响,把stdout结果赋值给list,当list长度不为0时就说明当前系统中,存在本题目id的fuzz进程。下面的fuzz方法检测了pid,存在pid的话就不进行fuzz,不存在的话就会新建上面的AFL fuzz类。这样的逻辑就是为了让fuzz能够一直不受影响的运行下去,而不会每次轮次更新时就被打断重新运行。
提交接口
def submit_payload(payload,id,round,username,password,url_submit_flag):
try:
template = {"RoundID":round,
"Payload":{
"ChallengeID":id,
"Crash":payload}}
submit_data = {"username": username, "password": password, "verify": template}
temstr = json.dumps(submit_data)
headers = {'User-Agent': 'Mozilla/5.0'}
ret = requests.post(url_submit_flag, json = submit_data, headers=headers,timeout = 30)
return ret.json()['error_code']
except Exception as e:
print(str(e))
return 1
这个是submit的方法,内容比较简单,按照官网的要求编写json就行,Crash记得要base64编码,这里需要注意的是,submit返回的内容是json里error_code的,可以看最下面几个典型的error_code,0代表提交成功,125代表重复提交,126代表提交过快,间隔时间为60s。
{u'error_eng': u'ok', u'error_code': 0, u'error_chn': u'ok'}
{u'error_eng': u'duplicate submit data', u'error_code': 125, u'error_chn': u'使用同一份答案提交'}
{u'error_eng': u'one submition in 60 seconds', u'error_code': 126, u'error_chn': u'提交flag过于频繁'}
也正是因为它提交间隔60s非常长,所以还需要我们对提交的crash修改,只提交新的crashes,提交过的crash和出现过重复提交的crash,以后将不再提交,否则当总crashes超过10个的时候,后面新的crashes将都无法提交。当然,60s间隔是针对同一个题目的,如果是针对所有题目的提交接口的话,那就没法写多进程并发了。
from sub_answer import submit_payload
def submit(self):
try:
self.get_crashes()
if self.new_crashes != []:
submited_data = open(self.submited_data,"r").read().split("n")
for crash in reversed(self.new_crashes):
payload = base64.b64encode(open(crash,"rb").read())
if payload in submited_data:
continue
error_code = submit_payload(payload,self.id,self.round,name,password,self.url)
if error_code==125 or error_code==0:
open(self.submited_data,"a").write(payload+"n")
sleep(61)
self.log(self.dir + " submit success")
except Exception as e:
print(str(e))
这是bin类中submit的方法,刚刚是submit的一个函数,调用了刚刚的submit_payload函数,这个方法里,首先调用了get_crashes,然后判断new_crashes是否为空,为空就结束submit,如果不为空,则对new_crashes列表进行反序提交,反序提交的目的是将最新的crash优先提交。之后便是读取submited_data,这个是代表已重复提交的crash payload,将new_crashes里的内容和submited_data匹配,如果提交过就跳过,没提交过的话调用submit_payload,并判断返回的error_code,error_code 是125就代表重复提交,就需要在submited_data再添加最新的,同样返回0的时候也判断为提交成功,同样需要加入submited_data,然后等待61秒,平台提交间隔是60s,多一秒稳妥一点
bin主流程
def attack(self):
self.download()
self.fuzz()
for i in range(0,300/5):
self.submit()
sleep(5)
def timeout(self):
for i in range(self.time_limit):
sleep(1)
self.p.terminate()
print("sorrry, timeout")
def auto(self):
self.p = Process(target=self.attack) #, args=(str(i),))
self.k = Process(target=self.timeout)
self.p.start()
self.k.start()
这是bin类的主流程了,调用了Process启用了多进程,attack是bin的流程结构,先下载二进制,然后调用fuzz。
fuzz使用的是afl-fuzz,而rhg的exploit主要是用zeratool来进行aeg攻击,zeratool有局限性,它是基于afl-fuzz和angr来运行的,所以依赖于符号表,对于静态编译或者strip掉符号表的题目,会出现路径爆炸等特殊情况。zeratool大家感兴趣的话可以自己看看
https://github.com/ChrisTheCoolHut/Zeratool.git
另外exploit还有一种方式就是利用特征化脚本,这个本质上有点离谱,那就是收集大量的exp脚本,写好对应的接口,拼的就是谁的脚本库大。这个方法往往有奇效,毕竟历史总是惊人的相似,这年头用往届原题的比赛数不胜数
异常处理
异常及log记录
def log(self,content):
try:
f = open("log.txt","a")
f.write(content+"n")
f.close()
except Exception as e:
return 0
def download(self,round_current=1):
try:
os.system("wget url -O dir --no-check-certificate --timeout=10 --tries=3")
if os.path.isfile(self.dir):
self.log(self.dir + " download ok")
except Exception as e:
self.log(self.dir + " download failed:"+str(e))
异常类型
访问平台失败
-
平台ping不通 -
request超时 -
https证书验证失败
题目信息解析失败
-
request 返回的json格式问题
round异常
-
round提前 -
round推后
解决方法根据error_code,如果显示round异常,就需要根据实际时间来判断轮次
下载题目失败
-
下载https证书问题 -
wget超时 -
下载为空文件
空文件是轮次错误导致,获取url时侯比如说是 9分59秒,下载的时候是10分01秒,这个时候之前的下载连接就会下载为空文件了,判断空文件时,需要调用轮次获取,重新下载
编译方式
-
动态编译
动态编译需要提前安装大量依赖库
lib库
lib32z1
libboost-all-dev
lib32ncurses5
libncurses5-dev
libstdc++6
lib32stdc++6
libffi-dev
-
静态编译
fuzz失败
-
fuzz无法启动 -
超时 -
memory error -
seed崩溃
exploit失败
-
编译类型 -
libc版本
submit失败
-
flag错误 -
重复提交 -
提交过快 -
round 错误
比赛开始判定
-
ping判定 -
时间判定 -
request返回判定
比赛期间平台突然与服务器的连接
-
robot 异常
-
每一轮检查比赛是否开始
-
damon 守护程序
以上只是部分异常的原因和处理方式,异常的处理在整个robot的结构中非常重要。前面提到的fuzz和exploit决定了分数的高低,而异常的处理决定了分数是否为0
项目开源地址:
https://github.com/0xaww/RHG-AutoPwn-Robot.git
原文始发于微信公众号(SAINTSEC):RHG & AutoPwn Robot竞赛技术结构详解
- 左青龙
- 微信扫一扫
- 右白虎
- 微信扫一扫
评论