Angr 构建污点分析 x 反混淆

admin 2025年2月4日21:09:04评论83 views字数 11002阅读36分40秒阅读模式

污点分析

污点分析(Taint Analysis)是一种广泛应用于软件安全和漏洞检测领域的程序分析技术。其核心思想是通过追踪程序中不可信或敏感数据(称为“污点数据”)的传播路径,识别这些数据是否可能被用于危险操作(如执行恶意代码、触发漏洞等)。通过这种方式,污点分析能够帮助发现潜在的安全漏洞、隐私泄露或逻辑缺陷。

基本工作原理

  1. 污点标记(Taint Source)在程序执行或静态分析过程中,将外部输入(如用户输入、网络数据、文件读取等)标记为“污点源”,即可能携带风险的初始数据。
  2. 污点传播(Taint Propagation)跟踪污点数据在程序中的流动路径,包括变量赋值、函数调用、表达式计算等操作。若污点数据通过合法操作(如字符串拼接、算术运算)传播到其他变量,则新变量也会被标记为污点。
  3. 敏感操作检测(Sink Check)当污点数据到达“敏感操作点”(如数据库查询、系统命令执行、内存写入等)时,分析器会触发警报,判定是否存在安全风险(如SQL注入、命令注入、缓冲区溢出等)。

反混淆与污点分析

在反混淆场景下的二进制分析任务中,经常需要区分正常指令与混淆产生的垃圾指令,污点分析可用于区分正常指令与混淆产生的垃圾指令。

控制流平坦化 CFF

将控制流平坦化中的状态变量设置为污点源,污点分析记录每一条与污点源相关的所有指令,包括条件跳转。

Angr 构建污点分析 x 反混淆

对状态变量设置污点源,并进行指令级的污点分析,能够找出所有状态变量相关的指令,包括内存加载、寄存器移动、加减、比较等指令。

虚假控制流

如下图是一个 Hikari 虚假控制流(简称 bcf)混淆效果。

Angr 构建污点分析 x 反混淆

bcf 混淆会生成许多虚假的基本块,并用一个永真式嵌入原程序, bcf 生成的表达式中依赖一些全局变量(例如上图中的 dword_34C8)。首先将 bcf 涉及到的全局变量全部设置为污点源。Hikari bcf 的全局变量很容易识别,所有Hikari bcf 全局变量位于一个固定的全局连续区间,只需要人工标记区间的开始和结束位置即可。

基于 angr 构建污点分析框架

针对反混淆需求,污点分析框架应该有回答如下问题的能力:

(1)某指定地址的指令位置,该位置的指令是否会被污点源污染?

(2)某指定地址的指令位置,指定的寄存器是否被污点源污染?

(3)某指定地址的指令位置,指定的内存是否被污点源污染?

利用 angr 符号执行的特性能够构建一个基于符号执行的污点分析框架。“污点” 用 angr 中的符号值来表示,随着符号值参与执行运算,污点对应的符号值随着执行指令的语义污点传播。

同时,angr 的多指令集符号执行能力,让基于 angr 构建的污点分析框架能够跨指令集工作。

污点源

针对反混淆的需求,污点源通常是寄存器和内存。

def taint_memory(state, address, size_in_bytes):
    state.memory.store(address, claripy.BVS("taint", size_in_bytes*8))

def taint_register(state, reg_name, reg_size_in_bytes=8):
    setattr(state.regs, reg_name, claripy.BVS("taint", reg_size_in_bytes*8))

实现原理是将指定的内存或寄存器符号化,并且符号名是 taint

判断表达式是否污点

用表达式来抽象“寄存器”和“内存数据”,从寄存器和内存中读取出来的值都称为表达式。

def taint_check_expr(expr):
    """
    check if an expression is tainted
    """
    if expr.symbolic:
        for vname in list(expr.variables):
            if 'taint' in vname:
                return True
    return False

接下来构建判断寄存器是否被污点传播的函数

