一、前言
二、APP加解密
通常APP在测试过程会遇到抓包数据的解密问题及修改后数据的加密问题。有的还有可能会遇到对数据进行保护的加签问题。对于加解密问题从理论上来讲,无非是请求包的加解密和响应包的加解密四个方向。因为APP客户端在我们的掌握之中,对于加解密的四个方向。即使采用的是非对称加密方式,由于APP本身也可以处理两个,即请求包的加密、响应包的解密。所以对APP hook得当请求包的加密和响应包的解密是可以实现的。此外请求包的解密可以通过hook加密函数通过保存加密明文和加密后的密文到数据库(通常使用Tinydb)然后根据密文搜索明文方式达成请求包解密效果。那就剩下响应包数据修改后的的加密问题。对于非对称加密来讲,这个肯定是无法实现的,但是我们测试是不需要加密响应报文的,加密的响应报文到了APP这里也是要解密后参与业务处理的,所以我们需要hook的是响应包的解密函数,将其返回值修改为要测试的内容然后再返回即可。对于对称加解密来讲找到密钥,抠出算法在外面对请求和响应数据进行加解密,或者hook加解密函数,通过主动调用等实现加解密。
三、具体步骤
整个APP数据加解密可以分为:
1.前期的代码分析、hook点找寻(有的可能需要Frida反调绕过、抓包绕过等)、针对加解密hook函数编写Frida脚本将加解密函数导出。
2.后期的burp.py脚本(bp插件)编写、server.py编写。根据从burp.py中得到的数据以及Frida脚本hook的加解密方法,编写相关逻辑代码,编写数据测试单元。
3.测试成功之后将得到解密明文、修改后的加密密文flask/ xmlrpc的方式返回给burp.py插件。burp插件将数据展示在对应的tab标签。
👀加解密函数寻找
有时根据加密报文特征,例如报文可能为个键值对,可以根据密文的键名在编译后的程序里去寻找,进而进一步交叉引用找到相关加解密函数。找到加解密函数之后使用Frida对其进行hook,通过向其传入明文(加密)获取到请求包加密结果。,或向其传入密文(解密)获取响应包解密结果。如果是非对称加密,为了可以实现对请求包的解密需要拦截请求包加密函数,通过输出日志的形式将明文参数和密文结果传到server.py存储到数据库建立映射关系。当需要对请求包解密时从数据库根据密文查询明文即可。
有时候虽然找到了加解密函数,但是他们的参数(即要加解密的报文)不是我们抓获的格式,需要进行一些处理,这时候就需要在Frida的脚本中也加入类似这种格式处理的方法。可能要使用某个类的某个方法。这个时候就需要主动调用该方法,然后尽可能地将其写成简明的函数形式,然后直接在Frida脚本中进行调用。
1._burp.py:
1、获取请求包,对请求包数据进行解密,展示在请求包的插件标签。
2、对请求包插件标签的内容进行修改,对修改后的数据进行加密后,展示在请求包原始标签内。
from burp import IBurpExtender
from burp import IMessageEditorTab
from burp import IMessageEditorTabFactory
from java.io import PrintWriter
import json, time, base64, httplib
import re
import hashlib
import sys
# config_dict
config_dict = {
"pkgname"
:
"test"
,
"_api"
:
"127.0.0.1:31337"
,
}
class
BurpExtender
(
IBurpExtender
,
IMessageEditorTabFactory
):
def
registerExtenderCallbacks
(
self
, callbacks)
:
self
.stdout = PrintWriter(callbacks.getStdout(), True)
self
.stderr = PrintWriter(callbacks.getStderr(), True)
self
._callbacks = callbacks
self
._helpers = callbacks.getHelpers()
callbacks.setExtensionName(config_dict[
"pkgname"
])
callbacks.registerMessageEditorTabFactory(
self
)
return
def
createNewInstance
(
self
, controller, editable)
:
return
Display_data(
self
, controller, editable)
class
Display_data
(
IMessageEditorTab
):
def
__init__
(
self
, extender, controller, editable)
:
self
._helpers = extender._helpers
self
._txtInput = extender._callbacks.createTextEditor()
self
._extender = extender
def
getUiComponent
(
self
)
:
return
self
._txtInput.getComponent()
def
getTabCaption
(
self
)
:
return
config_dict[
"pkgname"
]
def
isEnabled
(
self
, content, isRequest)
:
return
True
def
setMessage
(
self
, content, isRequest)
:
self
._txtInput.setText(
"Loading ......"
)
_output =
""
self
._content = content
if
(content is None):
self
._txtInput.setText(None)
self
._txtInput.setEditable(False)
return
if
isRequest:
if
(
self
._txtInput.isTextModified()):
return
info =
self
._helpers.analyzeResponse(content)
body = content[info.getBodyOffset()
:
].tostring()
header = info.getHeaders()
header_s =
'rn'
.join(header)
endata = {
"body"
:base64
.b64encode(body),
"header"
:base64
.b64encode(header_s)}
_resp = json.loads(
self
.api_req(
"get_req"
,json.dumps(endata)))
if
(_resp[
'status'
]!=
"ok"
):
# print(_resp)
return
request_bytes =
self
._helpers.stringToBytes(_resp[
"data"
][
"output"
])
_output =
self
._helpers.buildHttpMessage(header, request_bytes)
else:
info =
self
._helpers.analyzeResponse(content)
body = content[info.getBodyOffset()
:
].tostring()
header = info.getHeaders()
header_s =
'rn'
.join(header)
endata = {
"body"
:base64
.b64encode(body),
"header"
:base64
.b64encode(header_s)}
_resp = json.loads(
self
.api_req(
"decrypt"
,json.dumps(endata)))
print(_resp)
if
(_resp[
'status'
]!=
"ok"
):
# print(_resp)
return
request_bytes =
self
._helpers.stringToBytes(_resp[
"data"
][
"output"
])
_output =
self
._helpers.buildHttpMessage(header, request_bytes)
self
._txtInput.setEditable(False)
self
._txtInput.setText(_output)
return
def
getSelectedData
(
self
)
:
return
self
._txtInput.getSelectedText()
def
isModified
(
self
)
:
return
self
._txtInput.isTextModified()
def
getMessage
(
self
)
:
if
(
self
._txtInput.isTextModified()):
raw =
self
._txtInput.getText()
info =
self
._helpers.analyzeResponse(raw)
body = raw[info.getBodyOffset()
:
].tostring()
header = info.getHeaders()
header_s =
'rn'
.join(header)
data = {
"body"
:base64
.b64encode(body),
"header"
:base64
.b64encode(header_s)}
_resp = json.loads(
self
.api_req(
"encrypt"
, json.dumps(data)))
if
(_resp[
'status'
]!=
"ok"
):
print(
"_resp"
)
return
request_bytes =
self
._helpers.stringToBytes(_resp[
"data"
][
"output"
])
return
self
._helpers.buildHttpMessage(header, request_bytes)
else:
_origin =
self
._content
return
_origin
def
api_req
(
self
, api, data)
:
try:
headers = {
"Content-type"
:
"APPlication/json"
,
"Accept"
:
"text/plain"
}
conn = httplib.HTTPConnection(config_dict[
"_api"
])
conn.request(
"POST"
,
"/api/v1/"
+api, data, headers)
resp = conn.getresponse()
assert resp.status ==
200
res_data = resp.read()
conn.close()
return
res_data
except Exception as
e:
return
{
"status"
:
"error"
,
"data"
:e
.__str_
_
()}
实现两个请求响应接口:
1、encrypt接口:对获取到的body数据进行加密后返回。
2、decrypt接口:对获取到的body数据进行解密后返回。
注意:这里并没有给出hook具体加解密函数代码。具体代码可以写到js中然后加载即可
server.py实现代码:
import
gzip
import
json
import
logging
import
Frida
import
tinydb
import
base64
import
time
import
threading
from
queue
import
Queue
from
flask
import
Flask, request, jsonify
APP = Flask(__name__)
db = tinydb.TinyDB(
"backup.bin"
)
_backup = tinydb.Query()
q = Queue()
def
hello
()
:
return
"hello world"
def
insert2db
()
:
for
_line
in
iter(q.get,
None
):
db.insert(_line)
logger = logging.getLogger(
"Frida"
)
def
on_message
(message, data)
:
if
message[
'payload'
]
and
"log_"
in
message[
'payload'
]:
_data = json.loads(message[
'payload'
])
if
"encrypt"
in
message[
'payload'
]:
q.put(_data)
else
:
logger.info(message)
def
init_Frida
()
:
manager = Frida.get_device_manager()
flag =
True
while
flag:
try
:
usb_device = manager.get_usb_device(
10
)
flag =
False
except
:
pass
logger.info(usb_device)
pid = usb_device.spawn(pkgname)
session = usb_device.attach(pid)
global
script
script = session.create_script(
gzip.decompress(base64.b64decode(_magic)).decode())
script.on(
'message'
, on_message)
script.load()
script.exports.init()
usb_device.resume(pid)
def
search_req_input_from_db
(_input)
:
rows = db.search(_backup.output==_input)
if
rows:
return
rows[
-1
][
'input'
]
else
:
return
_input
def
search_req
()
:
# {"data": post_data}
_input = request.json[
'body'
]
return
jsonify({
"status"
:
"ok"
,
"data"
: {
"output"
: search_req_input_from_db(base64.b64decode(_input).decode())}})
def
encrypt_req
(key=
"AAAAAAAAAAAAAAAA"
)
:
# {"data": post_data}
_input = request.json[
'body'
]
print(_input)
print(jsonify({
"status"
:
"ok"
,
"data"
: {
"output"
: script.exports.req_encrypt(base64.b64decode(_input).decode())}}))
return
jsonify({
"status"
:
"ok"
,
"data"
: {
"output"
: script.exports.req_encrypt(base64.b64decode(_input).decode())}})
# {}.keys = [_method, _urlpath, _timestamp, _token, _params]
def
decrypt_resp
(key=
"AAAAAAAAAAAAAAAA"
)
:
# {"data": post_data}
_input = (request.json)[
'body'
]
print(_input)
return
jsonify({
"status"
:
"ok"
,
"data"
: {
"output"
: script.exports.resp_decrypt(base64.b64decode(_input).decode())}})
def
test_all
()
:
# init_Frida()
# threading.Thread(target=insert2db).start()
# time.sleep(3) # wait for Frida ready
with
APP.test_client()
as
c:
# decrypt
print(
"[!] testing decrypt"
)
_output =
"eyJNU0ciOiLmgqjovpPlhaXnmoTnlKjmiLflkI3miJblr4bnoIHplJnor6/vvIEiLCJTVEFUVVMiOiJ2YWxpZGF0aW9uLmxvZ2luX3VzZXJfbm90X2V4aXN0In0="
_input =
"{"MSG":"u60a8u8f93u5165u7684u7528u6237u540du6216u5bc6u7801u9519u8befuff01","STATUS":"validation.login_user_not_exist"}"
info = c.post(
"/api/v1/decrypt"
, json={
"data"
:_output})
info = info.json
assert
info[
'status'
] ==
"ok"
print(info[
'data'
][
'output'
] == _input)
# encrypt
print(
"[!] testing encrypt"
)
info = c.post(
"/api/v1/encrypt"
,json = {
"data"
:_input})
info = info.json
assert
info[
'status'
] ==
"ok"
print(info[
'data'
][
'output'
] ==_output)
if
__name__ ==
'__main__'
:
# init_Frida()
# threading.Thread(target=insert2db).start()
APP.run(host=
"127.0.0.1"
,port=
"31337"
)
# print(script.exports.resp_decrypt("eyJNU0ciOiLmgqjovpPlhaXnmoTnlKjmiLflkI3miJblr4bnoIHplJnor6/vvIEiLCJTVEFUVVMiOiJ2YWxpZGF0aW9uLmxvZ2luX3VzZXJfbm90X2V4aXN0In0="))
# test_all()
关于APP测试非http协议测试:在APP测试过程中会遇到数据包请求非http这种情况,以及前面提到的非对称加密拦截修改响应包等操作。这种情况下如果能找到合适的hook点,然后使用下面这个工具也是可以完成目的的。具体原理和操作这个大佬README.md写的很清楚了,大家自行食用。
传送门:https://github.com/F6JO/mPaas-Frida-hook
五、web前端加解密
web前端加解密这里也只是简单提一下,因为平时做的也比较少一些,主要是两个开源工具的介绍:
1.jsrpc
如果遇到复杂的加解密算法,比如某些在标准算法上魔改的算法,根本没法从JS中扣出关键代码的场景,就非常适合jsrpc的方式直接调用浏览器中的加解密函数。并且高级用法可以直接联动扫描器进行加密流量解密后的扫描工作。如何使用项目文档以及其衍生的介绍文档网上都有,大家搜搜探索一下就可以上手了
传送门:https://github.com/jxhczhl/JsRpc
2.remotejs
1.首先需要开启 devtools设置中的Protocol Monitor⽤于监测cdp协议的调⽤记录
2.然后寻找加解密函数,这里以加密函数为例:
a.原文中给出了寻找加密函数的一个思路。通常我们要找的都是登录相关的数据包加解密。所以我们直接打开控制台的network选项卡
b.直接在登录数据包的启动器中选择submit那个链接,可以直接定位到数据包发送相关函数代码,然后再在函数调用栈中进行追溯寻找,寻找过程多打几个断点看作用域里的代码变量参数变化来不断逼近加解密函数的大概位置。
3.本例中的加密函数如下:
4.启动remotejs_windows_amd64.exe 参数实例如下:
remotejs -h
GLOBAL OPTIONS:
--url
value
, -u
value
open url when open chrome,
default
blank url
--chrome-path
value
, --cp
value
use specified chrome path
--proxy
value
set
proxy
for
browser
--remote-debug-address
value
use remote chrome debugging
--web-
listen
value
web server
port
(
default
:
"8088"
)
--help, -h show help
./remotejs # 打开一个空白的浏览器
./remotejs -u [URL] # 打开一个浏览器,并加载指定url
./remotejs --remote-debug-address "ws:
//127.0.0.1:9222" # 指定一个远程浏览器(需要目标开remote-debugger-port)
我们这里以远程调试地址举例:
本地先启动一个开启了远程调试端口的浏览器,这个是要调试的浏览器。也就是在该浏览器上访问网站找到加密函数,然后在该函数的作用域上打断点进行调试。具体如下列图片所示:
然后点击运行remotejs_windows_amd64.exe 加上 --remote-debug-address "ws://127.0.0.1:9221"参数连接到我们刚才打开的浏览器
调用结果:
最后
完结,给大佬们撒花!!!
原文始发于微信公众号(雁行安全团队):关于加解密测试的一些浅谈
- 左青龙
- 微信扫一扫
-
- 右白虎
- 微信扫一扫
-
评论