[万字长文]Angr 符号执行的真实世界案例与经验分享

admin 2024年8月19日10:51:09评论42 views字数 43168阅读143分53秒阅读模式

[万字长文]Angr 符号执行的真实世界案例与经验分享

介绍

我们可以轻松扩展angr来显示程序中发生的事情。我会分享一些代码片段,用于实时代码覆盖率可视化、状态的跟踪可视化以及其他一些功能,比如为Windows目标提供调试符号支持。

背景故事

angr是一个让我深深着迷的项目。我在 2019 年尝试学习它。同年,我参加了 Rolf Rolles 基于SMT的二进制程序分析培训,这确实解开了我脑海中的一些东西:

  • 这个家伙在两年半的时间里为培训制作了 40,000 行代码框架 + 书面材料:这帮助我正确看待事物,并更现实地了解开发新技术需要花费多少时间。
  • 演示以行业用例为中心,而非CTF问题。这就像学习魔法咒语,你可以将其施展到混淆的代码上并理解它。

虽然我没有立即应用所学知识,但种子已经种下,每年我都尝试用它 angr 解决难题,但每次都放弃。大多数时候,我仍然被同一个老问题困扰:为什么运行 angr 需要很长时间并耗尽了我所有的内存?

当时,我缺乏正确调试/提取其源代码的技能。然而,几年后,我不再感到这些障碍。这可能是因为我在这期间做过更困难、更痛苦的事情,所以angr这次源代码不会吓到我!

好吧,假新闻。ioctlance 在 Windows 驱动程序中进行实验时,我发现在给出的入口点之后的 3 个函数调用处 angr 卡住了。我最小化了测试用例并向同事寻求帮助。我们花了 2 个小时研究它,这 2 个小时终于帮助我克服了最后的心理限制。

我的同事对 angr 很有经验,坚持一步一步地进行 exploration ,每次都用 state.solver.constraints 检查约束。然后,我们发现在调用 memmove 之后,ioctlance 函数被 SimProcedure hook,并且添加了一个 unsat 约束。(SimProcedure 是 angr 中自动给标准库函数添加 Hook 的类,用于模拟抽象标准库函数)

我的缓冲区地址是0xfffffffff,Windows 驱动程序正在对该地址进行某种溢出检查,但并未接受它。奇怪!Hook 的实现对我们来说看起来完全正常,它肯定没有返回这个相当奇怪的值。

我的同事没有时间帮忙,我独自花了 1 个小时,使用 Pycharm 调试器查明这个地址出现的时间和原因。最后我找到了罪魁祸首:

def store(self, addr, data, size=None, condition=None, **kwargs):
        # Fast path
        if type(addr) is int:
            self._store_one_addr(addr, data, True, addr, condition, size, **kwargs)
            return
        elif not self.state.solver.symbolic(addr):
            self._store_one_addr(self.state.solver.eval(addr), data, True, addr, condition, size, **kwargs)
            return

        if self.state.solver.symbolic(addr) and options.AVOID_MULTIVALUED_WRITES in self.state.options:
            # not completed
            return

        try:
            concrete_addrs = self._interleave_ints(sorted(self.concretize_write_addr(addr)))
        except SimMemoryError:
            if options.CONSERVATIVE_WRITE_STRATEGY in self.state.options:
                return  # not completed
            else:
                raise

        # quick optimization so as to not involve the solver if not necessary
        trivial = len(concrete_addrs) == 1 and (addr == concrete_addrs[0]).is_true()
        if not trivial:
            # apply the concretization results to the state
            constraint_options = [addr == concrete_addr for concrete_addr in concrete_addrs]
            conditional_constraint = self.state.solver.Or(*constraint_options)
            self._add_constraints(conditional_constraint, condition=condition, **kwargs)

            if len(concrete_addrs) == 1:
                # simple case: avoid conditional write since the address has been concretized to one solution
                super().store(concrete_addrs[0], data, size=size, **kwargs)
                return

        for concrete_addr in concrete_addrs:
            # perform each of the stores as conditional
            # the implementation of conditionality must be at the bottom of the stack
            self._store_one_addr(concrete_addr, data, trivial, addr, condition, size, **kwargs)

angr需要一个 concrete 地址(读作:一个真实的地址)来存储 memmove 的结果,但 hook 提供了一个不受约束的符号地址(读作:像x代数一样)。

这个问题有点相关:

那不是 bug,那是 angr 能做的最好的事情。你期望从不受约束的指针加载的结果是什么?我们必须将其具体化才能继续执行。rhelmot

这个反问听起来rhelmot确实很聪明,但像我这样的新手在遇到这个问题之前甚至都没有想过它。

所以修复很简单,我使用了strcpySimProcedure已经可用,然后一切都很顺利。我讲这个故事有两个原因:

  • 坚持是关键
  • 使用“黑盒”工具时展示工作流很有用。

angr 的实际用例

此时,我确信,只要付出努力并采用正确的方法论,angr这些问题是可以解决的。嗯……但研究有一个心理因素,你必须相信自己会投入更多的时间和精力,即使到目前为止没有任何回报。有了这种新获得的信心,我已准备好应对接下来的步骤:

  • 了解 angr 到底在做什么。
  • 找到 angr “恢复” 分析的解决方案:angr 有时需要5分钟才能找到我的目标地址,而我不喜欢每次运行脚本时等待5分钟。
  • angr当文档不够时学习如何正确使用。

使用 Pycharm

现代 IDE 必不可少。老实说,我不理解那些使用 Vim 开发大型项目的人。这就像双手被绑在身后编码一样。如果您正在寻找 angr 特定用例,文档不会涵盖足够的信息,因此您有两个选项:

  1. 阅读源代码
  2. 玩转对象属性,看看它们包含什么。

调试器

使用Vim和 时IPython,您通常会在一堆print(obj)和之间交替dir(obj)

Pycharm 会为您显示所有这些内容: 

[万字长文]Angr 符号执行的真实世界案例与经验分享

嗯,遗憾的是它没有显示方法,但有这个功能还是很不错的。

当调试器遇到断点时,还可以使用源代码注释来提供运行时调试信息:

[万字长文]Angr 符号执行的真实世界案例与经验分享

最后但并非最不重要的一点是,PyCharm允许您选择调用堆栈中的任何帧并查看局部变量的过去值。这可以帮助找到该讨厌的值首次出现的方式和时间:

[万字长文]Angr 符号执行的真实世界案例与经验分享

条件断点

如果您知道错误仅在满足特定条件时、在循环多次迭代后或在执行 5 分钟后才会发生,该怎么办?当然,您可以非常耐心地使用手动步骤和/或print语句,但正确的工具是条件断点。以下是我为调试会话设置的断点的真实示例:

[万字长文]Angr 符号执行的真实世界案例与经验分享

自定义类型渲染器