def taint_check_register(state, reg_name):
    """
    check if a register is tainted
    """
  
    try:
        if reg_name == 'nzcv'# 架构相关!只在 aarch64 验证
            reg_name = 'flags'
        rs = state.regs.get(reg_name)
    except AttributeError:
        logger.warning("taint_check_register: %s not supported by angr" % reg_name)
        return False
  
    return taint_check_expr(rs)

污点传播(执行)

污点源设置后,用 angr 做符号执行就能实现污点传播。具体的执行策略(比如控制流之间的转移策略)还需要根据不同的任务具体讨论。

angr 默认以基本块(Basic Block)为单位的执行策略无法满足反混淆场景的细粒度分析需求,因此要构建一个适合反混淆分析场景的污点指令级执行策略。

angr 符号执行有一个特性,它一次性执行一个 block,很难满足 ”某指定地址的指令位置“,特别是位于基本块中间的指令。

经过查阅 angr 源码,发现 state 的 step 能够指定执行指令的数量,将指令数量设置为 1 可实现逐指令插桩污点分析。

当然还有一些其它比较 trick 的技巧可以实现逐指令执行,比如 angr inspect。

下代码能够执行一条指令。

successors = proj.factory.successors(run_state, num_inst=1).successors

在如上代码的基础上,构建一个污点分析专用的 step 函数。

def taint_step(proj, state, run_until_pc=None, num_inst=None, check_every_inst=True):
    """
    对程序状态执行污点传播步骤,返回后继状态及污染状态列表

    Args:
        proj (angr.Project): angr 项目对象
        state (angr.SimState): 当前待分析的模拟程序状态
        run_until_pc (int, optional): 目标程序计数器值,运行直到达到该PC地址时停止。
                                      若为None则表示不启用此停止条件,默认为None
        num_inst (int, optional): 指定要执行的指令数量。若为None则表示不启用此停止条件,
                                  默认为None
        check_every_inst (bool, optional): 是否在每条指令执行后检查污染状态。
                                           若为False则仅在停止时检查,默认为True

    Returns:
        tuple: 包含两个列表的元组:
            - successors (list[SimState]): 执行产生的所有后继状态列表
            - tainted_list (list[SimState]): 被识别为受污染的状态列表

    功能说明:
        通过污点分析跟踪程序状态的传播过程。可根据PC地址或指令数量设置停止条件,
        返回分析过程中产生的所有后继状态和检测到的受污染状态
    """

    tainted_list = []
    regs_read_write_cache = {}
    run_state = state.copy()
    run_inst = 0

    whileTrue:
        cur_pc = run_state.addr
        if run_until_pc isnotNoneand cur_pc == run_until_pc:
            return [run_state], tainted_list
    
        # 解析指令的读写寄存器
        if check_every_inst and (cur_pc notin regs_read_write_cache):
            for insn in run_state.block().capstone.insns: # block 返回一块的指令,提前缓存
                read_regs, write_regs = insn.regs_access() # 使用 capstone 解析指令的读写寄存器
                regs_read_write_cache[insn.address] = ([insn.reg_name(r) for r in read_regs], [insn.reg_name(r) for r in write_regs])
    
        # 检查 tainted
        if check_every_inst:
            read_regs, write_regs = regs_read_write_cache[cur_pc]
            for reg in read_regs:
                if taint_check_register(run_state, reg):
                    if cur_pc notin tainted_list:
                        logger.info("tainted pc: %x" % cur_pc)
                        tainted_list.append(cur_pc)
    
        try:
            successors = proj.factory.successors(run_state, num_inst=1).successors
        except Exception as e:
            logger.warning("taint_step: failed to execute %x: %s, skip" % (cur_pc, e))
            temp_state = run_state.copy()
            temp_state.regs.pc = cur_pc + 4
            successors = [temp_state]
    
        run_inst += 1
        if num_inst isnotNoneand run_inst >= num_inst:
            return successors, tainted_list
    
        if check_every_inst:
            read_regs, write_regs = regs_read_write_cache[cur_pc]
            for reg in write_regs:
                for state in successors:
                    if taint_check_register(state, reg):
                        if cur_pc notin tainted_list:
                            logger.info("tainted pc: %x" % cur_pc)
                            tainted_list.append(cur_pc)

        if len(successors) != 1:
            return successors, tainted_list
        else:
            run_state = successors[0]

