关于加解密测试的一些浅谈

admin 2023年12月21日07:46:21评论28 views字数 9274阅读30分54秒阅读模式

关于加解密测试的一些浅谈

一、前言

本文主要是对于加解密处理流程中的一些思路整理,还有一些我在使用过程中遇到的一些好用的开源工具的介绍等。

二、APP加解密

通常APP在测试过程会遇到抓包数据的解密问题及修改后数据的加密问题。有的还有可能会遇到对数据进行保护的加签问题。对于加解密问题从理论上来讲,无非是请求包的加解密和响应包的加解密四个方向。因为APP客户端在我们的掌握之中,对于加解密的四个方向。即使采用的是非对称加密方式,由于APP本身也可以处理两个,即请求包的加密、响应包的解密。所以对APP hook得当请求包的加密和响应包的解密是可以实现的。此外请求包的解密可以通过hook加密函数通过保存加密明文和加密后的密文到数据库(通常使用Tinydb)然后根据密文搜索明文方式达成请求包解密效果。那就剩下响应包数据修改后的的加密问题。对于非对称加密来讲,这个肯定是无法实现的,但是我们测试是不需要加密响应报文的,加密的响应报文到了APP这里也是要解密后参与业务处理的,所以我们需要hook的是响应包的解密函数,将其返回值修改为要测试的内容然后再返回即可。对于对称加解密来讲找到密钥,抠出算法在外面对请求和响应数据进行加解密,或者hook加解密函数,通过主动调用等实现加解密。


关于加解密测试的一些浅谈

三、具体步骤

整个APP数据加解密可以分为:

1.前期的代码分析、hook点找寻(有的可能需要Frida反调绕过、抓包绕过等)、针对加解密hook函数编写Frida脚本将加解密函数导出。

2.后期的burp.py脚本(bp插件)编写、server.py编写。根据从burp.py中得到的数据以及Frida脚本hook的加解密方法,编写相关逻辑代码,编写数据测试单元。

3.测试成功之后将得到解密明文、修改后的加密密文flask/ xmlrpc的方式返回给burp.py插件。burp插件将数据展示在对应的tab标签。

👀加解密函数寻找

有时根据加密报文特征,例如报文可能为个键值对,可以根据密文的键名在编译后的程序里去寻找,进而进一步交叉引用找到相关加解密函数。找到加解密函数之后使用Frida对其进行hook,通过向其传入明文(加密)获取到请求包加密结果。,或向其传入密文(解密)获取响应包解密结果。如果是非对称加密,为了可以实现对请求包的解密需要拦截请求包加密函数,通过输出日志的形式将明文参数和密文结果传到server.py存储到数据库建立映射关系。当需要对请求包解密时从数据库根据密文查询明文即可。

有时候虽然找到了加解密函数,但是他们的参数(即要加解密的报文)不是我们抓获的格式,需要进行一些处理,这时候就需要在Frida的脚本中也加入类似这种格式处理的方法。可能要使用某个类的某个方法。这个时候就需要主动调用该方法,然后尽可能地将其写成简明的函数形式,然后直接在Frida脚本中进行调用。

四、jython版本的burp插件

1._burp.py:

1、获取请求包,对请求包数据进行解密,展示在请求包的插件标签。

2、对请求包插件标签的内容进行修改,对修改后的数据进行加密后,展示在请求包原始标签内。

