Flask代码审计从思路到实战

admin 2025年2月5日00:38:53评论15 views字数 11732阅读39分6秒阅读模式

关注并星标🌟 一起学安全❤️

作者:coleak  

首发于公号:渗透测试安全攻防 

字数:18743

声明:仅供学习参考,请勿用作违法用途

目录

  • Flask代码审计
    • SQL注入
    • 命令/代码执行
    • 反序列化
    • 文件操作
    • XXE
    • SSRF
    • XSS
    • 其他
  • 审计实战
  • 后记
  • reference

Flask代码审计

SQL注入

1、正确的使用直白一点就是:使用”逗号”,而不是”百分号”

stmt = "SELECT * FROM table WHERE id=?"connection.execute(stmt, (value,))#或者cursor.execute("SELECT * FROM users WHERE name = ?", (username,))

2、检查所有SQL语句是否使用+%sf-string直接拼接用户输入

"SELECT * FROM table WHERE id=" + value"SELECT * FROM table WHERE id=%s" % value"SELECT * FROM table WHERE id={0}".format(value)

3、SQLAlchemy的text()是否进行参数化

from sqlalchemy import text# 错误用法stmt = text(f"SELECT * FROM users WHERE name = '{username}'")# 正确用法stmt = text("SELECT * FROM users WHERE name = :username").bindparams(username=username)#或query = "SELECT * FROM articles WHERE title LIKE :keyword"result = db.session.execute(query, {"keyword"f"%{keyword}%"})

4、ORM安全使用:优先使用ORM方法

# SQLAlchemy ORMUser.query.filter_by(username=username).first()

命令/代码执行

1、危险函数popen、system、commands、subprocess、exec、eval

import subprocess@app.route('/ping')defping():    ip = request.args.get('ip')    result = subprocess.run(["ping""-c""1", ip], capture_output=True, text=True)returnf"<pre>{result.stdout}</pre>"#subprocess.run() 绑定参数,不会执行恶意命令。

2、SSTI

render_template_string

3、第三方库风险

import yaml# 漏洞示例:使用默认Loaderdata = yaml.load(user_input, Loader=yaml.Loader)  # 可触发任意代码执行

4、反序列化漏洞:pickle、marshal、PyYAML

import pickle# 漏洞示例:反序列化用户可控数据data = request.get_data()obj = pickle.loads(data)  # 攻击者可构造恶意序列化对象(如反弹Shell)

反序列化

反序列化漏洞的核心是程序将不可信的序列化数据还原为对象时,未验证数据合法性,导致攻击者通过构造恶意序列化数据执行任意代码。常见场景:

  1. pickle模块的不安全使用pickle.loads()直接反序列化用户输入。
  2. PyYAML的不安全加载yaml.load()默认支持执行构造函数(如!!python/object)。
  3. 自定义反序列化逻辑:开发者自行实现的__reduce__方法被利用

1、不安全模块:picklemarshalPyYAML等。

2、防御措施:优先使用安全格式如JSON(json.loads())代替pickle

3、第三方库的反序列化操作危险库:PyYAML(默认Loader不安全)、dillshelve

4、安全使用pickle:仅反序列化可信数据确保序列化数据来源可信。签名验证对序列化数据签名,防止篡改。

import hmac, picklekey = b'secret_key'data = request.get_data()# 验证HMAC签名ifnot hmac.compare_digest(hmac.new(key, data).digest(), request.headers.get('Signature')):    abort(403)obj = pickle.loads(data)

5、安全使用PyYAML,强制使用安全Loader:如SafeLoaderFullLoader

import yamldata = yaml.load(user_input, Loader=yaml.SafeLoader)  # 禁用构造函数

文件操作

1、关键函数

file()、file.save()、open()、codecs.open()

2、安全文件上传(白名单限制文件类型、重命名上传文件、magic验证文件内容)

ALLOWED_EXTENSIONS = {'png''jpg''jpeg'}defallowed_file(filename):return'.'in filename and        filename.rsplit('.'1)[1].lower() in ALLOWED_EXTENSIONSif file and allowed_file(file.filename):    file.save("safe_path")import uuidsecure_name = str(uuid.uuid4()) + ".png"# 生成随机文件名或通过secure_filename()file.save(f"/uploads/{secure_name}")import magicmime = magic.from_buffer(file.read(1024), mime=True)if mime notin ['image/png''image/jpeg''image/gif']:    abort(400"Invalid file type")

3、防止路径遍历

from werkzeug.utils import secure_filenameimport osfilename = secure_filename(request.form['filename'])  # 过滤特殊字符base_dir = os.path.abspath("/var/data")target_path = os.path.join(base_dir, filename)# 确保目标路径在base_dir下ifnot os.path.commonprefix([base_dir, target_path]) == base_dir:    abort(403"Invalid path")

