使用angr挖掘Windows漏洞的一些辅助技巧

admin 2024年7月15日13:43:42评论17 views字数 35106阅读117分1秒阅读模式

原文链接:https://plowsec.github.io/angr-introspection-2024.html原文作者:volodya

 

目录:

  • 介绍
  • 背景故事
  • 适用于实际用例的 ANGR
  • 使用 Pycharm
    • 调试器
    • 条件断点
    • 自定义类型渲染器
  • 使用泡菜
  • 内省
    • 下载符号
    • 解剖
    • 更新 angr 的知识库
    • 覆盖范围:基本尝试
    • 调用堆栈
    • Windows 调试符号
    • 处理错误状态
    • 实时代码覆盖率与趋势
  • 结论

介绍

这篇博文的内容是,我们可以轻松地扩展以显示正在发生的事情。我正在共享代码片段,用于实时代码覆盖率可视化、每状态跟踪可视化以及其他细节,例如为 Windows 目标提供对调试符号的支持。下一部分是个人轶事,请随意跳过它以获取技术内容!tl;drangr

背景故事

angr是一个让我深深着迷的项目。我在 2019 年尝试学习它。同年,我参加了 Rolf Rolles 基于 SMT 的二进制程序分析培训,这确实解锁了我大脑中的一些东西:

  • 这个家伙在 2.5 年的时间里为培训制作了 40,000 行代码框架 + 书面材料:这帮助我正确看待事情,并对开发新技术需要多少时间有一个更现实的想法。
  • 演示以行业用例为中心,而不是 CTF 问题。这就像学习魔法咒语一样,你可以扔给混淆的代码并理解它。

虽然我没有立即应用我所学到的东西,但种子已经种下,每年我都试图用它来解决难题,每次都放弃了。大多数情况下,我仍然被困在同一个老问题上:为什么跑了这么久,吃掉了我所有的内存angrangr

当时,我缺乏正确调试它/摄取其源代码的技能。然而,多年后,我不再感到这些障碍了。这可能是因为我在这两者之间做了更艰难和痛苦的事情,所以这次源代码不会吓到我!angr

好吧,假新闻。在 Windows 驱动程序中进行试验时,我发现在我给出的入口点之后,它被卡住了 3 个函数调用。我最小化了测试用例,并向同事(@PickleBryne)寻求帮助。我们花了 2 个小时,这 2 个小时终于帮助我克服了我最后的心理极限。ioctlanceangrangr

我的同事很有经验,坚持一步一步地探索,每次都用 .然后,我们发现在调用 后,它被 挂钩了,添加了一个约束。angrstate.solver.constraintsmemmoveioctlanceSimProcedureunsat

我的缓冲区的地址是,Windows 驱动程序正在对此地址进行某种溢出检查,并且不接受它。奇怪!钩子的实现对我们来说看起来完全正常。它肯定没有返回这个相当奇怪的值。0xfffffffff

我的同事用完了帮助时间预算,我自己继续了 1 个小时,使用调试器来确定此地址出现的时间和原因。然后我找到了罪魁祸首:Pycharm

def store(self, addr, data, size=None, condition=None, **kwargs):        # Fast path        if type(addr) is int:            self._store_one_addr(addr, data, True, addr, condition, size, **kwargs)            return        elif not self.state.solver.symbolic(addr):            self._store_one_addr(self.state.solver.eval(addr), data, True, addr, condition, size, **kwargs)            return
        if self.state.solver.symbolic(addr) and options.AVOID_MULTIVALUED_WRITES in self.state.options:            # not completed            return
        try:            concrete_addrs = self._interleave_ints(sorted(self.concretize_write_addr(addr)))        except SimMemoryError:            if options.CONSERVATIVE_WRITE_STRATEGY in self.state.options:                return  # not completed            else:                raise
        # quick optimization so as to not involve the solver if not necessary        trivial = len(concrete_addrs) == 1 and (addr == concrete_addrs[0]).is_true()        if not trivial:            # apply the concretization results to the state            constraint_options = [addr == concrete_addr for concrete_addr in concrete_addrs]            conditional_constraint = self.state.solver.Or(*constraint_options)            self._add_constraints(conditional_constraint, condition=condition, **kwargs)
            if len(concrete_addrs) == 1:                # simple case: avoid conditional write since the address has been concretized to one solution                super().store(concrete_addrs[0], data, size=size, **kwargs)                return
        for concrete_addr in concrete_addrs:            # perform each of the stores as conditional            # the implementation of conditionality must be at the bottom of the stack            self._store_one_addr(concrete_addr, data, trivial, addr, condition, size, **kwargs)
 

angr需要一个地址(读作:一个真实的地址)来存储 的结果,但钩子提供了一个不受约束的符号(读作:就像在代数中一样)。这个问题有点相关:concretememmovex

这不是一个错误,这是 angr 能做的最好的事情。您期望从不受约束的指针加载的结果是什么?我们必须将其具体化,以便能够继续执行。雷莫特

这个反问听起来确实很聪明,但像我这样的新手在遇到问题之前甚至没有考虑过。rhelmot

所以修复很简单,我使用了已经可用的,然后一切都很顺利。我讲这个轶事有两个原因:strcpySimProcedureangr

  • 坚持是关键
  • 在利用“黑匣子”工具时展示我的工作流程很有用。

适用于实际用例的 ANGR

在这一点上,我确信通过努力和方法论方法可以解决这些问题。咄......但是研究有一个心理因素,你必须相信自己,当到目前为止没有任何回报时,你会投入更多的时间和精力。有了这种新发现的信心,我准备进行接下来的步骤:angr

  • 了解到底在做什么。angr
  • 寻找“恢复”分析的解决方案:它需要 5 分钟才能找到我的目标地址,我不喜欢每次运行脚本时等待 5 分钟。angr
  • 学习如何在文档不够时正确使用。angr

让我们先解决第三个问题。

使用 Pycharm

现代 IDE 是必须的。老实说,我不理解那些使用 Vim 从事大型项目的人。这就像双手被绑在背后编码一样。如果您正在寻找特定的用例,则文档不会涵盖足够的信息,因此您有两个(非排他性)选项:angr

  1. 读取源代码
  2. 玩转对象属性,看看它们包含什么。

调试器

使用 和 时,通常会在一堆 和 之间交替使用。Pycharm 为您展示了所有这些:VimIPythonprint(obj)dir(obj)

使用angr挖掘Windows漏洞的一些辅助技巧

好吧,可悲的是,它没有显示方法,但拥有此功能仍然很好。

当调试器遇到断点时,运行时调试信息还可以使用源代码注释:

使用angr挖掘Windows漏洞的一些辅助技巧