bp插件脚本很难调试,所以建议是加解密数据处理逻辑及测试直接在server.py里实现。这里先给出一个_burp.py的例子:
from burp import IBurpExtender                          from burp import IMessageEditorTab                      from burp import IMessageEditorTabFactory            from java.io import PrintWriter            import json, time, base64, httplib            import re            import hashlib            import sys            # config_dict            config_dict = {                "pkgname" : "test",                "_api" : "127.0.0.1:31337",            }            class BurpExtender(IBurpExtender,IMessageEditorTabFactory):                def registerExtenderCallbacks(self, callbacks):                    self.stdout = PrintWriter(callbacks.getStdout(), True)                    self.stderr = PrintWriter(callbacks.getStderr(), True)                    self._callbacks = callbacks                    self._helpers = callbacks.getHelpers()                    callbacks.setExtensionName(config_dict["pkgname"])                    callbacks.registerMessageEditorTabFactory(self)                    return                def createNewInstance(self, controller, editable):                    return Display_data(self, controller, editable)            class Display_data(IMessageEditorTab):                def __init__(self, extender, controller, editable):                    self._helpers = extender._helpers                    self._txtInput = extender._callbacks.createTextEditor()                    self._extender = extender                def getUiComponent(self):                    return self._txtInput.getComponent()                def getTabCaption(self):                    return config_dict["pkgname"]                def isEnabled(self, content, isRequest):                    return True                def setMessage(self, content, isRequest):                    self._txtInput.setText("Loading ......")                    _output = ""                    self._content = content                               if (content is None):                        self._txtInput.setText(None)                        self._txtInput.setEditable(False)                        return                    if isRequest:                        if(self._txtInput.isTextModified()):                            return                        info = self._helpers.analyzeResponse(content)                        body = content[info.getBodyOffset():].tostring()                        header = info.getHeaders()                        header_s = 'rn'.join(header)                        endata = {"body":base64.b64encode(body),"header":base64.b64encode(header_s)}                        _resp = json.loads(self.api_req("get_req",json.dumps(endata)))                        if (_resp['status']!="ok"):                            # print(_resp)                            return                        request_bytes = self._helpers.stringToBytes(_resp["data"]["output"])                        _output = self._helpers.buildHttpMessage(header, request_bytes)                    else:                        info = self._helpers.analyzeResponse(content)                        body = content[info.getBodyOffset():].tostring()                        header = info.getHeaders()                        header_s = 'rn'.join(header)                        endata = {"body":base64.b64encode(body),"header":base64.b64encode(header_s)}                        _resp = json.loads(self.api_req("decrypt",json.dumps(endata)))                        print(_resp)                        if (_resp['status']!="ok"):                            # print(_resp)                            return                        request_bytes = self._helpers.stringToBytes(_resp["data"]["output"])                        _output = self._helpers.buildHttpMessage(header, request_bytes)                        self._txtInput.setEditable(False)                               self._txtInput.setText(_output)                    return                           def getSelectedData(self):                           return self._txtInput.getSelectedText()                           def isModified(self):                           return self._txtInput.isTextModified()                           def getMessage(self):                    if(self._txtInput.isTextModified()):                        raw = self._txtInput.getText()                        info = self._helpers.analyzeResponse(raw)                        body = raw[info.getBodyOffset():].tostring()                        header = info.getHeaders()                        header_s = 'rn'.join(header)                        data = {"body":base64.b64encode(body),"header":base64.b64encode(header_s)}                        _resp = json.loads(self.api_req("encrypt", json.dumps(data)))                        if (_resp['status']!="ok"):                            print("_resp")                            return                        request_bytes = self._helpers.stringToBytes(_resp["data"]["output"])                        return self._helpers.buildHttpMessage(header, request_bytes)                    else:                        _origin = self._content                        return _origin                           def api_req(self, api, data):                    try:                        headers = {"Content-type": "APPlication/json","Accept": "text/plain"}                        conn = httplib.HTTPConnection(config_dict["_api"])                        conn.request("POST", "/api/v1/"+api, data, headers)                        resp = conn.getresponse()                        assert resp.status == 200                        res_data = resp.read()                        conn.close()                        return res_data                    except Exception as e:                        return {"status":"error","data":e.__str__()}
2.server.py

实现两个请求响应接口:

1、encrypt接口:对获取到的body数据进行加密后返回。

2、decrypt接口:对获取到的body数据进行解密后返回。

注意:这里并没有给出hook具体加解密函数代码。具体代码可以写到js中然后加载即可