每一条指令执行前,首先调用 capstone 汇编引擎,解析指令读写的寄存器列表。执行前,判断指令指令读取的寄存器是否为污点;执行后,判断指令写入的寄存器是否被污点。

trace 模拟执行

在污点执行过程中,可能有将污点结果与没有污点分析结果进行对比的需求,因此需要实现不带污点的执行。对于不带污点的执行,只需要不设置污点即可。为了更加方便,封住 trace_step 函数,该函数传入的 state 不能设置污点。

def trace_step(proj, state, run_until_pc=None, num_inst=None):
    successors, _ = taint_step(proj, state, run_until_pc, num_inst, check_every_inst=False)
    return successors

敏感操作检测

污点分析的最后一个环节是敏感操作检测(sink)。 在反混淆场景中,sink 需要根据具体的需求来制定。以 bcf 混淆为例,sink 可以是 state 发生分叉,即遇到了污点传播到的条件跳转指令。

污点检查函数 taint_check_expr 在前面已经讨论过。

Hikari BCF 例子

Hikari BCF 虚假控制流,先不讨论它的简单去除方法,以这个为例子来验证用于反混淆的污点分析框架特别合适。

一些方法

  1. 简单方法:IDA Pro 把 data 段设置只读,借助编译优化自动优化(不讨论,确实可以,没有泛化能力)
  2. unicorn 模拟执行(不能肯定回答某个条件跳转指令是否与 BCF 混淆相关,容易误报!)
  3. 污点分析
在处理 BCF 混淆过的程序时,对于被混淆程序中的条件跳转指令:
  1. 该条件跳转指令是否是 BCF 混淆引入?(模拟执行❎,符号执行✅)
  2. 如果是BCF 引入的条件跳转指令,该跳转指令运行时实际目标地址是什么?(模拟执行✅,符号执行❎)

模拟执行无法回答某个条件跳转指令是否与 BCF 混淆相关,但是可以回答条件跳转指令实际跳转的目标地址;污点分析无法回答被污点感染的条件跳转指令的运行时目标地址。ru把两者结合一下,就能做到准确识别 BCF 中的混淆条件跳转指令,并且分析运行时实际地址。

模拟执行的过程仍然可以用 angr 来实现,只需要不添加污点源就是模拟执行!

目标:识别 BCF 引入的部分垃圾代码,如下图红色部分,特别是最后一条指令 B.LS 条件跳转,对于最后一个 BLS 条件跳转,还要分析它的目标实际地址并修复!

Angr 构建污点分析 x 反混淆

具体流程

查找所有 BCF 位置

查找所有含 BCF 的基本块:通过分析 BCF 引入的全局变量交叉引用实现。

def debcf_hikari_find_all(hikari_bcf_start, hikari_bcf_end):
    """
    find all hikari bcf in (hikari_bcf_start, hikari_bcf_end)
    only works in ida
    return a list of hikari bcf start address
    """
    block_list = []
    for start_ea in range(hikari_bcf_start, hikari_bcf_end, 4):
        xrefs = [ref.frm for ref in idautils.XrefsTo(start_ea)]
        block = ida_get_bb(xrefs[0])
        if block.start_ea notin block_list:
            block_list.append(block.start_ea)
    return block_list


def ida_get_bb(ea):
    f_blocks = idaapi.FlowChart(idaapi.get_func(ea), flags=idaapi.FC_PREDS)
    for block in f_blocks:
        if block.start_ea <= ea and ea < block.end_ea:
            return block
    returnNone