最后但并非最不重要的一点是,允许您选择调用堆栈中的任何帧并查看局部变量的过去值。这有助于找出该讨厌值首次出现的方式和时间:PyCharm

使用angr挖掘Windows漏洞的一些辅助技巧

条件断点

如果您知道只有当满足特定条件时,或者在循环的多次迭代之后,或者在执行时间的 5 分钟后才会发生错误,该怎么办?当然,您可以非常有耐心并使用手动步骤和/或语句,但正确的工具是条件断点。下面是我为调试会话设置的断点的真实示例:print

使用angr挖掘Windows漏洞的一些辅助技巧

自定义类型渲染器

PyCharm提供了一种自定义数据在调试器中的显示方式的方法。对于 ,我发现自己经常阅读十六进制地址并在 和 之间来回跳转。当我向我显示以 10 为基数的地址时,我不再认出它们了,并且只在前 10,000 次中这样做很有趣:angrangrIDA ProPyCharmprint(hex(x))

使用angr挖掘Windows漏洞的一些辅助技巧

和一样:dict

使用angr挖掘Windows漏洞的一些辅助技巧

第二个屏幕截图中的代码是(谢谢):Claude 3.5

{(hex(k) if isinstance(k, int) else k): (hex(v) if isinstance(v, int) else v) for k, v in self.items()}
 

使用泡菜

我已经知道一段时间了,因为我在我的交易机器人中大量使用它。因此,我知道使所有类可序列化是多么痛苦,并且假设它太复杂而无法腌制。我错了!pickleangr

从文档中:

如何序列化 angr 对象?泡菜会起作用。但是,Python 将默认使用非常旧的 pickle 协议,该协议不支持更复杂的 Python 数据结构,因此您必须指定更高级的数据流格式。最简单的方法是 。pickle.dumps(obj, -1)

这意味着您可以腌制以下东西:

  1. angr.Project
  2. angr.analyses.CFGEmulated
  3. 成功但耗时的通话后的 Simulation Manager!.explore

通过这样做,你可以恢复你的脚本,从而快速迭代试错过程,在我看来,这是学习和构建新东西的原始过程。angr

内省

让我们进入本文的核心。对于任何长时间运行的作业,我们开发人员都需要确信执行的程序按预期工作,并且不会因为问题而浪费 CPU 周期。如何诊断这些?嗯,最近我喜欢反模式:我把所有东西都记录到一个文件中,这样我就可以一目了然地了解是否有东西横盘整理。logging

但在这里我们可以走得更远。

覆盖范围:基本尝试

在模糊测试中,我们可以使用代码覆盖率并可视化模糊器的探索(并可能识别瓶颈)。我们能用吗?lighthouseangr

当然,正如 Jannis Kirschner 在 Insomni'hack 2022 上所展示的那样:

def get_small_coverage(*args, **kwargs):    sm = args[0]    stashes = sm.stashes    i = 0    for simstate in stashes["active"]:        state_history = ""
        for addr in simstate.history.bbl_addrs.hardcopy:            write_address = hex(addr)            state_history += "{0}n".format(write_address)        raw_syminput = simstate.posix.stdin.load(0, state.posix.stdin.size)
        syminput = simstate.solver.eval(raw_syminput, cast_to=bytes)        print(syminput)        ip = hex(state.solver.eval(simstate.ip))        uid = str(uuid.uuid4())        sid = str(i).zfill(5)        filename = "{0}_active_{1}_{2}_{3}".format(sid, syminput, ip, uid)
        with open(filename, "w") as f:            f.write(state_history)        i += 1
simgr.explore(find=0x00001407, step_func=get_small_coverage)
 

在这里,我使用 ChatGPT 作为 OCR 工具来转录 YouTube 视频中显示的幻灯片(我知道 ChatGPT 在幕后使用 pytesseract,但输出要好得多)。

运行此step_func会在本地文件夹中创建一堆文件,以朝着给定的目标前进:angr


$ ls -lsaht | headtotal 74912 0 drwxr-xr-x    38 user  staff   1.2K Jun 14 14:34 .. 0 drwxr-xr-x  5416 user  staff   169K Jun 13 15:35 .16 -rw-r--r--     1 user  staff   7.9K Jun  6 16:13 00164_active_0x1c0010ae4_d9d59da2-daf1-43b3-999a-02d85f81477816 -rw-r--r--     1 user  staff   7.9K Jun  6 16:13 00163_active_0x1c0010ae6_b9618c9b-6c05-4c2d-8ba1-6a9b74dbdcb516 -rw-r--r--     1 user  staff   7.9K Jun  6 16:13 00162_active_0x1c0010b98_81b30e77-01ad-43c0-8990-c58877d738d216 -rw-r--r--     1 user  staff   7.9K Jun  6 16:13 00161_active_0x1c0010b9d_18a2bf38-1aa1-47c5-a206-2cc116ab548616 -rw-r--r--     1 user  staff   7.9K Jun  6 16:13 00160_active_0x1c0010baf_788b6aae-a9f2-4866-9458-7c28c0bf339016 -rw-r--r--     1 user  staff   7.9K Jun  6 16:13 00159_active_0x1c0010bb7_96548594-619a-4a2e-8445-072c208ec02a16 -rw-r--r--     1 user  staff   7.9K Jun  6 16:13 00158_active_0x1c01000e0_f1177956-e9e4-4c8f-8ab3-a87e4c9821a9
 

并阅读其中一个节目:


$ cat 00164_active_0x1c0010ae4_d9d59da2-daf1-43b3-999a-02d85f814778 | head                                                                                       0x1c00109e80x1c0010a1b0x1c0010a360x1c0010b4c0x1c0010b880x1c002fc300x1c01002980x1c002fc3b0x1c0010b8d0x1c0010b5f
 

然后,我们可以使用 Lighthouse 加载所有这些文件:IDA Pro > Load File > Code Coverage batch

使用angr挖掘Windows漏洞的一些辅助技巧

这真是太好了!

调用堆栈

我们工具箱中的另一个有用工具是能够知道当前在哪里以及它是如何到达那里的。文档指出我们应该使用 和 .让我们试试这些:angrstate.history.descriptions.hardcopystate.history.events