PyCharm提供了一种自定义数据在调试器中显示方式的方法。我发现自己经常读取十六进制地址并在IDA proangr 之间来回跳转。当 Pycharm 显示十进制地址时,我再也认不出它们了,而且print(hex(x)) 只有前 10,000 次这样做才有趣:

[万字长文]Angr 符号执行的真实世界案例与经验分享

同样的情况如下dict

[万字长文]Angr 符号执行的真实世界案例与经验分享

第二张截图中的代码是(感谢Claude 3.5):

{(hex(k) if isinstance(k, int) else k): (hex(v) if isinstance(v, int) else v) for k, v in self.items()}

使用 pickle

我已经知道pickle一段时间了,因为我在交易机器人中大量使用了它。因此,我知道让所有类都可序列化是多么痛苦,并且假设angr 的类太复杂而无法进行 pickle。我错了!

来自文档:

如何序列化 angr 对象?Pickle 可以工作。但是,Python 默认使用非常老旧的 pickle 协议,不支持更复杂的 Python 数据结构,因此您必须指定更高级的数据流格式。最简单的方法是pickle.dumps(obj, -1)

这意味着你可以 pickle 下列东西:

  1. angr.Project
  2. angr.analyses.CFGEmulated
  3. 经过一个耗时的 explore 调用后的 Simulation Manager

通过这样做,您可以恢复您的angr脚本,从而快速迭代试错过程,在我看来,这对于学习和构建新事物都是至关重要的。

内省

让我们进入本文的核心。对于任何长时间运行的作业,我们开发人员都需要确信执行的程序按预期运行,并且不会因为问题而浪费CPU周期。如何诊断这些问题?嗯,最近我喜欢logging反模式:我将所有内容记录到一个文件中,这样我就可以一目了然地了解是否出了问题。

但在这里我们还可以走得更远。

代码覆盖率:基本尝试

在模糊测试中,我们可以使用代码覆盖率和lighthouse可视化模糊测试器的探索(并识别瓶颈)。我们能用angr做到这一点吗?

当然,正如 Jannis Kirschner 在Insomni'hack 2022上所展示的那样:

def get_small_coverage(*args, **kwargs):
    sm = args[0]
    stashes = sm.stashes
    i = 0
    for simstate in stashes["active"]:
        state_history = ""

        for addr in simstate.history.bbl_addrs.hardcopy:
            write_address = hex(addr)
            state_history += "{0}n".format(write_address)
        raw_syminput = simstate.posix.stdin.load(0, state.posix.stdin.size)

        syminput = simstate.solver.eval(raw_syminput, cast_to=bytes)
        print(syminput)
        ip = hex(state.solver.eval(simstate.ip))
        uid = str(uuid.uuid4())
        sid = str(i).zfill(5)
        filename = "{0}_active_{1}_{2}_{3}".format(sid, syminput, ip, uid)

        with open(filename, "w"as f:
            f.write(state_history)
        i += 1

simgr.explore(find=0x00001407, step_func=get_small_coverage)

运行此 step_func (https://docs.angr.io/en/latest/advanced-topics/pipeline.html#step)会在本地文件夹中创建一堆文件,以达到指定目标:

$ ls -lsaht | head
total 74912
 0 drwxr-xr-x    38 user  staff   1.2K Jun 14 14:34 ..
 0 drwxr-xr-x  5416 user  staff   169K Jun 13 15:35 .
16 -rw-r--r--     1 user  staff   7.9K Jun  6 16:13 00164_active_0x1c0010ae4_d9d59da2-daf1-43b3-999a-02d85f814778
16 -rw-r--r--     1 user  staff   7.9K Jun  6 16:13 00163_active_0x1c0010ae6_b9618c9b-6c05-4c2d-8ba1-6a9b74dbdcb5
16 -rw-r--r--     1 user  staff   7.9K Jun  6 16:13 00162_active_0x1c0010b98_81b30e77-01ad-43c0-8990-c58877d738d2
16 -rw-r--r--     1 user  staff   7.9K Jun  6 16:13 00161_active_0x1c0010b9d_18a2bf38-1aa1-47c5-a206-2cc116ab5486
16 -rw-r--r--     1 user  staff   7.9K Jun  6 16:13 00160_active_0x1c0010baf_788b6aae-a9f2-4866-9458-7c28c0bf3390
16 -rw-r--r--     1 user  staff   7.9K Jun  6 16:13 00159_active_0x1c0010bb7_96548594-619a-4a2e-8445-072c208ec02a
16 -rw-r--r--     1 user  staff   7.9K Jun  6 16:13 00158_active_0x1c01000e0_f1177956-e9e4-4c8f-8ab3-a87e4c9821a9

阅读其中一个文件:

$ cat 00164_active_0x1c0010ae4_d9d59da2-daf1-43b3-999a-02d85f814778 | head                                                                                       
0x1c00109e8
0x1c0010a1b
0x1c0010a36
0x1c0010b4c
0x1c0010b88
0x1c002fc30
0x1c0100298
0x1c002fc3b
0x1c0010b8d
0x1c0010b5f

然后我们可以使用 Lighthouse IDA 插件来加载所有这些文件::IDA Pro > Load File > Code Coverage batch

[万字长文]Angr 符号执行的真实世界案例与经验分享

调用堆栈

我的工具箱中的另一个有用工具是能够知道angr当前位置以及它是如何到达那里的。文档指出我们应该使用state.history.descriptions.hardcopystate.history.events

让我们试试这些:

f = simgr.one_found
pprint.pprint(f.history.descriptions.hardcopy)
['<IRSB from 0x140001200: 1 sat>',
 '<SimProcedure HookVPrintf from 0x140001550: 1 sat>',
 '<IRSB from 0x140001238: 2 sat>',
 '<IRSB from 0x1400012f8: 1 sat>',
 '<SimProcedure HookVPrintf from 0x140001550: 1 sat>',
 '<IRSB from 0x140001304: 1 sat>',
 '<IRSB from 0x140001040: 1 sat 1 unsat>',
 '<IRSB from 0x1400011b0: 1 sat 1 unsat>',
 '<IRSB from 0x1400011b9: 1 sat>',
 '<SimProcedure MallocHook from 0x140100050: 1 sat>',
 '<IRSB from 0x1400011c2: 1 sat>',
 '<SimProcedure HookVPrintf from 0x140001550: 1 sat>',
 '<IRSB from 0x1400011e5: 1 sat>',
 '<IRSB from 0x140001311: 1 sat>',
 '<IRSB from 0x140001040: 1 sat 1 unsat>',
 '<IRSB from 0x140001056: 1 sat 1 unsat>',
 '<IRSB from 0x14000112d: 1 sat>',
 '<SimProcedure HookVPrintf from 0x140001550: 1 sat>',
 '<IRSB from 0x14000113c: 1 sat>',
 '<IRSB from 0x14000131b: 1 sat>',
 '<IRSB from 0x140001000: 1 sat 1 unsat>',
 '<IRSB from 0x140001034: 1 sat>',
 '<IRSB from 0x14000132c: 1 sat 1 unsat>',
 '<IRSB from 0x14000133a: 1 sat 1 unsat>']
f.history.events
Out[3]: <angr.state_plugins.history.LambdaIterIter at 0x310f73350>
list(f.history.events)
Out[4]: 
[<SimActionConstraint 0x140001200:0 <SAO <Bool operations_2419_96[95:64] >= 0x0>>>,
 <SimActionConstraint 0x140001200:0 <SAO <Bool operations_2419_96[95:64] <= 0x7a6b>>>,
 <SimActionConstraint 0x140001200:0 <SAO <Bool operations_2419_96[63:32] >= 0x0>>>,
 <SimActionConstraint 0x140001200:0 <SAO <Bool operations_2419_96[63:32] <= 0x7a6b>>>,
 <SimActionConstraint 0x140001200:0 <SAO <Bool operations_2419_96[31:0] >= 0x0>>>,
 <SimActionConstraint 0x140001200:0 <SAO <Bool operations_2419_96[31:0] <= 0x7a6b>>>,
 <SimActionConstraint 0x140001200:0 <SAO <Bool values_2420_96[95:64] >= 0x0>>>,
 <SimActionConstraint 0x140001200:0 <SAO <Bool values_2420_96[95:64] <= 0x9>>>,
 <SimActionConstraint 0x140001200:0 <SAO <Bool values_2420_96[63:32] >= 0x0>>>,
 <SimActionConstraint 0x140001200:0 <SAO <Bool values_2420_96[63:32] <= 0x9>>>,
 <SimActionConstraint 0x140001200:0 <SAO <Bool values_2420_96[31:0] >= 0x0>>>,
 <SimActionConstraint 0x140001200:0 <SAO <Bool values_2420_96[31:0] <= 0x9>>>,
 <SimActionConstraint 0x140001200:0 <SAO <Bool operations_2419_96[95:64] == 0x7a6b>>>,
 <SimEvent unconstrained 11890with fields name, bits>,
 <SimEvent unconstrained 11891with fields name, bits>,
 <SimEvent unconstrained 11892with fields name, bits>,
 <SimEvent unconstrained 11893with fields name, bits>,
 <SimEvent unconstrained 11894with fields name, bits>,
 <SimEvent unconstrained 11895with fields name, bits>,
 <SimEvent unconstrained 11896with fields name, bits>,
 <SimEvent fs_write 11897with fields filename, data, size, pos>,
 <SimActionConstraint 0x14000123f:23 <SAO <Bool mem_100000_2428_32{UNINITIALIZED} != 0x7a69>>>,
 <SimActionConstraint 0x140001289:18 <SAO <Bool mem_100000_2428_32{UNINITIALIZED} != 0x7a6a>>>,
 <SimActionConstraint 0x1400012f6:18 <SAO <Bool mem_100000_2428_32{UNINITIALIZED} == 0x7a6b>>>,
 <SimEvent fs_write 11906with fields filename, data, size, pos>,
 <SimEvent unconstrained 11933with fields name, bits>,
 <SimEvent unconstrained 11934with fields name, bits>,
 <SimEvent fs_write 11935with fields filename, data, size, pos>,
 <SimEvent unconstrained 11938with fields name, bits>,
 <SimEvent unconstrained 11967with fields name, bits>,
 <SimEvent fs_write 11968with fields filename, data, size, pos>,
 <SimActionConstraint 0x140001007:22 <SAO <Bool mem_100000_2428_32{UNINITIALIZED} != 0x1>>>]

这里面有很多信息,但我发现很难阅读,而且我们正在丢失重要的信息,因为调用堆栈是“扁平化的”(可能不是正确的词,但我坚持使用它)。

考虑到这些缺点,我实现了一个简单的函数,可以打印某种angr状态的带注释的回溯,并使用缩进来显示控制流:

def pretty_print_callstack(state: angr.SimState, max_depth: int = 10) -> None:
    """
    Print a formatted call stack for a given state.

    Args:
        state: The simulation state.
        max_depth: Maximum depth of the call stack to print.
    """

    state_history: str = "Call Stack:n"
    kb_functions = shared.proj.kb.functions

    last_addr: int = 0
    repeat_count: int = 0
    formatted_lines: List[str] = []
    call_stack: List[angr.knowledge_plugins.functions.function.Function] = []
    current_func: angr.knowledge_plugins.functions.function.Function | None = None

    for i, addr in enumerate(state.history.bbl_addrs.hardcopy):
        func: angr.knowledge_plugins.functions.function.Function = kb_functions.floor_func(addr)

        if addr == last_addr:
            repeat_count += 1
        else:
            if repeat_count > 0:
                formatted_lines[-1] += f" (repeated {repeat_count + 1} times)"
                repeat_count = 0

            if func != current_func:
                if func in call_stack:
                    while call_stack and call_stack[-1] != func:
                        call_stack.pop()
                    if call_stack:
                        call_stack.pop()
                else:
                    call_stack.append(func)
                current_func = func

            indent: str = ' ' * (len(call_stack) * 2)
            if func:
                fname: str = func.human_str if hasattr(func, 'human_str'else func.name
                func_prototype: str = func.prototype if hasattr(func, 'prototype'else ""
                formatted_lines.append(
                    f"{indent}-> 0x{addr:x} : {fname} {func_prototype} ({len(list(func.xrefs))} xrefs)")
            else:
                formatted_lines.append(f"{indent}-> 0x{addr:x} : Unknown function")

        last_addr = addr

    if repeat_count > 0:
        formatted_lines[-1] += f" (repeated {repeat_count + 1} times)"

    state_history += "n".join(formatted_lines)

    if len(formatted_lines) > max_depth + 3:
        logger.debug("n".join([state_history.split("n")[0]] + formatted_lines[:max_depth]))
        logger.debug(f"...(truncated {len(formatted_lines) - (max_depth + 3)} lines)")
        logger.debug("n".join(formatted_lines[-3:]))
    else:
        logger.debug(state_history)

将显示:

Active state: <SimState @ 0x1400010f0>
2024-06-30 13:39:57 | DEBUG | [introspection.py:113] pretty_print_callstack() | Call Stack:
  -> 0x140001200 : sub_140001200 None (0 xrefs)
    -> 0x140001550 : sub_140001550 None (0 xrefs)
-> 0x140001238 : sub_140001200 None (0 xrefs)
-> 0x140001284 : sub_140001200 None (0 xrefs)
-> 0x1400012f1 : sub_140001200 None (0 xrefs)
-> 0x1400012f8 : sub_140001200 None (0 xrefs)
  -> 0x140001550 : sub_140001550 None (0 xrefs)
    -> 0x140001304 : sub_140001200 None (0 xrefs)
      -> 0x140001040 : sub_140001040 (long long (64 bits), long long (64 bits)) -> void (0 xrefs)
      -> 0x1400011b0 : sub_140001040 (long long (64 bits), long long (64 bits)) -> void (0 xrefs)
      -> 0x1400011b9 : sub_140001040 (long long (64 bits), long long (64 bits)) -> void (0 xrefs)
        -> 0x140100050 : malloc (unsigned long (64 bits)) -> void* (0 xrefs)
    -> 0x1400011c2 : sub_140001040 (long long (64 bits), long long (64 bits)) -> void (0 xrefs)
-> 0x140001550 : sub_140001550 None (0 xrefs)
  -> 0x1400011e5 : sub_140001040 (long long (64 bits), long long (64 bits)) -> void (0 xrefs)
    -> 0x140001311 : sub_140001200 None (0 xrefs)
-> 0x140001040 : sub_140001040 (long long (64 bits), long long (64 bits)) -> void (0 xrefs)
-> 0x140001056 : sub_140001040 (long long (64 bits), long long (64 bits)) -> void (0 xrefs)
-> 0x140001060 : sub_140001040 (long long (64 bits), long long (64 bits)) -> void (0 xrefs)
-> 0x14000106a : sub_140001040 (long long (64 bits), long long (64 bits)) -> void (0 xrefs)
-> 0x1400010b9 : sub_140001040 (long long (64 bits), long long (64 bits)) -> void (0 xrefs)
-> 0x1400010c5 : sub_140001040 (long long (64 bits), long long (64 bits)) -> void (0 xrefs)
-> 0x1400010d9 : sub_140001040 (long long (64 bits), long long (64 bits)) -> void (0 xrefs)
-> 0x1400010f0 : sub_140001040 (long long (64 bits), long long (64 bits)) -> void (0 xrefs) (repeated 97 times)

注意最后一行:repeated 97 timesangr正在探索一个包含 100 次迭代的循环。可用于诊断状态爆炸!

目前,调用堆栈缺少一些符号。不幸的是,angr它还不支持 Windows 符号。让我们来解决这个问题。

Windows 调试符号

*.pdbWindows 调试符号以与已编译二进制文件共存的文件形式提供。

下载符号

在 Windows 平台上,symchk.exe 通常用于下载符号。但是添加这种依赖项会有点可惜,所以让我们重新实现它。如果您知道 symchk.exe 如何工作的,它基本上只是向此URL发出HTTP请求:

https://msdl.microsoft.com/download/symbols/{pdbname}/{signature}/{pdbname}

  • pdbname : PDB文件的名称。令人惊讶的是,它并不总是与PE文件同名(它是ntkrnlmp.pdb) 。您可以通过遍历PE文件中的DIRECTORY_ENTRY_DEBUG条目从 PdbFileName 字段中获取它 。
  • guid:您可以从相同的 DIRECTORY_ENTRY_DEBUG 条目中 Signature_String 字段获取它。

由于pefile已经处于angr的依赖关系中,因此让我们重用它。

def download_pdb(self, download_dir: str = ".") -> None:
    """
    Download the PDB file for the current binary.

    Args:
        download_dir (str): Directory to save the downloaded PDB.
    """

    pdbname, signature = self.get_pdb_info(self.proj.filename)

    if self.is_pdb_stored_locally(download_dir, pdbname):
        logger.info(f"PDB already exists locally for {self.proj.filename}")
        return

    download_url = f"https://msdl.microsoft.com/download/symbols/{pdbname}/{signature.upper()}/{pdbname}"
    logger.info(f"Downloading PDB from: {download_url}")

    r = requests.head(
        download_url,
        headers={"User-Agent""Microsoft-Symbol-Server/10.0.10036.206"},
        allow_redirects=True
    )

    if r.status_code == 200:
        target_file = os.path.join(download_dir, pdbname)
        with requests.get(r.url, headers={"User-Agent""Microsoft-Symbol-Server/10.0.10036.206"},
                            stream=Trueas pdb:
            pdb.raise_for_status()
            with open(target_file, "wb"as f:
                for chunk in pdb.iter_content(chunk_size=8192):
                    f.write(chunk)
        logger.info(f"PDB downloaded to: {target_file}")
        self.pdb_path = target_file
    else:
        logger.error(f"(HTTP {r.status_code}) Could not find PDB at {download_url}")

@staticmethod
def get_pdb_info(binary_path: str) -> Tuple[str, str]:
    """
    Extract PDB name and signature from a PE file.

    Args:
        binary_path (str): Path to the PE file.

    Returns:
        Tuple[str, str]: A tuple containing the PDB name and signature.
    """

    pe_obj = pefile.PE(binary_path, fast_load=True)
    pe_obj.parse_data_directories([pefile.DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_DEBUG']])

    for debug_entry in pe_obj.DIRECTORY_ENTRY_DEBUG:
        if hasattr(debug_entry.entry, 'PdbFileName'):
            pdb_filename = debug_entry.entry.PdbFileName.decode('utf-8').rstrip('x00')
            guid = debug_entry.entry.Signature_String

            # Use only the filename if a full path is given
            pdb_filename = os.path.basename(pdb_filename)

            return pdb_filename, guid

    raise ValueError("No PDB debug information found in the PE file")

@staticmethod
def is_pdb_stored_locally(directory: str, filename: str) -> bool:
    """Check if the PDB file already exists locally."""
    return os.path.exists(os.path.join(directory, filename))

注意到我们发送的 User-Agent 标头了吗?Microsoft-Symbol-Server/10.0.10036.206。这就是我们假装成合法的符号服务器客户端。有时,你必须装扮一下才能得到你想要的东西!

简而言之,此代码执行以下操作:

  1. 检查我们是否已经在本地拥有此PDB 。
  2. pefile 用于提取PDB 信息。
  3. 构建URL。
  4. 检查微软的服务器是否喜欢我们的请求(HTTP  200)。
  5. 下载文件。
  6. 如果出现问题 - 例如PDB不在我们所期望的位置 - 我们会记录错误。因为,你知道,事情总会发生。

就是这样。我们只是甜言蜜语地说服微软的服务器给我们调试符号。但在你像老板一样继续调试之前,你必须解析它并用新名称填充angr的 kb。

分解

PDB文件中的符号通常都是错乱的。众所周知的cxxfilt库对我来说不起作用,但我发现了一个由防病毒供应商AVG制作的库。尽管最后一次提交是在 7 年前,但该库开箱即用。

更新 angr 的知识库

当谈到知道如何解析PDB文件的 Python 库时,我们没有太多选择。我决定使用pdbparse,它很旧而且有缺陷。网上的示例对我来说不起作用,所以我使用手动探索了解析的PDBpycharm's debugger文件的所有属性并整理了以下代码:

def load_global_symbols(self) -> Dict[int, str]:
    """
    Load global symbols from the PDB.

    Returns:
        Dict[int, str]: A dictionary mapping offsets to symbol names.
    """

    globals_symbols = {}
    for stream in self.pdb.streams:
        if hasattr(stream, 'funcs'):
            for sym, sym_value in stream.funcs.items():
                globals_symbols[sym_value.offset] = sym_value.name
                logger.debug(f"Global symbol: {sym_value.name} at {hex(sym_value.offset)}")
    return globals_symbols

注意,我使用了offset属性。这是一个相对虚拟地址 ( RVA ),不能原样使用angr。我们必须先调整它。对于 Windows 二进制文件,angr似乎为所有函数提供了地址。然而, PDBloaded base + offset中的偏移量是它们封闭的PE部分的偏移量。知道了这一点,让我们把下面的代码放在一起:

def get_text_section_offset(self) -> int:
    """
    Get the offset of the .text section from the image base.

    Returns:
        int: The offset of the .text section, or 0 if not found.
    """

    main_object = self.proj.loader.main_object
    for section_name, section in main_object.sections_map.items():
        if section_name.startswith('.text'):
            return section.vaddr - main_object.mapped_base

    logger.warning("Could not find .text section. Using 0 as offset.")
    return 0

def address_to_symbol(self, address: int) -> Optional[str]:
    """
    Convert an address to a symbol name.

    Args:
        address (int): The address to look up.

    Returns:
        Optional[str]: The symbol name if found, None otherwise.
    """

    rva: int = address - self.proj.loader.main_object.mapped_base

    adjusted_rva: int = rva - self.text_section_offset
    symbol: Optional[str] = self.symbols.get(adjusted_rva)
    if symbol:
        return symbol

    logger.warning(f"Symbol not found for address {hex(address)} (RVA: {hex(rva)}, Adjusted RVA: {hex(adjusted_rva)})")
    return None

这使用loader.main_object.mapped_base.text部分偏移来调整所有偏移。请注意,我们假设每个符号都位于.text 中,这显然不是真的,但我编写代码不是为了创造艺术,而是为了解决我面临的足够多的问题。

然后,我们可以用这些信息更新知识库angr

   def update_kb_with_symbols(self):
        """
        Update the knowledge base with symbols.

        This method updates the names of functions in the angr knowledge base
        with demangled symbols from the PDB.
        """

        for func in self.proj.kb.functions.values():
            symbol = self.address_to_symbol(func.addr)
            if symbol:
                demangled = self.demangle_name(symbol)
                func.name = demangled
                logger.debug(f"Function {hex(func.addr)} updated with symbol: {func.name}")

整个文件是:

import os
import re
import requests
from typing import Dict, Optional, Tuple

import angr
import pdbparse
import pefile
from cppmangle import demangle, cdecl_sym

from helpers.log import logger


class SymbolManager:
    """
    A class to manage symbols for an angr project.

    This class handles loading symbols from PDB files, demangling names,
    mapping addresses to symbols, and downloading PDB files if necessary.
    """


    def __init__(self, proj: angr.Project):
        """
        Initialize the SymbolManager.

        Args:
            proj (angr.Project): The angr project to analyze.
        """

        self.proj: angr.Project = proj
        self.pdb: Optional[pdbparse.PDB] = None
        self.symbols: Dict[int, str] = {}
        self.text_section_offset: int = 0

        self.load_symbols()
        self.load_global_symbols()
        self.get_text_section_offset()
        self.pdb_path = ""

    def load_symbols(self, download_dir: str = ".") -> None:
        """
        Load symbols for the angr project from a PDB file.

        If the PDB file doesn't exist, it attempts to download it.
        """

        binary_path: str = self.proj.filename
        pdb_path, _ = self.get_pdb_info(binary_path)

        self.pdb_path = os.path.join(download_dir, pdb_path)

        if not os.path.exists(self.pdb_path):
            logger.info(f"PDB file not found: {pdb_path}")
            self.download_pdb(os.path.dirname(pdb_path))

        if os.path.exists(self.pdb_path):
            self.pdb = pdbparse.parse(self.pdb_path)
            logger.info(f"Loaded PDB file: {pdb_path}")
        else:
            logger.warning("Failed to load PDB file")

    def load_global_symbols(self) -> None:
        """
        Load global symbols from the PDB.
        """

        if not self.pdb:
            logger.warning("No PDB loaded, cannot load global symbols")
            return

        for stream in self.pdb.streams:
            if hasattr(stream, 'funcs'):
                for sym, sym_value in stream.funcs.items():
                    self.symbols[sym_value.offset] = sym_value.name
                    logger.debug(f"Global symbol: {sym_value.name} at {hex(sym_value.offset)}")

    def get_text_section_offset(self) -> None:
        """
        Get the offset of the .text section from the image base.
        """

        main_object = self.proj.loader.main_object
        for section_name, section in main_object.sections_map.items():
            if section_name.startswith('.text'):
                self.text_section_offset = section.vaddr - main_object.mapped_base
                return

        logger.warning("Could not find .text section. Using 0 as offset.")
        self.text_section_offset = 0

    @staticmethod
    def demangle_name(mangled_name: str) -> str:
        """
        Demangle a C++ function name and extract just the function name.

        Args:
            mangled_name (str): The mangled function name.

        Returns:
            str: The demangled function name without parameters or return type.
        """

        try:
            full_demangled: str = cdecl_sym(demangle(mangled_name))
            match: Optional[re.Match] = re.search(r'(?:.*::)?(w+)(', full_demangled)
            return match.group(1if match else full_demangled
        except:
            return mangled_name

    def address_to_symbol(self, address: int) -> Optional[str]:
        """
        Convert an address to a symbol name.

        Args:
            address (int): The address to look up.

        Returns:
            Optional[str]: The symbol name if found, None otherwise.
        """

        rva: int = address - self.proj.loader.main_object.mapped_base

        """symbol: Optional[str] = self.symbols.get(rva)
        if symbol:
            return symbol
        """


        adjusted_rva: int = rva - self.text_section_offset
        symbol: Optional[str] = self.symbols.get(adjusted_rva)
        if symbol:
            return symbol

        logger.warning(f"Symbol not found for address {hex(address)} (RVA: {hex(rva)}, Adjusted RVA: {hex(adjusted_rva)})")
        return None

    def update_kb_with_symbols(self) -> None:
        """
        Update the knowledge base with symbols.

        This method updates the names of functions in the angr knowledge base
        with demangled symbols from the PDB.
        """

        for func in self.proj.kb.functions.values():
            symbol = self.address_to_symbol(func.addr)
            if symbol:
                demangled = self.demangle_name(symbol)
                func.name = demangled
                logger.debug(f"Function {hex(func.addr)} updated with symbol: {func.name}")

    def download_pdb(self, download_dir: str = ".") -> None:
        """
        Download the PDB file for the current binary.

        Args:
            download_dir (str): Directory to save the downloaded PDB.
        """

        pdbname, signature = self.get_pdb_info(self.proj.filename)

        if self.is_pdb_stored_locally(download_dir, pdbname):
            logger.info(f"PDB already exists locally for {self.proj.filename}")
            return

        download_url = f"https://msdl.microsoft.com/download/symbols/{pdbname}/{signature.upper()}/{pdbname}"
        logger.info(f"Downloading PDB from: {download_url}")

        r = requests.head(
            download_url,
            headers={"User-Agent""Microsoft-Symbol-Server/10.0.10036.206"},
            allow_redirects=True
        )

        if r.status_code == 200:
            target_file = os.path.join(download_dir, pdbname)
            with requests.get(r.url, headers={"User-Agent""Microsoft-Symbol-Server/10.0.10036.206"},
                              stream=Trueas pdb:
                pdb.raise_for_status()
                with open(target_file, "wb"as f:
                    for chunk in pdb.iter_content(chunk_size=8192):
                        f.write(chunk)
            logger.info(f"PDB downloaded to: {target_file}")
            self.pdb_path = target_file
        else:
            logger.error(f"(HTTP {r.status_code}) Could not find PDB at {download_url}")

    @staticmethod
    def get_pdb_info(binary_path: str) -> Tuple[str, str]:
        """
        Extract PDB name and signature from a PE file.

        Args:
            binary_path (str): Path to the PE file.

        Returns:
            Tuple[str, str]: A tuple containing the PDB name and signature.
        """

        pe_obj = pefile.PE(binary_path, fast_load=True)
        pe_obj.parse_data_directories([pefile.DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_DEBUG']])

        for debug_entry in pe_obj.DIRECTORY_ENTRY_DEBUG:
            if hasattr(debug_entry.entry, 'PdbFileName'):
                pdb_filename = debug_entry.entry.PdbFileName.decode('utf-8').rstrip('x00')
                guid = debug_entry.entry.Signature_String

                # Use only the filename if a full path is given
                pdb_filename = os.path.basename(pdb_filename)

                return pdb_filename, guid

        raise ValueError("No PDB debug information found in the PE file")

    @staticmethod
    def is_pdb_stored_locally(directory: str, filename: str) -> bool:
        """Check if the PDB file already exists locally."""
        return os.path.exists(os.path.join(directory, filename))

并且可以像这样使用:

symbol_manager = symbols.SymbolManager(proj) # angr.Project
symbol_manager.download_pdb()
symbol_manager.update_kb_with_symbols()

之前显示的相同信息现在显示了正确的函数名称:

Active state: <SimState @ 0x1400010f0>
2024-06-30 13:56:35 | DEBUG | [introspection.py:113] pretty_print_callstack() | Call Stack:
  -> 0x140001200 : run_heap_operations None (0 xrefs)
    -> 0x140001550 : printf None (0 xrefs)
-> 0x140001238 : run_heap_operations None (0 xrefs)
-> 0x140001284 : run_heap_operations None (0 xrefs)
-> 0x1400012f1 : run_heap_operations None (0 xrefs)
-> 0x1400012f8 : run_heap_operations None (0 xrefs)
  -> 0x140001550 : printf None (0 xrefs)
    -> 0x140001304 : run_heap_operations None (0 xrefs)
      -> 0x140001040 : do_heap_op (long long (64 bits), long long (64 bits)) -> void (0 xrefs)
      -> 0x1400011b0 : do_heap_op (long long (64 bits), long long (64 bits)) -> void (0 xrefs)
      -> 0x1400011b9 : do_heap_op (long long (64 bits), long long (64 bits)) -> void (0 xrefs)
        -> 0x140100050 : malloc (unsigned long (64 bits)) -> void* (0 xrefs)
    -> 0x1400011c2 : do_heap_op (long long (64 bits), long long (64 bits)) -> void (0 xrefs)
-> 0x140001550 : printf None (0 xrefs)
  -> 0x1400011e5 : do_heap_op (long long (64 bits), long long (64 bits)) -> void (0 xrefs)
    -> 0x140001311 : run_heap_operations None (0 xrefs)
-> 0x140001040 : do_heap_op (long long (64 bits), long long (64 bits)) -> void (0 xrefs)
-> 0x140001056 : do_heap_op (long long (64 bits), long long (64 bits)) -> void (0 xrefs)
-> 0x140001060 : do_heap_op (long long (64 bits), long long (64 bits)) -> void (0 xrefs)
-> 0x14000106a : do_heap_op (long long (64 bits), long long (64 bits)) -> void (0 xrefs)
-> 0x1400010b9 : do_heap_op (long long (64 bits), long long (64 bits)) -> void (0 xrefs)
-> 0x1400010c5 : do_heap_op (long long (64 bits), long long (64 bits)) -> void (0 xrefs)
-> 0x1400010d9 : do_heap_op (long long (64 bits), long long (64 bits)) -> void (0 xrefs)
-> 0x1400010f0 : do_heap_op (long long (64 bits), long long (64 bits)) -> void (0 xrefs) (repeated 97 times)

处理错误状态

我制作了这个简单的辅助函数来打印 Python 回溯:

def show_errors(state: angr.SimState) -> None:
    """
    Log error information for a given state.

    Args:
        state: The simulation state.
    """

    logger.debug(f'errored state: {state}')
    logger.debug(f'error message: {state.error}')

    tb: Any = state.traceback

    while tb.tb_next:
        logger.error(f'{tb.tb_frame}')
        tb = tb.tb_next

    logger.error(f'{tb.tb_frame}')

带趋势的实时代码覆盖率

angr每 5 秒钟更新一次状态,显示新发现了多少个块、每个函数的代码覆盖率进度百分比以及显示 angr 是否正在减慢的图表,这不是很好吗?

演示时间:

INFO     core.coverage:coverage.py:90 --- Coverage Update at 0.01 seconds ---
INFO     core.coverage:coverage.py:117 Overall coverage: 0.00% [+0 blocks total]
INFO     core.coverage:coverage.py:119 Newly discovered functions: sub_140001550, sub_140001b68, sub_140001380, sub_140002070, UnhandledExceptionFilter, QueryPerformanceCounter, free
INFO     core.coverage:coverage.py:90 --- Coverage Update at 4.10 seconds ---
INFO     core.coverage:coverage.py:112 Function: sub_140001550 - Covered blocks: 1/21 (4.76%) [+1 blocks]
INFO     core.coverage:coverage.py:112 Function: sub_140001040 - Covered blocks: 20/30 (66.67%) [+20 blocks]
INFO     core.coverage:coverage.py:112 Function: sub_140001200 - Covered blocks: 32/35 (91.43%) [+32 blocks]
INFO     core.coverage:coverage.py:112 Function: sub_140001000 - Covered blocks: 8/8 (100.00%) [+8 blocks]
INFO     core.coverage:coverage.py:117 Overall coverage: 0.22% [+61 blocks total]
INFO     core.coverage:coverage.py:90 --- Coverage Update at 7.78 seconds ---
INFO     core.coverage:coverage.py:112 Function: sub_140001040 - Covered blocks: 22/30 (73.33%) [+2 blocks]
INFO     core.coverage:coverage.py:112 Function: sub_140001200 - Covered blocks: 33/35 (94.29%) [+1 blocks]
INFO     core.coverage:coverage.py:117 Overall coverage: 0.23% [+3 blocks total]
INFO     core.coverage:coverage.py:90 --- Coverage Update at 11.07 seconds ---
INFO     core.coverage:coverage.py:117 Overall coverage: 0.23% [+0 blocks total]
INFO     core.coverage:coverage.py:90 --- Coverage Update at 14.33 seconds ---

以及一张可视化图表: 

[万字长文]Angr 符号执行的真实世界案例与经验分享

该代码是一个简单的类,可以立即插入到您的angr项目中:

import os
import time
import threading
from typing import Dict, List, Tuple, Set, Any
import matplotlib.pyplot as plt
import networkx as nx
from matplotlib.animation import FuncAnimation
import angr
import logging

logger = logging.getLogger(__name__)

logging.getLogger("matplotlib").setLevel(logging.ERROR)


class CoverageMonitor:
    def __init__(self, proj: angr.Project, cfg: angr.analyses.CFGEmulated, entry_point: int,
                 update_interval: float = 5.0, coverage_dir: str = "cov")
:

        """
        Initialize the CoverageMonitor.

        :param proj: The Angr project
        :param cfg: The Control Flow Graph
        :param entry_point: The entry point address
        :param update_interval: The interval between updates in seconds
        """

        self.proj: angr.Project = proj
        self.cfg: angr.analyses.CFGEmulated = cfg
        self.entry_point: int = entry_point
        self.update_interval: float = update_interval
        self.coverage_data: Dict[str, List[Tuple[float, int, float]]] = {}
        self.overall_coverage_data: List[Tuple[float, float]] = []
        self.start_time: float = time.time()
        self.stop_event: threading.Event = threading.Event()
        self.previous_coverage: Dict[str, Dict[str, int]] = {}
        self.previous_total_blocks: int = 0
        self.previous_functions: Set[str] = set()
        self.coverage_dir: str = coverage_dir

    def start_monitoring(self) -> None:
        """Start the coverage monitoring thread."""

        # clear the coverage directory
        for filename in os.listdir(self.coverage_dir):
            if filename.startswith("00"):
                os.remove(os.path.join(self.coverage_dir, filename))

        self.monitoring_thread = threading.Thread(target=self._monitor_coverage)
        self.monitoring_thread.start()

    def stop_monitoring(self) -> None:
        """Stop the coverage monitoring thread."""
        self.stop_event.set()
        self.monitoring_thread.join()

    def _monitor_coverage(self) -> None:
        """Monitor the coverage and update the data periodically."""
        while not self.stop_event.is_set():
            self._update_coverage()
            self.plot_coverage()
            time.sleep(self.update_interval)

    def _analyze_coverage(self) -> Tuple[float, Dict[str, Dict[str, int]]]:
        """
        Analyze the current coverage using Angr.

        :return: A tuple containing overall coverage percentage and function-wise coverage data
        """

        overall_coverage, function_coverage = analyze_coverage(self.proj, self.cfg, self.entry_point, "cov")

        # Convert the function_coverage to the format we need
        formatted_coverage: Dict[str, Dict[str, int]] = {}
        for func_addr, data in function_coverage.items():
            func_name = self.proj.kb.functions.get(func_addr).name
            formatted_coverage[func_name] = {
                "covered_blocks": data['covered_blocks'],
                "total_blocks": data['total_blocks']
            }

        return overall_coverage, formatted_coverage

    def _update_coverage(self) -> None:
        """Update the coverage data and log the results."""
        overall_coverage, function_coverage = self._analyze_coverage()
        elapsed_time = time.time() - self.start_time

        total_blocks = 0
        new_functions = set(function_coverage.keys()) - self.previous_functions

        logger.info(f"--- Coverage Update at {elapsed_time:.2f} seconds ---")

        for func_name, data in function_coverage.items():
            if func_name not in self.coverage_data:
                self.coverage_data[func_name] = []

            covered_blocks = data['covered_blocks']
            total_blocks += covered_blocks
            total_func_blocks = data['total_blocks']
            coverage_percentage = (covered_blocks / total_func_blocks) * 100 if total_func_blocks > 0 else 0

            self.coverage_data[func_name].append((elapsed_time, covered_blocks, coverage_percentage))

            # Calculate difference from previous update
            prev_covered = self.previous_coverage.get(func_name, {}).get('covered_blocks'0)
            block_diff = covered_blocks - prev_covered

            if block_diff > 0 or func_name in new_functions:

                if covered_blocks == 0:
                    continue

                logger.info(f"Function: {func_name} - Covered blocks: {covered_blocks}/{total_func_blocks} "
                            f"({coverage_percentage:.2f}%) [+{block_diff} blocks]")

        # Log overall statistics
        new_total_blocks = total_blocks - self.previous_total_blocks
        logger.info(f"Overall coverage: {overall_coverage:.2f}% [+{new_total_blocks} blocks total]")
        if new_functions:
            logger.info(f"Newly discovered functions: {', '.join(new_functions)}")

        # Update overall coverage data
        self.overall_coverage_data.append((elapsed_time, overall_coverage))

        # Update previous state
        self.previous_coverage = function_coverage
        self.previous_total_blocks = total_blocks
        self.previous_functions = set(function_coverage.keys())

    def plot_coverage(self) -> None:
        """Plot the coverage evolution over time."""
        fig, (ax1, ax2) = plt.subplots(21, figsize=(1212))

        # Plot overall coverage
        times, coverages = zip(*self.overall_coverage_data)
        ax1.plot(times, coverages, label='Overall Coverage', linewidth=2, color='black')
        ax1.set_xlabel('Time (seconds)')
        ax1.set_ylabel('Coverage (%)')
        ax1.set_title('Overall Coverage Evolution Over Time')
        ax1.legend()
        ax1.grid(True)

        # Plot function-wise coverage
        for func_name, data in self.coverage_data.items():
            times, _, coverages = zip(*data)
            ax2.plot(times, coverages, label=func_name)

        ax2.set_xlabel('Time (seconds)')
        ax2.set_ylabel('Coverage (%)')
        ax2.set_title('Function-wise Coverage Evolution Over Time')
        ax2.legend(loc='center left', bbox_to_anchor=(10.5))
        ax2.grid(True)

        plt.tight_layout()
        plt.show()


def monitor_coverage(proj: angr.Project, cfg: angr.analyses.CFGEmulated, entry_point: int,
                     duration: float = 10.0, update_interval: int = 5)
 -> None:

    """
    Monitor the coverage evolution for a specified duration.

    :param proj: The Angr project
    :param cfg: The Control Flow Graph
    :param entry_point: The entry point address
    :param duration: The duration to monitor in seconds
    :param update_interval: The interval between updates in seconds
    """

    monitor = CoverageMonitor(proj, cfg, entry_point, update_interval=update_interval)
    monitor.start_monitoring()

    try:
        time.sleep(duration)
    finally:
        monitor.stop_monitoring()
        monitor.plot_coverage()


def get_reachable_info(cfg: angr.analyses.cfg.cfg_fast.CFGBase, entry_point: int) -> Tuple[
    Set[int], Dict[int, Set[angr.knowledge_plugins.cfg.cfg_node.CFGNode]]]:

    """
    Get reachable blocks and functions from the entry point in the CFG.

    Args:
        cfg: The control flow graph.
        entry_point: The entry point address.

    Returns:
        A tuple containing reachable blocks and reachable functions.
    """

    entry_node: angr.knowledge_plugins.cfg.cfg_node.CFGNode = cfg.get_any_node(entry_point)
    if not entry_node:
        raise ValueError(f"Entry point {hex(entry_point)} not found in CFG")

    reachable_nodes: Set[angr.knowledge_plugins.cfg.cfg_node.CFGNode] = nx.descendants(cfg.graph, entry_node)
    reachable_nodes.add(entry_node)

    reachable_blocks: Set[int] = set(node.addr for node in reachable_nodes if node.block)

    reachable_functions: Dict[int, Set[angr.knowledge_plugins.cfg.cfg_node.CFGNode]] = {}
    for node in reachable_nodes:
        if node.function_address not in reachable_functions:
            reachable_functions[node.function_address] = set()
        reachable_functions[node.function_address].add(node)

    return reachable_blocks, reachable_functions


def read_coverage_files(coverage_dir: str) -> Set[int]:
    """
    Read coverage files and return a set of covered block addresses.

    Args:
        coverage_dir: The directory containing coverage files.

    Returns:
        A set of covered block addresses.
    """

    covered_blocks: Set[int] = set()
    for filename in os.listdir(coverage_dir):
        if filename.startswith("00"):
            with open(os.path.join(coverage_dir, filename), 'r'as f:
                covered_blocks.update(int(line.strip(), 16for line in f if line.strip())
    return covered_blocks


def compare_coverage(proj: angr.Project, reachable_blocks: Set[int],
                     reachable_functions: Dict[int, Set[angr.knowledge_plugins.cfg.cfg_node.CFGNode]],
                     covered_blocks: Set[int])
 -> Tuple[float, Dict[str, Dict[str, Any]]]:

    """
    Compare coverage between reachable blocks and covered blocks.

    Args:
        proj: The angr project.
        reachable_blocks: Set of reachable block addresses.
        reachable_functions: Dictionary of reachable functions and their nodes.
        covered_blocks: Set of covered block addresses.

    Returns:
        A tuple containing overall coverage and function coverage information.
    """

    total_reachable: int = len(reachable_blocks)
    total_covered: int = len(covered_blocks.intersection(reachable_blocks))
    overall_coverage: float = total_covered / total_reachable if total_reachable > 0 else 0

    function_coverage: Dict[str, Dict[str, Any]] = {}
    for func_addr, nodes in reachable_functions.items():
        func: angr.knowledge_plugins.functions.function.Function = proj.kb.functions.get(func_addr)
        if func:
            func_blocks: Set[int] = set(node.addr for node in nodes if node.block)
            covered_func_blocks: Set[int] = func_blocks.intersection(covered_blocks)
            coverage: float = len(covered_func_blocks) / len(func_blocks) if func_blocks else 0
            function_coverage[func.name] = {
                'address': func_addr,
                'total_blocks': len(func_blocks),
                'covered_blocks': len(covered_func_blocks),
                'coverage': coverage
            }

    return overall_coverage, function_coverage


def analyze_coverage(proj: angr.Project, cfg: angr.analyses.cfg.cfg_fast.CFGBase, entry_point: int,
                     coverage_dir: str, coverage_file: str = 'reachable_blocks.txt')
 -> Tuple[float, Dict[str, Dict[str, Any]]]:

    """
    Analyze coverage for the given project and CFG.

    Args:
        proj: The angr project.
        cfg: angr control flow graph.
        entry_point: The entry point address.
        coverage_dir: The directory containing coverage files.
        coverage_file: The coverage file to write to

    Returns:
        A tuple containing overall coverage and function coverage information.
    """

    reachable_blocks, reachable_functions = get_reachable_info(cfg, entry_point)
    covered_blocks = read_coverage_files(coverage_dir)
    overall_coverage, function_coverage = compare_coverage(proj, reachable_blocks, reachable_functions, covered_blocks)

    with open(coverage_file, 'w'as f:
        f.write("n".join([hex(block) for block in reachable_blocks]))

    return overall_coverage, function_coverage

使用方法如下

monitor = coverage.CoverageMonitor(shared.proj, shared.cfg, self.entry_point, update_interval=3.0, coverage_dir="cov")
monitor.start_monitoring()

结论

感谢您的工具箱中的这些新工具,我希望您能获得更好的用户体验,并(再次)给 angr 一个机会。它真的是一个很棒的框架!

翻译自:https://plowsec.github.io/angr-introspection-2024.html【阅读原文跳转】

我们的交流群

添加小助手微信加入: OxCSorder

[万字长文]Angr 符号执行的真实世界案例与经验分享

原文始发于微信公众号(二进制磨剑):[万字长文]Angr 符号执行的真实世界案例与经验分享

免责声明:文章中涉及的程序(方法)可能带有攻击性,仅供安全研究与教学之用,读者将其信息做其他用途,由读者承担全部法律及连带责任,本站不承担任何法律及连带责任;如有问题可邮件联系(建议使用企业邮箱或有效邮箱,避免邮件被拦截,联系方式见首页),望知悉。
  • 左青龙
  • 微信扫一扫
  • weinxin
  • 右白虎
  • 微信扫一扫
  • weinxin
admin
  • 本文由 发表于 2024年8月19日10:51:09
  • 转载请保留本文链接(CN-SEC中文网:感谢原作者辛苦付出):
                   [万字长文]Angr 符号执行的真实世界案例与经验分享http://cn-sec.com/archives/3079308.html
                  免责声明:文章中涉及的程序(方法)可能带有攻击性,仅供安全研究与教学之用,读者将其信息做其他用途,由读者承担全部法律及连带责任,本站不承担任何法律及连带责任;如有问题可邮件联系(建议使用企业邮箱或有效邮箱,避免邮件被拦截,联系方式见首页),望知悉.

发表评论

匿名网友 填写信息