server.py实现代码:

import gzip            import json            import logging            import Frida            import tinydb            import base64            import time            import threading            from queue import Queue                       from flask import Flask, request, jsonify                                  APP = Flask(__name__)            db = tinydb.TinyDB("backup.bin")            _backup = tinydb.Query()            q = Queue()                       @APP.route('/')            def hello():                return "hello world"                                  def insert2db():                for _line in iter(q.get, None):                    db.insert(_line)                       logger = logging.getLogger("Frida")            def on_message(message, data):                if message['payload'] and "log_" in message['payload']:                    _data = json.loads(message['payload'])                    if "encrypt" in message['payload']:                        q.put(_data)                else:                    logger.info(message)                                  def init_Frida():                manager = Frida.get_device_manager()                flag = True                while flag:                    try:                        usb_device = manager.get_usb_device(10)                        flag = False                    except:                        pass                logger.info(usb_device)                pid = usb_device.spawn(pkgname)                session = usb_device.attach(pid)                global script                           script = session.create_script(                    gzip.decompress(base64.b64decode(_magic)).decode())                script.on('message', on_message)                script.load()                script.exports.init()                usb_device.resume(pid)                       def search_req_input_from_db(_input):                rows = db.search(_backup.output==_input)                if rows:                    return rows[-1]['input']                else:                    return _input                       @APP.route('/api/v1/get_req', methods=['POST'])            def search_req():                # {"data": post_data}                _input = request.json['body']                return jsonify({"status": "ok", "data": {"output": search_req_input_from_db(base64.b64decode(_input).decode())}})                       @APP.route('/api/v1/encrypt', methods=['POST'])            @APP.route('/api/v1/encrypt/', methods=['POST'])      def encrypt_req(key="AAAAAAAAAAAAAAAA"):          # {"data": post_data}          _input = request.json['body']          print(_input)          print(jsonify({"status": "ok", "data": {"output": script.exports.req_encrypt(base64.b64decode(_input).decode())}}))          return jsonify({"status": "ok", "data": {"output": script.exports.req_encrypt(base64.b64decode(_input).decode())}})           # {}.keys = [_method, _urlpath, _timestamp, _token, _params]           @APP.route('/api/v1/decrypt', methods=['POST'])      @APP.route('/api/v1/decrypt/', methods=['POST'])        def decrypt_resp(key="AAAAAAAAAAAAAAAA"):            # {"data": post_data}            _input = (request.json)['body']            print(_input)            return jsonify({"status": "ok", "data": {"output": script.exports.resp_decrypt(base64.b64decode(_input).decode())}})               def test_all():            # init_Frida()            # threading.Thread(target=insert2db).start()            # time.sleep(3) # wait for Frida ready            with APP.test_client() as c:                # decrypt                print("[!] testing decrypt")                _output = "eyJNU0ciOiLmgqjovpPlhaXnmoTnlKjmiLflkI3miJblr4bnoIHplJnor6/vvIEiLCJTVEFUVVMiOiJ2YWxpZGF0aW9uLmxvZ2luX3VzZXJfbm90X2V4aXN0In0="                _input = "{"MSG":"u60a8u8f93u5165u7684u7528u6237u540du6216u5bc6u7801u9519u8befuff01","STATUS":"validation.login_user_not_exist"}"                info = c.post("/api/v1/decrypt", json={"data":_output})                info = info.json                assert info['status'] == "ok"                print(info['data']['output'] == _input)                # encrypt                print("[!] testing encrypt")                info = c.post("/api/v1/encrypt",json = {"data":_input})                info = info.json                assert info['status'] == "ok"                print(info['data']['output'] ==_output)               if __name__ == '__main__':            # init_Frida()            # threading.Thread(target=insert2db).start()            APP.run(host="127.0.0.1",port="31337")            # print(script.exports.resp_decrypt("eyJNU0ciOiLmgqjovpPlhaXnmoTnlKjmiLflkI3miJblr4bnoIHplJnor6/vvIEiLCJTVEFUVVMiOiJ2YWxpZGF0aW9uLmxvZ2luX3VzZXJfbm90X2V4aXN0In0="))            # test_all()        