debcf_hikari_find_all  函数在 IDA 里面调用,返回所有含有 BCF 混淆的基本块地址列表。

分析流程

以基本块为单位,构建一个分析函数 debcf_hikari(proj, addr, taint_area),addr 是基本块地址,taint_area 是 BCF 的全局变量区间。该函数的功能是分析一个基本块,找出所有 BCF 相关的指令,特别是 BCF 相关的条件跳转,并分析条件跳转的目标地址,以便于后续的修补工作。

# addr = bcf 基本块开始地址
init_state = proj.factory.blank_state(addr=addr)
init_state.options.add(angr.options.CALLLESS) # 忽略函数调用
init_state.options.add(angr.options.LAZY_SOLVES) # 延迟求解
init_state.options.add(angr.options.ZERO_FILL_UNCONSTRAINED_MEMORY) # 未初始化内存设置 0
init_state.options.add(angr.options.ZERO_FILL_UNCONSTRAINED_REGISTERS)# 未初始化寄存器设置 0

污点分析、模拟执行混合

trace_state = init_state.copy() # 用于追踪,trace 实际运行值
taint_state = init_state.copy() # 用于污点分析,查找 BCF 相关指令

taint 污点源标记,标记 BCF 全局变量区间

taint_memory(taint_state, taint_area[0], taint_area[1] - taint_area[0])

分析过程

污点分析,模拟执行同步进行,每次循环分析一条指令。

首先污点分析执行一条指令,然后模拟执行执行一条指令,接下来做 sink 检查

  • 如果污点分析当前指令被污点感染,则将当前指令地址视为垃圾指令
  • 如果污点分析当前指令被污点感染且是条件跳转指令,从 trace 中取出目标跳转地址

整个分析过程的循环直到遇到跳转指令结束(或者 csel)。

对于每一条指令,下面是代码过程

污点分析执行一条指令

# 执行指令(taint)
taint_successors, tainted_list = taint_step(proj, taint_state, check_every_inst=True, num_inst=1)

模拟执行执行一条指令

# 执行指令(trace)
trace_successors = trace_step(proj, trace_state, num_inst=1)

sink 检查

if len(tainted_list) == 1 and len(trace_successors) >= 1 and len(taint_successors) >= 1:
 junk_code_list.append(tainted_list[0])
 if insn.mnemonic.startswith('b.'):
  patch_list.append((insn.address, 'B', trace_successors[0].addr))
  break # 结束分析

最后切换下一条指令执行

trace_state = trace_successors[0]
taint_state = taint_successors[0]

完整实现