4、安全文件下载:映射文件名到安全ID,不直接暴露文件路径

# 数据库存储文件ID与真实路径的映射@app.route('/download/<file_id>')defdownload(file_id):    file_path = db.get_file_path(file_id)  # 从数据库查询安全路径return send_file(file_path)

XXE

1、直接解析用户XML并用危险函数/库解析:lxmlxml.etree.ElementTreedefusedxml未安全配置

xml_data='''<!DOCTYPE foo [<!ENTITY xxe SYSTEM "file:///c:/cc.txt">]><data>&xxe;</data>'''from lxml import etree# 漏洞示例:直接解析用户输入root = etree.fromstring(xml_data)  # 允许解析外部实体print(root.text)

2、禁用外部实体解析、输入过滤、使用JSON替代XML

from defusedxml.ElementTree import parsetree = parse(xml_file)  # 默认禁用外部实体from lxml import etreeparser = etree.XMLParser(resolve_entities=False)  # 禁用实体解析root = etree.fromstring(xml_data, parser=parser)if re.search(r"<!ENTITY|SYSTEM|PUBLIC", xml_data, re.IGNORECASE):    abort(400, "Invalid XML")

SSRF

1、直接请求、间接URL拼接用户提供的URL(urllib、requests等库)

# 漏洞示例:直接请求用户输入URLurl = request.form['url']response = requests.get(url)  # 输入"file:///etc/passwd"可能读取文件(取决于库支持)# 漏洞示例:拼接用户输入到内部APIuser_id = request.args.get('id')internal_url = f"http://internal-api:8080/user/{user_id}"requests.get(internal_url)  # 输入"[email protected]"可能绕过校验# requests.get()、urllib.request.urlopen()

2、名单校验域名/IP、限制协议类型、防止DNS重绑定攻击

ALLOWED_DOMAINS = {'example.com''cdn.example.net'}from urllib.parse import urlparsedefis_allowed_url(url):    parsed = urlparse(url)if parsed.hostname in ALLOWED_DOMAINS:returnTruereturnFalseifnot is_allowed_url(url):    abort(400"Invalid URL")parsed = urlparse(url)if parsed.scheme notin ['http''https']:    abort(400"Unsupported protocol")import socketfrom urllib.parse import urlparseparsed = urlparse(url)hostname = parsed.hostname# 解析DNS并验证IP是否合法resolved_ip = socket.gethostbyname(hostname)if resolved_ip in ['127.0.0.1''169.254.169.254']:    abort(403"Forbidden IP")

XSS

1、直接渲染未转义的用户输入(|safe过滤器或Markup类或render_template_string或直接return输入)

from flask import Markupuser_input = request.args.get('q')return render_template('search.html', result=Markup(user_input))<script>  var userData = "{{ search_query|safe }}";  // 输入"; alert(1);//</script>

2、payload

<svg><script>alert&#40;1&#41;</script></svg>  <!-- HTML实体编码绕过 --><script>alert(1)</script><img src=x onerror=alert(1)><img/src='1'/onerror=alert(0)>

3、使用 escape() 过滤用户输入,使用render_template渲染,设置 CSP(内容安全策略)

from markupsafe import escape@app.route('/comment', methods=['POST'])defcomment():    username = escape(request.args.get('username'))return render_template('profile.html', username=username)

4、文件上传

攻击者上传恶意 xss.svg

<svgxmlns="http://www.w3.org/2000/svg"version="1.1"><circlecx="100"cy="50"r="40"stroke="black"stroke-width="2"fill="red" /><script>alert(1)</script></svg>

解决方案:禁止上传 SVG、HTML 文件或者对于所有用户上传的文件,使用 Content-Disposition: attachment,防止直接在浏览器解析

return send_from_directory(UPLOAD_FOLDER, filename, as_attachment=True)

其他

CSRF

from flask_wtf.csrf import CSRFProtectcsrf = CSRFProtect(app)

权限校验

from flask_login import current_useruser = User.query.get(user_id)if user.id != current_user.id:    abort(403)

逻辑漏洞

# 漏洞示例:未加锁的余额扣减导致并发user.balance -= amountdb.session.commit()  # 并发请求可能导致余额为负

组件漏洞

pip install safetysafety scansafety system-scansafety scan --apply-fixes

cors

# Flask-CORS 示例from flask_cors import CORSCORS(app, origins=["https://www.attacker.com"], supports_credentials=True)
Origin: http://www.attacker.com#返回如下Access-Control-Allow-Origin: https://www.attacker.comAccess-Control-Allow-Credentials: true#或者如下Access-Control-Allow-Origin: nullAccess-Control-Allow-Credentials: true
配置
风险等级
说明
Access-Control-Allow-Origin: *

 + Credentials: true