关于APP测试非http协议测试在APP测试过程中会遇到数据包请求非http这种情况,以及前面提到的非对称加密拦截修改响应包等操作。这种情况下如果能找到合适的hook点,然后使用下面这个工具也是可以完成目的的。具体原理和操作这个大佬README.md写的很清楚了,大家自行食用。

传送门https://github.com/F6JO/mPaas-Frida-hook

五、web前端加解密

web前端加解密这里也只是简单提一下,因为平时做的也比较少一些,主要是两个开源工具的介绍:

1.jsrpc

如果遇到复杂的加解密算法,比如某些在标准算法上魔改的算法,根本没法从JS中扣出关键代码的场景,就非常适合jsrpc的方式直接调用浏览器中的加解密函数。并且高级用法可以直接联动扫描器进行加密流量解密后的扫描工作。如何使用项目文档以及其衍生的介绍文档网上都有,大家搜搜探索一下就可以上手了

传送门https://github.com/jxhczhl/JsRpc

2.remotejs

基于CDP实现的远程JS debug工具
传送门https://github.com/1oid/remotejs
关于这个工具的使用,我写一下自己的探索过程:

1.首先需要开启 devtools设置中的Protocol Monitor⽤于监测cdp协议的调⽤记录

关于加解密测试的一些浅谈

2.然后寻找加解密函数,这里以加密函数为例:

a.原文中给出了寻找加密函数的一个思路。通常我们要找的都是登录相关的数据包加解密。所以我们直接打开控制台的network选项卡

b.直接在登录数据包的启动器中选择submit那个链接,可以直接定位到数据包发送相关函数代码,然后再在函数调用栈中进行追溯寻找,寻找过程多打几个断点看作用域里的代码变量参数变化来不断逼近加解密函数的大概位置。

关于加解密测试的一些浅谈

3.本例中的加密函数如下:

关于加解密测试的一些浅谈

4.启动remotejs_windows_amd64.exe 参数实例如下:

remotejs -h            GLOBAL OPTIONS:               --url value, -u value            open url when open chrome, default blank url               --chrome-path value, --cp value  use specified chrome path               --proxy value                    set proxy for browser               --remote-debug-address value     use remote chrome debugging               --web-listen value               web server port (default: "8088")               --help, -h                       show help                                     ./remotejs                                              # 打开一个空白的浏览器            ./remotejs -u [URL]                                     # 打开一个浏览器,并加载指定url            ./remotejs --remote-debug-address "ws://127.0.0.1:9222" # 指定一个远程浏览器(需要目标开remote-debugger-port)            

我们这里以远程调试地址举例:

本地先启动一个开启了远程调试端口的浏览器,这个是要调试的浏览器。也就是在该浏览器上访问网站找到加密函数,然后在该函数的作用域上打断点进行调试。具体如下列图片所示:

关于加解密测试的一些浅谈

关于加解密测试的一些浅谈

然后点击运行remotejs_windows_amd64.exe 加上 --remote-debug-address "ws://127.0.0.1:9221"参数连接到我们刚才打开的浏览器

关于加解密测试的一些浅谈

调用结果:

关于加解密测试的一些浅谈

最后

完结,给大佬们撒花!!!

原文始发于微信公众号(雁行安全团队):关于加解密测试的一些浅谈

  • 左青龙
  • 微信扫一扫
  • weinxin
  • 右白虎
  • 微信扫一扫
  • weinxin
admin
  • 本文由 发表于 2023年12月21日07:46:21
  • 转载请保留本文链接(CN-SEC中文网:感谢原作者辛苦付出):
                   关于加解密测试的一些浅谈https://cn-sec.com/archives/2320119.html

发表评论

匿名网友 填写信息