def debcf_hikari(proj, addr, taint_area):
    """
    addr: the address of the block to analyize
    taint_area: a tuple (start_addr, end_addr)
    """
    patch_list = []
    junk_code_list = []
    init_state = proj.factory.blank_state(addr=addr)
    init_state.options.add(angr.options.CALLLESS)
    init_state.options.add(angr.options.LAZY_SOLVES)
    init_state.options.add(angr.options.ZERO_FILL_UNCONSTRAINED_MEMORY)
    init_state.options.add(angr.options.ZERO_FILL_UNCONSTRAINED_REGISTERS)

    trace_state = init_state.copy()
    taint_state = init_state.copy()

    taint_memory(taint_state, taint_area[0], taint_area[1] - taint_area[0])

    while True:  
        insn = trace_state.block().capstone.insns[0]
        regs_read_ids, regs_write_ids = insn.regs_access()

        reg_write_list = [insn.reg_name(r) for r in regs_write_ids]
        reg_read_list = [insn.reg_name(r) for r in regs_read_ids]

        if'nzcv'in reg_write_list:
            reg_write_list.remove('nzcv')

        logger.info("debcf_hikari: pc: %x %s %s" % (trace_state.addr, insn.mnemonic, insn.op_str))

        # 执行指令(taint)
        taint_successors, tainted_list = taint_step(proj, taint_state, check_every_inst=True, num_inst=1)

        # 处理 taint csel 指令
        if len(tainted_list) == 1and insn.mnemonic == 'csel':
            setattr(trace_state.regs, reg_read_list[1], 1)
            setattr(trace_state.regs, reg_read_list[2], 2)

        # 执行指令(trace)
        trace_successors = trace_step(proj, trace_state, num_inst=1)

        if len(tainted_list) == 1and len(trace_successors) >= 1and len(taint_successors) >= 1# 如果 taint 成功,则说明是 junk code
            logger.info("debcf_hikari: junk code pc: %x" % tainted_list[0])
            junk_code_list.append(tainted_list[0])
      
            if insn.mnemonic == 'csel':
                operands = insn.operands
                dst_reg  = insn.reg_name(operands[0].value.reg)
                src1_reg = insn.reg_name(operands[1].value.reg)
                src2_reg = insn.reg_name(operands[2].value.reg)
                # trace_state 是指令执行前的状态,trace_successors[0] 是指令执行后的状态
                dst_val = trace_successors[0].regs.get(dst_reg)
                src1_val = trace_state.regs.get(src1_reg)
                src2_val = trace_state.regs.get(src2_reg)
                dst_val =  trace_successors[0].solver.eval(dst_val)
                src1_val = trace_state.solver.eval(src1_val)
                src2_val = trace_state.solver.eval(src2_val)
                if dst_val == src1_val:
                    patch_list.append((insn.address, 'MOV-REG-REG', dst_reg, src1_reg))
                elif dst_val == src2_val:
                    patch_list.append((insn.address, 'MOV-REG-REG', dst_reg, src2_reg))
                else:
                    logger.error("debcf_hikari: unknown csel pc: %x" % insn.address)
                break
      
            if insn.mnemonic.startswith('b.'):
                if len(trace_successors) != 1:
                    logger.warning("debcf_hikari: more than one successor in %x" % insn.address)
                else:
                    patch_list.append((insn.address, 'B', trace_successors[0].addr))
                logger.info("debcf_hikari: find cond branch pc: %x" % insn.address)
                break
      
        if insn.mnemonic in ['ret''b']:
            logger.info("debcf_hikari: find branch pc: %x" % insn.address)
            break

        if len(trace_successors) != 1:
            break
        else:
            trace_state = trace_successors[0]
            taint_state = taint_successors[0]

    return patch_list, junk_code_list

这段代码能够有效处理 hikari 编译的 aarch64 cff+bcf,即使有控制流平坦化 cff 也能正常工作,且 patch 完成后不影响程序的正常逻辑,在 miniz 压缩算法库完成测试。

原文始发于微信公众号(二进制磨剑):Angr 构建污点分析 x 反混淆【文末抽奖】

免责声明:文章中涉及的程序(方法)可能带有攻击性,仅供安全研究与教学之用,读者将其信息做其他用途,由读者承担全部法律及连带责任,本站不承担任何法律及连带责任;如有问题可邮件联系(建议使用企业邮箱或有效邮箱,避免邮件被拦截,联系方式见首页),望知悉。
  • 左青龙
  • 微信扫一扫
  • weinxin
  • 右白虎
  • 微信扫一扫
  • weinxin
admin
  • 本文由 发表于 2025年2月4日21:09:04
  • 转载请保留本文链接(CN-SEC中文网:感谢原作者辛苦付出):
                   Angr 构建污点分析 x 反混淆https://cn-sec.com/archives/3697932.html
                  免责声明:文章中涉及的程序(方法)可能带有攻击性,仅供安全研究与教学之用,读者将其信息做其他用途,由读者承担全部法律及连带责任,本站不承担任何法律及连带责任;如有问题可邮件联系(建议使用企业邮箱或有效邮箱,避免邮件被拦截,联系方式见首页),望知悉.

发表评论

匿名网友 填写信息