关注并星标🌟 一起学安全❤️
作者:coleak
首发于公号:渗透测试安全攻防
字数:97250
声明:仅供学习参考,请勿用作违法用途
目录
-
前记 -
Flask -
escape -
|safe -
secure_filename -
errorhandler/make_response -
session -
sql-execute -
generate_password_hash、check_password_hash -
send_from_directory -
CSRFProtect -
jsonify -
safe_load、defusedxml -
flask_talisman/csp -
并发 -
越权 -
oss -
CORS -
SSTI -
后记 -
reference
前记
非专业web开发,以flask框架为基础记录Web开发过程的部分安全问题
Flask
escape
When returning HTML (the default response type in Flask), any user-provided values rendered in the output must be escaped to protect from injection attacks. HTML templates rendered with Jinja, introduced later, will do this automatically.
escape()
, shown here, can be used manually. It is omitted in most examples for brevity, but you should always be aware of how you’re using untrusted data.
from markupsafe import escape@app.route("/<path:name>")defhello(name):#return f"Hello, {name}!"returnf"Hello, {escape(name)}!"#http://127.0.0.1:5000/<script>alert("a")</script>
这里默认path是为了接受slash(/),即Converter types:
string |
|
---|---|
int |
|
float |
|
path |
string but also accepts slashes |
uuid |
|
|safe
Automatic escaping is enabled, so if person contains HTML it will be escaped automatically. If you can trust a variable and you know that it will be safe HTML (for example because it came from a module that converts wiki markup to HTML) you can mark it as safe by using the Markup class or by using the |safe filter in the template. Head over to the Jinja 2 documentation for more examples.
from flask import render_template@app.route("/wel/<name>")defhello(name):return render_template('welcome.html', person=name)
welcome.html
<!DOCTYPE html><htmllang="en"><head><metacharset="UTF-8"><title>Hello from Flask</title></head><body>{% if person %}<!--<h1>Hello {{ person}}!</h1>--><h1>Hello {{ person|safe }}!</h1>{% else %}<h1>Hello, World!</h1>{% endif %}</body></html>
http://127.0.0.1:5000/wel/<img src=x onerror=alert('XSS')>
secure_filename
If you want to know how the file was named on the client before it was uploaded to your application, you can access the filename
attribute. However please keep in mind that this value can be forged so never ever trust that value. If you want to use the filename of the client to store the file on the server, pass it through the secure_filename()
function that Werkzeug provides for you:
from werkzeug.utils import secure_filename@app.route('/upload', methods=['GET', 'POST'])defupload_file():if request.method == 'POST': file = request.files['the_file'] file.save(f"/var/www/uploads/{secure_filename(file.filename)}")
errorhandler/make_response
If you want to get hold of the resulting response object inside the view you can use the make_response() function.
errorhandler:Register a function to handle errors by code or exception class.
自己处理错误返回信息可以防止报错导致信息泄露,特别是调试模式下的报错
@app.errorhandler(404)defnot_found(error): resp = make_response(render_template('error.html'), 404) resp.headers['X-Something'] = 'A value'return resp
session
In addition to the request object there is also a second object called session which allows you to store information specific to a user from one request to the next. This is implemented on top of cookies for you and signs the cookies cryptographically. What this means is that the user could look at the contents of your cookie but not modify it, unless they know the secret key used for signing.
app.secret_key = "dev"counter = 0@app.route('/')@app.route('/<id>')defindex(id=0):if'username'in session:returnf'{id}Logged in as {session["username"]}'returnf'{id}You are not logged in'@app.route('/login', methods=['GET', 'POST'])deflogin():global counterif request.method == 'POST': counter+=1 session['username'] = request.form['username']return redirect(url_for('index',id=counter))return''' <form method="post"> <p><input type=text name=username> <p><input type=submit value=Login> </form> '''@app.route('/logout')deflogout():# remove the username from the session if it's there session.pop('username', None)return redirect(url_for('index'))
random secret_key
import secretsprint(secrets.token_hex())
伪造session
eyJ1c2VyX2lkIjoxfQ.Z1-oSg.ZKFyFEbpDBKPH16nWfEusSHUoPYsession三段式:base64 encode,时间戳,安全签名
import hashlibfrom flask.json.tag import TaggedJSONSerializerfrom itsdangerous import *session = {"user_id":2}secret = 'dev'print(URLSafeSerializer(secret_key=secret, salt='cookie-session', # Flask固定的盐,盐和secret会先经过一轮sha1运算,其结果作为下一轮盐和cookie内容生成签名。 serializer=TaggedJSONSerializer(), signer=TimestampSigner, signer_kwargs={'key_derivation': 'hmac','digest_method': hashlib.sha1 } ).dumps(session))
伪造后修改cookie后完成越权:eyJ1c2VyX2lkIjoyfQ.Z1-rsA.CIWG3nrubHedU_5s-zo4fz_rmrU
sql-execute
db.execute takes a SQL query with ? placeholders for any user input, and a tuple of values to replace the placeholders with. The database library will take care of escaping the values so you are not vulnerable to a SQL injection attack.
For security, passwords should never be stored in the database directly. Instead, generate_password_hash() is used to securely hash the password, and that hash is stored. Since this query modifies data, db.commit() needs to be called afterwards to save the changes.
db.execute("INSERT INTO user (username, password) VALUES (?, ?)", (username, generate_password_hash(password)), )
generate_password_hash、check_password_hash
db.execute("INSERT INTO user (username, password) VALUES (?, ?)", (username, generate_password_hash(password)), )check_password_hash(user["password"], password)
send_from_directory
-
sending out HTML from uploaded files, never do that, use the Content-Disposition: attachment
header to prevent that problem.
return send_from_directory("../"+current_app.config['UPLOAD_FOLDER'],filename)Content-Disposition: inline; filename=a.pngContent-Type: image/pngreturn send_from_directory("../"+current_app.config['UPLOAD_FOLDER'],filename, as_attachment=True)Content-Disposition: attachment; filename=a.pngContent-Type: image/png
如果未指定 as_attachment=True
,浏览器将尝试直接在页面中显示文件内容;如果 as_attachment=True
,则强制下载文件
Content-Type
-
自动 MIME 类型检测:
-
.txt
→text/plain
-
.jpg
→image/jpeg
-
.png
→image/png
-
.pdf
→application/pdf
-
send_from_directory
使用mimetypes.guess_type()
根据文件扩展名推测Content-Type
。 -
例如: -
手动覆盖:
-
可通过
mimetype
参数手动指定内容类型:return send_from_directory( directory="uploads", filename="example.txt", mimetype="application/octet-stream",)
对上传文件进行检验和修改文件名
from flask import request, redirect, url_for, render_template, flash, Blueprint, send_from_directoryimport uuidfrom werkzeug.utils import secure_filenameimport osfrom PIL import Imagefrom flask import current_appbp = Blueprint("file", __name__, url_prefix="/file")# 检查文件扩展名defallowed_file(filename):return'.'in filename and filename.rsplit('.', 1)[1].lower() in current_app.config['ALLOWED_EXTENSIONS']# 检查文件是否为图片defis_image(file_path):try: Image.open(file_path).verify()returnTrueexcept:returnFalse# 上传图片@bp.route('/upload', methods=['GET', 'POST'])defupload_file():if request.method == 'POST':if'file'notin request.files: flash('No file part')return redirect(request.url) file = request.files['file']if file.filename == '': flash('No selected file')return redirect(request.url)if file and allowed_file(file.filename): filename = secure_filename(file.filename) ext = filename.rsplit('.', 1)[1].lower() # 获取文件扩展名 random_filename = f"{uuid.uuid4().hex}.{ext}"# 生成随机文件名 file_path = os.path.join(current_app.config['UPLOAD_FOLDER'], random_filename) file.save(file_path)ifnot is_image(file_path): os.remove(file_path) flash('Uploaded file is not a valid image')return redirect(request.url) flash('File uploaded successfully!')return redirect(url_for('index'))else: flash('Allowed file types are: png, a.jpg, jpeg, gif')return redirect(request.url)return render_template('file/file.html')# 下载图片@bp.route('/download/<filename>', methods=['GET'])defuploaded_file(filename): file_path = os.path.join(current_app.config['UPLOAD_FOLDER'], filename)try:# return send_from_directory("../"+current_app.config['UPLOAD_FOLDER'],filename)return send_from_directory("../"+current_app.config['UPLOAD_FOLDER'],filename, as_attachment=True)except FileNotFoundError: flash("File not found.")return redirect(url_for("file.upload_file"))'''app.config.from_mapping( # a default secret that should be overridden by instance config SECRET_KEY="dev", # store the database in the instance folder DATABASE=os.path.join(app.instance_path, "flaskr.sqlite"), UPLOAD_FOLDER = 'uploads', ALLOWED_EXTENSIONS= {'png', 'jpg', 'jpeg', 'gif'} )'''
file.html
<!DOCTYPE html><htmllang="en"><head><metacharset="UTF-8"><metaname="viewport"content="width=device-width, initial-scale=1.0"><title>Upload Image</title></head><body><h1>Upload an Image</h1> {% with messages = get_flashed_messages() %} {% if messages %}<ul> {% for message in messages %}<li>{{ message }}</li> {% endfor %}</ul> {% endif %} {% endwith %}<formmethod="POST"enctype="multipart/form-data"><labelfor="file">Select image:</label><inputtype="file"name="file"accept="image/*"required><inputtype="submit"value="Upload"></form></body></html>
CSRFProtect
CSRFProtect初始化后会自动保护所有带有 POST
, PUT
, PATCH
, DELETE
等修改性请求。在 HTML 的form表单中添加 CSRF 令牌:
from flask_wtf import CSRFProtectCSRFProtect(app)<input type="hidden" name="csrf_token" value="{{ csrf_token() }}" />
AJAX Requests with CSRF-Token
base.html
<head><metaname="csrf-token"content="{{ csrf_token() }}"><scriptsrc="{{ url_for('static', filename='js/like.js') }}"></script></head>
like.js
document.addEventListener('DOMContentLoaded', function () {// Find all like buttons by classconst buttons = document.querySelectorAll('.like-button'); buttons.forEach(button => {// Add click event listener to each button button.addEventListener('click', function () {const postId = this.dataset.postId; likePost(postId); // Call likePost function with postId }); });});// Function to handle the "like" actionfunctionlikePost(postId) {const csrfToken = document.querySelector('meta[name="csrf-token"]').getAttribute('content'); fetch(`${postId}/like`, {method: 'POST',headers: {'Content-Type': 'application/json','X-CSRFToken': csrfToken } }) .then(response => response.json()) .then(data => {if (data.status === 'success') {const likesSpan = document.getElementById(`likes-${postId}`); likesSpan.textContent = data.likes; } elseif (data.status === 'error') { alert(data.message); // 显示错误消息 } }) .catch(error =>console.error('Error:', error));}
jsonify
@bp.route('/data', methods=['GET'])defget_data(): data = [1,"<img src=x onerror=alert('XSS')>",'xiao',77]# return datareturn json.dumps(data) #xss# return jsonify(data)
json.dumps:Content-Type: text/html; charset=utf-8
jsonify:application/json
safe_load、defusedxml
defsafe_load(stream):""" Parse the first YAML document in a stream and produce the corresponding Python object. Resolve only basic YAML tags. This is known to be safe for untrusted input. """return load(stream, SafeLoader)import yamlsyaml_data = """!!python/object/apply:os.system args: ['calc']"""try:# This prevents execution of unsafe tags# parsed_data = yaml.safe_load(yaml_data) parsed_data = yaml.load(yaml_data, Loader=yaml.UnsafeLoader) print("Parsed safely:", parsed_data)except yaml.YAMLError as e: print(f"Error: {e}")
from defusedxml import ElementTree as ET# Correct XML payload without external entitiessafe_xml_data = """<?xml version="1.0"?><root> <message>Hello, safe world!</message></root>"""# Safe XML Parsing Exampletry: print("nUsing safe XML parsing") root_safe = ET.fromstring(safe_xml_data, forbid_dtd=True) print(f"Parsed (Safe): {ET.tostring(root_safe, encoding='unicode')}")except ET.ParseError as e: print(f"Error: {e}")
flask_talisman/csp
flask --app .flaskr run --cert=cert.pem --key=key.pem
:通过证书默认开启https
from flask_talisman import Talisman csp = {'default-src': ''self'','script-src': ''self'','style-src': ''self'' } Talisman(app, force_https=True, force_https_permanent=True,content_security_policy=csp)
并发
@bp.route('/<int:id>/like', methods=['POST'])@login_requireddeflike_post(id):"""Handle a like action for a post.""" db = get_db()# 使用事务开始并立即锁定数据库#db.execute('BEGIN IMMEDIATE')try:# 检查用户是否已经给该文章点过赞 like_exists = db.execute("SELECT 1 FROM post_likes WHERE user_id = ? AND post_id = ?", (g.user['id'], id) ).fetchone()if like_exists:return jsonify(status='error', message='You have already liked this post.')# 增加点赞记录 db.execute("INSERT INTO post_likes (user_id, post_id) VALUES (?, ?)", (g.user['id'], id) )# 更新文章的点赞数量 db.execute("UPDATE post SET likes = likes + 1 WHERE id = ?", (id,) ) db.commit() # 提交事务# 获取更新后的点赞数量 likes = db.execute("SELECT likes FROM post WHERE id = ?", (id,) ).fetchone()['likes']return jsonify(status='success', likes=likes)except Exception as e: db.rollback() # 如果发生异常,回滚事务return jsonify(status='error', message=str(e))
解决上述并发漏洞的方法如下:
-
db.execute('BEGIN IMMEDIATE'):使用 BEGIN IMMEDIATE
来显式开始事务并锁定表。在事务中完成所有操作(检查是否点赞、插入点赞、更新点赞数),确保这些操作在同一事务中执行。 -
加一个额外的唯一约束,在 post_likes
表上建立(user_id, post_id)
唯一约束来防止重复点赞
CREATE TABLE post_likes ( id INTEGER PRIMARY KEY AUTOINCREMENT, user_id INTEGER NOT NULL, post_id INTEGER NOT NULL, UNIQUE(user_id, post_id), -- 确保同一用户对同一篇文章只能点赞一次 FOREIGN KEY (user_id) REFERENCES user (id), FOREIGN KEY (post_id) REFERENCES post (id));
-
利用缓存层批量写入
每次点赞时,写入 Redis 哈希表;定时批量同步到数据库(例如每五秒)
越权
-
在增删查改处加入鉴权和登录校验
deflogin_required(view):"""View decorator that redirects anonymous users to the login page.""" @functools.wraps(view)defwrapped_view(**kwargs):if g.user isNone:return redirect(url_for("auth.login"))return view(**kwargs)return wrapped_viewdefget_post(id, check_author=True): ...if post isNone: abort(404, f"Post id {id} doesn't exist.")if check_author and post["author_id"] != g.user["id"]: abort(403)return post@bp.route("/<int:id>/update", methods=("GET", "POST"))@login_requireddefupdate(id):"""Update a post if the current user is the author.""" post = get_post(id)
-
使用session机制验证用户身份
@bp.route("/login", methods=("GET", "POST"))deflogin(): ...if error isNone:# store the user id in a new session and return to the index session.clear() session["user_id"] = user["id"]@bp.route("/logout")deflogout():"""Clear the current session, including the stored user id.""" session.clear()@bp.before_app_requestdefload_logged_in_user():"""If a user id is stored in the session, load the user object from the database into ``g.user``.""" user_id = session.get("user_id")if user_id isNone: g.user = Noneelse: g.user = ( get_db().execute("SELECT * FROM user WHERE id = ?", (user_id,)).fetchone() )
oss
对象存储(Object-Based Storage),也可以叫做面向对象的存储,现在也有不少厂商直接把它叫做云存储。
说到对象存储就不得不提 Amazon,Amazon S3 (Simple Storage Service) 简单存储服务,是 Amazon 的公开云存储服务,与之对应的协议被称为 S3 协议,目前 S3 协议已经被视为公认的行业标准协议,因此目前国内主流的对象存储厂商基本上都会支持 S3 协议。
在 Amazon S3 标准下中,对象存储中可以有多个桶(Bucket),然后把对象(Object)放在桶里,对象又包含了三个部分:Key、Data 和 Metadata
1、Bucket权限配置
公开访问:在只配置读写权限设置为公有读或公共读写的情况下,无法列出对象(AccessDenied)。但是可以爆破key,从而访问对应的KEY路径(xxx.oss-cn-hangzhou.aliyuncs.com/img.png)
ListObject:列出Object对象,访问存储桶域名可以把存储桶的东西列出来
公开写:管理员将存储桶权限配置为可写,则攻击者可上传任意文件到存储桶中,或覆盖已经存在的文件。PUT上传文件名aa,内容为aabb数据包如下
PUT /aa HTTP/1.1Host: coleakspictures.oss-cn-hangzhou.aliyuncs.comCache-Control: max-age=0Sec-Ch-Ua: "Chromium";v="97", " Not;A Brand";v="99"Sec-Ch-Ua-Mobile: ?0Sec-Ch-Ua-Platform: "Windows"Upgrade-Insecure-Requests: 1User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/97.0.4692.71 Safari/537.36Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9Sec-Fetch-Site: noneSec-Fetch-Mode: navigateSec-Fetch-User: ?1Sec-Fetch-Dest: documentAccept-Encoding: gzip, deflateAccept-Language: zh-CN,zh;q=0.9Connection: closeaabb
2、Bucket桶爆破
Bucket 不存在:InvalidBucketName 或 NoSuchBucket
Bucket 存在:列出Object 或 AccessDenied
3、策略配置
如果管理员设置了某些IP,UA才可以请求该存储桶的话,此时如果错误的配置了GetBucketPolicy,可导致攻击者获取策略配置
aliyun.exe oss bucket-policy oss://testpictures --method get
4、AccessKeyId,SecretAccessKey泄露
-
GitHub等开源平台中的源代码泄露Key -
反编译APK,找敏感信息 -
目标网站JS源代码中
5、Bucket接管
Bucket 显示 NoSuchBucket 说明是可以接管的,创建一个同名的 Bucket ,创建完 Bucket 后,再次访问发现就显示 AccessDenied 了,说明该 Bucket 已经被我们接管了, Bucket 设置为公开并上传个文件
CORS
漏洞规则
|
|
|
---|---|---|
|
|
|
<all-host></all-host> |
|
|
|
|
|
|
|
|
全局设置
from flask import Flaskfrom flask_cors import CORSapp = Flask(__name__)CORS(app, origins='http://example.com')
单个接口
from flask import Flaskfrom flask_cors import CORS, cross_origin@bp.route('/api/some_endpoint')@cross_origin(origins='https://localhost:5000', methods=['GET', 'POST'],supports_credentials=True)defsome_endpoint():# 处理接口逻辑# return jsonify({"message": f"Hello, {g.user['id']}!"}) id=session['user_id']return jsonify({"message": f"Hello, {id}!"})@bp.route('/test')deftest():return render_template("blog/test.html")
Flask-CORS提供了许多配置选项,用于控制CORS的行为。以下是一些常用的配置选项:
-
origins
:指定允许的源。您可以使用通配符来表示所有的源,或者指定具体的源。例如,origins='*'
表示允许所有的源,而origins='example.com'
表示只允许example.com
这个源。 -
methods
:指定允许的HTTP方法。默认情况下,所有的方法(GET、POST等)都是允许的。您可以通过设置methods=['GET']
来只允许GET请求。 -
headers
:指定允许的请求头。默认情况下,所有的请求头都是允许的。您可以通过设置headers=['Content-Type']
来只允许Content-Type
请求头。 -
expose_headers
:指定允许浏览器访问的响应头。默认情况下,浏览器只能访问一些基本的响应头,如Content-Type
和Cache-Control
等。您可以通过设置expose_headers=['Authorization']
来允许浏览器访问Authorization
响应头。 -
supports_credentials
:指定是否允许使用凭据进行跨域请求。如果设置为True,浏览器会在请求中添加Cookie等凭据信息。默认情况下,该选项是禁用的。
攻击演示
test.html
<!DOCTYPE html><htmllang="en"><head><metacharset="UTF-8"><metahttp-equiv="X-UA-Compatible"content="IE=edge"><metaname="viewport"content="width=device-width, initial-scale=1.0"><metaname="csrf-token"content="your-csrf-token-here"><title>CORS Test</title><script>// 发送 GET 请求来测试 CORSfunctiontestCORS() {fetch('https://127.0.0.1:5000/api/some_endpoint', {method: 'GET',headers: {'Content-Type': 'application/json','X-CSRFToken': document.querySelector('meta[name="csrf-token"]').getAttribute('content'),'Origin': 'https://localhost:5000', },credentials: 'include'// 确保携带 Cookies}) .then(response => {// 检查是否支持 CORSif (response.ok) {return response.json(); } else {thrownewError('CORS test failed'); } }) .then(data => {console.log('Response:', data); alert('CORS request successful: ' + data.message); }) .catch(error => {console.error('Error:', error); alert('CORS request failed'); }); }</script></head><body><h1>CORS Test</h1><buttononclick="testCORS()">Test CORS</button></body></html>
数据包及返回包如下
OPTIONS /api/some_endpoint HTTP/1.1Host: 127.0.0.1:5000Accept: */*Access-Control-Request-Method: GETAccess-Control-Request-Headers: content-type,x-csrftokenOrigin: https://localhost:5000User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/97.0.4692.71 Safari/537.36Sec-Fetch-Mode: corsSec-Fetch-Site: cross-siteSec-Fetch-Dest: emptyReferer: https://localhost:5000/Accept-Encoding: gzip, deflateAccept-Language: zh-CN,zh;q=0.9Connection: closeHTTP/1.1 200 OKServer: Werkzeug/3.1.3 Python/3.10.8Date: Mon, 30 Dec 2024 08:17:19 GMTContent-Type: text/html; charset=utf-8Allow: GET, HEAD, OPTIONSAccess-Control-Allow-Origin: https://localhost:5000Access-Control-Allow-Credentials: trueAccess-Control-Allow-Headers: content-type, x-csrftokenAccess-Control-Allow-Methods: GET, POSTVary: CookieContent-Length: 0Connection: close
GET /api/some_endpoint HTTP/1.1Host: 127.0.0.1:5000Cookie: session=eyJjc3JmX3Rva2VuIjoiYzUxYjFjY2UwNTBlNjQzYzQxZWVjMzFlODQyMDM4Y2QzZmVhZDVlOSIsInVzZXJfaWQiOjJ9.Z3JVhg.4mgF6x-gP1KYKloTpC7X12oluKUSec-Ch-Ua: "Chromium";v="97", " Not;A Brand";v="99"X-Csrftoken: your-csrf-token-hereSec-Ch-Ua-Mobile: ?0Content-Type: application/jsonUser-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/97.0.4692.71 Safari/537.36Sec-Ch-Ua-Platform: "Windows"Accept: */*Origin: https://localhost:5000Sec-Fetch-Site: cross-siteSec-Fetch-Mode: corsSec-Fetch-Dest: emptyReferer: https://localhost:5000/Accept-Encoding: gzip, deflateAccept-Language: zh-CN,zh;q=0.9Connection: closeHTTP/1.1 200 OKServer: Werkzeug/3.1.3 Python/3.10.8Date: Mon, 30 Dec 2024 08:17:20 GMTContent-Type: application/jsonContent-Length: 24Access-Control-Allow-Origin: https://localhost:5000Access-Control-Allow-Credentials: trueVary: CookieConnection: close{"message":"Hello, 2!"}
防御措施
-
配置csp以阻止CORS -
origins设置可信任的白名单 -
不开启supports_credentials(默认) -
浏览器samesite:Strict/Lax,而不配置为有风险的none(还将cookie加上 Secure
)
SSTI
render_template自动转义不存在SSTI,而render_template_string会出现该漏洞
@bp.route('/cc',methods=['GET', 'POST'])defcc(): template = ''' <div class="center-content error"> <h1>Oops! That page doesn't exist.</h1> <h3>%s</h3> </div> ''' %(request.url)return render_template_string(template)
https://127.0.0.1:5000/cc?{{7+8}}{{self.__init__.__globals__.__builtins__['__import__']('os').popen('ipconfig').read()}}{{''.__class__.__base__.__subclasses__()[128].__init__.__globals__['__builtins__']['eval']('__import__("os").popen("dir").read()')}}{{().__class__.__base__.__subclasses__()[155].__init__.__globals__.__builtins__['eval']('__import__("os").popen("dir").read()')}}
魔术方法
|
|
---|---|
init |
|
class |
|
module |
|
mro |
|
base |
|
bases |
|
dict |
|
subclasses |
|
globals |
|
import |
|
builtins |
|
获取子类
# 获取子类''.__class__.__base__.__subclasses__()''.__class__.__bases__[0].__subclasses__()''.__class__.__mro__[-1].__subclasses__()data = r''' [<class 'type'>, <class 'async_generator'>, <class 'int'>, <class 'bytearray_iterator'>...]'''userful_class = ['linecache', 'os._wrap_close', 'subprocess.Popen', 'warnings.catch_warnings', '_frozen_importlib._ModuleLock', '_frozen_importlib._DummyModuleLock', '_frozen_importlib._ModuleLockManager', '_frozen_importlib.ModuleSpec']k=data.split(',')for i in range(len(k)):for j in userful_class:if j in k[i]: print(i,":",k[i])100 : <class '_frozen_importlib._ModuleLock'>101 : <class '_frozen_importlib._DummyModuleLock'>102 : <class '_frozen_importlib._ModuleLockManager'>102 : <class '_frozen_importlib._ModuleLockManager'>103 : <class '_frozen_importlib.ModuleSpec'>139 : <class 'os._wrap_close'>155 : <class 'warnings.catch_warnings'>266 : <class 'subprocess.Popen'>{{().__class__.__base__.__subclasses__()[155].__init__.__globals__}}
后记
flask生命周期
接收请求创建请求对象请求钩子(before)路由匹配执行视图函数生成响应对象响应钩子(after 和 teardown)返回响应
Flask 提供了 4 个钩子函数,允许你在处理请求的不同阶段执行自定义逻辑:
|
|
---|---|
before_first_request |
|
before_request |
|
after_request |
|
teardown_request |
|
WSGI/ASGI
特性 | WSGI | ASGI |
---|---|---|
请求处理 |
|
|
实时通信支持 |
|
|
性能与扩展性 |
|
|
典型框架 |
|
|
Web 服务器 |
|
|
enev
创建:python -m venv myenv激活:myenvScriptsactivate.bat退出:deactivate删除:删除整个环境的安装目录即可。
生产环境
pyproject.toml
[project]name = "flaskr"version = "1.0.0"description = "The basic blog app built in the Flask tutorial."dependencies = [ "flask",][build-system]requires = ["flit_core<4"]build-backend = "flit_core.buildapi"
本地打包
pip install build
python -m build --wheel
服务安装
python -m venv myenv
myenvScriptsactivate.bat
pip install flaskr-1.0.0-py2.py3-none-any.whl
flask --app flaskr init-db
pip install waitress
waitress-serve --call flaskr:create_app
自签证书
openssl req -x509 -newkey rsa:4096 -nodes -out cert.pem -keyout key.pem -days 365
oss
# -*- coding: utf-8 -*-import oss2from itertools import isliceimport loggingimport timeimport random# 配置日志logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')access_key_id=""access_key_secret=""auth = oss2.AuthV4(access_key_id, access_key_secret)# 设置Endpoint和Regionendpoint = "https://oss-cn-hangzhou.aliyuncs.com"region = "cn-hangzhou"defgenerate_unique_bucket_name():# 获取当前时间戳 timestamp = int(time.time())# 生成0到9999之间的随机数 random_number = random.randint(0, 9999)# 构建唯一的Bucket名称 bucket_name = f"demo-{timestamp}-{random_number}"return bucket_name# 生成唯一的Bucket名称bucket_name = ""bucket = oss2.Bucket(auth, endpoint, bucket_name, region=region)defcreate_bucket(bucket):try: bucket.create_bucket(oss2.models.BUCKET_ACL_PRIVATE) logging.info("Bucket created successfully")except oss2.exceptions.OssError as e: logging.error(f"Failed to create bucket: {e}")defupload_file(bucket, object_name, data):try: result = bucket.put_object(object_name, data) logging.info(f"File uploaded successfully, status code: {result.status}")except oss2.exceptions.OssError as e: logging.error(f"Failed to upload file: {e}")defdownload_file(bucket, object_name):try: file_obj = bucket.get_object(object_name) content = file_obj.read()with open("test.png", 'wb') as f: # 将数据写入本地文件 f.write(content)return contentexcept oss2.exceptions.OssError as e: logging.error(f"Failed to download file: {e}")deflist_objects(bucket):try: objects = list(islice(oss2.ObjectIterator(bucket), 10))for obj in objects: logging.info(obj.key)except oss2.exceptions.OssError as e: logging.error(f"Failed to list objects: {e}")defdelete_objects(bucket):try: objects = list(islice(oss2.ObjectIterator(bucket), 100))if objects:for obj in objects: bucket.delete_object(obj.key) logging.info(f"Deleted object: {obj.key}")else: logging.info("No objects to delete")except oss2.exceptions.OssError as e: logging.error(f"Failed to delete objects: {e}")defdelete_bucket(bucket):try: bucket.delete_bucket() logging.info("Bucket deleted successfully")except oss2.exceptions.OssError as e: logging.error(f"Failed to delete bucket: {e}")# 主流程if __name__ == '__main__':# 1. 创建Bucket create_bucket(bucket)with open("img.png","rb") as f: con=f.read()# # 2. 上传文件 upload_file(bucket, 'img.png', con)# 3. 下载文件 download_file(bucket, 'img.png')# 4. 列出Bucket中的对象 list_objects(bucket)# 5. 删除Bucket中的对象 delete_objects(bucket)# 6. 删除Bucket delete_bucket(bucket)
CORS/CSRF/CSP
特性 | CORS | CSRF | CSP |
---|---|---|---|
目的 |
|
|
|
工作原理 |
Access-Control-Allow-Origin 控制跨域 |
|
Content-Security-Policy 控制资源加载 |
适用场景 |
|
|
|
解决的攻击 |
|
|
|
CORS 主要处理浏览器跨域请求的问题,控制哪些外部网站可以访问你的资源。
CSRF 防止跨站请求伪造,确保用户请求的合法性,避免恶意网站伪造用户请求。
CSP 防止 XSS 攻击,控制页面可以加载和执行的资源,减少注入攻击。
reference
https://flask.palletsprojects.com/en/stable/https://www.secpulse.com/archives/97707.htmlhttps://flask.palletsprojects.com/en/stable/templating/https://escape.tech/blog/best-practices-protect-flask-applications/#what-is-flaskhttps://github.com/GoogleCloudPlatform/flask-talismanhttps://developer.aliyun.com/article/875653https://segmentfault.com/a/1190000041525436https://xz.aliyun.com/t/12001
原文始发于微信公众号(渗透测试安全攻防):Flask开发安全指南
- 左青龙
- 微信扫一扫
-
- 右白虎
- 微信扫一扫
-
评论