f = simgr.one_foundpprint.pprint(f.history.descriptions.hardcopy)['<IRSB from 0x140001200: 1 sat>', '<SimProcedure HookVPrintf from 0x140001550: 1 sat>', '<IRSB from 0x140001238: 2 sat>', '<IRSB from 0x1400012f8: 1 sat>', '<SimProcedure HookVPrintf from 0x140001550: 1 sat>', '<IRSB from 0x140001304: 1 sat>', '<IRSB from 0x140001040: 1 sat 1 unsat>', '<IRSB from 0x1400011b0: 1 sat 1 unsat>', '<IRSB from 0x1400011b9: 1 sat>', '<SimProcedure MallocHook from 0x140100050: 1 sat>', '<IRSB from 0x1400011c2: 1 sat>', '<SimProcedure HookVPrintf from 0x140001550: 1 sat>', '<IRSB from 0x1400011e5: 1 sat>', '<IRSB from 0x140001311: 1 sat>', '<IRSB from 0x140001040: 1 sat 1 unsat>', '<IRSB from 0x140001056: 1 sat 1 unsat>', '<IRSB from 0x14000112d: 1 sat>', '<SimProcedure HookVPrintf from 0x140001550: 1 sat>', '<IRSB from 0x14000113c: 1 sat>', '<IRSB from 0x14000131b: 1 sat>', '<IRSB from 0x140001000: 1 sat 1 unsat>', '<IRSB from 0x140001034: 1 sat>', '<IRSB from 0x14000132c: 1 sat 1 unsat>', '<IRSB from 0x14000133a: 1 sat 1 unsat>']f.history.eventsOut[3]: <angr.state_plugins.history.LambdaIterIter at 0x310f73350>list(f.history.events)Out[4]: [<SimActionConstraint 0x140001200:0 <SAO <Bool operations_2419_96[95:64] >= 0x0>>>, <SimActionConstraint 0x140001200:0 <SAO <Bool operations_2419_96[95:64] <= 0x7a6b>>>, <SimActionConstraint 0x140001200:0 <SAO <Bool operations_2419_96[63:32] >= 0x0>>>, <SimActionConstraint 0x140001200:0 <SAO <Bool operations_2419_96[63:32] <= 0x7a6b>>>, <SimActionConstraint 0x140001200:0 <SAO <Bool operations_2419_96[31:0] >= 0x0>>>, <SimActionConstraint 0x140001200:0 <SAO <Bool operations_2419_96[31:0] <= 0x7a6b>>>, <SimActionConstraint 0x140001200:0 <SAO <Bool values_2420_96[95:64] >= 0x0>>>, <SimActionConstraint 0x140001200:0 <SAO <Bool values_2420_96[95:64] <= 0x9>>>, <SimActionConstraint 0x140001200:0 <SAO <Bool values_2420_96[63:32] >= 0x0>>>, <SimActionConstraint 0x140001200:0 <SAO <Bool values_2420_96[63:32] <= 0x9>>>, <SimActionConstraint 0x140001200:0 <SAO <Bool values_2420_96[31:0] >= 0x0>>>, <SimActionConstraint 0x140001200:0 <SAO <Bool values_2420_96[31:0] <= 0x9>>>, <SimActionConstraint 0x140001200:0 <SAO <Bool operations_2419_96[95:64] == 0x7a6b>>>, <SimEvent unconstrained 11890, with fields name, bits>, <SimEvent unconstrained 11891, with fields name, bits>, <SimEvent unconstrained 11892, with fields name, bits>, <SimEvent unconstrained 11893, with fields name, bits>, <SimEvent unconstrained 11894, with fields name, bits>, <SimEvent unconstrained 11895, with fields name, bits>, <SimEvent unconstrained 11896, with fields name, bits>, <SimEvent fs_write 11897, with fields filename, data, size, pos>, <SimActionConstraint 0x14000123f:23 <SAO <Bool mem_100000_2428_32{UNINITIALIZED} != 0x7a69>>>, <SimActionConstraint 0x140001289:18 <SAO <Bool mem_100000_2428_32{UNINITIALIZED} != 0x7a6a>>>, <SimActionConstraint 0x1400012f6:18 <SAO <Bool mem_100000_2428_32{UNINITIALIZED} == 0x7a6b>>>, <SimEvent fs_write 11906, with fields filename, data, size, pos>, <SimEvent unconstrained 11933, with fields name, bits>, <SimEvent unconstrained 11934, with fields name, bits>, <SimEvent fs_write 11935, with fields filename, data, size, pos>, <SimEvent unconstrained 11938, with fields name, bits>, <SimEvent unconstrained 11967, with fields name, bits>, <SimEvent fs_write 11968, with fields filename, data, size, pos>, <SimActionConstraint 0x140001007:22 <SAO <Bool mem_100000_2428_32{UNINITIALIZED} != 0x1>>>]
 

好吧,里面有很多信息,但我发现它很难阅读,而且我们丢失了一个重要的信息,因为调用堆栈是“扁平化”的(可能不是正确的词,但我愿意)。

鉴于这些缺点,我实现了一个简单的函数,该函数使用缩进来显示控制流,为状态打印某种带注释的回溯:angr

def pretty_print_callstack(state: angr.SimState, max_depth: int = 10) -> None:    """    Print a formatted call stack for a given state.
    Args:        state: The simulation state.        max_depth: Maximum depth of the call stack to print.    """    state_history: str = "Call Stack:n"    kb_functions = shared.proj.kb.functions
    last_addr: int = 0    repeat_count: int = 0    formatted_lines: List[str] = []    call_stack: List[angr.knowledge_plugins.functions.function.Function] = []    current_func: angr.knowledge_plugins.functions.function.Function | None = None
    for i, addr in enumerate(state.history.bbl_addrs.hardcopy):        func: angr.knowledge_plugins.functions.function.Function = kb_functions.floor_func(addr)
        if addr == last_addr:            repeat_count += 1        else:            if repeat_count > 0:                formatted_lines[-1] += f" (repeated {repeat_count + 1} times)"                repeat_count = 0
            if func != current_func:                if func in call_stack:                    while call_stack and call_stack[-1] != func:                        call_stack.pop()                    if call_stack:                        call_stack.pop()                else:                    call_stack.append(func)                current_func = func
            indent: str = ' ' * (len(call_stack) * 2)            if func:                fname: str = func.human_str if hasattr(func, 'human_str') else func.name                func_prototype: str = func.prototype if hasattr(func, 'prototype') else ""                formatted_lines.append(                    f"{indent}-> 0x{addr:x} : {fname} {func_prototype} ({len(list(func.xrefs))} xrefs)")            else:                formatted_lines.append(f"{indent}-> 0x{addr:x} : Unknown function")
        last_addr = addr
    if repeat_count > 0:        formatted_lines[-1] += f" (repeated {repeat_count + 1} times)"
    state_history += "n".join(formatted_lines)
    if len(formatted_lines) > max_depth + 3:        logger.debug("n".join([state_history.split("n")[0]] + formatted_lines[:max_depth]))        logger.debug(f"...(truncated {len(formatted_lines) - (max_depth + 3)} lines)")        logger.debug("n".join(formatted_lines[-3:]))    else:        logger.debug(state_history)
 

这将显示:


Active state: <SimState @ 0x1400010f0>2024-06-30 13:39:57 | DEBUG | [introspection.py:113] pretty_print_callstack() | Call Stack:  -> 0x140001200 : sub_140001200 None (0 xrefs)    -> 0x140001550 : sub_140001550 None (0 xrefs)-> 0x140001238 : sub_140001200 None (0 xrefs)-> 0x140001284 : sub_140001200 None (0 xrefs)-> 0x1400012f1 : sub_140001200 None (0 xrefs)-> 0x1400012f8 : sub_140001200 None (0 xrefs)  -> 0x140001550 : sub_140001550 None (0 xrefs)    -> 0x140001304 : sub_140001200 None (0 xrefs)      -> 0x140001040 : sub_140001040 (long long (64 bits), long long (64 bits)) -> void (0 xrefs)      -> 0x1400011b0 : sub_140001040 (long long (64 bits), long long (64 bits)) -> void (0 xrefs)      -> 0x1400011b9 : sub_140001040 (long long (64 bits), long long (64 bits)) -> void (0 xrefs)        -> 0x140100050 : malloc (unsigned long (64 bits)) -> void* (0 xrefs)    -> 0x1400011c2 : sub_140001040 (long long (64 bits), long long (64 bits)) -> void (0 xrefs)-> 0x140001550 : sub_140001550 None (0 xrefs)  -> 0x1400011e5 : sub_140001040 (long long (64 bits), long long (64 bits)) -> void (0 xrefs)    -> 0x140001311 : sub_140001200 None (0 xrefs)-> 0x140001040 : sub_140001040 (long long (64 bits), long long (64 bits)) -> void (0 xrefs)-> 0x140001056 : sub_140001040 (long long (64 bits), long long (64 bits)) -> void (0 xrefs)-> 0x140001060 : sub_140001040 (long long (64 bits), long long (64 bits)) -> void (0 xrefs)-> 0x14000106a : sub_140001040 (long long (64 bits), long long (64 bits)) -> void (0 xrefs)-> 0x1400010b9 : sub_140001040 (long long (64 bits), long long (64 bits)) -> void (0 xrefs)-> 0x1400010c5 : sub_140001040 (long long (64 bits), long long (64 bits)) -> void (0 xrefs)-> 0x1400010d9 : sub_140001040 (long long (64 bits), long long (64 bits)) -> void (0 xrefs)-> 0x1400010f0 : sub_140001040 (long long (64 bits), long long (64 bits)) -> void (0 xrefs) (repeated 97 times)
 

请注意最后一行:。正在探索一个有 100 次迭代的循环。可用于诊断状态爆炸!repeated 97 timesangr

目前,调用堆栈缺少一些符号。不幸的是,尚不支持 Windows 符号。让我们来补救一下。angr

Windows 调试符号

Windows 调试符号以文件的形式提供,这些文件与已编译的二进制文件一起提供。*.pdb

下载符号

在 Windows 平台上,通常用于下载符号。但是,添加这种依赖项会有点难过,所以让我们重新实现它。如果你想知道它是如何工作的,它基本上只是对这个URL做一个HTTP请求:symchk.exesymchk.exe

https://msdl.microsoft.com/download/symbols/{pdbname}/{signature}/{pdbname}
 
  • pdbnamePDB 文件的名称。令人惊讶的是,它并不总是与 PE 文件同名(它用于 )。您可以通过遍历 PE 文件中的条目从现场获取它。ntkrnlmp.pdbntoskrnl.exePdbFileNameDIRECTORY_ENTRY_DEBUG
  • guid:您可以从同一条目获取它:.DIRECTORY_ENTRY_DEBUGSignature_String

因为已经在 依赖项中,所以让我们重用它。pefileangr

def download_pdb(self, download_dir: str = ".") -> None:    """    Download the PDB file for the current binary.
    Args:        download_dir (str): Directory to save the downloaded PDB.    """    pdbname, signature = self.get_pdb_info(self.proj.filename)
    if self.is_pdb_stored_locally(download_dir, pdbname):        logger.info(f"PDB already exists locally for {self.proj.filename}")        return
    download_url = f"https://msdl.microsoft.com/download/symbols/{pdbname}/{signature.upper()}/{pdbname}"    logger.info(f"Downloading PDB from: {download_url}")
    r = requests.head(        download_url,        headers={"User-Agent": "Microsoft-Symbol-Server/10.0.10036.206"},        allow_redirects=True    )
    if r.status_code == 200:        target_file = os.path.join(download_dir, pdbname)        with requests.get(r.url, headers={"User-Agent": "Microsoft-Symbol-Server/10.0.10036.206"},                            stream=True) as pdb:            pdb.raise_for_status()            with open(target_file, "wb") as f:                for chunk in pdb.iter_content(chunk_size=8192):                    f.write(chunk)        logger.info(f"PDB downloaded to: {target_file}")        self.pdb_path = target_file    else:        logger.error(f"(HTTP {r.status_code}) Could not find PDB at {download_url}")
