关注并星标🌟 一起学安全❤️
作者:coleak
首发于公号:渗透测试安全攻防
字数:18743
声明:仅供学习参考,请勿用作违法用途
目录
-
Flask代码审计 -
SQL注入 -
命令/代码执行 -
反序列化 -
文件操作 -
XXE -
SSRF -
XSS -
其他 -
审计实战 -
后记 -
reference
Flask代码审计
SQL注入
1、正确的使用直白一点就是:使用”逗号”,而不是”百分号”
stmt = "SELECT * FROM table WHERE id=?"connection.execute(stmt, (value,))#或者cursor.execute("SELECT * FROM users WHERE name = ?", (username,))
2、检查所有SQL语句是否使用+
、%s
或f-string
直接拼接用户输入
"SELECT * FROM table WHERE id=" + value"SELECT * FROM table WHERE id=%s" % value"SELECT * FROM table WHERE id={0}".format(value)
3、SQLAlchemy的text()
是否进行参数化
from sqlalchemy import text# 错误用法stmt = text(f"SELECT * FROM users WHERE name = '{username}'")# 正确用法stmt = text("SELECT * FROM users WHERE name = :username").bindparams(username=username)#或query = "SELECT * FROM articles WHERE title LIKE :keyword"result = db.session.execute(query, {"keyword": f"%{keyword}%"})
4、ORM安全使用:优先使用ORM方法
# SQLAlchemy ORMUser.query.filter_by(username=username).first()
命令/代码执行
1、危险函数popen、system、commands、subprocess、exec、eval
import subprocess@app.route('/ping')defping(): ip = request.args.get('ip') result = subprocess.run(["ping", "-c", "1", ip], capture_output=True, text=True)returnf"<pre>{result.stdout}</pre>"#subprocess.run() 绑定参数,不会执行恶意命令。
2、SSTI
render_template_string
3、第三方库风险
import yaml# 漏洞示例:使用默认Loaderdata = yaml.load(user_input, Loader=yaml.Loader) # 可触发任意代码执行
4、反序列化漏洞:pickle、marshal、PyYAML
import pickle# 漏洞示例:反序列化用户可控数据data = request.get_data()obj = pickle.loads(data) # 攻击者可构造恶意序列化对象(如反弹Shell)
反序列化
反序列化漏洞的核心是程序将不可信的序列化数据还原为对象时,未验证数据合法性,导致攻击者通过构造恶意序列化数据执行任意代码。常见场景:
-
pickle
模块的不安全使用:pickle.loads()
直接反序列化用户输入。 -
PyYAML
的不安全加载:yaml.load()
默认支持执行构造函数(如!!python/object
)。 -
自定义反序列化逻辑:开发者自行实现的 __reduce__
方法被利用
1、不安全模块:pickle
、marshal
、PyYAML
等。
2、防御措施:优先使用安全格式如JSON(json.loads()
)代替pickle
。
3、第三方库的反序列化操作危险库:PyYAML
(默认Loader不安全)、dill
、shelve
等
4、安全使用pickle
:仅反序列化可信数据确保序列化数据来源可信。签名验证对序列化数据签名,防止篡改。
import hmac, picklekey = b'secret_key'data = request.get_data()# 验证HMAC签名ifnot hmac.compare_digest(hmac.new(key, data).digest(), request.headers.get('Signature')): abort(403)obj = pickle.loads(data)
5、安全使用PyYAML
,强制使用安全Loader:如SafeLoader
或FullLoader
import yamldata = yaml.load(user_input, Loader=yaml.SafeLoader) # 禁用构造函数
文件操作
1、关键函数
file()、file.save()、open()、codecs.open()
2、安全文件上传(白名单限制文件类型、重命名上传文件、magic验证文件内容)
ALLOWED_EXTENSIONS = {'png', 'jpg', 'jpeg'}defallowed_file(filename):return'.'in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONSif file and allowed_file(file.filename): file.save("safe_path")import uuidsecure_name = str(uuid.uuid4()) + ".png"# 生成随机文件名或通过secure_filename()file.save(f"/uploads/{secure_name}")import magicmime = magic.from_buffer(file.read(1024), mime=True)if mime notin ['image/png', 'image/jpeg', 'image/gif']: abort(400, "Invalid file type")
3、防止路径遍历
from werkzeug.utils import secure_filenameimport osfilename = secure_filename(request.form['filename']) # 过滤特殊字符base_dir = os.path.abspath("/var/data")target_path = os.path.join(base_dir, filename)# 确保目标路径在base_dir下ifnot os.path.commonprefix([base_dir, target_path]) == base_dir: abort(403, "Invalid path")
4、安全文件下载:映射文件名到安全ID,不直接暴露文件路径
# 数据库存储文件ID与真实路径的映射@app.route('/download/<file_id>')defdownload(file_id): file_path = db.get_file_path(file_id) # 从数据库查询安全路径return send_file(file_path)
XXE
1、直接解析用户XML并用危险函数/库解析:lxml
、xml.etree.ElementTree
、defusedxml
未安全配置
xml_data='''<!DOCTYPE foo [<!ENTITY xxe SYSTEM "file:///c:/cc.txt">]><data>&xxe;</data>'''from lxml import etree# 漏洞示例:直接解析用户输入root = etree.fromstring(xml_data) # 允许解析外部实体print(root.text)
2、禁用外部实体解析、输入过滤、使用JSON替代XML
from defusedxml.ElementTree import parsetree = parse(xml_file) # 默认禁用外部实体from lxml import etreeparser = etree.XMLParser(resolve_entities=False) # 禁用实体解析root = etree.fromstring(xml_data, parser=parser)if re.search(r"<!ENTITY|SYSTEM|PUBLIC", xml_data, re.IGNORECASE): abort(400, "Invalid XML")
SSRF
1、直接请求、间接URL拼接用户提供的URL(urllib、requests等库)
# 漏洞示例:直接请求用户输入URLurl = request.form['url']response = requests.get(url) # 输入"file:///etc/passwd"可能读取文件(取决于库支持)# 漏洞示例:拼接用户输入到内部APIuser_id = request.args.get('id')internal_url = f"http://internal-api:8080/user/{user_id}"requests.get(internal_url) # 输入"[email protected]"可能绕过校验# requests.get()、urllib.request.urlopen()
2、名单校验域名/IP、限制协议类型、防止DNS重绑定攻击
ALLOWED_DOMAINS = {'example.com', 'cdn.example.net'}from urllib.parse import urlparsedefis_allowed_url(url): parsed = urlparse(url)if parsed.hostname in ALLOWED_DOMAINS:returnTruereturnFalseifnot is_allowed_url(url): abort(400, "Invalid URL")parsed = urlparse(url)if parsed.scheme notin ['http', 'https']: abort(400, "Unsupported protocol")import socketfrom urllib.parse import urlparseparsed = urlparse(url)hostname = parsed.hostname# 解析DNS并验证IP是否合法resolved_ip = socket.gethostbyname(hostname)if resolved_ip in ['127.0.0.1', '169.254.169.254']: abort(403, "Forbidden IP")
XSS
1、直接渲染未转义的用户输入(|safe
过滤器或Markup
类或render_template_string
或直接return输入)
from flask import Markupuser_input = request.args.get('q')return render_template('search.html', result=Markup(user_input))<script> var userData = "{{ search_query|safe }}"; // 输入"; alert(1);//</script>
2、payload
<svg><script>alert(1)</script></svg> <!-- HTML实体编码绕过 --><script>alert(1)</script><img src=x onerror=alert(1)><img/src='1'/onerror=alert(0)>
3、使用 escape()
过滤用户输入,使用render_template渲染,设置 CSP(内容安全策略)
from markupsafe import escape@app.route('/comment', methods=['POST'])defcomment(): username = escape(request.args.get('username'))return render_template('profile.html', username=username)
4、文件上传
攻击者上传恶意 xss.svg
<svgxmlns="http://www.w3.org/2000/svg"version="1.1"><circlecx="100"cy="50"r="40"stroke="black"stroke-width="2"fill="red" /><script>alert(1)</script></svg>
解决方案:禁止上传 SVG、HTML 文件或者对于所有用户上传的文件,使用
Content-Disposition: attachment
,防止直接在浏览器解析
return send_from_directory(UPLOAD_FOLDER, filename, as_attachment=True)
其他
CSRF
from flask_wtf.csrf import CSRFProtectcsrf = CSRFProtect(app)
权限校验
from flask_login import current_useruser = User.query.get(user_id)if user.id != current_user.id: abort(403)
逻辑漏洞
# 漏洞示例:未加锁的余额扣减导致并发user.balance -= amountdb.session.commit() # 并发请求可能导致余额为负
组件漏洞
pip install safetysafety scansafety system-scansafety scan --apply-fixes
cors
# Flask-CORS 示例from flask_cors import CORSCORS(app, origins=["https://www.attacker.com"], supports_credentials=True)
Origin: http://www.attacker.com#返回如下Access-Control-Allow-Origin: https://www.attacker.comAccess-Control-Allow-Credentials: true#或者如下Access-Control-Allow-Origin: nullAccess-Control-Allow-Credentials: true
配置 |
|
|
---|---|---|
Access-Control-Allow-Origin: *
Credentials: true |
|
|
Access-Control-Allow-Origin: 任意值
Credentials: true |
|
|
Access-Control-Allow-Origin: null
Credentials: true |
|
|
修复核心:不要未经校验将客户端提供的 Origin
直接返回*,且携带凭证时禁止使用通配符 *
和 null
。
key环境变量
import osapp.secret_key = os.getenv("SECRET_KEY", "fallback_secret")
最小权限原则
普通用户 只授予
SELECT, INSERT, UPDATE
权限,管理员用户 才能DROP
或ALTER
数据库
CREATE USER 'appuser'@'localhost' IDENTIFIED BY 'strongpassword';GRANT SELECT, INSERT, UPDATE ON mydb.* TO 'appuser'@'localhost';CREATE USER 'admin'@'localhost' IDENTIFIED BY 'adminpassword';GRANT ALL PRIVILEGES ON mydb.* TO 'admin'@'localhost';
缓存安全
from flask import Flask, request, jsonifyfrom flask_caching import Cacheimport redisapp = Flask(__name__)# 配置缓存(假设Redis在内网,已开启密码和TLS)app.config['CACHE_TYPE'] = 'RedisCache'app.config['CACHE_REDIS_HOST'] = '10.0.0.5'# 内网IPapp.config['CACHE_REDIS_PORT'] = 6379app.config['CACHE_REDIS_PASSWORD'] = 'strong_redis_password'app.config['CACHE_KEY_PREFIX'] = 'myapp:'# 使用命名空间隔离数据app.config['CACHE_DEFAULT_TIMEOUT'] = 300cache = Cache(app)# 示例接口:存取用户数据(假设数据敏感,先加密后存储)import hashlibimport base64defencrypt_data(data, key='secret_key'):# 这里仅示例使用简单的哈希加盐方式(实际请使用更安全的加密方法)return base64.b64encode(hashlib.sha256((data + key).encode()).digest()).decode()defdecrypt_data(data, key='secret_key'):# 加密后无法直接解密,此处仅为示例return data@app.route('/store', methods=['POST'])defstore(): username = request.form.get('username') secret_info = request.form.get('secret_info')ifnot username ornot secret_info:return jsonify({'error': 'Missing parameters'}), 400# 加密敏感数据 encrypted = encrypt_data(secret_info) cache_key = f"user:{username}:info" cache.set(cache_key, encrypted)return jsonify({'message': 'Stored securely'}), 200@app.route('/retrieve', methods=['GET'])defretrieve(): username = request.args.get('username')ifnot username:return jsonify({'error': 'Missing username'}), 400 cache_key = f"user:{username}:info" encrypted = cache.get(cache_key)ifnot encrypted:return jsonify({'error': 'No data found'}), 404# 此处调用解密(如果能解密的话) info = decrypt_data(encrypted)return jsonify({'username': username, 'secret_info': info}), 200if __name__ == '__main__': app.run(debug=False)
审计实战
目标为某flask的cms,查找|safe
关键词发现在article.html中存在该关键词
{% if render_recommendations is defined %} {{ render_recommendations()|safe }} {% endif %}
在__init__.py
中声明render_recommendations上下文,查看render_recommendations()函数
definit_app(self, app):"""初始化插件""" super().init_app(app)# 注册模板函数defrender_recommendations():"""渲染推荐模板""" template_path = os.path.join(os.path.dirname(__file__), 'templates', 'recommendations.html')if os.path.exists(template_path):with open(template_path, 'r', encoding='utf-8') as f: template = f.read()return Markup(render_template_string(template))return''# 直接添加到 Jinja2 环境 app.jinja_env.globals['render_recommendations'] = render_recommendations
这里Markup和|safe将recommendations.html内容添加到article.html,如下
<divclass="bg-white dark:bg-gray-800 rounded-lg shadow-sm p-8 mt-12 mb-8"><h3class="text-xl font-bold mb-8 text-gray-900 dark:text-white flex items-center"><svgclass="w-5 h-5 mr-2"fill="none"stroke="currentColor"viewBox="0 0 24 24"><pathstroke-linecap="round"stroke-linejoin="round"stroke-width="2"d="M13 10V3L4 14h7v7l9-11h-7z"></path></svg> 相关推荐</h3><divid="recommendations-container"class="mb-2"><!-- 推荐内容将通过 JS 动态加载 --></div></div><scriptsrc="{{ url_for('article_recommender_static', filename='js/recommendations.js') }}"></script>
js文件如下:
|
|
---|---|
EMPTY_STATE_HTML |
|
ERROR_STATE_HTML |
|
renderArticleCard(article) |
|
renderTags(tags) |
|
loadRecommendations(articleId) |
|
DOMContentLoaded
|
|
functionrenderArticleCard(article) {return` <a href="/article/${article.id}" class="group block bg-gray-50 dark:bg-gray-700 rounded-lg p-6 transition-all duration-200 hover:shadow-md hover:bg-gray-100 dark:hover:bg-gray-600"> <div class="space-y-4"> <h4 class="font-bold text-gray-900 dark:text-gray-100 group-hover:text-blue-600 dark:group-hover:text-blue-400 transition-colors duration-200 line-clamp-2">${article.title} </h4> <p class="text-sm text-gray-600 dark:text-gray-300 line-clamp-2">${article.summary} </p> <div class="flex items-center justify-between"> <span class="text-sm text-gray-500 dark:text-gray-400">${article.category} </span> <div class="flex flex-wrap gap-2">${renderTags(article.tags)} </div> </div> </div> </a> `;}
title、category、tags三处均存在存储型xss,提交恶意payload<img/src='1'/onerror=alert(0)>
后,在其他文章的相关推荐功能处渲染js代码完成弹窗
后记
Referer /Origin
在 HTTP 请求中,Referer
和 Origin
是两个与请求来源相关的头部字段,但它们的用途、格式和安全特性有显著区别。以下是它们的核心差异:
1、定义与格式
字段 | 定义 | 格式示例 | 包含路径信息 |
---|---|---|---|
Referer |
|
https://example.com/page.html |
/page.html ) |
Origin |
|
https://example.com:8080 |
|
2、主要用途
字段 | 应用场景 | 典型用例 |
---|---|---|
Referer |
|
pageA.html 点击链接跳转到 pageB.html ,Referer 值为 pageA.html |
Origin |
|
Origin 头供服务器验证 |
3、发送条件
字段 | 触发发送的请求类型 | 浏览器行为 |
---|---|---|
Referer |
点击) - 资源加载(如 , ``) - 表单提交(GET/POST) |
Referrer-Policy )或用户设置阻止 |
Origin |
fetch /XMLHttpRequest ) - POST 请求(部分浏览器) - WebSocket 连接 |
POST )时发送 |
事务执行失败
如果事务执行失败,数据库连接可能会卡住,影响其他查询。
try: db.session.add(new_user) db.session.commit()except Exception as e: db.session.rollback() # 事务回滚,防止数据库状态异常 print(f"Error: {e}")
reference
https://mp.weixin.qq.com/s/y1ta34MzowUnOvFnShk2MQhttps://www.freebuf.com/news/168362.htmlhttps://blog.hackall.cn/cvesubmit/614.htmlhttps://www.freebuf.com/articles/web/404899.htmlhttps://blog.neargle.com/2016/07/25/log-of-simple-code-review-about-python-base-webapp/https://www.cnblogs.com/xiaozi/p/7268506.html
原文始发于微信公众号(渗透测试安全攻防):Flask代码审计从思路到实战
- 左青龙
- 微信扫一扫
-
- 右白虎
- 微信扫一扫
-
评论