安全(但错误)
浏览器强制阻断请求
Access-Control-Allow-Origin: 任意值

 + Credentials: true
高危
数据可被攻击者窃取
Access-Control-Allow-Origin: null

 + Credentials: true
高危
特殊场景下可绕过同源策略

修复核心:不要未经校验将客户端提供的 Origin 直接返回*,且携带凭证时禁止使用通配符 * 和 null

key环境变量

import osapp.secret_key = os.getenv("SECRET_KEY""fallback_secret")

最小权限原则

普通用户 只授予 SELECT, INSERT, UPDATE 权限,管理员用户 才能 DROP 或 ALTER 数据库

CREATE USER 'appuser'@'localhost' IDENTIFIED BY 'strongpassword';GRANT SELECT, INSERT, UPDATE ON mydb.* TO 'appuser'@'localhost';CREATE USER 'admin'@'localhost' IDENTIFIED BY 'adminpassword';GRANT ALL PRIVILEGES ON mydb.* TO 'admin'@'localhost';

缓存安全

from flask import Flask, request, jsonifyfrom flask_caching import Cacheimport redisapp = Flask(__name__)# 配置缓存(假设Redis在内网,已开启密码和TLS)app.config['CACHE_TYPE'] = 'RedisCache'app.config['CACHE_REDIS_HOST'] = '10.0.0.5'# 内网IPapp.config['CACHE_REDIS_PORT'] = 6379app.config['CACHE_REDIS_PASSWORD'] = 'strong_redis_password'app.config['CACHE_KEY_PREFIX'] = 'myapp:'# 使用命名空间隔离数据app.config['CACHE_DEFAULT_TIMEOUT'] = 300cache = Cache(app)# 示例接口:存取用户数据(假设数据敏感,先加密后存储)import hashlibimport base64defencrypt_data(data, key='secret_key'):# 这里仅示例使用简单的哈希加盐方式(实际请使用更安全的加密方法)return base64.b64encode(hashlib.sha256((data + key).encode()).digest()).decode()defdecrypt_data(data, key='secret_key'):# 加密后无法直接解密,此处仅为示例return data@app.route('/store', methods=['POST'])defstore():    username = request.form.get('username')    secret_info = request.form.get('secret_info')ifnot username ornot secret_info:return jsonify({'error''Missing parameters'}), 400# 加密敏感数据    encrypted = encrypt_data(secret_info)    cache_key = f"user:{username}:info"    cache.set(cache_key, encrypted)return jsonify({'message''Stored securely'}), 200@app.route('/retrieve', methods=['GET'])defretrieve():    username = request.args.get('username')ifnot username:return jsonify({'error''Missing username'}), 400    cache_key = f"user:{username}:info"    encrypted = cache.get(cache_key)ifnot encrypted:return jsonify({'error''No data found'}), 404# 此处调用解密(如果能解密的话)    info = decrypt_data(encrypted)return jsonify({'username': username, 'secret_info': info}), 200if __name__ == '__main__':    app.run(debug=False)

审计实战

目标为某flask的cms,查找|safe关键词发现在article.html中存在该关键词

    {% if render_recommendations is defined %}    {{ render_recommendations()|safe }}    {% endif %}

__init__.py中声明render_recommendations上下文,查看render_recommendations()函数

definit_app(self, app):"""初始化插件"""        super().init_app(app)# 注册模板函数defrender_recommendations():"""渲染推荐模板"""            template_path = os.path.join(os.path.dirname(__file__), 'templates''recommendations.html')if os.path.exists(template_path):with open(template_path, 'r', encoding='utf-8'as f:                    template = f.read()return Markup(render_template_string(template))return''# 直接添加到 Jinja2 环境        app.jinja_env.globals['render_recommendations'] = render_recommendations

这里Markup和|safe将recommendations.html内容添加到article.html,如下

<divclass="bg-white dark:bg-gray-800 rounded-lg shadow-sm p-8 mt-12 mb-8"><h3class="text-xl font-bold mb-8 text-gray-900 dark:text-white flex items-center"><svgclass="w-5 h-5 mr-2"fill="none"stroke="currentColor"viewBox="0 0 24 24"><pathstroke-linecap="round"stroke-linejoin="round"stroke-width="2"d="M13 10V3L4 14h7v7l9-11h-7z"></path></svg>        相关推荐</h3><divid="recommendations-container"class="mb-2"><!-- 推荐内容将通过 JS 动态加载 --></div></div><scriptsrc="{{ url_for('article_recommender_static', filename='js/recommendations.js') }}"></script>

js文件如下:

功能模块
作用
EMPTY_STATE_HTML
当无推荐文章时显示空状态
ERROR_STATE_HTML
当加载失败时显示错误状态
renderArticleCard(article)
生成文章卡片 HTML
renderTags(tags)
渲染标签(最多 2 个)
loadRecommendations(articleId)
请求并渲染推荐文章
DOMContentLoaded

 事件
页面加载后自动获取推荐文章
functionrenderArticleCard(article{return`        <a href="/article/${article.id}           class="group block bg-gray-50 dark:bg-gray-700 rounded-lg p-6 transition-all duration-200 hover:shadow-md hover:bg-gray-100 dark:hover:bg-gray-600">            <div class="space-y-4">                <h4 class="font-bold text-gray-900 dark:text-gray-100 group-hover:text-blue-600 dark:group-hover:text-blue-400 transition-colors duration-200 line-clamp-2">${article.title}                </h4>                <p class="text-sm text-gray-600 dark:text-gray-300 line-clamp-2">${article.summary}                </p>                <div class="flex items-center justify-between">                    <span class="text-sm text-gray-500 dark:text-gray-400">${article.category}                    </span>                    <div class="flex flex-wrap gap-2">${renderTags(article.tags)}                    </div>                </div>            </div>        </a>    `;}

title、category、tags三处均存在存储型xss,提交恶意payload<img/src='1'/onerror=alert(0)>后,在其他文章的相关推荐功能处渲染js代码完成弹窗

Flask代码审计从思路到实战

后记

Referer /Origin

在 HTTP 请求中,Referer 和 Origin 是两个与请求来源相关的头部字段,但它们的用途、格式和安全特性有显著区别。以下是它们的核心差异:

1、定义与格式

字段 定义 格式示例 包含路径信息
Referer
表示当前请求的 来源页面完整 URL
https://example.com/page.html
✅ 包含路径(如 /page.html
Origin
表示当前请求的 协议 + 域名 + 端口(不包含路径)
https://example.com:8080
❌ 不包含路径

2、主要用途

字段 应用场景 典型用例
Referer
- 统计流量来源 - 防止图片盗链(Hotlinking) - 分析用户行为
用户从 pageA.html 点击链接跳转到 pageB.htmlReferer 值为 pageA.html
Origin
- CORS(跨域资源共享)安全机制 - 限制跨域请求来源
浏览器发起跨域 AJAX 请求时,自动添加 Origin 头供服务器验证

3、发送条件

字段 触发发送的请求类型 浏览器行为
Referer
- 页面跳转(如 点击) - 资源加载(如, ``) - 表单提交(GET/POST)
默认发送,但可能被浏览器策略(如 Referrer-Policy)或用户设置阻止
Origin
- 跨域 AJAX(fetch/XMLHttpRequest) - POST 请求(部分浏览器) - WebSocket 连接
仅在跨域请求或特定方法(如 POST)时发送

事务执行失败

如果事务执行失败,数据库连接可能会卡住,影响其他查询。

try:    db.session.add(new_user)    db.session.commit()except Exception as e:    db.session.rollback()  # 事务回滚,防止数据库状态异常    print(f"Error: {e}")

reference

https://mp.weixin.qq.com/s/y1ta34MzowUnOvFnShk2MQhttps://www.freebuf.com/news/168362.htmlhttps://blog.hackall.cn/cvesubmit/614.htmlhttps://www.freebuf.com/articles/web/404899.htmlhttps://blog.neargle.com/2016/07/25/log-of-simple-code-review-about-python-base-webapp/https://www.cnblogs.com/xiaozi/p/7268506.html

原文始发于微信公众号(渗透测试安全攻防):Flask代码审计从思路到实战

免责声明:文章中涉及的程序(方法)可能带有攻击性,仅供安全研究与教学之用,读者将其信息做其他用途,由读者承担全部法律及连带责任,本站不承担任何法律及连带责任;如有问题可邮件联系(建议使用企业邮箱或有效邮箱,避免邮件被拦截,联系方式见首页),望知悉。
  • 左青龙
  • 微信扫一扫
  • weinxin
  • 右白虎
  • 微信扫一扫
  • weinxin
admin
  • 本文由 发表于 2025年2月5日00:38:53
  • 转载请保留本文链接(CN-SEC中文网:感谢原作者辛苦付出):
                   Flask代码审计从思路到实战http://cn-sec.com/archives/3695675.html
                  免责声明:文章中涉及的程序(方法)可能带有攻击性,仅供安全研究与教学之用,读者将其信息做其他用途,由读者承担全部法律及连带责任,本站不承担任何法律及连带责任;如有问题可邮件联系(建议使用企业邮箱或有效邮箱,避免邮件被拦截,联系方式见首页),望知悉.

发表评论

匿名网友 填写信息