@staticmethoddef get_pdb_info(binary_path: str) -> Tuple[str, str]:    """    Extract PDB name and signature from a PE file.
    Args:        binary_path (str): Path to the PE file.
    Returns:        Tuple[str, str]: A tuple containing the PDB name and signature.    """    pe_obj = pefile.PE(binary_path, fast_load=True)    pe_obj.parse_data_directories([pefile.DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_DEBUG']])
    for debug_entry in pe_obj.DIRECTORY_ENTRY_DEBUG:        if hasattr(debug_entry.entry, 'PdbFileName'):            pdb_filename = debug_entry.entry.PdbFileName.decode('utf-8').rstrip('x00')            guid = debug_entry.entry.Signature_String
            # Use only the filename if a full path is given            pdb_filename = os.path.basename(pdb_filename)
            return pdb_filename, guid
    raise ValueError("No PDB debug information found in the PE file")
@staticmethoddef is_pdb_stored_locally(directory: str, filename: str) -> bool:    """Check if the PDB file already exists locally."""    return os.path.exists(os.path.join(directory, filename))
 

请注意,我们发送的 User-Agent 标头?。那是我们假装是一个合法的符号服务器客户端。有时,你必须玩装扮才能得到你想要的东西!Microsoft-Symbol-Server/10.0.10036.206

因此,简而言之,此代码执行以下操作:

  1. 检查我们是否已经在本地有这个 PDB
  2. 用于提取 PDB 信息。pefile
  3. 生成 URL。
  4. 检查 Microsoft 的服务器是否喜欢我们的请求 (HTTP 200)。
  5. 下载文件。下次我们需要它时,它会在那里等着我们。
  6. 如果出现问题(例如PDB不在我们预期的位置),我们会记录错误。因为,你知道,事情发生了。

差不多就是这样。我们刚刚甜言蜜语地说服了Microsoft的服务器,让他们给我们调试符号。但是在你像老板一样进行调试之前,你必须解析它并用新名称填充  kb。angr

解剖

PDB 文件中的符号通常被篡改。众所周知的 cxxfilt 库对我不起作用,但我发现了一个由防病毒供应商 AVG 制作的库。尽管最后一次提交是在 7 年前完成的,但该库开箱即用。

更新 angr 的知识库

当谈到知道如何解析 PDB 文件的 Python 库时,我们没有太多选择。我决定使用 ,它又旧又有问题。在线示例对我不起作用,因此我手动浏览了解析的 PDB 文件的所有属性,并将以下代码放在一起:pdbparsepycharm's debugger

def load_global_symbols(self) -> Dict[int, str]:    """    Load global symbols from the PDB.
    Returns:        Dict[int, str]: A dictionary mapping offsets to symbol names.    """    globals_symbols = {}    for stream in self.pdb.streams:        if hasattr(stream, 'funcs'):            for sym, sym_value in stream.funcs.items():                globals_symbols[sym_value.offset] = sym_value.name                logger.debug(f"Global symbol: {sym_value.name} at {hex(sym_value.offset)}")    return globals_symbols
 

请注意,我正在使用该属性。这是一个相对虚拟地址 (RVA),不能按原样与 一起使用 。我们必须首先调整它。对于 Windows 二进制文件,似乎为所有函数提供了地址。然而,PDB 中的偏移量是其封闭 PE 部分的偏移量。知道了这一点,让我们把下面的代码放在一起:offsetangrangrloaded base + offset

def get_text_section_offset(self) -> int:    """    Get the offset of the .text section from the image base.
    Returns:        int: The offset of the .text section, or 0 if not found.    """    main_object = self.proj.loader.main_object    for section_name, section in main_object.sections_map.items():        if section_name.startswith('.text'):            return section.vaddr - main_object.mapped_base
    logger.warning("Could not find .text section. Using 0 as offset.")    return 0
def address_to_symbol(self, address: int) -> Optional[str]:    """    Convert an address to a symbol name.
    Args:        address (int): The address to look up.
    Returns:        Optional[str]: The symbol name if found, None otherwise.    """    rva: int = address - self.proj.loader.main_object.mapped_base
    adjusted_rva: int = rva - self.text_section_offset    symbol: Optional[str] = self.symbols.get(adjusted_rva)    if symbol:        return symbol
    logger.warning(f"Symbol not found for address {hex(address)} (RVA: {hex(rva)}, Adjusted RVA: {hex(adjusted_rva)})")    return None

这使用 和 截面偏移量来调整所有偏移量。请注意,我们假设每个符号都会出现在该部分中,这显然不是真的,但我编码不是为了制作艺术,我编码是为了解决我面临的足够多的问题。loader.main_object.mapped_base.text.text

然后,我们可以用这些信息更新 的知识库:angr

    
def update_kb_with_symbols(self):        """        Update the knowledge base with symbols.
        This method updates the names of functions in the angr knowledge base        with demangled symbols from the PDB.        """        for func in self.proj.kb.functions.values():            symbol = self.address_to_symbol(func.addr)            if symbol:                demangled = self.demangle_name(symbol)                func.name = demangled                logger.debug(f"Function {hex(func.addr)} updated with symbol: {func.name}")
The entire file is:import osimport reimport requestsfrom typing import Dict, Optional, Tuple
import angrimport pdbparseimport pefilefrom cppmangle import demangle, cdecl_sym
from helpers.log import logger

class SymbolManager:    """    A class to manage symbols for an angr project.
    This class handles loading symbols from PDB files, demangling names,    mapping addresses to symbols, and downloading PDB files if necessary.    """
    def __init__(self, proj: angr.Project):        """        Initialize the SymbolManager.
        Args:            proj (angr.Project): The angr project to analyze.        """        self.proj: angr.Project = proj        self.pdb: Optional[pdbparse.PDB] = None        self.symbols: Dict[int, str] = {}        self.text_section_offset: int = 0
        self.load_symbols()        self.load_global_symbols()        self.get_text_section_offset()        self.pdb_path = ""
    def load_symbols(self, download_dir: str = ".") -> None:        """        Load symbols for the angr project from a PDB file.
        If the PDB file doesn't exist, it attempts to download it.        """        binary_path: str = self.proj.filename        pdb_path, _ = self.get_pdb_info(binary_path)
        self.pdb_path = os.path.join(download_dir, pdb_path)
        if not os.path.exists(self.pdb_path):            logger.info(f"PDB file not found: {pdb_path}")            self.download_pdb(os.path.dirname(pdb_path))
        if os.path.exists(self.pdb_path):            self.pdb = pdbparse.parse(self.pdb_path)            logger.info(f"Loaded PDB file: {pdb_path}")        else:            logger.warning("Failed to load PDB file")
    def load_global_symbols(self) -> None:        """        Load global symbols from the PDB.        """        if not self.pdb:            logger.warning("No PDB loaded, cannot load global symbols")            return
        for stream in self.pdb.streams:            if hasattr(stream, 'funcs'):                for sym, sym_value in stream.funcs.items():                    self.symbols[sym_value.offset] = sym_value.name                    logger.debug(f"Global symbol: {sym_value.name} at {hex(sym_value.offset)}")
    def get_text_section_offset(self) -> None:        """        Get the offset of the .text section from the image base.        """        main_object = self.proj.loader.main_object        for section_name, section in main_object.sections_map.items():            if section_name.startswith('.text'):                self.text_section_offset = section.vaddr - main_object.mapped_base                return
        logger.warning("Could not find .text section. Using 0 as offset.")        self.text_section_offset = 0
    @staticmethod    def demangle_name(mangled_name: str) -> str:        """        Demangle a C++ function name and extract just the function name.
        Args:            mangled_name (str): The mangled function name.
        Returns:            str: The demangled function name without parameters or return type.        """        try:            full_demangled: str = cdecl_sym(demangle(mangled_name))            match: Optional[re.Match] = re.search(r'(?:.*::)?(w+)(', full_demangled)            return match.group(1) if match else full_demangled        except:            return mangled_name
    def address_to_symbol(self, address: int) -> Optional[str]:        """        Convert an address to a symbol name.
        Args:            address (int): The address to look up.
        Returns:            Optional[str]: The symbol name if found, None otherwise.        """        rva: int = address - self.proj.loader.main_object.mapped_base
        """symbol: Optional[str] = self.symbols.get(rva)        if symbol:            return symbol        """
        adjusted_rva: int = rva - self.text_section_offset        symbol: Optional[str] = self.symbols.get(adjusted_rva)        if symbol:            return symbol
        logger.warning(f"Symbol not found for address {hex(address)} (RVA: {hex(rva)}, Adjusted RVA: {hex(adjusted_rva)})")        return None
    def update_kb_with_symbols(self) -> None:        """        Update the knowledge base with symbols.
        This method updates the names of functions in the angr knowledge base        with demangled symbols from the PDB.        """        for func in self.proj.kb.functions.values():            symbol = self.address_to_symbol(func.addr)            if symbol:                demangled = self.demangle_name(symbol)                func.name = demangled                logger.debug(f"Function {hex(func.addr)} updated with symbol: {func.name}")
    def download_pdb(self, download_dir: str = ".") -> None:        """        Download the PDB file for the current binary.
        Args:            download_dir (str): Directory to save the downloaded PDB.        """        pdbname, signature = self.get_pdb_info(self.proj.filename)
        if self.is_pdb_stored_locally(download_dir, pdbname):            logger.info(f"PDB already exists locally for {self.proj.filename}")            return
        download_url = f"https://msdl.microsoft.com/download/symbols/{pdbname}/{signature.upper()}/{pdbname}"        logger.info(f"Downloading PDB from: {download_url}")
        r = requests.head(            download_url,            headers={"User-Agent": "Microsoft-Symbol-Server/10.0.10036.206"},            allow_redirects=True        )
        if r.status_code == 200:            target_file = os.path.join(download_dir, pdbname)            with requests.get(r.url, headers={"User-Agent": "Microsoft-Symbol-Server/10.0.10036.206"},                              stream=True) as pdb:                pdb.raise_for_status()                with open(target_file, "wb") as f:                    for chunk in pdb.iter_content(chunk_size=8192):                        f.write(chunk)            logger.info(f"PDB downloaded to: {target_file}")            self.pdb_path = target_file        else:            logger.error(f"(HTTP {r.status_code}) Could not find PDB at {download_url}")
    @staticmethod    def get_pdb_info(binary_path: str) -> Tuple[str, str]:        """        Extract PDB name and signature from a PE file.
        Args:            binary_path (str): Path to the PE file.
        Returns:            Tuple[str, str]: A tuple containing the PDB name and signature.        """        pe_obj = pefile.PE(binary_path, fast_load=True)        pe_obj.parse_data_directories([pefile.DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_DEBUG']])
        for debug_entry in pe_obj.DIRECTORY_ENTRY_DEBUG:            if hasattr(debug_entry.entry, 'PdbFileName'):                pdb_filename = debug_entry.entry.PdbFileName.decode('utf-8').rstrip('x00')                guid = debug_entry.entry.Signature_String
                # Use only the filename if a full path is given                pdb_filename = os.path.basename(pdb_filename)
                return pdb_filename, guid
        raise ValueError("No PDB debug information found in the PE file")
    @staticmethod    def is_pdb_stored_locally(directory: str, filename: str) -> bool:        """Check if the PDB file already exists locally."""        return os.path.exists(os.path.join(directory, filename))

并且可以像这样使用:
symbol_manager = symbols.SymbolManager(proj) # angr.Projectsymbol_manager.download_pdb()symbol_manager.update_kb_with_symbols()
 

之前显示的相同信息现在显示正确的函数名称:

Active state: <SimState @ 0x1400010f0>2024-06-30 13:56:35 | DEBUG | [introspection.py:113] pretty_print_callstack() | Call Stack:  -> 0x140001200 : run_heap_operations None (0 xrefs)    -> 0x140001550 : printf None (0 xrefs)-> 0x140001238 : run_heap_operations None (0 xrefs)-> 0x140001284 : run_heap_operations None (0 xrefs)-> 0x1400012f1 : run_heap_operations None (0 xrefs)-> 0x1400012f8 : run_heap_operations None (0 xrefs)  -> 0x140001550 : printf None (0 xrefs)    -> 0x140001304 : run_heap_operations None (0 xrefs)      -> 0x140001040 : do_heap_op (long long (64 bits), long long (64 bits)) -> void (0 xrefs)      -> 0x1400011b0 : do_heap_op (long long (64 bits), long long (64 bits)) -> void (0 xrefs)      -> 0x1400011b9 : do_heap_op (long long (64 bits), long long (64 bits)) -> void (0 xrefs)        -> 0x140100050 : malloc (unsigned long (64 bits)) -> void* (0 xrefs)    -> 0x1400011c2 : do_heap_op (long long (64 bits), long long (64 bits)) -> void (0 xrefs)-> 0x140001550 : printf None (0 xrefs)  -> 0x1400011e5 : do_heap_op (long long (64 bits), long long (64 bits)) -> void (0 xrefs)    -> 0x140001311 : run_heap_operations None (0 xrefs)-> 0x140001040 : do_heap_op (long long (64 bits), long long (64 bits)) -> void (0 xrefs)-> 0x140001056 : do_heap_op (long long (64 bits), long long (64 bits)) -> void (0 xrefs)-> 0x140001060 : do_heap_op (long long (64 bits), long long (64 bits)) -> void (0 xrefs)-> 0x14000106a : do_heap_op (long long (64 bits), long long (64 bits)) -> void (0 xrefs)-> 0x1400010b9 : do_heap_op (long long (64 bits), long long (64 bits)) -> void (0 xrefs)-> 0x1400010c5 : do_heap_op (long long (64 bits), long long (64 bits)) -> void (0 xrefs)-> 0x1400010d9 : do_heap_op (long long (64 bits), long long (64 bits)) -> void (0 xrefs)-> 0x1400010f0 : do_heap_op (long long (64 bits), long long (64 bits)) -> void (0 xrefs) (repeated 97 times)

处理错误状态

我制作了这个简单的帮助程序函数来打印 Python 回溯:

def show_errors(state: angr.SimState) -> None:    """    Log error information for a given state.
    Args:        state: The simulation state.    """    logger.debug(f'errored state: {state}')    logger.debug(f'error message: {state.error}')
    tb: Any = state.traceback
    while tb.tb_next:        logger.error(f'{tb.tb_frame}')        tb = tb.tb_next
    logger.error(f'{tb.tb_frame}')
 

实时代码覆盖率与趋势

每 5 秒更新一次状态,显示新发现的块数、每个函数的代码覆盖率进度百分比以及显示速度是否减慢的图表不是很好吗?angrangr

演示时间:


INFO     core.coverage:coverage.py:90 --- Coverage Update at 0.01 seconds ---INFO     core.coverage:coverage.py:117 Overall coverage: 0.00% [+0 blocks total]INFO     core.coverage:coverage.py:119 Newly discovered functions: sub_140001550, sub_140001b68, sub_140001380, sub_140002070, UnhandledExceptionFilter, QueryPerformanceCounter, freeINFO     core.coverage:coverage.py:90 --- Coverage Update at 4.10 seconds ---INFO     core.coverage:coverage.py:112 Function: sub_140001550 - Covered blocks: 1/21 (4.76%) [+1 blocks]INFO     core.coverage:coverage.py:112 Function: sub_140001040 - Covered blocks: 20/30 (66.67%) [+20 blocks]INFO     core.coverage:coverage.py:112 Function: sub_140001200 - Covered blocks: 32/35 (91.43%) [+32 blocks]INFO     core.coverage:coverage.py:112 Function: sub_140001000 - Covered blocks: 8/8 (100.00%) [+8 blocks]INFO     core.coverage:coverage.py:117 Overall coverage: 0.22% [+61 blocks total]INFO     core.coverage:coverage.py:90 --- Coverage Update at 7.78 seconds ---INFO     core.coverage:coverage.py:112 Function: sub_140001040 - Covered blocks: 22/30 (73.33%) [+2 blocks]INFO     core.coverage:coverage.py:112 Function: sub_140001200 - Covered blocks: 33/35 (94.29%) [+1 blocks]INFO     core.coverage:coverage.py:117 Overall coverage: 0.23% [+3 blocks total]INFO     core.coverage:coverage.py:90 --- Coverage Update at 11.07 seconds ---INFO     core.coverage:coverage.py:117 Overall coverage: 0.23% [+0 blocks total]INFO     core.coverage:coverage.py:90 --- Coverage Update at 14.33 seconds ---
 

还有一个可视化图表:

使用angr挖掘Windows漏洞的一些辅助技巧

该代码是一个简单的类,可以立即插入到项目中:angr

import osimport timeimport threadingfrom typing import Dict, List, Tuple, Set, Anyimport matplotlib.pyplot as pltimport networkx as nxfrom matplotlib.animation import FuncAnimationimport angrimport logging
logger = logging.getLogger(__name__)
logging.getLogger("matplotlib").setLevel(logging.ERROR)

class CoverageMonitor:    def __init__(self, proj: angr.Project, cfg: angr.analyses.CFGEmulated, entry_point: int,                 update_interval: float = 5.0, coverage_dir: str = "cov"):        """        Initialize the CoverageMonitor.
        :param proj: The Angr project        :param cfg: The Control Flow Graph        :param entry_point: The entry point address        :param update_interval: The interval between updates in seconds        """        self.proj: angr.Project = proj        self.cfg: angr.analyses.CFGEmulated = cfg        self.entry_point: int = entry_point        self.update_interval: float = update_interval        self.coverage_data: Dict[str, List[Tuple[float, int, float]]] = {}        self.overall_coverage_data: List[Tuple[float, float]] = []        self.start_time: float = time.time()        self.stop_event: threading.Event = threading.Event()        self.previous_coverage: Dict[str, Dict[str, int]] = {}        self.previous_total_blocks: int = 0        self.previous_functions: Set[str] = set()        self.coverage_dir: str = coverage_dir
    def start_monitoring(self) -> None:        """Start the coverage monitoring thread."""
        # clear the coverage directory        for filename in os.listdir(self.coverage_dir):            if filename.startswith("00"):                os.remove(os.path.join(self.coverage_dir, filename))
        self.monitoring_thread = threading.Thread(target=self._monitor_coverage)        self.monitoring_thread.start()
    def stop_monitoring(self) -> None:        """Stop the coverage monitoring thread."""        self.stop_event.set()        self.monitoring_thread.join()
    def _monitor_coverage(self) -> None:        """Monitor the coverage and update the data periodically."""        while not self.stop_event.is_set():            self._update_coverage()            self.plot_coverage()            time.sleep(self.update_interval)
    def _analyze_coverage(self) -> Tuple[float, Dict[str, Dict[str, int]]]:        """        Analyze the current coverage using Angr.
        :return: A tuple containing overall coverage percentage and function-wise coverage data        """        overall_coverage, function_coverage = analyze_coverage(self.proj, self.cfg, self.entry_point, "cov")
        # Convert the function_coverage to the format we need        formatted_coverage: Dict[str, Dict[str, int]] = {}        for func_addr, data in function_coverage.items():            func_name = self.proj.kb.functions.get(func_addr).name            formatted_coverage[func_name] = {                "covered_blocks": data['covered_blocks'],                "total_blocks": data['total_blocks']            }
        return overall_coverage, formatted_coverage
    def _update_coverage(self) -> None:        """Update the coverage data and log the results."""        overall_coverage, function_coverage = self._analyze_coverage()        elapsed_time = time.time() - self.start_time
        total_blocks = 0        new_functions = set(function_coverage.keys()) - self.previous_functions
        logger.info(f"--- Coverage Update at {elapsed_time:.2f} seconds ---")
        for func_name, data in function_coverage.items():            if func_name not in self.coverage_data:                self.coverage_data[func_name] = []
            covered_blocks = data['covered_blocks']            total_blocks += covered_blocks            total_func_blocks = data['total_blocks']            coverage_percentage = (covered_blocks / total_func_blocks) * 100 if total_func_blocks > 0 else 0
            self.coverage_data[func_name].append((elapsed_time, covered_blocks, coverage_percentage))
            # Calculate difference from previous update            prev_covered = self.previous_coverage.get(func_name, {}).get('covered_blocks', 0)            block_diff = covered_blocks - prev_covered
            if block_diff > 0 or func_name in new_functions:
                if covered_blocks == 0:                    continue
                logger.info(f"Function: {func_name} - Covered blocks: {covered_blocks}/{total_func_blocks} "                            f"({coverage_percentage:.2f}%) [+{block_diff} blocks]")
        # Log overall statistics        new_total_blocks = total_blocks - self.previous_total_blocks        logger.info(f"Overall coverage: {overall_coverage:.2f}% [+{new_total_blocks} blocks total]")        if new_functions:            logger.info(f"Newly discovered functions: {', '.join(new_functions)}")
        # Update overall coverage data        self.overall_coverage_data.append((elapsed_time, overall_coverage))
        # Update previous state        self.previous_coverage = function_coverage        self.previous_total_blocks = total_blocks        self.previous_functions = set(function_coverage.keys())
    def plot_coverage(self) -> None:        """Plot the coverage evolution over time."""        fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 12))
        # Plot overall coverage        times, coverages = zip(*self.overall_coverage_data)        ax1.plot(times, coverages, label='Overall Coverage', linewidth=2, color='black')        ax1.set_xlabel('Time (seconds)')        ax1.set_ylabel('Coverage (%)')        ax1.set_title('Overall Coverage Evolution Over Time')        ax1.legend()        ax1.grid(True)
        # Plot function-wise coverage        for func_name, data in self.coverage_data.items():            times, _, coverages = zip(*data)            ax2.plot(times, coverages, label=func_name)
        ax2.set_xlabel('Time (seconds)')        ax2.set_ylabel('Coverage (%)')        ax2.set_title('Function-wise Coverage Evolution Over Time')        ax2.legend(loc='center left', bbox_to_anchor=(1, 0.5))        ax2.grid(True)
        plt.tight_layout()        plt.show()

def monitor_coverage(proj: angr.Project, cfg: angr.analyses.CFGEmulated, entry_point: int,                     duration: float = 10.0, update_interval: int = 5) -> None:    """    Monitor the coverage evolution for a specified duration.
    :param proj: The Angr project    :param cfg: The Control Flow Graph    :param entry_point: The entry point address    :param duration: The duration to monitor in seconds    :param update_interval: The interval between updates in seconds    """    monitor = CoverageMonitor(proj, cfg, entry_point, update_interval=update_interval)    monitor.start_monitoring()
    try:        time.sleep(duration)    finally:        monitor.stop_monitoring()        monitor.plot_coverage()

def get_reachable_info(cfg: angr.analyses.cfg.cfg_fast.CFGBase, entry_point: int) -> Tuple[    Set[int], Dict[int, Set[angr.knowledge_plugins.cfg.cfg_node.CFGNode]]]:    """    Get reachable blocks and functions from the entry point in the CFG.
    Args:        cfg: The control flow graph.        entry_point: The entry point address.
    Returns:        A tuple containing reachable blocks and reachable functions.    """    entry_node: angr.knowledge_plugins.cfg.cfg_node.CFGNode = cfg.get_any_node(entry_point)    if not entry_node:        raise ValueError(f"Entry point {hex(entry_point)} not found in CFG")
    reachable_nodes: Set[angr.knowledge_plugins.cfg.cfg_node.CFGNode] = nx.descendants(cfg.graph, entry_node)    reachable_nodes.add(entry_node)
    reachable_blocks: Set[int] = set(node.addr for node in reachable_nodes if node.block)
    reachable_functions: Dict[int, Set[angr.knowledge_plugins.cfg.cfg_node.CFGNode]] = {}    for node in reachable_nodes:        if node.function_address not in reachable_functions:            reachable_functions[node.function_address] = set()        reachable_functions[node.function_address].add(node)
    return reachable_blocks, reachable_functions

def read_coverage_files(coverage_dir: str) -> Set[int]:    """    Read coverage files and return a set of covered block addresses.
    Args:        coverage_dir: The directory containing coverage files.
    Returns:        A set of covered block addresses.    """    covered_blocks: Set[int] = set()    for filename in os.listdir(coverage_dir):        if filename.startswith("00"):            with open(os.path.join(coverage_dir, filename), 'r') as f:                covered_blocks.update(int(line.strip(), 16) for line in f if line.strip())    return covered_blocks

def compare_coverage(proj: angr.Project, reachable_blocks: Set[int],                     reachable_functions: Dict[int, Set[angr.knowledge_plugins.cfg.cfg_node.CFGNode]],                     covered_blocks: Set[int]) -> Tuple[float, Dict[str, Dict[str, Any]]]:    """    Compare coverage between reachable blocks and covered blocks.
    Args:        proj: The angr project.        reachable_blocks: Set of reachable block addresses.        reachable_functions: Dictionary of reachable functions and their nodes.        covered_blocks: Set of covered block addresses.
    Returns:        A tuple containing overall coverage and function coverage information.    """    total_reachable: int = len(reachable_blocks)    total_covered: int = len(covered_blocks.intersection(reachable_blocks))    overall_coverage: float = total_covered / total_reachable if total_reachable > 0 else 0
    function_coverage: Dict[str, Dict[str, Any]] = {}    for func_addr, nodes in reachable_functions.items():        func: angr.knowledge_plugins.functions.function.Function = proj.kb.functions.get(func_addr)        if func:            func_blocks: Set[int] = set(node.addr for node in nodes if node.block)            covered_func_blocks: Set[int] = func_blocks.intersection(covered_blocks)            coverage: float = len(covered_func_blocks) / len(func_blocks) if func_blocks else 0            function_coverage[func.name] = {                'address': func_addr,                'total_blocks': len(func_blocks),                'covered_blocks': len(covered_func_blocks),                'coverage': coverage            }
    return overall_coverage, function_coverage

def analyze_coverage(proj: angr.Project, cfg: angr.analyses.cfg.cfg_fast.CFGBase, entry_point: int,                     coverage_dir: str, coverage_file: str = 'reachable_blocks.txt') -> Tuple[float, Dict[str, Dict[str, Any]]]:    """    Analyze coverage for the given project and CFG.
    Args:        proj: The angr project.        cfg: angr control flow graph.        entry_point: The entry point address.        coverage_dir: The directory containing coverage files.        coverage_file: The coverage file to write to
    Returns:        A tuple containing overall coverage and function coverage information.    """    reachable_blocks, reachable_functions = get_reachable_info(cfg, entry_point)    covered_blocks = read_coverage_files(coverage_dir)    overall_coverage, function_coverage = compare_coverage(proj, reachable_blocks, reachable_functions, covered_blocks)
    with open(coverage_file, 'w') as f:        f.write("n".join([hex(block) for block in reachable_blocks]))
    return overall_coverage, function_coverage
 

并按如下方式使用:

monitor = coverage.CoverageMonitor(shared.proj, shared.cfg, self.entry_point, update_interval=3.0, coverage_dir="cov")monitor.start_monitoring()
 

结论

多亏了工具箱中的这些新工具,我希望您能获得更好的用户体验,并(再次)给它一个机会。这真的是一个很棒的框架!angr

原文始发于微信公众号(黑客白帽子):使用angr挖掘Windows漏洞的一些辅助技巧

免责声明:文章中涉及的程序(方法)可能带有攻击性,仅供安全研究与教学之用,读者将其信息做其他用途,由读者承担全部法律及连带责任,本站不承担任何法律及连带责任;如有问题可邮件联系(建议使用企业邮箱或有效邮箱,避免邮件被拦截,联系方式见首页),望知悉。
  • 左青龙
  • 微信扫一扫
  • weinxin
  • 右白虎
  • 微信扫一扫
  • weinxin
admin
  • 本文由 发表于 2024年7月15日13:43:42
  • 转载请保留本文链接(CN-SEC中文网:感谢原作者辛苦付出):
                   使用angr挖掘Windows漏洞的一些辅助技巧http://cn-sec.com/archives/2925559.html
                  免责声明:文章中涉及的程序(方法)可能带有攻击性,仅供安全研究与教学之用,读者将其信息做其他用途,由读者承担全部法律及连带责任,本站不承担任何法律及连带责任;如有问题可邮件联系(建议使用企业邮箱或有效邮箱,避免邮件被拦截,联系方式见首页),望知悉.

发表评论

匿名网友 填写信息